grpc/server/
method.rs

1use std::sync::Arc;
2
3use crate::common::sink::SinkCommon;
4
5use crate::method::GrpcStreamingClientStreaming;
6use crate::method::GrpcStreamingFlavor;
7use crate::method::GrpcStreamingServerStreaming;
8use crate::method::GrpcStreamingUnary;
9use crate::method::MethodDescriptor;
10use crate::method::{GrpcStreaming, GrpcStreamingBidi};
11use crate::or_static::arc::ArcOrStatic;
12use crate::or_static::string::StringOrStatic;
13use crate::result;
14use crate::server::ctx::ServerHandlerContext;
15use crate::server::req_handler::ServerRequest;
16use crate::server::req_handler::ServerRequestUnaryHandler;
17use crate::server::req_handler::ServerRequestUntyped;
18use crate::server::req_single::ServerRequestSingle;
19use crate::server::resp_sink::ServerResponseSink;
20use crate::server::resp_sink_untyped::ServerResponseUntypedSink;
21use crate::ServerResponseUnarySink;
22use std::marker;
23
/// Typed handler for a single method, parameterized by the request
/// message type `Req` and the response message type `Resp`.
pub trait MethodHandler<Req, Resp>
where
    Req: Send + 'static,
    Resp: Send + 'static,
{
    /// Handles one call.
    ///
    /// Receives the handler context, the typed request and the typed
    /// response sink, and reports success or failure as `result::Result<()>`.
    fn handle(
        &self,
        context: ServerHandlerContext,
        req: ServerRequest<Req>,
        resp: ServerResponseSink<Resp>,
    ) -> result::Result<()>;
}
36
/// Wrapper around a handler function `F`, tagged as the unary flavor
/// (its `GrpcStreamingFlavor` impl reports `GrpcStreaming::Unary`).
pub struct MethodHandlerUnary<F> {
    // The user function, shared behind an `Arc` so it can be cloned per call.
    f: Arc<F>,
}
40
/// Wrapper around a handler function `F`, tagged as the server-streaming
/// flavor (its `GrpcStreamingFlavor` impl reports
/// `GrpcStreaming::ServerStreaming`).
pub struct MethodHandlerServerStreaming<F> {
    // The user function, shared behind an `Arc` so it can be cloned per call.
    f: Arc<F>,
}
44
/// Wrapper around a handler function `F`, tagged as the client-streaming
/// flavor (its `GrpcStreamingFlavor` impl reports
/// `GrpcStreaming::ClientStreaming`).
pub struct MethodHandlerClientStreaming<F> {
    // The user function, stored behind an `Arc`.
    f: Arc<F>,
}
48
/// Wrapper around a handler function `F`, tagged as the bidirectional
/// flavor (its `GrpcStreamingFlavor` impl reports `GrpcStreaming::Bidi`).
pub struct MethodHandlerBidi<F> {
    // The user function, stored behind an `Arc`.
    f: Arc<F>,
}
52
// Associates `MethodHandlerUnary` with the unary streaming flavor.
impl<F> GrpcStreamingFlavor for MethodHandlerUnary<F> {
    type Flavor = GrpcStreamingUnary;

    /// Returns `GrpcStreaming::Unary`.
    fn streaming() -> GrpcStreaming {
        GrpcStreaming::Unary
    }
}
60
// Associates `MethodHandlerClientStreaming` with the client-streaming flavor.
impl<F> GrpcStreamingFlavor for MethodHandlerClientStreaming<F> {
    type Flavor = GrpcStreamingClientStreaming;

    /// Returns `GrpcStreaming::ClientStreaming`.
    fn streaming() -> GrpcStreaming {
        GrpcStreaming::ClientStreaming
    }
}
68
// Associates `MethodHandlerServerStreaming` with the server-streaming flavor.
impl<F> GrpcStreamingFlavor for MethodHandlerServerStreaming<F> {
    type Flavor = GrpcStreamingServerStreaming;

    /// Returns `GrpcStreaming::ServerStreaming`.
    fn streaming() -> GrpcStreaming {
        GrpcStreaming::ServerStreaming
    }
}
76
// Associates `MethodHandlerBidi` with the bidirectional streaming flavor.
impl<F> GrpcStreamingFlavor for MethodHandlerBidi<F> {
    type Flavor = GrpcStreamingBidi;

    /// Returns `GrpcStreaming::Bidi`.
    fn streaming() -> GrpcStreaming {
        GrpcStreaming::Bidi
    }
}
84
85impl<F> MethodHandlerUnary<F> {
86    pub fn new<Req, Resp>(f: F) -> Self
87    where
88        Req: Send + 'static,
89        Resp: Send + 'static,
90        F: Fn(
91                ServerHandlerContext,
92                ServerRequestSingle<Req>,
93                ServerResponseUnarySink<Resp>,
94            ) -> result::Result<()>
95            + Send
96            + 'static,
97    {
98        MethodHandlerUnary { f: Arc::new(f) }
99    }
100}
101
102impl<F> MethodHandlerClientStreaming<F> {
103    pub fn new<Req, Resp>(f: F) -> Self
104    where
105        Req: Send + 'static,
106        Resp: Send + 'static,
107        F: Fn(
108                ServerHandlerContext,
109                ServerRequest<Req>,
110                ServerResponseUnarySink<Resp>,
111            ) -> result::Result<()>
112            + Send
113            + 'static,
114    {
115        MethodHandlerClientStreaming { f: Arc::new(f) }
116    }
117}
118
119impl<F> MethodHandlerServerStreaming<F> {
120    pub fn new<Req, Resp>(f: F) -> Self
121    where
122        Req: Send + 'static,
123        Resp: Send + 'static,
124        F: Fn(
125                ServerHandlerContext,
126                ServerRequestSingle<Req>,
127                ServerResponseSink<Resp>,
128            ) -> result::Result<()>
129            + Send
130            + 'static,
131    {
132        MethodHandlerServerStreaming { f: Arc::new(f) }
133    }
134}
135
136impl<F> MethodHandlerBidi<F> {
137    pub fn new<Req, Resp>(f: F) -> Self
138    where
139        Req: Send + 'static,
140        Resp: Send + 'static,
141        F: Fn(
142                ServerHandlerContext,
143                ServerRequest<Req>,
144                ServerResponseSink<Resp>,
145            ) -> result::Result<()>
146            + Send
147            + 'static,
148    {
149        MethodHandlerBidi { f: Arc::new(f) }
150    }
151}
152
153impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerUnary<F>
154where
155    Req: Send + 'static,
156    Resp: Send + 'static,
157    F: Fn(
158            ServerHandlerContext,
159            ServerRequestSingle<Req>,
160            ServerResponseUnarySink<Resp>,
161        ) -> result::Result<()>
162        + Send
163        + Sync
164        + 'static,
165{
166    fn handle(
167        &self,
168        ctx: ServerHandlerContext,
169        req: ServerRequest<Req>,
170        resp: ServerResponseSink<Resp>,
171    ) -> result::Result<()> {
172        struct HandlerImpl<F, Req, Resp>
173        where
174            Req: Send + 'static,
175            Resp: Send + 'static,
176            F: Fn(
177                    ServerHandlerContext,
178                    ServerRequestSingle<Req>,
179                    ServerResponseUnarySink<Resp>,
180                ) -> result::Result<()>
181                + Send
182                + Sync
183                + 'static,
184        {
185            ctx: ServerHandlerContext,
186            f: Arc<F>,
187            resp: ServerResponseSink<Resp>,
188            _marker: marker::PhantomData<Req>,
189        }
190
191        impl<F, Req, Resp> ServerRequestUnaryHandler<Req> for Option<HandlerImpl<F, Req, Resp>>
192        where
193            Req: Send + 'static,
194            Resp: Send + 'static,
195            F: Fn(
196                    ServerHandlerContext,
197                    ServerRequestSingle<Req>,
198                    ServerResponseUnarySink<Resp>,
199                ) -> result::Result<()>
200                + Send
201                + Sync
202                + 'static,
203        {
204            fn grpc_message(&mut self, message: Req) -> result::Result<()> {
205                let HandlerImpl {
206                    ctx,
207                    f,
208                    resp,
209                    _marker,
210                } = self.take().unwrap();
211                let metadata = ctx.metadata.clone();
212                let req = ServerRequestSingle { metadata, message };
213                let resp = ServerResponseUnarySink { sink: resp };
214                f(ctx, req, resp)
215            }
216        }
217
218        req.register_unary_handler(Some(HandlerImpl {
219            ctx,
220            f: self.f.clone(),
221            resp,
222            _marker: marker::PhantomData,
223        }));
224
225        Ok(())
226    }
227}
228
229impl<Req: Send + 'static, Resp: Send + 'static, F> MethodHandler<Req, Resp>
230    for MethodHandlerClientStreaming<F>
231where
232    Resp: Send + 'static,
233    F: Fn(
234            ServerHandlerContext,
235            ServerRequest<Req>,
236            ServerResponseUnarySink<Resp>,
237        ) -> result::Result<()>
238        + Send
239        + Sync
240        + 'static,
241{
242    fn handle(
243        &self,
244        ctx: ServerHandlerContext,
245        req: ServerRequest<Req>,
246        resp: ServerResponseSink<Resp>,
247    ) -> result::Result<()> {
248        let resp = ServerResponseUnarySink { sink: resp };
249        (self.f)(ctx, req, resp)
250    }
251}
252
253impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerServerStreaming<F>
254where
255    Req: Send + 'static,
256    Resp: Send + 'static,
257    F: Fn(
258            ServerHandlerContext,
259            ServerRequestSingle<Req>,
260            ServerResponseSink<Resp>,
261        ) -> result::Result<()>
262        + Send
263        + Sync
264        + 'static,
265{
266    fn handle(
267        &self,
268        ctx: ServerHandlerContext,
269        req: ServerRequest<Req>,
270        resp: ServerResponseSink<Resp>,
271    ) -> result::Result<()> {
272        struct HandlerImpl<F, Req, Resp>
273        where
274            Req: Send + 'static,
275            Resp: Send + 'static,
276            F: Fn(
277                    ServerHandlerContext,
278                    ServerRequestSingle<Req>,
279                    ServerResponseSink<Resp>,
280                ) -> result::Result<()>
281                + Send
282                + Sync
283                + 'static,
284        {
285            ctx: ServerHandlerContext,
286            f: Arc<F>,
287            resp: ServerResponseSink<Resp>,
288            _marker: marker::PhantomData<Req>,
289        }
290
291        impl<F, Req, Resp> ServerRequestUnaryHandler<Req> for Option<HandlerImpl<F, Req, Resp>>
292        where
293            Req: Send + 'static,
294            Resp: Send + 'static,
295            F: Fn(
296                    ServerHandlerContext,
297                    ServerRequestSingle<Req>,
298                    ServerResponseSink<Resp>,
299                ) -> result::Result<()>
300                + Send
301                + Sync
302                + 'static,
303        {
304            fn grpc_message(&mut self, req: Req) -> result::Result<()> {
305                let HandlerImpl {
306                    ctx,
307                    f,
308                    resp,
309                    _marker,
310                } = self.take().unwrap();
311                let metadata = ctx.metadata.clone();
312                let req = ServerRequestSingle {
313                    metadata,
314                    message: req,
315                };
316                f(ctx, req, resp)
317            }
318        }
319
320        req.register_unary_handler(Some(HandlerImpl {
321            ctx,
322            f: self.f.clone(),
323            resp,
324            _marker: marker::PhantomData,
325        }));
326
327        Ok(())
328    }
329}
330
331impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerBidi<F>
332where
333    Req: Send + 'static,
334    Resp: Send + 'static,
335    F: Fn(ServerHandlerContext, ServerRequest<Req>, ServerResponseSink<Resp>) -> result::Result<()>
336        + Send
337        + Sync
338        + 'static,
339{
340    fn handle(
341        &self,
342        ctx: ServerHandlerContext,
343        req: ServerRequest<Req>,
344        resp: ServerResponseSink<Resp>,
345    ) -> result::Result<()> {
346        (self.f)(ctx, req, resp)
347    }
348}
349
/// Type-erased entry point for starting a request: takes the untyped
/// request and the untyped response sink, so implementations can be stored
/// as trait objects regardless of their message types.
pub(crate) trait MethodHandlerDispatchUntyped {
    /// Starts handling a request given the handler context, the untyped
    /// request and the untyped response sink.
    fn start_request(
        &self,
        ctx: ServerHandlerContext,
        req: ServerRequestUntyped,
        resp: ServerResponseUntypedSink,
    ) -> result::Result<()>;
}
358
/// Pairs a method descriptor with the typed handler for that method.
struct MethodHandlerDispatchImpl<Req: 'static, Resp: 'static> {
    // Descriptor providing the request and response marshallers.
    desc: ArcOrStatic<MethodDescriptor<Req, Resp>>,
    // The typed handler the request is forwarded to.
    method_handler: Box<dyn MethodHandler<Req, Resp> + Sync + Send>,
}
363
364impl<Req, Resp> MethodHandlerDispatchUntyped for MethodHandlerDispatchImpl<Req, Resp>
365where
366    Req: Send + 'static,
367    Resp: Send + 'static,
368{
369    fn start_request(
370        &self,
371        ctx: ServerHandlerContext,
372        req: ServerRequestUntyped,
373        resp: ServerResponseUntypedSink,
374    ) -> result::Result<()> {
375        let req = ServerRequest {
376            req,
377            marshaller: self.desc.req_marshaller.clone(),
378        };
379
380        let resp = ServerResponseSink {
381            common: SinkCommon {
382                marshaller: self.desc.resp_marshaller.clone(),
383                sink: resp,
384            },
385        };
386
387        // TODO: catch unwind for better diag
388        self.method_handler.handle(ctx, req, resp)
389        //        let resp = catch_unwind(AssertUnwindSafe(|| {
390        //
391        //        }));
392        //        match resp {
393        //            Ok(resp) => {
394        //                let desc_copy = self.desc.clone();
395        //                resp.and_then_items(move |resp| desc_copy.resp_marshaller.write(&resp))
396        //            }
397        //            Err(e) => {
398        //                let message = any_to_string(e);
399        //                StreamingResponse::err(Error::Panic(message))
400        //            }
401        //        }
402    }
403}
404
/// A method registered on the server: its name plus a type-erased
/// dispatcher for starting requests.
pub struct ServerMethod {
    // Method name, copied from the method descriptor in `ServerMethod::new`.
    pub(crate) name: StringOrStatic,
    // Type-erased dispatcher wrapping the typed handler.
    pub(crate) dispatch: Box<dyn MethodHandlerDispatchUntyped + Sync + Send>,
}
409
410impl ServerMethod {
411    pub fn new<Req, Resp, H>(
412        method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
413        handler: H,
414    ) -> ServerMethod
415    where
416        Req: Send + 'static,
417        Resp: Send + 'static,
418        H: MethodHandler<Req, Resp> + 'static + Sync + Send,
419    {
420        ServerMethod {
421            name: method.name.clone(),
422            dispatch: Box::new(MethodHandlerDispatchImpl {
423                desc: method,
424                method_handler: Box::new(handler),
425            }),
426        }
427    }
428}