use crate::{packet::*, RpcContext, Session};
use anyhow::Context;
use downcast::AnySync;
use lockfree::map::Map;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
pub trait Service: Send + Sync + AnySync {
fn handle<'a>(&'a self, _ss: &'a Arc<Session>, _pack: Packet) -> ServiceResult<'a>;
}
downcast::impl_downcast_sync!(dyn Service);
#[cfg(not(target_arch = "wasm32"))]
pub use core::marker::Send as NeedSend;
#[cfg(target_arch = "wasm32")]
pub trait NeedSend {}
#[cfg(target_arch = "wasm32")]
impl<T> NeedSend for T {}
#[doc(hidden)]
#[cfg(not(target_arch = "wasm32"))]
pub type DynFuture<'a> = dyn Future<Output = anyhow::Result<()>> + NeedSend + 'a;
#[doc(hidden)]
#[cfg(target_arch = "wasm32")]
pub type DynFuture<'a> = dyn Future<Output = anyhow::Result<()>> + 'a;
pub type ServiceResult<'a> = Pin<Box<DynFuture<'a>>>;
pub type RpcMethod<T> =
(dyn Fn(&'_ T, Arc<RpcContext>) -> anyhow::Result<ServiceResult<'_>> + Sync + Send);
pub struct MappedService<T = ()> {
map: Map<Method, Arc<RpcMethod<T>>>,
method: AtomicU32,
pub handler: Option<Box<RpcMethod<T>>>,
pub this: T,
}
impl<T> MappedService<T> {
pub fn new(this: T) -> Self {
Self {
map: Default::default(),
method: Default::default(),
handler: Default::default(),
this,
}
}
}
impl<T: Default> Default for MappedService<T> {
fn default() -> Self {
Self::new(T::default())
}
}
impl<T: Send + Sync + 'static> Service for MappedService<T> {
fn handle<'a>(&'a self, ss: &'a Arc<Session>, packet: Packet) -> ServiceResult<'a> {
let context = Arc::new(RpcContext {
session: ss.clone(),
packet,
responsed: false.into(),
});
let method = unify_integer(context.packet.method().unwrap().clone());
let handler = self.map.get(&method).map(|x| x.1.clone());
Box::pin(async move {
let handler = handler
.as_ref()
.map(Arc::as_ref)
.or_else(|| self.handler.as_ref().map(Box::as_ref))
.with_context(|| format!("{method:?} not found"))?;
context
.response(handler(&self.this, context.clone())?.await)
.await
})
}
}
fn unify_integer(method: Method) -> Method {
match method {
Method::U8(n) => Method::U32(n as u32),
Method::U16(n) => Method::U32(n as u32),
Method::U32(n) => Method::U32(n as u32),
Method::U64(n) => Method::U32(n as u32),
Method::I8(n) => Method::U32(n as u32),
Method::I16(n) => Method::U32(n as u32),
Method::I32(n) => Method::U32(n as u32),
Method::I64(n) => Method::U32(n as u32),
_ => method,
}
}
impl<T: Send + Sync + 'static> MappedService<T> {
#[inline(always)]
pub fn add<'a, M: Serialize, A: 'a, R: 'a>(&self, m: M, method: impl SerdeMethod<'a, A, R, T>) {
self.map.insert(
unify_integer(serde_value::to_value(m).unwrap()),
method.into_boxed(),
);
}
pub fn next_method(&self) -> u32 {
loop {
let next = self.method.fetch_add(1, Ordering::SeqCst) + 1;
let notcontain = self
.map
.get(&serde_value::to_value(next).unwrap())
.is_none();
if notcontain {
self.add(next, || async {});
return next;
}
}
}
}
#[doc(hidden)]
type MappedMethod<T> = Arc<RpcMethod<T>>;
pub trait SerdeMethod<'a, ARGS: 'a, RET: 'a, T = ()> {
fn into_boxed(self) -> MappedMethod<T>;
}
impl<'a, T> SerdeMethod<'a, (), (), T> for Arc<RpcMethod<T>> {
fn into_boxed(self) -> MappedMethod<T> {
self
}
}
pub trait MethodReturn<C>: Send {
type R: Serialize + Send + Sync;
type E: core::fmt::Debug + Send + Sync;
fn into_return(self) -> Result<Self::R, Self::E>;
}
impl<R> MethodReturn<()> for R
where
R: Serialize + Send + Sync,
{
type R = R;
type E = ();
#[inline(always)]
fn into_return(self) -> Result<Self::R, Self::E> {
Ok(self)
}
}
impl<R, E> MethodReturn<(R, E)> for Result<R, E>
where
R: Serialize + Send + Sync,
E: core::fmt::Debug + Send + Sync,
{
type R = R;
type E = E;
#[inline(always)]
fn into_return(self) -> Result<Self::R, Self::E> {
self.map_err(Into::into)
}
}
macro_rules! impl_method {
(@boxed async ($(this = $this:ident,)? $(ctx = $ctx:ident,)?) $($arg:ident: $ty:ident),*) => {
#[inline(always)]
fn into_boxed(self) -> MappedMethod<T> {
Arc::new(move |_this, ctx| {
let _ctx: &'t Arc<$crate::RpcContext> = unsafe { core::mem::transmute(&ctx) };
let ($($arg),*): ($($ty),*) = rmp_serde::from_slice(_ctx.packet.data())?;
$(let $this: &'t T = unsafe {core::mem::transmute(_this)};)?
$(let $ctx = ctx.clone();)?
let fut = self($($this,)? $($ctx,)? $($arg,)*);
Ok(Box::pin(async move {
ctx.response(fut.await.into_return()).await
}))
})
}
};
(@boxed ($(this = $this:ident,)? $(ctx = $ctx:ident,)?) $($arg:ident: $ty:ident),*) => {
#[inline(always)]
fn into_boxed(self) -> MappedMethod<T> {
Arc::new(move |_this, ctx| {
let _ctx: &'t Arc<$crate::RpcContext> = unsafe { core::mem::transmute(&ctx) };
let ($($arg),*): ($($ty),*) = rmp_serde::from_slice(_ctx.packet.data())?;
$(let $this: &'t T = unsafe {core::mem::transmute(_this)};)?
$(let $ctx = ctx.clone();)?
let fut = self($($this,)? $($ctx,)? $($arg,)*);
Ok(Box::pin(async move {
ctx.response(fut.into_return()).await
}))
})
}
};
($($arg:ident: $ty:ident),*) => {
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn($($ty),*) -> FUT + Sync + Send + 'static,
FUT: Future<Output = RET> + NeedSend + 't + 'static,
RET: MethodReturn<CON> + 't + 'static,
CON: Send + 't,
$($ty: Deserialize<'t> + 't),*
> SerdeMethod<'t, ($($ty,)*), (FUT, RET, CON), T> for METHOD {
impl_method!(@boxed async () $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn(&'t T, $($ty),*) -> FUT + Sync + Send + 'static,
FUT: Future<Output = RET> + NeedSend + 't + 'static,
RET: MethodReturn<CON> + 't + 'static,
CON: Send + 't,
$($ty: Deserialize<'t> + 't),*
> SerdeMethod<'t, ($($ty,)*), (FUT, RET, CON, T), T> for METHOD {
impl_method!(@boxed async (this = t,) $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn(Arc<RpcContext>, $($ty),*) -> FUT + Sync + Send + 'static,
FUT: Future<Output = RET> + NeedSend + 't + 'static,
RET: MethodReturn<CON> + 't + 'static,
CON: Send + 't,
$($ty: Deserialize<'t> + Send + 't),*
> SerdeMethod<'t, ($($ty,)*), (RpcContext, FUT, RET, CON), T> for METHOD {
impl_method!(@boxed async (ctx = ctx,) $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn(&'t T, Arc<RpcContext>, $($ty),*) -> FUT + Sync + Send + 'static,
FUT: Future<Output = RET> + NeedSend + 't + 'static,
RET: MethodReturn<CON> + 't + 'static,
CON: Send + 't,
$($ty: Deserialize<'t> + Send + 't),*
> SerdeMethod<'t, ($($ty,)*), (RpcContext, FUT, RET, CON, T), T> for METHOD {
impl_method!(@boxed async (this = t, ctx = ctx,) $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn($($ty),*) -> RET + Sync + Send + 'static,
RET,
CON: Send + 't,
$($ty),*
> SerdeMethod<'t, ($($ty,)*), [(RET, CON); 0], T> for METHOD where
RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
impl_method!(@boxed () $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn(&'t T, $($ty),*) -> RET + Sync + Send + 'static,
RET,
CON: Send + 't,
$($ty),*
> SerdeMethod<'t, ($($ty,)*), [(RET, CON, T); 0], T> for METHOD where
RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
impl_method!(@boxed (this = t,) $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn(Arc<RpcContext>, $($ty),*) -> RET + Sync + Send + 'static,
RET,
CON: Send + 't,
$($ty),*
> SerdeMethod<'t, ($($ty,)*), [(RpcContext, RET, CON); 1], T> for METHOD where
RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
impl_method!(@boxed (ctx = ctx,) $($arg: $ty),*);
}
#[allow(non_snake_case, unused_parens)]
impl<'t,
T: 't,
METHOD: Fn(&'t T, Arc<RpcContext>, $($ty),*) -> RET + Sync + Send + 'static,
RET,
CON: Send + 't,
$($ty),*
> SerdeMethod<'t, ($($ty,)*), [(RpcContext, RET, CON, T); 0], T> for METHOD where
RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
impl_method!(@boxed (this = t, ctx = ctx,) $($arg: $ty),*);
}
};
}
impl_method!();
impl_method!(a: A);
impl_method!(a: A, b: B);
impl_method!(a: A, b: B, c: C);
impl_method!(a: A, b: B, c: C, d: D);
impl_method!(a: A, b: B, c: C, d: D, e: E);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, j: J);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, j: J, k: K);