flowly_service/
spawn_each.rs1use std::{marker::PhantomData, pin::pin, sync::Arc};
2
3use futures::StreamExt;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6
7use crate::Service;
8
9#[derive(Clone)]
10pub struct SpawnEach<M, S> {
11 service: Arc<S>,
12 buffer: usize,
13 _m: PhantomData<M>,
14}
15
16impl<M, S> SpawnEach<M, S>
17where
18 M: Send,
19 S: Service<M> + Send + 'static,
20{
21 pub fn new(service: S, buffer: usize) -> Self
22 where
23 S::Out: Send,
24 {
25 Self {
26 service: Arc::new(service),
27 buffer,
28 _m: PhantomData,
29 }
30 }
31}
32
33impl<M, R, E, S> Service<M> for SpawnEach<M, S>
34where
35 M: Send + Sync + 'static,
36 R: Send + 'static,
37 E: Send + 'static,
38 S: Service<M, Out = Result<R, E>> + 'static,
39{
40 type Out = Result<ReceiverStream<Result<R, E>>, E>;
41
42 fn handle(
43 &self,
44 input: M,
45 cx: &crate::Context,
46 ) -> impl futures::Stream<Item = Self::Out> + Send {
47 let service = self.service.clone();
48 let (tx, rx) = mpsc::channel(self.buffer);
49
50 let cx = cx.clone();
51
52 tokio::spawn(async move {
53 let mut stream = pin!(service.handle(input, &cx));
54
55 while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
56 if tx.send(res).await.is_err() {
57 log::warn!("mpsc::send failed");
58 break;
59 }
60 }
61 });
62
63 futures::stream::iter([Ok(ReceiverStream::new(rx))])
64 }
65}
66
67pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
68where
69 M: Send,
70 S: Send + Service<M> + Clone + 'static,
71 S::Out: Send,
72{
73 SpawnEach::new(service, 2)
74}