ezrpc 0.1.1

Ergonomic, flexible and Zero-cost RPC framework
Documentation
use crate::{packet::*, RpcContext, Session};
use anyhow::Context;
use downcast::AnySync;
use lockfree::map::Map;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// The service object, for RPC method handling
pub trait Service: Send + Sync + AnySync {
    fn handle<'a>(&'a self, _ss: &'a Arc<Session>, _pack: Packet) -> ServiceResult<'a>;
}

downcast::impl_downcast_sync!(dyn Service);

#[cfg(not(target_arch = "wasm32"))]
pub use core::marker::Send as NeedSend;

#[cfg(target_arch = "wasm32")]
pub trait NeedSend {}

#[cfg(target_arch = "wasm32")]
impl<T> NeedSend for T {}

#[doc(hidden)]
#[cfg(not(target_arch = "wasm32"))]
pub type DynFuture<'a> = dyn Future<Output = anyhow::Result<()>> + NeedSend + 'a;

#[doc(hidden)]
#[cfg(target_arch = "wasm32")]
pub type DynFuture<'a> = dyn Future<Output = anyhow::Result<()>> + 'a;

/// The Future object returned from the service
pub type ServiceResult<'a> = Pin<Box<DynFuture<'a>>>;

/// The most original RPC method
pub type RpcMethod<T> =
    (dyn Fn(&'_ T, Arc<RpcContext>) -> anyhow::Result<ServiceResult<'_>> + Sync + Send);

/// The default and common [`Service`] implementation
pub struct MappedService<T = ()> {
    map: Map<Method, Arc<RpcMethod<T>>>,
    method: AtomicU32,
    /// The final handler, if no registered method can be found
    pub handler: Option<Box<RpcMethod<T>>>,
    /// Associated data for this service
    pub this: T,
}

impl<T> MappedService<T> {
    pub fn new(this: T) -> Self {
        Self {
            map: Default::default(),
            method: Default::default(),
            handler: Default::default(),
            this,
        }
    }
}

impl<T: Default> Default for MappedService<T> {
    fn default() -> Self {
        Self::new(T::default())
    }
}

impl<T: Send + Sync + 'static> Service for MappedService<T> {
    fn handle<'a>(&'a self, ss: &'a Arc<Session>, packet: Packet) -> ServiceResult<'a> {
        let context = Arc::new(RpcContext {
            session: ss.clone(),
            packet,
            responsed: false.into(),
        });
        let method = unify_integer(context.packet.method().unwrap().clone());
        let handler = self.map.get(&method).map(|x| x.1.clone());
        Box::pin(async move {
            let handler = handler
                .as_ref()
                .map(Arc::as_ref)
                .or_else(|| self.handler.as_ref().map(Box::as_ref))
                .with_context(|| format!("{method:?} not found"))?;
            context
                .response(handler(&self.this, context.clone())?.await)
                .await
        })
    }
}

fn unify_integer(method: Method) -> Method {
    match method {
        Method::U8(n) => Method::U32(n as u32),
        Method::U16(n) => Method::U32(n as u32),
        Method::U32(n) => Method::U32(n as u32),
        Method::U64(n) => Method::U32(n as u32),
        Method::I8(n) => Method::U32(n as u32),
        Method::I16(n) => Method::U32(n as u32),
        Method::I32(n) => Method::U32(n as u32),
        Method::I64(n) => Method::U32(n as u32),
        _ => method,
    }
}

impl<T: Send + Sync + 'static> MappedService<T> {
    #[inline(always)]
    pub fn add<'a, M: Serialize, A: 'a, R: 'a>(&self, m: M, method: impl SerdeMethod<'a, A, R, T>) {
        self.map.insert(
            unify_integer(serde_value::to_value(m).unwrap()),
            method.into_boxed(),
        );
    }

    pub fn next_method(&self) -> u32 {
        loop {
            let next = self.method.fetch_add(1, Ordering::SeqCst) + 1;
            let notcontain = self
                .map
                .get(&serde_value::to_value(next).unwrap())
                .is_none();
            if notcontain {
                self.add(next, || async {});
                return next;
            }
        }
    }
}

#[doc(hidden)]
type MappedMethod<T> = Arc<RpcMethod<T>>;

/// Trait for closures which can be converted to RPC method
pub trait SerdeMethod<'a, ARGS: 'a, RET: 'a, T = ()> {
    fn into_boxed(self) -> MappedMethod<T>;
}

impl<'a, T> SerdeMethod<'a, (), (), T> for Arc<RpcMethod<T>> {
    fn into_boxed(self) -> MappedMethod<T> {
        self
    }
}

/// Trait for types that can be returned by RPC method, is commonly serializable type, or a result wrapper for it
pub trait MethodReturn<C>: Send {
    type R: Serialize + Send + Sync;
    type E: core::fmt::Debug + Send + Sync;

    fn into_return(self) -> Result<Self::R, Self::E>;
}

impl<R> MethodReturn<()> for R
where
    R: Serialize + Send + Sync,
{
    type R = R;
    type E = ();

    #[inline(always)]
    fn into_return(self) -> Result<Self::R, Self::E> {
        Ok(self)
    }
}

impl<R, E> MethodReturn<(R, E)> for Result<R, E>
where
    R: Serialize + Send + Sync,
    E: core::fmt::Debug + Send + Sync,
{
    type R = R;
    type E = E;

    #[inline(always)]
    fn into_return(self) -> Result<Self::R, Self::E> {
        self.map_err(Into::into)
    }
}

macro_rules! impl_method {
    (@boxed async ($(this = $this:ident,)? $(ctx = $ctx:ident,)?) $($arg:ident: $ty:ident),*) => {
        #[inline(always)]
        fn into_boxed(self) -> MappedMethod<T> {
            Arc::new(move |_this, ctx| {
                let _ctx: &'t Arc<$crate::RpcContext> = unsafe { core::mem::transmute(&ctx) };
                let ($($arg),*): ($($ty),*) = rmp_serde::from_slice(_ctx.packet.data())?;
                $(let $this: &'t T = unsafe {core::mem::transmute(_this)};)?
                $(let $ctx = ctx.clone();)?
                let fut = self($($this,)? $($ctx,)? $($arg,)*);
                Ok(Box::pin(async move {
                    ctx.response(fut.await.into_return()).await
                }))
            })
        }
    };

    (@boxed ($(this = $this:ident,)? $(ctx = $ctx:ident,)?) $($arg:ident: $ty:ident),*) => {
        #[inline(always)]
        fn into_boxed(self) -> MappedMethod<T> {
            Arc::new(move |_this, ctx| {
                let _ctx: &'t Arc<$crate::RpcContext> = unsafe { core::mem::transmute(&ctx) };
                let ($($arg),*): ($($ty),*) = rmp_serde::from_slice(_ctx.packet.data())?;
                // $(let $this = _this;)?
                $(let $this: &'t T = unsafe {core::mem::transmute(_this)};)?
                $(let $ctx = ctx.clone();)?
                let fut = self($($this,)? $($ctx,)? $($arg,)*);
                Ok(Box::pin(async move {
                    ctx.response(fut.into_return()).await
                }))
            })
        }
    };

    ($($arg:ident: $ty:ident),*) => {
        // async ...
        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn($($ty),*) -> FUT + Sync + Send + 'static,
            FUT: Future<Output = RET> + NeedSend + 't + 'static,
            RET: MethodReturn<CON> + 't + 'static,
            CON: Send + 't,
            $($ty: Deserialize<'t> + 't),*
        > SerdeMethod<'t, ($($ty,)*), (FUT, RET, CON), T> for METHOD {
            impl_method!(@boxed async () $($arg: $ty),*);
        }

        // async this, ...
        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn(&'t T, $($ty),*) -> FUT + Sync + Send + 'static,
            FUT: Future<Output = RET> + NeedSend + 't + 'static,
            RET: MethodReturn<CON> + 't + 'static,
            CON: Send + 't,
            $($ty: Deserialize<'t> + 't),*
        > SerdeMethod<'t, ($($ty,)*), (FUT, RET, CON, T), T> for METHOD {
            impl_method!(@boxed async (this = t,) $($arg: $ty),*);
        }

        // async, context, ...
        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn(Arc<RpcContext>, $($ty),*) -> FUT + Sync + Send + 'static,
            FUT: Future<Output = RET> + NeedSend + 't + 'static,
            RET: MethodReturn<CON> + 't + 'static,
            CON: Send + 't,
            $($ty:  Deserialize<'t> + Send + 't),*
        > SerdeMethod<'t, ($($ty,)*), (RpcContext, FUT, RET, CON), T> for METHOD {
            impl_method!(@boxed async (ctx = ctx,) $($arg: $ty),*);
        }

        // async, this, context, ...
        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn(&'t T, Arc<RpcContext>, $($ty),*) -> FUT + Sync + Send + 'static,
            FUT: Future<Output = RET> + NeedSend + 't + 'static,
            RET: MethodReturn<CON> + 't + 'static,
            CON: Send + 't,
            $($ty:  Deserialize<'t> + Send + 't),*
        > SerdeMethod<'t, ($($ty,)*), (RpcContext, FUT, RET, CON, T), T> for METHOD {
            impl_method!(@boxed async (this = t, ctx = ctx,) $($arg: $ty),*);
        }

        // ...
        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn($($ty),*) -> RET + Sync + Send + 'static,
            RET,
            CON: Send + 't,
            $($ty),*
        > SerdeMethod<'t, ($($ty,)*), [(RET, CON); 0], T> for METHOD where
            RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
            impl_method!(@boxed () $($arg: $ty),*);
        }

        // ctx, ...
        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn(&'t T, $($ty),*) -> RET + Sync + Send + 'static,
            RET,
            CON: Send + 't,
            $($ty),*
        > SerdeMethod<'t, ($($ty,)*), [(RET, CON, T); 0], T> for METHOD where
            RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
            impl_method!(@boxed (this = t,) $($arg: $ty),*);
        }

        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn(Arc<RpcContext>, $($ty),*) -> RET + Sync + Send + 'static,
            RET,
            CON: Send + 't,
            $($ty),*
        > SerdeMethod<'t, ($($ty,)*), [(RpcContext, RET, CON); 1], T> for METHOD where
            RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
            impl_method!(@boxed (ctx = ctx,) $($arg: $ty),*);
        }

        #[allow(non_snake_case, unused_parens)]
        impl<'t,
            T: 't,
            METHOD: Fn(&'t T, Arc<RpcContext>, $($ty),*) -> RET + Sync + Send + 'static,
            RET,
            CON: Send + 't,
            $($ty),*
        > SerdeMethod<'t, ($($ty,)*), [(RpcContext, RET, CON, T); 0], T> for METHOD where
            RET: MethodReturn<CON> + 't + 'static, $( $ty: Deserialize<'t> + 't),* {
            impl_method!(@boxed (this = t, ctx = ctx,) $($arg: $ty),*);
        }
    };
}

impl_method!();
impl_method!(a: A);
impl_method!(a: A, b: B);
impl_method!(a: A, b: B, c: C);
impl_method!(a: A, b: B, c: C, d: D);
impl_method!(a: A, b: B, c: C, d: D, e: E);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, j: J);
impl_method!(a: A, b: B, c: C, d: D, e: E, f: F, g: G, h: H, j: J, k: K);