streamweave-stateful
Stateful transformer support for StreamWeave
Transformers with persistent state for aggregations, sessions, and stateful processing.
The streamweave-stateful package lets StreamWeave transformers maintain persistent state across stream items. It supports use cases such as running aggregations, session management, pattern detection, and stateful windowing operations.
Key Features
- StatefulTransformer Trait: Extends Transformer with state management
- StateStore Trait: Abstract state storage interface
- InMemoryStateStore: Thread-safe in-memory state storage
- State Persistence: Serialization and checkpointing support
- Thread-Safe: All operations are thread-safe
- State Operations: Get, set, update, reset state
Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-stateful = "0.3.0"
Quick Start
Running Sum Transformer
use ;
use ;
use StreamExt;
use Pin;
API Overview
StatefulTransformer Trait
The StatefulTransformer trait extends Transformer with state management.
StateStore Trait
The StateStore trait defines the interface for state storage.
InMemoryStateStore
Thread-safe in-memory state storage:
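As a rough illustration of how such a store can work, the sketch below keeps the state in an Option behind an RwLock and exposes get, set, update, and reset. The type name SketchStore and every method signature here are assumptions made for this example; they are not the crate's actual InMemoryStateStore API, which may differ (for instance by returning StateResult values).

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical store for illustration only (not the crate's type).
/// The state lives in an `Option<S>` behind an `RwLock`, so many readers
/// or a single writer can hold it at any time.
#[derive(Clone)]
pub struct SketchStore<S> {
    inner: Arc<RwLock<Option<S>>>,
    initial: Option<S>,
}

impl<S: Clone> SketchStore<S> {
    /// Creates a store that starts with (and resets to) `initial`.
    pub fn new(initial: S) -> Self {
        Self {
            inner: Arc::new(RwLock::new(Some(initial.clone()))),
            initial: Some(initial),
        }
    }

    /// Creates a store with no state at all.
    pub fn empty() -> Self {
        Self {
            inner: Arc::new(RwLock::new(None)),
            initial: None,
        }
    }

    /// Returns a clone of the current state, if any.
    pub fn get(&self) -> Option<S> {
        self.inner.read().unwrap().clone()
    }

    /// Overwrites the current state.
    pub fn set(&self, value: S) {
        *self.inner.write().unwrap() = Some(value);
    }

    /// Applies `f` to the current state, stores the result, and returns it.
    pub fn update<F: FnOnce(Option<S>) -> S>(&self, f: F) -> S {
        let mut guard = self.inner.write().unwrap();
        let next = f(guard.take());
        *guard = Some(next.clone());
        next
    }

    /// Restores the initial state (or clears it for an empty store).
    pub fn reset(&self) {
        *self.inner.write().unwrap() = self.initial.clone();
    }
}
```

Cloning a SketchStore clones the Arc, so all clones share one underlying state; that is the design choice that makes it usable from several threads.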
Usage Examples
Running Aggregation
Calculate running sum, average, or count:
use ;
// In transform method
let sum = self.state.update?;
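To make the running-aggregation idea concrete, here is a minimal, dependency-free sketch. RunningSum and RunningAverage are hypothetical types written for this example, and a plain RwLock stands in for the crate's state store; the real transformer would go through the StatefulTransformer and StateStore traits instead.

```rust
use std::sync::RwLock;

/// Hypothetical running-sum transformer; `state` stands in for a state store.
pub struct RunningSum {
    state: RwLock<i64>,
}

impl RunningSum {
    pub fn new() -> Self {
        Self { state: RwLock::new(0) }
    }

    /// Adds `item` to the stored sum and returns the updated total.
    pub fn transform(&self, item: i64) -> i64 {
        let mut sum = self.state.write().unwrap();
        *sum += item;
        *sum
    }
}

/// Hypothetical running average, stored as (sum, count) so that each
/// item is processed in constant time.
pub struct RunningAverage {
    state: RwLock<(f64, u64)>,
}

impl RunningAverage {
    pub fn new() -> Self {
        Self { state: RwLock::new((0.0, 0)) }
    }

    /// Folds `item` into the state and returns the average so far.
    pub fn transform(&self, item: f64) -> f64 {
        let mut s = self.state.write().unwrap();
        s.0 += item;
        s.1 += 1;
        s.0 / s.1 as f64
    }
}
```

Feeding 1, 2, 3, 4 through RunningSum::transform yields 1, 3, 6, 10: each output depends on the state left behind by the previous item.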
State Persistence
Serialize and restore state:
use ;
let store: = new;
// Serialize state to bytes
let checkpoint = store.serialize_state?;
// Restore to a new store
let store2: = empty;
store2.deserialize_and_set_state?;
assert_eq!;
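The checkpoint-and-restore round trip can be sketched without any external crates. The example below is an assumption-laden stand-in: the Counters type and its to_bytes and from_bytes methods are hypothetical, and the fixed little-endian layout replaces the serde-based serialization that the package's dependency list suggests it uses.

```rust
/// Hypothetical state type used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Counters {
    pub sum: i64,
    pub count: u64,
}

impl Counters {
    /// Serializes the state into a 16-byte little-endian checkpoint.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(16);
        out.extend_from_slice(&self.sum.to_le_bytes());
        out.extend_from_slice(&self.count.to_le_bytes());
        out
    }

    /// Restores the state from a checkpoint, rejecting malformed input.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != 16 {
            return Err(format!("expected 16 bytes, got {}", bytes.len()));
        }
        let mut sum = [0u8; 8];
        let mut count = [0u8; 8];
        sum.copy_from_slice(&bytes[..8]);
        count.copy_from_slice(&bytes[8..]);
        Ok(Self {
            sum: i64::from_le_bytes(sum),
            count: u64::from_le_bytes(count),
        })
    }
}
```

A restored value compares equal to the original, which is the property a checkpoint needs: processing can resume from the bytes alone.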
State Reset
Reset state to initial value:
let store = new;
store.set?;
store.reset?;
assert_eq!;
State Initialization
Create stores with or without initial state:
// With initial state
let store = new;
// Without initial state
let store = empty;
// With optional initial state
let store = with_optional_initial;
Thread-Safe State Access
All state operations are thread-safe:
use Arc;
use task;
let store = new;
// Access from multiple threads
let store1 = clone;
let store2 = clone;
spawn;
spawn;
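The sharing pattern behind thread-safe access can be illustrated with the standard library alone. This sketch assumes nothing about the crate: it uses std::thread rather than async tasks, and a bare Arc<RwLock<u64>> in place of a state store. The function name concurrent_increments is made up for the example.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Spawns `workers` threads that each increment a shared counter
/// `per_worker` times, then returns the final value.
pub fn concurrent_increments(workers: usize, per_worker: u64) -> u64 {
    let state = Arc::new(RwLock::new(0u64));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each thread gets its own handle to the same state.
            let state = Arc::clone(&state);
            thread::spawn(move || {
                for _ in 0..per_worker {
                    // The write lock makes each read-modify-write atomic.
                    *state.write().unwrap() += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let total = *state.read().unwrap();
    total
}
```

Because every increment happens under the write lock, no updates are lost: four workers doing 1000 increments each always end at 4000.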
Architecture
Stateful transformers maintain state across items:
┌─────────────┐            ┌─────────────────────┐
│    Input    │───item────>│ StatefulTransformer │
└─────────────┘            │                     │
                           │  ┌───────────────┐  │
                           │  │  State Store  │  │
                           │  └───────────────┘  │
                           │                     │
                           └─────────────────────┘
                                      │
                                      ▼
                               ┌─────────────┐
                               │   Output    │
                               └─────────────┘
State Flow:
- Item arrives at transformer
- Transformer reads current state
- Transformer updates state based on item
- Transformer produces output
- State persists for next item
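The state flow above can be sketched with a small transformer that emits the difference between each item and the one before it. DeltaTransformer is a hypothetical type invented for this illustration, and it holds its state in a plain field rather than in a state store.

```rust
/// Hypothetical transformer that outputs `item - previous_item`.
pub struct DeltaTransformer {
    /// State carried from one item to the next.
    last: Option<i64>,
}

impl DeltaTransformer {
    pub fn new() -> Self {
        Self { last: None }
    }

    /// Processes one item, following the state flow step by step.
    pub fn transform(&mut self, item: i64) -> Option<i64> {
        // Read the current state.
        let previous = self.last;
        // Update the state based on the item.
        self.last = Some(item);
        // Produce the output; the first item has nothing to compare against.
        previous.map(|p| item - p)
    }
}
```

For the inputs 10, 13, 11 the outputs are None, Some(3), Some(-2): the first item only primes the state, and each later output depends on what the previous call stored.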
Configuration
State Store Types
InMemoryStateStore:
- Thread-safe in-memory storage
- Fast access
- Not persistent across restarts
- Best for single-process use
Custom Stores:
- Implement the StateStore trait
- Can use databases, files, etc.
- Enable distributed state
- Best for production use
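As one possible shape for a custom store, the sketch below persists a counter to a file so that it survives restarts. The CounterStore trait and FileCounterStore type are hypothetical and much narrower than the crate's StateStore trait, whose actual methods are not shown in this README; a real implementation would implement that trait instead.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical, simplified store interface for this sketch only.
pub trait CounterStore {
    fn get(&self) -> io::Result<Option<u64>>;
    fn set(&self, value: u64) -> io::Result<()>;
}

/// Stores the counter as decimal text in a single file.
pub struct FileCounterStore {
    path: PathBuf,
}

impl FileCounterStore {
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }
}

impl CounterStore for FileCounterStore {
    /// Returns `None` if the file does not exist yet.
    fn get(&self) -> io::Result<Option<u64>> {
        match fs::read_to_string(&self.path) {
            Ok(text) => text
                .trim()
                .parse::<u64>()
                .map(Some)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Overwrites the file with the new value.
    fn set(&self, value: u64) -> io::Result<()> {
        fs::write(&self.path, value.to_string())
    }
}
```

A second FileCounterStore opened on the same path sees the value written by the first, which is what distinguishes it from an in-memory store. This sketch does no locking, so concurrent writers would need additional coordination.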
Error Handling
State operations return StateResult<T>:
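A result alias of this kind is usually a thin wrapper around Result with a crate-specific error type. The sketch below shows that pattern under stated assumptions: the StateError variants and the read_count helper are hypothetical, and the crate's real error type may carry different variants.

```rust
use std::fmt;
use std::sync::RwLock;

/// Hypothetical error type; the variants are assumptions for this sketch.
#[derive(Debug, PartialEq)]
pub enum StateError {
    NotInitialized,
    LockPoisoned(String),
}

impl fmt::Display for StateError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StateError::NotInitialized => write!(f, "state is not initialized"),
            StateError::LockPoisoned(msg) => write!(f, "state lock poisoned: {}", msg),
        }
    }
}

impl std::error::Error for StateError {}

/// Alias in the style of `StateResult<T>`.
pub type StateResult<T> = Result<T, StateError>;

/// Reads a counter, mapping lock and initialization failures to `StateError`.
pub fn read_count(state: &RwLock<Option<u64>>) -> StateResult<u64> {
    let guard = state
        .read()
        .map_err(|e| StateError::LockPoisoned(e.to_string()))?;
    (*guard).ok_or(StateError::NotInitialized)
}
```

Callers can then use the ? operator to propagate state failures, or match on the variants to handle an uninitialized store differently from a poisoned lock.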
Performance Considerations
- Thread-Safe: All operations use RwLock for thread safety
- Clone Efficiency: State cloning is efficient for small states
- Serialization Overhead: Checkpointing has serialization overhead
- Lock Contention: High contention may impact performance
Examples
For more examples, see:
Dependencies
streamweave-stateful depends on:
- streamweave - Core traits
- streamweave-error - Error handling
- tokio - Async runtime
- futures - Stream utilities
- serde - Serialization support
- serde_json - JSON serialization
- chrono - Timestamp support
Use Cases
Stateful transformers are used for:
- Running Aggregations: Sum, average, count, min, max
- Session Management: Track sessions across items
- Pattern Detection: Detect patterns across multiple items
- Stateful Windowing: Window operations with state
- State Persistence: Checkpoint and restore state
Documentation
See Also
- streamweave - Core traits
- streamweave-pipeline - Pipeline API
- streamweave-graph - Graph API
- streamweave-window - Windowing operations
Contributing
Contributions are welcome! Please see the Contributing Guide for details.
License
This project is licensed under the CC BY-SA 4.0 license.