Crate streamweave


StreamWeave


Composable, async, stream-first computation in pure Rust
Build fully composable, async data pipelines using a fluent API.

StreamWeave is a general-purpose Rust framework built around the concept of streaming data, with a focus on simplicity, composability, and performance.

High-Performance Streaming: StreamWeave can process 2 to 6 million messages per second with in-process, zero-copy execution, which makes it well suited to high-throughput data processing pipelines.

✨ Key Features

  • Pure Rust API with zero-cost abstractions
  • Full async/await compatibility via futures::Stream
  • Fluent pipeline-style API with type-safe builder pattern
  • Graph-based API for complex topologies with fan-in/fan-out patterns
  • Flow-Based Programming (FBP) patterns with type-safe routing
  • Comprehensive error handling system with multiple strategies
  • Code-as-configuration: no external DSLs
  • Extensive package ecosystem for I/O, transformations, and integrations
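The error-handling feature mentions "multiple strategies" but does not name them. The standalone sketch below illustrates what such strategies commonly mean. It applies three hypothetical policies (Stop, Skip, Retry) to a fallible per-item function. The ErrorStrategy enum and the process function are assumptions made for illustration. They are not StreamWeave's error API.

```rust
/// Hypothetical error-handling policies for illustration only;
/// these are not StreamWeave's actual error types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ErrorStrategy {
    /// Abort on the first failing item and return its error.
    Stop,
    /// Drop failing items and continue with the next one.
    Skip,
    /// Re-run a failing item up to `n` more times, then drop it.
    Retry(usize),
}

/// Applies `f` to every item, handling failures according to `strategy`.
fn process<T: Copy, U, E>(
    items: &[T],
    strategy: ErrorStrategy,
    mut f: impl FnMut(T) -> Result<U, E>,
) -> Result<Vec<U>, E> {
    let mut out = Vec::new();
    for &item in items {
        match strategy {
            // `?` propagates the first error and ends processing.
            ErrorStrategy::Stop => out.push(f(item)?),
            ErrorStrategy::Skip => {
                if let Ok(v) = f(item) {
                    out.push(v);
                }
            }
            ErrorStrategy::Retry(n) => {
                // One initial attempt plus up to `n` retries.
                for _ in 0..=n {
                    if let Ok(v) = f(item) {
                        out.push(v);
                        break;
                    }
                }
            }
        }
    }
    Ok(out)
}

fn main() {
    let parse = |s: &str| s.parse::<i32>();
    let input = ["1", "x", "3"];
    println!("{:?}", process(&input, ErrorStrategy::Skip, parse)); // Ok([1, 3])
    println!("{:?}", process(&input, ErrorStrategy::Retry(2), parse)); // Ok([1, 3])
    println!("{}", process(&input, ErrorStrategy::Stop, parse).is_err()); // true
}
```

Retry only pays off for transient failures. For a deterministic error such as the parse failure above, it repeats the same error and then drops the item.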

📦 Core Concepts

StreamWeave breaks computation into three primary building blocks:

| Component | Description |
|---|---|
| Producer | Starts a stream of data |
| Transformer | Transforms stream items (e.g., map/filter) |
| Consumer | Consumes the stream (e.g., writing, logging) |

All components can be chained together fluently. The same components work in both the Pipeline API (for simple linear flows) and the Graph API (for complex topologies with fan-in/fan-out patterns).
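The sketch below makes the three roles concrete with a minimal synchronous analogue built on std iterators. The Producer, Transformer, and Consumer traits here are hypothetical stand-ins. StreamWeave's real traits are async and based on futures::Stream, so their names and signatures differ. The sketch only shows how typed stages chain, with each stage's output type becoming the next stage's input.

```rust
/// Starts a stream of data (hypothetical, synchronous stand-in).
trait Producer {
    type Output: Iterator;
    fn produce(self) -> Self::Output;
}

/// Turns an input stream into a new stream.
trait Transformer<I: Iterator> {
    type Output: Iterator;
    fn transform(self, input: I) -> Self::Output;
}

/// Drains a stream and returns a final outcome.
trait Consumer<I: Iterator> {
    type Outcome;
    fn consume(self, input: I) -> Self::Outcome;
}

struct VecProducer<T>(Vec<T>);

impl<T> Producer for VecProducer<T> {
    type Output = std::vec::IntoIter<T>;
    fn produce(self) -> Self::Output {
        self.0.into_iter()
    }
}

struct MapTransformer<F>(F);

impl<I, O, F> Transformer<I> for MapTransformer<F>
where
    I: Iterator,
    F: FnMut(I::Item) -> O,
{
    type Output = std::iter::Map<I, F>;
    fn transform(self, input: I) -> Self::Output {
        input.map(self.0)
    }
}

struct FilterTransformer<P>(P);

impl<I, P> Transformer<I> for FilterTransformer<P>
where
    I: Iterator,
    P: FnMut(&I::Item) -> bool,
{
    type Output = std::iter::Filter<I, P>;
    fn transform(self, input: I) -> Self::Output {
        input.filter(self.0)
    }
}

/// Collects every item into a Vec.
struct VecConsumer;

impl<I: Iterator> Consumer<I> for VecConsumer {
    type Outcome = Vec<I::Item>;
    fn consume(self, input: I) -> Self::Outcome {
        input.collect()
    }
}

/// Wires producer -> transformer -> consumer; the compiler checks that item types line up.
fn run<P, T, C>(producer: P, transformer: T, consumer: C) -> C::Outcome
where
    P: Producer,
    T: Transformer<P::Output>,
    C: Consumer<T::Output>,
{
    consumer.consume(transformer.transform(producer.produce()))
}

fn main() {
    let doubled = run(VecProducer(vec![1, 2, 3, 4, 5]), MapTransformer(|x: i32| x * 2), VecConsumer);
    println!("{:?}", doubled); // [2, 4, 6, 8, 10]

    // Stages can also be chained by hand: produce -> filter -> map -> consume.
    let evens = FilterTransformer(|x: &i32| x % 2 == 0).transform(VecProducer(vec![1, 2, 3, 4]).produce());
    let squares: Vec<i32> = VecConsumer.consume(MapTransformer(|x: i32| x * x).transform(evens));
    println!("{:?}", squares); // [4, 16]
}
```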

🔀 Pipeline vs Graph API

StreamWeave provides two APIs for building data processing workflows:

| Feature | Pipeline API | Graph API |
|---|---|---|
| Use case | Simple linear flows | Complex topologies |
| Topology | Single path: Producer → Transformer → Consumer | Multiple paths, fan-in/fan-out |
| Routing | Sequential processing | Configurable routing strategies |
| Complexity | Lower complexity, easier to use | Higher flexibility, more powerful |
| Best for | ETL pipelines, simple transformations | Complex workflows, parallel processing, data distribution |
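The fan-out/fan-in shape described for the Graph API can be sketched with standard-library threads and channels. This is not StreamWeave's Graph API. The function fan_out_fan_in is a hypothetical helper. It routes items round-robin to several branches, applies a function in each branch, and merges every branch's output into a single channel.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: routes `items` round-robin to `branches` worker threads
/// (fan-out), applies `f` in each branch, and merges all outputs into one
/// channel (fan-in).
fn fan_out_fan_in(items: Vec<i32>, branches: usize, f: fn(i32) -> i32) -> Vec<i32> {
    assert!(branches > 0, "need at least one branch");
    let (merged_tx, merged_rx) = mpsc::channel();
    let mut inputs = Vec::with_capacity(branches);
    let mut handles = Vec::with_capacity(branches);

    for _ in 0..branches {
        let (tx, rx) = mpsc::channel::<i32>();
        let out = merged_tx.clone();
        inputs.push(tx);
        handles.push(thread::spawn(move || {
            // Each branch runs until its input channel is closed.
            for item in rx {
                out.send(f(item)).expect("merge channel closed");
            }
        }));
    }
    // Only the branch clones remain, so the merged channel closes once every branch ends.
    drop(merged_tx);

    // Fan-out: item i goes to branch i % branches.
    for (i, item) in items.into_iter().enumerate() {
        inputs[i % branches].send(item).expect("branch channel closed");
    }
    // Closing the inputs lets every branch loop finish.
    drop(inputs);

    // Fan-in: drain the merged channel until all senders are gone.
    let mut results: Vec<i32> = merged_rx.iter().collect();
    for handle in handles {
        handle.join().expect("branch panicked");
    }
    // Arrival order depends on thread scheduling, so sort for a stable result.
    results.sort();
    results
}

fn main() {
    let out = fan_out_fan_in((1..=6).collect(), 3, |x| x * 10);
    println!("{:?}", out); // [10, 20, 30, 40, 50, 60]
}
```

Because the branches run concurrently, merged results arrive in a nondeterministic order. The sketch sorts them afterwards. An order-preserving merge would instead tag each item with its sequence number before fan-out.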

🚀 Quick Start

Installation

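Add StreamWeave to your Cargo.toml. The basic example below also relies on Tokio for its async runtime (#[tokio::main] needs the macros feature and a runtime feature such as rt-multi-thread):

[dependencies]
streamweave = "0.8.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }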

Basic Example

use streamweave::PipelineBuilder;
use streamweave_array::ArrayProducer;
use streamweave_transformers::MapTransformer;
use streamweave_vec::VecConsumer;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pipeline = PipelineBuilder::new()
        // Producer: starts a stream of the values 1 through 5.
        .producer(ArrayProducer::new(vec![1, 2, 3, 4, 5]))
        // Transformer: doubles every item.
        .transformer(MapTransformer::new(|x: i32| x * 2))
        // Consumer: collects the transformed items into a Vec.
        .consumer(VecConsumer::new());

    // Run the pipeline; `result.collected` holds the items gathered by the consumer.
    let ((), result) = pipeline.run().await?;
    println!("Result: {:?}", result.collected);
    Ok(())
}

For more examples and detailed documentation, see the package documentation below.

📦 Packages

StreamWeave is organized as a monorepo with 39 packages, each providing specific functionality. Each package has its own README with detailed documentation, examples, and API reference.

Core Foundation Packages

These are the foundational packages that other packages depend on:

  • streamweave - Core traits and types (Producer, Transformer, Consumer)
  • error - Error handling system with multiple strategies
  • message - Message envelope and metadata
  • offset - Offset management for exactly-once processing
  • transaction - Transaction support and boundaries
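The message and offset packages are described only briefly here. The sketch below assumes a minimal envelope that carries a payload and a sequential offset. It pairs the envelope with a tracker that skips replayed messages at or below the last committed offset. Message, OffsetTracker, and process_batch are hypothetical names, not StreamWeave's types. True exactly-once processing also requires committing the offset atomically with the output, which is the kind of guarantee transactions provide. This sketch keeps the offset in memory and only deduplicates replays.

```rust
/// Hypothetical message envelope: a payload tagged with its position in the source.
#[derive(Debug, Clone)]
struct Message<T> {
    offset: u64,
    payload: T,
}

/// Remembers the highest offset whose processing has completed.
#[derive(Debug, Default)]
struct OffsetTracker {
    committed: Option<u64>,
}

impl OffsetTracker {
    /// True if `offset` is newer than the last committed offset.
    fn should_process(&self, offset: u64) -> bool {
        self.committed.map_or(true, |c| offset > c)
    }

    /// Records `offset` as done. It lives only in memory here; a real system
    /// would persist it, ideally atomically with the output.
    fn commit(&mut self, offset: u64) {
        self.committed = Some(offset);
    }
}

/// Handles each new message, then commits its offset; replays are skipped.
/// Assumes offsets arrive in increasing order.
fn process_batch<T>(msgs: &[Message<T>], tracker: &mut OffsetTracker, mut handle: impl FnMut(&T)) {
    for m in msgs {
        if tracker.should_process(m.offset) {
            handle(&m.payload);
            tracker.commit(m.offset);
        }
    }
}

fn main() {
    let msgs: Vec<Message<&str>> = ["a", "b", "c", "d"]
        .iter()
        .enumerate()
        .map(|(i, p)| Message { offset: i as u64, payload: *p })
        .collect();

    let mut tracker = OffsetTracker::default();
    let mut seen = Vec::new();
    // First run handles only the first two messages before "stopping".
    process_batch(&msgs[..2], &mut tracker, |p| seen.push(*p));
    // A restart replays the whole source; already-committed offsets are skipped.
    process_batch(&msgs, &mut tracker, |p| seen.push(*p));
    println!("{:?}", seen); // ["a", "b", "c", "d"]
    println!("{:?}", tracker.committed); // Some(3)
}
```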

System Packages

Core system functionality:

  • pipeline - Pipeline builder and execution
  • graph - Graph API for complex topologies
  • stateful - Stateful processing and state management
  • window - Windowing operations (tumbling, sliding, session)
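The three window kinds named for the window package can be illustrated on an in-memory slice. This is a sketch under stated assumptions. Tumbling and sliding windows here are count-based, and session windows are split on gaps between sorted timestamps. StreamWeave's windows operate on streams and may be defined by time instead. The functions tumbling, sliding, and sessions are hypothetical.

```rust
/// Tumbling windows: fixed-size, non-overlapping groups of `size` items.
/// The final window is shorter when the input does not divide evenly.
/// Panics if `size` is 0 (as `slice::chunks` does).
fn tumbling<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    items.chunks(size).map(|w| w.to_vec()).collect()
}

/// Sliding windows: `size` items wide, advancing by `step` items, so windows
/// overlap when `step < size`. Trailing partial windows are not emitted.
fn sliding<T: Clone>(items: &[T], size: usize, step: usize) -> Vec<Vec<T>> {
    assert!(size > 0 && step > 0, "size and step must be positive");
    let mut out = Vec::new();
    let mut start = 0;
    while start + size <= items.len() {
        out.push(items[start..start + size].to_vec());
        start += step;
    }
    out
}

/// Session windows over event timestamps (assumed sorted ascending): a new
/// session starts whenever the gap to the previous timestamp exceeds `max_gap`.
fn sessions(timestamps: &[u64], max_gap: u64) -> Vec<Vec<u64>> {
    let mut out: Vec<Vec<u64>> = Vec::new();
    for &t in timestamps {
        let starts_new = match out.last().and_then(|s| s.last()) {
            Some(&prev) => t.saturating_sub(prev) > max_gap,
            None => true,
        };
        if starts_new {
            out.push(vec![t]);
        } else if let Some(session) = out.last_mut() {
            session.push(t);
        }
    }
    out
}

fn main() {
    let xs = [1, 2, 3, 4, 5];
    println!("{:?}", tumbling(&xs, 2)); // [[1, 2], [3, 4], [5]]
    println!("{:?}", sliding(&xs, 3, 1)); // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
    println!("{:?}", sessions(&[1, 2, 3, 10, 11, 30], 2)); // [[1, 2, 3], [10, 11], [30]]
}
```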

I/O Packages

Standard I/O and file system operations:

  • stdio - Standard input/output streaming
  • file - File I/O operations
  • fs - File system operations and directory traversal
  • tempfile - Temporary file handling
  • path - Path manipulation and transformations

Data Format Packages

Data format parsing and serialization:

  • csv - CSV parsing and writing
  • jsonl - JSON Lines format support
  • parquet - Parquet format support

Database Packages

Database integration:

Network Packages

Network protocol integration:

  • kafka - Apache Kafka producer and consumer
  • redis - Redis Streams integration
  • http-server - HTTP graph server with Axum integration

Producer/Consumer Packages

Various data source and sink implementations:

  • array - Array-based streaming
  • vec - Vector-based streaming
  • env - Environment variable streaming
  • command - Command execution and output streaming
  • process - Process management and monitoring
  • signal - Unix signal handling
  • timer - Time-based and interval-based streaming
  • tokio - Tokio channel integration

Transformers Package

Comprehensive transformer implementations:

  • transformers - All transformer types including:
    • Basic: Map, Filter, Reduce
    • Advanced: Batch, Retry, CircuitBreaker, RateLimit
    • Stateful: RunningSum, MovingAverage
    • Routing: Router, Partition, RoundRobin
    • Merging: Merge, OrderedMerge, Interleave
    • ML: Inference, BatchedInference
    • Utility: Sample, Skip, Take, Limit, Sort, Split, Zip, Timeout, MessageDedupe
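Stateful transformers such as RunningSum and MovingAverage differ from Map because they carry state from one item to the next. The sketch below makes that state explicit with plain iterators. The functions running_sum and moving_average are hypothetical and do not reflect the signatures of StreamWeave's transformer types.

```rust
use std::collections::VecDeque;

/// RunningSum-style step: emits the cumulative total after each item.
fn running_sum(items: impl IntoIterator<Item = i64>) -> Vec<i64> {
    items
        .into_iter()
        .scan(0i64, |total, x| {
            // `total` is the state carried between items.
            *total += x;
            Some(*total)
        })
        .collect()
}

/// MovingAverage-style step: mean of the last `window` items
/// (fewer items are averaged until the window has filled).
fn moving_average(items: impl IntoIterator<Item = f64>, window: usize) -> Vec<f64> {
    assert!(window > 0, "window must be positive");
    // State: the items currently in the window and their sum.
    let mut buf: VecDeque<f64> = VecDeque::with_capacity(window);
    let mut sum = 0.0;
    items
        .into_iter()
        .map(|x| {
            buf.push_back(x);
            sum += x;
            if buf.len() > window {
                // Evict the oldest item so the window keeps its size.
                sum -= buf.pop_front().unwrap();
            }
            sum / buf.len() as f64
        })
        .collect()
}

fn main() {
    println!("{:?}", running_sum(vec![1, 2, 3, 4])); // [1, 3, 6, 10]
    println!("{:?}", moving_average(vec![1.0, 2.0, 3.0, 4.0], 2)); // [1.0, 1.5, 2.5, 3.5]
}
```

Keeping a running sum makes each moving-average step O(1) rather than re-summing the window. The trade-off is that floating-point rounding can accumulate over very long streams.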

Integration and Utility Packages

Observability and integration capabilities:

📚 Documentation

📖 Examples

StreamWeave includes comprehensive examples demonstrating all major features. See the examples directory for:

  • Integration examples (Kafka, Redis, Database, HTTP)
  • File format examples (CSV, JSONL, Parquet)
  • Processing examples (Stateful, Error Handling, Windowing)
  • Visualization examples
  • Graph API examples

Run any example with:

cargo run --example <example_name> --features <required_features>

ยง๐Ÿค Contributing

Contributions are welcome! Please see our Contributing Guide for details.

📄 License

This project is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.

See the LICENSE file for details.

ยง๐Ÿ™ Acknowledgments

  • Built with Tokio for async runtime
  • HTTP support powered by Axum
  • Inspired by reactive programming patterns and stream processing frameworks

Re-exports

pub use consumer::*;
pub use consumers::*;
pub use db::*;
pub use error::*;
pub use graph::*;
pub use input::*;
pub use message::*;
pub use metrics::*;
pub use ml::*;
pub use offset::*;
pub use output::*;
pub use pipeline::*;
pub use port::*;
pub use producer::*;
pub use producers::*;
pub use stateful_transformer::*;
pub use transaction::*;
pub use transformer::*;
pub use transformers::*;

Modules

  • consumer - Consumer Trait
  • consumers - Consumer implementations for StreamWeave pipelines
  • db - Database Integration Types
  • error - Error Handling System
  • graph - Graph-Based Execution for StreamWeave
  • input - Input trait for components that consume input streams
  • message - Message envelope types for universal message-based processing
  • metrics - Metrics Collection for StreamWeave Pipelines
  • ml - Machine Learning Module
  • offset - Offset Tracking for Resumable Pipeline Processing
  • output - Output trait for components that produce output streams
  • pipeline - Pipeline Builder and Execution
  • port - Port Type System
  • producer - Producer Trait
  • producers - Producer Implementations
  • stateful_transformer - Stateful Transformer
  • transaction - Transaction support for exactly-once processing
  • transformer - Transformer Trait
  • transformers - Transformers Module
  • window - Windowing operations for stream processing