flowly_service/
lib.rs

1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16pub use pass::flow;
17pub use spawn_each::{SpawnEach, spawn_each};
18pub use stub::stub;
19pub use switch::switch;
20use tokio::sync::watch;
21
22use std::{marker::PhantomData, pin::pin};
23
24use futures::{Stream, StreamExt, future};
25
26use crate::{except::Except, scope::Scope};
27
28#[derive(Clone)]
29#[non_exhaustive]
30pub struct Context {
31    pub abort: watch::Sender<bool>,
32    pub abort_recv: watch::Receiver<bool>,
33}
34
35impl Context {
36    pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
37        let mut abort_recv = self.abort_recv.clone();
38        let fut1 = pin!(abort_recv.changed());
39        let fut2 = pin!(fut);
40
41        match futures::future::select(fut1, fut2).await {
42            future::Either::Left(..) => None,
43            future::Either::Right((val, _)) => Some(val),
44        }
45    }
46}
47
48impl Default for Context {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl From<watch::Sender<bool>> for Context {
55    fn from(abort: watch::Sender<bool>) -> Self {
56        Self {
57            abort_recv: abort.subscribe(),
58            abort,
59        }
60    }
61}
62
63impl Context {
64    pub fn new() -> Self {
65        Self::from(watch::Sender::default())
66    }
67}
68
69pub trait Service<In> {
70    type Out;
71
72    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
73    fn handle_stream(
74        &mut self,
75        input: impl Stream<Item = In> + Send,
76        cx: &Context,
77    ) -> impl Stream<Item = Self::Out> + Send
78    where
79        In: Send,
80        Self: Send,
81        Self::Out: Send,
82    {
83        async_stream::stream! {
84            let mut input = pin!(input);
85
86            while let Some(item) = input.next().await {
87                let mut s = pin!(self.handle(item, cx));
88
89                while let Some(out) = s.next().await {
90                    yield out;
91                }
92            }
93        }
94    }
95
96    #[inline]
97    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
98    where
99        Self: Sized,
100    {
101        async move {}
102    }
103}
104
105impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
106where
107    I: Send,
108    O1: Send,
109    O2: Send,
110    E1: Send,
111    E2: Send,
112    S1: Service<I, Out = Result<O1, E1>> + Send,
113    S2: Service<O1, Out = Result<O2, E2>> + Send,
114{
115    type Out = Result<O2, Either<E1, E2>>;
116
117    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
118        async_stream::stream! {
119            let mut s1 = pin!(self.0.handle(msg, cx));
120
121            while let Some(res) = s1.next().await {
122                match res {
123                    Ok(ok) => {
124                        let mut s2 = pin!(self.1.handle(ok, cx));
125
126                        while let Some(i2) = s2.next().await {
127                            yield i2.map_err(Either::Right);
128                        }
129                    },
130                    Err(err) => yield Err(Either::Left(err)),
131                }
132            }
133        }
134    }
135}
136
137pub struct Left<S1, S2>(S1, S2);
138impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
139where
140    I: Send,
141    O1: Send,
142    O2: Send,
143    E: Send,
144    S1: Service<I, Out = Result<O1, E>> + Send,
145    S2: Service<O1, Out = O2> + Send,
146{
147    type Out = Result<O2, E>;
148
149    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
150        async_stream::stream! {
151            let mut s1 = pin!(self.0.handle(msg, cx));
152
153            while let Some(res) = s1.next().await {
154                match res {
155                    Ok(ok) => {
156                        let mut s2 = pin!(self.1.handle(ok, cx));
157
158                        while let Some(i2) = s2.next().await {
159                            yield Ok(i2);
160                        }
161                    },
162                    Err(err) => yield Err(err),
163                }
164            }
165        }
166    }
167}
168
169pub trait ServiceExt<I: Send>: Service<I> {
170    #[inline]
171    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
172    where
173        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
174        U: Send + Service<O1, Out = Result<O2, E2>>,
175        O1: Send,
176        O2: Send,
177        E1: Send,
178        E2: Send,
179    {
180        (self, service)
181    }
182
183    #[inline]
184    fn except<F>(self, on_err: F) -> Except<Self, F>
185    where
186        Self: Sized,
187    {
188        Except {
189            service: self,
190            on_err,
191        }
192    }
193
194    /// Adds an inspection step that invokes the supplied callback on every
195    /// successful output of the wrapped service.
196    ///
197    /// This method returns a 2‑tuple consisting of:
198    /// * `self` – the original service unchanged.
199    /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
200    ///   service’s output. For each successful result (`Ok(o)`), the
201    ///   closure `f` is called with a reference to `o`. The output is then
202    ///   passed through unchanged.
203    ///
204    /// # Parameters
205    ///
206    /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
207    ///   reference to the successful output value. It can be used for
208    ///   logging, metrics, or any side‑effect‑only operation.
209    ///
210    /// # Return value
211    ///
212    /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
213    /// service pipeline (e.g., within the `flow` combinator). The first
214    /// element is the original service, and the second element is a service
215    /// that performs the inspection.
216    ///
217    /// # Example
218    ///
219    /// ```rust
220    /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
221    ///
222    /// let service = MyService::new();
223    /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
224    ///     println!("Got value: {:?}", value);
225    /// });
226    /// let flow = Flow::from(orig).and(inspector);
227    /// ```
228    #[inline]
229    fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
230    where
231        Self: Sized + Service<I, Out = Result<O, E>> + Send,
232        F: Fn(&O) + Send,
233        O: Send,
234    {
235        Left(
236            self,
237            inspect::Inspect::<O, F> {
238                cb: f,
239                _m: PhantomData,
240            },
241        )
242    }
243
244    /// Creates a concurrent wrapper around the current service that limits the number of
245    /// parallel executions.
246    ///
247    /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
248    /// of worker tasks. Each worker runs the underlying service independently, allowing
249    /// multiple inputs to be processed concurrently. The `limit` argument controls the
250    /// maximum number of worker tasks that may run in parallel.
251    ///
252    /// **Parameters**
253    /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
254    ///   satisfy `Send`, `Clone`, and `'static` bounds.
255    /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
256    ///   greater than the current number of tasks, new tasks will be created up to this
257    ///   bound.
258    ///
259    /// **Return value**
260    /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
261    /// input, it forwards the input to one of the available workers and returns a stream
262    /// of results that can be awaited asynchronously.
263    ///
264    #[inline]
265    fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
266    where
267        Self: Sized + Send + Clone + 'static,
268        Self::Out: Send,
269    {
270        ConcurrentEach::new(self, limit)
271    }
272
273    /// Creates a new [`SpawnEach`] wrapper around the current service.
274    ///
275    /// The wrapper spawns a separate task for each input message, forwarding
276    /// the results through a bounded `mpsc` channel. This allows the
277    /// underlying service to process messages concurrently without
278    /// blocking the caller.
279    ///
280    /// # Parameters
281    /// * `self` – The service instance to wrap. The service must implement
282    ///   `Service<I>` for some input type `I`.
283    ///
284    /// # Constraints
285    /// * `Self: Sized + Send + Clone + 'static` – The service must be
286    ///   clonable and safe to send across threads.
287    /// * `Self::Out: Send` – The output type of the service must be
288    ///   `Send` because it will be transported across channels.
289    ///
290    /// # Return value
291    /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
292    /// the same input type. The new service can be used just like the
293    /// original one, but each invocation of `handle` will spawn a
294    /// dedicated task.
295    ///
296    /// # Example
297    /// ```rust
298    /// use flowly_service::{Service, spawn_each};
299    ///
300    /// struct MyService;
301    /// impl Service<u32> for MyService {
302    ///     type Out = Result<String, std::io::Error>;
303    ///     fn handle(&mut self, input: u32, _cx: &crate::Context)
304    ///         -> impl futures::Stream<Item = Self::Out> + Send
305    ///     {
306    ///         // …
307    ///     }
308    /// }
309    ///
310    /// let service = MyService;
311    /// // Wrap in SpawnEach
312    /// let concurrent_service = service.spawn_each();
313    /// // Now `concurrent_service` can be used as a Service and will process
314    /// // each input concurrently.
315    /// ```
316    ///
317    /// # Note
318    /// The default message buffer size is 2.
319    #[inline]
320    fn spawn_each(self) -> SpawnEach<I, Self>
321    where
322        Self: Sized + Send + Clone + 'static,
323        Self::Out: Send,
324    {
325        SpawnEach::new(self, 2)
326    }
327
328    /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
329    ///
330    /// This method consumes the current service and returns a tuple containing the original service and a new
331    /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
332    /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
333    ///
334    /// # Type Parameters
335    /// * `O`: Type of the original input that will be received by the outer service.
336    /// * `M`: Type of the message that `s` expects.
337    /// * `E1`: Error type returned by the transformation function `f`.
338    /// * `S`: The inner service that will handle the transformed messages.
339    /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
340    ///
341    /// # Parameters
342    /// * `self` – The current service (moved into the returned tuple).  
343    /// * `f` – Function that transforms `&O` into `Result<M, E1>`.  
344    /// * `s` – The inner service to be invoked after successful transformation.  
345    ///
346    /// # Returns
347    /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
348    /// * `Self` is the original service that can continue to be used.\n
349    /// * `Scope<O, M, E1, S, F>` is a new service that:\n
350    ///   1. Calls `f` with the incoming input.\n
351    ///   2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
352    ///   3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
353    ///
354    /// # Example
355    /// ```ignore
356    /// let (service, scoped) = flow_scope(service, |msg: &Input| {
357    ///     if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
358    /// }, inner_service);
359    /// ```
360    ///
361    /// # Constraints
362    /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
363    #[inline]
364    fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
365    where
366        F: Fn(&O) -> Result<M, E1>,
367        Self: Sized + Send,
368        O: Send,
369        E1: Send,
370    {
371        (self, scope::scope(f, s))
372    }
373
374    #[inline]
375    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
376    where
377        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
378        F: FnMut(O1) -> H + Send,
379        H: Future<Output = O2> + Send,
380        O1: Send,
381        O2: Send,
382        E1: Send,
383    {
384        Left(self, map::map::<O2, _>(f))
385    }
386
387    #[inline]
388    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
389    where
390        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
391        O1: Send,
392        O2: Send,
393        E1: Send,
394        F: FnMut(O1) -> H + Send,
395        H: Future<Output = Option<O2>> + Send,
396    {
397        Left(self, map::filter_map::<O2, _>(f))
398    }
399}
400
401impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
402
403impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
404    type Out = Result<I, E>;
405
406    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
407        if let Some(srv) = self {
408            srv.handle(input, cx).left_stream()
409        } else {
410            futures::stream::once(async move { Ok(input) }).right_stream()
411        }
412    }
413}