streamweave 0.2.1

Composable, async, stream-first computation in pure Rust
Documentation
# StreamWeave Graph Architecture

## 🎯 Why a Graph Architecture?

StreamWeave started with a simple, linear pipeline model that worked well for
basic streaming operations. However, as we added more complex features like
fan-out/fan-in patterns, stateful processing, and distributed computing, we
realized we needed a more flexible architecture. The graph model emerged as the
natural solution because:

1. **Natural Representation**: Many real-world data flows are naturally
represented as graphs
2. **Flexibility**: Graphs can represent complex topologies beyond simple linear
pipelines
3. **Scalability**: Easy to distribute processing across multiple nodes
4. **Maintainability**: Clear separation of concerns between routing and
processing
5. **Visualization**: Graphs are easy to visualize and debug

## 🏗️ Core Components

### Node Types

```rust
// Base Node with input/output routing
pub struct Node<I, O> {
    name: String,
    input_router: Box<dyn InputRouter<I>>,
    transformer: Box<dyn Transformer<I, O>>,
    output_router: Box<dyn OutputRouter<O>>,
    config: NodeConfig,
}

// Producer Node (source)
pub struct ProducerNode<O> {
    name: String,
    producer: Box<dyn Producer<O>>,
    output_router: Box<dyn OutputRouter<O>>,
    config: NodeConfig,
}

// Consumer Node (sink)
pub struct ConsumerNode<I> {
    name: String,
    input_router: Box<dyn InputRouter<I>>,
    consumer: Box<dyn Consumer<I>>,
    config: NodeConfig,
}
```

### Router Traits

Routers handle the flow of data between nodes:

```rust
pub trait InputRouter<I> {
    fn route(&mut self, input: I) -> Vec<(String, I)>;  // (port_name, data)
}

pub trait OutputRouter<O> {
    fn route(&mut self, output: O) -> Vec<(String, O)>;  // (port_name, data)
}
```

### Router Implementations

We provide several built-in routing strategies:

```rust
// Round-robin routing
pub struct RoundRobinRouter<T> {
    ports: Vec<String>,
    next_port: usize,
}

// Broadcast to all ports
pub struct BroadcastRouter<T> {
    ports: Vec<String>,
}

// Route based on a key
pub struct KeyBasedRouter<T, K> {
    ports: HashMap<K, String>,
    key_fn: Box<dyn Fn(&T) -> K>,
}

// Merge multiple inputs
pub struct MergeRouter<T> {
    ports: Vec<String>,
    merge_strategy: MergeStrategy,
}
```

## 🎨 Graph Structure

The graph manages nodes and their connections:

```rust
pub struct Graph {
    nodes: HashMap<String, Box<dyn Node>>,
    connections: Vec<Connection>,
}

pub struct Connection {
    source: (String, String),  // (node_name, output_port)
    target: (String, String),  // (node_name, input_port)
    config: ConnectionConfig,
}
```

## 🚀 Example Usage

Here's how to build a complex streaming graph:

```rust
let mut graph = Graph::new();

// Create a producer node with broadcast routing
let producer = ProducerNode::new(
    "source",
    FileProducer::new("input.txt"),
    BroadcastRouter::new(vec!["out1", "out2"]),
);

// Create a processing node with merge and key-based routing
let processor = Node::new(
    "processor",
    MergeRouter::new(vec!["in1", "in2"], MergeStrategy::RoundRobin),
    ProcessTransformer::new(),
    KeyBasedRouter::new(
        |item: &ProcessedItem| item.category.clone(),
        vec![
            ("category1", "out1"),
            ("category2", "out2"),
        ],
    ),
);

// Create a consumer node with round-robin input routing
let consumer = ConsumerNode::new(
    "sink",
    RoundRobinRouter::new(vec!["in1", "in2"]),
    FileConsumer::new("output.txt"),
);

// Add nodes to graph
graph.add_node("source", producer);
graph.add_node("processor", processor);
graph.add_node("sink", consumer);

// Connect nodes
graph.connect(
    ("source", "out1"),
    ("processor", "in1"),
)?;
graph.connect(
    ("source", "out2"),
    ("processor", "in2"),
)?;
graph.connect(
    ("processor", "out1"),
    ("sink", "in1"),
)?;
graph.connect(
    ("processor", "out2"),
    ("sink", "in2"),
)?;

// Run the graph
graph.run().await?;
```

## 🔄 Advanced Features

The graph architecture naturally supports our planned advanced features:

### Stateful Processing

```rust
let stateful_node = Node::new(
    "stateful",
    MergeRouter::new(vec!["in"], MergeStrategy::Ordered),
    StatefulTransformer::new(
        HashMap::new(),
        |state, item| {
            // Update state and return result
            state.entry(item.key).or_insert(0) += 1;
            (state.clone(), item)
        }
    ),
    BroadcastRouter::new(vec!["out"]),
);
```

### Exactly-Once Processing

```rust
let exactly_once_node = Node::new(
    "exactly_once",
    MergeRouter::new(vec!["in"], MergeStrategy::Ordered),
    ExactlyOnceTransformer::new(
        RedisCheckpointStore::new("redis://localhost"),
        |item| async {
            // Process item
            process_item(&item).await
        }
    ),
    BroadcastRouter::new(vec!["out"]),
);
```

### Windowing Operations

```rust
let windowing_node = Node::new(
    "windowing",
    MergeRouter::new(vec!["in"], MergeStrategy::TimeOrdered),
    WindowTransformer::new(
        WindowType::Time(Duration::from_secs(60)),
        |window| {
            // Process window
            process_window(&window)
        }
    ),
    BroadcastRouter::new(vec!["out"]),
);
```

## 🎯 Benefits

1. **Clean Separation of Concerns**
   - Routing logic is separate from transformation logic
   - Each component has a single responsibility
   - Easy to swap out different routing strategies

2. **Flexible Routing**
   - Different routing strategies can be mixed and matched
   - Easy to add new routing strategies
   - Clear interface for routing decisions

3. **Type Safety**
   - Input and output types are clearly defined
   - Routing decisions are type-safe
   - Easy to catch connection mismatches at compile time

4. **Extensibility**
   - Easy to add new node types
   - Easy to add new routing strategies
   - Easy to add new transformation types

## 🚀 Future Directions

1. **Visualization Tools**
   - Graph visualization
   - Real-time monitoring
   - Performance metrics

2. **Distributed Processing**
   - Node distribution across machines
   - Fault tolerance
   - Load balancing

3. **Advanced Routing**
   - Dynamic routing based on load
   - Circuit breaking
   - Rate limiting

4. **State Management**
   - Distributed state
   - State persistence
   - State recovery

## 🤝 Contributing

We welcome contributions to the graph architecture! Areas of particular
interest:

1. New routing strategies
2. Visualization tools
3. Distributed processing support
4. State management improvements
5. Performance optimizations