fluxion-core 0.8.0

Core traits and types for ordered stream processing
Documentation

fluxion-core

Part of Fluxion - A reactive stream processing library for Rust

Core traits and types for ordered stream processing in async Rust.

Overview

This crate provides the foundational abstractions used throughout the Fluxion ecosystem:

  • Timestamped trait: Temporal ordering for stream items via timestamps
  • StreamItem<T>: Error-aware stream item wrapper (Value | Error)
  • FluxionSubject<T>: Hot, multi-subscriber broadcast subject
  • FluxionError: Unified error type for stream operations
  • Lock utilities: Safe mutex operations with error propagation

Key Types

Timestamp Traits

Fluxion-core provides two traits for temporal ordering:

HasTimestamp - Read-Only Access

Minimal trait for types that expose a timestamp value:

pub trait HasTimestamp {
    type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;

    fn timestamp(&self) -> Self::Timestamp;  // Get timestamp for ordering
}

Use this when your type only needs to provide a timestamp for ordering (most common case).

Timestamped - Full Wrapper Interface

Extends HasTimestamp with an Inner type and construction methods for wrapper types:

pub trait Timestamped: HasTimestamp {
    type Inner: Clone;

    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
    fn into_inner(self) -> Self::Inner;
}

Use this for wrapper types like Sequenced<T> that wrap an inner value with a timestamp.

FluxionSubject

A hot, multi-subscriber broadcast subject for reactive programming patterns:

use fluxion_core::{FluxionSubject, StreamItem};
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let subject = FluxionSubject::new();

    // Subscribe before sending - hot subject, no replay
    let mut stream1 = subject.subscribe();
    let mut stream2 = subject.subscribe();

    // Send to all subscribers
    subject.send(StreamItem::Value(42)).unwrap();

    // Both subscribers receive the value
    assert_eq!(stream1.next().await.unwrap().unwrap(), 42);
    assert_eq!(stream2.next().await.unwrap().unwrap(), 42);
}

Key Characteristics:

  • Hot: Late subscribers only receive items sent after they subscribe (no replay buffer)
  • Multi-subscriber: Broadcasts each item to all active subscribers simultaneously
  • Thread-safe: Uses Arc<Mutex<>> internally - cheap to clone, safe to send across threads
  • Automatic cleanup: Dead subscribers are removed on next send() (no memory leaks)
  • Unbounded: Uses unbounded mpsc channels (no backpressure)

Subject Lifecycle:

let subject = FluxionSubject::new();

// Clone shares the same subject state
let subject_clone = subject.clone();

// Subscribe on any clone
let stream = subject_clone.subscribe();

// Send on any clone - all subscribers receive it
subject.send(StreamItem::Value(1)).unwrap();

// Error terminates all subscribers
subject.error(FluxionError::stream_error("failed"));

// Explicit close completes all subscribers
subject.close();

Thread Safety:

let subject = FluxionSubject::new();

// Safe to share across async tasks
tokio::spawn({
    let subject = subject.clone();
    async move {
        subject.send(StreamItem::Value(1)).unwrap();
    }
});

tokio::spawn({
    let subject = subject.clone();
    async move {
        subject.subscribe();
    }
});

Common Patterns:

  1. Event Bus: Broadcast domain events to multiple handlers
  2. State Updates: Notify observers of state changes
  3. Message Fanout: Distribute work to multiple consumers
  4. Test Doubles: Injectable subjects for testing reactive flows

When to Use:

  • ✅ Multiple subscribers need the same stream
  • ✅ Subscribers can join/leave dynamically
  • ✅ No replay needed (hot semantics)
  • ✅ Unbounded buffers acceptable

When NOT to Use:

  • ❌ Need cold semantics (replay to new subscribers)
  • ❌ Need backpressure (bounded channels)
  • ❌ Single subscriber (use channels directly)
  • ❌ Persistent event log (use actual event store)

StreamItem

Error-aware wrapper for stream values:

pub enum StreamItem<T> {
    Value(T),
    Error(FluxionError),
}

Enables error propagation through operator chains without terminating the stream. See the Error Handling Guide for details.

Architecture Notes

Why FluxionSubject Uses Arc<Mutex<>>

The subject's design requires interior mutability with shared ownership:

Operations requiring mutation:

  • send() - broadcasts to all subscribers AND removes dead ones
  • subscribe() - adds new subscriber to the list
  • close() - sets closed flag and clears subscribers

Why Arc:

  • Subject is Clone - multiple handles can exist
  • All clones share the same subscriber list and state
  • Enables passing to different async tasks

Why Mutex:

  • Multiple threads/tasks can call operations concurrently
  • Prevents data races on the subscriber Vec
  • Ensures consistent state across send()/subscribe()/close()

Alternative considered: &mut self methods would prevent cloning and multi-task sharing - defeats the purpose of a broadcast subject.

Dead Subscriber Cleanup

Subscribers are automatically cleaned up during send():

// For each send, remove disconnected subscribers
for tx in state.senders.drain(..) {
    if tx.unbounded_send(item.clone()).is_ok() {
        next_senders.push(tx);  // Keep alive
    }
    // Dead subscribers dropped here
}

This prevents memory leaks when subscribers drop their streams without explicitly unsubscribing.

Usage

Add this to your Cargo.toml:

[dependencies]

fluxion-core = "0.8.0"

License

Apache-2.0