flowly_service/
lib.rs

1mod and_then;
2mod concurrent_each;
3mod inspect;
4mod map;
5mod pass;
6mod scope;
7mod spawn_each;
8mod stub;
9mod switch;
10
11pub use and_then::and_then;
12pub use concurrent_each::{ConcurrentEach, concurrent_each};
13use flowly_core::Either;
14pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
15pub use pass::flow;
16pub use spawn_each::{SpawnEach, spawn_each};
17pub use stub::stub;
18pub use switch::switch;
19use tokio::sync::watch;
20
21use std::{marker::PhantomData, pin::pin};
22
23use futures::{Stream, StreamExt, future};
24
25use crate::scope::Scope;
26
27#[derive(Clone)]
28#[non_exhaustive]
29pub struct Context {
30    pub abort: watch::Sender<bool>,
31    pub abort_recv: watch::Receiver<bool>,
32}
33
34impl Context {
35    pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
36        let mut abort_recv = self.abort_recv.clone();
37        let fut1 = pin!(abort_recv.changed());
38        let fut2 = pin!(fut);
39
40        match futures::future::select(fut1, fut2).await {
41            future::Either::Left(..) => None,
42            future::Either::Right((val, _)) => Some(val),
43        }
44    }
45}
46
47impl Default for Context {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl From<watch::Sender<bool>> for Context {
54    fn from(abort: watch::Sender<bool>) -> Self {
55        Self {
56            abort_recv: abort.subscribe(),
57            abort,
58        }
59    }
60}
61
62impl Context {
63    pub fn new() -> Self {
64        Self::from(watch::Sender::default())
65    }
66}
67
68pub trait Service<In> {
69    type Out;
70
71    fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
72    fn handle_stream(
73        &mut self,
74        input: impl Stream<Item = In> + Send,
75        cx: &Context,
76    ) -> impl Stream<Item = Self::Out> + Send
77    where
78        In: Send,
79        Self: Send,
80        Self::Out: Send,
81    {
82        async_stream::stream! {
83            let mut input = pin!(input);
84
85            while let Some(item) = input.next().await {
86                let mut s = pin!(self.handle(item, cx));
87
88                while let Some(out) = s.next().await {
89                    yield out;
90                }
91            }
92        }
93    }
94
95    #[inline]
96    fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
97    where
98        Self: Sized,
99    {
100        async move {}
101    }
102}
103
104impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
105where
106    I: Send,
107    O1: Send,
108    O2: Send,
109    E1: Send,
110    E2: Send,
111    S1: Service<I, Out = Result<O1, E1>> + Send,
112    S2: Service<O1, Out = Result<O2, E2>> + Send,
113{
114    type Out = Result<O2, Either<E1, E2>>;
115
116    fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
117        async_stream::stream! {
118            let mut s1 = pin!(self.0.handle(msg, cx));
119
120            while let Some(res) = s1.next().await {
121                match res {
122                    Ok(ok) => {
123                        let mut s2 = pin!(self.1.handle(ok, cx));
124
125                        while let Some(i2) = s2.next().await {
126                            yield i2.map_err(Either::Right);
127                        }
128                    },
129                    Err(err) => yield Err(Either::Left(err)),
130                }
131            }
132        }
133    }
134}
135
136pub trait ServiceExt<I: Send>: Service<I> {
137    #[inline]
138    fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
139    where
140        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
141        U: Send + Service<O1, Out = Result<O2, E2>>,
142        O1: Send,
143        O2: Send,
144        E1: Send,
145        E2: Send,
146    {
147        (self, service)
148    }
149
150    /// Adds an inspection step that invokes the supplied callback on every
151    /// successful output of the wrapped service.
152    ///
153    /// This method returns a 2‑tuple consisting of:
154    /// * `self` – the original service unchanged.
155    /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
156    ///   service’s output. For each successful result (`Ok(o)`), the
157    ///   closure `f` is called with a reference to `o`. The output is then
158    ///   passed through unchanged.
159    ///
160    /// # Parameters
161    ///
162    /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
163    ///   reference to the successful output value. It can be used for
164    ///   logging, metrics, or any side‑effect‑only operation.
165    ///
166    /// # Return value
167    ///
168    /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
169    /// service pipeline (e.g., within the `flow` combinator). The first
170    /// element is the original service, and the second element is a service
171    /// that performs the inspection.
172    ///
173    /// # Example
174    ///
175    /// ```rust
176    /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
177    ///
178    /// let service = MyService::new();
179    /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
180    ///     println!("Got value: {:?}", value);
181    /// });
182    /// let flow = Flow::from(orig).and(inspector);
183    /// ```
184    #[inline]
185    fn flow_inspect<O, E, F>(self, f: F) -> (Self, inspect::Inspect<O, E, F>)
186    where
187        Self: Sized + Service<I, Out = Result<O, E>> + Send,
188        F: Fn(&O) + Send,
189        O: Send,
190        E: Send,
191    {
192        (
193            self,
194            inspect::Inspect::<O, E, F> {
195                cb: f,
196                _m: PhantomData,
197            },
198        )
199    }
200
201    /// Creates a concurrent wrapper around the current service that limits the number of
202    /// parallel executions.
203    ///
204    /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
205    /// of worker tasks. Each worker runs the underlying service independently, allowing
206    /// multiple inputs to be processed concurrently. The `limit` argument controls the
207    /// maximum number of worker tasks that may run in parallel.
208    ///
209    /// **Parameters**
210    /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
211    ///   satisfy `Send`, `Clone`, and `'static` bounds.
212    /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
213    ///   greater than the current number of tasks, new tasks will be created up to this
214    ///   bound.
215    ///
216    /// **Return value**
217    /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
218    /// input, it forwards the input to one of the available workers and returns a stream
219    /// of results that can be awaited asynchronously.
220    ///
221    #[inline]
222    fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
223    where
224        Self: Sized + Send + Clone + 'static,
225        Self::Out: Send,
226    {
227        ConcurrentEach::new(self, limit)
228    }
229
230    /// Creates a new [`SpawnEach`] wrapper around the current service.
231    ///
232    /// The wrapper spawns a separate task for each input message, forwarding
233    /// the results through a bounded `mpsc` channel. This allows the
234    /// underlying service to process messages concurrently without
235    /// blocking the caller.
236    ///
237    /// # Parameters
238    /// * `self` – The service instance to wrap. The service must implement
239    ///   `Service<I>` for some input type `I`.
240    ///
241    /// # Constraints
242    /// * `Self: Sized + Send + Clone + 'static` – The service must be
243    ///   clonable and safe to send across threads.
244    /// * `Self::Out: Send` – The output type of the service must be
245    ///   `Send` because it will be transported across channels.
246    ///
247    /// # Return value
248    /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
249    /// the same input type. The new service can be used just like the
250    /// original one, but each invocation of `handle` will spawn a
251    /// dedicated task.
252    ///
253    /// # Example
254    /// ```rust
255    /// use flowly_service::{Service, spawn_each};
256    ///
257    /// struct MyService;
258    /// impl Service<u32> for MyService {
259    ///     type Out = Result<String, std::io::Error>;
260    ///     fn handle(&mut self, input: u32, _cx: &crate::Context)
261    ///         -> impl futures::Stream<Item = Self::Out> + Send
262    ///     {
263    ///         // …
264    ///     }
265    /// }
266    ///
267    /// let service = MyService;
268    /// // Wrap in SpawnEach
269    /// let concurrent_service = service.spawn_each();
270    /// // Now `concurrent_service` can be used as a Service and will process
271    /// // each input concurrently.
272    /// ```
273    ///
274    /// # Note
275    /// The default message buffer size is 2.
276    #[inline]
277    fn spawn_each(self) -> SpawnEach<I, Self>
278    where
279        Self: Sized + Send + Clone + 'static,
280        Self::Out: Send,
281    {
282        SpawnEach::new(self, 2)
283    }
284
285    /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
286    ///
287    /// This method consumes the current service and returns a tuple containing the original service and a new
288    /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
289    /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
290    ///
291    /// # Type Parameters
292    /// * `O`: Type of the original input that will be received by the outer service.
293    /// * `M`: Type of the message that `s` expects.
294    /// * `E1`: Error type returned by the transformation function `f`.
295    /// * `S`: The inner service that will handle the transformed messages.
296    /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
297    ///
298    /// # Parameters
299    /// * `self` – The current service (moved into the returned tuple).  
300    /// * `f` – Function that transforms `&O` into `Result<M, E1>`.  
301    /// * `s` – The inner service to be invoked after successful transformation.  
302    ///
303    /// # Returns
304    /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
305    /// * `Self` is the original service that can continue to be used.\n
306    /// * `Scope<O, M, E1, S, F>` is a new service that:\n
307    ///   1. Calls `f` with the incoming input.\n
308    ///   2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
309    ///   3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
310    ///
311    /// # Example
312    /// ```ignore
313    /// let (service, scoped) = flow_scope(service, |msg: &Input| {
314    ///     if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
315    /// }, inner_service);
316    /// ```
317    ///
318    /// # Constraints
319    /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
320    #[inline]
321    fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
322    where
323        F: Fn(&O) -> Result<M, E1>,
324        Self: Sized + Send,
325        O: Send,
326        E1: Send,
327    {
328        (self, scope::scope(f, s))
329    }
330
331    #[inline]
332    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
333    where
334        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
335        F: FnMut(O1) -> H + Send,
336        H: Future<Output = O2> + Send,
337        O1: Send,
338        O2: Send,
339        E1: Send,
340    {
341        (self, map::map::<O2, _>(f))
342    }
343
344    #[inline]
345    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
346    where
347        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
348        O1: Send,
349        O2: Send,
350        E1: Send,
351        F: FnMut(O1) -> H + Send,
352        H: Future<Output = Option<O2>> + Send,
353    {
354        (self, map::filter_map::<O2, _>(f))
355    }
356}
357
358impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
359
360impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
361    type Out = Result<I, E>;
362
363    fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
364        if let Some(srv) = self {
365            srv.handle(input, cx).left_stream()
366        } else {
367            futures::stream::once(async move { Ok(input) }).right_stream()
368        }
369    }
370}