fluxion-core
Part of Fluxion - A reactive stream processing library for Rust
Core traits and types for ordered stream processing in async Rust.
Overview
This crate provides the foundational abstractions used throughout the Fluxion ecosystem:
- Timestamped trait: Temporal ordering for stream items via timestamps
- StreamItem<T>: Error-aware stream item wrapper (Value | Error)
- FluxionSubject<T>: Hot, multi-subscriber broadcast subject
- FluxionError: Unified error type for stream operations
- Lock utilities: Safe mutex operations with error propagation
Key Types
Timestamp Traits
Fluxion-core provides two traits for temporal ordering:
HasTimestamp - Read-Only Access
Minimal trait for types that expose a timestamp value:
Use this when your type only needs to provide a timestamp for ordering (most common case).
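As a rough illustration of the read-only case, the sketch below defines a stand-in trait with the same name and implements it for a domain type. The associated type `Timestamp`, the method name `timestamp()`, the `SensorReading` struct, and the `sort_by_timestamp` helper are all assumptions made for this example; the actual definition in fluxion-core may use different names and bounds.

```rust
// Hypothetical stand-in for HasTimestamp; the real trait in fluxion-core
// may differ in its associated type, bounds, and method name.
pub trait HasTimestamp {
    /// Type used for ordering (assumed here to be totally ordered and copyable).
    type Timestamp: Ord + Copy;

    /// Returns the timestamp used to order this item.
    fn timestamp(&self) -> Self::Timestamp;
}

/// Example domain type (hypothetical) that carries its own timestamp.
#[derive(Debug, Clone, PartialEq)]
pub struct SensorReading {
    pub taken_at_ms: u64,
    pub celsius: f64,
}

impl HasTimestamp for SensorReading {
    type Timestamp = u64;

    fn timestamp(&self) -> u64 {
        self.taken_at_ms
    }
}

/// Sorts any slice of timestamped items into temporal order.
pub fn sort_by_timestamp<T: HasTimestamp>(items: &mut [T]) {
    items.sort_by_key(|item| item.timestamp());
}
```

Because the helper only needs read access to the timestamp, any type implementing the minimal trait can be ordered without being wrapped.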
Timestamped - Full Wrapper Interface
Extends HasTimestamp with an Inner type and construction methods for wrapper types:
Use this for wrapper types like Sequenced<T> that wrap an inner value with a timestamp.
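The wrapper interface could look roughly like the sketch below. It is self-contained and uses stand-in definitions: the method names `with_timestamp` and `into_inner`, the fields of this `Sequenced<T>`, and the `map_inner` helper are assumptions for illustration, not the crate's confirmed API.

```rust
// Hypothetical stand-ins; names and bounds are assumptions, not the
// confirmed fluxion-core definitions.
pub trait HasTimestamp {
    type Timestamp: Ord + Copy;
    fn timestamp(&self) -> Self::Timestamp;
}

pub trait Timestamped: HasTimestamp + Sized {
    /// The wrapped value type.
    type Inner;

    /// Builds a wrapper from an inner value and an explicit timestamp.
    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;

    /// Unwraps the inner value, discarding the timestamp.
    fn into_inner(self) -> Self::Inner;
}

/// Simplified wrapper pairing a value with a sequence number.
#[derive(Debug, Clone, PartialEq)]
pub struct Sequenced<T> {
    value: T,
    seq: u64,
}

impl<T> HasTimestamp for Sequenced<T> {
    type Timestamp = u64;

    fn timestamp(&self) -> u64 {
        self.seq
    }
}

impl<T> Timestamped for Sequenced<T> {
    type Inner = T;

    fn with_timestamp(value: T, timestamp: u64) -> Self {
        Sequenced { value, seq: timestamp }
    }

    fn into_inner(self) -> T {
        self.value
    }
}

/// Applies `f` to the inner value while keeping the original timestamp.
pub fn map_inner<W, F>(item: W, f: F) -> W
where
    W: Timestamped,
    F: FnOnce(W::Inner) -> W::Inner,
{
    let ts = item.timestamp();
    W::with_timestamp(f(item.into_inner()), ts)
}
```

A generic helper like `map_inner` is the kind of code that needs the full interface: it must take the wrapper apart and rebuild it, which read-only timestamp access cannot do.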
FluxionSubject
A hot, multi-subscriber broadcast subject for reactive programming patterns:
use ;
use StreamExt;
async
Key Characteristics:
- Hot: Late subscribers only receive items sent after they subscribe (no replay buffer)
- Multi-subscriber: Broadcasts each item to all active subscribers simultaneously
- Thread-safe: Uses Arc<Mutex<>> internally - cheap to clone, safe to send across threads
- Automatic cleanup: Dead subscribers are removed on next send() (no memory leaks)
- Unbounded: Uses unbounded mpsc channels (no backpressure)
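The characteristics listed above can be sketched with a deliberately simplified stand-in built on the standard library's synchronous channels. `MiniSubject` and `hot_demo` are hypothetical names; this is not the FluxionSubject implementation, which is async and carries StreamItem values, but it shows the hot, multi-subscriber, unbounded behaviour in a few lines.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Simplified, synchronous stand-in for a hot subject (illustration only).
#[derive(Clone)]
pub struct MiniSubject<T> {
    // Shared subscriber list: every clone points at the same Vec.
    senders: Arc<Mutex<Vec<Sender<T>>>>,
}

impl<T: Clone> MiniSubject<T> {
    pub fn new() -> Self {
        Self { senders: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Registers a new subscriber; it only sees items sent from now on.
    pub fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.senders.lock().unwrap().push(tx);
        rx
    }

    /// Broadcasts to every live subscriber and drops disconnected ones.
    pub fn send(&self, item: T) {
        let mut senders = self.senders.lock().unwrap();
        senders.retain(|tx| tx.send(item.clone()).is_ok());
    }

    /// Drops all senders, which ends every subscriber's stream.
    pub fn close(&self) {
        self.senders.lock().unwrap().clear();
    }
}

/// Returns what an early and a late subscriber each received.
pub fn hot_demo() -> (Vec<i32>, Vec<i32>) {
    let subject = MiniSubject::new();
    let early = subject.subscribe();
    subject.send(1);
    let late = subject.subscribe(); // joins after item 1 was sent
    subject.send(2);
    subject.close();
    let early_items: Vec<i32> = early.iter().collect();
    let late_items: Vec<i32> = late.iter().collect();
    (early_items, late_items)
}
```

In this sketch the early subscriber receives both items while the late one receives only the second, which is the "no replay buffer" behaviour described above.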
Subject Lifecycle:
let subject = FluxionSubject::new();
// Clone shares the same subject state
let subject_clone = subject.clone();
// Subscribe on any clone
let stream = subject_clone.subscribe();
// Send on any clone - all subscribers receive it
subject.send(/* item */).unwrap();
// Error terminates all subscribers
subject.error(/* error */);
// Explicit close completes all subscribers
subject.close();
Thread Safety:
let subject = FluxionSubject::new();
// Safe to share across async tasks
spawn;
spawn;
Common Patterns:
- Event Bus: Broadcast domain events to multiple handlers
- State Updates: Notify observers of state changes
- Message Fanout: Distribute work to multiple consumers
- Test Doubles: Injectable subjects for testing reactive flows
When to Use:
- ✅ Multiple subscribers need the same stream
- ✅ Subscribers can join/leave dynamically
- ✅ No replay needed (hot semantics)
- ✅ Unbounded buffers acceptable
When NOT to Use:
- ❌ Need cold semantics (replay to new subscribers)
- ❌ Need backpressure (bounded channels)
- ❌ Single subscriber (use channels directly)
- ❌ Persistent event log (use actual event store)
StreamItem
Error-aware wrapper for stream values:
Enables error propagation through operator chains without terminating the stream. See the Error Handling Guide for details.
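To make the idea concrete, here is a minimal sketch of an error-aware item type. The enum below mirrors the Value | Error shape mentioned in the overview, but `DemoError`, the `map` method, and `double_all` are hypothetical stand-ins; the real StreamItem and FluxionError may expose a different API.

```rust
/// Hypothetical error type standing in for FluxionError.
#[derive(Debug, Clone, PartialEq)]
pub struct DemoError(pub String);

/// Stand-in with the Value | Error shape described above.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamItem<T> {
    Value(T),
    Error(DemoError),
}

impl<T> StreamItem<T> {
    /// Transforms values and passes errors through untouched.
    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> StreamItem<U> {
        match self {
            StreamItem::Value(v) => StreamItem::Value(f(v)),
            StreamItem::Error(e) => StreamItem::Error(e),
        }
    }
}

/// A toy "operator": doubles every value, keeps errors in place.
pub fn double_all(items: Vec<StreamItem<i32>>) -> Vec<StreamItem<i32>> {
    items.into_iter().map(|item| item.map(|v| v * 2)).collect()
}
```

The error travels through the operator as an ordinary item, so values that arrive after it are still processed instead of being cut off.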
Architecture Notes
Why FluxionSubject Uses Arc<Mutex<>>
The subject's design requires interior mutability with shared ownership:
Operations requiring mutation:
- send() - broadcasts to all subscribers AND removes dead ones
- subscribe() - adds new subscriber to the list
- close() - sets closed flag and clears subscribers
Why Arc:
- Subject is Clone - multiple handles can exist
- All clones share the same subscriber list and state
- Enables passing to different async tasks
Why Mutex:
- Multiple threads/tasks can call operations concurrently
- Prevents data races on the subscriber Vec
- Ensures consistent state across send()/subscribe()/close()
Alternative considered: &mut self methods would prevent cloning and multi-task sharing, which defeats the purpose of a broadcast subject.
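The trade-off can be illustrated with a small standard-library sketch. `SharedLog` and `shared_clone_demo` are hypothetical names and OS threads stand in for async tasks; the point is only that methods taking `&self` over an `Arc<Mutex<..>>` let every clone mutate the same state, which `&mut self` methods would not allow.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical type using the same Arc<Mutex<..>> pattern as the subject.
#[derive(Clone, Default)]
pub struct SharedLog {
    entries: Arc<Mutex<Vec<String>>>,
}

impl SharedLog {
    /// Takes &self, so any clone can mutate the shared state.
    pub fn push(&self, entry: &str) {
        self.entries.lock().unwrap().push(entry.to_string());
    }

    /// Number of entries visible through this handle.
    pub fn count(&self) -> usize {
        self.entries.lock().unwrap().len()
    }
}

/// Four threads write through their own clone; the original sees all writes.
pub fn shared_clone_demo() -> usize {
    let log = SharedLog::default();
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let handle = log.clone(); // cheap: only bumps the Arc refcount
            thread::spawn(move || handle.push(&format!("task {}", i)))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    log.count()
}
```

With `&mut self` methods, only one handle could exist at a time and the clones could not be moved into separate threads or tasks.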
Dead Subscriber Cleanup
Subscribers are automatically cleaned up during send():
// For each send, remove disconnected subscribers
for tx in state.senders.drain(..) { ... }
This prevents memory leaks when subscribers drop their streams without explicitly unsubscribing.
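A drain-and-keep loop of this kind might look like the sketch below. It uses the standard library's `std::sync::mpsc` channels rather than whatever channel type the crate actually uses, and `broadcast_and_prune` and `cleanup_demo` are hypothetical names for illustration.

```rust
use std::sync::mpsc::{channel, Sender};

/// Sends `item` to every subscriber and keeps only those still connected.
pub fn broadcast_and_prune<T: Clone>(senders: &mut Vec<Sender<T>>, item: T) {
    let mut alive = Vec::with_capacity(senders.len());
    for tx in senders.drain(..) {
        // `send` fails only when the receiving end has been dropped.
        if tx.send(item.clone()).is_ok() {
            alive.push(tx);
        }
    }
    *senders = alive;
}

/// Returns (subscribers before, subscribers after, item seen by the live one).
pub fn cleanup_demo() -> (usize, usize, Option<u32>) {
    let (tx_a, rx_a) = channel::<u32>();
    let (tx_b, rx_b) = channel::<u32>();
    let mut senders = vec![tx_a, tx_b];

    drop(rx_b); // subscriber B goes away without unsubscribing
    let before = senders.len();
    broadcast_and_prune(&mut senders, 7);

    (before, senders.len(), rx_a.try_recv().ok())
}
```

Cleanup here is lazy: a dropped subscriber stays in the list until the next send attempts delivery and fails.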
Usage
Add this to your Cargo.toml:
[dependencies]
fluxion-core = "0.8.0"
License
Apache-2.0