krpc_core/protocol/
h2_handler.rs

1use super::StreamHandler;
2use crate::{
3    filter::{KrpcFilter, RpcServerRoute},
4    support::triple::{TripleExceptionWrapper, TripleRequestWrapper, TripleResponseWrapper},
5};
6use bytes::Bytes;
7use h2::server::Builder;
8use http::{HeaderMap, HeaderValue, Request, Response};
9use krpc_common::{KrpcMsg, RpcError};
10use prost::Message;
11use std::time::Duration;
12
13impl StreamHandler {
14    pub async fn run_v2(mut self) {
15        let mut connection = get_server_builder()
16            .handshake::<_, Bytes>(self.tcp_stream)
17            .await
18            .unwrap();
19        self.filter_list.push(RpcServerRoute::new(self.rpc_server));
20        while let Some(result) = connection.accept().await {
21            let filter_list = self.filter_list.clone();
22            tokio::spawn(async move {
23                let (request, mut respond) = result.unwrap();
24                let mut trailers = HeaderMap::new();
25                match decode_filter(request).await {
26                    Ok(mut msg) => {
27                        for idx in 0..filter_list.len() {
28                            msg = filter_list[idx].call(msg).await.unwrap()
29                        }
30                        let res = encode_filter(msg).await;
31                        let mut send = respond.send_response(res.0, false).unwrap();
32                        let _ = send.send_data(res.2, false);
33                        trailers.insert("grpc-status", HeaderValue::from_str(&res.1).unwrap());
34                        let _ = send.send_trailers(trailers);
35                    }
36                    Err(err) => {
37                        let response: Response<()> = Response::builder()
38                            .header("grpc-status", "99")
39                            .header("grpc-message", err.to_string())
40                            .body(())
41                            .unwrap();
42                        let _send = respond.send_response(response, true).unwrap();
43                    }
44                };
45            });
46        }
47    }
48}
49
50async fn decode_filter(mut req: Request<h2::RecvStream>) -> crate::Result<KrpcMsg> {
51    let url = req.uri().path().to_string();
52    let data = req.body_mut().data().await.unwrap().unwrap();
53    let trip = match TripleRequestWrapper::decode(&data[5..]) {
54        Ok(data) => data,
55        Err(err) => {
56            return Err(Box::new(err));
57        }
58    };
59    let path: Vec<&str> = url.split("/").collect();
60    let version = req
61        .headers()
62        .get("tri-service-version")
63        .map(|e| String::from_utf8_lossy(e.as_bytes()).to_string());
64    return Ok(KrpcMsg::new(
65        "unique_identifier".to_string(),
66        version,
67        path[1].to_string(),
68        path[2].to_string(),
69        trip.get_req(),
70        Result::Err(RpcError::Server("empty".to_string())),
71    ));
72}
73
74async fn encode_filter(msg: KrpcMsg) -> (Response<()>, String, bytes::Bytes) {
75    let mut status = "0";
76    let res_data = match msg.res {
77        Ok(data) => bytes::Bytes::from(TripleResponseWrapper::get_buf(data)),
78        Err(err) => bytes::Bytes::from(TripleExceptionWrapper::get_buf(match err {
79            RpcError::Client(msg) => {
80                status = "90";
81                msg
82            }
83            RpcError::Method(msg) => {
84                status = "91";
85                msg
86            }
87            RpcError::Null => {
88                status = "92";
89                "RpcError::Null".to_string()
90            }
91            RpcError::Server(msg) => {
92                status = "93";
93                msg
94            }
95        })),
96    };
97    let response: Response<()> = Response::builder()
98        .header("content-type", "application/grpc")
99        .header("te", "trailers")
100        .body(())
101        .unwrap();
102    return (response, status.to_string(), res_data);
103}
104
105fn get_server_builder() -> Builder {
106    let mut config: Config = Default::default();
107    config.initial_conn_window_size = SPEC_WINDOW_SIZE;
108    config.initial_stream_window_size = SPEC_WINDOW_SIZE;
109    let mut builder = h2::server::Builder::default();
110    builder
111        .initial_window_size(config.initial_stream_window_size)
112        .initial_connection_window_size(config.initial_conn_window_size)
113        .max_frame_size(config.max_frame_size)
114        .max_header_list_size(config.max_header_list_size)
115        .max_send_buffer_size(config.max_send_buffer_size);
116    if let Some(max) = config.max_concurrent_streams {
117        builder.max_concurrent_streams(max);
118    }
119    if config.enable_connect_protocol {
120        builder.enable_connect_protocol();
121    }
122    return builder;
123}
124
// These defaults target the common deployment, which is usually not short on
// resources; the 64 KiB window from the spec can be too small for good
// throughput there.
//
// A server, however, typically serves many clients at once, so it tends to
// consume more resources than a single client would.

/// Default connection-level flow-control window: 1 MiB.
const DEFAULT_CONN_WINDOW: u32 = 1 << 20;
/// Default per-stream flow-control window: 1 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 1 << 20;
/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 << 10;
/// Default maximum send buffer size: 400 KiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 400 << 10;
/// Default maximum header list size: 16 MiB, a "sane default" taken from
/// golang http2.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024 * 1024;
/// Default initial stream window size defined in the HTTP/2 spec.
pub(crate) const SPEC_WINDOW_SIZE: u32 = (1 << 16) - 1;
139#[derive(Clone, Debug)]
140pub(crate) struct Config {
141    pub(crate) _adaptive_window: bool,
142    pub(crate) initial_conn_window_size: u32,
143    pub(crate) initial_stream_window_size: u32,
144    pub(crate) max_frame_size: u32,
145    pub(crate) enable_connect_protocol: bool,
146    pub(crate) max_concurrent_streams: Option<u32>,
147    pub(crate) _keep_alive_interval: Option<Duration>,
148    pub(crate) _keep_alive_timeout: Duration,
149    pub(crate) max_send_buffer_size: usize,
150    pub(crate) max_header_list_size: u32,
151}
152
153impl Default for Config {
154    fn default() -> Config {
155        Config {
156            _adaptive_window: false,
157            initial_conn_window_size: DEFAULT_CONN_WINDOW,
158            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
159            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
160            enable_connect_protocol: false,
161            max_concurrent_streams: Some(200),
162            _keep_alive_interval: None,
163            _keep_alive_timeout: Duration::from_secs(20),
164            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
165            max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
166        }
167    }
168}