// rs-netty 1.1.0
//
// A Tokio-native typed TCP/UDP pipeline framework inspired by Netty.
// Documentation
use std::future::{ready, Future, Ready};

use crate::{
    context::{BusinessContext, InboundContext, OutboundContext},
    traits::{Business, Flow, Inbound, Outbound},
    Result,
};

/// The empty pipe: a stateless stage that hands every message on unchanged.
///
/// In this file it implements [`InboundPipe`], [`BusinessPipe`] and [`OutboundPipe`] for
/// every `I: Send + 'static`, with `Out = I`.
///
/// Being a field-less marker, it derives the common value traits so it can be printed,
/// copied and default-constructed freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct Identity;

/// Sequential composition of two stages: `a` runs first and `b` runs on what `a` yields.
///
/// `Debug` and `Clone` are derived, so they are available whenever both `A` and `B`
/// provide them.
#[derive(Debug, Clone)]
pub struct Then<A, B> {
    /// The upstream part of the chain, processed first.
    pub a: A,
    /// The stage appended after `a`.
    pub b: B,
}

/// Outcome of a single `process_step` call on a pipe: either a result that is already
/// available, or a future that still has to be awaited to obtain it.
///
/// `T` is the pipe's output message type and `F` is the type of the pending future.
#[doc(hidden)]
pub enum PipeStep<T, F> {
    /// The result is available immediately; no future needs to be awaited.
    Ready(Result<Flow<T>>),
    /// The result will be produced by awaiting this future.
    Future(F),
}

impl<A, B> Then<A, B> {
    pub fn new(a: A, b: B) -> Self {
        Self { a, b }
    }
}

/// A chain of inbound stages that accepts messages of type `I`.
pub trait InboundPipe<I>: Send + 'static {
    /// Message type carried by [`Flow::Next`] when the whole chain has run.
    type Out: Send + 'static;

    /// Feeds `msg` through the chain.
    ///
    /// `self` and `ctx` stay mutably borrowed for `'ctx`, the lifetime the returned future
    /// is bounded by. The future must be `Send` and resolves to the chain's
    /// `Result<Flow<Self::Out>>`.
    fn process<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut InboundContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;

    /// Same contract as [`process`](Self::process), except that an implementation may hand
    /// back an already available result as [`PipeStep::Ready`] instead of a future.
    ///
    /// The provided implementation never takes that shortcut: it calls `process` and wraps
    /// the resulting future in [`PipeStep::Future`].
    #[doc(hidden)]
    fn process_step<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut InboundContext,
        msg: I,
    ) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
    where
        I: 'ctx,
    {
        let pending = self.process(ctx, msg);
        PipeStep::Future(pending)
    }
}

/// Inbound pass-through: the message comes out exactly as it went in.
impl<I> InboundPipe<I> for Identity
where
    I: Send + 'static,
{
    type Out = I;

    /// Returns an already completed future holding `Flow::Next(msg)`; `_ctx` is not used.
    fn process(
        &mut self,
        _ctx: &mut InboundContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + '_ {
        let forwarded = Flow::Next(msg);
        ready(Ok(forwarded))
    }

    /// Shortcut that yields `Flow::Next(msg)` directly as [`PipeStep::Ready`], so no future
    /// is created for the pass-through case.
    #[inline]
    fn process_step<'ctx>(
        &'ctx mut self,
        _ctx: &'ctx mut InboundContext,
        msg: I,
    ) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
    where
        I: 'ctx,
    {
        let forwarded = Ok(Flow::Next(msg));
        // No future is ever built here, so the turbofish names `Ready<_>` explicitly to give
        // the opaque future type in the signature something concrete to resolve to.
        PipeStep::<Self::Out, Ready<Result<Flow<Self::Out>>>>::Ready(forwarded)
    }
}

/// Inbound chaining: run the upstream pipe `a`, then give its output to handler `b`.
impl<I, A, B> InboundPipe<I> for Then<A, B>
where
    I: Send + 'static,
    A: InboundPipe<I>,
    B: Inbound<A::Out>,
{
    type Out = B::Out;

    /// Awaits `a.process` first; an error from it is returned immediately via `?`.
    ///
    /// When `a` yields `Flow::Stop` the chain ends there and `b` is never called; when it
    /// yields `Flow::Next(mid)`, the result of `b.read(ctx, mid)` becomes the chain's result.
    async fn process(&mut self, ctx: &mut InboundContext, msg: I) -> Result<Flow<Self::Out>> {
        let upstream = self.a.process(ctx, msg).await?;
        match upstream {
            Flow::Stop => Ok(Flow::Stop),
            Flow::Next(mid) => self.b.read(ctx, mid).await,
        }
    }
}

/// A chain of business stages that accepts messages of type `I`.
pub trait BusinessPipe<I>: Send + 'static {
    /// Message type carried by [`Flow::Next`] when the whole chain has run.
    type Out: Send + 'static;

    /// Feeds `msg` through the chain.
    ///
    /// `self` and `ctx` stay mutably borrowed for `'ctx`, the lifetime the returned future
    /// is bounded by. The future must be `Send` and resolves to the chain's
    /// `Result<Flow<Self::Out>>`.
    fn process<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut BusinessContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;

    /// Same contract as [`process`](Self::process), except that an implementation may hand
    /// back an already available result as [`PipeStep::Ready`] instead of a future.
    ///
    /// The provided implementation never takes that shortcut: it calls `process` and wraps
    /// the resulting future in [`PipeStep::Future`].
    #[doc(hidden)]
    fn process_step<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut BusinessContext,
        msg: I,
    ) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
    where
        I: 'ctx,
    {
        let pending = self.process(ctx, msg);
        PipeStep::Future(pending)
    }
}

/// Business pass-through: the message comes out exactly as it went in.
impl<I> BusinessPipe<I> for Identity
where
    I: Send + 'static,
{
    type Out = I;

    /// Returns an already completed future holding `Flow::Next(msg)`; `_ctx` is not used.
    fn process(
        &mut self,
        _ctx: &mut BusinessContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + '_ {
        let forwarded = Flow::Next(msg);
        ready(Ok(forwarded))
    }

    /// Shortcut that yields `Flow::Next(msg)` directly as [`PipeStep::Ready`], so no future
    /// is created for the pass-through case.
    #[inline]
    fn process_step<'ctx>(
        &'ctx mut self,
        _ctx: &'ctx mut BusinessContext,
        msg: I,
    ) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
    where
        I: 'ctx,
    {
        let forwarded = Ok(Flow::Next(msg));
        // No future is ever built here, so the turbofish names `Ready<_>` explicitly to give
        // the opaque future type in the signature something concrete to resolve to.
        PipeStep::<Self::Out, Ready<Result<Flow<Self::Out>>>>::Ready(forwarded)
    }
}

/// Business chaining: run the upstream pipe `a`, then give its output to handler `b`.
impl<I, A, B> BusinessPipe<I> for Then<A, B>
where
    I: Send + 'static,
    A: BusinessPipe<I>,
    B: Business<A::Out>,
{
    type Out = B::Out;

    /// Awaits `a.process` first; an error from it is returned immediately via `?`.
    ///
    /// When `a` yields `Flow::Stop` the chain ends there and `b` is never called; when it
    /// yields `Flow::Next(mid)`, the result of `b.handle(ctx, mid)` becomes the chain's
    /// result.
    async fn process(&mut self, ctx: &mut BusinessContext, msg: I) -> Result<Flow<Self::Out>> {
        let upstream = self.a.process(ctx, msg).await?;
        match upstream {
            Flow::Stop => Ok(Flow::Stop),
            Flow::Next(mid) => self.b.handle(ctx, mid).await,
        }
    }
}

/// A chain of outbound stages that accepts messages of type `I`.
pub trait OutboundPipe<I>: Send + 'static {
    /// Message type carried by [`Flow::Next`] when the whole chain has run.
    type Out: Send + 'static;

    /// Feeds `msg` through the chain.
    ///
    /// `self` and `ctx` stay mutably borrowed for `'ctx`, the lifetime the returned future
    /// is bounded by. The future must be `Send` and resolves to the chain's
    /// `Result<Flow<Self::Out>>`.
    fn process<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut OutboundContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;

    /// Same contract as [`process`](Self::process), except that an implementation may hand
    /// back an already available result as [`PipeStep::Ready`] instead of a future.
    ///
    /// The provided implementation never takes that shortcut: it calls `process` and wraps
    /// the resulting future in [`PipeStep::Future`].
    #[doc(hidden)]
    fn process_step<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut OutboundContext,
        msg: I,
    ) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
    where
        I: 'ctx,
    {
        let pending = self.process(ctx, msg);
        PipeStep::Future(pending)
    }
}

/// Outbound pass-through: the message comes out exactly as it went in.
impl<I> OutboundPipe<I> for Identity
where
    I: Send + 'static,
{
    type Out = I;

    /// Returns an already completed future holding `Flow::Next(msg)`; `_ctx` is not used.
    fn process(
        &mut self,
        _ctx: &mut OutboundContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + '_ {
        let forwarded = Flow::Next(msg);
        ready(Ok(forwarded))
    }

    /// Shortcut that yields `Flow::Next(msg)` directly as [`PipeStep::Ready`], so no future
    /// is created for the pass-through case.
    #[inline]
    fn process_step<'ctx>(
        &'ctx mut self,
        _ctx: &'ctx mut OutboundContext,
        msg: I,
    ) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
    where
        I: 'ctx,
    {
        let forwarded = Ok(Flow::Next(msg));
        // No future is ever built here, so the turbofish names `Ready<_>` explicitly to give
        // the opaque future type in the signature something concrete to resolve to.
        PipeStep::<Self::Out, Ready<Result<Flow<Self::Out>>>>::Ready(forwarded)
    }
}

/// Outbound chaining: run the upstream pipe `a`, then give its output to handler `b`.
impl<I, A, B> OutboundPipe<I> for Then<A, B>
where
    I: Send + 'static,
    A: OutboundPipe<I>,
    B: Outbound<A::Out>,
{
    type Out = B::Out;

    /// Awaits `a.process` first; an error from it is returned immediately via `?`.
    ///
    /// When `a` yields `Flow::Stop` the chain ends there and `b` is never called; when it
    /// yields `Flow::Next(mid)`, the result of `b.write(ctx, mid)` becomes the chain's
    /// result.
    async fn process(&mut self, ctx: &mut OutboundContext, msg: I) -> Result<Flow<Self::Out>> {
        let upstream = self.a.process(ctx, msg).await?;
        match upstream {
            Flow::Stop => Ok(Flow::Stop),
            Flow::Next(mid) => self.b.write(ctx, mid).await,
        }
    }
}