fluxion-stream 0.8.0

Stream combinators with ordering guarantees for async Rust

Part of Fluxion - A reactive stream processing library for Rust

Stream combinators for async Rust with strong temporal-ordering guarantees. This crate provides composable operators and lightweight sequencing utilities designed for correctness and performance in event-driven systems.

Overview

fluxion-stream is a collection of reactive stream operators that maintain temporal ordering across asynchronous operations. Unlike standard stream combinators, all operators in this crate respect the intrinsic ordering of items (via timestamps, sequence numbers, or other ordering mechanisms), ensuring correct temporal sequencing even when events arrive out of order.

Use this crate when:

  • You need to combine multiple async streams while preserving temporal order
  • Events may arrive out of order and need to be resequenced
  • You're building reactive systems (dashboards, monitoring, event processing)
  • You need composable stream operations with correctness guarantees

Key Features

  • Temporal Ordering: All operators maintain temporal correctness via the HasTimestamp and Timestamped traits
  • Composable Operators: 20+ stream combinators designed to work together
  • Error Propagation: Structured error handling through StreamItem<T> enum
  • Zero-Copy: Minimal allocations and efficient buffering strategies
  • Tokio Integration: Built on tokio streams for async runtime compatibility
  • Type Safety: Compile-time guarantees for ordering and type compatibility

Core Concepts

Timestamp Traits

Fluxion uses two traits for temporal ordering:

HasTimestamp - Minimal Read-Only Interface

Provides read-only access to timestamp values:

pub trait HasTimestamp {
    type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;

    fn timestamp(&self) -> Self::Timestamp;  // Get the timestamp for ordering
}

Timestamped - Construction Methods

Extends HasTimestamp with an Inner type and construction/deconstruction capabilities:

pub trait Timestamped: HasTimestamp {
    type Inner: Clone;

    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
    fn into_inner(self) -> Self::Inner;
}

When to use each:

  • Implement HasTimestamp for types that only need to expose a timestamp (read-only)
  • Implement Timestamped for wrapper types that construct timestamped values
  • Most operators require only HasTimestamp for ordering

Implementations:

  • Sequenced<T> - Test utility from fluxion-test-utils using monotonically growing sequence numbers
  • Custom domain types - Implement HasTimestamp for your types (e.g., events with built-in timestamps)
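
As a rough illustration of the two cases above, the sketch below implements HasTimestamp for a domain type that already carries a timestamp, and Timestamped for a generic wrapper. The trait definitions are copied from this section so the block compiles on its own; SensorReading and Stamped are hypothetical names, not types shipped by Fluxion.

```rust
// Trait definitions copied from the section above so the sketch is self-contained;
// real code would import them from the Fluxion crates instead.
pub trait HasTimestamp {
    type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;
    fn timestamp(&self) -> Self::Timestamp;
}

pub trait Timestamped: HasTimestamp {
    type Inner: Clone;
    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
    fn into_inner(self) -> Self::Inner;
}

// Hypothetical domain event with a built-in timestamp: read-only access is enough.
#[derive(Debug, Clone, PartialEq)]
pub struct SensorReading {
    pub recorded_at_ms: u64,
    pub celsius: f64,
}

impl HasTimestamp for SensorReading {
    type Timestamp = u64;
    fn timestamp(&self) -> u64 {
        self.recorded_at_ms
    }
}

// Hypothetical wrapper that pairs any value with a u64 timestamp.
#[derive(Debug, Clone, PartialEq)]
pub struct Stamped<T> {
    value: T,
    ts: u64,
}

impl<T> HasTimestamp for Stamped<T> {
    type Timestamp = u64;
    fn timestamp(&self) -> u64 {
        self.ts
    }
}

impl<T: Clone> Timestamped for Stamped<T> {
    type Inner = T;
    fn with_timestamp(value: T, timestamp: u64) -> Self {
        Stamped { value, ts: timestamp }
    }
    fn into_inner(self) -> T {
        self.value
    }
}

fn main() {
    let reading = SensorReading { recorded_at_ms: 1_700, celsius: 21.5 };
    println!("reading taken at {}", reading.timestamp());

    let wrapped = Stamped::with_timestamp("login", 7);
    println!("{} at {}", wrapped.clone().into_inner(), wrapped.timestamp());
}
```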

Temporal Ordering

Temporal ordering means items are processed based on their intrinsic timestamp, not arrival time:

// Stream 1 sends: [timestamp=2, value=B]
// Stream 2 sends: [timestamp=1, value=A]
// Merged output:  [timestamp=1, value=A], [timestamp=2, value=B]  ✓ Correct temporal order

How it works:

  1. Each item has a timestamp() value (std::time::Instant, u64 counter, etc.)
  2. Operators buffer items and emit them in order of their timestamp() value
  3. Late-arriving items are placed correctly in the sequence
  4. Gaps in timestamps may cause buffering until the sequence is complete
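
The buffering described in these steps can be sketched with the standard library alone. This is a minimal illustration that assumes contiguous u64 sequence numbers; Resequencer is a hypothetical helper, and the crate's actual buffering strategy may differ.

```rust
use std::collections::BTreeMap;

/// Hypothetical resequencer: releases items strictly in sequence order and
/// holds back anything that arrives ahead of a gap.
struct Resequencer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Resequencer<T> {
    fn new(first_seq: u64) -> Self {
        Self { next_seq: first_seq, pending: BTreeMap::new() }
    }

    /// Accepts one item and returns every item that is now safe to emit.
    fn push(&mut self, seq: u64, value: T) -> Vec<T> {
        self.pending.insert(seq, value);
        let mut ready = Vec::new();
        // Drain the buffer for as long as the next expected number is present.
        while let Some(value) = self.pending.remove(&self.next_seq) {
            ready.push(value);
            self.next_seq += 1;
        }
        ready
    }
}

fn main() {
    let mut r = Resequencer::new(1);
    println!("{:?}", r.push(2, "B")); // [] (seq 1 has not arrived yet)
    println!("{:?}", r.push(1, "A")); // ["A", "B"]
}
```

The late item with sequence 1 unblocks the buffered item with sequence 2, which mirrors steps 3 and 4.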

When to use:

  • Event sourcing and event-driven architectures
  • Time-series data processing
  • Distributed system event correlation
  • Any scenario where arrival order ≠ logical order

Error Propagation

All operators use StreamItem<T> for structured error handling:

pub enum StreamItem<T> {
    Value(T),      // Successful value
    Error(FluxionError),  // Error (lock failures, processing errors, etc.)
}

Error handling patterns:

// Pattern 1: Unwrap (panic on error)
let value = stream.next().await.unwrap().unwrap();

// Pattern 2: Filter errors
let values: Vec<_> = stream
    .filter_map(|item| async move { item.ok() })
    .collect()
    .await;

// Pattern 3: Handle explicitly
while let Some(item) = stream.next().await {
    match item {
        StreamItem::Value(v) => process(v),
        StreamItem::Error(e) => log_error(e),
    }
}

See Error Handling Guide for comprehensive patterns.

Stream Operators

Combination Operators

combine_latest

Combines multiple streams, emitting when any stream emits (after all have emitted once).

Use case: Dashboard combining data from multiple sources

use fluxion_stream::CombineLatestExt;

let combined = stream1.combine_latest(
    vec![stream2, stream3],
    |state| state.values().len() == 3  // Emit when all present
);

Behavior:

  • Waits for initial values from all streams
  • Emits combined state when any stream produces a value
  • Maintains latest value from each stream
  • Preserves temporal ordering based on triggering stream
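
The behavior listed above can be modelled synchronously to make the emission rule concrete. This is only a sketch over plain slices, not the crate's implementation: the combine_latest function below is a hypothetical stand-in that replays (source index, value) events and omits the filter closure.

```rust
/// Replays `(source_index, value)` events and returns each combined snapshot.
/// Nothing is emitted until every one of the `sources` has produced a value.
fn combine_latest(events: &[(usize, i32)], sources: usize) -> Vec<Vec<i32>> {
    let mut latest: Vec<Option<i32>> = vec![None; sources];
    let mut emitted = Vec::new();
    for &(source, value) in events {
        latest[source] = Some(value);
        // Collecting into Option<Vec<_>> yields None while any slot is still empty.
        if let Some(snapshot) = latest.iter().copied().collect::<Option<Vec<i32>>>() {
            emitted.push(snapshot);
        }
    }
    emitted
}

fn main() {
    // Source 0 is CPU, source 1 is memory.
    let events = [(0, 45), (1, 78), (0, 50)];
    println!("{:?}", combine_latest(&events, 2)); // [[45, 78], [50, 78]]
}
```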

Full documentation | Tests | Benchmarks

with_latest_from

Samples secondary streams only when primary stream emits.

Use case: User actions enriched with latest configuration/state

use fluxion_stream::WithLatestFromExt;

let enriched = user_clicks.with_latest_from(
    vec![config_stream, state_stream],
    |combined| combined.is_complete(),
    |_primary, secondary| secondary.clone()
);

Behavior:

  • Only emits when primary stream emits
  • Samples latest values from secondary streams
  • Primary stream drives the emission timing
  • Secondary streams provide context
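
To contrast this with combine_latest, the sketch below models the sampling rule with plain values. The Event enum and the with_latest_from function are hypothetical; dropping primary items that arrive before any secondary value is an assumption made for the sketch, not confirmed crate behavior.

```rust
/// Hypothetical event type: a primary item or an update from the secondary source.
enum Event {
    Primary(&'static str),
    Secondary(u32),
}

/// Emits `(primary, latest_secondary)` only when a primary item arrives
/// and a secondary value has already been seen.
fn with_latest_from(events: &[Event]) -> Vec<(&'static str, u32)> {
    let mut latest = None;
    let mut out = Vec::new();
    for event in events {
        match event {
            // Secondary updates refresh the context but never emit on their own.
            Event::Secondary(v) => latest = Some(*v),
            Event::Primary(p) => {
                if let Some(v) = latest {
                    out.push((*p, v));
                }
            }
        }
    }
    out
}

fn main() {
    let events = [
        Event::Primary("click-1"), // dropped: no configuration seen yet
        Event::Secondary(1),
        Event::Primary("click-2"),
        Event::Secondary(2),
        Event::Secondary(3),
        Event::Primary("click-3"),
    ];
    println!("{:?}", with_latest_from(&events)); // [("click-2", 1), ("click-3", 3)]
}
```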

Full documentation | Tests | Benchmarks

ordered_merge

Merges multiple streams preserving temporal order.

Use case: Event log from multiple services

use fluxion_stream::OrderedStreamExt;

let merged = stream1.ordered_merge(vec![stream2, stream3]);

Behavior:

  • Emits all items from all streams
  • Items emitted in order of their timestamp() value
  • Buffers items to ensure correct ordering
  • Completes when all input streams complete
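
One common way to get this behavior is a k-way merge over a min-heap. The sketch below shows that technique on in-memory vectors, assuming each input is already sorted by timestamp; it is an illustration, not necessarily how the crate implements ordered_merge.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges inputs that are each sorted by timestamp into one
/// timestamp-ordered output. Items are `(timestamp, value)` pairs.
fn ordered_merge(inputs: Vec<Vec<(u64, char)>>) -> Vec<(u64, char)> {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();

    // Seed the heap with the head of every input.
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some((ts, value)) = it.next() {
            heap.push(Reverse((ts, idx, value)));
        }
    }

    let mut merged = Vec::new();
    // Pop the smallest timestamp, then refill from the input it came from.
    while let Some(Reverse((ts, idx, value))) = heap.pop() {
        merged.push((ts, value));
        if let Some((next_ts, next_value)) = iters[idx].next() {
            heap.push(Reverse((next_ts, idx, next_value)));
        }
    }
    merged
}

fn main() {
    let merged = ordered_merge(vec![
        vec![(2, 'B'), (4, 'D')],
        vec![(1, 'A'), (3, 'C')],
    ]);
    println!("{:?}", merged); // [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')]
}
```

An asynchronous version has the extra constraint that it cannot emit until every live input has a pending item, which is where the buffering mentioned above comes from.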

Full documentation | Tests | Benchmarks

merge_with

Stateful merging of multiple streams with shared state.

Use case: Repository pattern, event sourcing, aggregating events into domain state

use std::collections::HashMap;

use fluxion_stream::MergedStream;
use fluxion_test_utils::Sequenced;

struct Repository {
    users: HashMap<UserId, User>,
    orders: HashMap<OrderId, Order>,
}

let merged = MergedStream::seed::<Sequenced<Event>>(Repository::new())
    .merge_with(user_stream, |event, repo| {
        repo.users.insert(event.user_id, event.user);
        Event::UserAdded(event.user_id)
    })
    .merge_with(order_stream, |event, repo| {
        repo.orders.insert(event.order_id, event.order);
        Event::OrderCreated(event.order_id)
    })
    .into_fluxion_stream();

Behavior:

  • Maintains shared mutable state across all merged streams
  • Processes events in temporal order (uses ordered_merge internally)
  • Each merge_with call adds a new stream to the merge
  • State is locked per-item for thread safety
  • Can chain with other operators via into_fluxion_stream()

Key Features:

  • Stateful: Shared state accessible to all processing functions
  • Composable: Chain multiple merge_with calls
  • Type-safe: Output type specified once in seed()
  • Ordered: Temporal ordering guaranteed across all streams

Full documentation | Tests | Benchmarks

start_with

Prepend initial values to a stream.

Use case: Provide default/placeholder values, seed initial state

use fluxion_stream::{IntoFluxionStream, StartWithExt};
use fluxion_core::StreamItem;
use fluxion_test_utils::Sequenced;

let (tx, rx) = futures::channel::mpsc::unbounded();

let with_defaults = rx.into_fluxion_stream()
    .start_with(vec![
        StreamItem::Value(Sequenced::new(0)),
        StreamItem::Value(Sequenced::new(1)),
    ]);

tx.unbounded_send(Sequenced::new(2)).unwrap();
tx.unbounded_send(Sequenced::new(3)).unwrap();
// Output: 0, 1, 2, 3

Behavior:

  • Emits initial values before any source stream values
  • Initial values are emitted immediately upon subscription
  • Useful for providing defaults or seeding state
  • Can include StreamItem::Error for testing error handling
  • Preserves temporal ordering (initial values, then source values)

Full documentation | Tests | Benchmarks

Filtering Operators

emit_when

Gates source emissions based on filter stream conditions.

Use case: Only emit sensor data when system is active

use fluxion_stream::EmitWhenExt;

let gated = source.emit_when(
    filter_stream,
    |filter_value| *filter_value > 0  // Predicate for gating
);

Behavior:

  • Buffers source items when gate is closed
  • Emits buffered items when gate opens
  • Maintains temporal ordering
  • Completes when source completes
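
The gate-and-buffer behavior can be sketched over a flat list of events. Input and emit_when below are hypothetical stand-ins, and starting with the gate closed is an assumption of the sketch.

```rust
/// Hypothetical input events, already in temporal order.
enum Input {
    Data(i32),
    Gate(bool),
}

/// Buffers data while the gate is closed and flushes the buffer when it opens.
fn emit_when(inputs: &[Input]) -> Vec<i32> {
    let mut open = false; // assumption: the gate starts closed
    let mut buffer = Vec::new();
    let mut out = Vec::new();
    for input in inputs {
        match input {
            Input::Gate(state) => {
                open = *state;
                if open {
                    out.append(&mut buffer); // release everything held back
                }
            }
            Input::Data(v) if open => out.push(*v),
            Input::Data(v) => buffer.push(*v),
        }
    }
    out
}

fn main() {
    let inputs = [
        Input::Data(1),
        Input::Data(2),
        Input::Gate(false),
        Input::Gate(true), // flushes 1 and 2
        Input::Data(3),    // passes straight through
    ];
    println!("{:?}", emit_when(&inputs)); // [1, 2, 3]
}
```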

Full documentation | Tests | Benchmarks

take_latest_when

Samples source when filter condition is met.

Use case: Capture latest sensor reading on user request

use fluxion_stream::TakeLatestWhenExt;

let sampled = source.take_latest_when(
    trigger_stream,
    |trigger| *trigger  // Sample when the trigger is true
);

Behavior:

  • Maintains latest value from source
  • Emits latest value when filter condition is true
  • Discards intermediate values (only latest matters)
  • Useful for sampling / snapshot patterns

Full documentation | Tests | Benchmarks

sample_ratio

Probabilistic downsampling with configurable ratio.

Use case: Load reduction, logging sampling, monitoring downsampling

use fluxion_stream::SampleRatioExt;

// Sample approximately 10% of items
let sampled = stream.sample_ratio(0.1, fastrand::u64(..));

// For testing with deterministic seed
let sampled = stream.sample_ratio(0.5, 42);

Behavior:

  • Ratio range: 0.0 (emit nothing) to 1.0 (emit all)
  • Panics if ratio outside valid range
  • Deterministic with same seed for reproducible tests
  • Errors always pass through (never sampled)
  • Timestamp-preserving
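
The seed-determinism property is easy to demonstrate without the crate. The sketch below uses a small xorshift64 generator purely to stay dependency-free; XorShift64 and this sample_ratio function are hypothetical, and the crate's random source and selection logic may differ.

```rust
/// Minimal xorshift64 generator, used only to keep the sketch dependency-free.
struct XorShift64(u64);

impl XorShift64 {
    fn next_f64(&mut self) -> f64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        // Use the top 53 bits to build a float in [0, 1).
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Keeps each item with probability `ratio`; the same seed gives the same picks.
fn sample_ratio<T: Clone>(items: &[T], ratio: f64, seed: u64) -> Vec<T> {
    assert!((0.0..=1.0).contains(&ratio), "ratio must be within 0.0..=1.0");
    let mut rng = XorShift64(seed.max(1)); // xorshift state must be non-zero
    items
        .iter()
        .filter(|_| rng.next_f64() < ratio)
        .cloned()
        .collect()
}

fn main() {
    let items: Vec<u32> = (0..20).collect();
    let first = sample_ratio(&items, 0.5, 42);
    let second = sample_ratio(&items, 0.5, 42);
    println!("same seed, same sample: {}", first == second);
    println!("kept {} of {}", first.len(), items.len());
}
```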

Full documentation | Tests | Benchmarks

take_while_with

Emits while condition holds, terminates when false.

Use case: Process events until shutdown signal

use fluxion_stream::TakeWhileExt;

let bounded = source.take_while_with(
    condition_stream,
    |condition| *condition  // Continue while the condition is true
);

Behavior:

  • Emits source items while condition is true
  • Terminates stream when condition becomes false
  • First false terminates immediately
  • Preserves temporal ordering until termination

Full documentation | Tests | Benchmarks

Transformation Operators

scan_ordered

Accumulates state across stream items, emitting intermediate results.

Use case: Running totals, moving averages, state machines, building collections over time

use fluxion_stream::ScanOrderedExt;
use fluxion_test_utils::Sequenced;

let sums = stream.scan_ordered::<Sequenced<i32>, _, _>(0, |acc, val| {
    *acc += val;
    *acc
});

Behavior:

  • Maintains accumulator state that persists across all items
  • Emits transformed value for each input
  • Errors propagate without affecting state
  • Can transform types (e.g., i32 → String)
  • Preserves timestamps from source items

Common Patterns:

  • Running sum/count: Accumulate numeric values
  • Building collections: Gather items into Vec/HashMap
  • State machines: Track state transitions with context
  • Moving calculations: Windowed statistics

Full documentation | Tests | Benchmarks

combine_with_previous

Pairs each value with the previous value.

Use case: Detect value changes or calculate deltas

use fluxion_stream::CombineWithPreviousExt;

let pairs = stream.combine_with_previous();

// Output: WithPrevious { previous: Some(1), current: 2 }

Behavior:

  • First item has previous = None
  • Subsequent items have previous = Some(prev)
  • Useful for change detection and delta calculations
  • Preserves temporal ordering

Full documentation | Tests | Benchmarks

window_by_count

Batches stream items into fixed-size windows.

Use case: Batch processing, aggregating metrics, reducing API calls

use fluxion_stream::WindowByCountExt;

let windowed = stream.window_by_count(3);

// Emits: vec![item1, item2, item3], vec![item4, item5, item6], ...

Behavior:

  • Collects items into windows of specified size
  • Emits complete window when size is reached
  • Emits partial window on stream completion
  • Errors pass through immediately (not batched)
  • Useful for batch processing and reducing downstream operations
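
For in-memory data the same windowing, including the trailing partial window, is what the standard library's slice chunks method produces. The window_by_count function below is a hypothetical synchronous analogue and does not model error pass-through.

```rust
/// Splits `items` into consecutive windows of `size`; the last may be shorter.
fn window_by_count<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    // Note: `chunks` panics if `size` is 0.
    items.chunks(size).map(|window| window.to_vec()).collect()
}

fn main() {
    let windows = window_by_count(&[1, 2, 3, 4, 5, 6, 7], 3);
    println!("{:?}", windows); // [[1, 2, 3], [4, 5, 6], [7]]
}
```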

Full documentation | Tests | Benchmarks

Utility Operators

map_ordered

Maps values while preserving ordering wrapper.

let mapped = stream.map_ordered(|x| x * 2);

Full documentation | Tests | Benchmarks

filter_ordered

Filters values while preserving ordering wrapper.

let filtered = stream.filter_ordered(|x| *x > 10);

Full documentation | Tests | Benchmarks

take_items

Emit only the first N items then complete.

Use case: Pagination, limiting results, testing

use fluxion_stream::{IntoFluxionStream, TakeItemsExt};

let limited = stream.take_items(10);
// Emits first 10 items, then completes

Behavior:

  • Emits at most N items then completes the stream
  • Errors count as items (use on_error() first to filter errors)
  • Stream completes immediately after emitting the Nth item
  • Useful for pagination, testing with limited data, or capping output volume
  • Temporal ordering preserved

Full documentation | Tests | Benchmarks

skip_items

Skip the first N items, emit all remaining.

Use case: Pagination (skip offset), ignoring warmup data

use fluxion_stream::{IntoFluxionStream, SkipItemsExt};

let after_skip = stream.skip_items(5);
// Discards first 5 items, emits all subsequent items

Behavior:

  • Discards first N items silently
  • Emits all items after the first N
  • Errors count as items (use on_error() first to filter errors)
  • Useful for pagination (offset) and skipping initial values
  • Temporal ordering preserved

Full documentation | Tests | Benchmarks

distinct_until_changed

Suppresses consecutive duplicate values.

Full documentation | Tests | Benchmarks

distinct_until_changed_by

Custom duplicate suppression with comparison function.

Use case: Field comparison, case-insensitive matching, threshold filtering, custom equality

Behavior:

  • Custom comparison function for flexible duplicate detection
  • No PartialEq requirement on inner type
  • Comparison returns true when two values are considered equal; the later value is then suppressed
  • Follows Rust patterns: sort_by, dedup_by, max_by
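
Since neither deduplication operator has an example here, the sketch below shows the same semantics on a Vec using the standard library's dedup and dedup_by. The two wrapper functions are hypothetical synchronous analogues, not the crate's API.

```rust
/// Removes consecutive duplicates using `PartialEq`.
fn distinct_until_changed<T: PartialEq>(mut items: Vec<T>) -> Vec<T> {
    items.dedup();
    items
}

/// Removes consecutive items that `same` reports as equal; no `PartialEq` needed.
fn distinct_until_changed_by<T>(
    mut items: Vec<T>,
    mut same: impl FnMut(&T, &T) -> bool,
) -> Vec<T> {
    // `dedup_by` passes the later element first; that is harmless for a
    // symmetric comparison such as the one used below.
    items.dedup_by(|a, b| same(&*a, &*b));
    items
}

fn main() {
    let readings = distinct_until_changed(vec![1, 1, 2, 2, 2, 1, 3, 3]);
    println!("{:?}", readings); // [1, 2, 1, 3]

    let names = distinct_until_changed_by(
        vec!["alice", "ALICE", "bob", "Bob", "alice"],
        |a, b| a.eq_ignore_ascii_case(b),
    );
    println!("{:?}", names); // ["alice", "bob", "alice"]
}
```

Only consecutive repeats are suppressed, so a value that reappears after a different value is emitted again.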

Full documentation | Tests | Benchmarks

tap

Perform side-effects without transforming items.

Use case: Debugging, logging, metrics collection, tracing

use fluxion_stream::{IntoFluxionStream, TapExt};

let pipeline = rx.into_fluxion_stream()
    .tap(|x| println!("Input: {:?}", x))
    .map_ordered(|x| Sequenced::new(x.into_inner() * 2))
    .tap(|x| println!("After map: {:?}", x));

Behavior:

  • Pass-through operator: items flow unchanged
  • Callback invoked with reference to each value
  • Errors pass through unchanged (callback not invoked for errors)
  • Timestamp-preserving

Full documentation | Tests | Benchmarks

Error Handling Operators

on_error

Selectively consume or propagate errors using the Chain of Responsibility pattern.

Use case: Logging errors, metrics collection, conditional error recovery

use fluxion_stream::OnErrorExt;

let handled = stream
    .on_error(|err| {
        if err.to_string().contains("validation") {
            log::warn!("Validation error: {}", err);
            true // Consume validation errors
        } else {
            false // Propagate other errors
        }
    })
    .on_error(|_| {
        metrics::increment("errors");
        true // Catch-all
    });

Behavior:

  • Handler returns true to consume error (removes StreamItem::Error)
  • Handler returns false to propagate error downstream
  • Multiple on_error calls can be chained
  • Value items pass through unchanged
  • Enables side effects (logging, metrics) while filtering errors
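
The consume-or-propagate contract can be illustrated with a small stand-in. In the sketch below, Item is a hypothetical simplification of StreamItem with a String error, and on_error works on a Vec rather than a stream.

```rust
/// Stand-in for the crate's `StreamItem`, with a plain `String` error.
#[derive(Debug, PartialEq)]
enum Item {
    Value(i32),
    Error(String),
}

/// Drops errors the handler consumes (returns true); keeps everything else.
fn on_error(items: Vec<Item>, mut handler: impl FnMut(&str) -> bool) -> Vec<Item> {
    items
        .into_iter()
        .filter(|item| match item {
            Item::Error(e) => !handler(e.as_str()), // keep errors the handler declined
            Item::Value(_) => true,
        })
        .collect()
}

fn main() {
    let items = vec![
        Item::Value(1),
        Item::Error("validation: empty name".into()),
        Item::Error("io: disk full".into()),
        Item::Value(2),
    ];

    // The first handler consumes validation errors only.
    let after_first = on_error(items, |e| e.contains("validation"));
    println!("{:?}", after_first); // [Value(1), Error("io: disk full"), Value(2)]

    // The second handler is a catch-all that also counts what it sees.
    let mut other_errors = 0;
    let after_second = on_error(after_first, |_| {
        other_errors += 1;
        true
    });
    println!("{:?}, other errors: {}", after_second, other_errors); // [Value(1), Value(2)], other errors: 1
}
```

Because each handler only sees errors that earlier handlers declined, the order of the chained calls determines which handler gets the first chance to consume an error.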

Full documentation | Tests | Specification

Splitting Operators

partition

Splits a stream into two based on a predicate.

Use case: Error routing, priority queues, type routing, threshold filtering

use fluxion_stream::{IntoFluxionStream, PartitionExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

let (tx, rx) = futures::channel::mpsc::unbounded();

// Partition numbers into even and odd
let (mut evens, mut odds) = rx.into_fluxion_stream()
    .partition(|n: &i32| n % 2 == 0);

tx.unbounded_send(Sequenced::new(1)).unwrap();
tx.unbounded_send(Sequenced::new(2)).unwrap();
tx.unbounded_send(Sequenced::new(3)).unwrap();
tx.unbounded_send(Sequenced::new(4)).unwrap();
drop(tx);

// evens: 2, 4
// odds: 1, 3

Behavior:

  • Chain-breaking operator (returns two streams)
  • Spawns background routing task
  • Timestamp-preserving (original timestamps maintained)
  • Error propagation to both output streams
  • Unbounded internal buffers
  • Each item goes to exactly one output stream
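
The routing rule, where each item lands in exactly one output, matches Iterator::partition from the standard library. The partition_by function below is a hypothetical synchronous analogue; it does not model the background task, the internal buffers, or error propagation to both outputs.

```rust
/// Routes each item to the first Vec if `pred` holds, otherwise to the second.
fn partition_by<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> (Vec<T>, Vec<T>) {
    items.into_iter().partition(|item| pred(item))
}

fn main() {
    let (evens, odds) = partition_by(vec![1, 2, 3, 4], |n| n % 2 == 0);
    println!("evens: {:?}, odds: {:?}", evens, odds); // evens: [2, 4], odds: [1, 3]
}
```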

Full documentation | Tests | Benchmarks

Multicasting Operators

share

Convert a cold stream into a hot, multi-subscriber broadcast source.

Use case: Share expensive computations across multiple consumers

use fluxion_stream::{IntoFluxionStream, ShareExt, FilterOrderedExt, MapOrderedExt};
use fluxion_test_utils::Sequenced;

let (tx, rx) = futures::channel::mpsc::unbounded::<Sequenced<i32>>();

// Source operators run ONCE
let source = rx.into_fluxion_stream()
    .map_ordered(|x: Sequenced<i32>| Sequenced::new(x.into_inner() * 2));

// Share among multiple subscribers
let shared = source.share();

// Each subscriber chains independently
let evens = shared.subscribe().unwrap()
    .filter_ordered(|x| x.into_inner() % 2 == 0);
let strings = shared.subscribe().unwrap()
    .map_ordered(|x: Sequenced<i32>| Sequenced::new(x.into_inner().to_string()));

Behavior:

  • Hot stream: Late subscribers do not receive past items
  • Shared execution: Source operators run once; results are broadcast to all
  • Subscription factory: Call subscribe() to create independent subscriber streams
  • Error propagation: Errors broadcast to all subscribers, then source closes

Full documentation | Tests | Benchmarks

Operator Selection Guide

When You Need Combined State

Operator | Triggers On | Output | Best For
combine_latest | Any stream emits | Latest from all streams | Dashboards, state aggregation
with_latest_from | Primary emits | Primary + context | Enriching events with state
merge_with | Any stream emits | Transformed via state | Repository pattern, event sourcing

When You Need All Items

Operator | Output | Ordering | Best For
ordered_merge | Every item | Temporal | Event logs, audit trails
combine_with_previous | Pairs (prev, curr) | Temporal | Change detection, deltas
scan_ordered | Accumulated state | Temporal | Running totals, state machines

When You Need Conditional Emission

Operator | Buffering | Termination | Best For
emit_when | Yes (buffers when gated) | Source completes | Conditional processing
take_latest_when | No (only latest) | Source completes | Sampling, snapshots
take_while_with | No | First false | Bounded processing
sample_ratio | No | Source completes | Load reduction, logging sampling

When You Need Deduplication

Operator | Comparison | Requirement | Best For
distinct_until_changed | PartialEq | Inner type must implement PartialEq | Simple duplicate suppression
distinct_until_changed_by | Custom function | No trait requirements | Field comparison, case-insensitive, threshold-based

When You Need Error Handling

Operator | Consumes Errors | Enables Side Effects | Propagation Control | Best For
on_error | Selective | Yes (logging, metrics) | Handler-controlled | Layered error handling, monitoring

When You Need Debugging / Observability

Operator | Transforms Data | Side Effects | Best For
tap | No (pass-through) | Yes (logging, metrics) | Debugging pipelines, tracing, metrics

When You Need Multicasting

Operator | Late Subscribers | Source Execution | Best For
share | Miss past items | Once (broadcast) | Sharing expensive computations, fan-out

When You Need Stream Splitting

Operator | Outputs | Routing | Best For
partition | Two streams | By predicate | Error routing, priority queues, threshold filtering

Quick Start

Add to your Cargo.toml:

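[dependencies]
fluxion-stream = "0.8"
# Assumed to track the fluxion-stream version
fluxion-core = "0.8"
# Provides Sequenced, used in the examples below (version assumed)
fluxion-test-utils = "0.8"
tokio = { version = "1", features = ["full"] }
futures = "0.3"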

Basic usage:

use fluxion_stream::{IntoFluxionStream, OrderedStreamExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Create channels
    let (tx1, rx1) = futures::channel::mpsc::unbounded();
    let (tx2, rx2) = futures::channel::mpsc::unbounded();

    // Create streams
    let stream1 = rx1.into_fluxion_stream();
    let stream2 = rx2.into_fluxion_stream();

    // Merge in temporal order
    let mut merged = stream1.ordered_merge(vec![stream2]);

    // Send values (out of order)
    tx2.unbounded_send(Sequenced::with_sequence(100, 1)).unwrap();
    tx1.unbounded_send(Sequenced::with_sequence(200, 2)).unwrap();

    // Receive in temporal order
    let first = merged.next().await.unwrap().unwrap();
    assert_eq!(first.value, 100);  // seq=1 emitted first
}

Examples

Combine Latest for Dashboard

use fluxion_stream::{IntoFluxionStream, CombineLatestExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (cpu_tx, cpu_rx) = futures::channel::mpsc::unbounded::<Sequenced<i32>>();
    let (mem_tx, mem_rx) = futures::channel::mpsc::unbounded();

    let cpu_stream = cpu_rx.into_fluxion_stream();
    let mem_stream = mem_rx.into_fluxion_stream();

    let mut dashboard = cpu_stream.combine_latest(
        vec![mem_stream],
        |state| state.values().len() == 2
    );

    // Send metrics
    cpu_tx.unbounded_send(Sequenced::with_sequence(45, 1)).unwrap();
    mem_tx.unbounded_send(Sequenced::with_sequence(78, 2)).unwrap();

    // Get combined state
    if let Some(item) = dashboard.next().await {
        let state = item.unwrap();
        let values = state.get().values();
        println!("CPU: {}%, Memory: {}%", values[0], values[1]);
    }

    Ok(())
}

Filter with emit_when

use fluxion_stream::{IntoFluxionStream, EmitWhenExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (data_tx, data_rx) = futures::channel::mpsc::unbounded();
    let (gate_tx, gate_rx) = futures::channel::mpsc::unbounded();

    let data = data_rx.into_fluxion_stream();
    let gate = gate_rx.into_fluxion_stream();

    let mut gated = data.emit_when(gate, |open| *open);

    // Send data while gate is closed
    data_tx.unbounded_send(Sequenced::with_sequence(1, 1)).unwrap();
    data_tx.unbounded_send(Sequenced::with_sequence(2, 2)).unwrap();
    gate_tx.unbounded_send(Sequenced::with_sequence(false, 3)).unwrap();

    // Open gate - buffered items released
    gate_tx.unbounded_send(Sequenced::with_sequence(true, 4)).unwrap();

    // Items 1 and 2 are now emitted
    let first = gated.next().await.unwrap().unwrap();
    assert_eq!(first.value, 1);

    Ok(())
}

Running Total with scan_ordered

use fluxion_stream::{IntoFluxionStream, ScanOrderedExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = futures::channel::mpsc::unbounded();
    let stream = rx.into_fluxion_stream();

    // Calculate running sum
    let mut running_sum = stream.scan_ordered::<Sequenced<i32>, _, _>(0, |acc, val| {
        *acc += val;
        *acc
    });

    tx.unbounded_send(Sequenced::with_sequence(10, 1)).unwrap();
    tx.unbounded_send(Sequenced::with_sequence(20, 2)).unwrap();
    tx.unbounded_send(Sequenced::with_sequence(30, 3)).unwrap();

    // First sum: 0 + 10 = 10
    let first = running_sum.next().await.unwrap().unwrap();
    assert_eq!(first.value, 10);

    // Second sum: 10 + 20 = 30
    let second = running_sum.next().await.unwrap().unwrap();
    assert_eq!(second.value, 30);

    // Third sum: 30 + 30 = 60
    let third = running_sum.next().await.unwrap().unwrap();
    assert_eq!(third.value, 60);

    Ok(())
}

Change Detection with combine_with_previous

use fluxion_stream::{IntoFluxionStream, CombineWithPreviousExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = futures::channel::mpsc::unbounded();
    let stream = rx.into_fluxion_stream();

    let mut pairs = stream.combine_with_previous();

    tx.unbounded_send(Sequenced::with_sequence(10, 1)).unwrap();
    tx.unbounded_send(Sequenced::with_sequence(15, 2)).unwrap();
    tx.unbounded_send(Sequenced::with_sequence(15, 3)).unwrap();

    // First item - no previous
    let first = pairs.next().await.unwrap().unwrap();
    assert_eq!(first.get().current, 10);
    assert_eq!(first.get().previous, None);

    // Second item - has previous
    let second = pairs.next().await.unwrap().unwrap();
    let (prev, curr) = second.get().as_pair();
    assert_eq!(prev, Some(&10));
    assert_eq!(curr, &15);

    // Third item - detect no change
    let third = pairs.next().await.unwrap().unwrap();
    let (prev, curr) = third.get().as_pair();
    if prev == Some(curr) {
        println!("Value unchanged: {}", curr);
    }

    Ok(())
}

Testing

Run all tests:

cargo test

Run specific operator tests:

cargo test --test combine_latest_tests
cargo test --test ordered_merge_tests
cargo test --test emit_when_tests

Run with error tests:

cargo test combine_latest_error_tests

The crate includes comprehensive test coverage for:

  • Operator functionality (basic behavior)
  • Error propagation scenarios
  • Edge cases (empty streams, single items, etc.)
  • Temporal ordering correctness
  • Concurrent stream handling

License

Licensed under the Apache License, Version 2.0. See LICENSE for details.