apache_dubbo/triple/server/
triple.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use futures_util::{future, stream, StreamExt, TryStreamExt};
19use http_body::Body;
20
21use crate::invocation::Request;
22use crate::triple::codec::Codec;
23use crate::triple::compression::{CompressionEncoding, COMPRESSIONS};
24use crate::triple::decode::Decoding;
25use crate::triple::encode::encode_server;
26use crate::triple::server::service::{
27    ClientStreamingSvc, ServerStreamingSvc, StreamingSvc, UnarySvc,
28};
29use crate::BoxBody;
30use dubbo_config::BusinessConfig;
31
32pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
33pub const GRPC_ENCODING: &str = "grpc-encoding";
34
35pub struct TripleServer<T> {
36    codec: T,
37    compression: Option<CompressionEncoding>,
38}
39
40impl<T> TripleServer<T> {
41    pub fn new(codec: T) -> Self {
42        Self {
43            codec,
44            compression: None,
45        }
46    }
47}
48
49impl<T> TripleServer<T>
50where
51    T: Codec,
52{
53    pub async fn client_streaming<S, B>(
54        &mut self,
55        mut service: S,
56        req: http::Request<B>,
57    ) -> http::Response<BoxBody>
58    where
59        S: ClientStreamingSvc<T::Decode, Response = T::Encode>,
60        B: Body + Send + 'static,
61        B::Error: Into<crate::Error> + Send,
62    {
63        let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
64        if self.compression.is_none() || accept_encoding.is_none() {
65            accept_encoding = None;
66        }
67
68        // Get grpc_encoding from http_header, decompress message.
69        let compression = match self.get_encoding_from_req(req.headers()) {
70            Ok(val) => val,
71            Err(status) => return status.to_http(),
72        };
73
74        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
75
76        let resp = service.call(Request::from_http(req_stream)).await;
77
78        let (mut parts, resp_body) = match resp {
79            Ok(v) => v.into_http().into_parts(),
80            Err(err) => return err.to_http(),
81        };
82
83        let resp_body = encode_server(
84            self.codec.encoder(),
85            stream::once(future::ready(resp_body)).map(Ok).into_stream(),
86            accept_encoding,
87        );
88
89        parts.headers.insert(
90            http::header::CONTENT_TYPE,
91            http::HeaderValue::from_static("application/grpc"),
92        );
93        if let Some(encoding) = accept_encoding {
94            parts
95                .headers
96                .insert(GRPC_ENCODING, encoding.into_header_value());
97        }
98        parts.status = http::StatusCode::OK;
99        http::Response::from_parts(parts, BoxBody::new(resp_body))
100    }
101
102    pub async fn bidi_streaming<S, B>(
103        &mut self,
104        mut service: S,
105        req: http::Request<B>,
106    ) -> http::Response<BoxBody>
107    where
108        S: StreamingSvc<T::Decode, Response = T::Encode>,
109        S::ResponseStream: Send + 'static,
110        B: Body + Send + 'static,
111        B::Error: Into<crate::Error> + Send,
112    {
113        // Firstly, get grpc_accept_encoding from http_header, get compression
114        // Secondly, if server enable compression and compression is valid, this method should compress response
115        let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
116        if self.compression.is_none() || accept_encoding.is_none() {
117            accept_encoding = None;
118        }
119
120        // Get grpc_encoding from http_header, decompress message.
121        let compression = match self.get_encoding_from_req(req.headers()) {
122            Ok(val) => val,
123            Err(status) => return status.to_http(),
124        };
125
126        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
127
128        let resp = service.call(Request::from_http(req_stream)).await;
129
130        let (mut parts, resp_body) = match resp {
131            Ok(v) => v.into_http().into_parts(),
132            Err(err) => return err.to_http(),
133        };
134        let resp_body = encode_server(self.codec.encoder(), resp_body, compression);
135
136        parts.headers.insert(
137            http::header::CONTENT_TYPE,
138            http::HeaderValue::from_static("application/grpc"),
139        );
140        if let Some(encoding) = accept_encoding {
141            parts
142                .headers
143                .insert(GRPC_ENCODING, encoding.into_header_value());
144        }
145        parts.status = http::StatusCode::OK;
146        http::Response::from_parts(parts, BoxBody::new(resp_body))
147    }
148
149    pub async fn server_streaming<S, B>(
150        &mut self,
151        mut service: S,
152        req: http::Request<B>,
153    ) -> http::Response<BoxBody>
154    where
155        S: ServerStreamingSvc<T::Decode, Response = T::Encode>,
156        S::ResponseStream: Send + 'static,
157        B: Body + Send + 'static,
158        B::Error: Into<crate::Error> + Send,
159    {
160        // Firstly, get grpc_accept_encoding from http_header, get compression
161        // Secondly, if server enable compression and compression is valid, this method should compress response
162        let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
163        if self.compression.is_none() || accept_encoding.is_none() {
164            accept_encoding = None;
165        }
166
167        // Get grpc_encoding from http_header, decompress message.
168        let compression = match self.get_encoding_from_req(req.headers()) {
169            Ok(val) => val,
170            Err(status) => return status.to_http(),
171        };
172
173        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
174        let (parts, mut body) = Request::from_http(req_stream).into_parts();
175        let msg = body.try_next().await.unwrap().ok_or_else(|| {
176            crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
177        });
178        let msg = match msg {
179            Ok(v) => v,
180            Err(err) => return err.to_http(),
181        };
182
183        let resp = service.call(Request::from_parts(parts, msg)).await;
184
185        let (mut parts, resp_body) = match resp {
186            Ok(v) => v.into_http().into_parts(),
187            Err(err) => return err.to_http(),
188        };
189        let resp_body = encode_server(self.codec.encoder(), resp_body, compression);
190
191        parts.headers.insert(
192            http::header::CONTENT_TYPE,
193            http::HeaderValue::from_static("application/grpc"),
194        );
195        if let Some(encoding) = accept_encoding {
196            parts
197                .headers
198                .insert(GRPC_ENCODING, encoding.into_header_value());
199        }
200        parts.status = http::StatusCode::OK;
201        http::Response::from_parts(parts, BoxBody::new(resp_body))
202    }
203
204    pub async fn unary<S, B>(
205        &mut self,
206        mut service: S,
207        req: http::Request<B>,
208    ) -> http::Response<BoxBody>
209    where
210        S: UnarySvc<T::Decode, Response = T::Encode>,
211        B: Body + Send + 'static,
212        B::Error: Into<crate::Error> + Send,
213    {
214        let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers());
215        if self.compression.is_none() || accept_encoding.is_none() {
216            accept_encoding = None;
217        }
218
219        let compression = match self.get_encoding_from_req(req.headers()) {
220            Ok(val) => val,
221            Err(status) => return status.to_http(),
222        };
223
224        let req_stream = req.map(|body| Decoding::new(body, self.codec.decoder(), compression));
225        let (parts, mut body) = Request::from_http(req_stream).into_parts();
226        let msg = body.try_next().await.unwrap().ok_or_else(|| {
227            crate::status::Status::new(crate::status::Code::Unknown, "request wrong".to_string())
228        });
229        let msg = match msg {
230            Ok(v) => v,
231            Err(err) => return err.to_http(),
232        };
233
234        let resp = service.call(Request::from_parts(parts, msg)).await;
235
236        let (mut parts, resp_body) = match resp {
237            Ok(v) => v.into_http().into_parts(),
238            Err(err) => return err.to_http(),
239        };
240        let resp_body = encode_server(
241            self.codec.encoder(),
242            stream::once(future::ready(resp_body)).map(Ok).into_stream(),
243            accept_encoding,
244        );
245
246        parts.headers.insert(
247            http::header::CONTENT_TYPE,
248            http::HeaderValue::from_static("application/grpc"),
249        );
250        if let Some(encoding) = accept_encoding {
251            parts
252                .headers
253                .insert(GRPC_ENCODING, encoding.into_header_value());
254        }
255        parts.status = http::StatusCode::OK;
256        http::Response::from_parts(parts, BoxBody::new(resp_body))
257    }
258
259    fn get_encoding_from_req(
260        &self,
261        header: &http::HeaderMap,
262    ) -> Result<Option<CompressionEncoding>, crate::status::Status> {
263        let encoding = match header.get(GRPC_ENCODING) {
264            Some(val) => val.to_str().unwrap(),
265            None => return Ok(None),
266        };
267
268        let compression = match COMPRESSIONS.get(encoding) {
269            Some(val) => val.to_owned(),
270            None => {
271                let status = crate::status::Status::new(
272                    crate::status::Code::Unimplemented,
273                    format!("grpc-accept-encoding: {} not support!", encoding),
274                );
275
276                return Err(status);
277            }
278        };
279        Ok(compression)
280    }
281}
282
283impl<T> BusinessConfig for TripleServer<T> {
284    fn init() -> Self {
285        todo!()
286    }
287
288    fn load() -> Result<(), std::convert::Infallible> {
289        todo!()
290    }
291}