flowly-service 0.4.13

Flowly is a library of modular and reusable components for building robust pipelines processing audio, video and other.
Documentation
use std::pin::pin;

use futures::{Stream, StreamExt};

use crate::Service;

#[derive(Clone)]
pub struct TryFlattenMap<F> {
    pub(crate) f: F,
}

impl<I, G, Ft, U, E, F> Service<Result<I, E>> for TryFlattenMap<F>
where
    G: Send,
    E: Send,
    I: Send,
    Ft: Future<Output = Result<U, E>> + Send,
    F: FnMut(I) -> Ft + Send,
    U: Stream<Item = Result<G, E>> + Send,
{
    type Out = Result<G, E>;

    fn handle(
        mut self,
        input: impl Stream<Item = Result<I, E>> + Send,
    ) -> impl Stream<Item = Result<G, E>> + Send {
        async_stream::stream! {
            let mut stream = pin!(input);

            while let Some(res) = stream.next().await {
                match res {
                    Ok(item) => {
                        match (self.f)(item).await {
                            Ok(x) => {
                                let mut x = pin!(x);
                                while let Some(res) = x.next().await {
                                    yield res;
                                }
                            }
                            Err(err) => yield Err(err)
                        }
                    }
                    Err(err) => yield Err(err)
                }
            }
        }
    }
}