Crate fluxion_stream

Stream operators with temporal ordering guarantees.

This crate provides reactive stream combinators that maintain temporal ordering across asynchronous operations. All operators work with types implementing the Timestamped trait, which provides timestamp-based ordering for correct temporal sequencing.

§Architecture

The crate is built around several key concepts:

  • FluxionStream: A wrapper around any Stream that provides access to all operators
  • Timestamped trait: Types must have intrinsic timestamps for temporal ordering
  • Extension traits: Each operator is provided via an extension trait for composability
  • Temporal correctness: All operators respect the timestamp ordering of items across streams
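
The trait-plus-extension-trait arrangement listed above can be pictured with a small standalone sketch. It is not the crate's code: it uses `std::iter::Iterator` as a synchronous stand-in for `Stream`, and `HasTimestamp`, `Stamped`, and `InTemporalOrderExt` are hypothetical names invented for illustration.

```rust
/// Hypothetical stand-in for the crate's `Timestamped` trait.
trait HasTimestamp {
    fn timestamp(&self) -> u64;
}

/// Hypothetical item type: a value tagged with a sequence number.
#[derive(Debug, Clone, PartialEq)]
struct Stamped<T> {
    value: T,
    seq: u64,
}

impl<T> HasTimestamp for Stamped<T> {
    fn timestamp(&self) -> u64 {
        self.seq
    }
}

/// Extension trait: adds one operator to every iterator of timestamped items.
trait InTemporalOrderExt: Iterator + Sized
where
    Self::Item: HasTimestamp,
{
    /// Collects the items and returns them sorted by timestamp.
    fn in_temporal_order(self) -> Vec<Self::Item> {
        let mut items: Vec<Self::Item> = self.collect();
        items.sort_by_key(|item| item.timestamp());
        items
    }
}

// Blanket impl: any iterator whose items carry a timestamp gets the method.
impl<I> InTemporalOrderExt for I
where
    I: Iterator,
    I::Item: HasTimestamp,
{
}
```

With the trait in scope, `items.into_iter().in_temporal_order()` works on any collection of `Stamped` values. Keeping each operator in its own extension trait, as the list above describes, lets callers import only the operators they use.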

§Operator Categories

§Combination Operators

  • combine_latest: Emits when any stream emits, combining latest values from all streams
  • with_latest_from: Samples secondary streams only when primary emits
  • ordered_merge: Merges multiple streams preserving temporal order

§Filtering Operators

§Transformation Operators

§Temporal Ordering Explained

All operators in this crate maintain temporal ordering: items are processed in order of their timestamp, not in the order in which they arrive at the operator.

§How It Works

When multiple streams are combined:

  1. Each stream item must implement Timestamped, providing a comparable timestamp
  2. Operators use ordered_merge internally to sequence items
  3. Items are buffered and emitted in order of their timestamp
  4. Late-arriving items with earlier timestamps are placed correctly in the sequence
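
Steps 3 and 4 amount to a reorder buffer. The sketch below shows one way such a buffer could look, using a min-heap from the standard library. `ReorderBuffer` is a hypothetical type, not part of this crate, and the sketch leaves out the asynchronous part entirely: deciding when it is safe to release an item is a policy question that the real operators answer internally.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical reorder buffer: accepts items in arrival order and
/// releases them in timestamp order.
struct ReorderBuffer<T: Ord> {
    // `Reverse` turns the max-heap into a min-heap. The arrival counter
    // breaks ties so that equal timestamps keep their arrival order.
    heap: BinaryHeap<Reverse<(u64, u64, T)>>,
    arrivals: u64,
}

impl<T: Ord> ReorderBuffer<T> {
    fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
            arrivals: 0,
        }
    }

    /// Buffers an item that arrived carrying the given timestamp.
    fn push(&mut self, timestamp: u64, value: T) {
        self.heap.push(Reverse((timestamp, self.arrivals, value)));
        self.arrivals += 1;
    }

    /// Releases the buffered item with the smallest timestamp, if any.
    fn pop(&mut self) -> Option<(u64, T)> {
        self.heap.pop().map(|Reverse((ts, _, value))| (ts, value))
    }
}
```

Pushing timestamp 2 and then timestamp 1 yields timestamp 1 on the first `pop`, which is the "late-arriving item placed correctly" case from step 4.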

§Example: Out-of-Order Delivery

use fluxion_stream::{FluxionStream, OrderedStreamExt};
use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, test_channel};

let (tx1, stream1) = test_channel::<Sequenced<i32>>();
let (tx2, stream2) = test_channel::<Sequenced<i32>>();

let mut merged = stream1.ordered_merge(vec![stream2]);

// Both items are sent before the merged stream is polled:
// stream2 sends seq=1, stream1 sends seq=2
tx2.send((100, 1).into()).unwrap();
tx1.send((200, 2).into()).unwrap();

// Items are emitted in temporal order (seq 1, then seq 2)
let first = unwrap_stream(&mut merged, 500).await.unwrap();
assert_eq!(first.value, 100); // seq=1 is emitted first because it has the lower sequence number

§Operator Selection Guide

Choose the right operator for your use case:

§When You Need Combined State

| Operator | Use When | Triggers On | Example Use Case |
|---|---|---|---|
| combine_latest | You need latest from all streams | Any stream emits | Dashboard combining multiple data sources |
| with_latest_from | You have primary + context streams | Primary emits only | User clicks enriched with latest config |
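
The difference in the "Triggers On" column is easiest to see on a concrete event sequence. The following is a synchronous model of the two rows, not the crate's implementation: `Source`, `combine_latest_model`, and `with_latest_from_model` are hypothetical names, the events are assumed to be already in temporal order, and dropping a primary event that arrives before any secondary value is an assumption borrowed from the usual reactive-extensions semantics.

```rust
#[derive(Clone, Copy)]
enum Source {
    Primary,
    Secondary,
}

/// Emits a pair whenever either side emits, once both sides have a value.
fn combine_latest_model(events: &[(Source, i32)]) -> Vec<(i32, i32)> {
    let (mut primary, mut secondary) = (None, None);
    let mut out = Vec::new();
    for &(source, value) in events {
        match source {
            Source::Primary => primary = Some(value),
            Source::Secondary => secondary = Some(value),
        }
        if let (Some(p), Some(s)) = (primary, secondary) {
            out.push((p, s));
        }
    }
    out
}

/// Emits a pair only when the primary side emits, sampling the secondary.
fn with_latest_from_model(events: &[(Source, i32)]) -> Vec<(i32, i32)> {
    let mut secondary = None;
    let mut out = Vec::new();
    for &(source, value) in events {
        match source {
            Source::Secondary => secondary = Some(value),
            Source::Primary => {
                // Assumption: a primary event with no context yet is dropped.
                if let Some(s) = secondary {
                    out.push((value, s));
                }
            }
        }
    }
    out
}
```

For the sequence primary 1, secondary 10, primary 2, secondary 20, the first model produces three pairs and the second produces only `(2, 10)`, because secondary events never trigger an emission on their own.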

§When You Need All Items

| Operator | Use When | Output | Example Use Case |
|---|---|---|---|
| ordered_merge | Merge multiple sources in order | Every item from all streams | Event log from multiple services |
| combine_with_previous | Compare consecutive items | Pairs of (previous, current) | Detecting value changes |

§When You Need Conditional Emission

| Operator | Use When | Behavior | Example Use Case |
|---|---|---|---|
| emit_when | Gate by condition | Emits source when filter is true | Send notifications only when enabled |
| take_latest_when | Sample on condition | Emits latest source when filter triggers | Sample sensor on button press |
| take_while_with | Stop on condition | Emits until condition false, then stops | Process until timeout |
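
The three behaviors can be contrasted on one event sequence. The sketch below is a simplified, synchronous reading of the table rows and nothing more: `Event` and the three `*_model` functions are hypothetical, the filter is reduced to a plain `bool`, and details such as what happens before the first filter value arrives are assumptions. The individual operator documentation remains the reference for exact trigger rules.

```rust
#[derive(Clone, Copy)]
enum Event {
    Source(i32),
    Filter(bool),
}

/// Gate: passes source values through while the latest filter value is true.
fn emit_when_model(events: &[Event]) -> Vec<i32> {
    let mut enabled = false;
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Filter(f) => enabled = f,
            Event::Source(v) if enabled => out.push(v),
            Event::Source(_) => {}
        }
    }
    out
}

/// Sample: emits the latest source value each time the filter fires with true.
fn take_latest_when_model(events: &[Event]) -> Vec<i32> {
    let mut latest = None;
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Source(v) => latest = Some(v),
            Event::Filter(true) => out.extend(latest),
            Event::Filter(false) => {}
        }
    }
    out
}

/// Stop: passes source values through until the filter turns false, then ends.
/// Assumption: nothing is emitted before the first `true` filter value.
fn take_while_with_model(events: &[Event]) -> Vec<i32> {
    let mut open = false;
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Filter(false) => break,
            Event::Filter(true) => open = true,
            Event::Source(v) if open => out.push(v),
            Event::Source(_) => {}
        }
    }
    out
}
```

The gate can reopen after a `false`, the sampler emits at most once per trigger, and the stop variant never resumes once the condition has failed.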

§Performance Characteristics

§Memory Usage

§Latency Considerations

  • Ordered operators: May buffer items waiting for earlier-ordered items
  • Unordered operators: Process items immediately as they arrive
  • Combining operators: Wait for all streams to emit at least once before first emission

§Throughput

All operators are designed to keep locking to a minimum:

  • Single mutex per operator (not per item)
  • No blocking operations in hot paths
  • Efficient polling with futures::StreamExt

§Return Type Patterns

Fluxion operators use three different return type patterns, each chosen for specific reasons related to type erasure, composability, and performance.

§Pattern 1: impl Stream<Item = T>

When used: Lightweight operators with simple transformations

Examples:

Benefits:

  • Zero-cost abstraction (no boxing)
  • Compiler can fully optimize the stream pipeline
  • Type information preserved for further optimizations

Tradeoffs:

  • Concrete type exposed in signatures (can be complex)
  • May increase compile times for deeply nested operators

§Pattern 2: FluxionStream<impl Stream<Item = T>>

When used: Operators that should compose with other FluxionStream methods

Examples:

Benefits:

  • Enables method chaining with FluxionStream convenience methods
  • Still zero-cost (no boxing)
  • Provides consistent API surface

Use cases:

  • When users are likely to chain multiple operators
  • When the operator produces a complex transformed type

§Pattern 3: Pin<Box<dyn Stream<Item = T>>>

When used: Operators with dynamic dispatch requirements or complex internal state

Examples:

Benefits:

  • Type erasure simplifies signatures
  • Reduces compile time for complex operator chains
  • Hides internal implementation details

Tradeoffs:

  • Heap allocation (small overhead)
  • Dynamic dispatch prevents some optimizations
  • Runtime cost typically negligible compared to async operations

Why used for these operators: These operators maintain internal state machines with multiple branches and complex lifetime requirements. Type erasure keeps the public API simple while allowing internal flexibility.

§Choosing the Right Pattern

As a user, you typically don’t need to worry about these patterns, because all three compose seamlessly. Combining different operators in a single chain works naturally regardless of their internal implementation patterns: whether an operator returns impl Stream, FluxionStream<impl Stream>, or Pin<Box<dyn Stream>>, the results compose transparently.

The patterns are implementation details chosen to balance performance, ergonomics, and maintainability.
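
To make the three shapes concrete without pulling in an async runtime, here is an analogue built on `std::iter::Iterator` instead of `Stream`. Everything in it is hypothetical: `Wrapper` merely plays the role of `FluxionStream`, and `incremented`, `positives`, and `boxed_evens` are illustrative functions, not operators from this crate.

```rust
/// Pattern 1 analogue: opaque `impl Trait` return, no allocation.
fn incremented(input: impl Iterator<Item = i32>) -> impl Iterator<Item = i32> {
    input.map(|n| n + 1)
}

/// Hypothetical wrapper playing the role of `FluxionStream`.
struct Wrapper<I>(I);

impl<I: Iterator<Item = i32>> Wrapper<I> {
    /// Pattern 2 analogue: wrapper around an opaque type, so further
    /// wrapper methods can be chained without boxing.
    fn positives(self) -> Wrapper<impl Iterator<Item = i32>> {
        Wrapper(self.0.filter(|n| *n > 0))
    }

    /// Unwraps back to the underlying iterator.
    fn into_inner(self) -> I {
        self.0
    }
}

/// Pattern 3 analogue: boxed trait object. The concrete type is erased
/// at the cost of one heap allocation and dynamic dispatch.
fn boxed_evens(input: impl Iterator<Item = i32> + 'static) -> Box<dyn Iterator<Item = i32>> {
    Box::new(input.filter(|n| n % 2 == 0))
}
```

The three shapes chain in any order: `boxed_evens(Wrapper(incremented(v.into_iter())).positives().into_inner())` compiles and runs as one pipeline, which mirrors the claim that callers rarely need to care which pattern a given operator uses.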

§Common Patterns

§Pattern: Enriching Events with Context

Uses with_latest_from to combine a primary stream with context from a secondary stream.

use fluxion_stream::{FluxionStream, WithLatestFromExt};
use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
use fluxion_core::Timestamped as TimestampedTrait;

// User clicks enriched with latest configuration
let (click_tx, clicks) = test_channel::<Sequenced<String>>();
let (config_tx, configs) = test_channel::<Sequenced<String>>();

let mut enriched = clicks.with_latest_from(
    configs,
    |state| state.clone()
);

// Send config first, then click
config_tx.send(("theme=dark".to_string(), 1).into()).unwrap();
click_tx.send(("button1".to_string(), 2).into()).unwrap();

let result = unwrap_value(Some(unwrap_stream(&mut enriched, 500).await));
assert_eq!(result.values().len(), 2); // Has both click and config

§Pattern: Merging Multiple Event Sources

Uses ordered_merge to combine logs from multiple services in temporal order.

use fluxion_stream::{FluxionStream, OrderedStreamExt};
use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};

// Combine logs from multiple services in temporal order
let (service1_tx, service1) = test_channel::<Sequenced<String>>();
let (service2_tx, service2) = test_channel::<Sequenced<String>>();

let mut unified_log = service1.ordered_merge(vec![service2]);

// Send logs with different timestamps
service1_tx.send(("service1: started".to_string(), 1).into()).unwrap();
service2_tx.send(("service2: ready".to_string(), 2).into()).unwrap();

let first = unwrap_value(Some(unwrap_stream(&mut unified_log, 500).await));
assert_eq!(first.value, "service1: started");

§Pattern: Change Detection

Uses combine_with_previous to detect when values change by comparing with the previous value.

use fluxion_stream::{FluxionStream, CombineWithPreviousExt};
use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
use fluxion_core::Timestamped as TimestampedTrait;

let (tx, stream) = test_channel::<Sequenced<i32>>();

// Pair each value with its previous value
let mut paired = stream.combine_with_previous();

// Send values
tx.send((1, 1).into()).unwrap();
tx.send((1, 2).into()).unwrap(); // Same value
tx.send((2, 3).into()).unwrap(); // Changed!

let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
assert!(result.previous.is_none()); // First has no previous

let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
let changed = result.previous.as_ref().unwrap().value != result.current.value;
assert!(!changed); // 1 == 1, no change

let result = unwrap_value(Some(unwrap_stream(&mut paired, 500).await));
let changed = result.previous.as_ref().unwrap().value != result.current.value;
assert!(changed); // 1 != 2, changed!

§Pattern: Conditional Processing

Uses emit_when to gate emissions based on a filter stream, only emitting when the condition is satisfied.

use fluxion_stream::{FluxionStream, EmitWhenExt};
use fluxion_test_utils::{Sequenced, helpers::unwrap_stream, unwrap_value, test_channel};
use fluxion_core::Timestamped as TimestampedTrait;

// Send notifications only when enabled
let (event_tx, events) = test_channel::<Sequenced<i32>>();
let (enabled_tx, enabled) = test_channel::<Sequenced<i32>>();

let mut notifications = events.emit_when(
    enabled,
    |state| state.values().get(1).map(|v| *v > 0).unwrap_or(false)
);

// Enable notifications
enabled_tx.send((1, 1).into()).unwrap();
// Send event
event_tx.send((999, 2).into()).unwrap();

let result = unwrap_value(Some(unwrap_stream(&mut notifications, 500).await));
assert_eq!(result.value, 999);

§Anti-Patterns

§❌ Don’t: Use ordered_merge When Order Doesn’t Matter

// BAD: Ordering overhead when you don't need it
let merged = stream1.ordered_merge(vec![stream2]);

Use standard futures combinators instead:

// GOOD: Use futures::stream::select for unordered merging
use futures::stream::select;
let merged = select(stream1, stream2);

§❌ Don’t: Use combine_latest for All Items

// BAD: combine_latest only emits latest, loses intermediate values
let combined = stream1.combine_latest(vec![stream2], |_| true);

Use ordered_merge to get all items:

// GOOD: ordered_merge emits every item
let merged = stream1.ordered_merge(vec![stream2]);

§❌ Don’t: Complex Filter Logic in Operators

// BAD: Complex business logic in filter predicate
stream1.combine_latest(vec![stream2], |state| {
    // 50 lines of complex filtering logic...
});

Extract to well-tested function:

// GOOD: Testable, reusable filter logic
fn should_emit(state: &CombinedState<i32>) -> bool {
    // Clear, testable logic
    state.values().iter().all(|&v| v > 0)
}

stream1.combine_latest(vec![stream2], should_emit);

§Operator Chaining

Fluxion operators are designed to be composed together, creating sophisticated data flows from simple building blocks. The key to successful chaining is understanding how each operator transforms the stream.

§Basic Chaining Pattern

use fluxion_stream::{FluxionStream, CombineWithPreviousExt, TakeLatestWhenExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
    let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();

    // Chain: sample when filter emits, then pair with previous value
    let sampled = source_stream.take_latest_when(filter_stream, |_| true);
    let mut composed = FluxionStream::new(sampled).combine_with_previous();

    source_tx.send(Sequenced::new(42)).unwrap();
    filter_tx.send(Sequenced::new(1)).unwrap();

    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert!(item.previous.is_none());
    assert_eq!(&item.current.value, &42);
}

§Chaining with Transformation

Use map_ordered and filter_ordered to transform streams while preserving temporal ordering:

use fluxion_stream::FluxionStream;
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (tx, stream) = test_channel::<Sequenced<i32>>();

    // Chain: filter positives, map to string
    let mut composed = FluxionStream::new(stream)
        .filter_ordered(|&n| n > 0)  // filter_ordered receives &T::Inner
        .map_ordered(|seq| format!("Value: {}", seq.value));  // map_ordered receives T

    tx.send(Sequenced::new(-1)).unwrap();
    tx.send(Sequenced::new(5)).unwrap();

    let result = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert_eq!(result, "Value: 5");
}

§Multi-Stream Chaining

Combine multiple streams and then process the result:

use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (tx1, stream1) = test_channel::<Sequenced<i32>>();
    let (tx2, stream2) = test_channel::<Sequenced<i32>>();

    // Chain: combine latest from both streams, then track changes
    let mut composed = stream1
        .combine_latest(vec![stream2], |_| true)
        .combine_with_previous();

    tx1.send(Sequenced::new(1)).unwrap();
    tx2.send(Sequenced::new(2)).unwrap();

    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert!(item.previous.is_none());
    assert_eq!(item.current.values().len(), 2);
}

§Key Principles for Chaining

  1. Use map_ordered and filter_ordered: These preserve the FluxionStream wrapper and maintain temporal ordering guarantees
  2. Order matters: combine_with_previous().filter_ordered() is different from filter_ordered().combine_with_previous()
  3. Type awareness: Each operator changes the item type - track what flows through the chain
  4. Test incrementally: Build complex chains step by step, testing each addition
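
Principle 2 can be checked on plain data. The sketch below models the two chain orders with ordinary iterators rather than the crate's operators; `pair_with_previous`, `pair_then_filter`, and `filter_then_pair` are hypothetical helpers written only to show that the two orders produce different "previous" values.

```rust
/// Pairs each item with the one before it (None for the first item).
fn pair_with_previous(items: &[i32]) -> Vec<(Option<i32>, i32)> {
    let mut previous = None;
    items
        .iter()
        .map(|&current| (previous.replace(current), current))
        .collect()
}

/// Pair first, then keep pairs whose current value is positive.
/// The "previous" slot can still hold a value the filter would reject.
fn pair_then_filter(items: &[i32]) -> Vec<(Option<i32>, i32)> {
    pair_with_previous(items)
        .into_iter()
        .filter(|&(_, current)| current > 0)
        .collect()
}

/// Filter first, then pair. Rejected values never appear as "previous".
fn filter_then_pair(items: &[i32]) -> Vec<(Option<i32>, i32)> {
    let kept: Vec<i32> = items.iter().copied().filter(|&n| n > 0).collect();
    pair_with_previous(&kept)
}
```

For the input `[-1, 5, 7]`, pairing first gives `(Some(-1), 5)` as the first surviving pair, while filtering first gives `(None, 5)`. Which of the two is correct depends on whether filtered-out items should still count as history.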

§Advanced Composition Examples

§1. Ordered Merge → Combine With Previous

Merge multiple streams in temporal order, then track consecutive values:

use fluxion_stream::{FluxionStream, OrderedStreamExt, CombineWithPreviousExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (tx1, stream1) = test_channel::<Sequenced<i32>>();
    let (tx2, stream2) = test_channel::<Sequenced<i32>>();

    // Merge streams in temporal order, then pair consecutive values
    let mut composed = stream1
        .ordered_merge(vec![FluxionStream::new(stream2)])
        .combine_with_previous();

    tx1.send(Sequenced::new(1)).unwrap();
    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert!(item.previous.is_none());
    assert_eq!(&item.current.value, &1);

    tx2.send(Sequenced::new(2)).unwrap();
    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert_eq!(&item.previous.unwrap().value, &1);
    assert_eq!(&item.current.value, &2);
}

§2. Combine Latest → Combine With Previous

Combine latest values from multiple streams, then track state changes:

use fluxion_stream::{FluxionStream, CombineLatestExt, CombineWithPreviousExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (tx1, stream1) = test_channel::<Sequenced<i32>>();
    let (tx2, stream2) = test_channel::<Sequenced<i32>>();

    // Combine latest, then track previous combined state
    let mut composed = stream1
        .combine_latest(vec![stream2], |_| true)
        .combine_with_previous();

    tx1.send(Sequenced::new(1)).unwrap();
    tx2.send(Sequenced::new(2)).unwrap();

    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert!(item.previous.is_none());
    assert_eq!(item.current.values().len(), 2);

    tx1.send(Sequenced::new(3)).unwrap();
    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    // Previous state had [1, 2], current has [3, 2]
    assert!(item.previous.is_some());
}

§3. Combine Latest → Take While With

Combine streams and continue only while a condition holds:

use fluxion_stream::{FluxionStream, CombineLatestExt, TakeWhileExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (tx1, stream1) = test_channel::<Sequenced<i32>>();
    let (tx2, stream2) = test_channel::<Sequenced<i32>>();
    let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();

    // Combine latest values, but stop when filter becomes false
    let mut composed = stream1
        .combine_latest(vec![stream2], |_| true)
        .take_while_with(filter_stream, |f| *f);

    filter_tx.send(Sequenced::new(true)).unwrap();
    tx1.send(Sequenced::new(1)).unwrap();
    tx2.send(Sequenced::new(2)).unwrap();

    let combined = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert_eq!(combined.values().len(), 2);
}

§4. Ordered Merge → Take While With

Merge streams in order and terminate based on external condition:

use fluxion_stream::{FluxionStream, OrderedStreamExt, TakeWhileExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (tx1, stream1) = test_channel::<Sequenced<i32>>();
    let (tx2, stream2) = test_channel::<Sequenced<i32>>();
    let (filter_tx, filter_stream) = test_channel::<Sequenced<bool>>();

    // Merge all values in order, but stop when filter says so
    let mut composed = stream1
        .ordered_merge(vec![FluxionStream::new(stream2)])
        .take_while_with(filter_stream, |f| *f);

    filter_tx.send(Sequenced::new(true)).unwrap();
    tx1.send(Sequenced::new(1)).unwrap();

    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await)).value.clone();
    assert_eq!(item, 1);

    tx2.send(Sequenced::new(2)).unwrap();
    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await)).value.clone();
    assert_eq!(item, 2);
}

§5. Take Latest When → Combine With Previous

Sample latest value on trigger, then pair with previous sampled value:

use fluxion_stream::{FluxionStream, TakeLatestWhenExt, CombineWithPreviousExt};
use fluxion_test_utils::{Sequenced, test_channel, helpers::unwrap_stream, unwrap_value};
use fluxion_core::Timestamped as TimestampedTrait;

async fn example() {
    let (source_tx, source_stream) = test_channel::<Sequenced<i32>>();
    let (filter_tx, filter_stream) = test_channel::<Sequenced<i32>>();

    // Sample source when filter emits, then track consecutive samples
    let sampled = source_stream.take_latest_when(filter_stream, |_| true);
    let mut composed = FluxionStream::new(sampled).combine_with_previous();

    source_tx.send(Sequenced::new(42)).unwrap();
    filter_tx.send(Sequenced::new(0)).unwrap();

    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert!(item.previous.is_none());
    assert_eq!(&item.current.value, &42);

    source_tx.send(Sequenced::new(99)).unwrap();
    let item = unwrap_value(Some(unwrap_stream(&mut composed, 500).await));
    assert_eq!(&item.previous.unwrap().value, &42);
    assert_eq!(&item.current.value, &99);
}

These patterns demonstrate how Fluxion operators compose to create sophisticated data flows. See the composition tests in the source repository for more examples.

§Getting Started

Add to your Cargo.toml:

[dependencies]
fluxion-stream = { path = "../fluxion-stream" }
tokio = { version = "1.48", features = ["sync", "rt"] }
futures = "0.3"

See individual operator documentation for detailed examples.

Re-exports§

pub use combine_latest::CombineLatestExt;
pub use combine_with_previous::CombineWithPreviousExt;
pub use emit_when::EmitWhenExt;
pub use fluxion_stream::FluxionStream;
pub use merge_with::MergedStream;
pub use ordered_merge::OrderedStreamExt;
pub use take_latest_when::TakeLatestWhenExt;
pub use take_while_with::TakeWhileExt;
pub use types::CombinedState;
pub use types::WithPrevious;
pub use with_latest_from::WithLatestFromExt;

Modules§

combine_latest
combine_with_previous
emit_when
fluxion_stream
merge_with
ordered_merge
take_latest_when
take_while_with
types
Common types and type aliases used throughout the fluxion-stream crate.
with_latest_from

Macros§

error
info
warn