fluxion-core 0.6.0

Core traits and types for ordered stream processing
Documentation
# fluxion-core


> **Part of [Fluxion]../README.md** - A reactive stream processing library for Rust

Core traits and types for ordered stream processing in async Rust.

## Overview


This crate provides the foundational abstractions used throughout the Fluxion ecosystem:

- **`Timestamped` trait**: Temporal ordering for stream items via timestamps
- **`StreamItem<T>`**: Error-aware stream item wrapper (`Value` | `Error`)
- **`FluxionSubject<T>`**: Hot, multi-subscriber broadcast subject
- **`FluxionError`**: Unified error type for stream operations
- **Lock utilities**: Safe mutex operations with error propagation

## Key Types


### Timestamp Traits


Fluxion-core provides two traits for temporal ordering:

#### HasTimestamp - Read-Only Access


Minimal trait for types that expose a timestamp value:

```rust
pub trait HasTimestamp {
    type Timestamp: Ord + Copy + Send + Sync + std::fmt::Debug;

    fn timestamp(&self) -> Self::Timestamp;  // Get timestamp for ordering
}
```

Use this when your type only needs to provide a timestamp for ordering (most common case).

#### Timestamped - Full Wrapper Interface


Extends `HasTimestamp` with an `Inner` type and construction methods for wrapper types:

```rust
pub trait Timestamped: HasTimestamp {
    type Inner: Clone;

    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
    fn with_fresh_timestamp(value: Self::Inner) -> Self;
    fn into_inner(self) -> Self::Inner;
}
```

Use this for wrapper types like `Sequenced<T>` that wrap an inner value with a timestamp.

### FluxionSubject<T>


A hot, multi-subscriber broadcast subject for reactive programming patterns:

```rust
use fluxion_core::{FluxionSubject, StreamItem};
use futures::StreamExt;

#[tokio::main]

async fn main() {
    let subject = FluxionSubject::new();

    // Subscribe before sending - hot subject, no replay
    let mut stream1 = subject.subscribe();
    let mut stream2 = subject.subscribe();

    // Send to all subscribers
    subject.send(StreamItem::Value(42)).unwrap();

    // Both subscribers receive the value
    assert_eq!(stream1.next().await.unwrap().unwrap(), 42);
    assert_eq!(stream2.next().await.unwrap().unwrap(), 42);
}
```

**Key Characteristics:**

- **Hot**: Late subscribers only receive items sent *after* they subscribe (no replay buffer)
- **Multi-subscriber**: Broadcasts each item to all active subscribers simultaneously
- **Thread-safe**: Uses `Arc<Mutex<>>` internally - cheap to clone, safe to send across threads
- **Automatic cleanup**: Dead subscribers are removed on next `send()` (no memory leaks)
- **Unbounded**: Uses unbounded mpsc channels (no backpressure)

**Subject Lifecycle:**

```rust
let subject = FluxionSubject::new();

// Clone shares the same subject state
let subject_clone = subject.clone();

// Subscribe on any clone
let stream = subject_clone.subscribe();

// Send on any clone - all subscribers receive it
subject.send(StreamItem::Value(1)).unwrap();

// Error terminates all subscribers
subject.error(FluxionError::stream_error("failed"));

// Explicit close completes all subscribers
subject.close();
```

**Thread Safety:**

```rust
let subject = FluxionSubject::new();

// Safe to share across async tasks
tokio::spawn({
    let subject = subject.clone();
    async move {
        subject.send(StreamItem::Value(1)).unwrap();
    }
});

tokio::spawn({
    let subject = subject.clone();
    async move {
        subject.subscribe();
    }
});
```

**Common Patterns:**

1. **Event Bus**: Broadcast domain events to multiple handlers
2. **State Updates**: Notify observers of state changes
3. **Message Fanout**: Distribute work to multiple consumers
4. **Test Doubles**: Injectable subjects for testing reactive flows

**When to Use:**
- ✅ Multiple subscribers need the same stream
- ✅ Subscribers can join/leave dynamically
- ✅ No replay needed (hot semantics)
- ✅ Unbounded buffers acceptable

**When NOT to Use:**
- ❌ Need cold semantics (replay to new subscribers)
- ❌ Need backpressure (bounded channels)
- ❌ Single subscriber (use channels directly)
- ❌ Persistent event log (use actual event store)

### StreamItem<T>


Error-aware wrapper for stream values:

```rust
pub enum StreamItem<T> {
    Value(T),
    Error(FluxionError),
}
```

Enables error propagation through operator chains without terminating the stream. See the [Error Handling Guide](../docs/ERROR-HANDLING.md) for details.

## Architecture Notes


### Why FluxionSubject Uses Arc<Mutex<>>


The subject's design requires interior mutability with shared ownership:

**Operations requiring mutation:**
- `send()` - broadcasts to all subscribers AND removes dead ones
- `subscribe()` - adds new subscriber to the list
- `close()` - sets closed flag and clears subscribers

**Why Arc:**
- Subject is `Clone` - multiple handles can exist
- All clones share the same subscriber list and state
- Enables passing to different async tasks

**Why Mutex:**
- Multiple threads/tasks can call operations concurrently
- Prevents data races on the subscriber `Vec`
- Ensures consistent state across `send()`/`subscribe()`/`close()`

**Alternative considered:** `&mut self` methods would prevent cloning and multi-task sharing - defeats the purpose of a broadcast subject.

### Dead Subscriber Cleanup


Subscribers are automatically cleaned up during `send()`:

```rust
// For each send, remove disconnected subscribers
for tx in state.senders.drain(..) {
    if tx.unbounded_send(item.clone()).is_ok() {
        next_senders.push(tx);  // Keep alive
    }
    // Dead subscribers dropped here
}
```

This prevents memory leaks when subscribers drop their streams without explicitly unsubscribing.

## Usage


Add this to your `Cargo.toml`:

```toml
[dependencies]
fluxion-core = "0.6.0"
```

## License


Apache-2.0