flowly_service/
spawn_each.rs1use std::{marker::PhantomData, pin::pin};
2
3use futures::StreamExt;
4
5use crate::Service;
6
7#[derive(Clone)]
8pub struct SpawnEach<M, S> {
9 service: S,
10 buffer: usize,
11 _m: PhantomData<M>,
12}
13
14impl<M, S> SpawnEach<M, S>
15where
16 M: Send,
17 S: Service<M> + Send + Clone + 'static,
18{
19 pub fn new(service: S, buffer: usize) -> Self
20 where
21 S::Out: Send,
22 {
23 Self {
24 service,
25 buffer,
26 _m: PhantomData,
27 }
28 }
29}
30
31impl<M, R, E, S> Service<M> for SpawnEach<M, S>
32where
33 M: Send + 'static,
34 R: Send + 'static,
35 E: Send + 'static,
36 S: Clone + Service<M, Out = Result<R, E>> + Send + 'static,
37{
38 type Out = Result<flowly_spsc::Receiver<Result<R, E>>, E>;
39
40 fn handle(
41 &mut self,
42 input: M,
43 cx: &crate::Context,
44 ) -> impl futures::Stream<Item = Self::Out> + Send {
45 let mut service = self.service.clone();
46 let (mut tx, rx) = flowly_spsc::channel(self.buffer);
47 let cx = cx.clone();
48
49 tokio::spawn(async move {
50 let mut stream = pin!(service.handle(input, &cx));
51
52 while let Some(Some(res)) = cx.fuse_abort(stream.next()).await {
53 if tx.send(res).await.is_err() {
54 log::warn!("mpsc::send failed");
55 break;
56 }
57 }
58 });
59
60 futures::stream::iter([Ok(rx)])
61 }
62}
63
64pub fn spawn_each<M, S>(service: S) -> SpawnEach<M, S>
65where
66 M: Send,
67 S: Send + Service<M> + Clone + 'static,
68 S::Out: Send,
69{
70 SpawnEach::new(service, 2)
71}