Expand description
Β§streamweave-error
Error handling system for StreamWeave
Comprehensive error handling with multiple strategies and rich error context.
The streamweave-error package provides a comprehensive error handling system for StreamWeave pipelines and graphs. It includes error strategies, error actions, rich error context, and component information for effective error handling and debugging.
Β§β¨ Key Features
- ErrorStrategy Enum: Multiple error handling strategies (Stop, Skip, Retry, Custom)
- ErrorAction Enum: Actions to take when errors occur (Stop, Skip, Retry)
- StreamError Type: Rich error type with context and component information
- ErrorContext: Detailed context about when and where errors occurred
- ComponentInfo: Component identification for error reporting
- PipelineError: Pipeline-specific error wrapper
Β§π¦ Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-error = "0.3.0"Β§π Quick Start
Β§Basic Error Handling
use streamweave_error::{ErrorStrategy, ErrorAction, StreamError, ErrorContext, ComponentInfo};
// Configure error strategy
let strategy = ErrorStrategy::Skip; // Skip errors and continue
// Create error context
let context = ErrorContext {
timestamp: chrono::Utc::now(),
item: Some(42),
component_name: "MyTransformer".to_string(),
component_type: "Transformer".to_string(),
};
// Create component info
let component = ComponentInfo::new(
"my-transformer".to_string(),
"MyTransformer".to_string(),
);
// Create stream error
let error = StreamError::new(
Box::new(std::io::Error::other("Something went wrong")),
context,
component,
);
// Determine action based on strategy
let action = match strategy {
ErrorStrategy::Stop => ErrorAction::Stop,
ErrorStrategy::Skip => ErrorAction::Skip,
ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
_ => ErrorAction::Stop,
};Β§π API Overview
Β§ErrorStrategy
The ErrorStrategy enum defines how components should handle errors:
pub enum ErrorStrategy<T> {
Stop, // Stop processing on error
Skip, // Skip errors and continue
Retry(usize), // Retry up to N times
Custom(CustomHandler), // Custom error handling logic
}Stop Strategy:
- Default behavior
- Stops processing immediately when an error occurs
- Ensures data integrity by preventing partial results
Skip Strategy:
- Skips items that cause errors
- Continues processing remaining items
- Useful for data cleaning scenarios
Retry Strategy:
- Retries failed operations up to specified number of times
- Useful for transient failures (network timeouts, etc.)
- Stops after retry limit is exhausted
Custom Strategy:
- Allows fine-grained control over error handling
- Can base decisions on error type, context, or retry count
- Enables complex error handling logic
Β§ErrorAction
The ErrorAction enum represents the action to take when an error occurs:
pub enum ErrorAction {
Stop, // Stop processing immediately
Skip, // Skip the item and continue
Retry, // Retry the operation
}Β§StreamError
The StreamError type provides rich error information:
pub struct StreamError<T> {
pub source: Box<dyn Error + Send + Sync>, // Original error
pub context: ErrorContext<T>, // Error context
pub component: ComponentInfo, // Component info
pub retries: usize, // Retry count
}Β§ErrorContext
The ErrorContext struct provides detailed information about when and where an error occurred:
pub struct ErrorContext<T> {
pub timestamp: chrono::DateTime<chrono::Utc>, // When error occurred
pub item: Option<T>, // Item being processed
pub component_name: String, // Component name
pub component_type: String, // Component type
}Β§ComponentInfo
The ComponentInfo struct identifies the component that encountered an error:
pub struct ComponentInfo {
pub name: String, // Component name
pub type_name: String, // Component type name
}Β§π Usage Examples
Β§Stop Strategy
Stop processing on first error (default behavior):
use streamweave_error::ErrorStrategy;
let strategy = ErrorStrategy::<i32>::Stop;
// When an error occurs, processing stops immediately
// This ensures data integrityΒ§Skip Strategy
Skip errors and continue processing:
use streamweave_error::ErrorStrategy;
let strategy = ErrorStrategy::<i32>::Skip;
// Invalid items are skipped, processing continues
// Useful for data cleaning pipelinesΒ§Retry Strategy
Retry failed operations:
use streamweave_error::ErrorStrategy;
let strategy = ErrorStrategy::<i32>::Retry(3);
// Retries up to 3 times before giving up
// Useful for transient failures like network timeoutsΒ§Custom Error Handler
Implement custom error handling logic:
use streamweave_error::{ErrorStrategy, ErrorAction, StreamError};
let strategy = ErrorStrategy::<i32>::new_custom(|error: &StreamError<i32>| {
// Retry transient errors
if error.retries < 3 && is_transient(&error.source) {
ErrorAction::Retry
}
// Skip validation errors
else if is_validation_error(&error.source) {
ErrorAction::Skip
}
// Stop on critical errors
else {
ErrorAction::Stop
}
});Β§Creating Error Context
Create detailed error context for debugging:
use streamweave_error::{ErrorContext, ComponentInfo, StreamError};
// Create error context
let context = ErrorContext {
timestamp: chrono::Utc::now(),
item: Some(problematic_item),
component_name: "DataValidator".to_string(),
component_type: "Transformer".to_string(),
};
// Create component info
let component = ComponentInfo::new(
"validator".to_string(),
"DataValidator".to_string(),
);
// Create stream error
let error = StreamError::new(
Box::new(validation_error),
context,
component,
);Β§Error Propagation Patterns
Handle errors at different levels:
use streamweave_error::{ErrorStrategy, StreamError, ErrorAction};
// Component-level error handling
fn handle_component_error(error: &StreamError<i32>) -> ErrorAction {
match error.retries {
0..=2 => ErrorAction::Retry, // Retry first 3 attempts
_ => ErrorAction::Skip, // Skip after retries exhausted
}
}
// Pipeline-level error handling
fn handle_pipeline_error(error: &StreamError<i32>) -> ErrorAction {
if is_critical_error(&error.source) {
ErrorAction::Stop // Stop on critical errors
} else {
ErrorAction::Skip // Skip non-critical errors
}
}Β§π§ Configuration
Error strategies can be configured at multiple levels:
Β§Component Level
use streamweave_error::ErrorStrategy;
// Configure producer error handling
let producer = MyProducer::new()
.with_config(
ProducerConfig::default()
.with_error_strategy(ErrorStrategy::Retry(3))
);
// Configure transformer error handling
let transformer = MyTransformer::new()
.with_config(
TransformerConfig::default()
.with_error_strategy(ErrorStrategy::Skip)
);
// Configure consumer error handling
let consumer = MyConsumer::new()
.with_config(
ConsumerConfig {
error_strategy: ErrorStrategy::Stop,
name: "output".to_string(),
}
);Β§Pipeline Level
use streamweave::PipelineBuilder;
use streamweave_error::ErrorStrategy;
let pipeline = PipelineBuilder::new()
.producer(producer)
.transformer(transformer)
.consumer(consumer)
.with_error_strategy(ErrorStrategy::Skip); // Pipeline-wide defaultΒ§ποΈ Architecture
The error handling system integrates at multiple levels:
βββββββββββββββββββ
β Component ββββencounters errorβββ> StreamError
βββββββββββββββββββ β
β
βΌ
βββββββββββββββββββ ββββββββββββββββ
β ErrorStrategy ββββdeterminesβββ> β ErrorAction β
βββββββββββββββββββ ββββββββββββββββ
β β
β βΌ
β ββββββββββββββββ
ββββuses context fromβββ> β ErrorContext β
ββββββββββββββββΒ§π Error Handling
Β§Error Strategies
Stop (Default):
- Ensures data integrity
- Prevents partial results
- Best for critical data processing
Skip:
- Allows partial results
- Continues processing
- Best for data cleaning and filtering
Retry:
- Handles transient failures
- Configurable retry count
- Best for network operations and external services
Custom:
- Maximum flexibility
- Can implement complex logic
- Best for domain-specific error handling
Β§Error Context
Error context provides:
- Timestamp: When the error occurred
- Item: The item being processed (if available)
- Component Name: Human-readable component identifier
- Component Type: Type name for debugging
Β§Component Information
Component info enables:
- Logging: Identify which component failed
- Metrics: Track errors per component
- Debugging: Locate source of errors
- Monitoring: Alert on component failures
Β§β‘ Performance Considerations
- Zero-Cost Abstractions: Error strategies compile to efficient code
- Minimal Overhead: Error context creation is lightweight
- Clone Efficiency: Error types are designed for efficient cloning
- Send + Sync: All error types are thread-safe
Β§π Examples
For more examples, see:
Β§π Dependencies
streamweave-error depends on:
thiserror- Error trait implementationserde- Serialization supportserde_json- JSON serializationchrono- Timestamp support
Β§π― Use Cases
The error handling system is used for:
- Data Validation: Skip invalid records while processing valid ones
- Network Operations: Retry transient network failures
- External Services: Handle service unavailability gracefully
- Data Cleaning: Skip malformed data and continue processing
- Critical Processing: Stop on errors to ensure data integrity
Β§π Documentation
Β§π See Also
- streamweave - Core traits that use error handling
- streamweave-pipeline - Pipeline error handling
- streamweave-graph - Graph error handling
Β§π€ Contributing
Contributions are welcome! Please see the Contributing Guide for details.
Β§π License
This project is licensed under the CC BY-SA 4.0 license.
Re-exportsΒ§
pub use error::*;