use std::future::{ready, Future, Ready};
use crate::{
context::{BusinessContext, InboundContext, OutboundContext},
traits::{Business, Flow, Inbound, Outbound},
Result,
};
pub struct Identity;
pub struct Then<A, B> {
pub a: A,
pub b: B,
}
#[doc(hidden)]
pub enum PipeStep<T, F> {
Ready(Result<Flow<T>>),
Future(F),
}
impl<A, B> Then<A, B> {
pub fn new(a: A, b: B) -> Self {
Self { a, b }
}
}
pub trait InboundPipe<I>: Send + 'static {
type Out: Send + 'static;
fn process<'ctx>(
&'ctx mut self,
ctx: &'ctx mut InboundContext,
msg: I,
) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;
#[doc(hidden)]
fn process_step<'ctx>(
&'ctx mut self,
ctx: &'ctx mut InboundContext,
msg: I,
) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
where
I: 'ctx,
{
PipeStep::Future(self.process(ctx, msg))
}
}
impl<I: Send + 'static> InboundPipe<I> for Identity {
type Out = I;
fn process(
&mut self,
_ctx: &mut InboundContext,
msg: I,
) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + '_ {
ready(Ok(Flow::Next(msg)))
}
#[inline]
fn process_step<'ctx>(
&'ctx mut self,
_ctx: &'ctx mut InboundContext,
msg: I,
) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
where
I: 'ctx,
{
PipeStep::<Self::Out, Ready<Result<Flow<Self::Out>>>>::Ready(Ok(Flow::Next(msg)))
}
}
impl<I, A, B> InboundPipe<I> for Then<A, B>
where
I: Send + 'static,
A: InboundPipe<I>,
B: Inbound<A::Out>,
{
type Out = B::Out;
async fn process(&mut self, ctx: &mut InboundContext, msg: I) -> Result<Flow<Self::Out>> {
match self.a.process(ctx, msg).await? {
Flow::Next(mid) => self.b.read(ctx, mid).await,
Flow::Stop => Ok(Flow::Stop),
}
}
}
pub trait BusinessPipe<I>: Send + 'static {
type Out: Send + 'static;
fn process<'ctx>(
&'ctx mut self,
ctx: &'ctx mut BusinessContext,
msg: I,
) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;
#[doc(hidden)]
fn process_step<'ctx>(
&'ctx mut self,
ctx: &'ctx mut BusinessContext,
msg: I,
) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
where
I: 'ctx,
{
PipeStep::Future(self.process(ctx, msg))
}
}
impl<I: Send + 'static> BusinessPipe<I> for Identity {
type Out = I;
fn process(
&mut self,
_ctx: &mut BusinessContext,
msg: I,
) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + '_ {
ready(Ok(Flow::Next(msg)))
}
#[inline]
fn process_step<'ctx>(
&'ctx mut self,
_ctx: &'ctx mut BusinessContext,
msg: I,
) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
where
I: 'ctx,
{
PipeStep::<Self::Out, Ready<Result<Flow<Self::Out>>>>::Ready(Ok(Flow::Next(msg)))
}
}
impl<I, A, B> BusinessPipe<I> for Then<A, B>
where
I: Send + 'static,
A: BusinessPipe<I>,
B: Business<A::Out>,
{
type Out = B::Out;
async fn process(&mut self, ctx: &mut BusinessContext, msg: I) -> Result<Flow<Self::Out>> {
match self.a.process(ctx, msg).await? {
Flow::Next(mid) => self.b.handle(ctx, mid).await,
Flow::Stop => Ok(Flow::Stop),
}
}
}
pub trait OutboundPipe<I>: Send + 'static {
type Out: Send + 'static;
fn process<'ctx>(
&'ctx mut self,
ctx: &'ctx mut OutboundContext,
msg: I,
) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx;
#[doc(hidden)]
fn process_step<'ctx>(
&'ctx mut self,
ctx: &'ctx mut OutboundContext,
msg: I,
) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
where
I: 'ctx,
{
PipeStep::Future(self.process(ctx, msg))
}
}
impl<I: Send + 'static> OutboundPipe<I> for Identity {
type Out = I;
fn process(
&mut self,
_ctx: &mut OutboundContext,
msg: I,
) -> impl Future<Output = Result<Flow<Self::Out>>> + Send + '_ {
ready(Ok(Flow::Next(msg)))
}
#[inline]
fn process_step<'ctx>(
&'ctx mut self,
_ctx: &'ctx mut OutboundContext,
msg: I,
) -> PipeStep<Self::Out, impl Future<Output = Result<Flow<Self::Out>>> + Send + 'ctx>
where
I: 'ctx,
{
PipeStep::<Self::Out, Ready<Result<Flow<Self::Out>>>>::Ready(Ok(Flow::Next(msg)))
}
}
impl<I, A, B> OutboundPipe<I> for Then<A, B>
where
I: Send + 'static,
A: OutboundPipe<I>,
B: Outbound<A::Out>,
{
type Out = B::Out;
async fn process(&mut self, ctx: &mut OutboundContext, msg: I) -> Result<Flow<Self::Out>> {
match self.a.process(ctx, msg).await? {
Flow::Next(mid) => self.b.write(ctx, mid).await,
Flow::Stop => Ok(Flow::Stop),
}
}
}