streamweave 0.2.0

Composable, async, stream-first computation in pure Rust
StreamWeave

Build fully composable, async data pipelines using a fluent API.

StreamWeave is a general-purpose Rust framework built around the concept of streaming data, with a focus on simplicity, composability, and performance. It supports WASM targets for the browser or server and does not rely on browser-specific stream primitives.

✨ Key Features

✅ Implemented

  • Pure Rust API with zero-cost abstractions
  • Full async/await compatibility via futures::Stream
  • Fluent pipeline-style API with type-safe builder pattern
  • Comprehensive error handling system with multiple strategies (Stop, Skip, Retry, Custom)
  • Code-as-configuration: no external DSLs
  • Comprehensive test infrastructure
  • Integration Examples: Kafka, Redis Streams, Database (PostgreSQL/MySQL/SQLite), HTTP Polling
  • File Format Support: CSV, JSONL, Parquet with streaming parsing
  • Stateful Processing: RunningSum, MovingAverage transformers
  • Exactly-Once Processing: Message deduplication with configurable windows
  • Windowing Operations: Tumbling, sliding, and count-based windows
  • Advanced Transformers: CircuitBreaker, Retry, Batch, RateLimit
  • Common Transformers: Map, Filter, Flatten, Reduce, and many more
  • HTTP middleware support with Axum integration
  • WebSocket support
  • Server-Sent Events support
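The windowing and stateful-processing entries above can be illustrated with a small, std-only sketch. It is not StreamWeave's implementation or API: the names `tumbling_windows`, `sliding_windows`, and `MovingAverage::push` are hypothetical, and the code works on in-memory slices rather than async streams. It only shows what count-based tumbling windows, sliding windows, and a moving average compute.

```rust
use std::collections::VecDeque;

/// Tumbling (non-overlapping) count-based windows: every item lands in
/// exactly one window. A trailing partial window is emitted as-is.
/// Hypothetical helper, not part of StreamWeave.
pub fn tumbling_windows<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "window size must be positive");
    items.chunks(size).map(|w| w.to_vec()).collect()
}

/// Sliding windows of `size` items that advance by `step` items, so
/// consecutive windows overlap when `step < size`. Only full windows are emitted.
/// Hypothetical helper, not part of StreamWeave.
pub fn sliding_windows<T: Clone>(items: &[T], size: usize, step: usize) -> Vec<Vec<T>> {
    assert!(size > 0 && step > 0, "size and step must be positive");
    items.windows(size).step_by(step).map(|w| w.to_vec()).collect()
}

/// Stateful moving average over the most recent `window` values.
/// Hypothetical type, not StreamWeave's MovingAverage transformer.
pub struct MovingAverage {
    window: usize,
    buf: VecDeque<f64>,
    sum: f64,
}

impl MovingAverage {
    pub fn new(window: usize) -> Self {
        assert!(window > 0, "window must be positive");
        Self { window, buf: VecDeque::with_capacity(window), sum: 0.0 }
    }

    /// Feed one value and return the average of the values currently in the window.
    pub fn push(&mut self, x: f64) -> f64 {
        self.buf.push_back(x);
        self.sum += x;
        // Evict the oldest value once the buffer exceeds the window size.
        if self.buf.len() > self.window {
            if let Some(old) = self.buf.pop_front() {
                self.sum -= old;
            }
        }
        self.sum / self.buf.len() as f64
    }
}
```

For example, `tumbling_windows(&[1, 2, 3, 4, 5], 2)` yields `[[1, 2], [3, 4], [5]]`, while `sliding_windows(&[1, 2, 3, 4], 3, 1)` yields `[[1, 2, 3], [2, 3, 4]]`.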

🚧 Planned

  • Distributed processing
  • Fan-in/fan-out support
  • WASM-specific optimizations and documentation
  • Additional specialized transformers and utilities
  • Reusable pipeline components
  • Machine learning integration
  • Monitoring and metrics
  • SQL-like querying
  • Visualization tools

📦 Core Concepts

StreamWeave breaks computation into three primary building blocks:

Component    Description
Producer     Starts a stream of data
Transformer  Transforms stream items (e.g., map/filter)
Consumer     Consumes the stream (e.g., writing, logging)

All components can be chained together fluently.
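To make the three roles concrete, here is a minimal synchronous sketch using only the standard library. The traits and types in it (`Producer`, `Transformer`, `Consumer`, `Range`, `Double`, `Collect`, `run`) are hypothetical stand-ins written for this illustration. StreamWeave's real traits are async and built on `futures::Stream`, so treat this as a picture of the data flow, not of the crate's API.

```rust
/// Hypothetical stand-in: something that starts a sequence of items.
pub trait Producer {
    type Item;
    fn produce(self) -> Box<dyn Iterator<Item = Self::Item>>;
}

/// Hypothetical stand-in: maps each incoming item to an output item.
pub trait Transformer<In> {
    type Out;
    fn transform(&mut self, item: In) -> Self::Out;
}

/// Hypothetical stand-in: receives the final items.
pub trait Consumer<In> {
    fn consume(&mut self, item: In);
}

/// Produces the integers in `start..end`.
pub struct Range {
    pub start: i32,
    pub end: i32,
}

impl Producer for Range {
    type Item = i32;
    fn produce(self) -> Box<dyn Iterator<Item = i32>> {
        Box::new(self.start..self.end)
    }
}

/// Doubles every item.
pub struct Double;

impl Transformer<i32> for Double {
    type Out = i32;
    fn transform(&mut self, item: i32) -> i32 {
        item * 2
    }
}

/// Collects every item into a vector.
pub struct Collect<T> {
    pub collected: Vec<T>,
}

impl<T> Consumer<T> for Collect<T> {
    fn consume(&mut self, item: T) {
        self.collected.push(item);
    }
}

/// Wires the three roles together and hands the consumer back,
/// so the caller can inspect what it received.
pub fn run<P, T, C>(producer: P, mut transformer: T, mut consumer: C) -> C
where
    P: Producer,
    T: Transformer<P::Item>,
    C: Consumer<T::Out>,
{
    for item in producer.produce() {
        consumer.consume(transformer.transform(item));
    }
    consumer
}
```

Calling `run(Range { start: 1, end: 6 }, Double, Collect { collected: Vec::new() })` returns a consumer whose `collected` field holds `[2, 4, 6, 8, 10]`. The compiler checks that each stage's output type matches the next stage's input type, which is the kind of guarantee a type-safe builder aims for.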

🔄 Example Pipeline

✅ Currently Possible

use streamweave::{
    consumers::console::console_consumer::ConsoleConsumer,
    pipeline::PipelineBuilder,
    producers::range::range_producer::RangeProducer,
    transformers::map::map_transformer::MapTransformer,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a pipeline that:
    // 1. Produces numbers from 1 to 5
    // 2. Doubles each number
    // 3. Prints the result to the console
    let pipeline = PipelineBuilder::new()
        .producer(RangeProducer::new(1, 6, 1))
        .transformer(MapTransformer::new(|x: i32| x * 2))
        .consumer(ConsoleConsumer::new());

    // Run the pipeline
    pipeline.run().await?;
    Ok(())
}

🧱 API Overview

✅ Implemented Pipeline Construction

PipelineBuilder::new()
    .producer(...)    // Add data source
    .transformer(...) // Add transformation
    .consumer(...)    // Add data sink
    .run()           // Execute pipeline

✅ Error Handling

StreamWeave provides two levels of error handling:

  1. Pipeline Level
// Default behavior: Pipeline stops on first error
pipeline.run().await?;

// Configure pipeline-wide error handling.
// The calls are listed together only to show the available options; pick one.
pipeline
    .with_error_strategy(ErrorStrategy::Stop)      // Default
    .with_error_strategy(ErrorStrategy::Skip)      // Skip errored items
    .with_error_strategy(ErrorStrategy::Retry(3))  // Retry 3 times
    .run()
    .await?;
  2. Component Level
// Override error handling for a specific component.
// The calls are listed together only to show the available options; pick one per component.
MapTransformer::new(parse)
    .with_error_strategy(ErrorStrategy::Stop)      // Stop component and pipeline
    .with_error_strategy(ErrorStrategy::Skip)      // Skip errors, continue processing
    .with_error_strategy(ErrorStrategy::Retry(3))  // Retry operation 3 times
    .with_error_strategy(ErrorStrategy::Custom(|err| {
        // Custom error handling logic
        ErrorAction::Skip
    }));
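The difference between the Stop, Skip, and Retry strategies can be sketched without the crate. The `Strategy` enum and `process` function below are hypothetical and synchronous, and they omit the Custom variant. The sketch also assumes that an item which still fails after its retries are used up stops processing; the text above does not say how StreamWeave handles that case.

```rust
/// Hypothetical, simplified stand-in for an error strategy.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Strategy {
    /// Abort on the first error.
    Stop,
    /// Drop the failing item and continue.
    Skip,
    /// Retry the failing item up to `n` extra times, then abort (assumption).
    Retry(usize),
}

/// Applies `op` to every item and collects the successes, handling
/// failures according to `strategy`.
pub fn process<T, U, E>(
    items: impl IntoIterator<Item = T>,
    mut op: impl FnMut(&T) -> Result<U, E>,
    strategy: Strategy,
) -> Result<Vec<U>, E> {
    let mut out = Vec::new();
    for item in items {
        // Each item gets its own retry budget.
        let mut retries_left = match strategy {
            Strategy::Retry(n) => n,
            _ => 0,
        };
        loop {
            match op(&item) {
                Ok(value) => {
                    out.push(value);
                    break;
                }
                // Budget remaining: try the same item again.
                Err(_) if retries_left > 0 => retries_left -= 1,
                Err(err) => match strategy {
                    // Skip: give up on this item only.
                    Strategy::Skip => break,
                    // Stop, or Retry with an exhausted budget: abort everything.
                    Strategy::Stop | Strategy::Retry(_) => return Err(err),
                },
            }
        }
    }
    Ok(out)
}
```

With the input `["1", "x", "3"]` and an operation that parses each string as an `i32`, `Strategy::Skip` returns `Ok([1, 3])`, while `Strategy::Stop` returns the parse error for `"x"`.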

🚀 Getting Started

Installation

Add StreamWeave to your Cargo.toml:

[dependencies]
streamweave = "0.2.0"

Basic Usage

use streamweave::{
    consumers::vec::vec_consumer::VecConsumer,
    pipeline::PipelineBuilder,
    producers::array::array_producer::ArrayProducer,
    transformers::map::map_transformer::MapTransformer,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = ArrayProducer::new(vec![1, 2, 3, 4, 5]);
    let transformer = MapTransformer::new(|x: i32| x * 2);
    let consumer = VecConsumer::new();

    let pipeline = PipelineBuilder::new()
        .producer(producer)
        .transformer(transformer)
        .consumer(consumer);

    let ((), result) = pipeline.run().await?;
    println!("Result: {:?}", result.collected);
    Ok(())
}

🧪 Testing Pipelines

✅ Implemented

The framework includes comprehensive test infrastructure for unit testing pipelines and components:

// Example from the test suite
let producer = NumberProducer { numbers: vec![1, 2, 3] };
let transformer = StringifyTransformer;
let consumer = CollectConsumer { collected: Vec::new() };

let (_, consumer) = PipelineBuilder::new()
    .producer(producer)
    .transformer(transformer)
    .consumer(consumer)
    .run()
    .await
    .unwrap();

assert_eq!(consumer.collected, vec!["1", "2", "3"]);

📚 Documentation

📖 Examples

StreamWeave includes comprehensive examples demonstrating all major features:

Integration Examples

File Format Examples

  • File Formats - CSV, JSONL, and Parquet read/write with streaming parsing

Processing Examples

Basic Examples

Run any example with:

cargo run --example <example_name> --features <required_features>

๐Ÿค Contributing

Contributions are welcome! Please see our Contributing Guide for details.

📄 License

This project is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.

See LICENSE for details.

๐Ÿ™ Acknowledgments

  • Built with Tokio as the async runtime
  • HTTP support powered by Axum
  • Inspired by reactive programming patterns and stream processing frameworks