flowly_service/
spawn_each.rs

1use std::{marker::PhantomData, pin::pin};
2
3use futures::StreamExt;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6
7use crate::Service;
8
/// Service adapter that runs the wrapped service on its own Tokio task for every
/// handled message and hands the results back through a bounded channel.
///
/// `M` is the message type accepted by the wrapped service; it is only tracked at
/// the type level and never stored.
pub struct SpawnEach<M, S> {
    /// Wrapped service; a fresh clone of it is moved into each spawned task.
    service: S,
    /// Capacity of the channel created for each handled message.
    buffer: usize,
    /// Ties the adapter to the message type `M` without owning a value of it.
    _m: PhantomData<M>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would also demand
// `M: Clone`, even though no `M` value is stored and only the service is duplicated.
impl<M, S: Clone> Clone for SpawnEach<M, S> {
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            buffer: self.buffer,
            _m: PhantomData,
        }
    }
}
15
16impl<M, S> SpawnEach<M, S>
17where
18    M: Send,
19    S: Service<M> + Send + Clone + 'static,
20{
21    pub fn new(service: S, buffer: usize) -> Self
22    where
23        S::Out: Send,
24    {
25        Self {
26            service,
27            buffer,
28            _m: PhantomData,
29        }
30    }
31}
32
33impl<M, R, E, S> Service<M> for SpawnEach<M, S>
34where
35    M: Send + 'static,
36    R: Send + 'static,
37    E: Send + 'static,
38    S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
39{
40    type Out = Result<ReceiverStream<Result<R, E>>, E>;
41
42    fn handle(
43        &mut self,
44        input: M,
45        cx: &crate::Context,
46    ) -> impl futures::Stream<Item = Self::Out> + Send {
47        let mut service = self.service.clone();
48        let (tx, rx) = mpsc::channel(self.buffer);
49        let cx = cx.clone();
50        tokio::spawn(async move {
51            let mut stream = pin!(service.handle(input, &cx));
52
53            while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
54                if tx.send(res).await.is_err() {
55                    log::warn!("mpsc::send failed");
56                    break;
57                }
58            }
59        });
60
61        futures::stream::iter([Ok(ReceiverStream::new(rx))])
62    }
63}
64
65pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
66where
67    M: Send,
68    S: Send + Service<M> + Clone + 'static,
69    S::Out: Send,
70{
71    SpawnEach::new(service, 2)
72}