streamweave 0.10.1

Composable, async, stream-first computation in pure Rust

StreamWeave

Build fully composable, async data processing graphs using a declarative API.

StreamWeave is a general-purpose Rust framework built around the concept of streaming data, with a focus on simplicity, composability, and performance.

High-Performance Memory Management: Buffer pooling and string interning reduce allocation overhead and improve cache locality, which lowers latency and raises throughput.

✨ Key Features

  • Pure Rust API with zero-cost abstractions
  • Full async/await compatibility via futures::Stream
  • Graph-based API for building data processing topologies
  • graph! macro for declarative graph construction with minimal syntax (80-90% less boilerplate)
  • Flow-Based Programming (FBP) patterns with type-safe routing
  • Support for fan-in patterns (multiple sources → one target)
  • Comprehensive error handling system with multiple strategies
  • Code-as-configuration — no external DSLs
  • Extensive package ecosystem for I/O, transformations, and integrations
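The fan-in pattern listed above (multiple sources → one target) can be pictured with plain standard-library channels. This is only an illustrative sketch using `std::sync::mpsc`, not StreamWeave's own API; the function name `fan_in` is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Merge several producers into a single consumer (fan-in).
/// Arrival order is nondeterministic, so the result is sorted before returning.
fn fan_in(sources: Vec<Vec<i32>>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();
    let mut handles = Vec::new();

    for items in sources {
        // Each source gets its own clone of the sender; all feed one receiver.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for item in items {
                tx.send(item).expect("receiver is alive");
            }
        }));
    }

    // Drop the original sender so the receiver sees end-of-stream
    // once every producer has finished.
    drop(tx);

    let mut merged: Vec<i32> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    merged.sort_unstable();
    merged
}

fn main() {
    let merged = fan_in(vec![vec![1, 4], vec![2, 5], vec![3, 6]]);
    println!("{:?}", merged);
}
```

Because several senders share one receiver, the target sees a single merged stream regardless of how many sources feed it.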

📦 Core Concepts

StreamWeave uses a graph-based architecture where computation is organized as nodes connected by edges:

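| Component | Description                                                   |
|-----------|---------------------------------------------------------------|
| Nodes     | Processing units that consume and produce streams             |
| Edges     | Connections between node ports that route data                |
| Graphs    | Collections of nodes and edges forming a processing topology  |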

Nodes can have:

  • Input ports: Receive data from upstream nodes or external sources
  • Output ports: Send data to downstream nodes or external consumers
  • Zero or more inputs/outputs: Nodes can be sources (no inputs), sinks (no outputs), or transforms (both)
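To make the source / transform / sink distinction concrete, here is a small standalone sketch that classifies nodes purely from an edge list. The `Edge` struct mirrors the field names used in the Quick Start below, but it is redefined locally; `Role`, `edge`, and `classify` are hypothetical helpers, not part of StreamWeave. Only internal edges are considered, so a node fed solely through exposed graph ports counts as a source here.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    Source,    // no incoming edges
    Transform, // incoming and outgoing edges
    Sink,      // no outgoing edges
}

struct Edge {
    source_node: String,
    source_port: String,
    target_node: String,
    target_port: String,
}

/// Convenience constructor for the sketch.
fn edge(sn: &str, sp: &str, tn: &str, tp: &str) -> Edge {
    Edge {
        source_node: sn.to_string(),
        source_port: sp.to_string(),
        target_node: tn.to_string(),
        target_port: tp.to_string(),
    }
}

/// Derive each node's role from which side of an edge it appears on.
fn classify(edges: &[Edge]) -> BTreeMap<String, Role> {
    let has_output: BTreeSet<&str> = edges.iter().map(|e| e.source_node.as_str()).collect();
    let has_input: BTreeSet<&str> = edges.iter().map(|e| e.target_node.as_str()).collect();

    has_output
        .union(&has_input)
        .map(|&name| {
            let role = match (has_input.contains(name), has_output.contains(name)) {
                (false, _) => Role::Source,
                (true, true) => Role::Transform,
                (true, false) => Role::Sink,
            };
            (name.to_string(), role)
        })
        .collect()
}

fn main() {
    let edges = vec![
        edge("range", "out", "filter", "in"),
        edge("filter", "out", "square", "in"),
        edge("square", "out", "sum", "in"),
    ];
    for e in &edges {
        println!("{}.{} => {}.{}", e.source_node, e.source_port, e.target_node, e.target_port);
    }
    for (name, role) in classify(&edges) {
        println!("{name}: {role:?}");
    }
}
```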

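All data flows as Arc<dyn Any + Send + Sync>, so handing an item to the next node clones a pointer rather than copying the payload. Because items are type-erased, each node downcasts to the concrete type it expects at runtime. Ports are always named explicitly, which keeps connections unambiguous.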
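The following sketch shows what working with `Arc<dyn Any + Send + Sync>` looks like using only the standard library. The `Item` alias and `square_if_i32` function are hypothetical names chosen for illustration; the error string matches the one used in the Quick Start.

```rust
use std::any::Any;
use std::sync::Arc;

/// Type-erased, shareable item (alias introduced for this sketch).
type Item = Arc<dyn Any + Send + Sync>;

/// Downcast to i32, square it, and erase the type again.
fn square_if_i32(item: Item) -> Result<Item, String> {
    match item.downcast::<i32>() {
        Ok(n) => Ok(Arc::new(*n * *n) as Item),
        Err(_) => Err("Expected i32".to_string()),
    }
}

fn main() {
    let item: Item = Arc::new(7i32);

    // Cloning an Arc copies the pointer and bumps a reference count;
    // the i32 payload itself is not duplicated.
    let shared = Arc::clone(&item);
    println!("strong count: {}", Arc::strong_count(&item));

    match square_if_i32(shared) {
        Ok(out) => println!("squared: {:?}", out.downcast_ref::<i32>()),
        Err(e) => println!("error: {e}"),
    }

    // A payload of the wrong type is rejected at runtime, not at compile time.
    let wrong: Item = Arc::new("seven");
    println!("wrong type is error: {}", square_if_i32(wrong).is_err());
}
```

The trade-off is that a type mismatch surfaces as a runtime error on the node rather than a compile error, which is why the examples return `Err("Expected i32")` from their closures.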

🚀 Quick Start

Installation

Add StreamWeave to your Cargo.toml:

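[dependencies]
streamweave = "0.10.1"
# The examples below also use the Tokio runtime and its mpsc channels
tokio = { version = "1", features = ["full"] }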

Basic Example: Sum of Squares of Even Numbers

This example demonstrates the Graph API with a small pipeline: generate the numbers 1 through 10, keep the even ones, square them, and sum the results. The expected output is 220.
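As a reference point for the graph versions, the same computation written with ordinary iterators looks like this. It is a plain-Rust baseline, not StreamWeave code, and the function name `sum_of_even_squares` is made up for the sketch.

```rust
/// Sum the squares of the even numbers in `start..end` (end exclusive).
fn sum_of_even_squares(start: i32, end: i32) -> i32 {
    (start..end)
        .filter(|n| n % 2 == 0) // keep even numbers
        .map(|n| n * n)         // square each one
        .sum()                  // aggregate
}

fn main() {
    // 2² + 4² + 6² + 8² + 10² = 4 + 16 + 36 + 64 + 100 = 220
    println!("{}", sum_of_even_squares(1, 11));
}
```

Each iterator adapter corresponds to one node in the graph: the range is the source, `filter` and `map` are transforms, and `sum` is the final aggregation.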

Using the Traditional Graph API

use std::any::Any;
use std::sync::Arc;
use streamweave::edge::Edge;
use streamweave::graph::Graph;
use streamweave::nodes::aggregation::SumNode;
use streamweave::nodes::filter_node::{FilterNode, filter_config};
use streamweave::nodes::map_node::{MapNode, map_config};
use streamweave::nodes::range_node::RangeNode;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create channels for external I/O
    let (range_start_tx, range_start_rx) = mpsc::channel(1);
    let (range_end_tx, range_end_rx) = mpsc::channel(1);
    let (range_step_tx, range_step_rx) = mpsc::channel(1);
    let (result_tx, mut result_rx) = mpsc::channel::<Arc<dyn Any + Send + Sync>>(10);

    // Build the graph
    let mut graph = Graph::new("sum_of_squares".to_string());

    // Add nodes
    graph.add_node("range".to_string(), Box::new(RangeNode::new("range".to_string())))?;
    graph.add_node("filter".to_string(), Box::new(FilterNode::new("filter".to_string())))?;
    graph.add_node("square".to_string(), Box::new(MapNode::new("square".to_string())))?;
    graph.add_node("sum".to_string(), Box::new(SumNode::new("sum".to_string())))?;

    // Connect nodes with edges
    graph.add_edge(Edge {
        source_node: "range".to_string(),
        source_port: "out".to_string(),
        target_node: "filter".to_string(),
        target_port: "in".to_string(),
    })?;
    graph.add_edge(Edge {
        source_node: "filter".to_string(),
        source_port: "out".to_string(),
        target_node: "square".to_string(),
        target_port: "in".to_string(),
    })?;
    graph.add_edge(Edge {
        source_node: "square".to_string(),
        source_port: "out".to_string(),
        target_node: "sum".to_string(),
        target_port: "in".to_string(),
    })?;

    // Expose input ports for range configuration
    graph.expose_input_port("range", "start", "start")?;
    graph.expose_input_port("range", "end", "end")?;
    graph.expose_input_port("range", "step", "step")?;
    
    // Expose output port for final result
    graph.expose_output_port("sum", "out", "result")?;

    // Connect external channels
    graph.connect_input_channel("start", range_start_rx)?;
    graph.connect_input_channel("end", range_end_rx)?;
    graph.connect_input_channel("step", range_step_rx)?;
    graph.connect_output_channel("result", result_tx)?;

    // Send range configuration: start = 1, end = 11, step = 1
    // (yields 1 through 10, so `end` is treated as exclusive)
    range_start_tx.send(Arc::new(1i32) as Arc<dyn Any + Send + Sync>).await?;
    range_end_tx.send(Arc::new(11i32) as Arc<dyn Any + Send + Sync>).await?;
    range_step_tx.send(Arc::new(1i32) as Arc<dyn Any + Send + Sync>).await?;

    // Configure filter to keep only even numbers
    let (filter_config_tx, filter_config_rx) = mpsc::channel(1);
    graph.expose_input_port("filter", "configuration", "filter_config")?;
    graph.connect_input_channel("filter_config", filter_config_rx)?;
    
    // Configure map to square each number  
    let (square_config_tx, square_config_rx) = mpsc::channel(1);
    graph.expose_input_port("square", "configuration", "square_config")?;
    graph.connect_input_channel("square_config", square_config_rx)?;
    
    // Send configurations
    filter_config_tx.send(Arc::new(filter_config(|value| async move {
        if let Ok(arc_i32) = value.downcast::<i32>() {
            Ok(*arc_i32 % 2 == 0)
        } else {
            Err("Expected i32".to_string())
        }
    })) as Arc<dyn Any + Send + Sync>).await?;
    
    square_config_tx.send(Arc::new(map_config(|value| async move {
        if let Ok(arc_i32) = value.downcast::<i32>() {
            Ok(Arc::new(*arc_i32 * *arc_i32) as Arc<dyn Any + Send + Sync>)
        } else {
            Err("Expected i32".to_string())
        }
    })) as Arc<dyn Any + Send + Sync>).await?;

    // Execute the graph
    graph.execute().await?;

    // Wait for result
    if let Some(result) = result_rx.recv().await {
        if let Ok(sum) = result.downcast::<i32>() {
            println!("Sum of squares of even numbers (2² + 4² + 6² + 8² + 10²): {}", *sum);
            // Expected: 4 + 16 + 36 + 64 + 100 = 220
        }
    }

    graph.wait_for_completion().await?;
    Ok(())
}

Using the graph! Macro (Minimal Syntax)

The graph! macro provides a declarative syntax that reduces boilerplate significantly:

use streamweave::graph;
use streamweave::graph::Graph;
use streamweave::nodes::aggregation::SumNode;
use streamweave::nodes::filter_node::FilterNode;
use streamweave::nodes::map_node::MapNode;
use streamweave::nodes::range_node::RangeNode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build graph with minimal syntax
    let mut graph: Graph = graph! {
        range: RangeNode::new("range".to_string()),
        filter: FilterNode::new("filter".to_string()),
        square: MapNode::new("square".to_string()),
        sum: SumNode::new("sum".to_string()),
        ; range.out => filter.in,
          filter.out => square.in,
          square.out => sum.in,
          sum.out => graph.result
    };
    
    // Expose input ports and connect channels (same as traditional API)
    // ... (external I/O setup remains the same)
    
    // Execute the graph
    Graph::execute(&mut graph).await?;
    graph.wait_for_completion().await?;
    Ok(())
}

Key Benefits of graph! Macro:

  • 80-90% less boilerplate - Nodes and connections are declared together in one block
  • Visual clarity - Graph structure is immediately visible
  • Type safety - Compile-time validation of node names and connections
  • Explicit ports - All connections require explicit port names (no defaults)
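To give a feel for how a declarative macro can turn `a.out => b.in` lines into edge values, here is a toy `macro_rules!` sketch. It is not StreamWeave's `graph!` implementation: the `edges!` macro, the local `Edge` struct, and `describe` are hypothetical and only handle the connection list.

```rust
#[derive(Debug, PartialEq)]
struct Edge {
    source_node: String,
    source_port: String,
    target_node: String,
    target_port: String,
}

/// Expand `node.port => node.port` pairs into a Vec<Edge>.
/// The `ident` fragment also accepts keywords, so a port named `in` works.
macro_rules! edges {
    ($($sn:ident . $sp:ident => $tn:ident . $tp:ident),* $(,)?) => {
        vec![$(Edge {
            source_node: stringify!($sn).to_string(),
            source_port: stringify!($sp).to_string(),
            target_node: stringify!($tn).to_string(),
            target_port: stringify!($tp).to_string(),
        }),*]
    };
}

/// Render an edge back into the `a.out => b.in` notation.
fn describe(e: &Edge) -> String {
    format!("{}.{} => {}.{}", e.source_node, e.source_port, e.target_node, e.target_port)
}

fn main() {
    let es = edges! {
        range.out => filter.in,
        filter.out => square.in,
        square.out => sum.in,
    };
    for e in &es {
        println!("{}", describe(e));
    }
}
```

Every connection in this sketch names both ports, which mirrors the "explicit ports" rule above: there is no pattern that would accept `range => filter`.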

For more graph! macro examples, see:

  • examples/graph_macro_simple.rs - Simple linear pipeline
  • examples/graph_macro_fan_patterns.rs - Fan-in patterns
  • examples/graph_macro_io.rs - Graph I/O patterns

Graph Visualization

The following Mermaid diagram shows the graph structure with nodes that have multiple inputs and outputs:

graph LR
    Start[Range Config<br/>start: 1<br/>end: 11<br/>step: 1] -->|start| Range[RangeNode<br/>Inputs: start, end, step<br/>Outputs: out]
    Start -->|end| Range
    Start -->|step| Range
    
    Range -->|out| Filter[FilterNode<br/>Inputs: configuration, in<br/>Outputs: out, error]
    FilterConfig[Filter Config<br/>even numbers] -->|configuration| Filter
    
    Filter -->|out| Square[MapNode<br/>Inputs: configuration, in<br/>Outputs: out, error]
    SquareConfig[Map Config<br/>square function] -->|configuration| Square
    
    Square -->|out| Sum[SumNode<br/>Inputs: configuration, in<br/>Outputs: out, error]
    
    Sum -->|out| Result[Result: 220]
    
    style Range fill:#e1f5ff
    style Filter fill:#fff4e1
    style Square fill:#fff4e1
    style Sum fill:#e8f5e9
    style Result fill:#f3e5f5

This graph demonstrates:

  • Multiple inputs: RangeNode receives start, end, and step from separate sources
  • Linear processing: Data flows through Range → Filter → Square → Sum
  • Configuration ports: FilterNode and MapNode receive configuration on separate ports
  • Multiple outputs: FilterNode, MapNode, and SumNode each have both out and error ports (error ports not shown for clarity)
  • Graph I/O: External configuration and results flow through exposed graph ports
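The split between `out` and `error` ports can be pictured as a node holding two senders and choosing one per item. The sketch below uses `std::sync::mpsc` and a hand-written even-number filter; `Item`, `keep_even`, and `run` are hypothetical names, and this is an assumption about the general pattern rather than a description of how StreamWeave's nodes are implemented.

```rust
use std::any::Any;
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;

type Item = Arc<dyn Any + Send + Sync>;

/// Forward even i32 items to `out`; report items of any other type on `error`.
/// Odd numbers are dropped silently, as a filter would.
fn keep_even(input: Vec<Item>, out: &Sender<Item>, error: &Sender<String>) {
    for item in input {
        match item.downcast::<i32>() {
            Ok(n) if *n % 2 == 0 => {
                let _ = out.send(n as Item);
            }
            Ok(_) => {}
            Err(_) => {
                let _ = error.send("Expected i32".to_string());
            }
        }
    }
}

/// Run the filter once and collect what arrived on each port.
fn run(input: Vec<Item>) -> (Vec<i32>, Vec<String>) {
    let (out_tx, out_rx) = mpsc::channel();
    let (err_tx, err_rx) = mpsc::channel();

    keep_even(input, &out_tx, &err_tx);

    // Close both ports so the receivers terminate.
    drop(out_tx);
    drop(err_tx);

    let kept = out_rx
        .iter()
        .filter_map(|item: Item| item.downcast_ref::<i32>().copied())
        .collect();
    let errors = err_rx.iter().collect();
    (kept, errors)
}

fn main() {
    let input: Vec<Item> = vec![
        Arc::new(1i32),
        Arc::new(2i32),
        Arc::new("not a number"),
        Arc::new(4i32),
    ];
    let (kept, errors) = run(input);
    println!("out: {:?}", kept);
    println!("error: {:?}", errors);
}
```

Keeping errors on their own port means a bad item does not stop the main data path; a downstream node can subscribe to `error` separately if it cares.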

For more examples and detailed documentation, see the node module overview and the examples section below.

📦 Node Modules

StreamWeave is organized as a monorepo of core node modules, listed below. Each module comes with examples and an API reference.

Core Node Modules

StreamWeave's core functionality is built around a flexible node system. These modules provide the fundamental building blocks for data processing, including:

  • Advanced Node Operations: advanced/ (e.g., Break, Continue, Repeat, Retry, Switch, TryCatch)
  • Aggregation Nodes: aggregation/ (e.g., Average, Count, Max, Min, Sum)
  • Arithmetic Nodes: arithmetic/ (e.g., Add, Divide, Modulo, Multiply, Power, Subtract)
  • Array Nodes: array/ (e.g., Concat, Contains, Flatten, Index, Length, Reverse, Slice, Sort, Split, Unique)
  • Boolean Logic Nodes: boolean_logic/ (e.g., And, Nand, Nor, Not, Or, Xor)
  • Comparison Nodes: comparison/ (e.g., Equal, GreaterThan, LessThan, NotEqual)
  • Math Nodes: math/ (e.g., Abs, Ceil, Exp, Floor, Log, Max, Min, Round, Sqrt)
  • Object Nodes: object/ (e.g., HasProperty, Keys, Merge, Property, SetProperty, Size)
  • Reduction Nodes: reduction/ (e.g., Aggregate, GroupBy, Reduce, Scan)
  • Stream Processing Nodes: stream/ (e.g., Buffer, Debounce, Distinct, Filter, Map, Merge, Sample, Skip, Take, Throttle, Window, Zip)
  • String Manipulation Nodes: string/ (e.g., Append, Capitalize, CharAt, Concat, Contains, EndsWith, Format, IndexOf, Join, Length, Lowercase, Match, Prepend, Replace, Slice, Split, StartsWith, Trim, Uppercase)
  • Time-based Nodes: time/ (e.g., CurrentTime, Delay, FormatTime, ParseTime, Timeout, Timer, Timestamp)
  • Type Operation Nodes: type_ops/ (e.g., IsArray, IsBoolean, IsFloat, IsInt, IsNull, IsNumber, IsObject, IsString, ToArray, ToBoolean, ToFloat, ToInt, ToNumber, ToString, TypeOf)
  • Variable Nodes: variable/ (e.g., ReadVariable, WriteVariable)
  • Flow Control Nodes: while_loop_node.rs (e.g., WhileLoop)

For detailed examples and usage of each node, please refer to the examples directory.

📚 Documentation

📖 Examples

StreamWeave includes comprehensive examples demonstrating all major features. See the examples directory for:

  • Graph Macro Examples:
    • graph_macro_simple.rs - Simple linear pipeline with graph! macro
    • graph_macro_fan_patterns.rs - Fan-in patterns (fan-out not supported)
    • graph_macro_io.rs - Graph I/O with and without values
  • Integration examples (Kafka, Redis, Database, HTTP)
  • File format examples (CSV, JSONL, Parquet)
  • Processing examples (Stateful, Error Handling, Windowing)
  • Visualization examples
  • Graph API examples

Run any example with:

cargo run --example <example_name> --features <required_features>

🤝 Contributing

Contributions are welcome! Please see our Contributing Guide for details.

📄 License

This project is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.

See the LICENSE file for details.

🙏 Acknowledgments

  • Built with Tokio for async runtime
  • HTTP support powered by Axum
  • Inspired by reactive programming patterns and stream processing frameworks