// apache_dubbo/triple/server/triple.rs
use futures_util::{future, stream, StreamExt, TryStreamExt};
19use http_body::Body;
20
21use crate::invocation::Request;
22use crate::triple::codec::Codec;
23use crate::triple::compression::{CompressionEncoding, COMPRESSIONS};
24use crate::triple::decode::Decoding;
25use crate::triple::encode::encode_server;
26use crate::triple::server::service::{
27 ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, UnarySvc,
28};
29use crate::BoxBody;
30use dubbo_config::BusinessConfig;
31
/// Name of the `grpc-accept-encoding` HTTP header.
// NOTE(review): not read in this file; presumably consumed by the
// compression module when parsing what the peer accepts — confirm.
pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";

/// Name of the `grpc-encoding` HTTP header. It is read from requests to pick
/// the decoder's compression and written on responses that are compressed.
pub const GRPC_ENCODING: &str = "grpc-encoding";
34
35pub struct TripleServer<T> {
36 codec: T,
37 compression: Option<CompressionEncoding>,
38}
39
40impl<T> TripleServer<T> {
41 pub fn new(codec: T) -> Self {
42 Self {
43 codec,
44 compression: None,
45 }
46 }
47}
48
49impl<T> TripleServer<T>
50where
51 T: Codec,
52{
53 pub async fn client_streaming<S, B>(
54 &mut self,
55 mut service: S,
56 req: http::Request<B>,
57 ) -> http::Response<BoxBody>
58 where
59 S: ClientStreamingSvc<T::Decode, Response = T::Encode>,
60 B: Body + Send + 'static,
61 B::Error: Into<crate::Error> + Send,
62 {
63 let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
64 if self.compression.is_none() || accept_encoding.is_none() {
65 accept_encoding = None;
66 }
67
68 let compression = match self.get_encoding_from_req(req.headers()) {
70 Ok(val) => val,
71 Err(status) => return status.to_http(),
72 };
73
74 let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
75
76 let resp = service.call(Request::from_http(req_stream)).await;
77
78 let (mut parts, resp_body) = match resp {
79 Ok(v) => v.into_http().into_parts(),
80 Err(err) => return err.to_http(),
81 };
82
83 let resp_body = encode_server(
84 self.codec.encoder(),
85 stream::once(future::ready(resp_body)).map(Ok).into_stream(),
86 accept_encoding,
87 );
88
89 parts.headers.insert(
90 http::header::CONTENT_TYPE,
91 http::HeaderValue::from_static("application/grpc"),
92 );
93 if let Some(encoding) = accept_encoding {
94 parts
95 .headers
96 .insert(GRPC_ENCODING, encoding.into_header_value());
97 }
98 parts.status = http::StatusCode::OK;
99 http::Response::from_parts(parts, BoxBody::new(resp_body))
100 }
101
102 pub async fn bidi_streaming<S, B>(
103 &mut self,
104 mut service: S,
105 req: http::Request<B>,
106 ) -> http::Response<BoxBody>
107 where
108 S: StreamingSvc<T::Decode, Response = T::Encode>,
109 S::ResponseStream: Send + 'static,
110 B: Body + Send + 'static,
111 B::Error: Into<crate::Error> + Send,
112 {
113 let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
116 if self.compression.is_none() || accept_encoding.is_none() {
117 accept_encoding = None;
118 }
119
120 let compression = match self.get_encoding_from_req(req.headers()) {
122 Ok(val) => val,
123 Err(status) => return status.to_http(),
124 };
125
126 let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
127
128 let resp = service.call(Request::from_http(req_stream)).await;
129
130 let (mut parts, resp_body) = match resp {
131 Ok(v) => v.into_http().into_parts(),
132 Err(err) => return err.to_http(),
133 };
134 let resp_body = encode_server(self.codec.encoder(), resp_body, compression);
135
136 parts.headers.insert(
137 http::header::CONTENT_TYPE,
138 http::HeaderValue::from_static("application/grpc"),
139 );
140 if let Some(encoding) = accept_encoding {
141 parts
142 .headers
143 .insert(GRPC_ENCODING, encoding.into_header_value());
144 }
145 parts.status = http::StatusCode::OK;
146 http::Response::from_parts(parts, BoxBody::new(resp_body))
147 }
148
149 pub async fn server_streaming<S, B>(
150 &mut self,
151 mut service: S,
152 req: http::Request<B>,
153 ) -> http::Response<BoxBody>
154 where
155 S: ServerStreamingSvc<T::Decode, Response = T::Encode>,
156 S::ResponseStream: Send + 'static,
157 B: Body + Send + 'static,
158 B::Error: Into<crate::Error> + Send,
159 {
160 let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
163 if self.compression.is_none() || accept_encoding.is_none() {
164 accept_encoding = None;
165 }
166
167 let compression = match self.get_encoding_from_req(req.headers()) {
169 Ok(val) => val,
170 Err(status) => return status.to_http(),
171 };
172
173 let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
174 let (parts, mut body) = Request::from_http(req_stream).into_parts();
175 let msg = body.try_next().await.unwrap().ok_or_else(|| {
176 crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
177 });
178 let msg = match msg {
179 Ok(v) => v,
180 Err(err) => return err.to_http(),
181 };
182
183 let resp = service.call(Request::from_parts(parts, msg)).await;
184
185 let (mut parts, resp_body) = match resp {
186 Ok(v) => v.into_http().into_parts(),
187 Err(err) => return err.to_http(),
188 };
189 let resp_body = encode_server(self.codec.encoder(), resp_body, compression);
190
191 parts.headers.insert(
192 http::header::CONTENT_TYPE,
193 http::HeaderValue::from_static("application/grpc"),
194 );
195 if let Some(encoding) = accept_encoding {
196 parts
197 .headers
198 .insert(GRPC_ENCODING, encoding.into_header_value());
199 }
200 parts.status = http::StatusCode::OK;
201 http::Response::from_parts(parts, BoxBody::new(resp_body))
202 }
203
204 pub async fn unary<S, B>(
205 &mut self,
206 mut service: S,
207 req: http::Request<B>,
208 ) -> http::Response<BoxBody>
209 where
210 S: UnarySvc<T::Decode, Response = T::Encode>,
211 B: Body + Send + 'static,
212 B::Error: Into<crate::Error> + Send,
213 {
214 let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
215 if self.compression.is_none() || accept_encoding.is_none() {
216 accept_encoding = None;
217 }
218
219 let compression = match self.get_encoding_from_req(req.headers()) {
220 Ok(val) => val,
221 Err(status) => return status.to_http(),
222 };
223
224 let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
225 let (parts, mut body) = Request::from_http(req_stream).into_parts();
226 let msg = body.try_next().await.unwrap().ok_or_else(|| {
227 crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
228 });
229 let msg = match msg {
230 Ok(v) => v,
231 Err(err) => return err.to_http(),
232 };
233
234 let resp = service.call(Request::from_parts(parts, msg)).await;
235
236 let (mut parts, resp_body) = match resp {
237 Ok(v) => v.into_http().into_parts(),
238 Err(err) => return err.to_http(),
239 };
240 let resp_body = encode_server(
241 self.codec.encoder(),
242 stream::once(future::ready(resp_body)).map(Ok).into_stream(),
243 accept_encoding,
244 );
245
246 parts.headers.insert(
247 http::header::CONTENT_TYPE,
248 http::HeaderValue::from_static("application/grpc"),
249 );
250 if let Some(encoding) = accept_encoding {
251 parts
252 .headers
253 .insert(GRPC_ENCODING, encoding.into_header_value());
254 }
255 parts.status = http::StatusCode::OK;
256 http::Response::from_parts(parts, BoxBody::new(resp_body))
257 }
258
259 fn get_encoding_from_req(
260 &self,
261 header: &http::HeaderMap,
262 ) -> Result<Option<CompressionEncoding>, crate::status::Status> {
263 let encoding = match header.get(GRPC_ENCODING) {
264 Some(val) => val.to_str().unwrap(),
265 None => return Ok(None),
266 };
267
268 let compression = match COMPRESSIONS.get(encoding) {
269 Some(val) => val.to_owned(),
270 None => {
271 let status = crate::status::Status::new(
272 crate::status::Code::Unimplemented,
273 format!("grpc-accept-encoding: {} not support!", encoding),
274 );
275
276 return Err(status);
277 }
278 };
279 Ok(compression)
280 }
281}
282
283impl<T> BusinessConfig for TripleServer<T> {
284 fn init() -> Self {
285 todo!()
286 }
287
288 fn load() -> Result<(), std::convert::Infallible> {
289 todo!()
290 }
291}