fluxion-stream 0.1.0

Stream combinators with ordering guarantees for async Rust
# fluxion-stream

Stream combinators for async Rust with strong temporal-ordering guarantees. This crate provides composable operators and lightweight sequencing utilities designed for correctness and performance in event-driven systems.

## Key features

- Temporal ordering via `Ordered` trait and sequence numbers
- Composable operators: `combine_latest`, `with_latest_from`, `ordered_merge`, `take_latest_when`, `take_while_with`, and more
- Efficient implementation with minimal allocations
- Integration with tokio streams

## Core concepts

### Ordered Trait

The `Ordered` trait is central to fluxion-stream. It provides a temporal ordering mechanism for stream items:

```rust
pub trait Ordered: Clone {
    type Inner: Clone;
    
    fn order(&self) -> u64;  // Temporal ordering value
    fn get(&self) -> &Self::Inner;  // Access inner value
    fn with_order(value: Self::Inner, order: u64) -> Self;  // Wrap a value with an explicit order
}
```
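To make the trait concrete, here is a minimal sketch of an implementation. The trait is restated from the definition above so the snippet compiles on its own; `Stamped` and `sort_by_order` are hypothetical names used only for illustration and are not part of fluxion-stream.

```rust
// The trait as given in this README, restated so the sketch is self-contained.
pub trait Ordered: Clone {
    type Inner: Clone;

    fn order(&self) -> u64;
    fn get(&self) -> &Self::Inner;
    fn with_order(value: Self::Inner, order: u64) -> Self;
}

// Hypothetical wrapper (not a fluxion-stream type): a value plus its order.
#[derive(Clone, Debug, PartialEq)]
pub struct Stamped<T> {
    value: T,
    order: u64,
}

impl<T: Clone> Ordered for Stamped<T> {
    type Inner = T;

    fn order(&self) -> u64 {
        self.order
    }

    fn get(&self) -> &T {
        &self.value
    }

    fn with_order(value: T, order: u64) -> Self {
        Stamped { value, order }
    }
}

// Generic helper: sorts any slice of `Ordered` items by temporal order.
pub fn sort_by_order<T: Ordered>(items: &mut [T]) {
    items.sort_by_key(|item| item.order());
}
```

Because `sort_by_order` is written against the trait rather than a concrete type, any item type that implements `Ordered` can be handled the same way.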

### Stream Operators

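All operators preserve temporal ordering: output follows each item's `order()` value rather than arrival order, so results stay consistent when several input streams are active concurrently.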
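As an illustration of what merging by temporal order means, the following sketch merges several already-sorted inputs of `(order, value)` pairs using only the standard library. It is a synchronous model of the idea, not the crate's implementation; `merge_by_order` is a hypothetical name, and the real operators work on async streams.

```rust
/// Merges inputs that are each sorted by order value into one sequence
/// sorted by order value. Ties go to the input with the lower index.
pub fn merge_by_order<T>(inputs: Vec<Vec<(u64, T)>>) -> Vec<(u64, T)> {
    let mut sources: Vec<_> = inputs
        .into_iter()
        .map(|input| input.into_iter().peekable())
        .collect();
    let mut merged = Vec::new();

    loop {
        // Find the source whose next pending item has the smallest order.
        let next = sources
            .iter_mut()
            .enumerate()
            .filter_map(|(idx, src)| src.peek().map(|(order, _)| (*order, idx)))
            .min();

        match next {
            // Take that item and continue with the remaining ones.
            Some((_, idx)) => merged.push(sources[idx].next().expect("peeked item exists")),
            // Every source is exhausted.
            None => return merged,
        }
    }
}
```

For example, `merge_by_order(vec![vec![(1, "a"), (3, "c")], vec![(2, "b")]])` returns `[(1, "a"), (2, "b"), (3, "c")]`: the item from the second input is placed between the two items from the first because of its order value.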

## Quick Examples

### Basic Stream Creation

```rust
use fluxion_stream::FluxionStream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
    let stream = FluxionStream::from_unbounded_receiver(rx);
    
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);
    
    let values: Vec<_> = stream.collect().await;
    println!("{:?}", values); // [1, 2]
}
```

### Combining Streams with `ordered_merge`

```rust
use fluxion_stream::{FluxionStream, OrderedStreamExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Create test streams with ordered values
    let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel();
    let (tx2, rx2) = tokio::sync::mpsc::unbounded_channel();
    
    let stream1 = FluxionStream::from_unbounded_receiver(rx1);
    let stream2 = FluxionStream::from_unbounded_receiver(rx2);
    
    // Send ordered values
    tx1.send(Sequenced::with_sequence(1, 1)).unwrap();  // (value, sequence)
    tx2.send(Sequenced::with_sequence(2, 2)).unwrap();
    tx1.send(Sequenced::with_sequence(3, 3)).unwrap();
    
    drop(tx1);
    drop(tx2);
    
    // Merge streams in temporal order
    let merged = stream1.ordered_merge(vec![stream2]);
    let values: Vec<_> = merged.map(|s| s.value).collect().await;
    
    println!("{:?}", values); // [1, 2, 3] - ordered by sequence number
}
```

### Using `combine_latest`

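```rust
use fluxion_stream::{FluxionStream, CombineLatestExt};
use futures::StreamExt;

// `stream1`, `stream2`, and `stream3` are assumed to be `FluxionStream`s of
// ordered items, created as in the `ordered_merge` example above.

// Combine the streams: once every stream has emitted at least once,
// emit the latest values whenever any stream emits.
let combined = stream1.combine_latest(
    vec![stream2, stream3],
    |state| state.get_state().len() == 3, // Filter: all three streams present
);
```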
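The "emit once all have emitted" behavior described in the comments can be modeled without the library. The sketch below is a simplified, synchronous illustration under that assumption; `LatestState` and `update` are hypothetical names, and the sketch does not model the filter closure or the ordering logic of the real operator.

```rust
/// Tracks the most recent value seen from each of a fixed number of sources.
pub struct LatestState<T> {
    latest: Vec<Option<T>>,
}

impl<T: Clone> LatestState<T> {
    /// Creates a state with one empty slot per source.
    pub fn new(sources: usize) -> Self {
        LatestState { latest: vec![None; sources] }
    }

    /// Records `value` as the latest item from `source`. Returns a snapshot
    /// of all latest values once every source has emitted at least once,
    /// and `None` while any slot is still empty.
    pub fn update(&mut self, source: usize, value: T) -> Option<Vec<T>> {
        self.latest[source] = Some(value);
        // Collecting `Option<T>` items into `Option<Vec<T>>` yields `None`
        // if any slot is still `None`.
        self.latest.iter().cloned().collect()
    }
}
```

With two sources, `update(0, 1)` returns `None` because source 1 has not emitted yet; a following `update(1, 10)` returns `Some(vec![1, 10])`, and a later `update(0, 2)` returns `Some(vec![2, 10])`.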

## Core modules

- `fluxion_stream` — Main `FluxionStream` type with extension methods
- Operator modules: `combine_latest`, `ordered_merge`, `with_latest_from`, `take_latest_when`, 
  `take_while_with`, `emit_when`, `combine_with_previous`

## Running tests

```powershell
cargo test --package fluxion-stream --all-features --all-targets
```

## Documentation

```powershell
cargo doc --package fluxion-stream --no-deps --open
```

## License

Apache-2.0