Trait SubscribeAsyncExt 

pub trait SubscribeAsyncExt<T>: Stream<Item = T> + Sized {
    // Required method
    fn subscribe_async<'async_trait, F, Fut, E, OnError>(
        self,
        on_next_func: F,
        cancellation_token: Option<CancellationToken>,
        on_error_callback: Option<OnError>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
    where
        F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static + 'async_trait,
        Fut: Future<Output = Result<(), E>> + Send + 'static + 'async_trait,
        OnError: Fn(E) + Clone + Send + Sync + 'static + 'async_trait,
        T: Debug + Send + Clone + 'static,
        E: Error + Send + Sync + 'static + 'async_trait,
        Self: 'async_trait;
}

Extension trait providing async subscription capabilities for streams.

This trait enables processing stream items with async handlers in a sequential manner.

Required Methods§

fn subscribe_async<'async_trait, F, Fut, E, OnError>(
    self,
    on_next_func: F,
    cancellation_token: Option<CancellationToken>,
    on_error_callback: Option<OnError>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    F: Fn(T, CancellationToken) -> Fut + Clone + Send + Sync + 'static + 'async_trait,
    Fut: Future<Output = Result<(), E>> + Send + 'static + 'async_trait,
    OnError: Fn(E) + Clone + Send + Sync + 'static + 'async_trait,
    T: Debug + Send + Clone + 'static,
    E: Error + Send + Sync + 'static + 'async_trait,
    Self: 'async_trait,

Subscribes to the stream with an async handler, processing items sequentially.

This method consumes the stream and spawns async tasks to process each item. Items are processed in the order they arrive, with each item’s handler running to completion before the next item is processed (though handlers run concurrently via tokio spawn).

§Behavior
  • Processes each stream item with the provided async handler
  • Spawns a new task for each item (non-blocking)
  • Continues until stream ends or cancellation token is triggered
  • Errors from handlers are passed to the error callback if provided
  • If no error callback provided, errors are collected and returned on completion
§Arguments
  • on_next_func - Async function called for each stream item. Receives the item and a cancellation token. Returns Result<(), E>.
  • cancellation_token - Optional token to stop processing. If None, a default token is created that never cancels.
  • on_error_callback - Optional error handler called when on_next_func returns an error. If None, errors are collected and returned.
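The role of the optional cancellation token can be illustrated with a small, synchronous model. This is only a sketch under stated assumptions: `CancelFlag` and `run_until_cancelled` are hypothetical names, a plain iterator stands in for the stream, and nothing here is the library's actual implementation. It shows how a missing token can be replaced by a fresh one that never fires, and how the loop stops once the token is cancelled.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token: a shared boolean flag.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical synchronous model of the subscription loop: visits items in
/// order, stops once the flag is set, and returns the number of handler calls.
fn run_until_cancelled<I, F>(items: I, token: Option<CancelFlag>, mut handler: F) -> usize
where
    I: IntoIterator,
    F: FnMut(I::Item, &CancelFlag),
{
    // `None` becomes a fresh flag that nobody else holds, so it never fires.
    let token = token.unwrap_or_default();
    let mut handled = 0;
    for item in items {
        if token.is_cancelled() {
            break;
        }
        handler(item, &token);
        handled += 1;
    }
    handled
}

fn main() {
    // Without a token, every item is handled.
    let all = run_until_cancelled(1..=5, None, |_item, _token| {});
    println!("handled without token: {}", all);

    // With a token, the handler requests cancellation after seeing item 2.
    let token = CancelFlag::default();
    let stopped = run_until_cancelled(1..=5, Some(token.clone()), |item, t| {
        if item == 2 {
            t.cancel();
        }
    });
    println!("handled with token: {}, cancelled: {}", stopped, token.is_cancelled());
}
```

In this model the handler receives the token as well, which mirrors the `Fn(T, CancellationToken)` shape of `on_next_func`: a long-running handler can check the token and return early.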
§Type Parameters
  • F - Function type for the item handler
  • Fut - Future type returned by the handler
  • E - Error type that implements std::error::Error
  • OnError - Function type for error handling
§Errors

Returns Err(FluxionError::MultipleErrors) if any items failed to process and no error callback was provided. If an error callback is provided, errors are passed to it and the function returns Ok(()) on stream completion.

The subscription continues processing subsequent items even if individual items fail, unless the cancellation token is triggered.
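The two error-handling modes described above can be modelled without any async machinery. The following is a minimal sketch, not the library's code: `process_all`, `parse_number`, and the `MultipleErrors` struct are hypothetical stand-ins (the real `FluxionError::MultipleErrors` variant may carry different data), and a `Vec` plays the role of the stream. Failures go to the callback when one is supplied; otherwise they are collected and returned once all items have been visited. In both modes a failing item does not stop later items from being processed.

```rust
use std::cell::Cell;
use std::fmt;
use std::num::ParseIntError;

/// Hypothetical aggregate error, loosely modelled on the idea of
/// `FluxionError::MultipleErrors`; the real type's shape is not shown here.
#[derive(Debug)]
struct MultipleErrors {
    messages: Vec<String>,
}

impl fmt::Display for MultipleErrors {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} item(s) failed", self.messages.len())
    }
}

impl std::error::Error for MultipleErrors {}

/// Hypothetical synchronous model: run `handler` on every item and apply the
/// documented policy to any failures.
fn process_all<T, E, H, C>(
    items: Vec<T>,
    handler: H,
    on_error: Option<C>,
) -> Result<(), MultipleErrors>
where
    E: std::error::Error,
    H: Fn(T) -> Result<(), E>,
    C: Fn(E),
{
    let mut collected = Vec::new();
    for item in items {
        if let Err(err) = handler(item) {
            match &on_error {
                // Callback supplied: hand the error over and keep going.
                Some(callback) => callback(err),
                // No callback: remember the error and keep going.
                None => collected.push(err.to_string()),
            }
        }
    }
    if collected.is_empty() {
        Ok(())
    } else {
        Err(MultipleErrors { messages: collected })
    }
}

/// Example handler: succeeds only for strings that parse as integers.
fn parse_number(s: &str) -> Result<(), ParseIntError> {
    s.parse::<i32>().map(|_| ())
}

fn main() {
    let inputs = vec!["1", "x", "3", "y"];

    // No callback: failures are collected and returned at the end.
    let result = process_all(inputs.clone(), parse_number, None::<fn(ParseIntError)>);
    println!("without callback: {:?}", result);

    // With a callback: each failure is reported and the overall result is Ok.
    let seen = Cell::new(0usize);
    let result = process_all(
        inputs,
        parse_number,
        Some(|_err: ParseIntError| seen.set(seen.get() + 1)),
    );
    println!("with callback: {:?}, errors seen: {}", result, seen.get());
}
```

Note the `None::<fn(ParseIntError)>` in the first call: when no callback is passed, the compiler still needs a concrete type for the callback parameter, and a function-pointer type is a convenient choice.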

§Examples
use fluxion_exec::SubscribeAsyncExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
    let stream = UnboundedReceiverStream::new(rx);

    // Queue some items, then drop the sender so the stream ends
    // and the subscription can complete.
    tx.send(1)?;
    tx.send(2)?;
    drop(tx);

    stream
        .subscribe_async(
            |item, _token| async move {
                // Process the item.
                println!("Processing: {:?}", item);
                Ok::<(), std::io::Error>(())
            },
            None, // no cancellation token
            Some(|err| eprintln!("Error: {}", err)), // called for each handler error
        )
        .await?;
    Ok(())
}
§With Cancellation
use fluxion_exec::SubscribeAsyncExt;
use tokio_util::sync::CancellationToken;

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    let stream = futures::stream::iter(vec![1, 2, 3]);
    let cancel = CancellationToken::new();
    let cancel_clone = cancel.clone();

    // Cancel the subscription after five seconds.
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        cancel_clone.cancel();
    });

    stream
        .subscribe_async(
            |_item, token| async move {
                if token.is_cancelled() {
                    return Ok(());
                }
                // Process item...
                Ok::<(), std::io::Error>(())
            },
            Some(cancel),
            // No error callback. The turbofish gives the compiler a concrete
            // type for the otherwise unconstrained `OnError` parameter.
            None::<fn(std::io::Error)>,
        )
        .await?;
    Ok(())
}
§Thread Safety

All spawned tasks run on the tokio runtime. The subscription completes when the stream ends, not when all spawned tasks complete.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

impl<S, T> SubscribeAsyncExt<T> for S
where S: Stream<Item = T> + Send + Unpin + 'static, T: Send + 'static,
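The blanket implementation above follows the usual extension-trait pattern: one generic `impl` makes the method available on every type that meets the bounds. A minimal sketch of the same pattern, with `Iterator` standing in for `Stream` so that it needs only the standard library; `ForEachResultExt`, `for_each_result`, and `check_non_negative` are hypothetical names chosen for illustration.

```rust
/// Hypothetical extension trait mirroring the shape of the blanket impl above,
/// with `Iterator` standing in for `Stream`.
trait ForEachResultExt<T>: Iterator<Item = T> + Sized {
    /// Consumes the iterator, calls `f` on every item, and returns how many
    /// calls failed.
    fn for_each_result<F, E>(self, f: F) -> usize
    where
        F: Fn(T) -> Result<(), E>;
}

/// Blanket impl: every iterator gets the method without any per-type code.
impl<I, T> ForEachResultExt<T> for I
where
    I: Iterator<Item = T>,
{
    fn for_each_result<F, E>(self, f: F) -> usize
    where
        F: Fn(T) -> Result<(), E>,
    {
        self.map(f).filter(|result| result.is_err()).count()
    }
}

/// Example handler: rejects negative numbers.
fn check_non_negative(n: i32) -> Result<(), String> {
    if n >= 0 {
        Ok(())
    } else {
        Err(format!("negative: {}", n))
    }
}

fn main() {
    // The method is available on any iterator because of the blanket impl.
    let failures = vec![1, -2, 3, -4]
        .into_iter()
        .for_each_result(check_non_negative);
    println!("failures: {}", failures);
}
```

Because such a trait requires `Sized` and its method is generic, it is used through concrete or generic types rather than as a trait object, which is consistent with the dyn-compatibility note above.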