edgeparse_core/pipeline/
error_recovery.rs1use log;
7use std::panic::{self, AssertUnwindSafe};
8
/// Outcome of processing a single page: either the processed value or a
/// description of why processing failed.
///
/// NOTE(review): none of the other items visible in this file construct or
/// consume this type; `process_pages_with_recovery` returns plain `T` values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PageResult<T> {
    /// The page was processed successfully; carries the resulting value.
    Ok(T),
    /// Processing the page failed.
    Failed {
        /// Index of the page that failed.
        /// NOTE(review): presumably 0-based, like `PipelineErrors` — confirm with callers.
        page_index: usize,
        /// Human-readable description of the failure.
        error: String,
    },
}
22
23pub fn process_pages_with_recovery<T, F>(pages: Vec<T>, stage_name: &str, mut op: F) -> Vec<T>
28where
29 T: Send + Default + 'static,
30 F: FnMut(T) -> T,
31{
32 let mut results = Vec::with_capacity(pages.len());
33
34 for (idx, page) in pages.into_iter().enumerate() {
35 let result = panic::catch_unwind(AssertUnwindSafe(|| op(page)));
36 match result {
37 Ok(processed) => {
38 results.push(processed);
39 }
40 Err(e) => {
41 let msg = if let Some(s) = e.downcast_ref::<&str>() {
42 s.to_string()
43 } else if let Some(s) = e.downcast_ref::<String>() {
44 s.clone()
45 } else {
46 "unknown panic".to_string()
47 };
48 log::error!(
49 "Stage '{}' failed on page {}: {}. Skipping page.",
50 stage_name,
51 idx + 1,
52 msg
53 );
54 results.push(Default::default());
57 }
58 }
59 }
60
61 results
62}
63
/// Accumulates errors reported by pipeline stages so they can be inspected or
/// summarised after the fact.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PipelineErrors {
    /// Recorded errors as `(stage, page_index, message)` tuples, in the order
    /// they were recorded. `summary` prints `page_index + 1`, i.e. it treats
    /// the stored index as 0-based.
    pub errors: Vec<(String, usize, String)>,
}

impl PipelineErrors {
    /// Appends one error entry.
    ///
    /// * `stage` - name of the stage that reported the error.
    /// * `page_index` - index of the affected page (0-based as far as
    ///   `summary` is concerned).
    /// * `error` - description of what went wrong.
    pub fn record(&mut self, stage: &str, page_index: usize, error: &str) {
        self.errors
            .push((stage.to_owned(), page_index, error.to_owned()));
    }

    /// Returns `true` if at least one error has been recorded.
    pub fn has_errors(&self) -> bool {
        !self.errors.is_empty()
    }

    /// Renders all recorded errors as text, one `[stage] page N: message` line
    /// per entry (joined with `\n`, no trailing newline), where `N` is the
    /// stored index plus one.
    ///
    /// Returns the literal string `"No errors"` when nothing was recorded.
    pub fn summary(&self) -> String {
        if self.errors.is_empty() {
            return "No errors".to_owned();
        }
        self.errors
            .iter()
            .map(|(stage, page, msg)| {
                // Saturate rather than overflow: a reporting helper must not
                // panic (debug) or wrap to 0 (release) on `usize::MAX`.
                format!("[{stage}] page {}: {msg}", page.saturating_add(1))
            })
            .collect::<Vec<_>>()
            .join("\n")
    }
}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// Every page is passed through `op` and returned in input order.
    #[test]
    fn test_process_pages_no_failure() {
        let pages = vec![vec![1, 2], vec![3, 4]];
        let result = process_pages_with_recovery(pages, "test", |mut p| {
            p.push(99);
            p
        });
        assert_eq!(result.len(), 2);
        assert_eq!(result[0], vec![1, 2, 99]);
        assert_eq!(result[1], vec![3, 4, 99]);
    }

    /// A panic on one page yields a default (empty) page in its slot and does
    /// not prevent the following pages from being processed.
    #[test]
    fn test_process_pages_with_panic_recovery() {
        let pages = vec![vec![1], vec![2], vec![3]];
        let result = process_pages_with_recovery(pages, "test", |p| {
            if p[0] == 2 {
                panic!("simulated panic on page 2");
            }
            p
        });
        assert_eq!(result.len(), 3);
        assert_eq!(result[0], vec![1]);
        // The panicking page is replaced by `Vec::default()`.
        assert!(result[1].is_empty());
        assert_eq!(result[2], vec![3]);
    }

    /// An empty input produces an empty output without calling `op`.
    #[test]
    fn test_process_pages_empty_input() {
        let mut calls = 0;
        let result: Vec<Vec<i32>> = process_pages_with_recovery(Vec::new(), "test", |p| {
            calls += 1;
            p
        });
        assert!(result.is_empty());
        assert_eq!(calls, 0);
    }

    /// Recorded errors are kept in order and rendered with 1-based page numbers.
    #[test]
    fn test_pipeline_errors() {
        let mut errors = PipelineErrors::default();
        assert!(!errors.has_errors());

        errors.record("Stage 3", 0, "Failed to detect tables");
        errors.record("Stage 8", 2, "Header detection failed");

        assert!(errors.has_errors());
        assert_eq!(errors.errors.len(), 2);
        // Pin the exact rendering rather than loose substring checks.
        assert_eq!(
            errors.summary(),
            "[Stage 3] page 1: Failed to detect tables\n[Stage 8] page 3: Header detection failed"
        );
    }

    /// With nothing recorded the summary is the fixed "No errors" text.
    #[test]
    fn test_pipeline_errors_empty_summary() {
        let errors = PipelineErrors::default();
        assert_eq!(errors.summary(), "No errors");
    }
}