fluxion_exec/
subscribe.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use alloc::boxed::Box;
6use async_trait::async_trait;
7use core::fmt::Debug;
8use core::future::Future;
9use fluxion_core::{CancellationToken, Result};
10use futures::stream::Stream;
11use futures::stream::StreamExt;
12
13/// Extension trait providing async subscription capabilities for streams.
14///
15/// This trait enables processing stream items with async handlers in a sequential manner.
16#[cfg(target_arch = "wasm32")]
17#[async_trait(?Send)]
18pub trait SubscribeExt<T>: Stream<Item = T> + Sized {
19    /// Subscribes to the stream with an async handler, processing items sequentially.
20    ///
21    /// This method consumes the stream and processes each item with the provided handler.
22    /// Items are processed in the order they arrive, with each item's handler completing
23    /// before the next item is processed.
24    ///
25    /// # Behavior
26    ///
27    /// - Processes each stream item with the provided async handler sequentially
28    /// - Waits for handler completion before processing next item
29    /// - Continues until stream ends or cancellation token is triggered
30    /// - Errors from handlers are passed to the error callback
31    ///
32    /// # Arguments
33    ///
34    /// * `on_next_func` - Async function called for each stream item. Receives the item
35    ///                    and a cancellation token. Returns `Result<(), E>`.
36    /// * `cancellation_token` - Optional token to stop processing. If `None`, a default
37    ///                          token is created that never cancels.
38    /// * `on_error_callback` - Error handler called when `on_next_func` returns an error.
39    ///
40    /// # Type Parameters
41    ///
42    /// * `F` - Function type for the item handler
43    /// * `Fut` - Future type returned by the handler
44    /// * `E` - Error type
45    /// * `OnError` - Function type for error handling
46    ///
47    /// # Errors
48    ///
49    /// Returns `Ok(())` on stream completion. Errors from item handlers are passed
50    /// to the error callback. The subscription continues processing subsequent items
51    /// even if individual items fail, unless the cancellation token is triggered.
52    ///
53    /// # See Also
54    ///
55    /// - [`subscribe_latest`](crate::SubscribeLatestExt::subscribe_latest) - Cancels old work for new items
56    ///
57    /// # Examples
58    ///
59    /// ## Basic Usage
60    ///
61    /// Process all items sequentially:
62    ///
63    /// ```
64    /// use fluxion_exec::SubscribeExt;
65    /// use futures::channel::mpsc::unbounded;
66    /// use futures::stream;
67    /// use futures::StreamExt;
68    /// use std::sync::Arc;
69    /// use futures::lock::Mutex;
70    ///
71    /// # #[tokio::main]
72    /// # async fn main() {
73    /// let results = Arc::new(Mutex::new(Vec::new()));
74    /// let results_clone = results.clone();
75    /// let (notify_tx, mut notify_rx) = unbounded();
76    ///
77    /// let stream = stream::iter(vec![1, 2, 3, 4, 5]);
78    ///
79    /// // Subscribe and process each item
80    /// stream.subscribe(
81    ///     move |item, _token| {
82    ///         let results = results_clone.clone();
83    ///         let notify_tx = notify_tx.clone();
84    ///         async move {
85    ///             results.lock().await.push(item * 2);
86    ///             let _ = notify_tx.unbounded_send(());
87    ///             Ok::<(), std::io::Error>(())
88    ///         }
89    ///     },
90    ///     |_err| {}, // Ignore errors
91    ///     None, // No cancellation
92    /// ).await.unwrap();
93    ///
94    /// // Wait for all 5 items to be processed
95    /// for _ in 0..5 {
96    ///     notify_rx.next().await.unwrap();
97    /// }
98    ///
99    /// let processed = results.lock().await;
100    /// assert!(processed.contains(&2));
101    /// assert!(processed.contains(&4));
102    /// # }
103    /// ```
104    ///
105    /// ## With Error Handling
106    ///
107    /// Use an error callback to handle errors without stopping the stream:
108    ///
109    /// ```
110    /// use fluxion_exec::SubscribeExt;
111    /// use futures::channel::mpsc::unbounded;
112    /// use futures::stream;
113    /// use futures::StreamExt;
114    /// use std::sync::Arc;
115    /// use futures::lock::Mutex;
116    ///
117    /// # #[tokio::main]
118    /// # async fn main() {
119    /// #[derive(Debug)]
120    /// struct MyError(String);
121    /// impl core::fmt::Display for MyError {
122    ///     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
123    ///         write!(f, "MyError: {}", self.0)
124    ///     }
125    /// }
126    /// impl std::error::Error for MyError {}
127    ///
128    /// let error_count = Arc::new(Mutex::new(0));
129    /// let error_count_clone = error_count.clone();
130    /// let (notify_tx, mut notify_rx) = unbounded();
131    ///
132    /// let stream = stream::iter(vec![1, 2, 3, 4, 5]);
133    ///
134    /// stream.subscribe(
135    ///     move |item, _token| {
136    ///         let notify_tx = notify_tx.clone();
137    ///         async move {
138    ///             let res = if item % 2 == 0 {
139    ///                 Err(MyError(format!("Even number: {}", item)))
140    ///             } else {
141    ///                 Ok(())
142    ///             };
143    ///             // Signal completion regardless of success/failure
144    ///             // Note: In real code, you might signal in the error callback too
145    ///             // but here we just want to know the handler finished.
146    ///             // However, subscribe spawns the handler. If it errors,
147    ///             // the error callback is called.
148    ///             // We need to signal completion in both paths.
149    ///             // Since the handler returns the error, we can't signal *after* returning Err.
150    ///             // So we signal before returning.
151    ///             let _ = notify_tx.unbounded_send(());
152    ///             res
153    ///         }
154    ///     },
155    ///     move |_err| {
156    ///         let count = error_count_clone.clone();
157    ///         tokio::spawn(async move {
158    ///             *count.lock().await += 1;
159    ///         });
160    ///     },
161    ///     None,
162    /// ).await.unwrap();
163    ///
164    /// // Wait for 5 items
165    /// for _ in 0..5 {
166    ///     notify_rx.next().await.unwrap();
167    /// }
168    ///
169    /// // Give a tiny bit of time for the error callback spawn to finish updating the count
170    /// // (Since the callback spawns another task)
171    /// // Alternatively, we could use a channel in the error callback too.
172    /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
173    ///
174    /// assert_eq!(*error_count.lock().await, 2); // Items 2 and 4 errored
175    /// # }
176    /// ```
177    ///
178    /// ## With Cancellation
179    ///
180    /// Use a cancellation token to stop processing:
181    ///
182    /// ```
183    /// use fluxion_exec::SubscribeExt;
184    /// use futures::channel::mpsc::unbounded;
185    /// use futures::StreamExt;
186    /// use fluxion_core::CancellationToken;
187    /// use std::sync::Arc;
188    /// use futures::lock::Mutex;
189    ///
190    /// # #[tokio::main]
191    /// # async fn main() {
192    /// let (tx, rx) = unbounded();
193    /// let stream = rx;
194    ///
195    /// let cancel_token = CancellationToken::new();
196    /// let cancel_clone = cancel_token.clone();
197    ///
198    /// let processed = Arc::new(Mutex::new(Vec::new()));
199    /// let processed_clone = processed.clone();
200    /// let (notify_tx, mut notify_rx) = unbounded();
201    ///
202    /// let handle = tokio::spawn(async move {
203    ///     stream.subscribe(
204    ///         move |item, token| {
205    ///             let vec = processed_clone.clone();
206    ///             let notify_tx = notify_tx.clone();
207    ///             async move {
208    ///                 if token.is_cancelled() {
209    ///                     return Ok(());
210    ///                 }
211    ///                 vec.lock().await.push(item);
212    ///                 let _ = notify_tx.unbounded_send(());
213    ///                 Ok::<(), std::io::Error>(())
214    ///             }
215    ///         },
216    ///         |_| {}, // Ignore errors
217    ///         Some(cancel_token),
218    ///     ).await
219    /// });
220    ///
221    /// // Send items
222    /// tx.unbounded_send(1).unwrap();
223    /// tx.unbounded_send(2).unwrap();
224    /// tx.unbounded_send(3).unwrap();
225    ///
226    /// // Wait for first item to be processed
227    /// notify_rx.next().await.unwrap();
228    ///
229    /// // Cancel now
230    /// cancel_clone.cancel();
231    /// drop(tx);
232    ///
233    /// handle.await.unwrap().unwrap();
234    ///
235    /// // At least one item should be processed before cancellation
236    /// assert!(!processed.lock().await.is_empty());
237    /// # }
238    /// ```
239    ///
240    /// ## Database Write Pattern
241    ///
242    /// Process events and persist to a database:
243    ///
244    /// ```
245    /// use fluxion_exec::SubscribeExt;
246    /// use futures::channel::mpsc::unbounded;
247    /// use futures::stream;
248    /// use futures::StreamExt;
249    /// use std::sync::Arc;
250    /// use futures::lock::Mutex;
251    ///
252    /// # #[tokio::main]
253    /// # async fn main() {
254    /// #[derive(Clone, Debug)]
255    /// struct Event { id: u32, data: String }
256    ///
257    /// // Simulated database
258    /// let db = Arc::new(Mutex::new(Vec::new()));
259    /// let db_clone = db.clone();
260    /// let (notify_tx, mut notify_rx) = unbounded();
261    ///
262    /// let events = vec![
263    ///     Event { id: 1, data: "event1".to_string() },
264    ///     Event { id: 2, data: "event2".to_string() },
265    /// ];
266    ///
267    /// let stream = stream::iter(events);
268    ///
269    /// stream.subscribe(
270    ///     move |event, _token| {
271    ///         let db = db_clone.clone();
272    ///         let notify_tx = notify_tx.clone();
273    ///         async move {
274    ///             // Simulate database write
275    ///             db.lock().await.push(event);
276    ///             let _ = notify_tx.unbounded_send(());
277    ///             Ok::<(), std::io::Error>(())
278    ///         }
279    ///     },
280    ///     |err| eprintln!("DB Error: {}", err),
281    ///     None,
282    /// ).await.unwrap();
283    ///
284    /// // Wait for 2 events
285    /// notify_rx.next().await.unwrap();
286    /// notify_rx.next().await.unwrap();
287    ///
288    /// assert_eq!(db.lock().await.len(), 2);
289    /// # }
290    /// ```
291    ///
292    /// # Thread Safety
293    ///
294    /// Handlers are executed sequentially on the calling task. The subscription
295    /// completes when the stream ends or cancellation is triggered.
296    async fn subscribe<F, Fut, E, OnError>(
297        self,
298        on_next_func: F,
299        on_error_callback: OnError,
300        cancellation_token: Option<CancellationToken>,
301    ) -> Result<()>
302    where
303        F: Fn(T, CancellationToken) -> Fut + Clone + 'static,
304        Fut: Future<Output = core::result::Result<(), E>> + 'static,
305        OnError: Fn(E) + Clone + 'static,
306        T: Debug + Clone + 'static,
307        E: 'static;
308}
309
310// Non-WASM version with Send + Sync bounds for multi-threaded runtimes
311#[cfg(not(target_arch = "wasm32"))]
312#[async_trait]
313pub trait SubscribeExt<T>: Stream<Item = T> + Sized {
314    async fn subscribe<F, Fut, E, OnError>(
315        self,
316        on_next_func: F,
317        on_error_callback: OnError,
318        cancellation_token: Option<CancellationToken>,
319    ) -> Result<()>
320    where
321        F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
322        Fut: Future<Output = core::result::Result<(), E>> + Send + 'static,
323        OnError: Fn(E) + Clone + Send + Sync + 'static,
324        T: Debug + Send + Clone + 'static,
325        E: Send + 'static;
326}
327
328// WASM implementation
329#[cfg(target_arch = "wasm32")]
330#[async_trait(?Send)]
331impl<S, T> SubscribeExt<T> for S
332where
333    S: Stream<Item = T> + Unpin + 'static,
334    T: 'static,
335{
336    async fn subscribe<F, Fut, E, OnError>(
337        self,
338        on_next_func: F,
339        on_error_callback: OnError,
340        cancellation_token: Option<CancellationToken>,
341    ) -> Result<()>
342    where
343        F: Fn(T, CancellationToken) -> Fut + Clone + 'static,
344        Fut: Future<Output = core::result::Result<(), E>> + 'static,
345        OnError: Fn(E) + Clone + 'static,
346        T: Debug + Clone + 'static,
347        E: 'static,
348    {
349        subscribe_impl(self, on_next_func, on_error_callback, cancellation_token).await
350    }
351}
352
353// Non-WASM implementation with Send + Sync bounds
354#[cfg(not(target_arch = "wasm32"))]
355#[async_trait]
356impl<S, T> SubscribeExt<T> for S
357where
358    S: Stream<Item = T> + Send + Unpin + 'static,
359    T: Send + 'static,
360{
361    async fn subscribe<F, Fut, E, OnError>(
362        self,
363        on_next_func: F,
364        on_error_callback: OnError,
365        cancellation_token: Option<CancellationToken>,
366    ) -> Result<()>
367    where
368        F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
369        Fut: Future<Output = core::result::Result<(), E>> + Send + 'static,
370        OnError: Fn(E) + Clone + Send + Sync + 'static,
371        T: Debug + Send + Clone + 'static,
372        E: Send + 'static,
373    {
374        subscribe_impl(self, on_next_func, on_error_callback, cancellation_token).await
375    }
376}
377
378// Shared implementation for both WASM and non-WASM
379async fn subscribe_impl<S, T, F, Fut, E, OnError>(
380    mut stream: S,
381    on_next_func: F,
382    on_error_callback: OnError,
383    cancellation_token: Option<CancellationToken>,
384) -> Result<()>
385where
386    S: Stream<Item = T> + Unpin,
387    F: Fn(T, CancellationToken) -> Fut + Clone,
388    Fut: Future<Output = core::result::Result<(), E>>,
389    OnError: Fn(E) + Clone,
390    T: Debug + Clone,
391{
392    let cancellation_token = cancellation_token.unwrap_or_default();
393
394    while let Some(item) = stream.next().await {
395        if cancellation_token.is_cancelled() {
396            break;
397        }
398
399        // Call handler directly (sequential processing)
400        let result = on_next_func(item.clone(), cancellation_token.clone()).await;
401
402        if let Err(error) = result {
403            on_error_callback(error);
404        }
405    }
406
407    Ok(())
408}