flowly_service/lib.rs
1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16pub use pass::flow;
17pub use spawn_each::{SpawnEach, spawn_each};
18pub use stub::stub;
19pub use switch::switch;
20use tokio::sync::watch;
21
22use std::{marker::PhantomData, pin::pin};
23
24use futures::{Stream, StreamExt, future};
25
26pub use crate::except::Except;
27pub use crate::scope::{Scope, ScopeEach, scope, scope_each};
28
29#[derive(Clone)]
30#[non_exhaustive]
31pub struct Context {
32 pub abort: watch::Sender<bool>,
33 pub abort_recv: watch::Receiver<bool>,
34}
35
36impl Context {
37 pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
38 let mut abort_recv = self.abort_recv.clone();
39 let fut1 = pin!(abort_recv.changed());
40 let fut2 = pin!(fut);
41
42 match futures::future::select(fut1, fut2).await {
43 future::Either::Left(..) => None,
44 future::Either::Right((val, _)) => Some(val),
45 }
46 }
47}
48
49impl Default for Context {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl From<watch::Sender<bool>> for Context {
56 fn from(abort: watch::Sender<bool>) -> Self {
57 Self {
58 abort_recv: abort.subscribe(),
59 abort,
60 }
61 }
62}
63
64impl Context {
65 pub fn new() -> Self {
66 Self::from(watch::Sender::default())
67 }
68}
69
70pub trait Service<In> {
71 type Out;
72
73 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
74 fn handle_stream(
75 &mut self,
76 input: impl Stream<Item = In> + Send,
77 cx: &Context,
78 ) -> impl Stream<Item = Self::Out> + Send
79 where
80 In: Send,
81 Self: Send,
82 Self::Out: Send,
83 {
84 async_stream::stream! {
85 let mut input = pin!(input);
86
87 while let Some(item) = input.next().await {
88 let mut s = pin!(self.handle(item, cx));
89
90 while let Some(out) = s.next().await {
91 yield out;
92 }
93 }
94 }
95 }
96
97 #[inline]
98 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
99 where
100 Self: Sized,
101 {
102 async move {}
103 }
104}
105
106impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
107where
108 I: Send,
109 O1: Send,
110 O2: Send,
111 E1: Send,
112 E2: Send,
113 S1: Service<I, Out = Result<O1, E1>> + Send,
114 S2: Service<O1, Out = Result<O2, E2>> + Send,
115{
116 type Out = Result<O2, Either<E1, E2>>;
117
118 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
119 async_stream::stream! {
120 let mut s1 = pin!(self.0.handle(msg, cx));
121
122 while let Some(res) = s1.next().await {
123 match res {
124 Ok(ok) => {
125 let mut s2 = pin!(self.1.handle(ok, cx));
126
127 while let Some(i2) = s2.next().await {
128 yield i2.map_err(Either::Right);
129 }
130 },
131 Err(err) => yield Err(Either::Left(err)),
132 }
133 }
134 }
135 }
136}
137
138#[derive(Clone)]
139pub struct Left<S1, S2>(S1, S2);
140impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
141where
142 I: Send,
143 O1: Send,
144 O2: Send,
145 E: Send,
146 S1: Service<I, Out = Result<O1, E>> + Send,
147 S2: Service<O1, Out = O2> + Send,
148{
149 type Out = Result<O2, E>;
150
151 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
152 async_stream::stream! {
153 let mut s1 = pin!(self.0.handle(msg, cx));
154
155 while let Some(res) = s1.next().await {
156 match res {
157 Ok(ok) => {
158 let mut s2 = pin!(self.1.handle(ok, cx));
159
160 while let Some(i2) = s2.next().await {
161 yield Ok(i2);
162 }
163 },
164 Err(err) => yield Err(err),
165 }
166 }
167 }
168 }
169}
170
171pub trait ServiceExt<I: Send>: Service<I> {
172 #[inline]
173 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
174 where
175 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
176 U: Send + Service<O1, Out = Result<O2, E2>>,
177 O1: Send,
178 O2: Send,
179 E1: Send,
180 E2: Send,
181 {
182 (self, service)
183 }
184
185 #[inline]
186 fn except<F>(self, on_err: F) -> Except<Self, F>
187 where
188 Self: Sized,
189 {
190 Except {
191 service: self,
192 on_err,
193 }
194 }
195
196 /// Adds an inspection step that invokes the supplied callback on every
197 /// successful output of the wrapped service.
198 ///
199 /// This method returns a 2‑tuple consisting of:
200 /// * `self` – the original service unchanged.
201 /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
202 /// service’s output. For each successful result (`Ok(o)`), the
203 /// closure `f` is called with a reference to `o`. The output is then
204 /// passed through unchanged.
205 ///
206 /// # Parameters
207 ///
208 /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
209 /// reference to the successful output value. It can be used for
210 /// logging, metrics, or any side‑effect‑only operation.
211 ///
212 /// # Return value
213 ///
214 /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
215 /// service pipeline (e.g., within the `flow` combinator). The first
216 /// element is the original service, and the second element is a service
217 /// that performs the inspection.
218 ///
219 /// # Example
220 ///
221 /// ```rust
222 /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
223 ///
224 /// let service = MyService::new();
225 /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
226 /// println!("Got value: {:?}", value);
227 /// });
228 /// let flow = Flow::from(orig).and(inspector);
229 /// ```
230 #[inline]
231 fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
232 where
233 Self: Sized + Service<I, Out = Result<O, E>> + Send,
234 F: Fn(&O) + Send,
235 O: Send,
236 {
237 Left(
238 self,
239 inspect::Inspect::<O, F> {
240 cb: f,
241 _m: PhantomData,
242 },
243 )
244 }
245
246 /// Creates a concurrent wrapper around the current service that limits the number of
247 /// parallel executions.
248 ///
249 /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
250 /// of worker tasks. Each worker runs the underlying service independently, allowing
251 /// multiple inputs to be processed concurrently. The `limit` argument controls the
252 /// maximum number of worker tasks that may run in parallel.
253 ///
254 /// **Parameters**
255 /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
256 /// satisfy `Send`, `Clone`, and `'static` bounds.
257 /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
258 /// greater than the current number of tasks, new tasks will be created up to this
259 /// bound.
260 ///
261 /// **Return value**
262 /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
263 /// input, it forwards the input to one of the available workers and returns a stream
264 /// of results that can be awaited asynchronously.
265 ///
266 #[inline]
267 fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
268 where
269 Self: Sized + Send + Clone + 'static,
270 Self::Out: Send,
271 {
272 ConcurrentEach::new(self, limit)
273 }
274
275 /// Creates a new [`SpawnEach`] wrapper around the current service.
276 ///
277 /// The wrapper spawns a separate task for each input message, forwarding
278 /// the results through a bounded `mpsc` channel. This allows the
279 /// underlying service to process messages concurrently without
280 /// blocking the caller.
281 ///
282 /// # Parameters
283 /// * `self` – The service instance to wrap. The service must implement
284 /// `Service<I>` for some input type `I`.
285 ///
286 /// # Constraints
287 /// * `Self: Sized + Send + Clone + 'static` – The service must be
288 /// clonable and safe to send across threads.
289 /// * `Self::Out: Send` – The output type of the service must be
290 /// `Send` because it will be transported across channels.
291 ///
292 /// # Return value
293 /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
294 /// the same input type. The new service can be used just like the
295 /// original one, but each invocation of `handle` will spawn a
296 /// dedicated task.
297 ///
298 /// # Example
299 /// ```rust
300 /// use flowly_service::{Service, spawn_each};
301 ///
302 /// struct MyService;
303 /// impl Service<u32> for MyService {
304 /// type Out = Result<String, std::io::Error>;
305 /// fn handle(&mut self, input: u32, _cx: &crate::Context)
306 /// -> impl futures::Stream<Item = Self::Out> + Send
307 /// {
308 /// // …
309 /// }
310 /// }
311 ///
312 /// let service = MyService;
313 /// // Wrap in SpawnEach
314 /// let concurrent_service = service.spawn_each();
315 /// // Now `concurrent_service` can be used as a Service and will process
316 /// // each input concurrently.
317 /// ```
318 ///
319 /// # Note
320 /// The default message buffer size is 2.
321 #[inline]
322 fn spawn_each(self) -> SpawnEach<I, Self>
323 where
324 Self: Sized + Send + Clone + 'static,
325 Self::Out: Send,
326 {
327 SpawnEach::new(self, 2)
328 }
329
330 /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
331 ///
332 /// This method consumes the current service and returns a tuple containing the original service and a new
333 /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
334 /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
335 ///
336 /// # Type Parameters
337 /// * `O`: Type of the original input that will be received by the outer service.
338 /// * `M`: Type of the message that `s` expects.
339 /// * `E1`: Error type returned by the transformation function `f`.
340 /// * `S`: The inner service that will handle the transformed messages.
341 /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
342 ///
343 /// # Parameters
344 /// * `self` – The current service (moved into the returned tuple).
345 /// * `f` – Function that transforms `&O` into `Result<M, E1>`.
346 /// * `s` – The inner service to be invoked after successful transformation.
347 ///
348 /// # Returns
349 /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
350 /// * `Self` is the original service that can continue to be used.\n
351 /// * `Scope<O, M, E1, S, F>` is a new service that:\n
352 /// 1. Calls `f` with the incoming input.\n
353 /// 2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
354 /// 3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
355 ///
356 /// # Example
357 /// ```ignore
358 /// let (service, scoped) = flow_scope(service, |msg: &Input| {
359 /// if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
360 /// }, inner_service);
361 /// ```
362 ///
363 /// # Constraints
364 /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
365 #[inline]
366 fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
367 where
368 F: Fn(&O) -> Result<M, E1>,
369 Self: Sized + Send,
370 O: Send,
371 E1: Send,
372 {
373 (self, scope(f, s))
374 }
375
376 #[inline]
377 fn flow_scope_each<O, M, E1, S, F>(self, f: F, s: S) -> (Self, ScopeEach<O, M, E1, S, F>)
378 where
379 F: Fn(&O) -> Result<M, E1>,
380 Self: Sized + Send,
381 E1: Send,
382 O: Send + Clone,
383 {
384 (self, scope_each(f, s))
385 }
386
387 #[inline]
388 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
389 where
390 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
391 F: FnMut(O1) -> H + Send,
392 H: Future<Output = O2> + Send,
393 O1: Send,
394 O2: Send,
395 E1: Send,
396 {
397 Left(self, map::map::<O2, _>(f))
398 }
399
400 #[inline]
401 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
402 where
403 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
404 O1: Send,
405 O2: Send,
406 E1: Send,
407 F: FnMut(O1) -> H + Send,
408 H: Future<Output = Option<O2>> + Send,
409 {
410 Left(self, map::filter_map::<O2, _>(f))
411 }
412}
413
414impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
415
416impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
417 type Out = Result<I, E>;
418
419 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
420 if let Some(srv) = self {
421 srv.handle(input, cx).left_stream()
422 } else {
423 futures::stream::once(async move { Ok(input) }).right_stream()
424 }
425 }
426}