flowly_service/
lib.rs

1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16pub use pass::flow;
17pub use spawn_each::{SpawnEach, spawn_each};
18pub use stub::stub;
19pub use switch::switch;
20use tokio::sync::watch;
21
22use std::{marker::PhantomData, pin::pin};
23
24use futures::{Stream, StreamExt, future};
25
26pub use crate::except::Except;
27pub use crate::scope::{Scope, ScopeEach, scope, scope_each};
28
29#[derive(Clone)]
30#[non_exhaustive]
31pub struct Context {
32    pub abort: watch::Sender<bool>,
33    pub abort_recv: watch::Receiver<bool>,
34}
35
36impl Context {
37    pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
38        let mut abort_recv = self.abort_recv.clone();
39        let fut1 = pin!(abort_recv.changed());
40        let fut2 = pin!(fut);
41
42        match futures::future::select(fut1, fut2).await {
43            future::Either::Left(..) => None,
44            future::Either::Right((val, _)) => Some(val),
45        }
46    }
47}
48
49impl Default for Context {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl From<watch::Sender<bool>> for Context {
56    fn from(abort: watch::Sender<bool>) -> Self {
57        Self {
58            abort_recv: abort.subscribe(),
59            abort,
60        }
61    }
62}
63
64impl Context {
65    pub fn new() -> Self {
66        Self::from(watch::Sender::default())
67    }
68}
69
70pub trait Service<In> {
71    type Out;
72
73    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
74    fn handle_stream(
75        &mut self,
76        input: impl Stream<Item = In> + Send,
77        cx: &Context,
78    ) -> impl Stream<Item = Self::Out> + Send
79    where
80        In: Send,
81        Self: Send,
82        Self::Out: Send,
83    {
84        async_stream::stream! {
85            let mut input = pin!(input);
86
87            while let Some(item) = input.next().await {
88                let mut s = pin!(self.handle(item, cx));
89
90                while let Some(out) = s.next().await {
91                    yield out;
92                }
93            }
94        }
95    }
96
97    #[inline]
98    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
99    where
100        Self: Sized,
101    {
102        async move {}
103    }
104}
105
106impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
107where
108    I: Send,
109    O1: Send,
110    O2: Send,
111    E1: Send,
112    E2: Send,
113    S1: Service<I, Out = Result<O1, E1>> + Send,
114    S2: Service<O1, Out = Result<O2, E2>> + Send,
115{
116    type Out = Result<O2, Either<E1, E2>>;
117
118    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
119        async_stream::stream! {
120            let mut s1 = pin!(self.0.handle(msg, cx));
121
122            while let Some(res) = s1.next().await {
123                match res {
124                    Ok(ok) => {
125                        let mut s2 = pin!(self.1.handle(ok, cx));
126
127                        while let Some(i2) = s2.next().await {
128                            yield i2.map_err(Either::Right);
129                        }
130                    },
131                    Err(err) => yield Err(Either::Left(err)),
132                }
133            }
134        }
135    }
136}
137
138#[derive(Clone)]
139pub struct Left<S1, S2>(S1, S2);
140impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
141where
142    I: Send,
143    O1: Send,
144    O2: Send,
145    E: Send,
146    S1: Service<I, Out = Result<O1, E>> + Send,
147    S2: Service<O1, Out = O2> + Send,
148{
149    type Out = Result<O2, E>;
150
151    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
152        async_stream::stream! {
153            let mut s1 = pin!(self.0.handle(msg, cx));
154
155            while let Some(res) = s1.next().await {
156                match res {
157                    Ok(ok) => {
158                        let mut s2 = pin!(self.1.handle(ok, cx));
159
160                        while let Some(i2) = s2.next().await {
161                            yield Ok(i2);
162                        }
163                    },
164                    Err(err) => yield Err(err),
165                }
166            }
167        }
168    }
169}
170
171pub trait ServiceExt<I: Send>: Service<I> {
172    #[inline]
173    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
174    where
175        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
176        U: Send + Service<O1, Out = Result<O2, E2>>,
177        O1: Send,
178        O2: Send,
179        E1: Send,
180        E2: Send,
181    {
182        (self, service)
183    }
184
185    #[inline]
186    fn except<F>(self, on_err: F) -> Except<Self, F>
187    where
188        Self: Sized,
189    {
190        Except {
191            service: self,
192            on_err,
193        }
194    }
195
196    /// Adds an inspection step that invokes the supplied callback on every
197    /// successful output of the wrapped service.
198    ///
199    /// This method returns a 2‑tuple consisting of:
200    /// * `self` – the original service unchanged.
201    /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
202    ///   service’s output. For each successful result (`Ok(o)`), the
203    ///   closure `f` is called with a reference to `o`. The output is then
204    ///   passed through unchanged.
205    ///
206    /// # Parameters
207    ///
208    /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
209    ///   reference to the successful output value. It can be used for
210    ///   logging, metrics, or any side‑effect‑only operation.
211    ///
212    /// # Return value
213    ///
214    /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
215    /// service pipeline (e.g., within the `flow` combinator). The first
216    /// element is the original service, and the second element is a service
217    /// that performs the inspection.
218    ///
219    /// # Example
220    ///
221    /// ```rust
222    /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
223    ///
224    /// let service = MyService::new();
225    /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
226    ///     println!("Got value: {:?}", value);
227    /// });
228    /// let flow = Flow::from(orig).and(inspector);
229    /// ```
230    #[inline]
231    fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
232    where
233        Self: Sized + Service<I, Out = Result<O, E>> + Send,
234        F: Fn(&O) + Send,
235        O: Send,
236    {
237        Left(
238            self,
239            inspect::Inspect::<O, F> {
240                cb: f,
241                _m: PhantomData,
242            },
243        )
244    }
245
246    /// Creates a concurrent wrapper around the current service that limits the number of
247    /// parallel executions.
248    ///
249    /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
250    /// of worker tasks. Each worker runs the underlying service independently, allowing
251    /// multiple inputs to be processed concurrently. The `limit` argument controls the
252    /// maximum number of worker tasks that may run in parallel.
253    ///
254    /// **Parameters**
255    /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
256    ///   satisfy `Send`, `Clone`, and `'static` bounds.
257    /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
258    ///   greater than the current number of tasks, new tasks will be created up to this
259    ///   bound.
260    ///
261    /// **Return value**
262    /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
263    /// input, it forwards the input to one of the available workers and returns a stream
264    /// of results that can be awaited asynchronously.
265    ///
266    #[inline]
267    fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
268    where
269        Self: Sized + Send + Clone + 'static,
270        Self::Out: Send,
271    {
272        ConcurrentEach::new(self, limit)
273    }
274
275    /// Creates a new [`SpawnEach`] wrapper around the current service.
276    ///
277    /// The wrapper spawns a separate task for each input message, forwarding
278    /// the results through a bounded `mpsc` channel. This allows the
279    /// underlying service to process messages concurrently without
280    /// blocking the caller.
281    ///
282    /// # Parameters
283    /// * `self` – The service instance to wrap. The service must implement
284    ///   `Service<I>` for some input type `I`.
285    ///
286    /// # Constraints
287    /// * `Self: Sized + Send + Clone + 'static` – The service must be
288    ///   clonable and safe to send across threads.
289    /// * `Self::Out: Send` – The output type of the service must be
290    ///   `Send` because it will be transported across channels.
291    ///
292    /// # Return value
293    /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
294    /// the same input type. The new service can be used just like the
295    /// original one, but each invocation of `handle` will spawn a
296    /// dedicated task.
297    ///
298    /// # Example
299    /// ```rust
300    /// use flowly_service::{Service, spawn_each};
301    ///
302    /// struct MyService;
303    /// impl Service<u32> for MyService {
304    ///     type Out = Result<String, std::io::Error>;
305    ///     fn handle(&mut self, input: u32, _cx: &crate::Context)
306    ///         -> impl futures::Stream<Item = Self::Out> + Send
307    ///     {
308    ///         // …
309    ///     }
310    /// }
311    ///
312    /// let service = MyService;
313    /// // Wrap in SpawnEach
314    /// let concurrent_service = service.spawn_each();
315    /// // Now `concurrent_service` can be used as a Service and will process
316    /// // each input concurrently.
317    /// ```
318    ///
319    /// # Note
320    /// The default message buffer size is 2.
321    #[inline]
322    fn spawn_each(self) -> SpawnEach<I, Self>
323    where
324        Self: Sized + Send + Clone + 'static,
325        Self::Out: Send,
326    {
327        SpawnEach::new(self, 2)
328    }
329
330    /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
331    ///
332    /// This method consumes the current service and returns a tuple containing the original service and a new
333    /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
334    /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
335    ///
336    /// # Type Parameters
337    /// * `O`: Type of the original input that will be received by the outer service.
338    /// * `M`: Type of the message that `s` expects.
339    /// * `E1`: Error type returned by the transformation function `f`.
340    /// * `S`: The inner service that will handle the transformed messages.
341    /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
342    ///
343    /// # Parameters
344    /// * `self` – The current service (moved into the returned tuple).  
345    /// * `f` – Function that transforms `&O` into `Result<M, E1>`.  
346    /// * `s` – The inner service to be invoked after successful transformation.  
347    ///
348    /// # Returns
349    /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
350    /// * `Self` is the original service that can continue to be used.\n
351    /// * `Scope<O, M, E1, S, F>` is a new service that:\n
352    ///   1. Calls `f` with the incoming input.\n
353    ///   2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
354    ///   3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
355    ///
356    /// # Example
357    /// ```ignore
358    /// let (service, scoped) = flow_scope(service, |msg: &Input| {
359    ///     if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
360    /// }, inner_service);
361    /// ```
362    ///
363    /// # Constraints
364    /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
365    #[inline]
366    fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
367    where
368        F: Fn(&O) -> Result<M, E1>,
369        Self: Sized + Send,
370        O: Send,
371        E1: Send,
372    {
373        (self, scope(f, s))
374    }
375
376    #[inline]
377    fn flow_scope_each<O, M, E1, S, F>(self, f: F, s: S) -> (Self, ScopeEach<O, M, E1, S, F>)
378    where
379        F: Fn(&O) -> Result<M, E1>,
380        Self: Sized + Send,
381        E1: Send,
382        O: Send + Clone,
383    {
384        (self, scope_each(f, s))
385    }
386
387    #[inline]
388    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
389    where
390        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
391        F: FnMut(O1) -> H + Send,
392        H: Future<Output = O2> + Send,
393        O1: Send,
394        O2: Send,
395        E1: Send,
396    {
397        Left(self, map::map::<O2, _>(f))
398    }
399
400    #[inline]
401    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
402    where
403        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
404        O1: Send,
405        O2: Send,
406        E1: Send,
407        F: FnMut(O1) -> H + Send,
408        H: Future<Output = Option<O2>> + Send,
409    {
410        Left(self, map::filter_map::<O2, _>(f))
411    }
412}
413
414impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
415
416impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
417    type Out = Result<I, E>;
418
419    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
420        if let Some(srv) = self {
421            srv.handle(input, cx).left_stream()
422        } else {
423            futures::stream::once(async move { Ok(input) }).right_stream()
424        }
425    }
426}