flowly_service/lib.rs
1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16pub use pass::flow;
17pub use spawn_each::{SpawnEach, spawn_each};
18pub use stub::stub;
19pub use switch::switch;
20use tokio::sync::watch;
21
22use std::{marker::PhantomData, pin::pin};
23
24use futures::{Stream, StreamExt, future};
25
26use crate::{except::Except, scope::Scope};
27
28#[derive(Clone)]
29#[non_exhaustive]
30pub struct Context {
31 pub abort: watch::Sender<bool>,
32 pub abort_recv: watch::Receiver<bool>,
33}
34
35impl Context {
36 pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
37 let mut abort_recv = self.abort_recv.clone();
38 let fut1 = pin!(abort_recv.changed());
39 let fut2 = pin!(fut);
40
41 match futures::future::select(fut1, fut2).await {
42 future::Either::Left(..) => None,
43 future::Either::Right((val, _)) => Some(val),
44 }
45 }
46}
47
48impl Default for Context {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl From<watch::Sender<bool>> for Context {
55 fn from(abort: watch::Sender<bool>) -> Self {
56 Self {
57 abort_recv: abort.subscribe(),
58 abort,
59 }
60 }
61}
62
63impl Context {
64 pub fn new() -> Self {
65 Self::from(watch::Sender::default())
66 }
67}
68
69pub trait Service<In> {
70 type Out;
71
72 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
73 fn handle_stream(
74 &mut self,
75 input: impl Stream<Item = In> + Send,
76 cx: &Context,
77 ) -> impl Stream<Item = Self::Out> + Send
78 where
79 In: Send,
80 Self: Send,
81 Self::Out: Send,
82 {
83 async_stream::stream! {
84 let mut input = pin!(input);
85
86 while let Some(item) = input.next().await {
87 let mut s = pin!(self.handle(item, cx));
88
89 while let Some(out) = s.next().await {
90 yield out;
91 }
92 }
93 }
94 }
95
96 #[inline]
97 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
98 where
99 Self: Sized,
100 {
101 async move {}
102 }
103}
104
105impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
106where
107 I: Send,
108 O1: Send,
109 O2: Send,
110 E1: Send,
111 E2: Send,
112 S1: Service<I, Out = Result<O1, E1>> + Send,
113 S2: Service<O1, Out = Result<O2, E2>> + Send,
114{
115 type Out = Result<O2, Either<E1, E2>>;
116
117 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
118 async_stream::stream! {
119 let mut s1 = pin!(self.0.handle(msg, cx));
120
121 while let Some(res) = s1.next().await {
122 match res {
123 Ok(ok) => {
124 let mut s2 = pin!(self.1.handle(ok, cx));
125
126 while let Some(i2) = s2.next().await {
127 yield i2.map_err(Either::Right);
128 }
129 },
130 Err(err) => yield Err(Either::Left(err)),
131 }
132 }
133 }
134 }
135}
136
137pub struct Left<S1, S2>(S1, S2);
138impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
139where
140 I: Send,
141 O1: Send,
142 O2: Send,
143 E: Send,
144 S1: Service<I, Out = Result<O1, E>> + Send,
145 S2: Service<O1, Out = O2> + Send,
146{
147 type Out = Result<O2, E>;
148
149 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
150 async_stream::stream! {
151 let mut s1 = pin!(self.0.handle(msg, cx));
152
153 while let Some(res) = s1.next().await {
154 match res {
155 Ok(ok) => {
156 let mut s2 = pin!(self.1.handle(ok, cx));
157
158 while let Some(i2) = s2.next().await {
159 yield Ok(i2);
160 }
161 },
162 Err(err) => yield Err(err),
163 }
164 }
165 }
166 }
167}
168
169pub trait ServiceExt<I: Send>: Service<I> {
170 #[inline]
171 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
172 where
173 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
174 U: Send + Service<O1, Out = Result<O2, E2>>,
175 O1: Send,
176 O2: Send,
177 E1: Send,
178 E2: Send,
179 {
180 (self, service)
181 }
182
183 #[inline]
184 fn except<F>(self, on_err: F) -> Except<Self, F>
185 where
186 Self: Sized,
187 {
188 Except {
189 service: self,
190 on_err,
191 }
192 }
193
194 /// Adds an inspection step that invokes the supplied callback on every
195 /// successful output of the wrapped service.
196 ///
197 /// This method returns a 2‑tuple consisting of:
198 /// * `self` – the original service unchanged.
199 /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
200 /// service’s output. For each successful result (`Ok(o)`), the
201 /// closure `f` is called with a reference to `o`. The output is then
202 /// passed through unchanged.
203 ///
204 /// # Parameters
205 ///
206 /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
207 /// reference to the successful output value. It can be used for
208 /// logging, metrics, or any side‑effect‑only operation.
209 ///
210 /// # Return value
211 ///
212 /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
213 /// service pipeline (e.g., within the `flow` combinator). The first
214 /// element is the original service, and the second element is a service
215 /// that performs the inspection.
216 ///
217 /// # Example
218 ///
219 /// ```rust
220 /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
221 ///
222 /// let service = MyService::new();
223 /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
224 /// println!("Got value: {:?}", value);
225 /// });
226 /// let flow = Flow::from(orig).and(inspector);
227 /// ```
228 #[inline]
229 fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
230 where
231 Self: Sized + Service<I, Out = Result<O, E>> + Send,
232 F: Fn(&O) + Send,
233 O: Send,
234 {
235 Left(
236 self,
237 inspect::Inspect::<O, F> {
238 cb: f,
239 _m: PhantomData,
240 },
241 )
242 }
243
244 /// Creates a concurrent wrapper around the current service that limits the number of
245 /// parallel executions.
246 ///
247 /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
248 /// of worker tasks. Each worker runs the underlying service independently, allowing
249 /// multiple inputs to be processed concurrently. The `limit` argument controls the
250 /// maximum number of worker tasks that may run in parallel.
251 ///
252 /// **Parameters**
253 /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
254 /// satisfy `Send`, `Clone`, and `'static` bounds.
255 /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
256 /// greater than the current number of tasks, new tasks will be created up to this
257 /// bound.
258 ///
259 /// **Return value**
260 /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
261 /// input, it forwards the input to one of the available workers and returns a stream
262 /// of results that can be awaited asynchronously.
263 ///
264 #[inline]
265 fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
266 where
267 Self: Sized + Send + Clone + 'static,
268 Self::Out: Send,
269 {
270 ConcurrentEach::new(self, limit)
271 }
272
273 /// Creates a new [`SpawnEach`] wrapper around the current service.
274 ///
275 /// The wrapper spawns a separate task for each input message, forwarding
276 /// the results through a bounded `mpsc` channel. This allows the
277 /// underlying service to process messages concurrently without
278 /// blocking the caller.
279 ///
280 /// # Parameters
281 /// * `self` – The service instance to wrap. The service must implement
282 /// `Service<I>` for some input type `I`.
283 ///
284 /// # Constraints
285 /// * `Self: Sized + Send + Clone + 'static` – The service must be
286 /// clonable and safe to send across threads.
287 /// * `Self::Out: Send` – The output type of the service must be
288 /// `Send` because it will be transported across channels.
289 ///
290 /// # Return value
291 /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
292 /// the same input type. The new service can be used just like the
293 /// original one, but each invocation of `handle` will spawn a
294 /// dedicated task.
295 ///
296 /// # Example
297 /// ```rust
298 /// use flowly_service::{Service, spawn_each};
299 ///
300 /// struct MyService;
301 /// impl Service<u32> for MyService {
302 /// type Out = Result<String, std::io::Error>;
303 /// fn handle(&mut self, input: u32, _cx: &crate::Context)
304 /// -> impl futures::Stream<Item = Self::Out> + Send
305 /// {
306 /// // …
307 /// }
308 /// }
309 ///
310 /// let service = MyService;
311 /// // Wrap in SpawnEach
312 /// let concurrent_service = service.spawn_each();
313 /// // Now `concurrent_service` can be used as a Service and will process
314 /// // each input concurrently.
315 /// ```
316 ///
317 /// # Note
318 /// The default message buffer size is 2.
319 #[inline]
320 fn spawn_each(self) -> SpawnEach<I, Self>
321 where
322 Self: Sized + Send + Clone + 'static,
323 Self::Out: Send,
324 {
325 SpawnEach::new(self, 2)
326 }
327
328 /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
329 ///
330 /// This method consumes the current service and returns a tuple containing the original service and a new
331 /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
332 /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
333 ///
334 /// # Type Parameters
335 /// * `O`: Type of the original input that will be received by the outer service.
336 /// * `M`: Type of the message that `s` expects.
337 /// * `E1`: Error type returned by the transformation function `f`.
338 /// * `S`: The inner service that will handle the transformed messages.
339 /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
340 ///
341 /// # Parameters
342 /// * `self` – The current service (moved into the returned tuple).
343 /// * `f` – Function that transforms `&O` into `Result<M, E1>`.
344 /// * `s` – The inner service to be invoked after successful transformation.
345 ///
346 /// # Returns
347 /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
348 /// * `Self` is the original service that can continue to be used.\n
349 /// * `Scope<O, M, E1, S, F>` is a new service that:\n
350 /// 1. Calls `f` with the incoming input.\n
351 /// 2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
352 /// 3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
353 ///
354 /// # Example
355 /// ```ignore
356 /// let (service, scoped) = flow_scope(service, |msg: &Input| {
357 /// if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
358 /// }, inner_service);
359 /// ```
360 ///
361 /// # Constraints
362 /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
363 #[inline]
364 fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
365 where
366 F: Fn(&O) -> Result<M, E1>,
367 Self: Sized + Send,
368 O: Send,
369 E1: Send,
370 {
371 (self, scope::scope(f, s))
372 }
373
374 #[inline]
375 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
376 where
377 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
378 F: FnMut(O1) -> H + Send,
379 H: Future<Output = O2> + Send,
380 O1: Send,
381 O2: Send,
382 E1: Send,
383 {
384 Left(self, map::map::<O2, _>(f))
385 }
386
387 #[inline]
388 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
389 where
390 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
391 O1: Send,
392 O2: Send,
393 E1: Send,
394 F: FnMut(O1) -> H + Send,
395 H: Future<Output = Option<O2>> + Send,
396 {
397 Left(self, map::filter_map::<O2, _>(f))
398 }
399}
400
401impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
402
403impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
404 type Out = Result<I, E>;
405
406 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
407 if let Some(srv) = self {
408 srv.handle(input, cx).left_stream()
409 } else {
410 futures::stream::once(async move { Ok(input) }).right_stream()
411 }
412 }
413}