flowly_service/lib.rs
1mod and_then;
2mod concurrent_each;
3mod except;
4mod inspect;
5mod map;
6mod pass;
7mod scope;
8mod spawn_each;
9mod stub;
10mod switch;
11
12pub use and_then::and_then;
13pub use concurrent_each::{ConcurrentEach, concurrent_each};
14pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
15pub use pass::flow;
16pub use spawn_each::{SpawnEach, spawn_each};
17pub use stub::stub;
18pub use switch::switch;
19use tokio::sync::watch;
20
21use std::{marker::PhantomData, pin::pin};
22
23use futures::{Stream, StreamExt, future};
24
25pub use crate::except::Except;
26pub use crate::scope::{Scope, ScopeEach, scope, scope_each};
27
/// Shared cancellation handle passed to every [`Service`] call.
///
/// It bundles both halves of a `tokio::sync::watch` channel carrying a single
/// `bool`. [`Context::alive`] treats a stored `true` as "aborted".
#[derive(Clone)]
#[non_exhaustive]
pub struct Context {
    /// Sending half of the abort channel.
    pub abort: watch::Sender<bool>,
    /// Receiving half of the abort channel, subscribed to `abort`.
    pub abort_recv: watch::Receiver<bool>,
}
34
35impl Context {
36 pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
37 let mut abort_recv = self.abort_recv.clone();
38 let fut1 = pin!(abort_recv.changed());
39 let fut2 = pin!(fut);
40
41 match futures::future::select(fut1, fut2).await {
42 future::Either::Left(..) => None,
43 future::Either::Right((val, _)) => Some(val),
44 }
45 }
46
47 #[inline]
48 pub fn alive(&self) -> bool {
49 !*self.abort_recv.borrow()
50 }
51}
52
53impl Default for Context {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl From<watch::Sender<bool>> for Context {
60 fn from(abort: watch::Sender<bool>) -> Self {
61 Self {
62 abort_recv: abort.subscribe(),
63 abort,
64 }
65 }
66}
67
68impl Context {
69 pub fn new() -> Self {
70 Self::from(watch::Sender::default())
71 }
72}
73
74pub trait Service<In>: Send + Sync {
75 type Out;
76
77 fn handle(&self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
78 fn handle_stream(
79 &self,
80 input: impl Stream<Item = In> + Send,
81 cx: &Context,
82 ) -> impl Stream<Item = Self::Out> + Send
83 where
84 In: Send,
85 Self: Send,
86 Self::Out: Send,
87 {
88 async_stream::stream! {
89 let mut input = pin!(input);
90
91 while let Some(item) = input.next().await {
92 let mut s = pin!(self.handle(item, cx));
93
94 while let Some(out) = s.next().await {
95 yield out;
96 }
97 }
98 }
99 }
100
101 #[inline]
102 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
103 where
104 Self: Sized,
105 {
106 async move {}
107 }
108}
109
110impl<I, O1, E, O2, EI, S1, S2> Service<I> for (S1, S2)
111where
112 I: Send,
113 O1: Send,
114 O2: Send,
115 E: Send + From<EI>,
116 EI: Send,
117 S1: Service<I, Out = Result<O1, E>> + Send,
118 S2: Service<O1, Out = Result<O2, EI>> + Send,
119{
120 type Out = Result<O2, E>;
121
122 fn handle(&self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
123 async_stream::stream! {
124 let mut s1 = pin!(self.0.handle(msg, cx));
125
126 while let Some(res) = s1.next().await {
127 match res {
128 Ok(ok) => {
129 let mut s2 = pin!(self.1.handle(ok, cx));
130
131 while let Some(i2) = s2.next().await {
132 yield i2.map_err(E::from);
133 }
134 },
135 Err(err) => yield Err(err),
136 }
137 }
138 }
139 }
140}
141
/// Pair of services in which only the first stage is fallible.
///
/// The `Service` implementation for this type feeds every `Ok` value of the
/// first service into the second one and wraps each of the second service's
/// outputs in `Ok`. Errors of the first service are passed through.
#[derive(Clone)]
pub struct Left<S1, S2>(S1, S2);
144impl<I, O1, E, O2, S1, S2> Service<I> for Left<S1, S2>
145where
146 I: Send,
147 O1: Send,
148 O2: Send,
149 E: Send,
150 S1: Service<I, Out = Result<O1, E>> + Send,
151 S2: Service<O1, Out = O2> + Send,
152{
153 type Out = Result<O2, E>;
154
155 fn handle(&self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
156 async_stream::stream! {
157 let mut s1 = pin!(self.0.handle(msg, cx));
158
159 while let Some(res) = s1.next().await {
160 match res {
161 Ok(ok) => {
162 let mut s2 = pin!(self.1.handle(ok, cx));
163
164 while let Some(i2) = s2.next().await {
165 yield Ok(i2);
166 }
167 },
168 Err(err) => yield Err(err),
169 }
170 }
171 }
172 }
173}
174
/// Combinator methods available on every [`Service`].
pub trait ServiceExt<I: Send>: Service<I> {
    /// Chains `service` after `self`.
    ///
    /// Returns the tuple `(self, service)`. The `Service` implementation for
    /// two-element tuples in this crate feeds every `Ok` output of the first
    /// service into the second, forwards errors of the first service and
    /// converts errors of the second one into `E` via `From<EI>`.
    #[inline]
    fn flow<O1, O2, E, EI, U>(self, service: U) -> (Self, U)
    where
        Self: Sized + Service<I, Out = Result<O1, E>> + Send,
        U: Send + Service<O1, Out = Result<O2, EI>>,
        O1: Send,
        O2: Send,
        E: Send + From<EI>,
        EI: Send,
    {
        (self, service)
    }

    /// Wraps `self` in an [`Except`] together with the `on_err` handler.
    ///
    /// How `on_err` is invoked is defined by `Except` itself (see the
    /// `except` module).
    #[inline]
    fn except<F>(self, on_err: F) -> Except<Self, F>
    where
        Self: Sized,
    {
        Except {
            service: self,
            on_err,
        }
    }

    /// Adds an inspection step behind the wrapped service.
    ///
    /// The result is a [`Left`] pairing `self` with an `inspect::Inspect<O, F>`
    /// that stores the callback `f`. Through `Left`, every successful output
    /// (`Ok(o)`) of `self` is handed to the inspection stage and that stage's
    /// outputs are wrapped in `Ok` again; `Err` values bypass it.
    ///
    /// NOTE(review): the callback is invoked by `Inspect`, which lives in the
    /// `inspect` module — presumably once per value, passing the value on
    /// unchanged. Confirm there.
    ///
    /// # Parameters
    ///
    /// * `f` – A callback implementing `Fn(&O)`. It receives a reference to
    ///   the successful output value and is meant for logging, metrics, or
    ///   other side-effect-only work.
    ///
    /// # Return value
    ///
    /// A `Left<Self, inspect::Inspect<O, F>>`, which is itself a [`Service`]
    /// with output type `Result<_, E>`.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use flowly_service::{Service, ServiceExt};
    ///
    /// // `MyService` yields `Result<i32, _>`.
    /// let service = MyService::new();
    /// let inspected = service.flow_inspect(|value: &i32| {
    ///     println!("Got value: {:?}", value);
    /// });
    /// ```
    #[inline]
    fn flow_inspect<O, E, F>(self, f: F) -> Left<Self, inspect::Inspect<O, F>>
    where
        Self: Sized + Service<I, Out = Result<O, E>> + Send,
        F: Fn(&O) + Send,
        O: Send,
    {
        Left(
            self,
            inspect::Inspect::<O, F> {
                cb: f,
                _m: PhantomData,
            },
        )
    }

    /// Creates a concurrent wrapper around the current service that limits the number of
    /// parallel executions.
    ///
    /// This method returns a `ConcurrentEach<I, Self>` built with
    /// `ConcurrentEach::new(self, limit)`. The scheduling itself is implemented by
    /// [`ConcurrentEach`]; the intent is that multiple inputs are processed concurrently,
    /// with `limit` bounding how many run in parallel.
    ///
    /// **Parameters**
    /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
    ///   satisfy `Send`, `Clone`, and `'static` bounds.
    /// - `limit`: The maximum number of concurrent executions, forwarded unchanged to
    ///   `ConcurrentEach::new`.
    ///
    /// **Return value**
    /// A `ConcurrentEach<I, Self>` wrapping `self`.
    ///
    #[inline]
    fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
    where
        Self: Sized + Send + Clone + 'static,
        Self::Out: Send,
    {
        ConcurrentEach::new(self, limit)
    }

    /// Creates a new [`SpawnEach`] wrapper around the current service.
    ///
    /// The wrapper is intended to spawn a separate task for each input
    /// message and forward the results through a bounded channel, so the
    /// underlying service can process messages without blocking the caller.
    /// The exact mechanics are implemented by [`SpawnEach`].
    ///
    /// # Parameters
    /// * `self` – The service instance to wrap. The service must implement
    ///   `Service<I>` for some input type `I`.
    ///
    /// # Constraints
    /// * `Self: Sized + Send + Clone + 'static` – The service must be
    ///   clonable and safe to send across threads.
    /// * `Self::Out: Send` – The output type of the service must be
    ///   `Send`.
    ///
    /// # Return value
    /// Returns a [`SpawnEach<I, Self>`] wrapping `self`.
    ///
    /// # Example
    /// ```rust,ignore
    /// use flowly_service::{Context, Service, ServiceExt};
    ///
    /// #[derive(Clone)]
    /// struct MyService;
    /// impl Service<u32> for MyService {
    ///     type Out = Result<String, std::io::Error>;
    ///     fn handle(&self, input: u32, _cx: &Context)
    ///         -> impl futures::Stream<Item = Self::Out> + Send
    ///     {
    ///         // …
    ///     }
    /// }
    ///
    /// let service = MyService;
    /// // Wrap in SpawnEach
    /// let concurrent_service = service.spawn_each();
    /// ```
    ///
    /// # Note
    /// The buffer size passed to `SpawnEach::new` is fixed at 2.
    #[inline]
    fn spawn_each(self) -> SpawnEach<I, Self>
    where
        Self: Sized + Send + Clone + 'static,
        Self::Out: Send,
    {
        SpawnEach::new(self, 2)
    }

    /// Pairs the current service with a scoped service that transforms incoming messages before
    /// passing them to a wrapped inner service.
    ///
    /// This method consumes the current service and returns a tuple containing the original
    /// service and a new [`Scope`] built with `scope(f, s)`. The transformation function `f`
    /// receives a reference to the original input `O` and returns either a message `M` for `s`
    /// or an error `E`.
    ///
    /// # Type Parameters
    /// * `O`: Type of the original input that will be received by the scoped service.
    /// * `M`: Type of the message that `s` expects.
    /// * `E`: Error type returned by the transformation function `f`.
    /// * `S`: The inner service that will handle the transformed messages.
    /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E>`.
    ///
    /// # Parameters
    /// * `self` – The current service (moved into the returned tuple).
    /// * `f` – Function that transforms `&O` into `Result<M, E>`.
    /// * `s` – The inner service to be invoked after successful transformation.
    ///
    /// # Returns
    /// A tuple `(Self, Scope<O, M, E, S, F>)` where:
    /// * `Self` is the original service.
    /// * `Scope<O, M, E, S, F>` is a new service that
    ///   (NOTE(review): behaviour of `Scope` is defined in the `scope` module — confirm there):
    ///   1. Calls `f` with the incoming input.
    ///   2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs.
    ///   3. If `f` returns `Err(e)`, immediately returns an error without invoking `s`.
    ///
    /// # Example
    /// ```ignore
    /// let (service, scoped) = service.flow_scope(|msg: &Input| {
    ///     if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
    /// }, inner_service);
    /// ```
    ///
    /// # Constraints
    /// `O` and `E` must be `Send`, and `Self` must implement `Sized + Send`.
    #[inline]
    fn flow_scope<O, M, E, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E, S, F>)
    where
        F: Fn(&O) -> Result<M, E>,
        Self: Sized + Send,
        O: Send,
        E: Send,
    {
        (self, scope(f, s))
    }

    /// Pairs the current service with a [`ScopeEach`] built with `scope_each(f, s)`.
    ///
    /// Takes the same arguments as `flow_scope` — a transformation
    /// `f: Fn(&O) -> Result<M, E>` and an inner service `s` — but additionally
    /// requires `O: Clone`. The per-item behaviour is defined by `ScopeEach`
    /// in the `scope` module.
    #[inline]
    fn flow_scope_each<O, M, E, S, F>(self, f: F, s: S) -> (Self, ScopeEach<O, M, S, F, E>)
    where
        F: Fn(&O) -> Result<M, E>,
        Self: Sized + Send,
        E: Send,
        O: Send + Clone,
    {
        (self, scope_each(f, s))
    }

    /// Appends an asynchronous mapping step to the service.
    ///
    /// `f` takes each successful output `O1` and returns a future resolving
    /// to `O2`. The step is created with `map::map(f)` and attached through
    /// [`Left`], so `Err` values of `self` are passed through without
    /// calling `f`.
    #[inline]
    fn flow_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::Map<O2, F>>
    where
        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
        F: Fn(O1) -> H + Send,
        H: Future<Output = O2> + Send,
        O1: Send,
        O2: Send,
        E1: Send,
    {
        Left(self, map::map::<O2, _>(f))
    }

    /// Appends an asynchronous filter-and-map step to the service.
    ///
    /// `f` takes each successful output `O1` and returns a future resolving
    /// to `Option<O2>`. The step is created with `map::filter_map(f)` and
    /// attached through [`Left`], so `Err` values of `self` are passed
    /// through without calling `f`. Presumably a `None` result drops the
    /// item — see `map::FilterMap` to confirm.
    #[inline]
    fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> Left<Self, map::FilterMap<O2, F>>
    where
        Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
        O1: Send,
        O2: Send,
        E1: Send,
        F: Fn(O1) -> H + Send,
        H: Future<Output = Option<O2>> + Send,
    {
        Left(self, map::filter_map::<O2, _>(f))
    }
}
417
418impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
419
420impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
421 type Out = Result<I, E>;
422
423 fn handle(&self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
424 if let Some(srv) = self {
425 srv.handle(input, cx).left_stream()
426 } else {
427 futures::stream::once(async move { Ok(input) }).right_stream()
428 }
429 }
430}