Skip to main content

flowly_service/
lib.rs

1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
15pub use pass::flow;
16pub use spawn_each::{SpawnEach, spawn_each};
17pub use stub::stub;
18pub use switch::switch;
19use tokio::sync::watch;
20
21use std::{marker::PhantomData, pin::pin};
22
23use futures::{Stream, StreamExt, future};
24
25pub use crate::except::Except;
26pub use crate::scope::{Scope, ScopeEach, scope, scope_each};
27
28#[derive(Clone)]
29#[non_exhaustive]
30pub struct Context {
31    pub abort: watch::Sender<bool>,
32    pub abort_recv: watch::Receiver<bool>,
33}
34
35impl Context {
36    pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
37        let mut abort_recv = self.abort_recv.clone();
38        let fut1 = pin!(abort_recv.changed());
39        let fut2 = pin!(fut);
40
41        match futures::future::select(fut1, fut2).await {
42            future::Either::Left(..) => None,
43            future::Either::Right((val, _)) => Some(val),
44        }
45    }
46
47    #[inline]
48    pub fn alive(&self) -> bool {
49        !*self.abort_recv.borrow()
50    }
51}
52
53impl Default for Context {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl From<watch::Sender<bool>> for Context {
60    fn from(abort: watch::Sender<bool>) -> Self {
61        Self {
62            abort_recv: abort.subscribe(),
63            abort,
64        }
65    }
66}
67
68impl Context {
69    pub fn new() -> Self {
70        Self::from(watch::Sender::default())
71    }
72}
73
74pub trait Service<In>: Send + Sync {
75    type Out;
76
77    fn handle(&self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
78    fn handle_stream(
79        &self,
80        input: impl Stream<Item = In> + Send,
81        cx: &Context,
82    ) -> impl Stream<Item = Self::Out> + Send
83    where
84        In: Send,
85        Self: Send,
86        Self::Out: Send,
87    {
88        async_stream::stream! {
89            let mut input = pin!(input);
90
91            while let Some(item) = input.next().await {
92                let mut s = pin!(self.handle(item, cx));
93
94                while let Some(out) = s.next().await {
95                    yield out;
96                }
97            }
98        }
99    }
100
101    #[inline]
102    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
103    where
104        Self: Sized,
105    {
106        async move {}
107    }
108}
109
110impl<I, O1, E, O2, EI, S1, S2> Service<I> for (S1, S2)
111where
112    I: Send,
113    O1: Send,
114    O2: Send,
115    E: Send + From<EI>,
116    EI: Send,
117    S1: Service<I, Out = Result<O1, E>> + Send,
118    S2: Service<O1, Out = Result<O2, EI>> + Send,
119{
120    type Out = Result<O2, E>;
121
122    fn handle(&self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
123        async_stream::stream! {
124            let mut s1 = pin!(self.0.handle(msg, cx));
125
126            while let Some(res) = s1.next().await {
127                match res {
128                    Ok(ok) => {
129                        let mut s2 = pin!(self.1.handle(ok, cx));
130
131                        while let Some(i2) = s2.next().await {
132                            yield i2.map_err(E::from);
133                        }
134                    },
135                    Err(err) => yield Err(err),
136                }
137            }
138        }
139    }
140}
141
142#[derive(Clone)]
143pub struct Left<S1, S2>(S1, S2);
144impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
145where
146    I: Send,
147    O1: Send,
148    O2: Send,
149    E: Send,
150    S1: Service<I, Out = Result<O1, E>> + Send,
151    S2: Service<O1, Out = O2> + Send,
152{
153    type Out = Result<O2, E>;
154
155    fn handle(&self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
156        async_stream::stream! {
157            let mut s1 = pin!(self.0.handle(msg, cx));
158
159            while let Some(res) = s1.next().await {
160                match res {
161                    Ok(ok) => {
162                        let mut s2 = pin!(self.1.handle(ok, cx));
163
164                        while let Some(i2) = s2.next().await {
165                            yield Ok(i2);
166                        }
167                    },
168                    Err(err) => yield Err(err),
169                }
170            }
171        }
172    }
173}
174
175pub trait ServiceExt<I: Send>: Service<I> {
176    #[inline]
177    fn flow<O1, O2, E, EI, U>(self, service: U) -> (Self, U)
178    where
179        Self: Sized + Service<I, Out = Result<O1, E>> + Send,
180        U: Send + Service<O1, Out = Result<O2, EI>>,
181        O1: Send,
182        O2: Send,
183        E: Send + From<EI>,
184        EI: Send,
185    {
186        (self, service)
187    }
188
189    #[inline]
190    fn except<F>(self, on_err: F) -> Except<Self, F>
191    where
192        Self: Sized,
193    {
194        Except {
195            service: self,
196            on_err,
197        }
198    }
199
200    /// Adds an inspection step that invokes the supplied callback on every
201    /// successful output of the wrapped service.
202    ///
203    /// This method returns a 2‑tuple consisting of:
204    /// * `self` – the original service unchanged.
205    /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
206    ///   service’s output. For each successful result (`Ok(o)`), the
207    ///   closure `f` is called with a reference to `o`. The output is then
208    ///   passed through unchanged.
209    ///
210    /// # Parameters
211    ///
212    /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
213    ///   reference to the successful output value. It can be used for
214    ///   logging, metrics, or any side‑effect‑only operation.
215    ///
216    /// # Return value
217    ///
218    /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
219    /// service pipeline (e.g., within the `flow` combinator). The first
220    /// element is the original service, and the second element is a service
221    /// that performs the inspection.
222    ///
223    /// # Example
224    ///
225    /// ```rust
226    /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
227    ///
228    /// let service = MyService::new();
229    /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
230    ///     println!("Got value: {:?}", value);
231    /// });
232    /// let flow = Flow::from(orig).and(inspector);
233    /// ```
234    #[inline]
235    fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
236    where
237        Self: Sized + Service<I, Out = Result<O, E>> + Send,
238        F: Fn(&O) + Send,
239        O: Send,
240    {
241        Left(
242            self,
243            inspect::Inspect::<O, F> {
244                cb: f,
245                _m: PhantomData,
246            },
247        )
248    }
249
250    /// Creates a concurrent wrapper around the current service that limits the number of
251    /// parallel executions.
252    ///
253    /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
254    /// of worker tasks. Each worker runs the underlying service independently, allowing
255    /// multiple inputs to be processed concurrently. The `limit` argument controls the
256    /// maximum number of worker tasks that may run in parallel.
257    ///
258    /// **Parameters**
259    /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
260    ///   satisfy `Send`, `Clone`, and `'static` bounds.
261    /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
262    ///   greater than the current number of tasks, new tasks will be created up to this
263    ///   bound.
264    ///
265    /// **Return value**
266    /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
267    /// input, it forwards the input to one of the available workers and returns a stream
268    /// of results that can be awaited asynchronously.
269    ///
270    #[inline]
271    fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
272    where
273        Self: Sized + Send + Clone + 'static,
274        Self::Out: Send,
275    {
276        ConcurrentEach::new(self, limit)
277    }
278
279    /// Creates a new [`SpawnEach`] wrapper around the current service.
280    ///
281    /// The wrapper spawns a separate task for each input message, forwarding
282    /// the results through a bounded `mpsc` channel. This allows the
283    /// underlying service to process messages concurrently without
284    /// blocking the caller.
285    ///
286    /// # Parameters
287    /// * `self` – The service instance to wrap. The service must implement
288    ///   `Service<I>` for some input type `I`.
289    ///
290    /// # Constraints
291    /// * `Self: Sized + Send + Clone + 'static` – The service must be
292    ///   clonable and safe to send across threads.
293    /// * `Self::Out: Send` – The output type of the service must be
294    ///   `Send` because it will be transported across channels.
295    ///
296    /// # Return value
297    /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
298    /// the same input type. The new service can be used just like the
299    /// original one, but each invocation of `handle` will spawn a
300    /// dedicated task.
301    ///
302    /// # Example
303    /// ```rust
304    /// use flowly_service::{Service, spawn_each};
305    ///
306    /// struct MyService;
307    /// impl Service<u32> for MyService {
308    ///     type Out = Result<String, std::io::Error>;
309    ///     fn handle(&mut self, input: u32, _cx: &crate::Context)
310    ///         -> impl futures::Stream<Item = Self::Out> + Send
311    ///     {
312    ///         // …
313    ///     }
314    /// }
315    ///
316    /// let service = MyService;
317    /// // Wrap in SpawnEach
318    /// let concurrent_service = service.spawn_each();
319    /// // Now `concurrent_service` can be used as a Service and will process
320    /// // each input concurrently.
321    /// ```
322    ///
323    /// # Note
324    /// The default message buffer size is 2.
325    #[inline]
326    fn spawn_each(self) -> SpawnEach<I, Self>
327    where
328        Self: Sized + Send + Clone + 'static,
329        Self::Out: Send,
330    {
331        SpawnEach::new(self, 2)
332    }
333
334    /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
335    ///
336    /// This method consumes the current service and returns a tuple containing the original service and a new
337    /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
338    /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
339    ///
340    /// # Type Parameters
341    /// * `O`: Type of the original input that will be received by the outer service.
342    /// * `M`: Type of the message that `s` expects.
343    /// * `E1`: Error type returned by the transformation function `f`.
344    /// * `S`: The inner service that will handle the transformed messages.
345    /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
346    ///
347    /// # Parameters
348    /// * `self` – The current service (moved into the returned tuple).  
349    /// * `f` – Function that transforms `&O` into `Result<M, E1>`.  
350    /// * `s` – The inner service to be invoked after successful transformation.  
351    ///
352    /// # Returns
353    /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
354    /// * `Self` is the original service that can continue to be used.\n
355    /// * `Scope<O, M, E1, S, F>` is a new service that:\n
356    ///   1. Calls `f` with the incoming input.\n
357    ///   2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
358    ///   3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
359    ///
360    /// # Example
361    /// ```ignore
362    /// let (service, scoped) = flow_scope(service, |msg: &Input| {
363    ///     if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
364    /// }, inner_service);
365    /// ```
366    ///
367    /// # Constraints
368    /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
369    #[inline]
370    fn flow_scope<O, M, E, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E, S, F>)
371    where
372        F: Fn(&O) -> Result<M, E>,
373        Self: Sized + Send,
374        O: Send,
375        E: Send,
376    {
377        (self, scope(f, s))
378    }
379
380    #[inline]
381    fn flow_scope_each<O, M, E, S, F>(self, f: F, s: S) -> (Self, ScopeEach<O, M, S, F, E>)
382    where
383        F: Fn(&O) -> Result<M, E>,
384        Self: Sized + Send,
385        E: Send,
386        O: Send + Clone,
387    {
388        (self, scope_each(f, s))
389    }
390
391    #[inline]
392    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
393    where
394        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
395        F: Fn(O1) -> H + Send,
396        H: Future<Output = O2> + Send,
397        O1: Send,
398        O2: Send,
399        E1: Send,
400    {
401        Left(self, map::map::<O2, _>(f))
402    }
403
404    #[inline]
405    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
406    where
407        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
408        O1: Send,
409        O2: Send,
410        E1: Send,
411        F: Fn(O1) -> H + Send,
412        H: Future<Output = Option<O2>> + Send,
413    {
414        Left(self, map::filter_map::<O2, _>(f))
415    }
416}
417
418impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
419
420impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
421    type Out = Result<I, E>;
422
423    fn handle(&self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
424        if let Some(srv) = self {
425            srv.handle(input, cx).left_stream()
426        } else {
427            futures::stream::once(async move { Ok(input) }).right_stream()
428        }
429    }
430}