flowly-service 0.4.13

Flowly is a library of modular and reusable components for building robust pipelines processing audio, video and other.
Documentation
use std::{marker::PhantomData, pin::pin};

use futures::StreamExt;

use crate::Service;

#[derive(Clone)]
pub struct SpawnEach<M, S> {
    service: S,
    buffer: usize,
    _m: PhantomData<M>,
}

impl<M, S> SpawnEach<M, S>
where
    M: Send,
    S: Service<M> + Send + Clone + 'static,
{
    pub fn new(service: S, buffer: usize) -> Self
    where
        S::Out: Send,
    {
        Self {
            service,
            buffer,
            _m: PhantomData,
        }
    }
}

impl<M, R, E, S> Service<M> for SpawnEach<M, S>
where
    M: Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
    S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
{
    type Out = Result<flowly_spsc::Receiver<Result<R, E>>, E>;

    fn handle(
        &mut self,
        input: M,
        cx: &crate::Context,
    ) -> impl futures::Stream<Item = Self::Out> + Send {
        let mut service = self.service.clone();
        let (mut tx, rx) = flowly_spsc::channel(self.buffer);
        let cx = cx.clone();

        tokio::spawn(async move {
            let mut stream = pin!(service.handle(input, &cx));

            while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
                if tx.send(res).await.is_err() {
                    log::warn!("mpsc::send failed");
                    break;
                }
            }
        });

        futures::stream::iter([Ok(rx)])
    }
}

pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
where
    M: Send,
    S: Send + Service<M> + Clone + 'static,
    S::Out: Send,
{
    SpawnEach::new(service, 2)
}