fluxion_exec/subscribe.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use alloc::boxed::Box;
6use async_trait::async_trait;
7use core::fmt::Debug;
8use core::future::Future;
9use fluxion_core::{CancellationToken, Result};
10use futures::stream::Stream;
11use futures::stream::StreamExt;
12
/// Extension trait providing async subscription capabilities for streams.
///
/// This trait enables processing stream items with async handlers in a sequential manner.
///
/// This is the `wasm32` variant: it is declared with `#[async_trait(?Send)]`, so the
/// handler, its future, the error callback and the item type carry no `Send`/`Sync`
/// bounds.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait SubscribeExt<T>: Stream<Item = T> + Sized {
    /// Subscribes to the stream with an async handler, processing items sequentially.
    ///
    /// This method consumes the stream and processes each item with the provided handler.
    /// Items are processed in the order they arrive, with each item's handler completing
    /// before the next item is processed.
    ///
    /// # Behavior
    ///
    /// - Processes each stream item with the provided async handler sequentially
    /// - Waits for handler completion before processing next item
    /// - Continues until the stream ends or the cancellation token is observed as cancelled
    /// - Errors from handlers are passed to the error callback
    ///
    /// # Arguments
    ///
    /// * `on_next_func` - Async function called for each stream item. Receives the item
    ///   and a cancellation token. Returns `Result<(), E>`.
    /// * `on_error_callback` - Error handler called when `on_next_func` returns an error.
    /// * `cancellation_token` - Optional token to stop processing. If `None`, a default
    ///   token is created, so no external cancellation is possible.
    ///
    /// # Type Parameters
    ///
    /// * `F` - Function type for the item handler
    /// * `Fut` - Future type returned by the handler
    /// * `E` - Error type
    /// * `OnError` - Function type for error handling
    ///
    /// # Errors
    ///
    /// Returns `Ok(())` when the stream completes or the subscription is cancelled.
    /// Errors from item handlers are passed to the error callback. The subscription
    /// continues processing subsequent items even if individual items fail, unless
    /// the cancellation token is triggered.
    ///
    /// # See Also
    ///
    /// - [`subscribe_latest`](crate::SubscribeLatestExt::subscribe_latest) - Cancels old work for new items
    ///
    /// # Examples
    ///
    /// ## Basic Usage
    ///
    /// Process all items sequentially:
    ///
    /// ```
    /// use fluxion_exec::SubscribeExt;
    /// use futures::channel::mpsc::unbounded;
    /// use futures::stream;
    /// use futures::StreamExt;
    /// use std::sync::Arc;
    /// use futures::lock::Mutex;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let results = Arc::new(Mutex::new(Vec::new()));
    /// let results_clone = results.clone();
    /// let (notify_tx, mut notify_rx) = unbounded();
    ///
    /// let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    ///
    /// // Subscribe and process each item
    /// stream.subscribe(
    ///     move |item, _token| {
    ///         let results = results_clone.clone();
    ///         let notify_tx = notify_tx.clone();
    ///         async move {
    ///             results.lock().await.push(item * 2);
    ///             let _ = notify_tx.unbounded_send(());
    ///             Ok::<(), std::io::Error>(())
    ///         }
    ///     },
    ///     |_err| {}, // Ignore errors
    ///     None, // No cancellation
    /// ).await.unwrap();
    ///
    /// // `subscribe` only returns once the stream has ended, so all 5
    /// // notifications are already queued at this point.
    /// for _ in 0..5 {
    ///     notify_rx.next().await.unwrap();
    /// }
    ///
    /// let processed = results.lock().await;
    /// assert!(processed.contains(&2));
    /// assert!(processed.contains(&4));
    /// # }
    /// ```
    ///
    /// ## With Error Handling
    ///
    /// Use an error callback to handle errors without stopping the stream:
    ///
    /// ```
    /// use fluxion_exec::SubscribeExt;
    /// use futures::channel::mpsc::unbounded;
    /// use futures::stream;
    /// use futures::StreamExt;
    /// use std::sync::Arc;
    /// use futures::lock::Mutex;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// #[derive(Debug)]
    /// struct MyError(String);
    /// impl core::fmt::Display for MyError {
    ///     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    ///         write!(f, "MyError: {}", self.0)
    ///     }
    /// }
    /// impl std::error::Error for MyError {}
    ///
    /// let error_count = Arc::new(Mutex::new(0));
    /// let error_count_clone = error_count.clone();
    /// let (notify_tx, mut notify_rx) = unbounded();
    ///
    /// let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    ///
    /// stream.subscribe(
    ///     move |item, _token| {
    ///         let notify_tx = notify_tx.clone();
    ///         async move {
    ///             let res = if item % 2 == 0 {
    ///                 Err(MyError(format!("Even number: {}", item)))
    ///             } else {
    ///                 Ok(())
    ///             };
    ///             // Signal completion on both the success and the failure path.
    ///             // The handler is awaited in place by `subscribe` (it is not
    ///             // spawned), and nothing in it can run after `res` is returned,
    ///             // so the signal has to be sent before returning.
    ///             let _ = notify_tx.unbounded_send(());
    ///             res
    ///         }
    ///     },
    ///     move |_err| {
    ///         let count = error_count_clone.clone();
    ///         tokio::spawn(async move {
    ///             *count.lock().await += 1;
    ///         });
    ///     },
    ///     None,
    /// ).await.unwrap();
    ///
    /// // Wait for 5 items
    /// for _ in 0..5 {
    ///     notify_rx.next().await.unwrap();
    /// }
    ///
    /// // Give a tiny bit of time for the error callback spawn to finish updating the count
    /// // (Since the callback spawns another task)
    /// // Alternatively, we could use a channel in the error callback too.
    /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///
    /// assert_eq!(*error_count.lock().await, 2); // Items 2 and 4 errored
    /// # }
    /// ```
    ///
    /// ## With Cancellation
    ///
    /// Use a cancellation token to stop processing:
    ///
    /// ```
    /// use fluxion_exec::SubscribeExt;
    /// use futures::channel::mpsc::unbounded;
    /// use futures::StreamExt;
    /// use fluxion_core::CancellationToken;
    /// use std::sync::Arc;
    /// use futures::lock::Mutex;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let (tx, rx) = unbounded();
    /// let stream = rx;
    ///
    /// let cancel_token = CancellationToken::new();
    /// let cancel_clone = cancel_token.clone();
    ///
    /// let processed = Arc::new(Mutex::new(Vec::new()));
    /// let processed_clone = processed.clone();
    /// let (notify_tx, mut notify_rx) = unbounded();
    ///
    /// let handle = tokio::spawn(async move {
    ///     stream.subscribe(
    ///         move |item, token| {
    ///             let vec = processed_clone.clone();
    ///             let notify_tx = notify_tx.clone();
    ///             async move {
    ///                 if token.is_cancelled() {
    ///                     return Ok(());
    ///                 }
    ///                 vec.lock().await.push(item);
    ///                 let _ = notify_tx.unbounded_send(());
    ///                 Ok::<(), std::io::Error>(())
    ///             }
    ///         },
    ///         |_| {}, // Ignore errors
    ///         Some(cancel_token),
    ///     ).await
    /// });
    ///
    /// // Send items
    /// tx.unbounded_send(1).unwrap();
    /// tx.unbounded_send(2).unwrap();
    /// tx.unbounded_send(3).unwrap();
    ///
    /// // Wait for first item to be processed
    /// notify_rx.next().await.unwrap();
    ///
    /// // Cancel now
    /// cancel_clone.cancel();
    /// drop(tx);
    ///
    /// handle.await.unwrap().unwrap();
    ///
    /// // At least one item should be processed before cancellation
    /// assert!(!processed.lock().await.is_empty());
    /// # }
    /// ```
    ///
    /// ## Database Write Pattern
    ///
    /// Process events and persist to a database:
    ///
    /// ```
    /// use fluxion_exec::SubscribeExt;
    /// use futures::channel::mpsc::unbounded;
    /// use futures::stream;
    /// use futures::StreamExt;
    /// use std::sync::Arc;
    /// use futures::lock::Mutex;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// #[derive(Clone, Debug)]
    /// struct Event { id: u32, data: String }
    ///
    /// // Simulated database
    /// let db = Arc::new(Mutex::new(Vec::new()));
    /// let db_clone = db.clone();
    /// let (notify_tx, mut notify_rx) = unbounded();
    ///
    /// let events = vec![
    ///     Event { id: 1, data: "event1".to_string() },
    ///     Event { id: 2, data: "event2".to_string() },
    /// ];
    ///
    /// let stream = stream::iter(events);
    ///
    /// stream.subscribe(
    ///     move |event, _token| {
    ///         let db = db_clone.clone();
    ///         let notify_tx = notify_tx.clone();
    ///         async move {
    ///             // Simulate database write
    ///             db.lock().await.push(event);
    ///             let _ = notify_tx.unbounded_send(());
    ///             Ok::<(), std::io::Error>(())
    ///         }
    ///     },
    ///     |err| eprintln!("DB Error: {}", err),
    ///     None,
    /// ).await.unwrap();
    ///
    /// // Wait for 2 events
    /// notify_rx.next().await.unwrap();
    /// notify_rx.next().await.unwrap();
    ///
    /// assert_eq!(db.lock().await.len(), 2);
    /// # }
    /// ```
    ///
    /// # Thread Safety
    ///
    /// Handlers are executed sequentially on the calling task; they are awaited in
    /// place rather than spawned. The subscription completes when the stream ends
    /// or cancellation is observed.
    async fn subscribe<F, Fut, E, OnError>(
        self,
        on_next_func: F,
        on_error_callback: OnError,
        cancellation_token: Option<CancellationToken>,
    ) -> Result<()>
    where
        F: Fn(T, CancellationToken) -> Fut + Clone + 'static,
        Fut: Future<Output = core::result::Result<(), E>> + 'static,
        OnError: Fn(E) + Clone + 'static,
        T: Debug + Clone + 'static,
        E: 'static;
}
309
310// Non-WASM version with Send + Sync bounds for multi-threaded runtimes
311#[cfg(not(target_arch = "wasm32"))]
312#[async_trait]
313pub trait SubscribeExt<T>: Stream<Item = T> + Sized {
314 async fn subscribe<F, Fut, E, OnError>(
315 self,
316 on_next_func: F,
317 on_error_callback: OnError,
318 cancellation_token: Option<CancellationToken>,
319 ) -> Result<()>
320 where
321 F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
322 Fut: Future<Output = core::result::Result<(), E>> + Send + 'static,
323 OnError: Fn(E) + Clone + Send + Sync + 'static,
324 T: Debug + Send + Clone + 'static,
325 E: Send + 'static;
326}
327
// Blanket `wasm32` implementation: any `Unpin + 'static` stream gets `subscribe`.
// No `Send` bounds are required here because the trait is declared `?Send`.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl<S, T> SubscribeExt<T> for S
where
    T: 'static,
    S: Stream<Item = T> + Unpin + 'static,
{
    /// Forwards every argument unchanged to the shared `subscribe_impl` driver.
    async fn subscribe<F, Fut, E, OnError>(
        self,
        on_next_func: F,
        on_error_callback: OnError,
        cancellation_token: Option<CancellationToken>,
    ) -> Result<()>
    where
        T: Debug + Clone + 'static,
        E: 'static,
        F: Fn(T, CancellationToken) -> Fut + Clone + 'static,
        Fut: Future<Output = core::result::Result<(), E>> + 'static,
        OnError: Fn(E) + Clone + 'static,
    {
        let subscription =
            subscribe_impl(self, on_next_func, on_error_callback, cancellation_token);
        subscription.await
    }
}
352
353// Non-WASM implementation with Send + Sync bounds
354#[cfg(not(target_arch = "wasm32"))]
355#[async_trait]
356impl<S, T> SubscribeExt<T> for S
357where
358 S: Stream<Item = T> + Send + Unpin + 'static,
359 T: Send + 'static,
360{
361 async fn subscribe<F, Fut, E, OnError>(
362 self,
363 on_next_func: F,
364 on_error_callback: OnError,
365 cancellation_token: Option<CancellationToken>,
366 ) -> Result<()>
367 where
368 F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
369 Fut: Future<Output = core::result::Result<(), E>> + Send + 'static,
370 OnError: Fn(E) + Clone + Send + Sync + 'static,
371 T: Debug + Send + Clone + 'static,
372 E: Send + 'static,
373 {
374 subscribe_impl(self, on_next_func, on_error_callback, cancellation_token).await
375 }
376}
377
378// Shared implementation for both WASM and non-WASM
379async fn subscribe_impl<S, T, F, Fut, E, OnError>(
380 mut stream: S,
381 on_next_func: F,
382 on_error_callback: OnError,
383 cancellation_token: Option<CancellationToken>,
384) -> Result<()>
385where
386 S: Stream<Item = T> + Unpin,
387 F: Fn(T, CancellationToken) -> Fut + Clone,
388 Fut: Future<Output = core::result::Result<(), E>>,
389 OnError: Fn(E) + Clone,
390 T: Debug + Clone,
391{
392 let cancellation_token = cancellation_token.unwrap_or_default();
393
394 while let Some(item) = stream.next().await {
395 if cancellation_token.is_cancelled() {
396 break;
397 }
398
399 // Call handler directly (sequential processing)
400 let result = on_next_func(item.clone(), cancellation_token.clone()).await;
401
402 if let Err(error) = result {
403 on_error_callback(error);
404 }
405 }
406
407 Ok(())
408}