fluxion-rx 0.1.0

A reactive stream processing library with ordered semantics, a friendly interface, and bullet-proof, state-of-the-art test coverage and examples.

Fluxion


Overview

Fluxion provides a high-level API for working with ordered, reactive streams. It builds on top of the Rust async ecosystem (tokio, futures) and adds ordering guarantees and powerful stream composition operators.

Design Philosophy

Fluxion maintains a clean separation of concerns:

  • Production code: Use FluxionStream for composable, immutable stream transformations
  • Test code: Use TestChannel (from fluxion-test-utils) which adds push capabilities

This architecture solves the fundamental conflict between:

  • Consuming operations (stream extensions that take self)
  • Mutation operations (push that needs &self)
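The conflict can be illustrated with standard-library types alone. The following is a minimal sketch, not Fluxion's implementation: `std::sync::mpsc` stands in for the channel, `Sender::send` plays the role of push (it only needs `&self`), and iterator adapters play the role of stream extensions that take `self`. The helper names `doubled_sum` and `push_all` are hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Consuming side: takes the receiver by value, much as stream extension
// methods take `self`. After this call the receiver no longer exists.
fn doubled_sum(rx: Receiver<i32>) -> i32 {
    rx.into_iter().map(|x| x * 2).sum()
}

// Push side: only needs a shared reference to the sender.
fn push_all(tx: &Sender<i32>, values: &[i32]) {
    for &v in values {
        tx.send(v).expect("receiver dropped");
    }
}

fn main() {
    let (tx, rx) = channel();
    push_all(&tx, &[1, 2, 3]);
    drop(tx); // close the channel so the consuming side terminates
    let total = doubled_sum(rx);
    println!("total = {total}");
}
```

Keeping the push handle and the consumable stream as two separate values is what lets a test keep pushing while the pipeline under test has already taken ownership of the stream.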

Quick Start

use fluxion::FluxionStream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Create a stream from a tokio channel
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<i32>();
    let stream = FluxionStream::from_unbounded_receiver(rx);

    // Send some values
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();
    drop(tx);

    // Process the stream
    let sum: i32 = stream.fold(0, |acc, x| async move { acc + x }).await;
    println!("Sum: {}", sum);  // Prints: Sum: 6
}

Core Concepts

Ordered Trait

All stream operators work with types implementing the Ordered trait, which provides temporal ordering:

use fluxion::Ordered;

// Items must provide an order value
fn process_ordered<T: Ordered>(item: &T) {
    let order = item.order();  // Get temporal order
    let value = item.get();    // Get inner value
}
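To make the idea concrete, here is a self-contained sketch of what implementing such a trait could look like. Everything in it is an assumption for illustration: the local `Ordered` trait only mirrors the `order()` and `get()` methods shown above (the real `fluxion::Ordered` may use different associated types and signatures), and `Sequenced` and `latest` are hypothetical names.

```rust
// Hypothetical stand-in for the trait described above; the real
// `fluxion::Ordered` may differ in its associated types and signatures.
trait Ordered {
    type Inner;
    fn order(&self) -> u64;
    fn get(&self) -> &Self::Inner;
}

// Hypothetical wrapper pairing a value with a sequence number.
#[derive(Debug, Clone, PartialEq)]
struct Sequenced<T> {
    seq: u64,
    value: T,
}

impl<T> Ordered for Sequenced<T> {
    type Inner = T;
    fn order(&self) -> u64 {
        self.seq
    }
    fn get(&self) -> &T {
        &self.value
    }
}

// Generic code depends only on the trait, not on the concrete wrapper:
// return the inner value of the item with the highest order.
fn latest<T: Ordered>(items: &[T]) -> Option<&T::Inner> {
    items
        .iter()
        .max_by_key(|item| item.order())
        .map(|item| item.get())
}

fn main() {
    let items = vec![
        Sequenced { seq: 2, value: "b" },
        Sequenced { seq: 5, value: "c" },
        Sequenced { seq: 1, value: "a" },
    ];
    println!("{:?}", latest(&items)); // Some("c")
}
```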

Stream Operators

Fluxion provides powerful stream composition operators:

  • combine_latest - Combine multiple streams, emitting when any emits
  • with_latest_from - Sample one stream using another as trigger
  • ordered_merge - Merge streams preserving temporal order
  • take_latest_when - Sample on filter condition
  • emit_when - Gate stream emissions based on conditions

See fluxion_stream for the complete list.
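The semantics of two of these operators can be modelled synchronously over in-memory events. This sketch is only an illustration, not Fluxion's API: the real operators work on async streams, and the free functions below merely borrow their names. It assumes each event is an `(order, value)` pair, that ties go to the first input, and that `combine_latest` starts emitting once both sides have produced a value (the usual reactive convention, which the library may or may not follow).

```rust
// ordered_merge model: interleave two already-ordered inputs by order value.
fn ordered_merge(a: &[(u64, i32)], b: &[(u64, i32)]) -> Vec<(u64, i32)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(a.len() + b.len());
    while i < a.len() && j < b.len() {
        if a[i].0 <= b[j].0 {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

// combine_latest model: once both sides have emitted at least once, every
// new event yields a pair of the most recent value from each side.
fn combine_latest(a: &[(u64, i32)], b: &[(u64, i32)]) -> Vec<(i32, i32)> {
    // Tag each event with its source, then replay in temporal order.
    let mut events: Vec<(u64, bool, i32)> = a
        .iter()
        .map(|&(t, v)| (t, true, v))
        .chain(b.iter().map(|&(t, v)| (t, false, v)))
        .collect();
    events.sort_by_key(|e| e.0);

    let (mut last_a, mut last_b) = (None, None);
    let mut out = Vec::new();
    for (_, from_a, v) in events {
        if from_a {
            last_a = Some(v);
        } else {
            last_b = Some(v);
        }
        if let (Some(x), Some(y)) = (last_a, last_b) {
            out.push((x, y));
        }
    }
    out
}

fn main() {
    let a = [(1, 10), (4, 40)];
    let b = [(2, 200), (3, 300)];
    println!("{:?}", ordered_merge(&a, &b));
    println!("{:?}", combine_latest(&a, &b));
}
```

With these inputs the merge yields the events in order 1, 2, 3, 4, and the combination yields `(10, 200)`, `(10, 300)`, `(40, 300)`: nothing is emitted for the first event because the second input has no value yet.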

Workspace Structure

  • fluxion - Main crate (this crate), re-exports core types
  • fluxion_core - Core traits and utilities
  • fluxion_stream - Stream operators and combinators
  • fluxion_exec - Async execution and subscription utilities
  • fluxion_error - Error types and handling