flowly_service/
lib.rs

1mod and_then;
2mod filter;
3mod inspect;
4mod map;
5mod pass;
6mod scope;
7mod spawn;
8mod stub;
9mod switch;
10
11pub use and_then::and_then;
12use flowly_core::Either;
13pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
14pub use pass::flow;
15pub use spawn::{SpawnEach, spawn_each};
16pub use stub::stub;
17pub use switch::switch;
18use tokio::sync::watch;
19
20use std::{marker::PhantomData, pin::pin};
21
22use futures::{Stream, StreamExt};
23
24use crate::scope::Scope;
25
26#[derive(Clone)]
27#[non_exhaustive]
28pub struct Context {
29    pub abort: watch::Sender<bool>,
30    pub abort_recv: watch::Receiver<bool>,
31}
32
33impl Default for Context {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl From<watch::Sender<bool>> for Context {
40    fn from(abort: watch::Sender<bool>) -> Self {
41        Self {
42            abort_recv: abort.subscribe(),
43            abort,
44        }
45    }
46}
47
48impl Context {
49    pub fn new() -> Self {
50        Self::from(watch::Sender::default())
51    }
52}
53
54pub trait Service<In> {
55    type Out;
56
57    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
58
59    fn handle_stream(
60        &mut self,
61        input: impl Stream<Item = In> + Send,
62        cx: &Context,
63    ) -> impl Stream<Item = Self::Out> + Send
64    where
65        In: Send,
66        Self: Send,
67        Self::Out: Send,
68    {
69        async_stream::stream! {
70            let mut input = pin!(input);
71
72            while let Some(item) = input.next().await {
73                let mut s = pin!(self.handle(item, cx));
74
75                while let Some(out) = s.next().await {
76                    yield out;
77                }
78            }
79        }
80    }
81
82    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
83    where
84        Self: Sized,
85    {
86        async move {}
87    }
88}
89
90impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
91where
92    I: Send,
93    O1: Send,
94    O2: Send,
95    E1: Send,
96    E2: Send,
97    S1: Service<I, Out = Result<O1, E1>> + Send,
98    S2: Service<O1, Out = Result<O2, E2>> + Send,
99{
100    type Out = Result<O2, Either<E1, E2>>;
101
102    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
103        async_stream::stream! {
104            let mut s1 = pin!(self.0.handle(msg, cx));
105
106            while let Some(res) = s1.next().await {
107                match res {
108                    Ok(ok) => {
109                        let mut s2 = pin!(self.1.handle(ok, cx));
110
111                        while let Some(i2) = s2.next().await {
112                            yield i2.map_err(Either::Right);
113                        }
114                    },
115                    Err(err) => yield Err(Either::Left(err)),
116                }
117            }
118        }
119    }
120}
121
122pub trait ServiceExt<I: Send>: Service<I> {
123    #[inline]
124    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
125    where
126        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
127        U: Send + Service<O1, Out = Result<O2, E2>>,
128        O1: Send,
129        O2: Send,
130        E1: Send,
131        E2: Send,
132    {
133        (self, service)
134    }
135
136    #[inline]
137    fn flow_inspect<O, E, F>(self, f: F) -> (Self, inspect::Inspect<O, E, F>)
138    where
139        Self: Sized + Service<I, Out = Result<O, E>> + Send,
140        F: Fn(&O) + Send,
141        O: Send,
142        E: Send,
143    {
144        (
145            self,
146            inspect::Inspect::<O, E, F> {
147                cb: f,
148                _m: PhantomData,
149            },
150        )
151    }
152
153    #[inline]
154    fn spawn_each(self) -> SpawnEach<I, Self>
155    where
156        Self: Sized + Send + Clone,
157        Self::Out: Send,
158    {
159        SpawnEach::new(self)
160    }
161
162    #[inline]
163    fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
164    where
165        F: Fn(&O) -> Result<M, E1>,
166        Self: Sized + Send,
167        O: Send,
168        E1: Send,
169    {
170        (self, scope::scope(f, s))
171    }
172
173    #[inline]
174    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
175    where
176        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
177        F: FnMut(O1) -> H + Send,
178        H: Future<Output = O2> + Send,
179        O1: Send,
180        O2: Send,
181        E1: Send,
182    {
183        (self, map::map::<O2, _>(f))
184    }
185
186    #[inline]
187    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
188    where
189        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
190        O1: Send,
191        O2: Send,
192        E1: Send,
193        F: FnMut(O1) -> H + Send,
194        H: Future<Output = Option<O2>> + Send,
195    {
196        (self, map::filter_map::<O2, _>(f))
197    }
198}
199
200impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
201
202impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
203    type Out = Result<I, E>;
204
205    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
206        if let Some(srv) = self {
207            srv.handle(input, cx).left_stream()
208        } else {
209            futures::stream::once(async move { Ok(input) }).right_stream()
210        }
211    }
212}