Skip to main content

edgeparse_core/pipeline/
error_recovery.rs

//! Error recovery utilities for pipeline stage resilience.
//!
//! Wraps per-page stage operations so that a panic on one page is caught and logged,
//! allowing the pipeline to continue processing the remaining pages, and provides a
//! collector for page-level errors.
5
6use log;
7use std::panic::{self, AssertUnwindSafe};
8
/// Outcome of processing a single page.
///
/// `Clone`, `PartialEq` and `Eq` are derived (each only when `T` implements the
/// same trait) so that outcomes can be duplicated and compared directly, e.g. in
/// assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PageResult<T> {
    /// The page was processed successfully; carries the processed value.
    Ok(T),
    /// Processing of the page failed.
    Failed {
        /// Zero-based index of the page that failed.
        page_index: usize,
        /// Human-readable description of the error.
        error: String,
    },
}
22
23/// Apply a fallible operation to each page, recovering from failures.
24///
25/// Returns a Vec of successful results. Failed pages are logged and skipped,
26/// with the original content preserved as fallback.
27pub fn process_pages_with_recovery<T, F>(pages: Vec<T>, stage_name: &str, mut op: F) -> Vec<T>
28where
29    T: Send + Default + 'static,
30    F: FnMut(T) -> T,
31{
32    let mut results = Vec::with_capacity(pages.len());
33
34    for (idx, page) in pages.into_iter().enumerate() {
35        let result = panic::catch_unwind(AssertUnwindSafe(|| op(page)));
36        match result {
37            Ok(processed) => {
38                results.push(processed);
39            }
40            Err(e) => {
41                let msg = if let Some(s) = e.downcast_ref::<&str>() {
42                    s.to_string()
43                } else if let Some(s) = e.downcast_ref::<String>() {
44                    s.clone()
45                } else {
46                    "unknown panic".to_string()
47                };
48                log::error!(
49                    "Stage '{}' failed on page {}: {}. Skipping page.",
50                    stage_name,
51                    idx + 1,
52                    msg
53                );
54                // NOTE: original page data is lost due to move semantics.
55                // In production, consider cloning before processing.
56                results.push(Default::default());
57            }
58        }
59    }
60
61    results
62}
63
/// Collector for page-level failures observed while the pipeline runs.
#[derive(Debug, Default)]
pub struct PipelineErrors {
    /// One `(stage_name, page_index, error_message)` tuple per recorded failure,
    /// in the order the failures were recorded.
    pub errors: Vec<(String, usize, String)>,
}

impl PipelineErrors {
    /// Append a failure for `stage` at `page_index` with the given `error` text.
    ///
    /// `page_index` is stored exactly as passed; [`PipelineErrors::summary`]
    /// prints it incremented by one.
    pub fn record(&mut self, stage: &str, page_index: usize, error: &str) {
        let entry = (stage.to_owned(), page_index, error.to_owned());
        self.errors.push(entry);
    }

    /// Returns `true` once at least one failure has been recorded.
    pub fn has_errors(&self) -> bool {
        let nothing_recorded = self.errors.is_empty();
        !nothing_recorded
    }

    /// Render all recorded failures as text, one per line, in recording order.
    ///
    /// Each line has the form `[stage] page N: message`, where `N` is the stored
    /// page index plus one. Returns `"No errors"` when nothing was recorded.
    pub fn summary(&self) -> String {
        match self.errors.as_slice() {
            [] => "No errors".to_string(),
            entries => {
                let lines: Vec<String> = entries
                    .iter()
                    .map(|(stage, page, msg)| format!("[{stage}] page {}: {msg}", page + 1))
                    .collect();
                lines.join("\n")
            }
        }
    }
}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// When `op` never panics, every page comes back as `op` returned it.
    #[test]
    fn test_process_pages_no_failure() {
        let input = vec![vec![1, 2], vec![3, 4]];
        let output = process_pages_with_recovery(input, "test", |mut page| {
            page.push(99);
            page
        });
        // Comparing the whole vector checks the length and both pages at once.
        assert_eq!(output, vec![vec![1, 2, 99], vec![3, 4, 99]]);
    }

    /// A panic on the middle page yields a default (empty) page in its slot while
    /// the pages around it are processed normally.
    #[test]
    fn test_process_pages_with_panic_recovery() {
        let input = vec![vec![1], vec![2], vec![3]];
        let output = process_pages_with_recovery(input, "test", |page| {
            if page[0] == 2 {
                panic!("simulated panic on page 2");
            }
            page
        });
        // The empty vector in the middle is the default substituted after the panic.
        assert_eq!(output, vec![vec![1], Vec::new(), vec![3]]);
    }

    /// Recording failures flips `has_errors` and surfaces them in the summary
    /// with 1-based page numbers.
    #[test]
    fn test_pipeline_errors() {
        let mut tracker = PipelineErrors::default();
        assert!(!tracker.has_errors());

        tracker.record("Stage 3", 0, "Failed to detect tables");
        tracker.record("Stage 8", 2, "Header detection failed");

        assert!(tracker.has_errors());
        assert_eq!(tracker.errors.len(), 2);

        let report = tracker.summary();
        for needle in ["Stage 3", "page 1", "page 3"] {
            assert!(report.contains(needle));
        }
    }

    /// An empty collector reports the fixed "No errors" text.
    #[test]
    fn test_pipeline_errors_empty_summary() {
        let tracker = PipelineErrors::default();
        assert_eq!(tracker.summary(), "No errors");
    }
}