krpc_core/protocol/
h2_handler.rs1use super::StreamHandler;
2use crate::{
3 filter::{KrpcFilter, RpcServerRoute},
4 support::triple::{TripleExceptionWrapper, TripleRequestWrapper, TripleResponseWrapper},
5};
6use bytes::Bytes;
7use h2::server::Builder;
8use http::{HeaderMap, HeaderValue, Request, Response};
9use krpc_common::{KrpcMsg, RpcError};
10use prost::Message;
11use std::time::Duration;
12
13impl StreamHandler {
14 pub async fn run_v2(mut self) {
15 let mut connection = get_server_builder()
16 .handshake::<_, Bytes>(self.tcp_stream)
17 .await
18 .unwrap();
19 self.filter_list.push(RpcServerRoute::new(self.rpc_server));
20 while let Some(result) = connection.accept().await {
21 let filter_list = self.filter_list.clone();
22 tokio::spawn(async move {
23 let (request, mut respond) = result.unwrap();
24 let mut trailers = HeaderMap::new();
25 match decode_filter(request).await {
26 Ok(mut msg) => {
27 for idx in 0..filter_list.len() {
28 msg = filter_list[idx].call(msg).await.unwrap()
29 }
30 let res = encode_filter(msg).await;
31 let mut send = respond.send_response(res.0, false).unwrap();
32 let _ = send.send_data(res.2, false);
33 trailers.insert("grpc-status", HeaderValue::from_str(&res.1).unwrap());
34 let _ = send.send_trailers(trailers);
35 }
36 Err(err) => {
37 let response: Response<()> = Response::builder()
38 .header("grpc-status", "99")
39 .header("grpc-message", err.to_string())
40 .body(())
41 .unwrap();
42 let _send = respond.send_response(response, true).unwrap();
43 }
44 };
45 });
46 }
47 }
48}
49
50async fn decode_filter(mut req: Request<h2::RecvStream>) -> crate::Result<KrpcMsg> {
51 let url = req.uri().path().to_string();
52 let data = req.body_mut().data().await.unwrap().unwrap();
53 let trip = match TripleRequestWrapper::decode(&data[5..]) {
54 Ok(data) => data,
55 Err(err) => {
56 return Err(Box::new(err));
57 }
58 };
59 let path: Vec<&str> = url.split("/").collect();
60 let version = req
61 .headers()
62 .get("tri-service-version")
63 .map(|e| String::from_utf8_lossy(e.as_bytes()).to_string());
64 return Ok(KrpcMsg::new(
65 "unique_identifier".to_string(),
66 version,
67 path[1].to_string(),
68 path[2].to_string(),
69 trip.get_req(),
70 Result::Err(RpcError::Server("empty".to_string())),
71 ));
72}
73
74async fn encode_filter(msg: KrpcMsg) -> (Response<()>, String, bytes::Bytes) {
75 let mut status = "0";
76 let res_data = match msg.res {
77 Ok(data) => bytes::Bytes::from(TripleResponseWrapper::get_buf(data)),
78 Err(err) => bytes::Bytes::from(TripleExceptionWrapper::get_buf(match err {
79 RpcError::Client(msg) => {
80 status = "90";
81 msg
82 }
83 RpcError::Method(msg) => {
84 status = "91";
85 msg
86 }
87 RpcError::Null => {
88 status = "92";
89 "RpcError::Null".to_string()
90 }
91 RpcError::Server(msg) => {
92 status = "93";
93 msg
94 }
95 })),
96 };
97 let response: Response<()> = Response::builder()
98 .header("content-type", "application/grpc")
99 .header("te", "trailers")
100 .body(())
101 .unwrap();
102 return (response, status.to_string(), res_data);
103}
104
105fn get_server_builder() -> Builder {
106 let mut config: Config = Default::default();
107 config.initial_conn_window_size = SPEC_WINDOW_SIZE;
108 config.initial_stream_window_size = SPEC_WINDOW_SIZE;
109 let mut builder = h2::server::Builder::default();
110 builder
111 .initial_window_size(config.initial_stream_window_size)
112 .initial_connection_window_size(config.initial_conn_window_size)
113 .max_frame_size(config.max_frame_size)
114 .max_header_list_size(config.max_header_list_size)
115 .max_send_buffer_size(config.max_send_buffer_size);
116 if let Some(max) = config.max_concurrent_streams {
117 builder.max_concurrent_streams(max);
118 }
119 if config.enable_connect_protocol {
120 builder.enable_connect_protocol();
121 }
122 return builder;
123}
124
/// Default connection-level window in `Config::default()`: 1 MiB.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024;
/// Default per-stream window in `Config::default()`: 1 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024;
/// Default maximum frame size in `Config::default()`: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16;
/// Default maximum send buffer size in `Config::default()`: 400 KiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400;
/// Default maximum header list size in `Config::default()`: 16 MiB.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20;

/// 65,535 bytes, the initial flow-control window size defined by HTTP/2.
/// `get_server_builder` applies it to both the connection and stream windows.
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
/// HTTP/2 server settings that `get_server_builder` copies onto the `h2`
/// builder.
///
/// Fields whose names start with `_` are stored but not read anywhere in the
/// visible code.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// Not read in the visible code.
    pub(crate) _adaptive_window: bool,
    /// Passed to `Builder::initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to `Builder::initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to `Builder::max_frame_size`.
    pub(crate) max_frame_size: u32,
    /// When `true`, `Builder::enable_connect_protocol` is called.
    pub(crate) enable_connect_protocol: bool,
    /// When `Some`, passed to `Builder::max_concurrent_streams`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Not read in the visible code.
    pub(crate) _keep_alive_interval: Option<Duration>,
    /// Not read in the visible code.
    pub(crate) _keep_alive_timeout: Duration,
    /// Passed to `Builder::max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to `Builder::max_header_list_size`.
    pub(crate) max_header_list_size: u32,
}
152
153impl Default for Config {
154 fn default() -> Config {
155 Config {
156 _adaptive_window: false,
157 initial_conn_window_size: DEFAULT_CONN_WINDOW,
158 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
159 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
160 enable_connect_protocol: false,
161 max_concurrent_streams: Some(200),
162 _keep_alive_interval: None,
163 _keep_alive_timeout: Duration::from_secs(20),
164 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
165 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
166 }
167 }
168}