Fluxion
A reactive stream processing library for Rust with temporal ordering guarantees, efficient async execution and friendly fluent API.
📊 See why Fluxion sets new standards for quality →
Table of Contents
Features
- 🔄 Rx-Style Operators: Familiar reactive programming patterns (
combine_latest,with_latest_from,ordered_merge, etc.) - ⏱️ Temporal Ordering: Guaranteed ordering semantics with
Sequenced<T>wrapper - ⚡ Async Execution: Efficient async processing with
subscribe_asyncandsubscribe_latest_async - 🛡️ Type-Safe Error Handling: Comprehensive error propagation with
Resulttypes - 📚 Excellent Documentation: Detailed guides, examples, and API docs
- ✅ Well Tested: 1,500+ tests with comprehensive coverage
Quick Start
Add Fluxion to your Cargo.toml:
[]
= "0.1.0"
= "0.1.0"
= { = "1.48", = ["full"] }
= "0.3"
Basic Usage
use *;
use Sequenced;
use StreamExt;
async
Chaining Multiple Operators
Fluxion operators can be chained to create complex processing pipelines. Here a complete example:
Example: combine_latest -> filter_ordered - Sampling on Trigger Events
use *;
use Sequenced;
use StreamExt;
async
📚 Operator Documentation
- All Operators - Complete operator reference
- Operators Roadmap - Planned future operators
Core Concepts
Stream Operators
Combining Streams:
combine_latest- Emit when any stream emits, with latest from allwith_latest_from- Sample secondary streams on primary emissionordered_merge- Merge multiple streams preserving temporal order
Filtering & Gating:
emit_when- Gate emissions based on a filter conditiontake_latest_when- Sample stream on trigger eventstake_while_with- Emit while condition holds
Transformation:
combine_with_previous- Pair consecutive valuesmap_ordered- Transform while preserving orderfilter_ordered- Filter while preserving order
Async Execution
Sequential Processing:
use SubscribeAsyncExt;
stream
.subscribe_async
.await?;
Latest-Value Processing (with auto-cancellation):
use SubscribeLatestAsyncExt;
stream
.subscribe_latest_async
.await?;
Documentation
📚 Guides
- Integration Guide - Learn the three patterns for integrating events (intrinsic, extrinsic, wrapper ordering)
- fluxion-stream - Stream operators and composition patterns
- fluxion-exec - Async execution and subscription utilities
💡 Complete Example
The stream-aggregation example demonstrates production-ready patterns:
- Real-world architecture: 3 producers, 1 aggregator, 1 consumer
- Ordered stream combining: Merges sensor readings, metrics, and system events
- Type-safe transformations: Uses
UnboundedReceiverExtfor elegant type erasure - Graceful shutdown: Proper cleanup with
CancellationToken - Error handling: Demonstrates best practices throughout
Why this example matters:
- Shows how all the pieces fit together in a realistic application
- Demonstrates the
into_fluxion_stream()pattern for combining heterogeneous streams - Illustrates proper resource management and cancellation
- Serves as a template for building your own event processing systems
Run it with: cargo run --example stream-aggregation
🔧 API Documentation
Generate and browse full API documentation:
Or for specific crates:
Development
Prerequisites
- Rust toolchain (version pinned in
rust-toolchain.toml) - Cargo
Building
# Run CI checks locally (PowerShell)
Workspace Structure
fluxion-rx- Main crate (re-exports from other crates)fluxion-stream- Stream operators and combinatorsfluxion-exec- Execution utilities and subscriptionsfluxion-core- Core utilities and traitsfluxion-error- Error types and handlingfluxion-test-utils- Test helpers and fixturesfluxion-merge- Stream merging utilitiesfluxion-ordered-merge- Ordered merging implementation
Development Notes
- All clippy, formatting, and documentation warnings are treated as errors in CI
- Use
.ci/coverage.ps1to collect code coverage locally (requirescargo-llvm-cov) - See ROADMAP.md for planned features and release schedule
Project Status
Current Version: 0.1.1
- ✅ Published to crates.io
- ✅ Core functionality complete
- ✅ Comprehensive test coverage
- ✅ Phase 1 error handling implemented
- 🚧 Phase 2 error propagation (planned for 1.0.0)
- 📝 Documentation complete for current features
See ROADMAP.md for details on the path to 1.0.0.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
Before submitting a PR:
- Run tests:
cargo test --workspace - Run clippy:
cargo clippy --workspace -- -D warnings - Format code:
cargo fmt --all - Update documentation if needed
License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
Acknowledgments
Inspired by ReactiveX and other reactive programming libraries, with a focus on Rust's safety and performance characteristics.
Security
All commits and releases are GPG signed.
Key ID: 5729DA194B0929542BF79074C2A11DED229A1E51
Fingerprint: 5729 DA19 4B09 2954 2BF7 9074 C2A1 1DED 229A 1E51
Author
Name: Umberto Gotti Email: umberto.gotti@umbertogotti.dev Twitter: https://x.com/GottiUmberto