Skip to main content

razor_stream/server/
dispatch.rs

1use super::RpcSvrReq;
2use super::task::*;
3use crate::Codec;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7/// Dispatch should be a user-defined struct initialized for every connection, by ServerFacts::new_dispatcher.
8///
9/// Dispatch must have Sync, because the connection reader and writer access concurrently.
10///
11/// A `Codec` should be created and holds inside, shared by the read/write coroutine.
12/// If you have encryption in the Codec, it could have shared states.
13pub trait Dispatch: Send + Sync + Sized + Clone + 'static {
14    type RespTask: ServerTaskResp;
15
16    type Codec: Codec;
17
18    /// Decode and handle the request, called from the connection reader coroutine.
19    ///
20    /// You can dispatch them to a worker pool.
21    /// If you are processing them directly in the connection coroutine, should make sure not
22    /// blocking the thread for long.
23    /// This is an async fn, but you should avoid waiting as much as possible.
24    /// Should return Err(()) when codec decode_req failed.
25    fn dispatch_req<'a>(
26        &'a self, codec: &Arc<Self::Codec>, req: RpcSvrReq<'a>, noti: RespNoti<Self::RespTask>,
27    ) -> impl Future<Output = Result<(), ()>> + Send;
28}
29
30/// A Dispatch trait impl with a closure, only useful for writing tests.
31///
32/// NOTE: The closure requires Clone.
33///
34/// # Example
35///
36/// ```no_compile,ignore
37/// use razor_stream::server::{ServerFacts, Dispatch};
38/// impl ServerFacts for YourServer {
39///
40///     ...
41///
42///     #[inline]
43///     fn new_dispatcher(&self) -> impl Dispatch<Self::RespTask> {
44///         let dispatch_f = move |task: FileServerTask| {
45///             async move {
46///                 todo!();
47///             }
48///         }
49///         return DispatchClosure::<MsgpCodec, YourServerTask, Self::RespTask, _, _>::new(
50///             dispatch_f,
51///         );
52///     }
53/// }
54/// ```
55pub struct DispatchClosure<C, T, R, H, F>
56where
57    C: Codec,
58    T: ServerTaskDecode<R>,
59    R: ServerTaskResp,
60    H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
61    F: Future<Output = Result<(), ()>> + Send + 'static,
62{
63    task_handle: H,
64    _phan: PhantomData<fn(&R, &T, &C)>,
65}
66
67impl<C, T, R, H, F> DispatchClosure<C, T, R, H, F>
68where
69    C: Codec,
70    T: ServerTaskDecode<R>,
71    R: ServerTaskResp,
72    H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
73    F: Future<Output = Result<(), ()>> + Send + 'static,
74{
75    #[inline]
76    pub fn new(task_handle: H) -> Self {
77        Self { task_handle, _phan: Default::default() }
78    }
79}
80
81impl<C, T, R, H, F> Clone for DispatchClosure<C, T, R, H, F>
82where
83    C: Codec,
84    T: ServerTaskDecode<R>,
85    R: ServerTaskResp,
86    H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
87    F: Future<Output = Result<(), ()>> + Send + 'static,
88{
89    #[inline]
90    fn clone(&self) -> Self {
91        Self::new(self.task_handle.clone())
92    }
93}
94impl<C, T, R, H, F> Dispatch for DispatchClosure<C, T, R, H, F>
95where
96    C: Codec,
97    T: ServerTaskDecode<R>,
98    R: ServerTaskResp,
99    H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
100    F: Future<Output = Result<(), ()>> + Send + 'static,
101{
102    type Codec = C;
103
104    type RespTask = R;
105
106    #[inline]
107    async fn dispatch_req<'a>(
108        &'a self, codec: &Arc<Self::Codec>, req: RpcSvrReq<'a>, noti: RespNoti<R>,
109    ) -> Result<(), ()> {
110        match <T as ServerTaskDecode<R>>::decode_req(
111            codec.as_ref(),
112            req.action,
113            req.seq,
114            req.msg,
115            req.blob,
116            noti,
117        ) {
118            Err(_) => {
119                error!("action {:?} seq={} decode err", req.action, req.seq);
120                return Err(());
121            }
122            Ok(task) => {
123                let handle = self.task_handle.clone();
124                if let Err(_) = (handle)(task).await {
125                    error!("action {:?} seq={} dispatch err", req.action, req.seq);
126                    return Err(());
127                }
128                Ok(())
129            }
130        }
131    }
132}