flowly_service/
spawn_each.rs

1use std::{marker::PhantomData, pin::pin};
2
3use futures::StreamExt;
4
5use crate::Service;
6
7#[derive(Clone)]
8pub struct SpawnEach<M, S> {
9    service: S,
10    buffer: usize,
11    _m: PhantomData<M>,
12}
13
14impl<M, S> SpawnEach<M, S>
15where
16    M: Send,
17    S: Service<M> + Send + Clone + 'static,
18{
19    pub fn new(service: S, buffer: usize) -> Self
20    where
21        S::Out: Send,
22    {
23        Self {
24            service,
25            buffer,
26            _m: PhantomData,
27        }
28    }
29}
30
31impl<M, R, E, S> Service<M> for SpawnEach<M, S>
32where
33    M: Send + 'static,
34    R: Send + 'static,
35    E: Send + 'static,
36    S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
37{
38    type Out = Result<flowly_spsc::Receiver<Result<R, E>>, E>;
39
40    fn handle(
41        &mut self,
42        input: M,
43        cx: &crate::Context,
44    ) -> impl futures::Stream<Item = Self::Out> + Send {
45        let mut service = self.service.clone();
46        let (mut tx, rx) = flowly_spsc::channel(self.buffer);
47        let cx = cx.clone();
48
49        tokio::spawn(async move {
50            let mut stream = pin!(service.handle(input, &cx));
51
52            while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
53                if tx.send(res).await.is_err() {
54                    log::warn!("mpsc::send failed");
55                    break;
56                }
57            }
58        });
59
60        futures::stream::iter([Ok(rx)])
61    }
62}
63
64pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
65where
66    M: Send,
67    S: Send + Service<M> + Clone + 'static,
68    S::Out: Send,
69{
70    SpawnEach::new(service, 2)
71}