flowly_service/
spawn_each.rs1use std::{marker::PhantomData, pin::pin};
2
3use futures::StreamExt;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6
7use crate::Service;
8
/// Service adapter that runs the wrapped service on a separately spawned
/// task for every input it receives.
///
/// `M` is the input message type; it is only tracked at the type level and
/// never stored.
pub struct SpawnEach<M, S> {
    /// Wrapped service; it is cloned for each handled input.
    service: S,
    /// Capacity passed to the bounded channel created for each input.
    buffer: usize,
    /// Zero-sized marker tying the adapter to the message type `M`.
    _m: PhantomData<M>,
}
14
15impl<M, S> SpawnEach<M, S>
16where
17 M: Send,
18 S: Service<M> + Send + Clone + 'static,
19{
20 pub fn new(service: S, buffer: usize) -> Self
21 where
22 S::Out: Send,
23 {
24 Self {
25 service,
26 buffer,
27 _m: PhantomData,
28 }
29 }
30}
31
32impl<M, R, E, S> Service<M> for SpawnEach<M, S>
33where
34 M: Send + 'static,
35 R: Send + 'static,
36 E: Send + 'static,
37 S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
38{
39 type Out = Result<ReceiverStream<Result<R, E>>, E>;
40
41 fn handle(
42 &mut self,
43 input: M,
44 cx: &crate::Context,
45 ) -> impl futures::Stream<Item = Self::Out> + Send {
46 let mut service = self.service.clone();
47 let (tx, rx) = mpsc::channel(self.buffer);
48 let cx = cx.clone();
49 tokio::spawn(async move {
50 let mut stream = pin!(service.handle(input, &cx));
51
52 while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
53 if tx.send(res).await.is_err() {
54 log::warn!("mpsc::send failed");
55 break;
56 }
57 }
58 });
59
60 futures::stream::iter([Ok(ReceiverStream::new(rx))])
61 }
62}
63pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
64where
65 M: Send,
66 S: Send + Service<M> + Clone + 'static,
67 S::Out: Send,
68{
69 SpawnEach::new(service, 2)
70}