flowly_service/lib.rs
1mod and_then;
2mod concurrent_each;
3mod inspect;
4mod map;
5mod pass;
6mod scope;
7mod spawn_each;
8mod stub;
9mod switch;
10
11pub use and_then::and_then;
12pub use concurrent_each::{ConcurrentEach, concurrent_each};
13use flowly_core::Either;
14pub use map::{filter_map, map, map_if_else, try_filter_map, try_map};
15pub use pass::flow;
16pub use spawn_each::{SpawnEach, spawn_each};
17pub use stub::stub;
18pub use switch::switch;
19use tokio::sync::watch;
20
21use std::{marker::PhantomData, pin::pin};
22
23use futures::{Stream, StreamExt, future};
24
25use crate::scope::Scope;
26
27#[derive(Clone)]
28#[non_exhaustive]
29pub struct Context {
30 pub abort: watch::Sender<bool>,
31 pub abort_recv: watch::Receiver<bool>,
32}
33
34impl Context {
35 pub async fn fuse_abort<F: Future>(&self, fut: F) -> Option<F::Output> {
36 let mut abort_recv = self.abort_recv.clone();
37 let fut1 = pin!(abort_recv.changed());
38 let fut2 = pin!(fut);
39
40 match futures::future::select(fut1, fut2).await {
41 future::Either::Left(..) => None,
42 future::Either::Right((val, _)) => Some(val),
43 }
44 }
45}
46
47impl Default for Context {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl From<watch::Sender<bool>> for Context {
54 fn from(abort: watch::Sender<bool>) -> Self {
55 Self {
56 abort_recv: abort.subscribe(),
57 abort,
58 }
59 }
60}
61
62impl Context {
63 pub fn new() -> Self {
64 Self::from(watch::Sender::default())
65 }
66}
67
68pub trait Service<In> {
69 type Out;
70
71 fn handle(&mut self, input: In, cx: &Context) -> impl Stream<Item = Self::Out> + Send;
72 fn handle_stream(
73 &mut self,
74 input: impl Stream<Item = In> + Send,
75 cx: &Context,
76 ) -> impl Stream<Item = Self::Out> + Send
77 where
78 In: Send,
79 Self: Send,
80 Self::Out: Send,
81 {
82 async_stream::stream! {
83 let mut input = pin!(input);
84
85 while let Some(item) = input.next().await {
86 let mut s = pin!(self.handle(item, cx));
87
88 while let Some(out) = s.next().await {
89 yield out;
90 }
91 }
92 }
93 }
94
95 #[inline]
96 fn finalize(&mut self, _cx: &Context) -> impl Future<Output = ()>
97 where
98 Self: Sized,
99 {
100 async move {}
101 }
102}
103
104impl<I, O1, E1, O2, E2, S1, S2> Service<I> for (S1, S2)
105where
106 I: Send,
107 O1: Send,
108 O2: Send,
109 E1: Send,
110 E2: Send,
111 S1: Service<I, Out = Result<O1, E1>> + Send,
112 S2: Service<O1, Out = Result<O2, E2>> + Send,
113{
114 type Out = Result<O2, Either<E1, E2>>;
115
116 fn handle(&mut self, msg: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
117 async_stream::stream! {
118 let mut s1 = pin!(self.0.handle(msg, cx));
119
120 while let Some(res) = s1.next().await {
121 match res {
122 Ok(ok) => {
123 let mut s2 = pin!(self.1.handle(ok, cx));
124
125 while let Some(i2) = s2.next().await {
126 yield i2.map_err(Either::Right);
127 }
128 },
129 Err(err) => yield Err(Either::Left(err)),
130 }
131 }
132 }
133 }
134}
135
136pub trait ServiceExt<I: Send>: Service<I> {
137 #[inline]
138 fn flow<O1, O2, E1, E2, U>(self, service: U) -> (Self, U)
139 where
140 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
141 U: Send + Service<O1, Out = Result<O2, E2>>,
142 O1: Send,
143 O2: Send,
144 E1: Send,
145 E2: Send,
146 {
147 (self, service)
148 }
149
150 /// Adds an inspection step that invokes the supplied callback on every
151 /// successful output of the wrapped service.
152 ///
153 /// This method returns a 2‑tuple consisting of:
154 /// * `self` – the original service unchanged.
155 /// * an `inspect::Inspect<O, E, F>` instance that intercepts the
156 /// service’s output. For each successful result (`Ok(o)`), the
157 /// closure `f` is called with a reference to `o`. The output is then
158 /// passed through unchanged.
159 ///
160 /// # Parameters
161 ///
162 /// * `f` – A callback implementing `Fn(&O)`. The callback receives a
163 /// reference to the successful output value. It can be used for
164 /// logging, metrics, or any side‑effect‑only operation.
165 ///
166 /// # Return value
167 ///
168 /// A tuple `(Self, inspect::Inspect<O, E, F>)` that can be used in a
169 /// service pipeline (e.g., within the `flow` combinator). The first
170 /// element is the original service, and the second element is a service
171 /// that performs the inspection.
172 ///
173 /// # Example
174 ///
175 /// ```rust
176 /// use flowly_service::{Service, flow::Flow, inspect::Inspect};
177 ///
178 /// let service = MyService::new();
179 /// let (orig, inspector) = service.flow_inspect(|value: &Result<i32, _>| {
180 /// println!("Got value: {:?}", value);
181 /// });
182 /// let flow = Flow::from(orig).and(inspector);
183 /// ```
184 #[inline]
185 fn flow_inspect<O, E, F>(self, f: F) -> (Self, inspect::Inspect<O, E, F>)
186 where
187 Self: Sized + Service<I, Out = Result<O, E>> + Send,
188 F: Fn(&O) + Send,
189 O: Send,
190 E: Send,
191 {
192 (
193 self,
194 inspect::Inspect::<O, E, F> {
195 cb: f,
196 _m: PhantomData,
197 },
198 )
199 }
200
201 /// Creates a concurrent wrapper around the current service that limits the number of
202 /// parallel executions.
203 ///
204 /// This method returns a `ConcurrentEach<I, Self>` instance that delegates work to a pool
205 /// of worker tasks. Each worker runs the underlying service independently, allowing
206 /// multiple inputs to be processed concurrently. The `limit` argument controls the
207 /// maximum number of worker tasks that may run in parallel.
208 ///
209 /// **Parameters**
210 /// - `self`: The service instance to be wrapped. It must implement `Service<I>` and
211 /// satisfy `Send`, `Clone`, and `'static` bounds.
212 /// - `limit`: The maximum number of concurrent worker tasks to spawn. If `limit` is
213 /// greater than the current number of tasks, new tasks will be created up to this
214 /// bound.
215 ///
216 /// **Return value**
217 /// A `ConcurrentEach<I, Self>` which itself implements `Service`. When handling an
218 /// input, it forwards the input to one of the available workers and returns a stream
219 /// of results that can be awaited asynchronously.
220 ///
221 #[inline]
222 fn concurrent_each(self, limit: usize) -> ConcurrentEach<I, Self>
223 where
224 Self: Sized + Send + Clone + 'static,
225 Self::Out: Send,
226 {
227 ConcurrentEach::new(self, limit)
228 }
229
230 /// Creates a new [`SpawnEach`] wrapper around the current service.
231 ///
232 /// The wrapper spawns a separate task for each input message, forwarding
233 /// the results through a bounded `mpsc` channel. This allows the
234 /// underlying service to process messages concurrently without
235 /// blocking the caller.
236 ///
237 /// # Parameters
238 /// * `self` – The service instance to wrap. The service must implement
239 /// `Service<I>` for some input type `I`.
240 ///
241 /// # Constraints
242 /// * `Self: Sized + Send + Clone + 'static` – The service must be
243 /// clonable and safe to send across threads.
244 /// * `Self::Out: Send` – The output type of the service must be
245 /// `Send` because it will be transported across channels.
246 ///
247 /// # Return value
248 /// Returns a [`SpawnEach<I, Self>`] that implements `Service<I>` with
249 /// the same input type. The new service can be used just like the
250 /// original one, but each invocation of `handle` will spawn a
251 /// dedicated task.
252 ///
253 /// # Example
254 /// ```rust
255 /// use flowly_service::{Service, spawn_each};
256 ///
257 /// struct MyService;
258 /// impl Service<u32> for MyService {
259 /// type Out = Result<String, std::io::Error>;
260 /// fn handle(&mut self, input: u32, _cx: &crate::Context)
261 /// -> impl futures::Stream<Item = Self::Out> + Send
262 /// {
263 /// // …
264 /// }
265 /// }
266 ///
267 /// let service = MyService;
268 /// // Wrap in SpawnEach
269 /// let concurrent_service = service.spawn_each();
270 /// // Now `concurrent_service` can be used as a Service and will process
271 /// // each input concurrently.
272 /// ```
273 ///
274 /// # Note
275 /// The default message buffer size is 2.
276 #[inline]
277 fn spawn_each(self) -> SpawnEach<I, Self>
278 where
279 Self: Sized + Send + Clone + 'static,
280 Self::Out: Send,
281 {
282 SpawnEach::new(self, 2)
283 }
284
285 /// Creates a scoped service wrapper that transforms incoming messages before passing to the wrapped service.
286 ///
287 /// This method consumes the current service and returns a tuple containing the original service and a new
288 /// [`Scope`] service that forwards transformed messages to `s`. The transformation function `f` receives
289 /// a reference to the original input `O` and returns either a message `M` for `s` or an error `E1`.\
290 ///
291 /// # Type Parameters
292 /// * `O`: Type of the original input that will be received by the outer service.
293 /// * `M`: Type of the message that `s` expects.
294 /// * `E1`: Error type returned by the transformation function `f`.
295 /// * `S`: The inner service that will handle the transformed messages.
296 /// * `F`: Function or closure of type `Fn(&O) -> Result<M, E1>`.
297 ///
298 /// # Parameters
299 /// * `self` – The current service (moved into the returned tuple).
300 /// * `f` – Function that transforms `&O` into `Result<M, E1>`.
301 /// * `s` – The inner service to be invoked after successful transformation.
302 ///
303 /// # Returns
304 /// A tuple `(Self, Scope<O, M, E1, S, F>)` where:\n
305 /// * `Self` is the original service that can continue to be used.\n
306 /// * `Scope<O, M, E1, S, F>` is a new service that:\n
307 /// 1. Calls `f` with the incoming input.\n
308 /// 2. If `f` returns `Ok(m)`, forwards `m` to `s` and collects all emitted outputs into `Vec<O>`.\n
309 /// 3. If `f` returns `Err(e)`, immediately returns an error wrapped in `Either::Right(e)` without invoking `s`.\n
310 ///
311 /// # Example
312 /// ```ignore
313 /// let (service, scoped) = flow_scope(service, |msg: &Input| {
314 /// if msg.valid { Ok(transformed_msg) } else { Err(TransformError) }
315 /// }, inner_service);
316 /// ```
317 ///
318 /// # Constraints
319 /// All involved types must be `Send`, and `Self` must implement `Sized + Send`.
320 #[inline]
321 fn flow_scope<O, M, E1, S, F>(self, f: F, s: S) -> (Self, Scope<O, M, E1, S, F>)
322 where
323 F: Fn(&O) -> Result<M, E1>,
324 Self: Sized + Send,
325 O: Send,
326 E1: Send,
327 {
328 (self, scope::scope(f, s))
329 }
330
331 #[inline]
332 fn flow_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::Map<O2, F>)
333 where
334 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
335 F: FnMut(O1) -> H + Send,
336 H: Future<Output = O2> + Send,
337 O1: Send,
338 O2: Send,
339 E1: Send,
340 {
341 (self, map::map::<O2, _>(f))
342 }
343
344 #[inline]
345 fn flow_filter_map<O1, O2, E1, F, H>(self, f: F) -> (Self, map::FilterMap<O2, F>)
346 where
347 Self: Sized + Service<I, Out = Result<O1, E1>> + Send,
348 O1: Send,
349 O2: Send,
350 E1: Send,
351 F: FnMut(O1) -> H + Send,
352 H: Future<Output = Option<O2>> + Send,
353 {
354 (self, map::filter_map::<O2, _>(f))
355 }
356}
357
358impl<I: Send, T: Service<I>> ServiceExt<I> for T {}
359
360impl<I: Send, E, S: Service<I, Out = Result<I, E>>> Service<I> for Option<S> {
361 type Out = Result<I, E>;
362
363 fn handle(&mut self, input: I, cx: &Context) -> impl Stream<Item = Self::Out> + Send {
364 if let Some(srv) = self {
365 srv.handle(input, cx).left_stream()
366 } else {
367 futures::stream::once(async move { Ok(input) }).right_stream()
368 }
369 }
370}