fluxion_exec/subscribe_async.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use async_trait::async_trait;
6use futures::stream::Stream;
7use futures::stream::StreamExt;
8use std::future::Future;
9use tokio::sync::mpsc::unbounded_channel;
10use tokio_util::sync::CancellationToken;
11
12use fluxion_core::{FluxionError, Result};
13
/// Extension trait providing async subscription capabilities for streams.
///
/// This trait hands every stream item to an async handler, running each handler
/// invocation in its own spawned tokio task.
#[async_trait]
pub trait SubscribeAsyncExt<T>: Stream<Item = T> + Sized {
    /// Subscribes to the stream with an async handler, spawning one task per item.
    ///
    /// This method consumes the stream. Items are pulled from the stream one at a time,
    /// in arrival order, and each item is passed to `on_next_func` inside a freshly
    /// spawned tokio task. Handlers are therefore *started* in arrival order but run
    /// concurrently: the subscription does not wait for one handler to finish before
    /// spawning the next, so a later handler may complete before an earlier one.
    ///
    /// # Behavior
    ///
    /// - Calls the provided async handler once for each stream item
    /// - Spawns a new tokio task for each item (the handler is never awaited inline)
    /// - Stops spawning handlers once the stream ends or the cancellation token has been
    ///   cancelled; handlers that are already running are not aborted
    /// - Errors from handlers are passed to the error callback if provided
    /// - If no error callback is provided, errors are collected and returned on completion
    ///
    /// # Arguments
    ///
    /// * `on_next_func` - Async function called for each stream item. Receives the item
    ///   and a clone of the cancellation token. Returns `Result<(), E>`.
    /// * `cancellation_token` - Optional token to stop processing. If `None`, a default
    ///   token is created that never cancels.
    /// * `on_error_callback` - Optional error handler called when `on_next_func` returns
    ///   an error. It is invoked from the spawned tasks, so calls may happen concurrently.
    ///   If `None`, errors are collected and returned.
    ///
    /// # Type Parameters
    ///
    /// * `F` - Function type for the item handler
    /// * `Fut` - Future type returned by the handler
    /// * `E` - Error type that implements `std::error::Error`
    /// * `OnError` - Function type for error handling
    ///
    /// # Errors
    ///
    /// If any handler failed and no error callback was provided, returns the aggregated
    /// error built by `FluxionError::from_user_errors` from all collected handler errors.
    /// If an error callback is provided, errors are passed to it and the function returns
    /// `Ok(())` on completion.
    ///
    /// The subscription continues processing subsequent items even if individual items
    /// fail, unless the cancellation token is triggered.
    ///
    /// # Panics
    ///
    /// Handlers are started with `tokio::spawn`, which panics when called outside of a
    /// Tokio runtime.
    ///
    /// # See Also
    ///
    /// - [`subscribe_latest_async`](crate::SubscribeLatestAsyncExt::subscribe_latest_async) - Cancels old work for new items
    ///
    /// # Examples
    ///
    /// ```text
    /// use fluxion_exec::SubscribeAsyncExt;
    /// use futures::StreamExt;
    /// use tokio_stream::wrappers::UnboundedReceiverStream;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
    /// let stream = UnboundedReceiverStream::new(rx);
    ///
    /// tx.send(1)?;
    /// // Dropping the sender closes the channel, which ends the stream.
    /// drop(tx);
    ///
    /// stream.subscribe_async(
    ///     |item, _token| async move {
    ///         // Process item
    ///         println!("Processing: {:?}", item);
    ///         Ok::<(), std::io::Error>(())
    ///     },
    ///     None,
    ///     Some(|err| eprintln!("Error: {}", err))
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # With Cancellation
    ///
    /// ```text
    /// # use fluxion_exec::SubscribeAsyncExt;
    /// # use futures::StreamExt;
    /// # use tokio_util::sync::CancellationToken;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let stream = futures::stream::iter(vec![1, 2, 3]);
    /// let cancel = CancellationToken::new();
    /// let cancel_clone = cancel.clone();
    ///
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///     cancel_clone.cancel();
    /// });
    ///
    /// stream.subscribe_async(
    ///     |item, token| async move {
    ///         if token.is_cancelled() {
    ///             return Ok(());
    ///         }
    ///         // Process item...
    ///         Ok::<(), std::io::Error>(())
    ///     },
    ///     Some(cancel),
    ///     // `OnError` cannot be inferred from a bare `None`, so name a callback type.
    ///     None::<fn(std::io::Error)>
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Completion
    ///
    /// All handlers run as tasks on the tokio runtime. The returned future resolves only
    /// after the subscription has stopped pulling items *and* every spawned handler task
    /// has finished: each task holds a sender of the internal error channel, and the
    /// subscription waits for that channel to close before returning.
    async fn subscribe_async<F, Fut, E, OnError>(
        self,
        on_next_func: F,
        cancellation_token: Option<CancellationToken>,
        on_error_callback: Option<OnError>,
    ) -> Result<()>
    where
        F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
        Fut: Future<Output = std::result::Result<(), E>> + Send + 'static,
        OnError: Fn(E) + Clone + Send + Sync + 'static,
        T: std::fmt::Debug + Send + Clone + 'static,
        E: std::error::Error + Send + Sync + 'static;
}
135
136#[async_trait]
137impl<S, T> SubscribeAsyncExt<T> for S
138where
139 S: Stream<Item = T> + Send + Unpin + 'static,
140 T: Send + 'static,
141{
142 async fn subscribe_async<F, Fut, E, OnError>(
143 mut self,
144 on_next_func: F,
145 cancellation_token: Option<CancellationToken>,
146 on_error_callback: Option<OnError>,
147 ) -> Result<()>
148 where
149 F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static,
150 Fut: Future<Output = std::result::Result<(), E>> + Send + 'static,
151 OnError: Fn(E) + Clone + Send + Sync + 'static,
152 T: std::fmt::Debug + Send + Clone + 'static,
153 E: std::error::Error + Send + Sync + 'static,
154 {
155 let cancellation_token = cancellation_token.unwrap_or_default();
156 let (error_tx, mut error_rx) = unbounded_channel();
157
158 while let Some(item) = self.next().await {
159 if cancellation_token.is_cancelled() {
160 break;
161 }
162
163 let on_next_func = on_next_func.clone();
164 let cancellation_token = cancellation_token.clone();
165 let on_error_callback = on_error_callback.clone();
166 let error_tx = error_tx.clone();
167
168 tokio::spawn(async move {
169 let result = on_next_func(item.clone(), cancellation_token).await;
170
171 if let Err(error) = result {
172 if let Some(on_error_callback) = on_error_callback {
173 on_error_callback(error);
174 } else {
175 // Collect error for later aggregation
176 let _ = error_tx.send(error);
177 }
178 }
179 });
180 }
181
182 // Drop the original sender so the channel closes
183 drop(error_tx);
184
185 // Collect all errors from the channel
186 let mut collected_errors = Vec::new();
187 while let Some(error) = error_rx.recv().await {
188 collected_errors.push(error);
189 }
190
191 if !collected_errors.is_empty() {
192 Err(FluxionError::from_user_errors(collected_errors))
193 } else {
194 Ok(())
195 }
196 }
197}