use super::RpcSvrReq;
use super::task::*;
use crate::Codec;
use std::marker::PhantomData;
use std::sync::Arc;
/// Server-side request dispatcher.
///
/// An implementor is handed each incoming `RpcSvrReq` together with the codec and a
/// `RespNoti` handle, and processes the request asynchronously. Implementors must be
/// `Send + Sync + Clone + 'static`.
pub trait Dispatch: Send + Sync + Sized + Clone + 'static {
/// Response task type that parameterizes the `RespNoti` passed to `dispatch_req`.
type RespTask: ServerTaskResp;
/// Codec type handed to `dispatch_req` behind an `Arc`.
type Codec: Codec;
/// Processes a single request.
///
/// * `codec` - shared codec instance made available to the implementation.
/// * `req` - the request; it borrows for the same lifetime `'a` as `&self`.
/// * `noti` - handle typed on `Self::RespTask`.
///   NOTE(review): presumably used to send the response back; confirm in the `task` module.
///
/// The returned future must be `Send` and resolves to `Ok(())` or to an opaque `Err(())`.
fn dispatch_req<'a>(
&'a self, codec: &Arc<Self::Codec>, req: RpcSvrReq<'a>, noti: RespNoti<Self::RespTask>,
) -> impl Future<Output = Result<(), ()>> + Send;
}
/// A `Dispatch` implementation that forwards every decoded task to a closure.
///
/// Type parameters:
/// * `C` - codec type.
/// * `T` - task type, built from a request via `ServerTaskDecode<R>`.
/// * `R` - response task type.
/// * `H` - the handler closure; it takes a `T` and returns the future `F`.
/// * `F` - future returned by the handler, resolving to `Result<(), ()>`.
pub struct DispatchClosure<C, T, R, H, F>
where
C: Codec,
T: ServerTaskDecode<R>,
R: ServerTaskResp,
// `FnOnce + Clone`: the handler is cloned for each dispatched request and the clone is
// consumed by the call.
H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
F: Future<Output = Result<(), ()>> + Send + 'static,
{
/// Handler invoked with each decoded task.
task_handle: H,
/// Marks `R`, `T` and `C` as used without storing values of those types. A `fn` pointer
/// type is always `Send + Sync`, so this marker does not restrict the struct's auto traits.
_phan: PhantomData<fn(&R, &T, &C)>,
}
impl<C, T, R, H, F> DispatchClosure<C, T, R, H, F>
where
C: Codec,
T: ServerTaskDecode<R>,
R: ServerTaskResp,
H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
F: Future<Output = Result<(), ()>> + Send + 'static,
{
#[inline]
pub fn new(task_handle: H) -> Self {
Self { task_handle, _phan: Default::default() }
}
}
impl<C, T, R, H, F> Clone for DispatchClosure<C, T, R, H, F>
where
C: Codec,
T: ServerTaskDecode<R>,
R: ServerTaskResp,
H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
F: Future<Output = Result<(), ()>> + Send + 'static,
{
#[inline]
fn clone(&self) -> Self {
Self::new(self.task_handle.clone())
}
}
impl<C, T, R, H, F> Dispatch for DispatchClosure<C, T, R, H, F>
where
    C: Codec,
    T: ServerTaskDecode<R>,
    R: ServerTaskResp,
    H: FnOnce(T) -> F + Send + Sync + 'static + Clone,
    F: Future<Output = Result<(), ()>> + Send + 'static,
{
    type RespTask = R;
    type Codec = C;

    /// Decodes `req` into a task of type `T` and runs the handler closure on it.
    ///
    /// Returns `Err(())` (after logging) when decoding fails or when the handler's future
    /// resolves to an error; returns `Ok(())` otherwise.
    #[inline]
    async fn dispatch_req<'a>(
        &'a self, codec: &Arc<Self::Codec>, req: RpcSvrReq<'a>, noti: RespNoti<R>,
    ) -> Result<(), ()> {
        // Step 1: build the task from the raw request parts; bail out early on failure.
        let task = match <T as ServerTaskDecode<R>>::decode_req(
            codec.as_ref(),
            req.action,
            req.seq,
            req.msg,
            req.blob,
            noti,
        ) {
            Ok(decoded) => decoded,
            Err(_) => {
                error!("action {:?} seq={} decode err", req.action, req.seq);
                return Err(());
            }
        };

        // Step 2: the handler is `FnOnce`, so each request consumes its own clone.
        let handler = self.task_handle.clone();
        match handler(task).await {
            Ok(()) => Ok(()),
            Err(()) => {
                error!("action {:?} seq={} dispatch err", req.action, req.seq);
                Err(())
            }
        }
    }
}