flowly_service/
spawn_each.rs

1use std::{marker::PhantomData, pin::pin};
2
3use futures::StreamExt;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6
7use crate::Service;
8
9pub struct SpawnEach<M, S> {
10    service: S,
11    buffer: usize,
12    _m: PhantomData<M>,
13}
14
15impl<M, S> SpawnEach<M, S>
16where
17    M: Send,
18    S: Service<M> + Send + Clone + 'static,
19{
20    pub fn new(service: S, buffer: usize) -> Self
21    where
22        S::Out: Send,
23    {
24        Self {
25            service,
26            buffer,
27            _m: PhantomData,
28        }
29    }
30}
31
32impl<M, R, E, S> Service<M> for SpawnEach<M, S>
33where
34    M: Send + 'static,
35    R: Send + 'static,
36    E: Send + 'static,
37    S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
38{
39    type Out = Result<ReceiverStream<Result<R, E>>, E>;
40
41    fn handle(
42        &mut self,
43        input: M,
44        cx: &crate::Context,
45    ) -> impl futures::Stream<Item = Self::Out> + Send {
46        let mut service = self.service.clone();
47        let (tx, rx) = mpsc::channel(self.buffer);
48        let cx = cx.clone();
49        tokio::spawn(async move {
50            let mut stream = pin!(service.handle(input, &cx));
51
52            while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
53                if tx.send(res).await.is_err() {
54                    log::warn!("mpsc::send failed");
55                    break;
56                }
57            }
58        });
59
60        futures::stream::iter([Ok(ReceiverStream::new(rx))])
61    }
62}
63pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
64where
65    M: Send,
66    S: Send + Service<M> + Clone + 'static,
67    S::Out: Send,
68{
69    SpawnEach::new(service, 2)
70}