1use crate::api::config::ProcessingConfig;
55use crate::models::bbox::BoundingBox;
56use crate::models::content::ContentElement;
57use crate::pdf::page_info::PageInfo;
58use crate::pipeline::logging::PipelineTimer;
59use crate::pipeline::parallel::{par_map_pages, par_map_pages_indexed};
60use crate::pipeline::stages::boxed_heading_promoter;
61use crate::pipeline::stages::caption_linker;
62use crate::pipeline::stages::cluster_table_detector;
63use crate::pipeline::stages::column_detector;
64use crate::pipeline::stages::content_filter;
65use crate::pipeline::stages::content_sanitizer;
66use crate::pipeline::stages::cross_page_linker;
67use crate::pipeline::stages::figure_detector;
68use crate::pipeline::stages::footnote_detector;
69use crate::pipeline::stages::header_footer;
70use crate::pipeline::stages::heading_detector;
71use crate::pipeline::stages::id_assignment;
72use crate::pipeline::stages::list_detector;
73use crate::pipeline::stages::list_pass2;
74use crate::pipeline::stages::nesting_level;
75use crate::pipeline::stages::paragraph_detector;
76use crate::pipeline::stages::reading_order;
77use crate::pipeline::stages::table_content_assigner;
78use crate::pipeline::stages::table_detector;
79use crate::pipeline::stages::text_block_grouper;
80use crate::pipeline::stages::text_line_grouper;
81use crate::pipeline::stages::toc_detector;
82use crate::pipeline::stages::watermark_detector;
83use crate::tagged::struct_tree::McidMap;
84use crate::utils::page_range;
85use crate::EdgePdfError;
86
/// The content elements belonging to a single page; the pipeline keeps one of these per page.
pub type PageContent = Vec<ContentElement>;
89
/// Mutable state that [`run_pipeline`] threads through every processing stage.
pub struct PipelineState {
    /// One element list per page; the stages read and rewrite these lists.
    pub pages: Vec<PageContent>,
    /// Processing options. `run_pipeline` reads `pages` (page range), `filter_config`,
    /// `replace_invalid_chars` and `sanitize` from it.
    pub config: ProcessingConfig,
    /// Optional MCID map handed to heading detection. `with_mcid_map` stores `None`
    /// when it is given an empty map.
    pub mcid_map: Option<McidMap>,
    /// Page geometry used for content filtering, header/footer detection and reading
    /// order. May be empty, in which case `run_pipeline` falls back to a 595 x 842 page.
    // NOTE(review): presumably index-aligned with `pages` (it is looked up by page index) — confirm with callers.
    pub page_info: Vec<PageInfo>,
}
102
103impl PipelineState {
104 pub fn new(pages: Vec<PageContent>, config: ProcessingConfig) -> Self {
106 Self {
107 pages,
108 config,
109 mcid_map: None,
110 page_info: Vec::new(),
111 }
112 }
113
114 pub fn with_mcid_map(
116 pages: Vec<PageContent>,
117 config: ProcessingConfig,
118 mcid_map: McidMap,
119 ) -> Self {
120 let mcid_map = if mcid_map.is_empty() {
121 None
122 } else {
123 Some(mcid_map)
124 };
125 Self {
126 pages,
127 config,
128 mcid_map,
129 page_info: Vec::new(),
130 }
131 }
132
133 pub fn with_page_info(mut self, page_info: Vec<PageInfo>) -> Self {
135 self.page_info = page_info;
136 self
137 }
138
139 pub fn total_elements(&self) -> usize {
141 self.pages.iter().map(|p| p.len()).sum()
142 }
143}
144
145pub fn run_pipeline(state: &mut PipelineState) -> Result<(), EdgePdfError> {
150 macro_rules! timed_stage {
151 ($timer:expr, $name:expr, $state:expr, $body:block) => {{
152 if let Some(timer) = $timer.as_mut() {
153 timer.start_stage($name);
154 }
155 let result = { $body };
156 if let Some(timer) = $timer.as_mut() {
157 timer.end_stage($state.total_elements());
158 }
159 result
160 }};
161 }
162
163 let mut timer = pipeline_timing_enabled().then(PipelineTimer::new);
164
165 log::info!(
166 "Starting pipeline with {} pages, {} elements",
167 state.pages.len(),
168 state.total_elements()
169 );
170
171 timed_stage!(timer, "Stage 0b (Page Range Filtering)", state, {
175 if let Some(ref range_str) = state.config.pages {
176 let total = state.pages.len();
177 if let Some(selected) = page_range::parse_page_range(range_str, total) {
178 state.pages = page_range::filter_pages(std::mem::take(&mut state.pages), &selected);
179 if !state.page_info.is_empty() {
180 state.page_info = state
181 .page_info
182 .drain(..)
183 .enumerate()
184 .filter_map(|(idx, info)| {
185 let page_num = idx + 1;
186 if selected.contains(&page_num) {
187 Some(info)
188 } else {
189 None
190 }
191 })
192 .collect();
193 }
194 log::info!(
195 "Page range filter: kept {} of {} pages",
196 state.pages.len(),
197 total
198 );
199 }
200 }
201 });
202
203 timed_stage!(timer, "Stage 1b (Watermark Removal)", state, {
205 watermark_detector::remove_watermarks(&mut state.pages);
206 });
207 log::info!(
208 "Stage 1b (Watermark Removal) complete: {} elements",
209 state.total_elements()
210 );
211
212 let filter_config = &state.config.filter_config;
214 let default_page = BoundingBox::new(None, 0.0, 0.0, 595.0, 842.0);
216
217 timed_stage!(timer, "Stage 2 (Content Filtering)", state, {
218 par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
219 let page_bbox = state
220 .page_info
221 .get(page_idx)
222 .map(|info| info.crop_box.clone())
223 .unwrap_or_else(|| default_page.clone());
224 content_filter::filter_content(elements, filter_config, &page_bbox)
225 });
226 });
227 log::info!(
228 "Stage 2 (Content Filtering) complete: {} elements",
229 state.total_elements()
230 );
231
232 let replacement = &state.config.replace_invalid_chars;
236 timed_stage!(timer, "Stage 2b (Replace Undefined Chars)", state, {
237 if replacement != "\u{FFFD}" {
238 par_map_pages(&mut state.pages, |mut elements| {
239 for elem in &mut elements {
240 replace_fffd_in_element(elem, replacement);
241 }
242 elements
243 });
244 }
245 });
246 log::info!("Stage 2b (Replace Undefined Chars) complete");
247
248 timed_stage!(timer, "Stage 3-4 (Table Border Detection)", state, {
250 par_map_pages(&mut state.pages, table_detector::detect_table_borders);
251 });
252 log::info!(
253 "Stage 3-4 (Table Border Detection) complete: {} elements",
254 state.total_elements()
255 );
256
257 timed_stage!(timer, "Stage 4b (Table Content Assignment)", state, {
259 par_map_pages(
260 &mut state.pages,
261 table_content_assigner::assign_content_to_tables,
262 );
263 });
264 log::info!(
265 "Stage 4b (Table Content Assignment) complete: {} elements",
266 state.total_elements()
267 );
268
269 timed_stage!(timer, "Stage 4b2 (Empty Table Filter)", state, {
271 par_map_pages(&mut state.pages, table_detector::filter_empty_tables);
272 });
273 log::info!(
274 "Stage 4b2 (Empty Table Filter) complete: {} elements",
275 state.total_elements()
276 );
277
278 timed_stage!(timer, "Stage 4c (Boxed Heading Promoter)", state, {
281 par_map_pages(
282 &mut state.pages,
283 boxed_heading_promoter::promote_boxed_headings,
284 );
285 });
286 log::info!(
287 "Stage 4c (Boxed Heading Promoter) complete: {} elements",
288 state.total_elements()
289 );
290
291 timed_stage!(timer, "Stage 4d (Pre-Cluster Table Release)", state, {
294 par_map_pages(&mut state.pages, table_detector::release_pre_cluster_tables);
295 });
296 log::info!(
297 "Stage 4d (Pre-Cluster Table Release) complete: {} elements",
298 state.total_elements()
299 );
300
301 let column_layouts = timed_stage!(timer, "Stage 5b (Column Detection)", state, {
305 column_detector::detect_columns(&mut state.pages)
306 });
307 log::info!(
308 "Stage 5b (Column Detection) complete: {} elements",
309 state.total_elements()
310 );
311
312 timed_stage!(timer, "Stage 6 (Text Line Grouping)", state, {
314 par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
315 let layout = column_layouts.get(page_idx);
316 text_line_grouper::group_text_lines(elements, layout)
317 });
318 });
319 log::info!(
320 "Stage 6 (Text Line Grouping) complete: {} elements",
321 state.total_elements()
322 );
323
324 timed_stage!(timer, "Stage 6.5 (List Detection Pass 1)", state, {
330 par_map_pages(&mut state.pages, list_detector::detect_lists);
331 });
332 log::info!(
333 "Stage 6.5 (List Detection Pass 1) complete: {} elements",
334 state.total_elements()
335 );
336
337 timed_stage!(timer, "Stage 7 (Text Block Grouping)", state, {
339 par_map_pages(&mut state.pages, text_block_grouper::group_text_blocks);
340 });
341 log::info!(
342 "Stage 7 (Text Block Grouping) complete: {} elements",
343 state.total_elements()
344 );
345
346 timed_stage!(timer, "Stage 7b (Cluster Table Detection)", state, {
348 par_map_pages(
349 &mut state.pages,
350 cluster_table_detector::detect_cluster_tables,
351 );
352 });
353 log::info!(
354 "Stage 7b (Cluster Table Detection) complete: {} elements",
355 state.total_elements()
356 );
357
358 timed_stage!(timer, "Stage 7b2 (Suspicious Table Filter)", state, {
361 par_map_pages(&mut state.pages, table_detector::filter_suspicious_tables);
362 });
363 log::info!(
364 "Stage 7b2 (Suspicious Table Filter) complete: {} elements",
365 state.total_elements()
366 );
367
368 let page_height = if !state.page_info.is_empty() {
371 let mut heights: Vec<f64> = state.page_info.iter().map(|p| p.height).collect();
372 heights.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
373 heights[heights.len() / 2]
374 } else {
375 842.0
376 };
377 timed_stage!(timer, "Stage 8 (Header/Footer Detection)", state, {
378 header_footer::detect_headers_footers(&mut state.pages, page_height);
379 });
380 log::info!(
381 "Stage 8 (Header/Footer Detection) complete: {} elements",
382 state.total_elements()
383 );
384
385 timed_stage!(timer, "Stage 9 (List Detection)", state, {
389 par_map_pages(&mut state.pages, list_detector::detect_lists);
390 });
391 log::info!(
392 "Stage 9 (List Detection) complete: {} elements",
393 state.total_elements()
394 );
395
396 timed_stage!(timer, "Stage 10 (Paragraph Detection)", state, {
398 par_map_pages(&mut state.pages, paragraph_detector::detect_paragraphs);
399 });
400 log::info!(
401 "Stage 10 (Paragraph Detection) complete: {} elements",
402 state.total_elements()
403 );
404
405 timed_stage!(timer, "Stage 10b (Figure Detection)", state, {
407 par_map_pages(&mut state.pages, figure_detector::detect_figures);
408 });
409 log::info!(
410 "Stage 10b (Figure Detection) complete: {} elements",
411 state.total_elements()
412 );
413
414 timed_stage!(timer, "Stage 12 (Heading Detection)", state, {
417 heading_detector::detect_headings(&mut state.pages, state.mcid_map.as_ref());
418 });
419 log::info!(
420 "Stage 12 (Heading Detection) complete: {} elements",
421 state.total_elements()
422 );
423
424 timed_stage!(timer, "Stage 18-pre (Reading Order pre-pass)", state, {
427 reading_order::sort_reading_order(&mut state.pages, &state.page_info);
428 });
429 log::info!(
430 "Stage 18-pre (Reading Order pre-pass) complete: {} elements",
431 state.total_elements()
432 );
433
434 timed_stage!(timer, "Stage 11 (List Detection Pass 2)", state, {
436 par_map_pages(&mut state.pages, list_pass2::detect_paragraph_lists);
437 });
438 log::info!(
439 "Stage 11 (List Detection Pass 2) complete: {} elements",
440 state.total_elements()
441 );
442
443 timed_stage!(timer, "Stage 11b (Common-prefix Lists)", state, {
445 list_pass2::detect_common_prefix_lists_document(&mut state.pages);
446 });
447 log::info!(
448 "Stage 11b (Common-prefix Lists) complete: {} elements",
449 state.total_elements()
450 );
451
452 timed_stage!(timer, "Stage 13 (ID Assignment)", state, {
454 id_assignment::assign_ids(&mut state.pages);
455 });
456 log::info!(
457 "Stage 13 (ID Assignment) complete: {} elements",
458 state.total_elements()
459 );
460
461 timed_stage!(timer, "Stage 14 (Caption Linking)", state, {
463 caption_linker::link_captions(&mut state.pages);
464 });
465 log::info!(
466 "Stage 14 (Caption Linking) complete: {} elements",
467 state.total_elements()
468 );
469
470 timed_stage!(timer, "Stage 14b (Footnote Detection)", state, {
472 footnote_detector::detect_footnotes(&mut state.pages);
473 });
474 log::info!(
475 "Stage 14b (Footnote Detection) complete: {} elements",
476 state.total_elements()
477 );
478
479 timed_stage!(timer, "Stage 14c (TOC Detection)", state, {
481 toc_detector::detect_toc(&mut state.pages);
482 });
483 log::info!(
484 "Stage 14c (TOC Detection) complete: {} elements",
485 state.total_elements()
486 );
487 timed_stage!(timer, "Stage 15 (Cross-Page Table Linking)", state, {
489 cross_page_linker::link_cross_page_tables(&mut state.pages);
490 });
491 log::info!(
492 "Stage 15 (Cross-Page Table Linking) complete: {} elements",
493 state.total_elements()
494 );
495 timed_stage!(timer, "Stage 17 (Nesting Level Assignment)", state, {
498 nesting_level::assign_nesting_levels(&mut state.pages);
499 });
500 log::info!(
501 "Stage 17 (Nesting Level Assignment) complete: {} elements",
502 state.total_elements()
503 );
504
505 timed_stage!(timer, "Stage 18 (Reading Order)", state, {
507 reading_order::sort_reading_order(&mut state.pages, &state.page_info);
508 });
509 log::info!(
510 "Stage 18 (Reading Order) complete: {} elements",
511 state.total_elements()
512 );
513
514 timed_stage!(timer, "Stage 19 (Content Sanitization)", state, {
516 content_sanitizer::sanitize_content(&mut state.pages, state.config.sanitize);
517 });
518 log::info!(
519 "Stage 19 (Content Sanitization) complete: {} elements",
520 state.total_elements()
521 );
522 if let Some(timer) = timer.as_ref() {
525 timer.log_summary();
526 }
527
528 log::info!("Pipeline complete");
529 Ok(())
530}
531
/// Reports whether per-stage timing was requested through the environment.
///
/// Returns `true` only when `EDGEPARSE_PIPELINE_TIMING` is set to `1`, `true`,
/// `yes` or `on`, compared ASCII case-insensitively. An unset or non-Unicode
/// value yields `false`.
fn pipeline_timing_enabled() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    match std::env::var("EDGEPARSE_PIPELINE_TIMING") {
        Ok(value) => TRUTHY
            .iter()
            .any(|&token| value.eq_ignore_ascii_case(token)),
        Err(_) => false,
    }
}
542
543fn replace_fffd_in_element(elem: &mut ContentElement, replacement: &str) {
546 if let ContentElement::TextChunk(c) = elem {
547 if c.value.contains('\u{FFFD}') {
548 c.value = c.value.replace('\u{FFFD}', replacement);
549 }
550 } }
552
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::config::ProcessingConfig;
    use crate::models::chunks::TextChunk;
    use crate::models::enums::{PdfLayer, TextFormat, TextType};

    /// A text chunk whose bounding box reaches coordinate 900.0, past the 595.0
    /// extent of the fallback page box `run_pipeline` uses without `PageInfo`.
    // NOTE(review): the argument order of `BoundingBox::new` is not visible here — confirm which edge 900.0 is.
    fn right_column_chunk() -> ContentElement {
        let text = TextChunk {
            value: "Right column".to_string(),
            bbox: BoundingBox::new(Some(1), 800.0, 400.0, 900.0, 420.0),
            font_name: "Helvetica".to_string(),
            font_size: 12.0,
            font_weight: 400.0,
            italic_angle: 0.0,
            font_color: "[0.0]".to_string(),
            contrast_ratio: 21.0,
            symbol_ends: vec![],
            text_format: TextFormat::Normal,
            text_type: TextType::Regular,
            pdf_layer: PdfLayer::Main,
            ocg_visible: true,
            index: None,
            page_number: Some(1),
            level: None,
            mcid: None,
        };
        ContentElement::TextChunk(text)
    }

    /// Geometry for a single 960 x 540 page whose media and crop boxes coincide.
    fn wide_page_info() -> Vec<PageInfo> {
        let page = PageInfo {
            index: 0,
            page_number: 1,
            media_box: BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0),
            crop_box: BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0),
            rotation: 0,
            width: 960.0,
            height: 540.0,
        };
        vec![page]
    }

    #[test]
    fn test_pipeline_state() {
        let two_empty_pages: Vec<PageContent> = vec![Vec::new(), Vec::new()];
        let state = PipelineState::new(two_empty_pages, ProcessingConfig::default());

        assert_eq!(state.pages.len(), 2);
        assert_eq!(state.total_elements(), 0);
    }

    #[test]
    fn test_run_empty_pipeline() {
        let mut state = PipelineState::new(Vec::new(), ProcessingConfig::default());

        assert!(run_pipeline(&mut state).is_ok());
    }

    #[test]
    fn test_content_filter_uses_real_page_geometry() {
        let pages = vec![vec![right_column_chunk()]];
        let mut state = PipelineState::new(pages, ProcessingConfig::default())
            .with_page_info(wide_page_info());

        run_pipeline(&mut state).unwrap();

        // The chunk must survive the pipeline when the real page geometry is supplied.
        assert!(state.total_elements() > 0);
    }
}