Crate fluxion_rx


§Fluxion

A reactive stream processing library with ordered semantics, a friendly interface, and thorough test coverage and examples.

§Overview

Fluxion provides a high-level API for working with ordered, reactive streams. It builds on top of the Rust async ecosystem (tokio, futures) and adds ordering guarantees and powerful stream composition operators.

§Design Philosophy

Fluxion maintains a clean separation of concerns:

  • Production code: Use FluxionStream for composable, immutable stream transformations
  • Test code: Use async_channel for imperative test setup

This architecture solves the fundamental conflict between:

  • Consuming operations (stream extensions that take self)
  • Mutation operations (sending values via channels)
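The conflict can be seen with the standard library alone. The sketch below is an analogy, not Fluxion code: it uses `std::sync::mpsc` and iterators in place of `async_channel` and streams, and the function names `doubled_sum` and `run_pipeline` are made up for illustration. The pipeline takes the receiving half by value, much as stream extensions take `self`, while the caller keeps the sending half to feed values in.

```rust
use std::sync::mpsc;

/// Consumes the receiving half and runs a transformation over it.
/// Taking `rx` by value mirrors stream extensions that take `self`.
fn doubled_sum(rx: mpsc::Receiver<i32>) -> i32 {
    rx.into_iter().map(|x| x * 2).sum()
}

/// Test-style setup: keep the sender for imperative input,
/// hand the receiver over to the consuming pipeline.
fn run_pipeline(inputs: &[i32]) -> i32 {
    let (tx, rx) = mpsc::channel();
    for &x in inputs {
        tx.send(x).expect("receiver is still alive");
    }
    // Dropping the sender closes the channel, which ends the iterator.
    drop(tx);
    doubled_sum(rx)
}

fn main() {
    println!("{}", run_pipeline(&[1, 2, 3])); // prints 12
}
```

Because the two halves are separate values, moving the receiver into the pipeline does not take away the ability to send.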

§Quick Start

use fluxion_rx::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Create a stream from an async channel
    let (tx, rx) = async_channel::unbounded::<i32>();
    let stream = rx.into_fluxion_stream();

    // Send some values
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    tx.try_send(3).unwrap();
    drop(tx);

    // Process the stream (unwrap StreamItem values)
    let sum: i32 = stream.fold(0, |acc, x| async move {
        acc + x.unwrap()
    }).await;
    println!("Sum: {}", sum);  // Prints: Sum: 6
}

§Core Concepts

§Timestamped Trait

All stream operators work with types implementing the Timestamped trait, which provides temporal ordering:

use fluxion_rx::Timestamped;

// Items must provide a timestamp
fn process_timestamped<T: Timestamped>(item: T) {
    let ts = item.timestamp();     // Get timestamp for ordering
    let value = item.into_inner();  // Extract inner value
}
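
To make the idea of timestamp-driven ordering concrete, here is a minimal, self-contained sketch. It does not use the crate: `TimestampedSketch`, `Stamped`, and `values_in_order` are hypothetical names, and the `u64` timestamp and `Inner` associated type are assumptions. The real `Timestamped` trait may use different types and bounds.

```rust
// Hypothetical stand-in for illustration only. It mirrors the two methods
// used above; the real trait in fluxion_rx may be defined differently.
trait TimestampedSketch {
    type Inner;
    fn timestamp(&self) -> u64;
    fn into_inner(self) -> Self::Inner;
}

/// A value tagged with the timestamp used for ordering (assumed type: u64).
struct Stamped<T> {
    ts: u64,
    value: T,
}

impl<T> TimestampedSketch for Stamped<T> {
    type Inner = T;

    fn timestamp(&self) -> u64 {
        self.ts
    }

    fn into_inner(self) -> T {
        self.value
    }
}

/// Sorts items by timestamp, then unwraps them to their inner values.
fn values_in_order<I: TimestampedSketch>(mut items: Vec<I>) -> Vec<I::Inner> {
    // `sort_by_key` is stable, so items with equal timestamps keep their order.
    items.sort_by_key(|item| item.timestamp());
    items.into_iter().map(|item| item.into_inner()).collect()
}

fn main() {
    let items = vec![
        Stamped { ts: 3, value: "c" },
        Stamped { ts: 1, value: "a" },
        Stamped { ts: 2, value: "b" },
    ];
    println!("{:?}", values_in_order(items));
}
```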

§Stream Operators

Fluxion provides powerful stream composition operators:

  • combine_latest - Combine multiple streams, emitting when any emits
  • with_latest_from - Sample one stream using another as trigger
  • ordered_merge - Merge streams preserving temporal order
  • take_latest_when - Sample on filter condition
  • emit_when - Gate stream emissions based on conditions

See fluxion_stream for the complete list.
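
As an illustration of what "preserving temporal order" means for a merge, the following sketch merges two already-sorted sequences of `(timestamp, value)` pairs. It is a synchronous model of the semantics only, not the crate's implementation: `ordered_merge_sketch` is a hypothetical name, the `u64` timestamps are an assumption, and the tie-breaking rule (left input first) is chosen arbitrarily here.

```rust
/// Merges two inputs, each already sorted by timestamp, into a single
/// sequence sorted by timestamp. On equal timestamps the left input
/// goes first (an arbitrary choice made for this sketch).
fn ordered_merge_sketch<T>(left: Vec<(u64, T)>, right: Vec<(u64, T)>) -> Vec<(u64, T)> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let mut left = left.into_iter().peekable();
    let mut right = right.into_iter().peekable();

    loop {
        // Look at the head of each input without consuming it.
        let take_left = match (left.peek(), right.peek()) {
            (Some(l), Some(r)) => l.0 <= r.0,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_left { left.next() } else { right.next() };
        out.extend(next);
    }
    out
}

fn main() {
    let a = vec![(1, "a1"), (4, "a4")];
    let b = vec![(2, "b2"), (3, "b3"), (5, "b5")];
    for (ts, value) in ordered_merge_sketch(a, b) {
        println!("{ts}: {value}");
    }
}
```

An asynchronous operator additionally has to decide how long to wait for a slower input before emitting; this sketch sidesteps that by working on complete sequences.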

§Workspace Structure

  • fluxion_rx - Main crate (this crate), re-exports core types
  • fluxion_core - Core traits and utilities
  • fluxion_stream - Stream operators and combinators
  • fluxion_exec - Async execution and subscription utilities

Re-exports§

pub use fluxion_exec;

Modules§

prelude
Prelude module for convenient imports.

Structs§

CombinedState
State container holding the latest values from multiple combined streams.
WithPrevious
Represents a value paired with its previous value in the stream.

Traits§

HasTimestamp
A minimal trait for types that have an intrinsic timestamp for stream ordering.
IntoStream
A trait for types that can be converted into a Stream.
Timestamped
A trait for types that have an intrinsic timestamp for stream ordering.