fluxion-exec
Part of Fluxion - A reactive stream processing library for Rust
Async stream execution utilities that enable processing streams with async handlers, providing fine-grained control over concurrency, cancellation, and error handling.
Overview
fluxion-exec provides two powerful execution patterns for consuming async streams:
- subscribe_async - Sequential processing where every item is processed to completion
- subscribe_latest_async - Latest-value processing with automatic cancellation of outdated work
These utilities solve the common problem of how to process stream items with async functions while controlling concurrency, managing cancellation, and handling errors gracefully.
Table of Contents
- Features
- Installation
- Quick Start
- Core Concepts
- Execution Patterns
- Detailed Examples
- Use Cases
- Performance Characteristics
- Error Handling
- Cancellation
- Common Patterns
- Anti-Patterns
- Comparison with Alternatives
- Troubleshooting
- API Reference
- License
Features
✨ Two Execution Modes
- Sequential processing - process every item in order
- Latest-value processing - skip intermediate values, process only latest
🎯 Flexible Error Handling
- Custom error callbacks
- Error collection and propagation
- Continue processing on errors
🚀 Async-First Design
- Built on tokio runtime
- Spawns background tasks for concurrent execution
- Non-blocking stream consumption
⚡ Cancellation Support
- Built-in CancellationToken integration
- Automatic cancellation of outdated work (in subscribe_latest_async)
- Graceful shutdown support
🔧 Extension Trait Pattern
- Works with any Stream implementation
- Compose with other stream operators
- Type-safe and ergonomic API
Installation
Add to your Cargo.toml:
[dependencies]
fluxion-exec = "0.4"
tokio = { version = "1.48", features = ["rt", "sync", "macros"] }
= "0.1"
futures = "0.3"
Quick Start
The following sections contain a wide range of examples and suggestions. These are indicative and should not be expected to compile as they are. Check the following files for genuine runnable examples that can be used as they are:
Sequential Processing
async
Latest-Value Processing
async
Core Concepts
Subscription
A subscription attaches an async handler to a stream and processes items until the stream ends or a cancellation token is triggered.
Sequential Execution
With subscribe_async, items are processed one at a time. Each item's handler must complete before the next item is processed. This guarantees:
- Every item is processed
- Processing order is maintained
- No concurrent execution of handlers
Latest-Value Processing
With subscribe_latest_async, only the most recent value is processed. When new items arrive during processing:
- Current processing continues
- Latest item is queued
- Intermediate items are discarded
- After completion, the latest queued item is processed
This is ideal for scenarios where:
- Only current state matters
- Old values become irrelevant
- Expensive operations should skip stale data
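The queueing rule described above can be modelled with a single-slot buffer. The sketch below is a std-only illustration of the semantics, not the crate's implementation; `LatestSlot` and `replay_burst` are hypothetical names introduced here.

```rust
/// A single-slot buffer: pushing a new value overwrites any value still waiting.
#[derive(Debug)]
struct LatestSlot<T> {
    pending: Option<T>,
    discarded: usize,
}

impl<T> LatestSlot<T> {
    fn new() -> Self {
        LatestSlot { pending: None, discarded: 0 }
    }

    /// Queue `value` as the latest item, dropping whatever was queued before.
    fn push(&mut self, value: T) {
        if self.pending.replace(value).is_some() {
            self.discarded += 1;
        }
    }

    /// Take the latest queued item, if any, once the current handler has finished.
    fn take(&mut self) -> Option<T> {
        self.pending.take()
    }
}

/// Item 1 is being processed while items 2, 3 and 4 arrive.
/// Returns the items that end up processed and the number discarded.
fn replay_burst() -> (Vec<u32>, usize) {
    let mut slot = LatestSlot::new();
    let mut processed = vec![1]; // the handler for item 1 is already running

    for item in [2, 3, 4] {
        slot.push(item); // each push replaces the previously queued item
    }

    // The handler for item 1 completes; only the latest queued item remains.
    while let Some(next) = slot.take() {
        processed.push(next);
    }
    (processed, slot.discarded)
}
```

In this model `replay_burst()` processes items 1 and 4 and discards items 2 and 3, which matches the "current + latest" behaviour listed above.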
Execution Patterns
subscribe_async - Sequential Processing
Process every item in order with async handlers.
stream.subscribe_async.await?;
When to use:
- Every item must be processed (e.g., database writes, event logging)
- Processing order matters
- Side effects must occur for each item
- Work cannot be skipped
Examples:
- Writing audit logs
- Processing financial transactions
- Sending notifications
- Persisting events to database
subscribe_latest_async - Latest-Value Processing
Process only the latest value, canceling work for outdated items.
stream.subscribe_latest_async.await?;
When to use:
- Only latest value matters (e.g., UI rendering, auto-save)
- Old values become irrelevant
- Expensive operations should skip intermediate values
- Real-time updates
Examples:
- Rendering UI with latest state
- Search-as-you-type queries
- Live preview updates
- Auto-saving current document
Detailed Examples
Example 1: Database Event Processing
Process every event sequentially and persist to database:
async
async
Example 2: Search-As-You-Type
Cancel outdated searches when new queries arrive:
async
async
Example 3: Error Handling and Recovery
;
async
Example 4: Graceful Shutdown with Cancellation
async
Use Cases
Sequential Processing (subscribe_async)
| Use Case | Description |
|---|---|
| Event Logging | Write every event to logs/database |
| Transaction Processing | Process financial transactions in order |
| Message Queue Consumer | Consume and acknowledge every message |
| Audit Trail | Maintain complete audit history |
| Batch ETL | Extract, transform, load data sequentially |
| Notification Service | Send every notification |
| File Processing | Process every file in a directory |
Latest-Value Processing (subscribe_latest_async)
| Use Case | Description |
|---|---|
| UI Rendering | Render only the latest application state |
| Auto-Save | Save current document (skip intermediate edits) |
| Live Preview | Update preview with latest content |
| Search Suggestions | Show results for latest query only |
| Real-time Dashboard | Display current metrics |
| Configuration Reload | Apply latest config changes |
| Debounced API Calls | Call API with latest parameters |
Performance Characteristics
Sequential Processing (subscribe_async)
- Latency: $O(n \times t)$ to drain a backlog of $n$ items, where $t$ is the per-item processing time
- Throughput: Limited by handler execution time
- Memory: $O(1)$ - processes one item at a time
- Ordering: Strict sequential order maintained
- Guarantees: Every item processed exactly once
Best for: Correctness and completeness over speed
Latest-Value Processing (subscribe_latest_async)
- Latency: $O(t)$ for latest item (intermediate items skipped)
- Throughput: Higher than sequential (skips work)
- Memory: $O(1)$ - one active task, one queued item
- Ordering: Processes latest available
- Guarantees: At most 2 items in flight (current + latest)
Best for: Responsiveness and efficiency over completeness
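To make the two latency figures concrete, the following sketch simulates both modes on a timeline. It is a simplified std-only model under stated assumptions: every handler takes the same time `t`, a running handler always runs to completion, and times are plain millisecond counts. The function names are hypothetical and not part of the crate.

```rust
/// Sequential model: returns the time at which the last handler finishes.
fn sequential_finish(arrivals: &[u64], t: u64) -> u64 {
    let mut free_at: u64 = 0; // time at which the handler becomes free
    for &arrival in arrivals {
        let start = free_at.max(arrival);
        free_at = start + t;
    }
    free_at
}

/// Latest-value model: returns the indices of the processed items and the
/// time at which the last handler finishes. `arrivals` must be sorted.
fn latest_value_run(arrivals: &[u64], t: u64) -> (Vec<usize>, u64) {
    let mut processed = Vec::new();
    let mut free_at: u64 = 0;
    let mut next = 0; // first item not yet considered

    while next < arrivals.len() {
        // Among the items that have arrived by the time the handler is free,
        // keep only the newest one; the older ones are skipped.
        let mut pick = next;
        while pick + 1 < arrivals.len() && arrivals[pick + 1] <= free_at {
            pick += 1;
        }
        let start = free_at.max(arrivals[pick]);
        processed.push(pick);
        free_at = start + t;
        next = pick + 1;
    }
    (processed, free_at)
}
```

With ten items arriving every 10 ms and a 35 ms handler, the sequential model finishes at 350 ms (10 × 35), while the latest-value model processes only items 0, 3, 7 and 9 and finishes at 140 ms. The last item, which arrived at 90 ms, therefore waits 260 ms in the first case and 50 ms in the second.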
Error Handling
With Error Callback
Errors are passed to the callback, processing continues:
stream.subscribe_async.await?; // Returns Ok(()) even if individual items failed
Without Error Callback
Errors are collected and returned as MultipleErrors:
match stream.subscribe_async.await
Fail-Fast Pattern
Return error immediately to stop processing:
stream.subscribe_async.await?; // Stops on first error
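The three strategies above differ only in what happens after a handler returns an error. The sketch below models that difference with synchronous, std-only code; it does not use the crate's API, and `process_all`, `process_fail_fast` and `reject_odd` are hypothetical names. A plain `Vec<E>` stands in for the crate's `MultipleErrors`.

```rust
/// Runs `handler` on every item.
/// With a callback, each error is handed to it and the result is `Ok(())`.
/// Without one, errors are collected and returned together at the end.
fn process_all<T, E>(
    items: Vec<T>,
    mut handler: impl FnMut(T) -> Result<(), E>,
    mut on_error: Option<&mut dyn FnMut(E)>,
) -> Result<(), Vec<E>> {
    let mut collected = Vec::new();
    for item in items {
        if let Err(err) = handler(item) {
            match on_error.as_mut() {
                Some(callback) => callback(err),
                None => collected.push(err),
            }
        }
    }
    if collected.is_empty() {
        Ok(())
    } else {
        Err(collected)
    }
}

/// Fail-fast variant: stops at the first error and leaves the rest unprocessed.
fn process_fail_fast<T, E>(
    items: Vec<T>,
    handler: impl FnMut(T) -> Result<(), E>,
) -> Result<(), E> {
    items.into_iter().try_for_each(handler)
}

/// Example handler: even numbers succeed, odd numbers fail.
fn reject_odd(n: u32) -> Result<(), String> {
    if n % 2 == 0 {
        Ok(())
    } else {
        Err(format!("odd: {}", n))
    }
}
```

For the input `[1, 2, 3]`, `process_all` without a callback returns both errors, with a callback it returns `Ok(())` after reporting both, and `process_fail_fast` stops at the first odd number.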
Cancellation
Using CancellationToken
let cancel_token = CancellationToken::new();
let cancel_clone = cancel_token.clone();
// Start processing
let handle = spawn;
// Cancel from another task
spawn;
Automatic Cancellation in subscribe_latest_async
The cancellation token passed to handlers is automatically cancelled when newer items arrive:
stream.subscribe_latest_async.await?;
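Cancellation of this kind is cooperative: the handler only stops if it checks its token. The sketch below shows the shape of such a check using a std-only shared flag. `CancelFlag` is a hypothetical stand-in whose `cancel`/`is_cancelled` pair mimics tokio-util's `CancellationToken`; that the crate hands exactly that type to handlers is an assumption.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token: a flag shared between clones.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Works through `steps` units of work, checking the flag before each one.
/// Returns `Ok(steps)` when everything ran, or `Err(done)` with the number of
/// completed steps when the work was abandoned.
fn run_steps(
    steps: usize,
    token: &CancelFlag,
    mut on_step: impl FnMut(usize),
) -> Result<usize, usize> {
    for step in 0..steps {
        if token.is_cancelled() {
            return Err(step);
        }
        on_step(step);
    }
    Ok(steps)
}
```

If a clone of the flag is cancelled while step 2 is running (standing in for a newer item arriving), the handler finishes that step and stops before step 3. A handler that never checks the flag runs all of its steps regardless.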
Common Patterns
Pattern: Retry on Failure
stream.subscribe_async.await?;
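The core of a retry pattern is a bounded loop around the fallible operation. The helper below is a minimal synchronous sketch of that loop; `retry` is a hypothetical function, not part of fluxion-exec, and an async handler would await the operation and usually sleep between attempts.

```rust
/// Calls `op` until it succeeds or `max_attempts` attempts have been made.
/// `op` receives the 1-based attempt number and always runs at least once.
/// On exhaustion the last error is returned.
fn retry<T, E>(max_attempts: u32, mut op: impl FnMut(u32) -> Result<T, E>) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => attempt += 1, // transient failure: try again
        }
    }
}
```

An operation that fails twice and then succeeds returns its value on the third call under `retry(3, ...)`; one that always fails returns its last error after the final attempt.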
Pattern: Rate Limiting
let last_process = new;
stream.subscribe_async.await?;
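One way to rate-limit a sequential handler is to remember when it last ran and delay the next run until a minimum interval has passed. The sketch below isolates that bookkeeping so it can be checked without real sleeping; `MinInterval` is a hypothetical type, and times are passed in as `Duration` offsets from an arbitrary start.

```rust
use std::time::Duration;

/// Enforces a minimum spacing between consecutive runs.
struct MinInterval {
    interval: Duration,
    last_run: Option<Duration>, // time of the most recent run, if any
}

impl MinInterval {
    fn new(interval: Duration) -> Self {
        MinInterval { interval, last_run: None }
    }

    /// Returns how long to wait before processing an item that is ready at
    /// `now`, and records the time at which that run will start.
    fn delay_for(&mut self, now: Duration) -> Duration {
        let delay = match self.last_run {
            Some(last) => (last + self.interval).saturating_sub(now),
            None => Duration::ZERO, // the first item runs immediately
        };
        self.last_run = Some(now + delay);
        delay
    }
}
```

With a 100 ms interval, an item ready at 0 ms runs at once, one ready at 30 ms waits 70 ms, and one ready at 250 ms runs at once again. Inside an async handler the returned delay would typically be passed to an async sleep such as `tokio::time::sleep` rather than a blocking one.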
Pattern: Batch Processing
stream
.chunks // Batch 100 items
.subscribe_async
.await?;
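Batching changes what the handler receives: one call per group of items instead of one call per item. The std-only sketch below shows the grouping on a slice; `process_in_batches` is a hypothetical helper. On a stream, `futures::StreamExt::chunks` plays the same role and yields `Vec`s of up to the given capacity.

```rust
/// Splits `items` into batches of at most `size` and hands each batch to `handler`.
/// The final batch may be shorter. Panics if `size` is 0, as `slice::chunks` does.
fn process_in_batches<T>(items: &[T], size: usize, mut handler: impl FnMut(&[T])) {
    for batch in items.chunks(size) {
        handler(batch);
    }
}
```

For 250 items and a batch size of 100, the handler is called three times, with 100, 100 and 50 items.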
Pattern: Conditional Processing
stream
.filter
.subscribe_async
.await?;
Anti-Patterns
❌ Don't: Use subscribe_latest_async for Critical Work
// BAD: Payment processing might be skipped!
payment_stream.subscribe_latest_async.await?;
✅ Good: Use subscribe_async for work that must complete:
payment_stream.subscribe_async.await?;
❌ Don't: Block in Async Handlers
// BAD: Blocking operations stall the executor
stream.subscribe_async.await?;
✅ Good: Use async operations:
stream.subscribe_async.await?;
❌ Don't: Perform CPU-Intensive Work on Async Runtime
// BAD: CPU work blocks async tasks
stream.subscribe_async.await?;
✅ Good: Use spawn_blocking for CPU work:
stream.subscribe_async.await?;
❌ Don't: Ignore Cancellation Tokens
// BAD: Long-running work that can't be cancelled
stream.subscribe_latest_async.await?;
✅ Good: Check cancellation periodically:
stream.subscribe_latest_async.await?;
Comparison with Alternatives
vs futures::StreamExt::for_each
| Feature | subscribe_async | for_each |
|---|---|---|
| Execution | Spawns tasks | Inline execution |
| Cancellation | Built-in token support | Manual |
| Error handling | Callbacks + collection | Returns first error |
| Concurrency | Configurable | Sequential only |
vs futures::StreamExt::buffer_unordered
| Feature | subscribe_async | buffer_unordered |
|---|---|---|
| Ordering | Sequential | Unordered |
| Concurrency | One at a time | N concurrent |
| Backpressure | Automatic | Manual (buffer size) |
| Use case | Sequential processing | Parallel processing |
vs Manual Task Spawning
| Feature | subscribe_latest_async | Manual spawning |
|---|---|---|
| Cancellation | Automatic | Manual |
| Latest-value | Built-in | Manual tracking |
| Error handling | Integrated | Manual |
| Cleanup | Automatic | Manual |
Troubleshooting
Problem: Handler Never Completes
Symptoms: Stream processing hangs indefinitely
Causes:
- Awaiting on an infinite stream without cancellation
- Deadlock in handler
- Blocking operation in async context
Solutions:
// Add timeout
use ;
let result = timeout.await??;
// Use cancellation token
let cancel = CancellationToken::new();
spawn;
Problem: High Memory Usage
Symptoms: Memory grows unbounded during processing
Causes:
- Stream produces items faster than they can be processed
- Large items held in memory
- Memory leaks in handler
Solutions:
// Add backpressure with bounded channels
let = channel; // Bounded
// Use subscribe_latest_async to skip items
stream.subscribe_latest_async.await?;
// Process in batches and clear
stream.chunks.subscribe_async.await?;
Problem: Errors Not Propagated
Symptoms: Errors occur but are silently ignored
Cause: Error callback provided, but errors not handled
Solution:
let errors = new;
let errors_clone = errors.clone;
stream.subscribe_async.await?;
// Check errors after processing
let all_errors = errors.lock.await;
if !all_errors.is_empty
Problem: Items Processed Out of Order
Symptoms: Items appear in unexpected order
Cause: Using concurrent processing patterns
Solution: Use subscribe_async (strictly sequential) instead of buffer_unordered or parallel patterns
API Reference
See the full API documentation for detailed type signatures and additional examples.
Core Traits
- SubscribeAsyncExt - Sequential processing
- SubscribeLatestAsyncExt - Latest-value processing
Related Crates
- fluxion-stream - Stream composition operators
- fluxion-core - Core types and errors
License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
Copyright 2025 Umberto Gotti