1use std::sync::Arc;
2
3use crate::common::sink::SinkCommon;
4
5use crate::method::GrpcStreamingClientStreaming;
6use crate::method::GrpcStreamingFlavor;
7use crate::method::GrpcStreamingServerStreaming;
8use crate::method::GrpcStreamingUnary;
9use crate::method::MethodDescriptor;
10use crate::method::{GrpcStreaming, GrpcStreamingBidi};
11use crate::or_static::arc::ArcOrStatic;
12use crate::or_static::string::StringOrStatic;
13use crate::result;
14use crate::server::ctx::ServerHandlerContext;
15use crate::server::req_handler::ServerRequest;
16use crate::server::req_handler::ServerRequestUnaryHandler;
17use crate::server::req_handler::ServerRequestUntyped;
18use crate::server::req_single::ServerRequestSingle;
19use crate::server::resp_sink::ServerResponseSink;
20use crate::server::resp_sink_untyped::ServerResponseUntypedSink;
21use crate::ServerResponseUnarySink;
22use std::marker;
23
24pub trait MethodHandler<Req, Resp>
25where
26 Req: Send + 'static,
27 Resp: Send + 'static,
28{
29 fn handle(
30 &self,
31 context: ServerHandlerContext,
32 req: ServerRequest<Req>,
33 resp: ServerResponseSink<Resp>,
34 ) -> result::Result<()>;
35}
36
37pub struct MethodHandlerUnary<F> {
38 f: Arc<F>,
39}
40
41pub struct MethodHandlerServerStreaming<F> {
42 f: Arc<F>,
43}
44
45pub struct MethodHandlerClientStreaming<F> {
46 f: Arc<F>,
47}
48
49pub struct MethodHandlerBidi<F> {
50 f: Arc<F>,
51}
52
53impl<F> GrpcStreamingFlavor for MethodHandlerUnary<F> {
54 type Flavor = GrpcStreamingUnary;
55
56 fn streaming() -> GrpcStreaming {
57 GrpcStreaming::Unary
58 }
59}
60
61impl<F> GrpcStreamingFlavor for MethodHandlerClientStreaming<F> {
62 type Flavor = GrpcStreamingClientStreaming;
63
64 fn streaming() -> GrpcStreaming {
65 GrpcStreaming::ClientStreaming
66 }
67}
68
69impl<F> GrpcStreamingFlavor for MethodHandlerServerStreaming<F> {
70 type Flavor = GrpcStreamingServerStreaming;
71
72 fn streaming() -> GrpcStreaming {
73 GrpcStreaming::ServerStreaming
74 }
75}
76
77impl<F> GrpcStreamingFlavor for MethodHandlerBidi<F> {
78 type Flavor = GrpcStreamingBidi;
79
80 fn streaming() -> GrpcStreaming {
81 GrpcStreaming::Bidi
82 }
83}
84
85impl<F> MethodHandlerUnary<F> {
86 pub fn new<Req, Resp>(f: F) -> Self
87 where
88 Req: Send + 'static,
89 Resp: Send + 'static,
90 F: Fn(
91 ServerHandlerContext,
92 ServerRequestSingle<Req>,
93 ServerResponseUnarySink<Resp>,
94 ) -> result::Result<()>
95 + Send
96 + 'static,
97 {
98 MethodHandlerUnary { f: Arc::new(f) }
99 }
100}
101
102impl<F> MethodHandlerClientStreaming<F> {
103 pub fn new<Req, Resp>(f: F) -> Self
104 where
105 Req: Send + 'static,
106 Resp: Send + 'static,
107 F: Fn(
108 ServerHandlerContext,
109 ServerRequest<Req>,
110 ServerResponseUnarySink<Resp>,
111 ) -> result::Result<()>
112 + Send
113 + 'static,
114 {
115 MethodHandlerClientStreaming { f: Arc::new(f) }
116 }
117}
118
119impl<F> MethodHandlerServerStreaming<F> {
120 pub fn new<Req, Resp>(f: F) -> Self
121 where
122 Req: Send + 'static,
123 Resp: Send + 'static,
124 F: Fn(
125 ServerHandlerContext,
126 ServerRequestSingle<Req>,
127 ServerResponseSink<Resp>,
128 ) -> result::Result<()>
129 + Send
130 + 'static,
131 {
132 MethodHandlerServerStreaming { f: Arc::new(f) }
133 }
134}
135
136impl<F> MethodHandlerBidi<F> {
137 pub fn new<Req, Resp>(f: F) -> Self
138 where
139 Req: Send + 'static,
140 Resp: Send + 'static,
141 F: Fn(
142 ServerHandlerContext,
143 ServerRequest<Req>,
144 ServerResponseSink<Resp>,
145 ) -> result::Result<()>
146 + Send
147 + 'static,
148 {
149 MethodHandlerBidi { f: Arc::new(f) }
150 }
151}
152
153impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerUnary<F>
154where
155 Req: Send + 'static,
156 Resp: Send + 'static,
157 F: Fn(
158 ServerHandlerContext,
159 ServerRequestSingle<Req>,
160 ServerResponseUnarySink<Resp>,
161 ) -> result::Result<()>
162 + Send
163 + Sync
164 + 'static,
165{
166 fn handle(
167 &self,
168 ctx: ServerHandlerContext,
169 req: ServerRequest<Req>,
170 resp: ServerResponseSink<Resp>,
171 ) -> result::Result<()> {
172 struct HandlerImpl<F, Req, Resp>
173 where
174 Req: Send + 'static,
175 Resp: Send + 'static,
176 F: Fn(
177 ServerHandlerContext,
178 ServerRequestSingle<Req>,
179 ServerResponseUnarySink<Resp>,
180 ) -> result::Result<()>
181 + Send
182 + Sync
183 + 'static,
184 {
185 ctx: ServerHandlerContext,
186 f: Arc<F>,
187 resp: ServerResponseSink<Resp>,
188 _marker: marker::PhantomData<Req>,
189 }
190
191 impl<F, Req, Resp> ServerRequestUnaryHandler<Req> for Option<HandlerImpl<F, Req, Resp>>
192 where
193 Req: Send + 'static,
194 Resp: Send + 'static,
195 F: Fn(
196 ServerHandlerContext,
197 ServerRequestSingle<Req>,
198 ServerResponseUnarySink<Resp>,
199 ) -> result::Result<()>
200 + Send
201 + Sync
202 + 'static,
203 {
204 fn grpc_message(&mut self, message: Req) -> result::Result<()> {
205 let HandlerImpl {
206 ctx,
207 f,
208 resp,
209 _marker,
210 } = self.take().unwrap();
211 let metadata = ctx.metadata.clone();
212 let req = ServerRequestSingle { metadata, message };
213 let resp = ServerResponseUnarySink { sink: resp };
214 f(ctx, req, resp)
215 }
216 }
217
218 req.register_unary_handler(Some(HandlerImpl {
219 ctx,
220 f: self.f.clone(),
221 resp,
222 _marker: marker::PhantomData,
223 }));
224
225 Ok(())
226 }
227}
228
229impl<Req: Send + 'static, Resp: Send + 'static, F> MethodHandler<Req, Resp>
230 for MethodHandlerClientStreaming<F>
231where
232 Resp: Send + 'static,
233 F: Fn(
234 ServerHandlerContext,
235 ServerRequest<Req>,
236 ServerResponseUnarySink<Resp>,
237 ) -> result::Result<()>
238 + Send
239 + Sync
240 + 'static,
241{
242 fn handle(
243 &self,
244 ctx: ServerHandlerContext,
245 req: ServerRequest<Req>,
246 resp: ServerResponseSink<Resp>,
247 ) -> result::Result<()> {
248 let resp = ServerResponseUnarySink { sink: resp };
249 (self.f)(ctx, req, resp)
250 }
251}
252
253impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerServerStreaming<F>
254where
255 Req: Send + 'static,
256 Resp: Send + 'static,
257 F: Fn(
258 ServerHandlerContext,
259 ServerRequestSingle<Req>,
260 ServerResponseSink<Resp>,
261 ) -> result::Result<()>
262 + Send
263 + Sync
264 + 'static,
265{
266 fn handle(
267 &self,
268 ctx: ServerHandlerContext,
269 req: ServerRequest<Req>,
270 resp: ServerResponseSink<Resp>,
271 ) -> result::Result<()> {
272 struct HandlerImpl<F, Req, Resp>
273 where
274 Req: Send + 'static,
275 Resp: Send + 'static,
276 F: Fn(
277 ServerHandlerContext,
278 ServerRequestSingle<Req>,
279 ServerResponseSink<Resp>,
280 ) -> result::Result<()>
281 + Send
282 + Sync
283 + 'static,
284 {
285 ctx: ServerHandlerContext,
286 f: Arc<F>,
287 resp: ServerResponseSink<Resp>,
288 _marker: marker::PhantomData<Req>,
289 }
290
291 impl<F, Req, Resp> ServerRequestUnaryHandler<Req> for Option<HandlerImpl<F, Req, Resp>>
292 where
293 Req: Send + 'static,
294 Resp: Send + 'static,
295 F: Fn(
296 ServerHandlerContext,
297 ServerRequestSingle<Req>,
298 ServerResponseSink<Resp>,
299 ) -> result::Result<()>
300 + Send
301 + Sync
302 + 'static,
303 {
304 fn grpc_message(&mut self, req: Req) -> result::Result<()> {
305 let HandlerImpl {
306 ctx,
307 f,
308 resp,
309 _marker,
310 } = self.take().unwrap();
311 let metadata = ctx.metadata.clone();
312 let req = ServerRequestSingle {
313 metadata,
314 message: req,
315 };
316 f(ctx, req, resp)
317 }
318 }
319
320 req.register_unary_handler(Some(HandlerImpl {
321 ctx,
322 f: self.f.clone(),
323 resp,
324 _marker: marker::PhantomData,
325 }));
326
327 Ok(())
328 }
329}
330
331impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerBidi<F>
332where
333 Req: Send + 'static,
334 Resp: Send + 'static,
335 F: Fn(ServerHandlerContext, ServerRequest<Req>, ServerResponseSink<Resp>) -> result::Result<()>
336 + Send
337 + Sync
338 + 'static,
339{
340 fn handle(
341 &self,
342 ctx: ServerHandlerContext,
343 req: ServerRequest<Req>,
344 resp: ServerResponseSink<Resp>,
345 ) -> result::Result<()> {
346 (self.f)(ctx, req, resp)
347 }
348}
349
350pub(crate) trait MethodHandlerDispatchUntyped {
351 fn start_request(
352 &self,
353 ctx: ServerHandlerContext,
354 req: ServerRequestUntyped,
355 resp: ServerResponseUntypedSink,
356 ) -> result::Result<()>;
357}
358
359struct MethodHandlerDispatchImpl<Req: 'static, Resp: 'static> {
360 desc: ArcOrStatic<MethodDescriptor<Req, Resp>>,
361 method_handler: Box<dyn MethodHandler<Req, Resp> + Sync + Send>,
362}
363
364impl<Req, Resp> MethodHandlerDispatchUntyped for MethodHandlerDispatchImpl<Req, Resp>
365where
366 Req: Send + 'static,
367 Resp: Send + 'static,
368{
369 fn start_request(
370 &self,
371 ctx: ServerHandlerContext,
372 req: ServerRequestUntyped,
373 resp: ServerResponseUntypedSink,
374 ) -> result::Result<()> {
375 let req = ServerRequest {
376 req,
377 marshaller: self.desc.req_marshaller.clone(),
378 };
379
380 let resp = ServerResponseSink {
381 common: SinkCommon {
382 marshaller: self.desc.resp_marshaller.clone(),
383 sink: resp,
384 },
385 };
386
387 self.method_handler.handle(ctx, req, resp)
389 }
403}
404
405pub struct ServerMethod {
406 pub(crate) name: StringOrStatic,
407 pub(crate) dispatch: Box<dyn MethodHandlerDispatchUntyped + Sync + Send>,
408}
409
410impl ServerMethod {
411 pub fn new<Req, Resp, H>(
412 method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
413 handler: H,
414 ) -> ServerMethod
415 where
416 Req: Send + 'static,
417 Resp: Send + 'static,
418 H: MethodHandler<Req, Resp> + 'static + Sync + Send,
419 {
420 ServerMethod {
421 name: method.name.clone(),
422 dispatch: Box::new(MethodHandlerDispatchImpl {
423 desc: method,
424 method_handler: Box::new(handler),
425 }),
426 }
427 }
428}