fluxion_exec/
subscribe_async.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use async_trait::async_trait;
6use futures::stream::Stream;
7use futures::stream::StreamExt;
8use std::future::Future;
9use tokio::sync::mpsc::unbounded_channel;
10use tokio_util::sync::CancellationToken;
11
12use fluxion_core::{FluxionError, Result};
13
14/// Extension trait providing async subscription capabilities for streams.
15///
16/// This trait enables processing stream items with async handlers in a sequential manner.
17#[async_trait]
18pub trait SubscribeAsyncExt<T>: Stream<Item = T> + Sized {
19    /// Subscribes to the stream with an async handler, processing items sequentially.
20    ///
21    /// This method consumes the stream and spawns async tasks to process each item.
22    /// Items are processed in the order they arrive, with each item's handler running
23    /// to completion before the next item is processed (though handlers run concurrently
24    /// via tokio spawn).
25    ///
26    /// # Behavior
27    ///
28    /// - Processes each stream item with the provided async handler
29    /// - Spawns a new task for each item (non-blocking)
30    /// - Continues until stream ends or cancellation token is triggered
31    /// - Errors from handlers are passed to the error callback if provided
32    /// - If no error callback provided, errors are collected and returned on completion
33    ///
34    /// # Arguments
35    ///
36    /// * `on_next_func` - Async function called for each stream item. Receives the item
37    ///                    and a cancellation token. Returns `Result<(), E>`.
38    /// * `cancellation_token` - Optional token to stop processing. If `None`, a default
39    ///                          token is created that never cancels.
40    /// * `on_error_callback` - Optional error handler called when `on_next_func` returns
41    ///                         an error. If `None`, errors are collected and returned.
42    ///
43    /// # Type Parameters
44    ///
45    /// * `F` - Function type for the item handler
46    /// * `Fut` - Future type returned by the handler
47    /// * `E` - Error type that implements `std::error::Error`
48    /// * `OnError` - Function type for error handling
49    ///
50    /// # Errors
51    ///
52    /// Returns `Err(FluxionError::MultipleErrors)` if any items failed to process and
53    /// no error callback was provided. If an error callback is provided, errors are
54    /// passed to it and the function returns `Ok(())` on stream completion.
55    ///
56    /// The subscription continues processing subsequent items even if individual items
57    /// fail, unless the cancellation token is triggered.
58    ///
59    /// # See Also
60    ///
61    /// - [`subscribe_latest_async`](crate::SubscribeLatestAsyncExt::subscribe_latest_async) - Cancels old work for new items
62    ///
63    /// # Examples
64    ///
65    /// ```text
66    /// use fluxion_exec::SubscribeAsyncExt;
67    /// use futures::StreamExt;
68    /// use tokio_stream::wrappers::UnboundedReceiverStream;
69    ///
70    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
71    /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
72    /// let stream = UnboundedReceiverStream::new(rx);
73    ///
74    /// stream.subscribe_async(
75    ///     |item, _token| async move {
76    ///         // Process item
77    ///         println!("Processing: {:?}", item);
78    ///         Ok::<(), std::io::Error>(())
79    ///     },
80    ///     None,
81    ///     Some(|err| eprintln!("Error: {}", err))
82    /// ).await?;
83    /// # Ok(())
84    /// # }
85    /// ```
86    ///
87    /// # With Cancellation
88    ///
89    /// ```text
90    /// # use fluxion_exec::SubscribeAsyncExt;
91    /// # use futures::StreamExt;
92    /// # use tokio_util::sync::CancellationToken;
93    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
94    /// # let stream = futures::stream::iter(vec![1, 2, 3]);
95    /// let cancel = CancellationToken::new();
96    /// let cancel_clone = cancel.clone();
97    ///
98    /// tokio::spawn(async move {
99    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
100    ///     cancel_clone.cancel();
101    /// });
102    ///
103    /// stream.subscribe_async(
104    ///     |item, token| async move {
105    ///         if token.is_cancelled() {
106    ///             return Ok(());
107    ///         }
108    ///         // Process item...
109    ///         Ok::<(), std::io::Error>(())
110    ///     },
111    ///     Some(cancel),
112    ///     None
113    /// ).await?;
114    /// # Ok(())
115    /// # }
116    /// ```
117    ///
118    /// # Thread Safety
119    ///
120    /// All spawned tasks run on the tokio runtime. The subscription completes
121    /// when the stream ends, not when all spawned tasks complete.
122    async fn subscribe_async<F, Fut, E, OnError>(
123        self,
124        on_next_func: F,
125        cancellation_token: Option<CancellationToken>,
126        on_error_callback: Option<OnError>,
127    ) -> Result<()>
128    where
129        F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
130        Fut: Future<Output = std::result::Result<(), E>> + Send + 'static,
131        OnError: Fn(E) + Clone + Send + Sync + 'static,
132        T: std::fmt::Debug + Send + Clone + 'static,
133        E: std::error::Error + Send + Sync + 'static;
134}
135
136#[async_trait]
137impl<S, T> SubscribeAsyncExt<T> for S
138where
139    S: Stream<Item = T> + Send + Unpin + 'static,
140    T: Send + 'static,
141{
142    async fn subscribe_async<F, Fut, E, OnError>(
143        mut self,
144        on_next_func: F,
145        cancellation_token: Option<CancellationToken>,
146        on_error_callback: Option<OnError>,
147    ) -> Result<()>
148    where
149        F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
150        Fut: Future<Output = std::result::Result<(), E>> + Send + 'static,
151        OnError: Fn(E) + Clone + Send + Sync + 'static,
152        T: std::fmt::Debug + Send + Clone + 'static,
153        E: std::error::Error + Send + Sync + 'static,
154    {
155        let cancellation_token = cancellation_token.unwrap_or_default();
156        let (error_tx, mut error_rx) = unbounded_channel();
157
158        while let Some(item) = self.next().await {
159            if cancellation_token.is_cancelled() {
160                break;
161            }
162
163            let on_next_func = on_next_func.clone();
164            let cancellation_token = cancellation_token.clone();
165            let on_error_callback = on_error_callback.clone();
166            let error_tx = error_tx.clone();
167
168            tokio::spawn(async move {
169                let result = on_next_func(item.clone(), cancellation_token).await;
170
171                if let Err(error) = result {
172                    if let Some(on_error_callback) = on_error_callback {
173                        on_error_callback(error);
174                    } else {
175                        // Collect error for later aggregation
176                        let _ = error_tx.send(error);
177                    }
178                }
179            });
180        }
181
182        // Drop the original sender so the channel closes
183        drop(error_tx);
184
185        // Collect all errors from the channel
186        let mut collected_errors = Vec::new();
187        while let Some(error) = error_rx.recv().await {
188            collected_errors.push(error);
189        }
190
191        if !collected_errors.is_empty() {
192            Err(FluxionError::from_user_errors(collected_errors))
193        } else {
194            Ok(())
195        }
196    }
197}