1use std::error::Error;
2use std::fmt;
3use std::sync::Arc;
4
/// Decision describing what to do after an error.
///
/// This is the value returned by custom error handlers stored in
/// `ErrorStrategy::Custom`. The code that acts on the decision lives outside
/// this file, so the per-variant notes below are inferred from the names.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the action can be
/// used as a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ErrorAction {
    /// Presumably: halt processing. Confirm against the consumer of this value.
    Stop,
    /// Presumably: drop the failing item and continue. Confirm against the consumer.
    Skip,
    /// Presumably: attempt the failing operation again. Confirm against the consumer.
    Retry,
}
40
41type CustomErrorHandler<T> = Arc<dyn Fn(&StreamError<T>) -> ErrorAction + Send + Sync>;
43
44pub enum ErrorStrategy<T: std::fmt::Debug + Clone + Send + Sync> {
74 Stop,
78 Skip,
83 Retry(usize),
91 Custom(CustomErrorHandler<T>),
96}
97
98impl<T: std::fmt::Debug + Clone + Send + Sync> Clone for ErrorStrategy<T> {
99 fn clone(&self) -> Self {
100 match self {
101 ErrorStrategy::Stop => ErrorStrategy::Stop,
102 ErrorStrategy::Skip => ErrorStrategy::Skip,
103 ErrorStrategy::Retry(n) => ErrorStrategy::Retry(*n),
104 ErrorStrategy::Custom(handler) => ErrorStrategy::Custom(handler.clone()),
105 }
106 }
107}
108
109impl<T: std::fmt::Debug + Clone + Send + Sync> fmt::Debug for ErrorStrategy<T> {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 match self {
112 ErrorStrategy::Stop => write!(f, "ErrorStrategy::Stop"),
113 ErrorStrategy::Skip => write!(f, "ErrorStrategy::Skip"),
114 ErrorStrategy::Retry(n) => write!(f, "ErrorStrategy::Retry({})", n),
115 ErrorStrategy::Custom(_) => write!(f, "ErrorStrategy::Custom"),
116 }
117 }
118}
119
120impl<T: std::fmt::Debug + Clone + Send + Sync> PartialEq for ErrorStrategy<T> {
121 fn eq(&self, other: &Self) -> bool {
122 match (self, other) {
123 (ErrorStrategy::Stop, ErrorStrategy::Stop) => true,
124 (ErrorStrategy::Skip, ErrorStrategy::Skip) => true,
125 (ErrorStrategy::Retry(n1), ErrorStrategy::Retry(n2)) => n1 == n2,
126 (ErrorStrategy::Custom(_), ErrorStrategy::Custom(_)) => true,
127 _ => false,
128 }
129 }
130}
131
132impl<T: std::fmt::Debug + Clone + Send + Sync> ErrorStrategy<T> {
133 pub fn new_custom<F>(f: F) -> Self
143 where
144 F: Fn(&StreamError<T>) -> ErrorAction + Send + Sync + 'static,
145 {
146 Self::Custom(Arc::new(f))
147 }
148}
149
150#[derive(Debug)]
185pub struct StreamError<T: std::fmt::Debug + Clone + Send + Sync> {
186 pub source: Box<dyn Error + Send + Sync>,
188 pub context: ErrorContext<T>,
190 pub component: ComponentInfo,
192 pub retries: usize,
194}
195
196impl<T: std::fmt::Debug + Clone + Send + Sync> Clone for StreamError<T> {
197 fn clone(&self) -> Self {
198 Self {
199 source: Box::new(StringError(self.source.to_string())),
200 context: self.context.clone(),
201 component: self.component.clone(),
202 retries: self.retries,
203 }
204 }
205}
206
/// Minimal error type that carries only a message.
///
/// Used in this file as the stand-in source when a `StreamError` is cloned.
/// `Clone`, `PartialEq` and `Eq` are derived so values can be copied and
/// compared directly; the wrapped `String` supports all three.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StringError(pub String);

impl fmt::Display for StringError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

// No underlying cause: the default `source()` (returning `None`) is kept.
impl Error for StringError {}
221
222impl<T: std::fmt::Debug + Clone + Send + Sync> StreamError<T> {
223 pub fn new(
235 source: Box<dyn Error + Send + Sync>,
236 context: ErrorContext<T>,
237 component: ComponentInfo,
238 ) -> Self {
239 Self {
240 source,
241 context,
242 component,
243 retries: 0,
244 }
245 }
246}
247
248impl<T: std::fmt::Debug + Clone + Send + Sync> fmt::Display for StreamError<T> {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 write!(
251 f,
252 "Error in {} ({}): {}",
253 self.component.name, self.component.type_name, self.source
254 )
255 }
256}
257
258impl<T: std::fmt::Debug + Clone + Send + Sync> Error for StreamError<T> {
259 fn source(&self) -> Option<&(dyn Error + 'static)> {
260 Some(self.source.as_ref())
261 }
262}
263
264#[derive(Debug, Clone, PartialEq)]
270pub struct ErrorContext<T: std::fmt::Debug + Clone + Send + Sync> {
271 pub timestamp: chrono::DateTime<chrono::Utc>,
273 pub item: Option<T>,
275 pub component_name: String,
277 pub component_type: String,
279}
280
281impl<T: std::fmt::Debug + Clone + Send + Sync> Default for ErrorContext<T> {
282 fn default() -> Self {
283 Self {
284 timestamp: chrono::Utc::now(),
285 item: None,
286 component_name: "default".to_string(),
287 component_type: "default".to_string(),
288 }
289 }
290}
291
/// Stage of a pipeline, stored in `PipelineErrorContext::stage`.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so stages can be
/// used as keys in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PipelineStage {
    /// The producer stage.
    Producer,
    /// A transformer stage, tagged with a `String`.
    ///
    /// NOTE(review): presumably the transformer's name; confirm against the
    /// code that constructs this variant.
    Transformer(String),
    /// The consumer stage.
    Consumer,
}
305
/// Identifies a component by its name and the name of its type.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the value can be
/// used as a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ComponentInfo {
    /// Name of the component.
    pub name: String,
    /// Name of the component's type.
    pub type_name: String,
}

impl Default for ComponentInfo {
    /// Returns a placeholder whose `name` and `type_name` are both `"default"`.
    fn default() -> Self {
        Self::new("default".to_string(), "default".to_string())
    }
}

impl ComponentInfo {
    /// Creates a `ComponentInfo` from an owned name and type name.
    pub fn new(name: String, type_name: String) -> Self {
        Self { name, type_name }
    }
}
342
343#[derive(Debug, Clone, PartialEq)]
348pub struct PipelineErrorContext<T: std::fmt::Debug + Clone + Send + Sync> {
349 pub context: ErrorContext<T>,
351 pub stage: PipelineStage,
353}
354
355#[derive(Debug)]
359pub struct PipelineError<T: std::fmt::Debug + Clone + Send + Sync> {
360 inner: StreamError<T>,
361}
362
363impl<T: std::fmt::Debug + Clone + Send + Sync> PipelineError<T> {
364 pub fn new<E>(error: E, context: ErrorContext<T>, component: ComponentInfo) -> Self
376 where
377 E: Error + Send + Sync + 'static,
378 {
379 Self {
380 inner: StreamError::new(Box::new(error), context, component),
381 }
382 }
383
384 pub fn from_stream_error(error: StreamError<T>) -> Self {
394 Self { inner: error }
395 }
396
397 pub fn context(&self) -> &ErrorContext<T> {
403 &self.inner.context
404 }
405
406 pub fn component(&self) -> &ComponentInfo {
412 &self.inner.component
413 }
414}
415
416impl<T: std::fmt::Debug + Clone + Send + Sync> std::fmt::Display for PipelineError<T> {
417 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418 write!(
419 f,
420 "Pipeline error in {}: {}",
421 self.inner.component.name, self.inner.source
422 )
423 }
424}
425
426impl<T: std::fmt::Debug + Clone + Send + Sync> Error for PipelineError<T> {
427 fn source(&self) -> Option<&(dyn Error + 'static)> {
428 Some(&*self.inner.source)
429 }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;

    /// Builds a `StreamError<i32>` whose source is a `StringError` carrying
    /// `message`, with default context, default component and zero retries.
    fn stream_error(message: &str) -> StreamError<i32> {
        StreamError {
            source: Box::new(StringError(message.to_string())),
            context: ErrorContext::default(),
            component: ComponentInfo::default(),
            retries: 0,
        }
    }

    /// Component fixture named `"test"` of type `"TestComponent"`.
    fn test_component() -> ComponentInfo {
        ComponentInfo {
            name: "test".to_string(),
            type_name: "TestComponent".to_string(),
        }
    }

    /// Context fixture carrying item `42` and the supplied timestamp.
    fn test_context(timestamp: chrono::DateTime<chrono::Utc>) -> ErrorContext<i32> {
        ErrorContext {
            timestamp,
            item: Some(42),
            component_name: "test".to_string(),
            component_type: "TestComponent".to_string(),
        }
    }

    #[test]
    fn test_error_action() {
        assert_eq!(ErrorAction::Stop, ErrorAction::Stop);
        assert_eq!(ErrorAction::Skip, ErrorAction::Skip);
        assert_eq!(ErrorAction::Retry, ErrorAction::Retry);
        assert_ne!(ErrorAction::Stop, ErrorAction::Skip);
        assert_ne!(ErrorAction::Skip, ErrorAction::Retry);
        assert_ne!(ErrorAction::Retry, ErrorAction::Stop);
    }

    #[test]
    fn test_error_strategy_clone() {
        let strategy = ErrorStrategy::<i32>::Stop;
        assert_eq!(strategy.clone(), ErrorStrategy::Stop);

        let strategy = ErrorStrategy::<i32>::Skip;
        assert_eq!(strategy.clone(), ErrorStrategy::Skip);

        let strategy = ErrorStrategy::<i32>::Retry(3);
        assert_eq!(strategy.clone(), ErrorStrategy::Retry(3));

        let strategy = ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip);
        let cloned = strategy.clone();
        assert_eq!(strategy, cloned);

        // The clone must still invoke the same handler, not merely keep the variant.
        let error = stream_error("test");
        match (&strategy, &cloned) {
            (ErrorStrategy::Custom(original), ErrorStrategy::Custom(copy)) => {
                assert_eq!(original(&error), ErrorAction::Skip);
                assert_eq!(copy(&error), ErrorAction::Skip);
            }
            _ => panic!("Expected Custom variants"),
        }
    }

    #[test]
    fn test_error_strategy_debug() {
        assert_eq!(
            format!("{:?}", ErrorStrategy::<i32>::Stop),
            "ErrorStrategy::Stop"
        );
        assert_eq!(
            format!("{:?}", ErrorStrategy::<i32>::Skip),
            "ErrorStrategy::Skip"
        );
        assert_eq!(
            format!("{:?}", ErrorStrategy::<i32>::Retry(3)),
            "ErrorStrategy::Retry(3)"
        );
        assert_eq!(
            format!(
                "{:?}",
                ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip)
            ),
            "ErrorStrategy::Custom"
        );
    }

    #[test]
    fn test_error_strategy_partial_eq() {
        assert_eq!(ErrorStrategy::<i32>::Stop, ErrorStrategy::Stop);
        assert_eq!(ErrorStrategy::<i32>::Skip, ErrorStrategy::Skip);
        assert_eq!(ErrorStrategy::<i32>::Retry(3), ErrorStrategy::Retry(3));
        assert_ne!(ErrorStrategy::<i32>::Stop, ErrorStrategy::Skip);
        assert_ne!(ErrorStrategy::<i32>::Skip, ErrorStrategy::Retry(3));
        assert_ne!(ErrorStrategy::<i32>::Retry(3), ErrorStrategy::Stop);
        // Same variant, different payload.
        assert_ne!(ErrorStrategy::<i32>::Retry(3), ErrorStrategy::Retry(4));
    }

    #[test]
    fn test_error_strategy_new_custom() {
        let strategy = ErrorStrategy::<i32>::new_custom(|_| ErrorAction::Skip);
        let error = stream_error("test");
        if let ErrorStrategy::Custom(handler) = strategy {
            assert_eq!(handler(&error), ErrorAction::Skip);
        } else {
            panic!("Expected Custom variant");
        }
    }

    #[test]
    fn test_stream_error_clone() {
        let error = stream_error("test");
        let cloned = error.clone();
        assert_eq!(error.source.to_string(), cloned.source.to_string());
        assert_eq!(error.context, cloned.context);
        assert_eq!(error.component, cloned.component);
        assert_eq!(error.retries, cloned.retries);

        // A non-zero retry counter must survive the clone as well.
        let retried = StreamError {
            retries: 2,
            ..stream_error("test")
        };
        assert_eq!(retried.clone().retries, 2);
    }

    #[test]
    fn test_stream_error_display() {
        let error = StreamError {
            component: test_component(),
            ..stream_error("test error")
        };
        assert_eq!(
            error.to_string(),
            "Error in test (TestComponent): test error"
        );
    }

    #[test]
    fn test_stream_error_error() {
        let error = stream_error("test error");
        assert_eq!(error.source().unwrap().to_string(), "test error");
    }

    #[test]
    fn test_stream_error_new() {
        let source = StringError("test error".to_string());
        let context: ErrorContext<i32> = ErrorContext::default();
        let component = ComponentInfo::default();
        let error = StreamError::new(Box::new(source), context.clone(), component.clone());
        assert_eq!(error.source.to_string(), "test error");
        assert_eq!(error.context, context);
        assert_eq!(error.component, component);
        assert_eq!(error.retries, 0);
    }

    #[test]
    fn test_error_context_default() {
        let context = ErrorContext::<i32>::default();
        assert!(context.timestamp <= chrono::Utc::now());
        assert_eq!(context.item, None);
        assert_eq!(context.component_name, "default");
        assert_eq!(context.component_type, "default");
    }

    #[test]
    fn test_error_context_clone() {
        let context = test_context(chrono::Utc::now());
        let cloned = context.clone();
        assert_eq!(context.timestamp, cloned.timestamp);
        assert_eq!(context.item, cloned.item);
        assert_eq!(context.component_name, cloned.component_name);
        assert_eq!(context.component_type, cloned.component_type);
    }

    #[test]
    fn test_error_context_partial_eq() {
        let context1 = test_context(chrono::Utc::now());
        let context2 = test_context(context1.timestamp);
        assert_eq!(context1, context2);
    }

    #[test]
    fn test_component_info_default() {
        let info = ComponentInfo::default();
        assert_eq!(info.name, "default");
        assert_eq!(info.type_name, "default");
    }

    #[test]
    fn test_component_info_clone() {
        let info = test_component();
        let cloned = info.clone();
        assert_eq!(info.name, cloned.name);
        assert_eq!(info.type_name, cloned.type_name);
    }

    #[test]
    fn test_component_info_partial_eq() {
        let info1 = test_component();
        let info2 = test_component();
        assert_eq!(info1, info2);
        assert_ne!(info1, ComponentInfo::default());
    }

    #[test]
    fn test_component_info_new() {
        let info = ComponentInfo::new("test".to_string(), "TestComponent".to_string());
        assert_eq!(info.name, "test");
        assert_eq!(info.type_name, "TestComponent");
    }

    #[test]
    fn test_pipeline_error_new() {
        let source = StringError("test error".to_string());
        let context: ErrorContext<i32> = ErrorContext::default();
        let component = ComponentInfo::default();
        let error: PipelineError<i32> =
            PipelineError::new(source, context.clone(), component.clone());
        assert_eq!(error.context(), &context);
        assert_eq!(error.component(), &component);
    }

    #[test]
    fn test_pipeline_error_from_stream_error() {
        let stream_error = stream_error("test error");
        let error: PipelineError<i32> = PipelineError::from_stream_error(stream_error.clone());
        assert_eq!(error.context(), &stream_error.context);
        assert_eq!(error.component(), &stream_error.component);
    }

    #[test]
    fn test_pipeline_error_display() {
        let error: PipelineError<i32> = PipelineError::new(
            StringError("test error".to_string()),
            ErrorContext::default(),
            test_component(),
        );
        assert_eq!(error.to_string(), "Pipeline error in test: test error");
    }

    #[test]
    fn test_pipeline_error_error() {
        let source = StringError("test error".to_string());
        let error: PipelineError<i32> =
            PipelineError::new(source, ErrorContext::default(), ComponentInfo::default());
        assert_eq!(error.source().unwrap().to_string(), "test error");
    }
}