streamweave_error/
error.rs

1use std::error::Error;
2use std::fmt;
3use std::sync::Arc;
4
/// Action to take when an error occurs in a pipeline component.
///
/// Error strategies resolve to one of these actions to tell a component how
/// to proceed after a failure during stream processing.
///
/// The enum carries no data, so it is `Copy` and implements `Eq` and `Hash`
/// in addition to `PartialEq`; it can be passed by value and used as a key in
/// hashed collections.
///
/// # Example
///
/// ```rust
/// use streamweave::error::ErrorAction;
///
/// // Stop processing on error
/// let action = ErrorAction::Stop;
///
/// // Skip the item and continue
/// let action = ErrorAction::Skip;
///
/// // Retry the operation
/// let action = ErrorAction::Retry;
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorAction {
  /// Stop processing immediately when an error occurs.
  ///
  /// This is the default behavior and ensures data integrity by preventing
  /// partial results after an error.
  Stop,
  /// Skip the item that caused the error and continue processing.
  ///
  /// Useful for non-critical errors where partial results are acceptable.
  Skip,
  /// Retry the operation that caused the error.
  ///
  /// Useful for transient failures that may succeed on retry.
  Retry,
}
40
41// Type alias for the complex custom error handler function
42type CustomErrorHandler<T> = Arc<dyn Fn(&StreamError<T>) -> ErrorAction + Send + Sync>;
43
44/// Strategy for handling errors in pipeline components.
45///
46/// Error strategies determine how components respond to errors during
47/// stream processing. Strategies can be set at the pipeline level or
48/// overridden at the component level.
49///
50/// # Example
51///
52/// ```rust
53/// use streamweave::error::ErrorStrategy;
54///
55/// // Stop on first error (default)
56/// let strategy = ErrorStrategy::Stop;
57///
58/// // Skip errors and continue
59/// let strategy = ErrorStrategy::Skip;
60///
61/// // Retry up to 3 times
62/// let strategy = ErrorStrategy::Retry(3);
63///
64/// // Custom error handling
65/// let strategy = ErrorStrategy::new_custom(|error| {
66///     if error.retries < 2 {
67///         ErrorAction::Retry
68///     } else {
69///         ErrorAction::Stop
70///     }
71/// });
72/// ```
73pub enum ErrorStrategy<T: std::fmt::Debug + Clone + Send + Sync> {
74  /// Stop processing immediately when an error occurs.
75  ///
76  /// This is the default strategy and ensures data integrity.
77  Stop,
78  /// Skip items that cause errors and continue processing.
79  ///
80  /// Useful for data cleaning scenarios where invalid records can be
81  /// safely ignored.
82  Skip,
83  /// Retry failed operations up to the specified number of times.
84  ///
85  /// # Arguments
86  ///
87  /// * `usize` - Maximum number of retry attempts
88  ///
89  /// Useful for transient failures like network timeouts.
90  Retry(usize),
91  /// Custom error handling logic.
92  ///
93  /// Allows fine-grained control over error handling based on error
94  /// context, type, or retry count.
95  Custom(CustomErrorHandler<T>),
96}
97
98impl<T: std::fmt::Debug + Clone + Send + Sync> Clone for ErrorStrategy<T> {
99  fn clone(&self) -> Self {
100    match self {
101      ErrorStrategy::Stop => ErrorStrategy::Stop,
102      ErrorStrategy::Skip => ErrorStrategy::Skip,
103      ErrorStrategy::Retry(n) => ErrorStrategy::Retry(*n),
104      ErrorStrategy::Custom(handler) => ErrorStrategy::Custom(handler.clone()),
105    }
106  }
107}
108
109impl<T: std::fmt::Debug + Clone + Send + Sync> fmt::Debug for ErrorStrategy<T> {
110  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111    match self {
112      ErrorStrategy::Stop => write!(f, "ErrorStrategy::Stop"),
113      ErrorStrategy::Skip => write!(f, "ErrorStrategy::Skip"),
114      ErrorStrategy::Retry(n) => write!(f, "ErrorStrategy::Retry({})", n),
115      ErrorStrategy::Custom(_) => write!(f, "ErrorStrategy::Custom"),
116    }
117  }
118}
119
120impl<T: std::fmt::Debug + Clone + Send + Sync> PartialEq for ErrorStrategy<T> {
121  fn eq(&self, other: &Self) -> bool {
122    match (self, other) {
123      (ErrorStrategy::Stop, ErrorStrategy::Stop) => true,
124      (ErrorStrategy::Skip, ErrorStrategy::Skip) => true,
125      (ErrorStrategy::Retry(n1), ErrorStrategy::Retry(n2)) => n1 == n2,
126      (ErrorStrategy::Custom(_), ErrorStrategy::Custom(_)) => true,
127      _ => false,
128    }
129  }
130}
131
132impl<T: std::fmt::Debug + Clone + Send + Sync> ErrorStrategy<T> {
133  /// Creates a custom error handling strategy with a user-defined handler function.
134  ///
135  /// # Arguments
136  ///
137  /// * `f` - A function that takes a `StreamError` and returns an `ErrorAction`.
138  ///
139  /// # Returns
140  ///
141  /// A `Custom` error strategy that uses the provided handler function.
142  pub fn new_custom<F>(f: F) -> Self
143  where
144    F: Fn(&StreamError<T>) -> ErrorAction + Send + Sync + 'static,
145  {
146    Self::Custom(Arc::new(f))
147  }
148}
149
150/// Error that occurred during stream processing.
151///
152/// This error type provides rich context about where and when an error
153/// occurred, making it easier to debug and handle errors appropriately.
154///
155/// # Fields
156///
157/// * `source` - The original error that occurred
158/// * `context` - Context about when and where the error occurred
159/// * `component` - Information about the component that encountered the error
160/// * `retries` - Number of times this error has been retried
161///
162/// # Example
163///
164/// ```rust
165/// use streamweave::error::{StreamError, ErrorContext, ComponentInfo};
166/// use std::error::Error;
167///
168/// # fn example() {
169/// let error = StreamError::new(
170///     Box::new(std::io::Error::new(std::io::ErrorKind::NotFound, "File not found")),
171///     ErrorContext {
172///         timestamp: chrono::Utc::now(),
173///         item: Some(42),
174///         component_name: "FileProducer".to_string(),
175///         component_type: "Producer".to_string(),
176///     },
177///     ComponentInfo {
178///         name: "file-producer".to_string(),
179///         type_name: "FileProducer".to_string(),
180///     },
181/// );
182/// # }
183/// ```
184#[derive(Debug)]
185pub struct StreamError<T: std::fmt::Debug + Clone + Send + Sync> {
186  /// The original error that occurred.
187  pub source: Box<dyn Error + Send + Sync>,
188  /// Context about when and where the error occurred.
189  pub context: ErrorContext<T>,
190  /// Information about the component that encountered the error.
191  pub component: ComponentInfo,
192  /// Number of times this error has been retried.
193  pub retries: usize,
194}
195
196impl<T: std::fmt::Debug + Clone + Send + Sync> Clone for StreamError<T> {
197  fn clone(&self) -> Self {
198    Self {
199      source: Box::new(StringError(self.source.to_string())),
200      context: self.context.clone(),
201      component: self.component.clone(),
202      retries: self.retries,
203    }
204  }
205}
206
/// A simple error type that wraps a string message.
///
/// Useful for turning a plain message into a value that implements
/// `std::error::Error` without defining a dedicated error type. Its
/// `Display` output is the wrapped message, and it reports no underlying
/// source.
///
/// The type is `Clone` and comparable (`PartialEq`/`Eq`) by message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StringError(pub String);

impl std::fmt::Display for StringError {
  /// Writes the wrapped message verbatim.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str(&self.0)
  }
}

// The default `source()` (returning `None`) is what we want: a message has no cause.
impl std::error::Error for StringError {}
221
222impl<T: std::fmt::Debug + Clone + Send + Sync> StreamError<T> {
223  /// Creates a new `StreamError` with the given source error, context, and component information.
224  ///
225  /// # Arguments
226  ///
227  /// * `source` - The original error that occurred.
228  /// * `context` - Context about when and where the error occurred.
229  /// * `component` - Information about the component that encountered the error.
230  ///
231  /// # Returns
232  ///
233  /// A new `StreamError` with `retries` set to 0.
234  pub fn new(
235    source: Box<dyn Error + Send + Sync>,
236    context: ErrorContext<T>,
237    component: ComponentInfo,
238  ) -> Self {
239    Self {
240      source,
241      context,
242      component,
243      retries: 0,
244    }
245  }
246}
247
248impl<T: std::fmt::Debug + Clone + Send + Sync> fmt::Display for StreamError<T> {
249  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250    write!(
251      f,
252      "Error in {} ({}): {}",
253      self.component.name, self.component.type_name, self.source
254    )
255  }
256}
257
258impl<T: std::fmt::Debug + Clone + Send + Sync> Error for StreamError<T> {
259  fn source(&self) -> Option<&(dyn Error + 'static)> {
260    Some(self.source.as_ref())
261  }
262}
263
264/// Context information about when and where an error occurred.
265///
266/// This struct provides detailed information about the circumstances
267/// surrounding an error, including the timestamp, the item being processed
268/// (if any), and the component that encountered the error.
269#[derive(Debug, Clone, PartialEq)]
270pub struct ErrorContext<T: std::fmt::Debug + Clone + Send + Sync> {
271  /// The timestamp when the error occurred.
272  pub timestamp: chrono::DateTime<chrono::Utc>,
273  /// The item being processed when the error occurred, if available.
274  pub item: Option<T>,
275  /// The name of the component that encountered the error.
276  pub component_name: String,
277  /// The type of the component that encountered the error.
278  pub component_type: String,
279}
280
281impl<T: std::fmt::Debug + Clone + Send + Sync> Default for ErrorContext<T> {
282  fn default() -> Self {
283    Self {
284      timestamp: chrono::Utc::now(),
285      item: None,
286      component_name: "default".to_string(),
287      component_type: "default".to_string(),
288    }
289  }
290}
291
/// Represents the stage in a pipeline where an error occurred.
///
/// This enum is used to identify which part of the pipeline
/// encountered an error, allowing for more targeted error handling.
///
/// Stages implement `Eq` and `Hash`, so they can be used as keys in hashed
/// collections (for example, to count errors per stage).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PipelineStage {
  /// Error occurred in a producer.
  Producer,
  /// Error occurred in a transformer, with the transformer name.
  Transformer(String),
  /// Error occurred in a consumer.
  Consumer,
}
305
/// Information about a pipeline component.
///
/// This struct provides identifying information about a component,
/// including its name and type, which is useful for logging and error reporting.
///
/// Both fields are plain strings, so the type implements `Eq` and `Hash` as
/// well as `PartialEq` and can be used as a key in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ComponentInfo {
  /// The name of the component.
  pub name: String,
  /// The type name of the component.
  pub type_name: String,
}

impl Default for ComponentInfo {
  /// Returns a placeholder descriptor whose `name` and `type_name` are both
  /// the literal `"default"`.
  fn default() -> Self {
    Self {
      name: "default".to_string(),
      type_name: "default".to_string(),
    }
  }
}

impl ComponentInfo {
  /// Creates a new `ComponentInfo` with the given name and type name.
  ///
  /// # Arguments
  ///
  /// * `name` - The name of the component.
  /// * `type_name` - The type name of the component.
  ///
  /// # Returns
  ///
  /// A new `ComponentInfo` holding the two strings unchanged.
  pub fn new(name: String, type_name: String) -> Self {
    Self { name, type_name }
  }
}
342
343/// Extended error context that includes pipeline stage information.
344///
345/// This struct combines `ErrorContext` with `PipelineStage` to provide
346/// comprehensive information about where an error occurred in the pipeline.
347#[derive(Debug, Clone, PartialEq)]
348pub struct PipelineErrorContext<T: std::fmt::Debug + Clone + Send + Sync> {
349  /// The base error context.
350  pub context: ErrorContext<T>,
351  /// The pipeline stage where the error occurred.
352  pub stage: PipelineStage,
353}
354
355/// An error that occurred during pipeline execution.
356///
357/// This struct wraps a `StreamError` and provides pipeline-specific error information.
358#[derive(Debug)]
359pub struct PipelineError<T: std::fmt::Debug + Clone + Send + Sync> {
360  inner: StreamError<T>,
361}
362
363impl<T: std::fmt::Debug + Clone + Send + Sync> PipelineError<T> {
364  /// Creates a new `PipelineError` from an error, context, and component information.
365  ///
366  /// # Arguments
367  ///
368  /// * `error` - The underlying error that occurred.
369  /// * `context` - The error context containing details about when and where the error occurred.
370  /// * `component` - Information about the component where the error occurred.
371  ///
372  /// # Returns
373  ///
374  /// A new `PipelineError` instance.
375  pub fn new<E>(error: E, context: ErrorContext<T>, component: ComponentInfo) -> Self
376  where
377    E: Error + Send + Sync + 'static,
378  {
379    Self {
380      inner: StreamError::new(Box::new(error), context, component),
381    }
382  }
383
384  /// Creates a new `PipelineError` from an existing `StreamError`.
385  ///
386  /// # Arguments
387  ///
388  /// * `error` - The `StreamError` to wrap.
389  ///
390  /// # Returns
391  ///
392  /// A new `PipelineError` instance.
393  pub fn from_stream_error(error: StreamError<T>) -> Self {
394    Self { inner: error }
395  }
396
397  /// Returns a reference to the error context.
398  ///
399  /// # Returns
400  ///
401  /// A reference to the `ErrorContext` associated with this error.
402  pub fn context(&self) -> &ErrorContext<T> {
403    &self.inner.context
404  }
405
406  /// Returns a reference to the component information.
407  ///
408  /// # Returns
409  ///
410  /// A reference to the `ComponentInfo` associated with this error.
411  pub fn component(&self) -> &ComponentInfo {
412    &self.inner.component
413  }
414}
415
416impl<T: std::fmt::Debug + Clone + Send + Sync> std::fmt::Display for PipelineError<T> {
417  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418    write!(
419      f,
420      "Pipeline error in {}: {}",
421      self.inner.component.name, self.inner.source
422    )
423  }
424}
425
426impl<T: std::fmt::Debug + Clone + Send + Sync> Error for PipelineError<T> {
427  fn source(&self) -> Option<&(dyn Error + 'static)> {
428    Some(&*self.inner.source)
429  }
430}
431
#[cfg(test)]
mod tests {
  use super::*;
  use std::error::Error;

  /// Builds a `StreamError<i32>` whose source is a `StringError` with the
  /// given message, using default context/component and zero retries.
  fn sample_error(message: &str) -> StreamError<i32> {
    StreamError {
      source: Box::new(StringError(message.to_string())),
      context: ErrorContext::default(),
      component: ComponentInfo::default(),
      retries: 0,
    }
  }

  /// Component descriptor shared by the tests that need non-default names.
  fn sample_component() -> ComponentInfo {
    ComponentInfo {
      name: "test".to_string(),
      type_name: "TestComponent".to_string(),
    }
  }

  #[test]
  fn test_error_action() {
    assert_eq!(ErrorAction::Stop, ErrorAction::Stop);
    assert_eq!(ErrorAction::Skip, ErrorAction::Skip);
    assert_eq!(ErrorAction::Retry, ErrorAction::Retry);
    assert_ne!(ErrorAction::Stop, ErrorAction::Skip);
    assert_ne!(ErrorAction::Skip, ErrorAction::Retry);
    assert_ne!(ErrorAction::Retry, ErrorAction::Stop);
  }

  #[test]
  fn test_error_strategy_clone() {
    let strategy = ErrorStrategy::<i32>::Stop;
    assert_eq!(strategy.clone(), ErrorStrategy::Stop);

    let strategy = ErrorStrategy::<i32>::Skip;
    assert_eq!(strategy.clone(), ErrorStrategy::Skip);

    let strategy = ErrorStrategy::<i32>::Retry(3);
    assert_eq!(strategy.clone(), ErrorStrategy::Retry(3));

    let strategy = ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip);
    let cloned = strategy.clone();
    assert_eq!(strategy, cloned);

    // The clone must still dispatch to the same handler logic.
    match cloned {
      ErrorStrategy::Custom(handler) => {
        assert_eq!(handler(&sample_error("test")), ErrorAction::Skip);
      }
      other => panic!("Expected Custom variant, got {:?}", other),
    }
  }

  #[test]
  fn test_error_strategy_debug() {
    assert_eq!(
      format!("{:?}", ErrorStrategy::<i32>::Stop),
      "ErrorStrategy::Stop"
    );
    assert_eq!(
      format!("{:?}", ErrorStrategy::<i32>::Skip),
      "ErrorStrategy::Skip"
    );
    assert_eq!(
      format!("{:?}", ErrorStrategy::<i32>::Retry(3)),
      "ErrorStrategy::Retry(3)"
    );
    assert_eq!(
      format!(
        "{:?}",
        ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip)
      ),
      "ErrorStrategy::Custom"
    );
  }

  #[test]
  fn test_error_strategy_partial_eq() {
    assert_eq!(ErrorStrategy::<i32>::Stop, ErrorStrategy::Stop);
    assert_eq!(ErrorStrategy::<i32>::Skip, ErrorStrategy::Skip);
    assert_eq!(ErrorStrategy::<i32>::Retry(3), ErrorStrategy::Retry(3));
    assert_ne!(ErrorStrategy::<i32>::Retry(3), ErrorStrategy::Retry(4));
    assert_ne!(ErrorStrategy::<i32>::Stop, ErrorStrategy::Skip);
    assert_ne!(ErrorStrategy::<i32>::Skip, ErrorStrategy::Retry(3));
    assert_ne!(ErrorStrategy::<i32>::Retry(3), ErrorStrategy::Stop);

    // A custom strategy never equals one of the built-in variants.
    let custom = ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip);
    assert_ne!(custom, ErrorStrategy::Stop);
    assert_ne!(custom, ErrorStrategy::Skip);
    assert_ne!(custom, ErrorStrategy::Retry(3));
  }

  #[test]
  fn test_error_strategy_new_custom() {
    let strategy = ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip);
    let error = sample_error("test");
    if let ErrorStrategy::Custom(handler) = strategy {
      assert_eq!(handler(&error), ErrorAction::Skip);
    } else {
      panic!("Expected Custom variant");
    }
  }

  #[test]
  fn test_stream_error_clone() {
    // Use a non-zero retry count so a clone that reset it would be caught.
    let mut error = sample_error("test");
    error.retries = 2;

    let cloned = error.clone();
    assert_eq!(error.source.to_string(), cloned.source.to_string());
    assert_eq!(error.context, cloned.context);
    assert_eq!(error.component, cloned.component);
    assert_eq!(cloned.retries, 2);
  }

  #[test]
  fn test_stream_error_display() {
    let error: StreamError<i32> = StreamError {
      component: sample_component(),
      ..sample_error("test error")
    };
    assert_eq!(
      error.to_string(),
      "Error in test (TestComponent): test error"
    );
  }

  #[test]
  fn test_stream_error_error() {
    let error = sample_error("test error");
    assert_eq!(error.source().unwrap().to_string(), "test error");
  }

  #[test]
  fn test_stream_error_new() {
    let source = StringError("test error".to_string());
    let context: ErrorContext<i32> = ErrorContext::default();
    let component = ComponentInfo::default();
    let error = StreamError::new(Box::new(source), context.clone(), component.clone());
    assert_eq!(error.source.to_string(), "test error");
    assert_eq!(error.context, context);
    assert_eq!(error.component, component);
    assert_eq!(error.retries, 0);
  }

  #[test]
  fn test_error_context_default() {
    let context = ErrorContext::<i32>::default();
    assert!(context.timestamp <= chrono::Utc::now());
    assert_eq!(context.item, None);
    assert_eq!(context.component_name, "default");
    assert_eq!(context.component_type, "default");
  }

  #[test]
  fn test_error_context_clone() {
    let context = ErrorContext {
      timestamp: chrono::Utc::now(),
      item: Some(42),
      component_name: "test".to_string(),
      component_type: "TestComponent".to_string(),
    };
    let cloned = context.clone();
    assert_eq!(context.timestamp, cloned.timestamp);
    assert_eq!(context.item, cloned.item);
    assert_eq!(context.component_name, cloned.component_name);
    assert_eq!(context.component_type, cloned.component_type);
  }

  #[test]
  fn test_error_context_partial_eq() {
    let context1 = ErrorContext {
      timestamp: chrono::Utc::now(),
      item: Some(42),
      component_name: "test".to_string(),
      component_type: "TestComponent".to_string(),
    };
    // Reuse the timestamp: two separate `now()` calls would not compare equal.
    let context2 = ErrorContext {
      timestamp: context1.timestamp,
      item: Some(42),
      component_name: "test".to_string(),
      component_type: "TestComponent".to_string(),
    };
    assert_eq!(context1, context2);
  }

  #[test]
  fn test_component_info_default() {
    let info = ComponentInfo::default();
    assert_eq!(info.name, "default");
    assert_eq!(info.type_name, "default");
  }

  #[test]
  fn test_component_info_clone() {
    let info = sample_component();
    let cloned = info.clone();
    assert_eq!(info.name, cloned.name);
    assert_eq!(info.type_name, cloned.type_name);
  }

  #[test]
  fn test_component_info_partial_eq() {
    let info1 = sample_component();
    let info2 = sample_component();
    assert_eq!(info1, info2);
  }

  #[test]
  fn test_component_info_new() {
    let info = ComponentInfo::new("test".to_string(), "TestComponent".to_string());
    assert_eq!(info.name, "test");
    assert_eq!(info.type_name, "TestComponent");
  }

  #[test]
  fn test_pipeline_error_new() {
    let source = StringError("test error".to_string());
    let context: ErrorContext<i32> = ErrorContext::default();
    let component = ComponentInfo::default();
    let error: PipelineError<i32> = PipelineError::new(source, context.clone(), component.clone());
    assert_eq!(error.context(), &context);
    assert_eq!(error.component(), &component);
  }

  #[test]
  fn test_pipeline_error_from_stream_error() {
    let stream_error = sample_error("test error");
    let error: PipelineError<i32> = PipelineError::from_stream_error(stream_error.clone());
    assert_eq!(error.context(), &stream_error.context);
    assert_eq!(error.component(), &stream_error.component);
  }

  #[test]
  fn test_pipeline_error_display() {
    let error: PipelineError<i32> = PipelineError::new(
      StringError("test error".to_string()),
      ErrorContext::default(),
      sample_component(),
    );
    assert_eq!(error.to_string(), "Pipeline error in test: test error");
  }

  #[test]
  fn test_pipeline_error_error() {
    let source = StringError("test error".to_string());
    let error: PipelineError<i32> =
      PipelineError::new(source, ErrorContext::default(), ComponentInfo::default());
    assert_eq!(error.source().unwrap().to_string(), "test error");
  }
}