flowly_service/lib.rs
1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14use flowly_core::Either;
15pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
16pub use pass::flow;
17pub use spawn_each::{SpawnEach, spawn_each};
18pub use stub::stub;
19pub use switch::switch;
20use tokio::sync::watch;
21
22use std::{marker::PhantomData, pin::pin};
23
24use futures::{Stream, StreamExt, future};
25
26use crate::{except::Except, scope::Scope};
27
28#[derive(Clone)]
29#[non_exhaustive]
30pub struct Context {
31 pub abort: watch::Sender<bool>,
32 pub abort_recv: watch::Receiver<bool>,
33}
34
35impl Context {
36 pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
37 let mut abort_recv = self.abort_recv.clone();
38 let fut1 = pin!(abort_recv.changed());
39 let fut2 = pin!(fut);
40
41 match futures::future::select(fut1, fut2).await {
42 future::Either::Left(..) => None,
43 future::Either::Right((val, _)) => Some(val),
44 }
45 }
46}
47
48impl Default for Context {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl From<watch::Sender<bool>> for Context {
55 fn from(abort: watch::Sender<bool>) -> Self {
56 Self {
57 abort_recv: abort.subscribe(),
58 abort,
59 }
60 }
61}
62
63impl Context {
64 pub fn new() -> Self {
65 Self::from(watch::Sender::default())
66 }
67}
68
69pub trait Service<In> {
70 type Out;
71
72 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
73 fn handle_stream(
74 &mut self,
75 input: impl Stream<Item = In> + Send,
76 cx: &Context,
77 ) -> impl Stream<Item = Self::Out> + Send
78 where
79 In: Send,
80 Self: Send,
81 Self::Out: Send,
82 {
83 async_stream::stream! {
84 let mut input = pin!(input);
85
86 while let Some(item) = input.next().await {
87 let mut s = pin!(self.handle(item, cx));
88
89 while let Some(out) = s.next().await {
90 yield out;
91 }
92 }
93 }
94 }
95
96 #[inline]
97 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
98 where
99 Self: Sized,
100 {
101 async move {}
102 }
103}
104
105impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
106where
107 I: Send,
108 O1: Send,
109 O2: Send,
110 E1: Send,
111 E2: Send,
112 S1: Service<I, Out = Result<O1, E1>> + Send,
113 S2: Service<O1, Out = Result<O2, E2>> + Send,
114{
115 type Out = Result<O2, Either<E1, E2>>;
116
117 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
118 async_stream::stream! {
119 let mut s1 = pin!(self.0.handle(msg, cx));
120
121 while let Some(res) = s1.next().await {
122 match res {
123 Ok(ok) => {
124 let mut s2 = pin!(self.1.handle(ok, cx));
125
126 while let Some(i2) = s2.next().await {
127 yield i2.map_err(Either::Right);
128 }
129 },
130 Err(err) => yield Err(Either::Left(err)),
131 }
132 }
133 }
134 }
135}
136
137#[derive(Clone)]
138pub struct Left<S1, S2>(S1, S2);
139impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
140where
141 I: Send,
142 O1: Send,
143 O2: Send,
144 E: Send,
145 S1: Service<I, Out = Result<O1, E>> + Send,
146 S2: Service<O1, Out = O2> + Send,
147{
148 type Out = Result<O2, E>;
149
150 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
151 async_stream::stream! {
152 let mut s1 = pin!(self.0.handle(msg, cx));
153
154 while let Some(res) = s1.next().await {
155 match res {
156 Ok(ok) => {
157 let mut s2 = pin!(self.1.handle(ok, cx));
158
159 while let Some(i2) = s2.next().await {
160 yield Ok(i2);
161 }
162 },
163 Err(err) => yield Err(err),
164 }
165 }
166 }
167 }
168}
169
170pub trait ServiceExt<I: Send>: Service<I> {
171 #[inline]
172 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
173 where
174 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
175 U: Send + Service<O1, Out = Result<O2, E2>>,
176 O1: Send,
177 O2: Send,
178 E1: Send,
179 E2: Send,
180 {
181 (self, service)
182 }
183
184 #[inline]
185 fn except<F>(self, on_err: F) -> Except<Self, F>
186 where
187 Self: Sized,
188 {
189 Except {
190 service: self,
191 on_err,
192 }
193 }
194
195 /// Adds an inspection step that invokes the supplied callback on every
196 /// successful output of the wrapped service.
197 ///
198 /// This method returns a 2‑tuple consisting of:
199 /// * `self` – the original service unchanged.
200 /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
201 /// service’s output. For each successful result (`Ok(o)`), the
202 /// closure `f` is called with a reference to `o`. The output is then
203 /// passed through unchanged.
204 ///
205 /// # Parameters
206 ///
207 /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
208 /// reference to the successful output value. It can be used for
209 /// logging, metrics, or any side‑effect‑only operation.
210 ///
211 /// # Return value
212 ///
213 /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
214 /// service pipeline (e.g., within the `flow` combinator). The first
215 /// element is the original service, and the second element is a service
216 /// that performs the inspection.
217 ///
218 /// # Example
219 ///
220 /// ```rust
221 /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
222 ///
223 /// let service = MyService::new();
224 /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
225 /// println!("Got value: {:?}", value);
226 /// });
227 /// let flow = Flow::from(orig).and(inspector);
228 /// ```
229 #[inline]
230 fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
231 where
232 Self: Sized + Service<I, Out = Result<O, E>> + Send,
233 F: Fn(&O) + Send,
234 O: Send,
235 {
236 Left(
237 self,
238 inspect::Inspect::<O, F> {
239 cb: f,
240 _m: PhantomData,
241 },
242 )
243 }
244
245 /// Creates a concurrent wrapper around the current service that limits the number of
246 /// parallel executions.
247 ///
248 /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
249 /// of worker tasks. Each worker runs the underlying service independently, allowing
250 /// multiple inputs to be processed concurrently. The `limit` argument controls the
251 /// maximum number of worker tasks that may run in parallel.
252 ///
253 /// **Parameters**
254 /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
255 /// satisfy `Send`, `Clone`, and `'static` bounds.
256 /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
257 /// greater than the current number of tasks, new tasks will be created up to this
258 /// bound.
259 ///
260 /// **Return value**
261 /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
262 /// input, it forwards the input to one of the available workers and returns a stream
263 /// of results that can be awaited asynchronously.
264 ///
265 #[inline]
266 fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
267 where
268 Self: Sized + Send + Clone + 'static,
269 Self::Out: Send,
270 {
271 ConcurrentEach::new(self, limit)
272 }
273
274 /// Creates a new [`SpawnEach`] wrapper around the current service.
275 ///
276 /// The wrapper spawns a separate task for each input message, forwarding
277 /// the results through a bounded `mpsc` channel. This allows the
278 /// underlying service to process messages concurrently without
279 /// blocking the caller.
280 ///
281 /// # Parameters
282 /// * `self` – The service instance to wrap. The service must implement
283 /// `Service<I>` for some input type `I`.
284 ///
285 /// # Constraints
286 /// * `Self: Sized + Send + Clone + 'static` – The service must be
287 /// clonable and safe to send across threads.
288 /// * `Self::Out: Send` – The output type of the service must be
289 /// `Send` because it will be transported across channels.
290 ///
291 /// # Return value
292 /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
293 /// the same input type. The new service can be used just like the
294 /// original one, but each invocation of `handle` will spawn a
295 /// dedicated task.
296 ///
297 /// # Example
298 /// ```rust
299 /// use flowly_service::{Service, spawn_each};
300 ///
301 /// struct MyService;
302 /// impl Service<u32> for MyService {
303 /// type Out = Result<String, std::io::Error>;
304 /// fn handle(&mut self, input: u32, _cx: &crate::Context)
305 /// -> impl futures::Stream<Item = Self::Out> + Send
306 /// {
307 /// // …
308 /// }
309 /// }
310 ///
311 /// let service = MyService;
312 /// // Wrap in SpawnEach
313 /// let concurrent_service = service.spawn_each();
314 /// // Now `concurrent_service` can be used as a Service and will process
315 /// // each input concurrently.
316 /// ```
317 ///
318 /// # Note
319 /// The default message buffer size is 2.
320 #[inline]
321 fn spawn_each(self) -> SpawnEach<I, Self>
322 where
323 Self: Sized + Send + Clone + 'static,
324 Self::Out: Send,
325 {
326 SpawnEach::new(self, 2)
327 }
328
329 /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
330 ///
331 /// This method consumes the current service and returns a tuple containing the original service and a new
332 /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
333 /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
334 ///
335 /// # Type Parameters
336 /// * `O`: Type of the original input that will be received by the outer service.
337 /// * `M`: Type of the message that `s` expects.
338 /// * `E1`: Error type returned by the transformation function `f`.
339 /// * `S`: The inner service that will handle the transformed messages.
340 /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
341 ///
342 /// # Parameters
343 /// * `self` – The current service (moved into the returned tuple).
344 /// * `f` – Function that transforms `&O` into `Result<M, E1>`.
345 /// * `s` – The inner service to be invoked after successful transformation.
346 ///
347 /// # Returns
348 /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
349 /// * `Self` is the original service that can continue to be used.\n
350 /// * `Scope<O, M, E1, S, F>` is a new service that:\n
351 /// 1. Calls `f` with the incoming input.\n
352 /// 2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
353 /// 3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
354 ///
355 /// # Example
356 /// ```ignore
357 /// let (service, scoped) = flow_scope(service, |msg: &Input| {
358 /// if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
359 /// }, inner_service);
360 /// ```
361 ///
362 /// # Constraints
363 /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
364 #[inline]
365 fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
366 where
367 F: Fn(&O) -> Result<M, E1>,
368 Self: Sized + Send,
369 O: Send,
370 E1: Send,
371 {
372 (self, scope::scope(f, s))
373 }
374
375 #[inline]
376 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
377 where
378 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
379 F: FnMut(O1) -> H + Send,
380 H: Future<Output = O2> + Send,
381 O1: Send,
382 O2: Send,
383 E1: Send,
384 {
385 Left(self, map::map::<O2, _>(f))
386 }
387
388 #[inline]
389 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
390 where
391 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
392 O1: Send,
393 O2: Send,
394 E1: Send,
395 F: FnMut(O1) -> H + Send,
396 H: Future<Output = Option<O2>> + Send,
397 {
398 Left(self, map::filter_map::<O2, _>(f))
399 }
400}
401
402impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
403
404impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
405 type Out = Result<I, E>;
406
407 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
408 if let Some(srv) = self {
409 srv.handle(input, cx).left_stream()
410 } else {
411 futures::stream::once(async move { Ok(input) }).right_stream()
412 }
413 }
414}