Crate streamweave_transformers


§streamweave-transformers

Crates.io Documentation License: CC BY-SA 4.0

Transformers for StreamWeave
A comprehensive collection of transformers for building powerful streaming data pipelines.

The streamweave-transformers package provides a rich set of transformers for StreamWeave pipelines and graphs. Transformers are organized into categories: basic operations, advanced processing, stateful operations, routing, merging, machine learning, and utility functions.

§✨ Key Features

  • 30+ Transformers: Comprehensive collection of transformers
  • Categorized: Organized into logical categories
  • Type-Safe: Full type safety with Rust’s type system
  • Composable: Transformers can be chained together
  • Error Handling: Built-in error handling strategies
  • ML Support: Optional machine learning transformers

§📦 Installation

Add this to your Cargo.toml:

[dependencies]
streamweave-transformers = "0.3.0"

# Alternatively, replace the line above with this one to enable the optional ML transformers
streamweave-transformers = { version = "0.3.0", features = ["ml"] }

§🚀 Quick Start

§Basic Transformation

use streamweave_transformers::map::MapTransformer;
use streamweave_pipeline::PipelineBuilder;

let transformer = MapTransformer::new(|x: i32| x * 2);

let pipeline = PipelineBuilder::new()
    .producer(/* produce numbers */)
    .transformer(transformer)
    .consumer(/* consume doubled numbers */);

pipeline.run().await?;

§Filtering

use streamweave_transformers::filter::FilterTransformer;
use streamweave_pipeline::PipelineBuilder;

let transformer = FilterTransformer::new(|x: i32| x > 10);

let pipeline = PipelineBuilder::new()
    .producer(/* produce numbers */)
    .transformer(transformer)
    .consumer(/* consume filtered numbers */);

pipeline.run().await?;

§📖 Transformer Categories

§Basic Transformers

Fundamental stream operations:

  • MapTransformer: Transform each item with a function
  • FilterTransformer: Filter items based on a predicate
  • ReduceTransformer: Reduce stream to accumulated value

§Advanced Transformers

Resilience and performance:

  • BatchTransformer: Batch items for batch processing
  • RetryTransformer: Retry failed operations
  • CircuitBreakerTransformer: Circuit breaker pattern
  • RateLimitTransformer: Rate limiting

§Stateful Transformers

Stateful operations:

  • RunningSumTransformer: Running sum calculation
  • MovingAverageTransformer: Moving average calculation

§Routing Transformers

Route items to different paths:

  • RouterTransformer: Route based on conditions
  • PartitionTransformer: Partition by key
  • RoundRobinTransformer: Round-robin distribution

§Merging Transformers

Combine multiple streams:

  • MergeTransformer: Merge multiple streams
  • OrderedMergeTransformer: Merge maintaining order
  • InterleaveTransformer: Interleave streams

§Machine Learning Transformers

ML inference (requires ml feature):

  • InferenceTransformer: Single-item inference
  • BatchedInferenceTransformer: Batch inference

§Utility Transformers

Common utility operations:

  • SampleTransformer: Sample items randomly
  • SkipTransformer: Skip N items
  • TakeTransformer: Take N items
  • LimitTransformer: Limit stream size
  • SortTransformer: Sort items
  • SplitTransformer: Split items
  • SplitAtTransformer: Split at index
  • ZipTransformer: Zip multiple streams
  • TimeoutTransformer: Timeout operations
  • MessageDedupeTransformer: Deduplicate messages
  • GroupByTransformer: Group items by key

§📚 Usage Examples

§Basic Transformers

§Map Transformer

Transform each item:

use streamweave_transformers::map::MapTransformer;

let transformer = MapTransformer::new(|x: i32| x * 2);

§Filter Transformer

Filter items:

use streamweave_transformers::filter::FilterTransformer;

let transformer = FilterTransformer::new(|x: i32| x > 0);

§Reduce Transformer

Reduce stream:

use streamweave_transformers::reduce::ReduceTransformer;

let transformer = ReduceTransformer::new(0, |acc: i32, x: i32| acc + x);
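
To make the semantics of the three basic operations concrete, here is a minimal sketch that applies the same closures to a plain standard-library iterator. It does not use the crate: `double_positive_sum` is a hypothetical helper, and it assumes the reduce step yields only the final accumulated value, which the text above does not confirm.

```rust
/// Applies the map, filter, and reduce closures from the examples above
/// to a plain iterator. Assumption: only the final accumulated value is returned.
fn double_positive_sum(items: &[i32]) -> i32 {
    items
        .iter()
        .copied()
        .map(|x| x * 2) // same closure as the map example
        .filter(|x| *x > 0) // same predicate as the filter example
        .fold(0, |acc, x| acc + x) // same initial value and step as the reduce example
}
```

For the input `[1, -2, 3]`, the doubled values are `2, -4, 6`, the filter keeps `2` and `6`, and the fold returns `8`.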

§Advanced Transformers

§Batch Transformer

Batch items:

use streamweave_transformers::batch::BatchTransformer;
use std::time::Duration;

let transformer = BatchTransformer::new(100, Duration::from_secs(1));
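
As an illustration of size-based batching, the sketch below groups a slice into chunks of at most `max_size` items using only the standard library. `batch_by_size` is a hypothetical helper, not part of the crate, and the time-based flush suggested by the `Duration` argument above is deliberately left out.

```rust
/// Groups items into batches of at most `max_size` elements.
/// The final batch may be shorter. Timed flushing is not modeled here.
fn batch_by_size<T: Clone>(items: &[T], max_size: usize) -> Vec<Vec<T>> {
    assert!(max_size > 0, "batch size must be positive");
    items.chunks(max_size).map(|chunk| chunk.to_vec()).collect()
}
```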
§Retry Transformer

Retry failed operations:

use streamweave_transformers::retry::RetryTransformer;
use std::time::Duration;

let transformer = RetryTransformer::new(3, Duration::from_secs(1));

§Circuit Breaker Transformer

Circuit breaker pattern:

use streamweave_transformers::circuit_breaker::CircuitBreakerTransformer;
use std::time::Duration;

let transformer = CircuitBreakerTransformer::new(5, Duration::from_secs(10));
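
For readers unfamiliar with the pattern, the following is a minimal standalone sketch of a circuit breaker. It assumes the two constructor arguments above mean a failure threshold and a reset timeout, which this page does not state. The `CircuitBreaker` type and its methods are hypothetical and do not reflect the crate's internals.

```rust
use std::time::{Duration, Instant};

/// Hypothetical breaker: opens after `threshold` consecutive failures and
/// allows calls again once `reset_after` has elapsed.
struct CircuitBreaker {
    threshold: u32,
    reset_after: Duration,
    failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u32, reset_after: Duration) -> Self {
        Self { threshold, reset_after, failures: 0, opened_at: None }
    }

    /// Returns true if a call may proceed at time `now`.
    fn allow(&mut self, now: Instant) -> bool {
        match self.opened_at {
            // Still inside the cool-down period: reject the call.
            Some(opened) if now.duration_since(opened) < self.reset_after => false,
            Some(_) => {
                // Timeout elapsed: close the breaker and start counting again.
                self.opened_at = None;
                self.failures = 0;
                true
            }
            None => true,
        }
    }

    /// Records the outcome of a call made at time `now`.
    fn record(&mut self, success: bool, now: Instant) {
        if success {
            self.failures = 0;
        } else {
            self.failures += 1;
            if self.failures >= self.threshold {
                self.opened_at = Some(now);
            }
        }
    }
}
```

Passing `now` explicitly keeps the sketch deterministic, so it can be exercised without sleeping.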
§Rate Limit Transformer

Rate limiting:

use streamweave_transformers::rate_limit::RateLimitTransformer;
use std::time::Duration;

let transformer = RateLimitTransformer::new(100, Duration::from_secs(1));
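
One common way to enforce "at most N items per interval" is a fixed-window counter. The sketch below shows that technique with the standard library only. Whether the crate uses a fixed window, a sliding window, or a token bucket is not stated here, so `FixedWindowLimiter` is purely a hypothetical illustration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical fixed-window limiter: at most `max_items` per `window`.
struct FixedWindowLimiter {
    max_items: u32,
    window: Duration,
    window_start: Instant,
    count: u32,
}

impl FixedWindowLimiter {
    fn new(max_items: u32, window: Duration, now: Instant) -> Self {
        Self { max_items, window, window_start: now, count: 0 }
    }

    /// Returns true if an item arriving at `now` fits in the current window.
    fn try_acquire(&mut self, now: Instant) -> bool {
        if now.duration_since(self.window_start) >= self.window {
            // A new window begins; reset the counter.
            self.window_start = now;
            self.count = 0;
        }
        if self.count < self.max_items {
            self.count += 1;
            true
        } else {
            false
        }
    }
}
```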

§Stateful Transformers

§Running Sum Transformer

Calculate running sum:

use streamweave_transformers::running_sum::RunningSumTransformer;

let transformer = RunningSumTransformer::new(0);

§Moving Average Transformer

Calculate moving average:

use streamweave_transformers::moving_average::MovingAverageTransformer;

let transformer = MovingAverageTransformer::new(10);  // Window size 10
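
To show what a windowed average computes, here is a small standalone sketch using a `VecDeque` as the window buffer. `moving_average` is a hypothetical function. It assumes that early outputs average over however many items have arrived so far, which may differ from the crate's behavior.

```rust
use std::collections::VecDeque;

/// Returns the mean of the last `window` items after each input item.
/// Early outputs average over fewer items until the window fills.
fn moving_average(items: &[f64], window: usize) -> Vec<f64> {
    assert!(window > 0, "window must be positive");
    let mut buf: VecDeque<f64> = VecDeque::with_capacity(window);
    let mut sum = 0.0;
    let mut out = Vec::with_capacity(items.len());
    for &x in items {
        if buf.len() == window {
            // Drop the oldest item so the buffer never exceeds `window` entries.
            if let Some(old) = buf.pop_front() {
                sum -= old;
            }
        }
        buf.push_back(x);
        sum += x;
        out.push(sum / buf.len() as f64);
    }
    out
}
```

With a window of 2, the input `[1.0, 2.0, 3.0, 4.0]` produces `[1.0, 1.5, 2.5, 3.5]`.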

§Routing Transformers

§Router Transformer

Route based on conditions:

use streamweave_transformers::router::RouterTransformer;

let transformer = RouterTransformer::new(|x: i32| {
    if x > 0 { "positive" } else { "negative" }
});

§Partition Transformer

Partition by key:

use streamweave_transformers::partition::PartitionTransformer;

let transformer = PartitionTransformer::new(|x: i32| x % 2);

§Round Robin Transformer

Round-robin distribution:

use streamweave_transformers::round_robin::RoundRobinTransformer;

let transformer = RoundRobinTransformer::new(3);  // 3 outputs
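
Round-robin distribution can be sketched in a few lines of standard-library Rust: item `i` goes to output `i % outputs`. The `round_robin` helper below is hypothetical and collects into in-memory buckets rather than live output streams.

```rust
/// Distributes items across `outputs` buckets in rotation.
fn round_robin<T>(items: Vec<T>, outputs: usize) -> Vec<Vec<T>> {
    assert!(outputs > 0, "need at least one output");
    let mut buckets: Vec<Vec<T>> = (0..outputs).map(|_| Vec::new()).collect();
    for (i, item) in items.into_iter().enumerate() {
        // The item index modulo the output count selects the bucket.
        buckets[i % outputs].push(item);
    }
    buckets
}
```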

§Merging Transformers

§Merge Transformer

Merge streams:

use streamweave_transformers::merge::MergeTransformer;

let transformer = MergeTransformer::new();

§Ordered Merge Transformer

Merge maintaining order:

use streamweave_transformers::ordered_merge::OrderedMergeTransformer;

let transformer = OrderedMergeTransformer::new(|x: &i32, y: &i32| x.cmp(y));
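
The comparator above suggests a classic two-way merge of inputs that are already sorted. The sketch below shows that technique on two vectors. `ordered_merge` is a hypothetical function, and it assumes each input is sorted under the same comparator.

```rust
use std::cmp::Ordering;

/// Merges two inputs that are each already sorted under `cmp`.
/// On ties, the item from `left` is emitted first.
fn ordered_merge<T, F>(left: Vec<T>, right: Vec<T>, cmp: F) -> Vec<T>
where
    F: Fn(&T, &T) -> Ordering,
{
    let mut out = Vec::with_capacity(left.len() + right.len());
    let mut l = left.into_iter().peekable();
    let mut r = right.into_iter().peekable();
    loop {
        // Decide which side supplies the next item by peeking at both heads.
        let take_left = match (l.peek(), r.peek()) {
            (Some(a), Some(b)) => cmp(a, b) != Ordering::Greater,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_left { l.next() } else { r.next() };
        out.extend(next);
    }
    out
}
```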
§Interleave Transformer

Interleave streams:

use streamweave_transformers::interleave::InterleaveTransformer;

let transformer = InterleaveTransformer::new();

§Machine Learning Transformers

§Inference Transformer

Single-item inference:

#[cfg(feature = "ml")]
use streamweave_transformers::ml::InferenceTransformer;

#[cfg(feature = "ml")]
let transformer = InferenceTransformer::new(/* model path */);

§Batched Inference Transformer

Batch inference:

#[cfg(feature = "ml")]
use streamweave_transformers::ml::BatchedInferenceTransformer;

#[cfg(feature = "ml")]
let transformer = BatchedInferenceTransformer::new(/* model path */, 32);

§Utility Transformers

§Sample Transformer

Random sampling:

use streamweave_transformers::sample::SampleTransformer;

let transformer = SampleTransformer::new(0.1);  // 10% sample rate

§Skip Transformer

Skip items:

use streamweave_transformers::skip::SkipTransformer;

let transformer = SkipTransformer::new(10);  // Skip first 10 items

§Take Transformer

Take items:

use streamweave_transformers::take::TakeTransformer;

let transformer = TakeTransformer::new(100);  // Take first 100 items

§Limit Transformer

Limit stream:

use streamweave_transformers::limit::LimitTransformer;

let transformer = LimitTransformer::new(1000);  // Limit to 1000 items

§Sort Transformer

Sort items:

use streamweave_transformers::sort::SortTransformer;

let transformer = SortTransformer::new(|x: &i32, y: &i32| x.cmp(y));

§Split Transformer

Split items:

use streamweave_transformers::split::SplitTransformer;

let transformer = SplitTransformer::new(|x: &String| x.split_whitespace());

§Split At Transformer

Split at index:

use streamweave_transformers::split_at::SplitAtTransformer;

let transformer = SplitAtTransformer::new(100);  // Split at index 100

§Zip Transformer

Zip streams:

use streamweave_transformers::zip::ZipTransformer;

let transformer = ZipTransformer::new();

§Timeout Transformer

Timeout operations:

use streamweave_transformers::timeout::TimeoutTransformer;
use std::time::Duration;

let transformer = TimeoutTransformer::new(Duration::from_secs(5));

§Message Dedupe Transformer

Deduplicate messages:

use streamweave_transformers::message_dedupe::MessageDedupeTransformer;

let transformer = MessageDedupeTransformer::new();
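
Deduplication is typically done by remembering which identifiers have already been seen. The sketch below assumes each message carries a numeric ID, modeled here as a `(u64, T)` pair. That representation and the `dedupe_by_id` helper are hypothetical and are not the crate's message envelope type.

```rust
use std::collections::HashSet;

/// Keeps the first item seen for each ID and drops later repeats.
fn dedupe_by_id<T>(items: Vec<(u64, T)>) -> Vec<(u64, T)> {
    let mut seen = HashSet::new();
    items
        .into_iter()
        // `insert` returns false when the ID was already present.
        .filter(|(id, _)| seen.insert(*id))
        .collect()
}
```

Note that the set of seen IDs grows without bound in this sketch; a long-running pipeline would normally cap or expire it.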
§Group By Transformer

Group by key:

use streamweave_transformers::group_by::GroupByTransformer;

let transformer = GroupByTransformer::new(|x: &i32| x % 10);
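
The key function above (`x % 10`) can be illustrated with a standalone grouping helper built on `BTreeMap`. `group_by` here is a hypothetical function that collects a finite input into memory; how the crate emits groups from an unbounded stream is not described on this page.

```rust
use std::collections::BTreeMap;

/// Collects items into groups keyed by `key_fn`,
/// keeping arrival order within each group.
fn group_by<T, K, F>(items: Vec<T>, key_fn: F) -> BTreeMap<K, Vec<T>>
where
    K: Ord,
    F: Fn(&T) -> K,
{
    let mut groups: BTreeMap<K, Vec<T>> = BTreeMap::new();
    for item in items {
        groups.entry(key_fn(&item)).or_default().push(item);
    }
    groups
}
```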

§🏗️ Architecture

Transformer flow:

Stream<T> ──> Transformer ──> Stream<U>

The flow proceeds in these steps:

  1. Input stream flows into transformer
  2. Transformer processes items
  3. Output stream flows out
  4. Transformers can be chained
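
The steps above can be sketched with a small trait that turns a batch of one type into a batch of another, plus a helper that chains two such steps. The `Transform` trait, `MapStep`, and `chain` are hypothetical names for this illustration. They work on in-memory vectors rather than async streams and are not the crate's actual trait.

```rust
/// Hypothetical stand-in for a transformer: turns a batch of `In` into a batch of `Out`.
trait Transform<In, Out> {
    fn apply(&mut self, input: Vec<In>) -> Vec<Out>;
}

/// Wraps a per-item function, like a map step.
struct MapStep<F>(F);

impl<In, Out, F: FnMut(In) -> Out> Transform<In, Out> for MapStep<F> {
    fn apply(&mut self, input: Vec<In>) -> Vec<Out> {
        input.into_iter().map(&mut self.0).collect()
    }
}

/// Chains two steps: the output of `first` feeds `second`.
fn chain<A, B, C>(
    first: &mut impl Transform<A, B>,
    second: &mut impl Transform<B, C>,
    input: Vec<A>,
) -> Vec<C> {
    second.apply(first.apply(input))
}
```

Because the output type of the first step must equal the input type of the second, a mismatched chain fails at compile time, which is the kind of type safety the feature list refers to.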

§🔧 Configuration

All transformers support:

  • Error Strategy: Error handling strategy
  • Name: Component name for logging

§🔍 Error Handling

Transformers support error handling strategies:

use streamweave_error::ErrorStrategy;
use streamweave_transformers::map::MapTransformer;

let transformer = MapTransformer::new(|x: i32| x * 2)
    .with_error_strategy(ErrorStrategy::Skip);
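
To illustrate what a "skip" strategy means in contrast to stopping on the first failure, here is a standalone sketch. The `OnError` enum and `process` function are hypothetical and defined only for this example. They are not the `ErrorStrategy` type from `streamweave_error`, whose full set of variants is not listed on this page.

```rust
/// Hypothetical strategy enum for this sketch; not the crate's `ErrorStrategy`.
enum OnError {
    Stop,
    Skip,
}

/// Applies a fallible step to each item, handling failures per `strategy`.
fn process<T, U, E>(
    items: Vec<T>,
    step: impl Fn(T) -> Result<U, E>,
    strategy: OnError,
) -> Result<Vec<U>, E> {
    let mut out = Vec::new();
    for item in items {
        match step(item) {
            Ok(value) => out.push(value),
            Err(err) => match strategy {
                OnError::Stop => return Err(err), // abort on the first failure
                OnError::Skip => continue,        // drop the item and keep going
            },
        }
    }
    Ok(out)
}
```

Parsing `["1", "x", "3"]` as integers with `OnError::Skip` yields `Ok([1, 3])`, while `OnError::Stop` returns the parse error for `"x"`.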

§⚡ Performance Considerations

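  • Batching: Batch transformers amortize per-item overhead, which usually improves throughput
  • Stateful: Stateful transformers keep state between items, so their memory use grows with that state (for example, a window buffer)
  • ML: ML transformers must load a model before processing, which adds startup cost
  • Routing: Routing transformers spread items across multiple outputs to distribute load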

§📝 Examples

For more examples, see:

§🔗 Dependencies

streamweave-transformers depends on:

  • streamweave - Core traits
  • streamweave-error - Error handling
  • streamweave-message - Message envelopes
  • streamweave-stateful - Stateful operations
  • tokio - Async runtime
  • futures - Stream utilities
  • ort (optional) - ONNX Runtime for ML
  • ndarray (optional) - Array operations for ML

§🎯 Use Cases

Transformers are used for:

  1. Data Transformation: Transform data in pipelines
  2. Filtering: Filter data based on conditions
  3. Aggregation: Aggregate data (sum, average, etc.)
  4. Routing: Route data to different paths
  5. Merging: Merge multiple data streams
  6. ML Inference: Run ML models on streams
  7. Utility Operations: Common utility operations

§📖 Documentation

§🔗 See Also

§🤝 Contributing

Contributions are welcome! Please see the Contributing Guide for details.

§📄 License

This project is licensed under the CC BY-SA 4.0 license.

Modules§

  • batch: Batch transformer for StreamWeave
  • circuit_breaker: Circuit breaker transformer for StreamWeave
  • filter: Filter transformer for StreamWeave
  • group_by: Group by transformer for StreamWeave
  • interleave: Interleave transformer for StreamWeave
  • limit: Limit transformer for StreamWeave
  • map: Map transformer for StreamWeave
  • merge: Merge transformer for StreamWeave
  • message_dedupe: Message deduplication transformer for StreamWeave
  • moving_average: Moving average transformer for StreamWeave
  • ordered_merge: Ordered merge transformer for StreamWeave
  • partition: Partition transformer for StreamWeave
  • rate_limit: Rate limit transformer for StreamWeave
  • reduce: Reduce transformer for StreamWeave
  • retry: Retry transformer for StreamWeave
  • round_robin: Round-robin transformer for StreamWeave
  • router: Router transformer for StreamWeave
  • running_sum: Running sum transformer for StreamWeave
  • sample: Sample transformer for StreamWeave
  • skip: Skip transformer for StreamWeave
  • sort: Sort transformer for StreamWeave
  • split: Split transformer for StreamWeave
  • split_at: Split at transformer for StreamWeave
  • take: Take transformer for StreamWeave
  • timeout: Timeout transformer for StreamWeave
  • zip: Zip transformer for StreamWeave