flowly_service/
spawn_each.rs1use std::{marker::PhantomData, pin::pin};
2
3use futures::StreamExt;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6
7use crate::Service;
8
9#[derive(Clone)]
10pub struct SpawnEach<M, S> {
11 service: S,
12 buffer: usize,
13 _m: PhantomData<M>,
14}
15
16impl<M, S> SpawnEach<M, S>
17where
18 M: Send,
19 S: Service<M> + Send + Clone + 'static,
20{
21 pub fn new(service: S, buffer: usize) -> Self
22 where
23 S::Out: Send,
24 {
25 Self {
26 service,
27 buffer,
28 _m: PhantomData,
29 }
30 }
31}
32
33impl<M, R, E, S> Service<M> for SpawnEach<M, S>
34where
35 M: Send + 'static,
36 R: Send + 'static,
37 E: Send + 'static,
38 S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
39{
40 type Out = Result<ReceiverStream<Result<R, E>>, E>;
41
42 fn handle(
43 &mut self,
44 input: M,
45 cx: &crate::Context,
46 ) -> impl futures::Stream<Item = Self::Out> + Send {
47 let mut service = self.service.clone();
48 let (tx, rx) = mpsc::channel(self.buffer);
49 let cx = cx.clone();
50 tokio::spawn(async move {
51 let mut stream = pin!(service.handle(input, &cx));
52
53 while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
54 if tx.send(res).await.is_err() {
55 log::warn!("mpsc::send failed");
56 break;
57 }
58 }
59 });
60
61 futures::stream::iter([Ok(ReceiverStream::new(rx))])
62 }
63}
64
65pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
66where
67 M: Send,
68 S: Send + Service<M> + Clone + 'static,
69 S::Out: Send,
70{
71 SpawnEach::new(service, 2)
72}