unipipe 0.1.0

A simple Rust pipe abstraction that extends to iterator and stream.
Documentation
#[macro_export]
macro_rules! extend_stream {
    ($visibility:vis $struct_name:ident) => {
        ::paste::paste! {
            $crate::extend_stream!($visibility $struct_name, [<$struct_name:snake>]);
        }
    };
    ($visibility:vis $struct_name:ident, $method_name:ident) => {
        ::paste::paste! {
            $visibility trait [<$struct_name UniPipeStreamExt>]:
                futures::Stream<Item = <$struct_name as $crate::UniPipe>::Input> + Sized
            {
                fn $method_name(
                    mut self,
                ) -> impl futures::Stream<Item = <$struct_name as $crate::UniPipe>::Output> {
                    use futures::StreamExt as _;

                    async_stream::stream!({
                        let mut pipe = <$struct_name as Default>::default();

                        let mut source = Box::pin(self);

                        while let Some(input) = source.next().await {
                            if let Some(output) = pipe.next(Some(input)) {
                                yield output;
                            }
                        }

                        if let Some(output) = pipe.next(None) {
                            yield output;
                        }
                    })
                }
            }

            impl<TStream> [<$struct_name UniPipeStreamExt>] for TStream where
                TStream: futures::Stream<Item = <$struct_name as $crate::UniPipe>::Input>
            {}
        }
    };
}

#[macro_export]
macro_rules! extend_try_stream {
    ($visibility:vis $struct_name:ident) => {
        ::paste::paste! {
            $crate::extend_try_stream!($visibility $struct_name, [<try_ $struct_name:snake>]);
        }
    };
    ($visibility:vis $struct_name:ident, $method_name:ident) => {
        ::paste::paste! {
            $visibility trait [<$struct_name UniPipeTryStreamExt>]<TError>:
                futures::Stream<Item = Result<<$struct_name as $crate::UniPipe>::Input, TError>> + Sized
            {
                fn $method_name(mut self) -> impl futures::Stream<Item = Result<<$struct_name as $crate::UniPipe>::Output, TError>> {
                    use futures::StreamExt as _;

                    async_stream::stream!({
                        let mut pipe = <$struct_name as Default>::default();

                        let mut source = Box::pin(self);

                        while let Some(input) = source.next().await {
                            match input {
                                Ok(input) => {
                                    if let Some(output) = pipe.next(Some(input)) {
                                        yield Ok(output);
                                    }
                                }
                                Err(error) => yield Err(error),
                            }
                        }

                        if let Some(output) = pipe.next(None) {
                            yield Ok(output);
                        }
                    })
                }
            }

            impl<TStream, TError> [<$struct_name UniPipeTryStreamExt>]<TError> for TStream where
                TStream: futures::Stream<Item = Result<<$struct_name as $crate::UniPipe>::Input, TError>>
            {}
        }
    };
}