streameroo 0.4.4

Compilation of mini-frameworks & utilities for data-streaming applications
Documentation
use crate::amqp::{AMQPResult, BoxError, DeliveryContext, Error, FromDeliveryContext};
use crate::event::Decode;
use std::future::Future;

/// - `P`: Type for the parameters of the future generating closure
/// - `T`: Type for the return value of the future generated by the closure, implements `AMQPResult`
/// - `Err`: Error type of the handler
pub trait AMQPHandler<P, T, E>: Clone + Send + 'static
where
    T: AMQPResult,
{
    fn call(
        &self,
        payload: Vec<u8>,
        delivery_context: &DeliveryContext,
    ) -> impl Future<Output = Result<T, Error>> + Send;
}

pub trait AMQPDecode: Sized {
    fn decode(payload: Vec<u8>, context: &DeliveryContext) -> Result<Self, Error>;
}

impl<E> AMQPDecode for E
where
    E: Decode,
{
    fn decode(payload: Vec<u8>, _: &DeliveryContext) -> Result<Self, Error> {
        E::decode(payload).map_err(Error::event)
    }
}

macro_rules! impl_handler {
    (
        [$($ty:ident),*]
    ) => {
        #[allow(non_snake_case, unused_variables, unused_parens)]
        impl<F, Fut, T, Err, $($ty,)* E> AMQPHandler<($($ty,)* E), T, Err> for F
        where
            F: Fn($($ty,)* E) -> Fut + Send + Sync + 'static + Clone,
            Err: Into<BoxError>,
            Fut: Future<Output = Result<T, Err>> + Send,
            $( $ty: for<'a> FromDeliveryContext<'a>, )*
            E: AMQPDecode,
            T: AMQPResult,
        {
            async fn call(&self, payload: Vec<u8>, delivery_context: &DeliveryContext) -> Result<T, Error> {
                let event = E::decode(payload, delivery_context)?;
                $(
                    let $ty = $ty::from_delivery_context(&delivery_context);
                )*
                self($($ty,)* event).await.map_err(|e| Error::Handler(e.into()))
            }
        }
    };
}

impl_handler!([]);
impl_handler!([T1]);
impl_handler!([T1, T2]);
impl_handler!([T1, T2, T3]);
impl_handler!([T1, T2, T3, T4]);
impl_handler!([T1, T2, T3, T4, T5]);
impl_handler!([T1, T2, T3, T4, T5, T6]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12]);
impl_handler!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13]);