streamweave-error 0.4.0

Error handling system for StreamWeave
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# streamweave-error

[![Crates.io](https://img.shields.io/crates/v/streamweave-error.svg)](https://crates.io/crates/streamweave-error)
[![Documentation](https://docs.rs/streamweave-error/badge.svg)](https://docs.rs/streamweave-error)
[![License: CC BY-SA 4.0](https://img.shields.io/badge/License-CC%20BY--SA%204.0-lightgrey.svg)](https://creativecommons.org/licenses/by-sa/4.0/)

**Error handling system for StreamWeave**  
*Comprehensive error handling with multiple strategies and rich error context.*

The `streamweave-error` package provides a comprehensive error handling system for StreamWeave pipelines and graphs. It includes error strategies, error actions, rich error context, and component information for effective error handling and debugging.

## ✨ Key Features

- **ErrorStrategy Enum**: Multiple error handling strategies (Stop, Skip, Retry, Custom)
- **ErrorAction Enum**: Actions to take when errors occur (Stop, Skip, Retry)
- **StreamError Type**: Rich error type with context and component information
- **ErrorContext**: Detailed context about when and where errors occurred
- **ComponentInfo**: Component identification for error reporting
- **PipelineError**: Pipeline-specific error wrapper

## πŸ“¦ Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
streamweave-error = "0.3.0"
```

## πŸš€ Quick Start

### Basic Error Handling

```rust
use streamweave_error::{ErrorStrategy, ErrorAction, StreamError, ErrorContext, ComponentInfo};

// Configure error strategy
let strategy = ErrorStrategy::Skip;  // Skip errors and continue

// Create error context
let context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(42),
    component_name: "MyTransformer".to_string(),
    component_type: "Transformer".to_string(),
};

// Create component info
let component = ComponentInfo::new(
    "my-transformer".to_string(),
    "MyTransformer".to_string(),
);

// Create stream error
let error = StreamError::new(
    Box::new(std::io::Error::other("Something went wrong")),
    context,
    component,
);

// Determine action based on strategy
let action = match strategy {
    ErrorStrategy::Stop => ErrorAction::Stop,
    ErrorStrategy::Skip => ErrorAction::Skip,
    ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
    _ => ErrorAction::Stop,
};
```

## πŸ“– API Overview

### ErrorStrategy

The `ErrorStrategy` enum defines how components should handle errors:

```rust
pub enum ErrorStrategy<T> {
    Stop,                    // Stop processing on error
    Skip,                    // Skip errors and continue
    Retry(usize),            // Retry up to N times
    Custom(CustomHandler),   // Custom error handling logic
}
```

**Stop Strategy:**
- Default behavior
- Stops processing immediately when an error occurs
- Ensures data integrity by preventing partial results

**Skip Strategy:**
- Skips items that cause errors
- Continues processing remaining items
- Useful for data cleaning scenarios

**Retry Strategy:**
- Retries failed operations up to specified number of times
- Useful for transient failures (network timeouts, etc.)
- Stops after retry limit is exhausted

**Custom Strategy:**
- Allows fine-grained control over error handling
- Can base decisions on error type, context, or retry count
- Enables complex error handling logic

### ErrorAction

The `ErrorAction` enum represents the action to take when an error occurs:

```rust
pub enum ErrorAction {
    Stop,   // Stop processing immediately
    Skip,   // Skip the item and continue
    Retry,  // Retry the operation
}
```

### StreamError

The `StreamError` type provides rich error information:

```rust
pub struct StreamError<T> {
    pub source: Box<dyn Error + Send + Sync>,  // Original error
    pub context: ErrorContext<T>,                // Error context
    pub component: ComponentInfo,                 // Component info
    pub retries: usize,                          // Retry count
}
```

### ErrorContext

The `ErrorContext` struct provides detailed information about when and where an error occurred:

```rust
pub struct ErrorContext<T> {
    pub timestamp: chrono::DateTime<chrono::Utc>,  // When error occurred
    pub item: Option<T>,                            // Item being processed
    pub component_name: String,                      // Component name
    pub component_type: String,                      // Component type
}
```

### ComponentInfo

The `ComponentInfo` struct identifies the component that encountered an error:

```rust
pub struct ComponentInfo {
    pub name: String,       // Component name
    pub type_name: String,  // Component type name
}
```

## πŸ“š Usage Examples

### Stop Strategy

Stop processing on first error (default behavior):

```rust
use streamweave_error::ErrorStrategy;

let strategy = ErrorStrategy::<i32>::Stop;

// When an error occurs, processing stops immediately
// This ensures data integrity
```

### Skip Strategy

Skip errors and continue processing:

```rust
use streamweave_error::ErrorStrategy;

let strategy = ErrorStrategy::<i32>::Skip;

// Invalid items are skipped, processing continues
// Useful for data cleaning pipelines
```

### Retry Strategy

Retry failed operations:

```rust
use streamweave_error::ErrorStrategy;

let strategy = ErrorStrategy::<i32>::Retry(3);

// Retries up to 3 times before giving up
// Useful for transient failures like network timeouts
```

### Custom Error Handler

Implement custom error handling logic:

```rust
use streamweave_error::{ErrorStrategy, ErrorAction, StreamError};

let strategy = ErrorStrategy::<i32>::new_custom(|error: &StreamError<i32>| {
    // Retry transient errors
    if error.retries < 3 && is_transient(&error.source) {
        ErrorAction::Retry
    }
    // Skip validation errors
    else if is_validation_error(&error.source) {
        ErrorAction::Skip
    }
    // Stop on critical errors
    else {
        ErrorAction::Stop
    }
});
```

### Creating Error Context

Create detailed error context for debugging:

```rust
use streamweave_error::{ErrorContext, ComponentInfo, StreamError};

// Create error context
let context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(problematic_item),
    component_name: "DataValidator".to_string(),
    component_type: "Transformer".to_string(),
};

// Create component info
let component = ComponentInfo::new(
    "validator".to_string(),
    "DataValidator".to_string(),
);

// Create stream error
let error = StreamError::new(
    Box::new(validation_error),
    context,
    component,
);
```

### Error Propagation Patterns

Handle errors at different levels:

```rust
use streamweave_error::{ErrorStrategy, StreamError, ErrorAction};

// Component-level error handling
fn handle_component_error(error: &StreamError<i32>) -> ErrorAction {
    match error.retries {
        0..=2 => ErrorAction::Retry,  // Retry first 3 attempts
        _ => ErrorAction::Skip,        // Skip after retries exhausted
    }
}

// Pipeline-level error handling
fn handle_pipeline_error(error: &StreamError<i32>) -> ErrorAction {
    if is_critical_error(&error.source) {
        ErrorAction::Stop  // Stop on critical errors
    } else {
        ErrorAction::Skip  // Skip non-critical errors
    }
}
```

## πŸ”§ Configuration

Error strategies can be configured at multiple levels:

### Component Level

```rust
use streamweave_error::ErrorStrategy;

// Configure producer error handling
let producer = MyProducer::new()
    .with_config(
        ProducerConfig::default()
            .with_error_strategy(ErrorStrategy::Retry(3))
    );

// Configure transformer error handling
let transformer = MyTransformer::new()
    .with_config(
        TransformerConfig::default()
            .with_error_strategy(ErrorStrategy::Skip)
    );

// Configure consumer error handling
let consumer = MyConsumer::new()
    .with_config(
        ConsumerConfig {
            error_strategy: ErrorStrategy::Stop,
            name: "output".to_string(),
        }
    );
```

### Pipeline Level

```rust
use streamweave::PipelineBuilder;
use streamweave_error::ErrorStrategy;

let pipeline = PipelineBuilder::new()
    .producer(producer)
    .transformer(transformer)
    .consumer(consumer)
    .with_error_strategy(ErrorStrategy::Skip);  // Pipeline-wide default
```

## πŸ—οΈ Architecture

The error handling system integrates at multiple levels:

```
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Component     │───encounters error───> StreamError
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                           β”‚
                                               β”‚
                                               β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ ErrorStrategy   │───determines───>   β”‚ ErrorAction  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                                       β”‚
         β”‚                                       β–Ό
         β”‚                              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         └───uses context from───>       β”‚ ErrorContext β”‚
                                        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
```

## πŸ” Error Handling

### Error Strategies

**Stop (Default):**
- Ensures data integrity
- Prevents partial results
- Best for critical data processing

**Skip:**
- Allows partial results
- Continues processing
- Best for data cleaning and filtering

**Retry:**
- Handles transient failures
- Configurable retry count
- Best for network operations and external services

**Custom:**
- Maximum flexibility
- Can implement complex logic
- Best for domain-specific error handling

### Error Context

Error context provides:
- **Timestamp**: When the error occurred
- **Item**: The item being processed (if available)
- **Component Name**: Human-readable component identifier
- **Component Type**: Type name for debugging

### Component Information

Component info enables:
- **Logging**: Identify which component failed
- **Metrics**: Track errors per component
- **Debugging**: Locate source of errors
- **Monitoring**: Alert on component failures

## ⚑ Performance Considerations

- **Zero-Cost Abstractions**: Error strategies compile to efficient code
- **Minimal Overhead**: Error context creation is lightweight
- **Clone Efficiency**: Error types are designed for efficient cloning
- **Send + Sync**: All error types are thread-safe

## πŸ“ Examples

For more examples, see:
- [Error Handling Examples]https://github.com/Industrial/streamweave/tree/main/examples/error_handling
- [Pipeline Examples]https://github.com/Industrial/streamweave/tree/main/examples/basic_pipeline

## πŸ”— Dependencies

`streamweave-error` depends on:

- `thiserror` - Error trait implementation
- `serde` - Serialization support
- `serde_json` - JSON serialization
- `chrono` - Timestamp support

## 🎯 Use Cases

The error handling system is used for:

1. **Data Validation**: Skip invalid records while processing valid ones
2. **Network Operations**: Retry transient network failures
3. **External Services**: Handle service unavailability gracefully
4. **Data Cleaning**: Skip malformed data and continue processing
5. **Critical Processing**: Stop on errors to ensure data integrity

## πŸ“– Documentation

- [Full API Documentation]https://docs.rs/streamweave-error
- [Repository]https://github.com/Industrial/streamweave/tree/main/packages/error
- [StreamWeave Main Documentation]https://docs.rs/streamweave

## πŸ”— See Also

- [streamweave]../streamweave/README.md - Core traits that use error handling
- [streamweave-pipeline]../pipeline/README.md - Pipeline error handling
- [streamweave-graph]../graph/README.md - Graph error handling

## 🀝 Contributing

Contributions are welcome! Please see the [Contributing Guide](https://github.com/Industrial/streamweave/blob/main/CONTRIBUTING.md) for details.

## πŸ“„ License

This project is licensed under the [CC BY-SA 4.0](https://creativecommons.org/licenses/by-sa/4.0/) license.