streamweave-transformers
Transformers for StreamWeave
A comprehensive collection of transformers for building powerful streaming data pipelines.
The streamweave-transformers package provides a rich set of transformers for StreamWeave pipelines and graphs. Transformers are organized into categories: basic operations, advanced processing, stateful operations, routing, merging, machine learning, and utility functions.
✨ Key Features
- 30+ Transformers: Comprehensive collection of transformers
- Categorized: Organized into logical categories
- Type-Safe: Full type safety with Rust's type system
- Composable: Transformers can be chained together
- Error Handling: Built-in error handling strategies
- ML Support: Optional machine learning transformers
📦 Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-transformers = "0.3.0"
# Or, to enable the optional ML transformers:
streamweave-transformers = { version = "0.3.0", features = ["ml"] }
🚀 Quick Start
Basic Transformation
use streamweave_transformers::MapTransformer;
use PipelineBuilder;
let transformer = MapTransformer::new(/* ... */);
let pipeline = PipelineBuilder::new()
    .producer(/* ... */)
    .transformer(transformer)
    .consumer(/* ... */);
pipeline.run().await?;
Filtering
use streamweave_transformers::FilterTransformer;
use PipelineBuilder;
let transformer = FilterTransformer::new(/* ... */);
let pipeline = PipelineBuilder::new()
    .producer(/* ... */)
    .transformer(transformer)
    .consumer(/* ... */);
pipeline.run().await?;
📖 Transformer Categories
Basic Transformers
Fundamental stream operations:
- MapTransformer: Transform each item with a function
- FilterTransformer: Filter items based on a predicate
- ReduceTransformer: Reduce the stream to a single accumulated value
Advanced Transformers
Resilience and performance:
- BatchTransformer: Group items into batches
- RetryTransformer: Retry failed operations
- CircuitBreakerTransformer: Circuit breaker pattern
- RateLimitTransformer: Rate limiting
Stateful Transformers
Stateful operations:
- RunningSumTransformer: Running sum calculation
- MovingAverageTransformer: Moving average calculation
Routing Transformers
Route items to different paths:
- RouterTransformer: Route based on conditions
- PartitionTransformer: Partition by key
- RoundRobinTransformer: Round-robin distribution
Merging Transformers
Combine multiple streams:
- MergeTransformer: Merge multiple streams
- OrderedMergeTransformer: Merge maintaining order
- InterleaveTransformer: Interleave streams
Machine Learning Transformers
ML inference (requires ml feature):
- InferenceTransformer: Single-item inference
- BatchedInferenceTransformer: Batch inference
Utility Transformers
Common utility operations:
- SampleTransformer: Sample items randomly
- SkipTransformer: Skip N items
- TakeTransformer: Take N items
- LimitTransformer: Limit stream size
- SortTransformer: Sort items
- SplitTransformer: Split items
- SplitAtTransformer: Split at index
- ZipTransformer: Zip multiple streams
- TimeoutTransformer: Timeout operations
- MessageDedupeTransformer: Deduplicate messages
- GroupByTransformer: Group items by key
📚 Usage Examples
Basic Transformers
Map Transformer
Transform each item:
use streamweave_transformers::MapTransformer;
let transformer = MapTransformer::new(/* ... */);
Filter Transformer
Filter items:
use streamweave_transformers::FilterTransformer;
let transformer = FilterTransformer::new(/* ... */);
Reduce Transformer
Reduce stream:
use streamweave_transformers::ReduceTransformer;
let transformer = ReduceTransformer::new(/* ... */);
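The three basic operations correspond to familiar iterator adapters. The sketch below uses only the Rust standard library, not the StreamWeave API, to show what filter, map, and reduce do to a sequence of items; the function name `sum_of_doubled_evens` is made up for illustration.

```rust
/// Doubles every even number and sums the results.
/// Plain-iterator analogue of filter -> map -> reduce; this is NOT the StreamWeave API.
pub fn sum_of_doubled_evens(items: &[i32]) -> i32 {
    items
        .iter()
        .filter(|x| **x % 2 == 0) // filter: keep items that match a predicate
        .map(|x| x * 2) // map: transform each item
        .fold(0, |acc, x| acc + x) // reduce: accumulate into a single value
}
```

For the input `[1, 2, 3, 4]`, the even items 2 and 4 are doubled to 4 and 8 and summed to 12.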
Advanced Transformers
Batch Transformer
Batch items:
use streamweave_transformers::BatchTransformer;
use std::time::Duration;
let transformer = BatchTransformer::new(/* ... */);
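As a rough illustration of size-based batching, the following standard-library sketch groups a slice into batches of at most `size` items. It is not the `BatchTransformer` implementation; the `batch` helper is hypothetical, and any time-based flushing (which the `Duration` import above suggests) is not modeled here.

```rust
/// Groups items into batches of at most `size` items, preserving order.
/// The final batch may be shorter. Size-based only; no timeout is modeled.
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn batch<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "batch size must be positive");
    items.chunks(size).map(|chunk| chunk.to_vec()).collect()
}
```

Five items with a batch size of 2 produce three batches, the last holding a single item.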
Retry Transformer
Retry failed operations:
use streamweave_transformers::RetryTransformer;
let transformer = RetryTransformer::new(/* ... */);
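The idea behind retrying can be sketched without any StreamWeave types. The hypothetical `retry` helper below calls an operation until it succeeds or a maximum number of attempts is reached; backoff delays, which a production retry policy would normally include, are left out to keep the example deterministic.

```rust
/// Calls `op` until it succeeds or `max_attempts` calls have been made.
/// `op` receives the 1-based attempt number.
/// Returns the first success, or the error from the final attempt.
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn retry<T, E>(
    max_attempts: usize,
    mut op: impl FnMut(usize) -> Result<T, E>,
) -> Result<T, E> {
    assert!(max_attempts > 0, "need at least one attempt");
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}
```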
Circuit Breaker Transformer
Circuit breaker pattern:
use streamweave_transformers::CircuitBreakerTransformer;
let transformer = CircuitBreakerTransformer::new(/* ... */);
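For readers unfamiliar with the circuit breaker pattern, here is a minimal standard-library sketch. It assumes a simple policy: the circuit opens after a fixed number of consecutive failures and then rejects calls without running them. The `CircuitBreaker` type and its methods are hypothetical, and the timed half-open recovery that real circuit breakers usually have is omitted.

```rust
/// Minimal circuit breaker: opens after `threshold` consecutive failures.
/// Hypothetical type for illustration, not the StreamWeave API.
pub struct CircuitBreaker {
    threshold: u32,
    consecutive_failures: u32,
}

impl CircuitBreaker {
    pub fn new(threshold: u32) -> Self {
        Self { threshold, consecutive_failures: 0 }
    }

    /// True once the failure threshold has been reached.
    pub fn is_open(&self) -> bool {
        self.consecutive_failures >= self.threshold
    }

    /// Runs `op` unless the circuit is open.
    /// Returns `None` when the call is rejected without running.
    pub fn call<T, E>(&mut self, op: impl FnOnce() -> Result<T, E>) -> Option<Result<T, E>> {
        if self.is_open() {
            return None; // fail fast without invoking the operation
        }
        let result = op();
        match &result {
            Ok(_) => self.consecutive_failures = 0, // a success resets the count
            Err(_) => self.consecutive_failures += 1,
        }
        Some(result)
    }
}
```

With a threshold of 2, the first two failing calls run and return their errors; a third call is rejected with `None` even if it would have succeeded.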
Rate Limit Transformer
Rate limiting:
use streamweave_transformers::RateLimitTransformer;
use std::time::Duration;
let transformer = RateLimitTransformer::new(/* ... */);
Stateful Transformers
Running Sum Transformer
Calculate running sum:
use streamweave_transformers::RunningSumTransformer;
let transformer = RunningSumTransformer::new(/* ... */);
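A running sum is the simplest example of state carried from one item to the next. The sketch below shows the computation with the standard library's `scan` adapter; `running_sum` is a hypothetical helper, not the transformer itself.

```rust
/// Emits the cumulative sum after each input item.
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn running_sum(items: &[i64]) -> Vec<i64> {
    items
        .iter()
        .scan(0i64, |total, x| {
            *total += x; // state carried between items
            Some(*total)
        })
        .collect()
}
```

For the input `[1, 2, 3, 4]` the output is `[1, 3, 6, 10]`.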
Moving Average Transformer
Calculate moving average:
use streamweave_transformers::MovingAverageTransformer;
let transformer = MovingAverageTransformer::new(10); // Window size 10
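To make the window size concrete, the following standard-library sketch computes a moving average over the most recent `window` items. It assumes that outputs emitted before the window fills average over the items seen so far; whether `MovingAverageTransformer` behaves the same way is not stated here, and `moving_average` is a hypothetical helper.

```rust
use std::collections::VecDeque;

/// Emits the mean of the most recent `window` items after each input.
/// Early outputs average over fewer items until the window fills (an assumption of this sketch).
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn moving_average(items: &[f64], window: usize) -> Vec<f64> {
    assert!(window > 0, "window must be positive");
    let mut recent: VecDeque<f64> = VecDeque::with_capacity(window);
    let mut sum = 0.0;
    let mut out = Vec::with_capacity(items.len());
    for &x in items {
        if recent.len() == window {
            // Drop the oldest value once the window is full.
            sum -= recent.pop_front().unwrap();
        }
        recent.push_back(x);
        sum += x;
        out.push(sum / recent.len() as f64);
    }
    out
}
```

With a window of 2, the input `[1.0, 2.0, 3.0, 4.0]` yields `[1.0, 1.5, 2.5, 3.5]`.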
Routing Transformers
Router Transformer
Route based on conditions:
use streamweave_transformers::RouterTransformer;
let transformer = RouterTransformer::new(/* ... */);
Partition Transformer
Partition by key:
use streamweave_transformers::PartitionTransformer;
let transformer = PartitionTransformer::new(/* ... */);
Round Robin Transformer
Round-robin distribution:
use streamweave_transformers::RoundRobinTransformer;
let transformer = RoundRobinTransformer::new(3); // 3 outputs
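Round-robin distribution assigns item `i` to output `i mod n`. The sketch below demonstrates this with plain vectors standing in for output streams; `round_robin` is a hypothetical helper and not part of the crate.

```rust
/// Distributes items across `outputs` buckets in rotation.
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn round_robin<T: Clone>(items: &[T], outputs: usize) -> Vec<Vec<T>> {
    assert!(outputs > 0, "need at least one output");
    let mut buckets: Vec<Vec<T>> = vec![Vec::new(); outputs];
    for (i, item) in items.iter().enumerate() {
        buckets[i % outputs].push(item.clone());
    }
    buckets
}
```

Seven items spread over 3 outputs land as `[1, 4, 7]`, `[2, 5]`, and `[3, 6]`.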
Merging Transformers
Merge Transformer
Merge streams:
use streamweave_transformers::MergeTransformer;
let transformer = MergeTransformer::new(/* ... */);
Ordered Merge Transformer
Merge maintaining order:
use streamweave_transformers::OrderedMergeTransformer;
let transformer = OrderedMergeTransformer::new(/* ... */);
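One common meaning of an order-preserving merge is the classic two-way merge of already-sorted inputs. The sketch below assumes that interpretation and works on slices rather than streams; `ordered_merge` is a hypothetical helper, and the ordering rules of `OrderedMergeTransformer` itself may differ.

```rust
/// Merges two ascending slices into one ascending vector.
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn ordered_merge<T: Ord + Clone>(left: &[T], right: &[T]) -> Vec<T> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let (mut i, mut j) = (0, 0);
    while i < left.len() && j < right.len() {
        // Take from the left on ties so equal items keep left-first order.
        if left[i] <= right[j] {
            out.push(left[i].clone());
            i += 1;
        } else {
            out.push(right[j].clone());
            j += 1;
        }
    }
    // At most one of these tails is non-empty.
    out.extend_from_slice(&left[i..]);
    out.extend_from_slice(&right[j..]);
    out
}
```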
Interleave Transformer
Interleave streams:
use streamweave_transformers::InterleaveTransformer;
let transformer = InterleaveTransformer::new(/* ... */);
Machine Learning Transformers
Inference Transformer
Single-item inference:
use streamweave_transformers::InferenceTransformer;
let transformer = InferenceTransformer::new(/* ... */);
Batched Inference Transformer
Batch inference:
use streamweave_transformers::BatchedInferenceTransformer;
let transformer = BatchedInferenceTransformer::new(/* ... */);
Utility Transformers
Sample Transformer
Random sampling:
use streamweave_transformers::SampleTransformer;
let transformer = SampleTransformer::new(0.1); // 10% sample rate
Skip Transformer
Skip items:
use streamweave_transformers::SkipTransformer;
let transformer = SkipTransformer::new(10); // Skip first 10 items
Take Transformer
Take items:
use streamweave_transformers::TakeTransformer;
let transformer = TakeTransformer::new(100); // Take first 100 items
Limit Transformer
Limit stream:
use streamweave_transformers::LimitTransformer;
let transformer = LimitTransformer::new(1000); // Limit to 1000 items
Sort Transformer
Sort items:
use streamweave_transformers::SortTransformer;
let transformer = SortTransformer::new(/* ... */);
Split Transformer
Split items:
use streamweave_transformers::SplitTransformer;
let transformer = SplitTransformer::new(/* ... */);
Split At Transformer
Split at index:
use streamweave_transformers::SplitAtTransformer;
let transformer = SplitAtTransformer::new(100); // Split at index 100
Zip Transformer
Zip streams:
use streamweave_transformers::ZipTransformer;
let transformer = ZipTransformer::new(/* ... */);
Timeout Transformer
Timeout operations:
use streamweave_transformers::TimeoutTransformer;
use std::time::Duration;
let transformer = TimeoutTransformer::new(/* ... */);
Message Dedupe Transformer
Deduplicate messages:
use streamweave_transformers::MessageDedupeTransformer;
let transformer = MessageDedupeTransformer::new(/* ... */);
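Deduplication usually means remembering which message identifiers have already been seen. The sketch below assumes each message is an `(id, payload)` pair with a `u64` id, which is an illustrative stand-in rather than the StreamWeave message envelope. It also keeps every id it has seen, so memory grows with the number of distinct ids; a real implementation would likely bound that set.

```rust
use std::collections::HashSet;

/// Keeps the first message seen for each id and drops later repeats.
/// The `(u64, T)` message shape is an assumption of this sketch, not the StreamWeave API.
pub fn dedupe_by_id<T: Clone>(messages: &[(u64, T)]) -> Vec<(u64, T)> {
    let mut seen = HashSet::new();
    messages
        .iter()
        .filter(|(id, _)| seen.insert(*id)) // `insert` returns false for repeats
        .cloned()
        .collect()
}
```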
Group By Transformer
Group by key:
use streamweave_transformers::GroupByTransformer;
let transformer = GroupByTransformer::new(/* ... */);
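Grouping by key can be pictured as building a map from each key to the items that produced it. The following standard-library sketch does this for a finite slice; `group_by` and its key-function parameter are hypothetical, and a streaming implementation would additionally have to decide when a group is complete.

```rust
use std::collections::BTreeMap;

/// Groups items by the key returned from `key_fn`,
/// keeping arrival order within each group.
/// Hypothetical helper for illustration, not the StreamWeave API.
pub fn group_by<T: Clone, K: Ord>(
    items: &[T],
    key_fn: impl Fn(&T) -> K,
) -> BTreeMap<K, Vec<T>> {
    let mut groups: BTreeMap<K, Vec<T>> = BTreeMap::new();
    for item in items {
        groups.entry(key_fn(item)).or_default().push(item.clone());
    }
    groups
}
```

Grouping `["apple", "avocado", "banana"]` by first character gives two groups, keyed `'a'` and `'b'`.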
🏗️ Architecture
Transformer flow:
Stream<T> ──> Transformer ──> Stream<U>
Transformer Flow:
- Input stream flows into transformer
- Transformer processes items
- Output stream flows out
- Transformers can be chained
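The flow above can be mimicked with ordinary iterators: each stage takes a sequence of `T` and returns a sequence of `U`, and chaining means feeding one stage's output into the next. This is a synchronous, standard-library analogy only; the stage names `double` and `to_label` are invented, and the real transformers operate on async streams.

```rust
/// A stage turns an iterator of `T` into an iterator of `U`,
/// mirroring the Stream<T> -> Transformer -> Stream<U> shape.
/// These stages are illustrative, not the StreamWeave API.
pub fn double(input: impl Iterator<Item = i32>) -> impl Iterator<Item = i32> {
    input.map(|x| x * 2)
}

pub fn to_label(input: impl Iterator<Item = i32>) -> impl Iterator<Item = String> {
    input.map(|x| format!("item-{}", x))
}

/// Chains the two stages: the output of `double` feeds `to_label`.
pub fn pipeline(input: Vec<i32>) -> Vec<String> {
    to_label(double(input.into_iter())).collect()
}
```

Calling `pipeline(vec![1, 2, 3])` produces `["item-2", "item-4", "item-6"]`.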
🔧 Configuration
All transformers support:
- Error Strategy: Error handling strategy
- Name: Component name for logging
🔍 Error Handling
Transformers support error handling strategies:
use ErrorStrategy;
let transformer = new
.with_error_strategy;
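To show what an error handling strategy changes in practice, the sketch below applies two possible strategies, stop at the first error or skip the failing item, to a fallible parsing step. The `Strategy` enum and its variants are assumptions made for this example and are not taken from the `ErrorStrategy` definition.

```rust
/// Hypothetical strategies for this sketch; not the StreamWeave `ErrorStrategy` definition.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Strategy {
    Stop, // abort at the first error
    Skip, // drop the failing item and continue
}

/// Parses each input as an integer, handling failures according to `strategy`.
pub fn parse_all(inputs: &[&str], strategy: Strategy) -> Result<Vec<i32>, String> {
    let mut out = Vec::new();
    for raw in inputs {
        match raw.parse::<i32>() {
            Ok(n) => out.push(n),
            Err(e) => match strategy {
                Strategy::Stop => return Err(format!("failed on {:?}: {}", raw, e)),
                Strategy::Skip => continue,
            },
        }
    }
    Ok(out)
}
```

With `Strategy::Skip`, the input `["1", "x", "3"]` yields `Ok([1, 3])`; with `Strategy::Stop`, the same input returns an error at `"x"`.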
⚡ Performance Considerations
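- Batching: Batch transformers group items so that per-item overhead is amortized, which generally improves throughput
- Stateful: Stateful transformers hold state between items, so their memory use depends on what they track (for example, the window of a moving average)
- ML: ML transformers must load a model before they can process items, which adds startup cost
- Routing: Routing transformers spread items across outputs, which can distribute load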
📝 Examples
For more examples, see:
🔗 Dependencies
streamweave-transformers depends on:
- streamweave - Core traits
- streamweave-error - Error handling
- streamweave-message - Message envelopes
- streamweave-stateful - Stateful operations
- tokio - Async runtime
- futures - Stream utilities
- ort (optional) - ONNX Runtime for ML
- ndarray (optional) - Array operations for ML
🎯 Use Cases
Transformers are used for:
- Data Transformation: Transform data in pipelines
- Filtering: Filter data based on conditions
- Aggregation: Aggregate data (sum, average, etc.)
- Routing: Route data to different paths
- Merging: Merge multiple data streams
- ML Inference: Run ML models on streams
- Utility Operations: Common utility operations
📖 Documentation
🔗 See Also
- streamweave - Core traits
- streamweave-stateful - Stateful operations
- streamweave-error - Error handling
🤝 Contributing
Contributions are welcome! Please see the Contributing Guide for details.
📄 License
This project is licensed under the CC BY-SA 4.0 license.