flowly-service 0.4.13

Flowly is a library of modular and reusable components for building robust pipelines processing audio, video and other.
Documentation
use std::pin::pin;

use futures::{Stream, StreamExt};

use crate::Service;

#[derive(Clone)]
pub struct Finally<F> {
    pub(crate) f: F,
}

impl<I, E, C, F> Service<Result<I, E>> for Finally<C>
where
    F: Future<Output = Result<(), E>> + Send,
    C: Send + FnMut() -> F,
    E: Send,
    I: Send,
{
    type Out = Result<I, E>;

    fn handle(
        mut self,
        input: impl Stream<Item = Result<I, E>> + Send,
    ) -> impl Stream<Item = Self::Out> + Send {
        async_stream::stream! {
            let mut stream = pin!(input);

            while let Some(res) = stream.next().await {
                yield res;
            }

            if let Err(err) = (self.f)().await {
                yield Err(err);
            }
        }
    }
}

#[derive(Clone)]
pub struct Except<F> {
    pub(crate) f: F,
}

impl<I, E, C, F> Service<Result<I, E>> for Except<C>
where
    F: Future<Output = Result<(), E>> + Send,
    C: Send + FnMut(E) -> F,
    E: Send,
    I: Send,
{
    type Out = Result<I, E>;

    fn handle(
        mut self,
        input: impl Stream<Item = Result<I, E>> + Send,
    ) -> impl Stream<Item = Self::Out> + Send {
        async_stream::stream! {
            let mut stream = pin!(input);

            while let Some(res) = stream.next().await {
                match res {
                    Ok(x) => {
                        yield Ok(x)
                    },

                    Err(err) => {
                        if let Err(err) = (self.f)(err).await {
                            yield Err(err)
                        }
                    }
                };
            }
        }
    }
}