flowly_service/
lib.rs

1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16pub use pass::flow;
17pub use spawn_each::{SpawnEach, spawn_each};
18pub use stub::stub;
19pub use switch::switch;
20use tokio::sync::watch;
21
22use std::{marker::PhantomData, pin::pin};
23
24use futures::{Stream, StreamExt, future};
25
26use crate::{except::Except, scope::Scope};
27
28#[derive(Clone)]
29#[non_exhaustive]
30pub struct Context {
31    pub abort: watch::Sender<bool>,
32    pub abort_recv: watch::Receiver<bool>,
33}
34
35impl Context {
36    pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
37        let mut abort_recv = self.abort_recv.clone();
38        let fut1 = pin!(abort_recv.changed());
39        let fut2 = pin!(fut);
40
41        match futures::future::select(fut1, fut2).await {
42            future::Either::Left(..) => None,
43            future::Either::Right((val, _)) => Some(val),
44        }
45    }
46}
47
48impl Default for Context {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl From<watch::Sender<bool>> for Context {
55    fn from(abort: watch::Sender<bool>) -> Self {
56        Self {
57            abort_recv: abort.subscribe(),
58            abort,
59        }
60    }
61}
62
63impl Context {
64    pub fn new() -> Self {
65        Self::from(watch::Sender::default())
66    }
67}
68
69pub trait Service<In> {
70    type Out;
71
72    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
73    fn handle_stream(
74        &mut self,
75        input: impl Stream<Item = In> + Send,
76        cx: &Context,
77    ) -> impl Stream<Item = Self::Out> + Send
78    where
79        In: Send,
80        Self: Send,
81        Self::Out: Send,
82    {
83        async_stream::stream! {
84            let mut input = pin!(input);
85
86            while let Some(item) = input.next().await {
87                let mut s = pin!(self.handle(item, cx));
88
89                while let Some(out) = s.next().await {
90                    yield out;
91                }
92            }
93        }
94    }
95
96    #[inline]
97    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
98    where
99        Self: Sized,
100    {
101        async move {}
102    }
103}
104
105impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
106where
107    I: Send,
108    O1: Send,
109    O2: Send,
110    E1: Send,
111    E2: Send,
112    S1: Service<I, Out = Result<O1, E1>> + Send,
113    S2: Service<O1, Out = Result<O2, E2>> + Send,
114{
115    type Out = Result<O2, Either<E1, E2>>;
116
117    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
118        async_stream::stream! {
119            let mut s1 = pin!(self.0.handle(msg, cx));
120
121            while let Some(res) = s1.next().await {
122                match res {
123                    Ok(ok) => {
124                        let mut s2 = pin!(self.1.handle(ok, cx));
125
126                        while let Some(i2) = s2.next().await {
127                            yield i2.map_err(Either::Right);
128                        }
129                    },
130                    Err(err) => yield Err(Either::Left(err)),
131                }
132            }
133        }
134    }
135}
136
137#[derive(Clone)]
138pub struct Left<S1, S2>(S1, S2);
139impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
140where
141    I: Send,
142    O1: Send,
143    O2: Send,
144    E: Send,
145    S1: Service<I, Out = Result<O1, E>> + Send,
146    S2: Service<O1, Out = O2> + Send,
147{
148    type Out = Result<O2, E>;
149
150    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
151        async_stream::stream! {
152            let mut s1 = pin!(self.0.handle(msg, cx));
153
154            while let Some(res) = s1.next().await {
155                match res {
156                    Ok(ok) => {
157                        let mut s2 = pin!(self.1.handle(ok, cx));
158
159                        while let Some(i2) = s2.next().await {
160                            yield Ok(i2);
161                        }
162                    },
163                    Err(err) => yield Err(err),
164                }
165            }
166        }
167    }
168}
169
170pub trait ServiceExt<I: Send>: Service<I> {
171    #[inline]
172    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
173    where
174        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
175        U: Send + Service<O1, Out = Result<O2, E2>>,
176        O1: Send,
177        O2: Send,
178        E1: Send,
179        E2: Send,
180    {
181        (self, service)
182    }
183
184    #[inline]
185    fn except<F>(self, on_err: F) -> Except<Self, F>
186    where
187        Self: Sized,
188    {
189        Except {
190            service: self,
191            on_err,
192        }
193    }
194
195    /// Adds an inspection step that invokes the supplied callback on every
196    /// successful output of the wrapped service.
197    ///
198    /// This method returns a 2‑tuple consisting of:
199    /// * `self` – the original service unchanged.
200    /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
201    ///   service’s output. For each successful result (`Ok(o)`), the
202    ///   closure `f` is called with a reference to `o`. The output is then
203    ///   passed through unchanged.
204    ///
205    /// # Parameters
206    ///
207    /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
208    ///   reference to the successful output value. It can be used for
209    ///   logging, metrics, or any side‑effect‑only operation.
210    ///
211    /// # Return value
212    ///
213    /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
214    /// service pipeline (e.g., within the `flow` combinator). The first
215    /// element is the original service, and the second element is a service
216    /// that performs the inspection.
217    ///
218    /// # Example
219    ///
220    /// ```rust
221    /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
222    ///
223    /// let service = MyService::new();
224    /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
225    ///     println!("Got value: {:?}", value);
226    /// });
227    /// let flow = Flow::from(orig).and(inspector);
228    /// ```
229    #[inline]
230    fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
231    where
232        Self: Sized + Service<I, Out = Result<O, E>> + Send,
233        F: Fn(&O) + Send,
234        O: Send,
235    {
236        Left(
237            self,
238            inspect::Inspect::<O, F> {
239                cb: f,
240                _m: PhantomData,
241            },
242        )
243    }
244
245    /// Creates a concurrent wrapper around the current service that limits the number of
246    /// parallel executions.
247    ///
248    /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
249    /// of worker tasks. Each worker runs the underlying service independently, allowing
250    /// multiple inputs to be processed concurrently. The `limit` argument controls the
251    /// maximum number of worker tasks that may run in parallel.
252    ///
253    /// **Parameters**
254    /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
255    ///   satisfy `Send`, `Clone`, and `'static` bounds.
256    /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
257    ///   greater than the current number of tasks, new tasks will be created up to this
258    ///   bound.
259    ///
260    /// **Return value**
261    /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
262    /// input, it forwards the input to one of the available workers and returns a stream
263    /// of results that can be awaited asynchronously.
264    ///
265    #[inline]
266    fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
267    where
268        Self: Sized + Send + Clone + 'static,
269        Self::Out: Send,
270    {
271        ConcurrentEach::new(self, limit)
272    }
273
274    /// Creates a new [`SpawnEach`] wrapper around the current service.
275    ///
276    /// The wrapper spawns a separate task for each input message, forwarding
277    /// the results through a bounded `mpsc` channel. This allows the
278    /// underlying service to process messages concurrently without
279    /// blocking the caller.
280    ///
281    /// # Parameters
282    /// * `self` – The service instance to wrap. The service must implement
283    ///   `Service<I>` for some input type `I`.
284    ///
285    /// # Constraints
286    /// * `Self: Sized + Send + Clone + 'static` – The service must be
287    ///   clonable and safe to send across threads.
288    /// * `Self::Out: Send` – The output type of the service must be
289    ///   `Send` because it will be transported across channels.
290    ///
291    /// # Return value
292    /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
293    /// the same input type. The new service can be used just like the
294    /// original one, but each invocation of `handle` will spawn a
295    /// dedicated task.
296    ///
297    /// # Example
298    /// ```rust
299    /// use flowly_service::{Service, spawn_each};
300    ///
301    /// struct MyService;
302    /// impl Service<u32> for MyService {
303    ///     type Out = Result<String, std::io::Error>;
304    ///     fn handle(&mut self, input: u32, _cx: &crate::Context)
305    ///         -> impl futures::Stream<Item = Self::Out> + Send
306    ///     {
307    ///         // …
308    ///     }
309    /// }
310    ///
311    /// let service = MyService;
312    /// // Wrap in SpawnEach
313    /// let concurrent_service = service.spawn_each();
314    /// // Now `concurrent_service` can be used as a Service and will process
315    /// // each input concurrently.
316    /// ```
317    ///
318    /// # Note
319    /// The default message buffer size is 2.
320    #[inline]
321    fn spawn_each(self) -> SpawnEach<I, Self>
322    where
323        Self: Sized + Send + Clone + 'static,
324        Self::Out: Send,
325    {
326        SpawnEach::new(self, 2)
327    }
328
329    /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
330    ///
331    /// This method consumes the current service and returns a tuple containing the original service and a new
332    /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
333    /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
334    ///
335    /// # Type Parameters
336    /// * `O`: Type of the original input that will be received by the outer service.
337    /// * `M`: Type of the message that `s` expects.
338    /// * `E1`: Error type returned by the transformation function `f`.
339    /// * `S`: The inner service that will handle the transformed messages.
340    /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
341    ///
342    /// # Parameters
343    /// * `self` – The current service (moved into the returned tuple).  
344    /// * `f` – Function that transforms `&O` into `Result<M, E1>`.  
345    /// * `s` – The inner service to be invoked after successful transformation.  
346    ///
347    /// # Returns
348    /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
349    /// * `Self` is the original service that can continue to be used.\n
350    /// * `Scope<O, M, E1, S, F>` is a new service that:\n
351    ///   1. Calls `f` with the incoming input.\n
352    ///   2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
353    ///   3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
354    ///
355    /// # Example
356    /// ```ignore
357    /// let (service, scoped) = flow_scope(service, |msg: &Input| {
358    ///     if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
359    /// }, inner_service);
360    /// ```
361    ///
362    /// # Constraints
363    /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
364    #[inline]
365    fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
366    where
367        F: Fn(&O) -> Result<M, E1>,
368        Self: Sized + Send,
369        O: Send,
370        E1: Send,
371    {
372        (self, scope::scope(f, s))
373    }
374
375    #[inline]
376    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
377    where
378        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
379        F: FnMut(O1) -> H + Send,
380        H: Future<Output = O2> + Send,
381        O1: Send,
382        O2: Send,
383        E1: Send,
384    {
385        Left(self, map::map::<O2, _>(f))
386    }
387
388    #[inline]
389    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
390    where
391        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
392        O1: Send,
393        O2: Send,
394        E1: Send,
395        F: FnMut(O1) -> H + Send,
396        H: Future<Output = Option<O2>> + Send,
397    {
398        Left(self, map::filter_map::<O2, _>(f))
399    }
400}
401
402impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
403
404impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
405    type Out = Result<I, E>;
406
407    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
408        if let Some(srv) = self {
409            srv.handle(input, cx).left_stream()
410        } else {
411            futures::stream::once(async move { Ok(input) }).right_stream()
412        }
413    }
414}