razor_stream/server/
dispatch.rs1use super::RpcSvrReq;
2use super::task::*;
3use crate::Codec;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7pub trait Dispatch: Send + Sync + Sized + Clone + 'static {
14 type RespTask: ServerTaskResp;
15
16 type Codec: Codec;
17
18 fn dispatch_req<'a>(
26 &'a self, codec: &Arc<Self::Codec>, req: RpcSvrReq<'a>, noti: RespNoti<Self::RespTask>,
27 ) -> impl Future<Output = Result<(), ()>> + Send;
28}
29
30pub struct DispatchClosure<C, T, R, H, F>
56where
57 C: Codec,
58 T: ServerTaskDecode<R>,
59 R: ServerTaskResp,
60 H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
61 F: Future<Output = Result<(), ()>> + Send + 'static,
62{
63 task_handle: H,
64 _phan: PhantomData<fn(&R, &T, &C)>,
65}
66
67impl<C, T, R, H, F> DispatchClosure<C, T, R, H, F>
68where
69 C: Codec,
70 T: ServerTaskDecode<R>,
71 R: ServerTaskResp,
72 H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
73 F: Future<Output = Result<(), ()>> + Send + 'static,
74{
75 #[inline]
76 pub fn new(task_handle: H) -> Self {
77 Self { task_handle, _phan: Default::default() }
78 }
79}
80
81impl<C, T, R, H, F> Clone for DispatchClosure<C, T, R, H, F>
82where
83 C: Codec,
84 T: ServerTaskDecode<R>,
85 R: ServerTaskResp,
86 H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
87 F: Future<Output = Result<(), ()>> + Send + 'static,
88{
89 #[inline]
90 fn clone(&self) -> Self {
91 Self::new(self.task_handle.clone())
92 }
93}
94impl<C, T, R, H, F> Dispatch for DispatchClosure<C, T, R, H, F>
95where
96 C: Codec,
97 T: ServerTaskDecode<R>,
98 R: ServerTaskResp,
99 H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
100 F: Future<Output = Result<(), ()>> + Send + 'static,
101{
102 type Codec = C;
103
104 type RespTask = R;
105
106 #[inline]
107 async fn dispatch_req<'a>(
108 &'a self, codec: &Arc<Self::Codec>, req: RpcSvrReq<'a>, noti: RespNoti<R>,
109 ) -> Result<(), ()> {
110 match <T as ServerTaskDecode<R>>::decode_req(
111 codec.as_ref(),
112 req.action,
113 req.seq,
114 req.msg,
115 req.blob,
116 noti,
117 ) {
118 Err(_) => {
119 error!("action {:?} seq={} decode err", req.action, req.seq);
120 return Err(());
121 }
122 Ok(task) => {
123 let handle = self.task_handle.clone();
124 if let Err(_) = (handle)(task).await {
125 error!("action {:?} seq={} dispatch err", req.action, req.seq);
126 return Err(());
127 }
128 Ok(())
129 }
130 }
131 }
132}