unipipe 0.3.1

A simple Rust pipe abstraction that extends to iterator and stream.
Documentation
#[doc(hidden)]
pub use async_stream::stream;
#[doc(hidden)]
pub use futures::{Stream, StreamExt};

pub use unipipe_macros::*;

pub trait UniPipe {
    type Input;
    type Output;

    fn next(&mut self, input: Option<Self::Input>) -> Output<Self::Output>;
}

pub enum Output<T> {
    Next,
    One(T),
    Many(Vec<T>),
    Done,
    DoneWithOne(T),
    DoneWithMany(Vec<T>),
}

impl<T> Output<T> {
    pub fn is_done(&self) -> bool {
        matches!(
            self,
            Self::Done | Self::DoneWithOne(_) | Self::DoneWithMany(_)
        )
    }

    pub fn map<TMapped, TCallback>(self, mut callback: TCallback) -> Output<TMapped>
    where
        TCallback: FnMut(T) -> TMapped,
    {
        match self {
            Self::Next => Output::<TMapped>::Next,
            Self::One(value) => Output::<TMapped>::One(callback(value)),
            Self::Many(values) => {
                Output::<TMapped>::Many(values.into_iter().map(callback).collect())
            }
            Self::Done => Output::<TMapped>::Done,
            Self::DoneWithOne(value) => Output::<TMapped>::DoneWithOne(callback(value)),
            Self::DoneWithMany(values) => {
                Output::<TMapped>::DoneWithMany(values.into_iter().map(callback).collect())
            }
        }
    }

    pub fn filter_map<TMapped, TCallback>(self, mut callback: TCallback) -> Output<TMapped>
    where
        TCallback: FnMut(T) -> Option<TMapped>,
    {
        match self {
            Self::Next => Output::<TMapped>::Next,
            Self::One(value) => {
                callback(value).map_or(Output::<TMapped>::Next, Output::<TMapped>::One)
            }
            Self::Many(values) => {
                Output::<TMapped>::Many(values.into_iter().filter_map(callback).collect())
            }
            Self::Done => Output::<TMapped>::Done,
            Self::DoneWithOne(value) => {
                callback(value).map_or(Output::<TMapped>::Done, Output::<TMapped>::DoneWithOne)
            }
            Self::DoneWithMany(values) => {
                Output::<TMapped>::DoneWithMany(values.into_iter().filter_map(callback).collect())
            }
        }
    }

    pub fn pipe<TPipe, TPipeOutput>(self, pipe: &mut TPipe) -> Output<TPipeOutput>
    where
        TPipe: UniPipe<Input = T, Output = TPipeOutput>,
    {
        let upper_done = self.is_done();

        let output = match self {
            Self::Next => Output::<TPipeOutput>::Next,
            Self::One(value) | Self::DoneWithOne(value) => pipe.next(Some(value)),
            Self::Many(values) | Self::DoneWithMany(values) => {
                let mut aggregated_outputs = Vec::new();

                let mut inputs = values.into_iter().map(Some).collect::<Vec<_>>();

                if upper_done {
                    inputs.push(None);
                }

                for value in inputs {
                    let next_output: Output<_> = pipe.next(value);

                    let done = next_output.is_done();

                    match next_output {
                        Output::One(output) | Output::DoneWithOne(output) => {
                            aggregated_outputs.push(output)
                        }
                        Output::Many(outputs) | Output::DoneWithMany(outputs) => {
                            aggregated_outputs.extend(outputs)
                        }
                        Output::Next | Output::Done => {}
                    }

                    if done {
                        return Output::<TPipeOutput>::DoneWithMany(aggregated_outputs);
                    }
                }

                Output::<TPipeOutput>::Many(aggregated_outputs)
            }
            Self::Done => pipe.next(None),
        };

        if upper_done { output.done() } else { output }
    }

    fn done(self) -> Output<T> {
        match self {
            Self::Next => Self::Done,
            Self::One(value) => Self::DoneWithOne(value),
            Self::Many(values) => Self::DoneWithMany(values),
            _ => self,
        }
    }
}

impl<T> IntoIterator for Output<T> {
    type Item = T;
    type IntoIter = std::vec::IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            Self::Next => vec![],
            Self::One(value) => vec![value],
            Self::Many(values) => values,
            Self::Done => vec![],
            Self::DoneWithOne(value) => vec![value],
            Self::DoneWithMany(values) => values,
        }
        .into_iter()
    }
}

impl<T> From<Option<T>> for Output<T> {
    fn from(value: Option<T>) -> Self {
        match value {
            None => Self::Next,
            Some(value) => Self::One(value),
        }
    }
}