streamweave-transformers 0.4.0

Transformers for StreamWeave
A comprehensive collection of transformers for building powerful streaming data pipelines.

The streamweave-transformers package provides a rich set of transformers for StreamWeave pipelines and graphs. Transformers are organized into categories: basic operations, advanced processing, stateful operations, routing, merging, machine learning, and utility functions.

✨ Key Features

  • 30+ Transformers: Map, filter, batching, routing, merging, and more
  • Categorized: Grouped by purpose, from basic operations to utilities
  • Type-Safe: Item types are checked at compile time by Rust's type system
  • Composable: Transformers can be chained, with each output feeding the next input
  • Error Handling: Every transformer accepts a configurable error strategy
  • ML Support: Optional machine learning inference transformers behind the ml feature

📦 Installation

Add this to your Cargo.toml:

[dependencies]
streamweave-transformers = "0.4.0"

# Optional: to enable the ML transformers, use this line instead of the one above
# streamweave-transformers = { version = "0.4.0", features = ["ml"] }

🚀 Quick Start

Basic Transformation

use streamweave_transformers::map::MapTransformer;
use streamweave_pipeline::PipelineBuilder;

let transformer = MapTransformer::new(|x: i32| x * 2);

let pipeline = PipelineBuilder::new()
    .producer(/* produce numbers */)
    .transformer(transformer)
    .consumer(/* consume doubled numbers */);

pipeline.run().await?;

Filtering

use streamweave_transformers::filter::FilterTransformer;
use streamweave_pipeline::PipelineBuilder;

let transformer = FilterTransformer::new(|x: i32| x > 10);

let pipeline = PipelineBuilder::new()
    .producer(/* produce numbers */)
    .transformer(transformer)
    .consumer(/* consume filtered numbers */);

pipeline.run().await?;

📖 Transformer Categories

Basic Transformers

Fundamental stream operations:

  • MapTransformer: Transform each item with a function
  • FilterTransformer: Filter items based on a predicate
  • ReduceTransformer: Reduce stream to accumulated value

Advanced Transformers

Resilience and performance:

  • BatchTransformer: Batch items for batch processing
  • RetryTransformer: Retry failed operations
  • CircuitBreakerTransformer: Circuit breaker pattern
  • RateLimitTransformer: Rate limiting

Stateful Transformers

Stateful operations:

  • RunningSumTransformer: Running sum calculation
  • MovingAverageTransformer: Moving average calculation

Routing Transformers

Route items to different paths:

  • RouterTransformer: Route based on conditions
  • PartitionTransformer: Partition by key
  • RoundRobinTransformer: Round-robin distribution

Merging Transformers

Combine multiple streams:

  • MergeTransformer: Merge multiple streams
  • OrderedMergeTransformer: Merge maintaining order
  • InterleaveTransformer: Interleave streams

Machine Learning Transformers

ML inference (requires ml feature):

  • InferenceTransformer: Single-item inference
  • BatchedInferenceTransformer: Batch inference

Utility Transformers

Common utility operations:

  • SampleTransformer: Sample items randomly
  • SkipTransformer: Skip N items
  • TakeTransformer: Take N items
  • LimitTransformer: Limit stream size
  • SortTransformer: Sort items
  • SplitTransformer: Split items
  • SplitAtTransformer: Split at index
  • ZipTransformer: Zip multiple streams
  • TimeoutTransformer: Timeout operations
  • MessageDedupeTransformer: Deduplicate messages
  • GroupByTransformer: Group items by key

📚 Usage Examples

Basic Transformers

Map Transformer

Transform each item:

use streamweave_transformers::map::MapTransformer;

let transformer = MapTransformer::new(|x: i32| x * 2);

Filter Transformer

Filter items:

use streamweave_transformers::filter::FilterTransformer;

let transformer = FilterTransformer::new(|x: i32| x > 0);

Reduce Transformer

Reduce stream:

use streamweave_transformers::reduce::ReduceTransformer;

let transformer = ReduceTransformer::new(0, |acc: i32, x: i32| acc + x);

Advanced Transformers

Batch Transformer

Batch items:

use streamweave_transformers::batch::BatchTransformer;
use std::time::Duration;

let transformer = BatchTransformer::new(100, Duration::from_secs(1));
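
The two arguments above look like a maximum batch size and a flush interval, although the text does not document them. As a rough illustration of the size-based half of batching only, here is a std-only sketch; `batch_by_size` is a hypothetical helper, not part of this crate, and time-based flushing is left out.

```rust
/// Hypothetical helper: groups items into batches of at most `size` items.
/// The final batch may be shorter. Time-based flushing is not modelled here.
fn batch_by_size<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "batch size must be positive");
    items.chunks(size).map(|chunk| chunk.to_vec()).collect()
}
```

With a size of 2, five items would come out as two full batches followed by one batch holding the leftover item.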

Retry Transformer

Retry failed operations:

use streamweave_transformers::retry::RetryTransformer;
use std::time::Duration;

let transformer = RetryTransformer::new(3, Duration::from_secs(1));
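
To make the retry idea concrete, the sketch below shows a plain retry loop with a fixed delay between attempts. It assumes the first argument means "maximum number of retries", which the text does not confirm; `retry_with_delay` is a hypothetical std-only helper, not the crate's implementation.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper: calls `op` until it succeeds or `max_retries`
/// retries have been used, sleeping `delay` between attempts.
fn retry_with_delay<T, E>(
    max_retries: u32,
    delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut retries_used = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if retries_used >= max_retries => return Err(err),
            Err(_) => {
                retries_used += 1;
                thread::sleep(delay);
            }
        }
    }
}
```

Under this reading, a limit of 3 allows up to four calls in total: the initial attempt plus three retries.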

Circuit Breaker Transformer

Circuit breaker pattern:

use streamweave_transformers::circuit_breaker::CircuitBreakerTransformer;
use std::time::Duration;

let transformer = CircuitBreakerTransformer::new(5, Duration::from_secs(10));
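
For readers new to the pattern, here is a minimal sketch of a circuit breaker's state machine. It assumes the two arguments are a failure threshold and a reset timeout, which the text does not spell out. The `CircuitBreaker` type below is hypothetical and takes the current time as a parameter so that it stays deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical circuit breaker: opens after `failure_threshold` consecutive
/// failures and allows a trial call once `reset_timeout` has elapsed.
struct CircuitBreaker {
    failure_threshold: u32,
    reset_timeout: Duration,
    consecutive_failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(failure_threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            failure_threshold,
            reset_timeout,
            consecutive_failures: 0,
            opened_at: None,
        }
    }

    /// Returns true if a call may proceed at time `now`.
    fn allows_call(&self, now: Instant) -> bool {
        match self.opened_at {
            None => true,
            Some(opened) => now.duration_since(opened) >= self.reset_timeout,
        }
    }

    /// Records a call's outcome; a success closes the circuit again.
    fn record(&mut self, success: bool, now: Instant) {
        if success {
            self.consecutive_failures = 0;
            self.opened_at = None;
        } else {
            self.consecutive_failures += 1;
            if self.consecutive_failures >= self.failure_threshold {
                self.opened_at = Some(now);
            }
        }
    }
}
```

While the circuit is open, callers can skip the protected operation entirely, which spares a struggling downstream dependency from more traffic.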

Rate Limit Transformer

Rate limiting:

use streamweave_transformers::rate_limit::RateLimitTransformer;
use std::time::Duration;

let transformer = RateLimitTransformer::new(100, Duration::from_secs(1));
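
One common way to enforce "at most N items per interval" is a fixed-window counter, sketched below. Whether the crate uses a fixed window, a sliding window, or a token bucket is not stated, so treat `FixedWindowLimiter` as a hypothetical illustration rather than the actual algorithm.

```rust
use std::time::{Duration, Instant};

/// Hypothetical fixed-window limiter: admits at most `max_items` per `window`.
struct FixedWindowLimiter {
    max_items: u32,
    window: Duration,
    window_start: Instant,
    count: u32,
}

impl FixedWindowLimiter {
    fn new(max_items: u32, window: Duration, now: Instant) -> Self {
        Self { max_items, window, window_start: now, count: 0 }
    }

    /// Returns true if an item arriving at `now` is within the limit.
    fn try_acquire(&mut self, now: Instant) -> bool {
        if now.duration_since(self.window_start) >= self.window {
            // A new window begins; reset the counter.
            self.window_start = now;
            self.count = 0;
        }
        if self.count < self.max_items {
            self.count += 1;
            true
        } else {
            false
        }
    }
}
```

A fixed window is simple but can admit a burst of up to twice the limit across a window boundary; a token bucket smooths that out at the cost of slightly more state.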

Stateful Transformers

Running Sum Transformer

Calculate running sum:

use streamweave_transformers::running_sum::RunningSumTransformer;

let transformer = RunningSumTransformer::new(0);
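
As an illustration of what a running sum produces, the std-only sketch below emits the cumulative total after every item. It assumes the constructor argument is the initial value; `running_sum` is a hypothetical helper, not the crate's API.

```rust
/// Hypothetical helper: emits the cumulative sum after each item,
/// starting from `initial`.
fn running_sum(initial: i64, items: &[i64]) -> Vec<i64> {
    items
        .iter()
        .scan(initial, |sum, &x| {
            *sum += x;
            Some(*sum)
        })
        .collect()
}
```

For the input 1, 2, 3, 4 with an initial value of 0, this yields 1, 3, 6, 10.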

Moving Average Transformer

Calculate moving average:

use streamweave_transformers::moving_average::MovingAverageTransformer;

let transformer = MovingAverageTransformer::new(10);  // Window size 10
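
A moving average over a fixed window can be kept with a queue and a running total, as in the sketch below. The `MovingAverage` type is hypothetical and only illustrates the technique. How the crate treats a window that is not yet full is not stated, so this sketch simply averages whatever items are present.

```rust
use std::collections::VecDeque;

/// Hypothetical moving average over the last `window` items.
struct MovingAverage {
    window: usize,
    values: VecDeque<f64>,
    sum: f64,
}

impl MovingAverage {
    fn new(window: usize) -> Self {
        assert!(window > 0, "window size must be positive");
        Self { window, values: VecDeque::with_capacity(window), sum: 0.0 }
    }

    /// Adds `x` and returns the average of the items currently in the window.
    fn push(&mut self, x: f64) -> f64 {
        self.values.push_back(x);
        self.sum += x;
        if self.values.len() > self.window {
            // Evict the oldest item so the window never exceeds its size.
            if let Some(oldest) = self.values.pop_front() {
                self.sum -= oldest;
            }
        }
        self.sum / self.values.len() as f64
    }
}
```

Keeping a running total makes each update constant-time, at the cost of storing up to `window` items.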

Routing Transformers

Router Transformer

Route based on conditions:

use streamweave_transformers::router::RouterTransformer;

let transformer = RouterTransformer::new(|x: i32| {
    // Zero is not positive, so the second route is labelled "non-positive".
    if x > 0 { "positive" } else { "non-positive" }
});

Partition Transformer

Partition by key:

use streamweave_transformers::partition::PartitionTransformer;

let transformer = PartitionTransformer::new(|x: i32| x % 2);

Round Robin Transformer

Round-robin distribution:

use streamweave_transformers::round_robin::RoundRobinTransformer;

let transformer = RoundRobinTransformer::new(3);  // 3 outputs
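
Round-robin distribution sends item number i to output i modulo the number of outputs. The sketch below shows that rule with plain vectors standing in for output streams; `round_robin` is a hypothetical helper, not the crate's API.

```rust
/// Hypothetical helper: assigns item `i` to output `i % outputs`.
fn round_robin<T>(items: Vec<T>, outputs: usize) -> Vec<Vec<T>> {
    assert!(outputs > 0, "need at least one output");
    let mut buckets: Vec<Vec<T>> = (0..outputs).map(|_| Vec::new()).collect();
    for (i, item) in items.into_iter().enumerate() {
        buckets[i % outputs].push(item);
    }
    buckets
}
```

With three outputs, items 1 to 5 are split as [1, 4], [2, 5], and [3].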

Merging Transformers

Merge Transformer

Merge streams:

use streamweave_transformers::merge::MergeTransformer;

let transformer = MergeTransformer::new();

Ordered Merge Transformer

Merge maintaining order:

use streamweave_transformers::ordered_merge::OrderedMergeTransformer;

let transformer = OrderedMergeTransformer::new(|x: &i32, y: &i32| x.cmp(y));
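
An ordered merge is usually the merge step of merge sort: each input is already sorted, and the smaller head item is emitted first. The sketch below shows this for two inputs, with vectors standing in for streams. `ordered_merge` is a hypothetical helper, and the assumption that inputs are pre-sorted is mine rather than the text's.

```rust
use std::cmp::Ordering;

/// Hypothetical helper: merges two inputs that are already sorted under `cmp`
/// into one sorted output. On ties, items from `left` come first.
fn ordered_merge<T>(
    left: Vec<T>,
    right: Vec<T>,
    cmp: impl Fn(&T, &T) -> Ordering,
) -> Vec<T> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let mut left = left.into_iter().peekable();
    let mut right = right.into_iter().peekable();
    loop {
        // Decide which side holds the next item without consuming it yet.
        let take_left = match (left.peek(), right.peek()) {
            (Some(l), Some(r)) => cmp(l, r) != Ordering::Greater,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_left { left.next() } else { right.next() };
        out.extend(next);
    }
    out
}
```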

Interleave Transformer

Interleave streams:

use streamweave_transformers::interleave::InterleaveTransformer;

let transformer = InterleaveTransformer::new();

Machine Learning Transformers

Inference Transformer

Single-item inference:

#[cfg(feature = "ml")]
use streamweave_transformers::ml::InferenceTransformer;

#[cfg(feature = "ml")]
let transformer = InferenceTransformer::new(/* model path */);

Batched Inference Transformer

Batch inference:

#[cfg(feature = "ml")]
use streamweave_transformers::ml::BatchedInferenceTransformer;

#[cfg(feature = "ml")]
let transformer = BatchedInferenceTransformer::new(/* model path */, 32);

Utility Transformers

Sample Transformer

Random sampling:

use streamweave_transformers::sample::SampleTransformer;

let transformer = SampleTransformer::new(0.1);  // 10% sample rate

Skip Transformer

Skip items:

use streamweave_transformers::skip::SkipTransformer;

let transformer = SkipTransformer::new(10);  // Skip first 10 items

Take Transformer

Take items:

use streamweave_transformers::take::TakeTransformer;

let transformer = TakeTransformer::new(100);  // Take first 100 items

Limit Transformer

Limit stream:

use streamweave_transformers::limit::LimitTransformer;

let transformer = LimitTransformer::new(1000);  // Limit to 1000 items

Sort Transformer

Sort items:

use streamweave_transformers::sort::SortTransformer;

let transformer = SortTransformer::new(|x: &i32, y: &i32| x.cmp(y));

Split Transformer

Split items:

use streamweave_transformers::split::SplitTransformer;

let transformer = SplitTransformer::new(|x: &String| x.split_whitespace());

Split At Transformer

Split at index:

use streamweave_transformers::split_at::SplitAtTransformer;

let transformer = SplitAtTransformer::new(100);  // Split at index 100

Zip Transformer

Zip streams:

use streamweave_transformers::zip::ZipTransformer;

let transformer = ZipTransformer::new();

Timeout Transformer

Timeout operations:

use streamweave_transformers::timeout::TimeoutTransformer;
use std::time::Duration;

let transformer = TimeoutTransformer::new(Duration::from_secs(5));

Message Dedupe Transformer

Deduplicate messages:

use streamweave_transformers::message_dedupe::MessageDedupeTransformer;

let transformer = MessageDedupeTransformer::new();
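
Deduplication generally means remembering which identifiers have been seen and dropping repeats. The sketch below assumes messages carry a numeric id, which the text does not confirm. Both the `Message` struct and `dedupe_by_id` are hypothetical stand-ins, not the crate's types.

```rust
use std::collections::HashSet;

/// Hypothetical message envelope; the real crate's message type may differ.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u64,
    payload: String,
}

/// Keeps the first message seen for each id and drops later repeats.
fn dedupe_by_id(messages: Vec<Message>) -> Vec<Message> {
    let mut seen = HashSet::new();
    messages
        .into_iter()
        // `insert` returns false when the id was already present.
        .filter(|m| seen.insert(m.id))
        .collect()
}
```

On an unbounded stream the set of seen ids grows without limit, so a real implementation would likely need a bounded window or an expiry policy.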

Group By Transformer

Group by key:

use streamweave_transformers::group_by::GroupByTransformer;

let transformer = GroupByTransformer::new(|x: &i32| x % 10);
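
To show what grouping by a key function yields, here is a std-only sketch that collects items into a map from key to group. `group_by_key` is a hypothetical helper. How the crate emits groups on a live stream (per window, at end of stream, or otherwise) is not described in the text.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: collects items into groups keyed by `key_fn`.
/// A BTreeMap keeps the groups in key order, which makes output deterministic.
fn group_by_key<T, K: Ord>(
    items: Vec<T>,
    key_fn: impl Fn(&T) -> K,
) -> BTreeMap<K, Vec<T>> {
    let mut groups: BTreeMap<K, Vec<T>> = BTreeMap::new();
    for item in items {
        groups.entry(key_fn(&item)).or_default().push(item);
    }
    groups
}
```

Using the same key function as above (`x % 10`), the items 1, 11, 2, 12, 3 fall into three groups keyed 1, 2, and 3.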

🏗️ Architecture

Each transformer consumes an input stream and produces an output stream:

Stream<T> ──> Transformer ──> Stream<U>

Step by step:

  1. Input stream flows into transformer
  2. Transformer processes items
  3. Output stream flows out
  4. Transformers can be chained
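
The flow above can be sketched without any async machinery. In the example below, plain `Vec`s stand in for streams and a hypothetical `Transform` trait stands in for the crate's transformer trait, whose real definition is not shown in this document. It only illustrates how one transformer's output type becomes the next one's input type.

```rust
/// Hypothetical stand-in for a transformer: turns a sequence of `In` items
/// into a sequence of `Out` items. Plain `Vec`s stand in for async streams.
trait Transform<In, Out> {
    fn apply(&self, input: Vec<In>) -> Vec<Out>;
}

/// Maps every item through a function (Stream<T> -> Stream<U>).
struct Map<F>(F);

impl<In, Out, F: Fn(In) -> Out> Transform<In, Out> for Map<F> {
    fn apply(&self, input: Vec<In>) -> Vec<Out> {
        input.into_iter().map(&self.0).collect()
    }
}

/// Keeps only the items for which the predicate returns true.
struct Filter<P>(P);

impl<T, P: Fn(&T) -> bool> Transform<T, T> for Filter<P> {
    fn apply(&self, input: Vec<T>) -> Vec<T> {
        input.into_iter().filter(|x| (self.0)(x)).collect()
    }
}

/// Chains two transformers: the output of the first feeds the second.
fn double_then_keep_large(input: Vec<i32>) -> Vec<i32> {
    let doubled: Vec<i32> = Map(|x: i32| x * 2).apply(input);
    Filter(|x: &i32| *x > 10).apply(doubled)
}
```

Because the types must line up at each step, chaining a transformer whose input type does not match the previous output fails at compile time.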

🔧 Configuration

All transformers support:

  • Error Strategy: How the transformer reacts when processing an item fails (for example, ErrorStrategy::Skip)
  • Name: Component name for logging

🔍 Error Handling

Transformers support error handling strategies:

use streamweave_error::ErrorStrategy;
use streamweave_transformers::map::MapTransformer;

let transformer = MapTransformer::new(|x: i32| x * 2)
    .with_error_strategy(ErrorStrategy::Skip);
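
To illustrate what a skip strategy means in practice, the sketch below applies a fallible function to each item and either drops failures or stops at the first one. Only `Skip` is named in this document. The `Strategy` enum, its `Stop` variant, and `map_with_strategy` are hypothetical and do not reflect the actual `streamweave_error` API.

```rust
/// Hypothetical strategy enum; only `Skip` is named in the text above.
#[derive(Clone, Copy)]
enum Strategy {
    /// Drop the failing item and continue with the next one.
    Skip,
    /// Abort processing at the first failure (assumed variant).
    Stop,
}

/// Applies a fallible `f` to each item, handling failures per `strategy`.
fn map_with_strategy<T, U, E>(
    items: Vec<T>,
    strategy: Strategy,
    f: impl Fn(T) -> Result<U, E>,
) -> Result<Vec<U>, E> {
    let mut out = Vec::new();
    for item in items {
        match (f(item), strategy) {
            (Ok(value), _) => out.push(value),
            (Err(_), Strategy::Skip) => continue,
            (Err(err), Strategy::Stop) => return Err(err),
        }
    }
    Ok(out)
}
```

Skipping keeps a pipeline running through bad records, while stopping is the safer choice when a failure means later results cannot be trusted.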

⚡ Performance Considerations

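  • Batching: Group items with BatchTransformer so downstream work runs once per batch rather than once per item
  • Stateful: Stateful transformers keep state between items, so their memory use depends on that state (for example, the window of a moving average)
  • ML: ML transformers must load a model before they can run inference, which adds startup cost
  • Routing: Routing transformers spread items across multiple outputs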

🔗 Dependencies

streamweave-transformers depends on:

  • streamweave - Core traits
  • streamweave-error - Error handling
  • streamweave-message - Message envelopes
  • streamweave-stateful - Stateful operations
  • tokio - Async runtime
  • futures - Stream utilities
  • ort (optional) - ONNX Runtime for ML
  • ndarray (optional) - Array operations for ML

🎯 Use Cases

Transformers are used for:

  1. Data Transformation: Transform data in pipelines
  2. Filtering: Filter data based on conditions
  3. Aggregation: Aggregate data (sum, average, etc.)
  4. Routing: Route data to different paths
  5. Merging: Merge multiple data streams
  6. ML Inference: Run ML models on streams
  7. Utility Operations: Common utility operations

🤝 Contributing

Contributions are welcome! Please see the Contributing Guide for details.

📄 License

This project is licensed under the CC BY-SA 4.0 license.