§streamweave-transformers
Transformers for StreamWeave
A comprehensive collection of transformers for building powerful streaming data pipelines.
The streamweave-transformers package provides a rich set of transformers for StreamWeave pipelines and graphs. Transformers are organized into categories: basic operations, advanced processing, stateful operations, routing, merging, machine learning, and utility functions.
§Key Features
- 30+ Transformers: Comprehensive collection of transformers
- Categorized: Organized into logical categories
- Type-Safe: Full type safety with Rust's type system
- Composable: Transformers can be chained together
- Error Handling: Built-in error handling strategies
- ML Support: Optional machine learning transformers
§Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-transformers = "0.3.0"
# Optional: to enable the ML transformers, use this line instead of the one above
# (a dependency key may appear only once in Cargo.toml)
# streamweave-transformers = { version = "0.3.0", features = ["ml"] }
§Quick Start
§Basic Transformation
use streamweave_transformers::map::MapTransformer;
use streamweave_pipeline::PipelineBuilder;
let transformer = MapTransformer::new(|x: i32| x * 2);
let pipeline = PipelineBuilder::new()
    .producer(/* produce numbers */)
    .transformer(transformer)
    .consumer(/* consume doubled numbers */);
pipeline.run().await?;
§Filtering
use streamweave_transformers::filter::FilterTransformer;
use streamweave_pipeline::PipelineBuilder;
let transformer = FilterTransformer::new(|x: i32| x > 10);
let pipeline = PipelineBuilder::new()
    .producer(/* produce numbers */)
    .transformer(transformer)
    .consumer(/* consume filtered numbers */);
pipeline.run().await?;
§Transformer Categories
§Basic Transformers
Fundamental stream operations:
- MapTransformer: Transform each item with a function
- FilterTransformer: Filter items based on a predicate
- ReduceTransformer: Reduce stream to accumulated value
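To make the semantics of these three operations concrete, here is a minimal std-only sketch that uses plain Rust iterators in place of StreamWeave streams. The helper name `double_keep_large_and_sum` is hypothetical and is not part of the crate; it only mirrors what a map stage, a filter stage, and a reduce stage would compute in sequence.

```rust
/// Doubles each item (map), keeps values above 10 (filter),
/// and sums what is left (reduce).
/// Hypothetical helper for illustration; not part of streamweave-transformers.
fn double_keep_large_and_sum(items: &[i32]) -> i32 {
    items
        .iter()
        .map(|x| x * 2)
        .filter(|x| *x > 10)
        .fold(0, |acc, x| acc + x)
}
```

Calling `double_keep_large_and_sum(&[1, 5, 6, 10])` doubles the input to 2, 10, 12, 20, keeps 12 and 20, and returns 32.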
§Advanced Transformers
Resilience and performance:
- BatchTransformer: Group items into batches
- RetryTransformer: Retry failed operations
- CircuitBreakerTransformer: Apply the circuit breaker pattern
- RateLimitTransformer: Limit the rate at which items pass through
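As an illustration of the retry idea, the following std-only sketch re-runs a fallible operation up to a fixed number of attempts. The function `retry` is hypothetical and is not the crate's `RetryTransformer`; in particular it omits any delay between attempts, which is an assumption made to keep the example short.

```rust
/// Calls `op` until it succeeds or `max_attempts` calls have been made,
/// returning the last error if every attempt fails.
/// Hypothetical helper; no delay is applied between attempts.
fn retry<T, E>(max_attempts: u32, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    assert!(max_attempts > 0, "max_attempts must be at least 1");
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the most recent error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Attempts remain: count this failure and try again.
            Err(_) => attempt += 1,
        }
    }
}
```

With `max_attempts` set to 3, an operation that fails twice and then succeeds returns `Ok` on the third call, while one that always fails returns its last error after three calls.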
§Stateful Transformers
Stateful operations:
- RunningSumTransformer: Running sum calculation
- MovingAverageTransformer: Moving average calculation
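The two stateful operations can be sketched with the standard library alone. The functions `running_sum` and `moving_average` below are hypothetical helpers, not the crate's types. The sketch assumes that a moving average emits a value for every item and averages over the items seen so far until the window fills; the crate may behave differently.

```rust
use std::collections::VecDeque;

/// Emits the cumulative sum after each item, starting from `initial`.
fn running_sum(initial: i64, items: &[i64]) -> Vec<i64> {
    items
        .iter()
        .scan(initial, |sum, x| {
            *sum += x;
            Some(*sum)
        })
        .collect()
}

/// Emits the mean of the most recent `window` items after each item.
/// Until the window fills, the mean covers the items seen so far.
fn moving_average(window: usize, items: &[f64]) -> Vec<f64> {
    assert!(window > 0, "window must be at least 1");
    let mut recent: VecDeque<f64> = VecDeque::with_capacity(window);
    let mut sum = 0.0;
    let mut out = Vec::with_capacity(items.len());
    for &x in items {
        if recent.len() == window {
            // Drop the oldest value so the window never exceeds `window` items.
            sum -= recent.pop_front().unwrap();
        }
        recent.push_back(x);
        sum += x;
        out.push(sum / recent.len() as f64);
    }
    out
}
```

Both helpers carry state from one item to the next (the accumulated sum, or the contents of the window), which is what distinguishes them from a stateless map.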
§Routing Transformers
Route items to different paths:
- RouterTransformer: Route based on conditions
- PartitionTransformer: Partition by key
- RoundRobinTransformer: Round-robin distribution
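A std-only sketch of two of these routing schemes follows. Both functions are hypothetical and operate on in-memory vectors rather than streams; they only show how items would be assigned to outputs, not how the crate wires those outputs together.

```rust
use std::collections::BTreeMap;

/// Sends item number `i` to output `i % outputs`, cycling through outputs in order.
fn round_robin<T>(items: Vec<T>, outputs: usize) -> Vec<Vec<T>> {
    assert!(outputs > 0, "at least one output is required");
    let mut lanes: Vec<Vec<T>> = (0..outputs).map(|_| Vec::new()).collect();
    for (i, item) in items.into_iter().enumerate() {
        lanes[i % outputs].push(item);
    }
    lanes
}

/// Assigns each item to the partition named by `key_fn`, preserving arrival order
/// within each partition.
fn partition_by_key<T, K: Ord>(
    items: Vec<T>,
    key_fn: impl Fn(&T) -> K,
) -> BTreeMap<K, Vec<T>> {
    let mut parts = BTreeMap::new();
    for item in items {
        parts.entry(key_fn(&item)).or_insert_with(Vec::new).push(item);
    }
    parts
}
```

Round-robin ignores item content and balances by count, while key-based partitioning guarantees that items with the same key always land in the same output.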
§Merging Transformers
Combine multiple streams:
- MergeTransformer: Merge multiple streams
- OrderedMergeTransformer: Merge maintaining order
- InterleaveTransformer: Interleave streams
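The difference between an ordered merge and an interleave is easiest to see on small inputs. The sketch below uses hypothetical helpers over slices and assumes two inputs; for the ordered merge it also assumes both inputs are already sorted. It does not reflect how the crate handles more than two streams or unsorted input.

```rust
/// Merges two already-sorted slices into one sorted vector.
fn ordered_merge<T: Ord + Clone>(left: &[T], right: &[T]) -> Vec<T> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let (mut i, mut j) = (0, 0);
    while i < left.len() && j < right.len() {
        // Taking from `left` on ties keeps the merge stable.
        if left[i] <= right[j] {
            out.push(left[i].clone());
            i += 1;
        } else {
            out.push(right[j].clone());
            j += 1;
        }
    }
    // At most one of these tails is non-empty.
    out.extend_from_slice(&left[i..]);
    out.extend_from_slice(&right[j..]);
    out
}

/// Alternates items from the two slices, then appends the rest of the longer one.
fn interleave<T: Clone>(left: &[T], right: &[T]) -> Vec<T> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let shared = left.len().min(right.len());
    for k in 0..shared {
        out.push(left[k].clone());
        out.push(right[k].clone());
    }
    out.extend_from_slice(&left[shared..]);
    out.extend_from_slice(&right[shared..]);
    out
}
```

The ordered merge chooses by comparing values, so its output is sorted; the interleave chooses by position only, so its output order depends on the input positions rather than the values.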
§Machine Learning Transformers
ML inference (requires ml feature):
- InferenceTransformer: Single-item inference
- BatchedInferenceTransformer: Batch inference
§Utility Transformers
Common utility operations:
- SampleTransformer: Sample items randomly
- SkipTransformer: Skip N items
- TakeTransformer: Take N items
- LimitTransformer: Limit stream size
- SortTransformer: Sort items
- SplitTransformer: Split items
- SplitAtTransformer: Split at index
- ZipTransformer: Zip multiple streams
- TimeoutTransformer: Timeout operations
- MessageDedupeTransformer: Deduplicate messages
- GroupByTransformer: Group items by key
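A few of these utilities map directly onto standard iterator adapters. The sketch below shows deduplication and skip-then-take over slices; both helpers are hypothetical. The deduplication here compares whole values, which is an assumption: the crate's `MessageDedupeTransformer` may identify duplicates differently, for example by a message identifier.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Keeps the first occurrence of each item and drops later repeats.
fn dedupe<T: Eq + Hash + Clone>(items: &[T]) -> Vec<T> {
    let mut seen = HashSet::new();
    items
        .iter()
        // `insert` returns false when the value was already present.
        .filter(|item| seen.insert((*item).clone()))
        .cloned()
        .collect()
}

/// Skips the first `skip` items, then takes at most `take` of the rest.
fn skip_then_take<T: Clone>(items: &[T], skip: usize, take: usize) -> Vec<T> {
    items.iter().skip(skip).take(take).cloned().collect()
}
```

Note that value-based deduplication has to remember every distinct item it has seen, so its memory use grows with the number of distinct values.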
§Usage Examples
§Basic Transformers
§Map Transformer
Transform each item:
use streamweave_transformers::map::MapTransformer;
let transformer = MapTransformer::new(|x: i32| x * 2);
§Filter Transformer
Filter items:
use streamweave_transformers::filter::FilterTransformer;
let transformer = FilterTransformer::new(|x: i32| x > 0);
§Reduce Transformer
Reduce stream:
use streamweave_transformers::reduce::ReduceTransformer;
let transformer = ReduceTransformer::new(0, |acc: i32, x: i32| acc + x);
§Advanced Transformers
§Batch Transformer
Batch items:
use streamweave_transformers::batch::BatchTransformer;
use std::time::Duration;
let transformer = BatchTransformer::new(100, Duration::from_secs(1));
§Retry Transformer
Retry failed operations:
use streamweave_transformers::retry::RetryTransformer;
use std::time::Duration;
let transformer = RetryTransformer::new(3, Duration::from_secs(1));
§Circuit Breaker Transformer
Circuit breaker pattern:
use streamweave_transformers::circuit_breaker::CircuitBreakerTransformer;
use std::time::Duration;
let transformer = CircuitBreakerTransformer::new(5, Duration::from_secs(10));
§Rate Limit Transformer
Rate limiting:
use streamweave_transformers::rate_limit::RateLimitTransformer;
use std::time::Duration;
let transformer = RateLimitTransformer::new(100, Duration::from_secs(1));
§Stateful Transformers
§Running Sum Transformer
Calculate running sum:
use streamweave_transformers::running_sum::RunningSumTransformer;
let transformer = RunningSumTransformer::new(0);
§Moving Average Transformer
Calculate moving average:
use streamweave_transformers::moving_average::MovingAverageTransformer;
let transformer = MovingAverageTransformer::new(10); // Window size 10
§Routing Transformers
§Router Transformer
Route based on conditions:
use streamweave_transformers::router::RouterTransformer;
let transformer = RouterTransformer::new(|x: i32| {
    if x > 0 { "positive" } else { "negative" }
});
§Partition Transformer
Partition by key:
use streamweave_transformers::partition::PartitionTransformer;
let transformer = PartitionTransformer::new(|x: i32| x % 2);
§Round Robin Transformer
Round-robin distribution:
use streamweave_transformers::round_robin::RoundRobinTransformer;
let transformer = RoundRobinTransformer::new(3); // 3 outputs
§Merging Transformers
§Merge Transformer
Merge streams:
use streamweave_transformers::merge::MergeTransformer;
let transformer = MergeTransformer::new();
§Ordered Merge Transformer
Merge maintaining order:
use streamweave_transformers::ordered_merge::OrderedMergeTransformer;
let transformer = OrderedMergeTransformer::new(|x: &i32, y: &i32| x.cmp(y));
§Interleave Transformer
Interleave streams:
use streamweave_transformers::interleave::InterleaveTransformer;
let transformer = InterleaveTransformer::new();
§Machine Learning Transformers
§Inference Transformer
Single-item inference:
#[cfg(feature = "ml")]
use streamweave_transformers::ml::InferenceTransformer;
#[cfg(feature = "ml")]
let transformer = InferenceTransformer::new(/* model path */);
§Batched Inference Transformer
Batch inference:
#[cfg(feature = "ml")]
use streamweave_transformers::ml::BatchedInferenceTransformer;
#[cfg(feature = "ml")]
let transformer = BatchedInferenceTransformer::new(/* model path */, 32);
§Utility Transformers
§Sample Transformer
Random sampling:
use streamweave_transformers::sample::SampleTransformer;
let transformer = SampleTransformer::new(0.1); // 10% sample rate
§Skip Transformer
Skip items:
use streamweave_transformers::skip::SkipTransformer;
let transformer = SkipTransformer::new(10); // Skip first 10 items
§Take Transformer
Take items:
use streamweave_transformers::take::TakeTransformer;
let transformer = TakeTransformer::new(100); // Take first 100 items
§Limit Transformer
Limit stream:
use streamweave_transformers::limit::LimitTransformer;
let transformer = LimitTransformer::new(1000); // Limit to 1000 items
§Sort Transformer
Sort items:
use streamweave_transformers::sort::SortTransformer;
let transformer = SortTransformer::new(|x: &i32, y: &i32| x.cmp(y));
§Split Transformer
Split items:
use streamweave_transformers::split::SplitTransformer;
let transformer = SplitTransformer::new(|x: &String| x.split_whitespace());
§Split At Transformer
Split at index:
use streamweave_transformers::split_at::SplitAtTransformer;
let transformer = SplitAtTransformer::new(100); // Split at index 100
§Zip Transformer
Zip streams:
use streamweave_transformers::zip::ZipTransformer;
let transformer = ZipTransformer::new();
§Timeout Transformer
Timeout operations:
use streamweave_transformers::timeout::TimeoutTransformer;
use std::time::Duration;
let transformer = TimeoutTransformer::new(Duration::from_secs(5));
§Message Dedupe Transformer
Deduplicate messages:
use streamweave_transformers::message_dedupe::MessageDedupeTransformer;
let transformer = MessageDedupeTransformer::new();
§Group By Transformer
Group by key:
use streamweave_transformers::group_by::GroupByTransformer;
let transformer = GroupByTransformer::new(|x: &i32| x % 10);
§Architecture
Transformer flow:
Stream<T> ──> Transformer ──> Stream<U>
How the flow works:
- Input stream flows into transformer
- Transformer processes items
- Output stream flows out
- Transformers can be chained
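The flow described above can be sketched with plain iterators standing in for streams. Each function below plays the role of one transformer that consumes a `Stream<T>` and yields a `Stream<U>`; the names `double`, `label`, and `run_chain` are hypothetical and are not part of the crate.

```rust
/// Stage 1: stream of i32 in, stream of i32 out (each item doubled).
fn double(input: impl Iterator<Item = i32>) -> impl Iterator<Item = i32> {
    input.map(|x| x * 2)
}

/// Stage 2: stream of i32 in, stream of String out (the item type changes).
fn label(input: impl Iterator<Item = i32>) -> impl Iterator<Item = String> {
    input.map(|x| format!("value={x}"))
}

/// Chains the stages: the output of `double` becomes the input of `label`.
fn run_chain(items: Vec<i32>) -> Vec<String> {
    label(double(items.into_iter())).collect()
}
```

Chaining works because the output item type of one stage matches the input item type of the next; the compiler rejects a chain where they differ.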
§Configuration
All transformers support:
- Error Strategy: Error handling strategy
- Name: Component name for logging
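To illustrate what an error strategy controls, here is a std-only sketch with a hypothetical two-variant enum. `SketchErrorStrategy` and `try_map` are invented for this example and are not the crate's `ErrorStrategy` or API. The sketch assumes that a "skip" strategy drops the failing item and continues, and that a "stop" strategy aborts at the first failure.

```rust
/// Hypothetical strategy enum for this sketch; the real `ErrorStrategy` may differ.
#[derive(Clone, Copy)]
enum SketchErrorStrategy {
    /// Abort at the first failing item.
    Stop,
    /// Drop failing items and continue with the next one.
    Skip,
}

/// Applies a fallible `f` to each item, handling failures according to `strategy`.
fn try_map<T, U, E>(
    items: Vec<T>,
    strategy: SketchErrorStrategy,
    f: impl Fn(T) -> Result<U, E>,
) -> Result<Vec<U>, E> {
    let mut out = Vec::new();
    for item in items {
        match (f(item), strategy) {
            (Ok(value), _) => out.push(value),
            (Err(_), SketchErrorStrategy::Skip) => continue,
            (Err(err), SketchErrorStrategy::Stop) => return Err(err),
        }
    }
    Ok(out)
}
```

Parsing the inputs "1", "x", "3" as integers yields the values 1 and 3 under `Skip`, and an error under `Stop`.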
§Error Handling
Transformers support error handling strategies:
use streamweave_error::ErrorStrategy;
use streamweave_transformers::map::MapTransformer;
let transformer = MapTransformer::new(|x: i32| x * 2)
    .with_error_strategy(ErrorStrategy::Skip);
§Performance Considerations
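- Batching: Use batch transformers to improve throughput when downstream work is cheaper per item in bulk
- Stateful: Stateful transformers keep state between items, so their memory use depends on what they retain (for example, a moving-average window)
- ML: ML transformers must load a model before they can process items, which adds startup cost
- Routing: Routing transformers distribute items across multiple outputs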
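The batching point can be illustrated with a size-based sketch over a slice. The helper `batch` is hypothetical; it groups by count only and ignores the time-based parameter that the crate's `BatchTransformer::new` accepts in the usage example.

```rust
/// Splits `items` into consecutive batches of at most `size` items.
/// The final batch may be smaller than `size`.
fn batch<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "batch size must be at least 1");
    items.chunks(size).map(|chunk| chunk.to_vec()).collect()
}
```

Five items with a batch size of 2 produce three batches, so a downstream stage that pays a fixed cost per call pays it three times instead of five.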
§Examples
For more examples, see:
§Dependencies
streamweave-transformers depends on:
- streamweave - Core traits
- streamweave-error - Error handling
- streamweave-message - Message envelopes
- streamweave-stateful - Stateful operations
- tokio - Async runtime
- futures - Stream utilities
- ort (optional) - ONNX Runtime for ML
- ndarray (optional) - Array operations for ML
§Use Cases
Transformers are used for:
- Data Transformation: Transform data in pipelines
- Filtering: Filter data based on conditions
- Aggregation: Aggregate data (sum, average, etc.)
- Routing: Route data to different paths
- Merging: Merge multiple data streams
- ML Inference: Run ML models on streams
- Utility Operations: Common utility operations
§Documentation
§See Also
- streamweave - Core traits
- streamweave-stateful - Stateful operations
- streamweave-error - Error handling
§Contributing
Contributions are welcome! Please see the Contributing Guide for details.
§License
This project is licensed under the CC BY-SA 4.0 license.
§Modules
- batch - Batch transformer for StreamWeave
- circuit_breaker - Circuit breaker transformer for StreamWeave
- filter - Filter transformer for StreamWeave
- group_by - Group by transformer for StreamWeave
- interleave - Interleave transformer for StreamWeave
- limit - Limit transformer for StreamWeave
- map - Map transformer for StreamWeave
- merge - Merge transformer for StreamWeave
- message_dedupe - Message deduplication transformer for StreamWeave
- moving_average - Moving average transformer for StreamWeave
- ordered_merge - Ordered merge transformer for StreamWeave
- partition - Partition transformer for StreamWeave
- rate_limit - Rate limit transformer for StreamWeave
- reduce - Reduce transformer for StreamWeave
- retry - Retry transformer for StreamWeave
- round_robin - Round-robin transformer for StreamWeave
- router - Router transformer for StreamWeave
- running_sum - Running sum transformer for StreamWeave
- sample - Sample transformer for StreamWeave
- skip - Skip transformer for StreamWeave
- sort - Sort transformer for StreamWeave
- split - Split transformer for StreamWeave
- split_at - Split at transformer for StreamWeave
- take - Take transformer for StreamWeave
- timeout - Timeout transformer for StreamWeave
- zip - Zip transformer for StreamWeave