rs-netty 0.2.3

A Tokio-native typed TCP/UDP pipeline framework inspired by Netty.
Documentation
use std::future::Future;

use crate::{
    context::{BusinessContext, InboundContext, OutboundContext},
    traits::{Business, Flow, Inbound, Outbound},
    Result,
};

/// Pass-through pipeline stage.
///
/// Each `*Pipe` implementation for `Identity` in this file ignores the
/// context and returns the incoming message unchanged as `Flow::Next(msg)`.
pub struct Identity;

/// Pair of pipeline stages that are run one after the other.
///
/// The pipe implementations for `Then` in this file run `a` first and pass
/// whatever it lets through on to `b`.
pub struct Then<A, B> {
    /// Stage that runs first.
    pub a: A,
    /// Stage that receives the output of `a`.
    pub b: B,
}

impl<A, B> Then<A, B> {
    /// Builds a `Then` that stores `a` as the first stage and `b` as the
    /// second.
    pub fn new(a: A, b: B) -> Self {
        Then { a, b }
    }
}

/// A stage, or chain of stages, that processes messages of type `I` against
/// an `InboundContext`.
///
/// In this file it is implemented by `Identity` (pass-through) and by
/// `Then<A, B>`, which chains an `InboundPipe` with an `Inbound` handler.
/// Implementors must be `Send + 'static`.
pub trait InboundPipe<I>: Send + 'static {
    /// Message type carried by `Flow::Next` when processing succeeds.
    type Out: Send + 'static;

    /// Processes `msg` and returns a `Send` future that may borrow `self`
    /// and `ctx` for `'ctx`.
    ///
    /// The future resolves to the crate's `Result` wrapping a
    /// `Flow<Self::Out>`. In the `Then` implementation, `Flow::Next` is
    /// forwarded to the following stage and `Flow::Stop` skips it.
    fn process<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut InboundContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;
}

/// Inbound pass-through: the output type is the input type.
impl<I> InboundPipe<I> for Identity
where
    I: Send + 'static,
{
    type Out = I;

    /// Returns `msg` untouched as `Flow::Next`; the context is not used.
    async fn process(&mut self, _ctx: &mut InboundContext, msg: I) -> Result<Flow<Self::Out>> {
        let forwarded = Flow::Next(msg);
        Ok(forwarded)
    }
}

/// Chains an inbound pipe `A` with an inbound handler `B`.
impl<I, A, B> InboundPipe<I> for Then<A, B>
where
    A: InboundPipe<I>,
    B: Inbound<A::Out>,
    I: Send + 'static,
{
    type Out = B::Out;

    /// Runs `a` on `msg`, then hands its output to `b.read`.
    ///
    /// An error from `a` is returned immediately. If `a` yields
    /// `Flow::Stop`, `b` is not called and `Flow::Stop` is returned.
    async fn process(&mut self, ctx: &mut InboundContext, msg: I) -> Result<Flow<Self::Out>> {
        let Flow::Next(mid) = self.a.process(ctx, msg).await? else {
            // The first stage halted the flow, so the second never runs.
            return Ok(Flow::Stop);
        };
        self.b.read(ctx, mid).await
    }
}

/// A stage, or chain of stages, that processes messages of type `I` against
/// a `BusinessContext`.
///
/// In this file it is implemented by `Identity` (pass-through) and by
/// `Then<A, B>`, which chains a `BusinessPipe` with a `Business` handler.
/// Implementors must be `Send + 'static`.
pub trait BusinessPipe<I>: Send + 'static {
    /// Message type carried by `Flow::Next` when processing succeeds.
    type Out: Send + 'static;

    /// Processes `msg` and returns a `Send` future that may borrow `self`
    /// and `ctx` for `'ctx`.
    ///
    /// The future resolves to the crate's `Result` wrapping a
    /// `Flow<Self::Out>`. In the `Then` implementation, `Flow::Next` is
    /// forwarded to the following stage and `Flow::Stop` skips it.
    fn process<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut BusinessContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;
}

/// Business pass-through: the output type is the input type.
impl<I> BusinessPipe<I> for Identity
where
    I: Send + 'static,
{
    type Out = I;

    /// Returns `msg` untouched as `Flow::Next`; the context is not used.
    async fn process(&mut self, _ctx: &mut BusinessContext, msg: I) -> Result<Flow<Self::Out>> {
        let forwarded = Flow::Next(msg);
        Ok(forwarded)
    }
}

/// Chains a business pipe `A` with a business handler `B`.
impl<I, A, B> BusinessPipe<I> for Then<A, B>
where
    A: BusinessPipe<I>,
    B: Business<A::Out>,
    I: Send + 'static,
{
    type Out = B::Out;

    /// Runs `a` on `msg`, then hands its output to `b.handle`.
    ///
    /// An error from `a` is returned immediately. If `a` yields
    /// `Flow::Stop`, `b` is not called and `Flow::Stop` is returned.
    async fn process(&mut self, ctx: &mut BusinessContext, msg: I) -> Result<Flow<Self::Out>> {
        let Flow::Next(mid) = self.a.process(ctx, msg).await? else {
            // The first stage halted the flow, so the second never runs.
            return Ok(Flow::Stop);
        };
        self.b.handle(ctx, mid).await
    }
}

/// A stage, or chain of stages, that processes messages of type `I` against
/// an `OutboundContext`.
///
/// In this file it is implemented by `Identity` (pass-through) and by
/// `Then<A, B>`, which chains an `OutboundPipe` with an `Outbound` handler.
/// Implementors must be `Send + 'static`.
pub trait OutboundPipe<I>: Send + 'static {
    /// Message type carried by `Flow::Next` when processing succeeds.
    type Out: Send + 'static;

    /// Processes `msg` and returns a `Send` future that may borrow `self`
    /// and `ctx` for `'ctx`.
    ///
    /// The future resolves to the crate's `Result` wrapping a
    /// `Flow<Self::Out>`. In the `Then` implementation, `Flow::Next` is
    /// forwarded to the following stage and `Flow::Stop` skips it.
    fn process<'ctx>(
        &'ctx mut self,
        ctx: &'ctx mut OutboundContext,
        msg: I,
    ) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;
}

/// Outbound pass-through: the output type is the input type.
impl<I> OutboundPipe<I> for Identity
where
    I: Send + 'static,
{
    type Out = I;

    /// Returns `msg` untouched as `Flow::Next`; the context is not used.
    async fn process(&mut self, _ctx: &mut OutboundContext, msg: I) -> Result<Flow<Self::Out>> {
        let forwarded = Flow::Next(msg);
        Ok(forwarded)
    }
}

/// Chains an outbound pipe `A` with an outbound handler `B`.
impl<I, A, B> OutboundPipe<I> for Then<A, B>
where
    A: OutboundPipe<I>,
    B: Outbound<A::Out>,
    I: Send + 'static,
{
    type Out = B::Out;

    /// Runs `a` on `msg`, then hands its output to `b.write`.
    ///
    /// An error from `a` is returned immediately. If `a` yields
    /// `Flow::Stop`, `b` is not called and `Flow::Stop` is returned.
    async fn process(&mut self, ctx: &mut OutboundContext, msg: I) -> Result<Flow<Self::Out>> {
        let Flow::Next(mid) = self.a.process(ctx, msg).await? else {
            // The first stage halted the flow, so the second never runs.
            return Ok(Flow::Stop);
        };
        self.b.write(ctx, mid).await
    }
}