Skip to main content

edgeparse_core/pipeline/
orchestrator.rs

1//! Pipeline orchestrator — runs all processing stages in sequence.
2//!
3//! ```text
4//!   PDF bytes
5//!     │
6//!     ▼
7//!  ┌─────────────────────────────────────────────┐
8//!  │ Stage 0:  Page Range Filtering               │
9//!  │ Stage 1b: Watermark Removal                  │
10//!  │ Stage 2:  Content Filtering + FFFD Replace   │
11//!  └──────────────────┬──────────────────────────┘
12//!                     │  raw TextChunks, Lines, Images
13//!                     ▼
14//!  ┌─────────────────────────────────────────────┐
15//!  │ Stage 3-4: Border Table Detection            │
16//!  │ Stage 4b:  Content → Table Cells             │
17//!  │ Stage 4c:  Boxed Heading Promoter            │
18//!  │ Stage 4d:  Pre-Cluster Table Release         │
19//!  └──────────────────┬──────────────────────────┘
20//!                     │  TextChunks + TableBorders
21//!                     ▼
22//!  ┌─────────────────────────────────────────────┐
23//!  │ Stage 5b: Column Detection                   │
24//!  │ Stage 6:  TextChunk → TextLine Grouping      │
25//!  │ Stage 6.5: List Detection Pass 1 (TextLine)  │
26//!  │ Stage 7:  TextLine → TextBlock Grouping      │
27//!  │ Stage 7b: Cluster (Borderless) Tables        │
28//!  └──────────────────┬──────────────────────────┘
29//!                     │  TextBlocks + Tables + Lists
30//!                     ▼
31//!  ┌─────────────────────────────────────────────┐
32//!  │ Stage 8:  Header / Footer Detection          │
33//!  │ Stage 9:  List Detection Pass 1 (Block)      │
34//!  │ Stage 10: Paragraph Detection                │
35//!  │ Stage 10b: Figure Detection                  │
36//!  │ Stage 12: Heading Detection                  │
37//!  └──────────────────┬──────────────────────────┘
38//!                     │  Semantic elements
39//!                     ▼
40//!  ┌─────────────────────────────────────────────┐
41//!  │ Stage 11:  List Detection Pass 2 (Paragraph) │
42//!  │ Stage 13:  ID Assignment                     │
43//!  │ Stage 14:  Caption + Footnote + TOC Linking  │
44//!  │ Stage 15:  Cross-Page Table Linking          │
45//!  │ Stage 17:  Nesting Levels                    │
46//!  │ Stage 18:  Reading Order Sort                │
47//!  │ Stage 19:  Content Sanitization              │
48//!  └──────────────────┬──────────────────────────┘
49//!                     │
50//!                     ▼
51//!              PdfDocument (ready for output)
52//! ```
53
54use crate::api::config::ProcessingConfig;
55use crate::models::bbox::BoundingBox;
56use crate::models::content::ContentElement;
57use crate::pdf::page_info::PageInfo;
58use crate::pipeline::logging::PipelineTimer;
59use crate::pipeline::parallel::{par_map_pages, par_map_pages_indexed};
60use crate::pipeline::stages::boxed_heading_promoter;
61use crate::pipeline::stages::caption_linker;
62use crate::pipeline::stages::cluster_table_detector;
63use crate::pipeline::stages::column_detector;
64use crate::pipeline::stages::content_filter;
65use crate::pipeline::stages::content_sanitizer;
66use crate::pipeline::stages::cross_page_linker;
67use crate::pipeline::stages::figure_detector;
68use crate::pipeline::stages::footnote_detector;
69use crate::pipeline::stages::header_footer;
70use crate::pipeline::stages::heading_detector;
71use crate::pipeline::stages::id_assignment;
72use crate::pipeline::stages::list_detector;
73use crate::pipeline::stages::list_pass2;
74use crate::pipeline::stages::nesting_level;
75use crate::pipeline::stages::paragraph_detector;
76use crate::pipeline::stages::reading_order;
77use crate::pipeline::stages::table_content_assigner;
78use crate::pipeline::stages::table_detector;
79use crate::pipeline::stages::text_block_grouper;
80use crate::pipeline::stages::text_line_grouper;
81use crate::pipeline::stages::toc_detector;
82use crate::pipeline::stages::watermark_detector;
83use crate::tagged::struct_tree::McidMap;
84use crate::utils::page_range;
85use crate::EdgePdfError;
86
/// Per-page content during pipeline processing.
///
/// Alias for the list of [`ContentElement`]s that belong to a single page;
/// [`PipelineState::pages`] holds one of these per page.
pub type PageContent = Vec<ContentElement>;
89
90/// Pipeline state passed between stages.
91pub struct PipelineState {
92    /// Per-page content elements
93    pub pages: Vec<PageContent>,
94    /// Processing configuration
95    pub config: ProcessingConfig,
96    /// MCID map from structure tree (tagged PDFs).
97    /// Maps (page_number, mcid) → tag info for heading detection.
98    pub mcid_map: Option<McidMap>,
99    /// Per-page geometry (MediaBox, width, height). Index matches pages.
100    pub page_info: Vec<PageInfo>,
101}
102
103impl PipelineState {
104    /// Create a new pipeline state from raw page content.
105    pub fn new(pages: Vec<PageContent>, config: ProcessingConfig) -> Self {
106        Self {
107            pages,
108            config,
109            mcid_map: None,
110            page_info: Vec::new(),
111        }
112    }
113
114    /// Create a new pipeline state with an MCID map from a tagged PDF.
115    pub fn with_mcid_map(
116        pages: Vec<PageContent>,
117        config: ProcessingConfig,
118        mcid_map: McidMap,
119    ) -> Self {
120        let mcid_map = if mcid_map.is_empty() {
121            None
122        } else {
123            Some(mcid_map)
124        };
125        Self {
126            pages,
127            config,
128            mcid_map,
129            page_info: Vec::new(),
130        }
131    }
132
133    /// Create a new pipeline state with page geometry.
134    pub fn with_page_info(mut self, page_info: Vec<PageInfo>) -> Self {
135        self.page_info = page_info;
136        self
137    }
138
139    /// Total number of content elements across all pages.
140    pub fn total_elements(&self) -> usize {
141        self.pages.iter().map(|p| p.len()).sum()
142    }
143}
144
145/// Run the full 20-stage pipeline.
146///
147/// # Errors
148/// Returns `EdgePdfError::PipelineError` if any stage fails.
149pub fn run_pipeline(state: &mut PipelineState) -> Result<(), EdgePdfError> {
150    macro_rules! timed_stage {
151        ($timer:expr, $name:expr, $state:expr, $body:block) => {{
152            if let Some(timer) = $timer.as_mut() {
153                timer.start_stage($name);
154            }
155            let result = { $body };
156            if let Some(timer) = $timer.as_mut() {
157                timer.end_stage($state.total_elements());
158            }
159            result
160        }};
161    }
162
163    let mut timer = pipeline_timing_enabled().then(PipelineTimer::new);
164
165    log::info!(
166        "Starting pipeline with {} pages, {} elements",
167        state.pages.len(),
168        state.total_elements()
169    );
170
171    // Stage 1: PDF Loading (already done before pipeline)
172
173    // Stage 0b: Page Range Filtering
174    timed_stage!(timer, "Stage 0b (Page Range Filtering)", state, {
175        if let Some(ref range_str) = state.config.pages {
176            let total = state.pages.len();
177            if let Some(selected) = page_range::parse_page_range(range_str, total) {
178                state.pages = page_range::filter_pages(std::mem::take(&mut state.pages), &selected);
179                if !state.page_info.is_empty() {
180                    state.page_info = state
181                        .page_info
182                        .drain(..)
183                        .enumerate()
184                        .filter_map(|(idx, info)| {
185                            let page_num = idx + 1;
186                            if selected.contains(&page_num) {
187                                Some(info)
188                            } else {
189                                None
190                            }
191                        })
192                        .collect();
193                }
194                log::info!(
195                    "Page range filter: kept {} of {} pages",
196                    state.pages.len(),
197                    total
198                );
199            }
200        }
201    });
202
203    // Stage 1b: Watermark Detection & Removal
204    timed_stage!(timer, "Stage 1b (Watermark Removal)", state, {
205        watermark_detector::remove_watermarks(&mut state.pages);
206    });
207    log::info!(
208        "Stage 1b (Watermark Removal) complete: {} elements",
209        state.total_elements()
210    );
211
212    // Stage 2: Content Filtering
213    let filter_config = &state.config.filter_config;
214    // Default A4 page bbox — will be refined when we track per-page MediaBox
215    let default_page = BoundingBox::new(None, 0.0, 0.0, 595.0, 842.0);
216
217    timed_stage!(timer, "Stage 2 (Content Filtering)", state, {
218        par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
219            let page_bbox = state
220                .page_info
221                .get(page_idx)
222                .map(|info| info.crop_box.clone())
223                .unwrap_or_else(|| default_page.clone());
224            content_filter::filter_content(elements, filter_config, &page_bbox)
225        });
226    });
227    log::info!(
228        "Stage 2 (Content Filtering) complete: {} elements",
229        state.total_elements()
230    );
231
232    // Stage 2b: Replace undefined characters (U+FFFD → replacement char)
233    // Matches the reference TextProcessor.replaceUndefinedCharacters() called from
234    // ContentFilterProcessor.  Default replacement is space " ".
235    let replacement = &state.config.replace_invalid_chars;
236    timed_stage!(timer, "Stage 2b (Replace Undefined Chars)", state, {
237        if replacement != "\u{FFFD}" {
238            par_map_pages(&mut state.pages, |mut elements| {
239                for elem in &mut elements {
240                    replace_fffd_in_element(elem, replacement);
241                }
242                elements
243            });
244        }
245    });
246    log::info!("Stage 2b (Replace Undefined Chars) complete");
247
248    // Stage 3-4: Table Border Detection
249    timed_stage!(timer, "Stage 3-4 (Table Border Detection)", state, {
250        par_map_pages(&mut state.pages, table_detector::detect_table_borders);
251    });
252    log::info!(
253        "Stage 3-4 (Table Border Detection) complete: {} elements",
254        state.total_elements()
255    );
256
257    // Stage 4b: Content Assignment to Table Cells
258    timed_stage!(timer, "Stage 4b (Table Content Assignment)", state, {
259        par_map_pages(
260            &mut state.pages,
261            table_content_assigner::assign_content_to_tables,
262        );
263    });
264    log::info!(
265        "Stage 4b (Table Content Assignment) complete: {} elements",
266        state.total_elements()
267    );
268
269    // Stage 4b2: Filter mostly-empty bordered tables (chart grid FPs)
270    timed_stage!(timer, "Stage 4b2 (Empty Table Filter)", state, {
271        par_map_pages(&mut state.pages, table_detector::filter_empty_tables);
272    });
273    log::info!(
274        "Stage 4b2 (Empty Table Filter) complete: {} elements",
275        state.total_elements()
276    );
277
278    // Stage 4c: Boxed Heading Promoter — single-cell tables with short heading text
279    // are released back as free TextChunks so heading_detector can see them.
280    timed_stage!(timer, "Stage 4c (Boxed Heading Promoter)", state, {
281        par_map_pages(
282            &mut state.pages,
283            boxed_heading_promoter::promote_boxed_headings,
284        );
285    });
286    log::info!(
287        "Stage 4c (Boxed Heading Promoter) complete: {} elements",
288        state.total_elements()
289    );
290
291    // Stage 4d: Release page-wide single-cell pseudo-tables before line/block
292    // grouping so cluster detection can recover the underlying text layout.
293    timed_stage!(timer, "Stage 4d (Pre-Cluster Table Release)", state, {
294        par_map_pages(&mut state.pages, table_detector::release_pre_cluster_tables);
295    });
296    log::info!(
297        "Stage 4d (Pre-Cluster Table Release) complete: {} elements",
298        state.total_elements()
299    );
300
301    // Stage 5: Line Chunk Removal — handled by table detector (consumed lines removed)
302
303    // Stage 5b: Multi-Column Detection
304    let column_layouts = timed_stage!(timer, "Stage 5b (Column Detection)", state, {
305        column_detector::detect_columns(&mut state.pages)
306    });
307    log::info!(
308        "Stage 5b (Column Detection) complete: {} elements",
309        state.total_elements()
310    );
311
312    // Stage 6: Text Line Grouping
313    timed_stage!(timer, "Stage 6 (Text Line Grouping)", state, {
314        par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
315            let layout = column_layouts.get(page_idx);
316            text_line_grouper::group_text_lines(elements, layout)
317        });
318    });
319    log::info!(
320        "Stage 6 (Text Line Grouping) complete: {} elements",
321        state.total_elements()
322    );
323
324    // Stage 6.5: List Detection Pass 1 (TextLine level — before block grouping)
325    // Matches the reference pipeline: ListProcessor.processLists() runs on individual
326    // TextLines BEFORE ParagraphProcessor.processParagraphs().  This catches
327    // bibliography entries ([N] bracket notation) and other list patterns at the
328    // TextLine level before they get merged into TextBlocks by Stage 7.
329    timed_stage!(timer, "Stage 6.5 (List Detection Pass 1)", state, {
330        par_map_pages(&mut state.pages, list_detector::detect_lists);
331    });
332    log::info!(
333        "Stage 6.5 (List Detection Pass 1) complete: {} elements",
334        state.total_elements()
335    );
336
337    // Stage 7: Text Block Grouping (paragraph detection)
338    timed_stage!(timer, "Stage 7 (Text Block Grouping)", state, {
339        par_map_pages(&mut state.pages, text_block_grouper::group_text_blocks);
340    });
341    log::info!(
342        "Stage 7 (Text Block Grouping) complete: {} elements",
343        state.total_elements()
344    );
345
346    // Stage 7b: Cluster (Borderless) Table Detection
347    timed_stage!(timer, "Stage 7b (Cluster Table Detection)", state, {
348        par_map_pages(
349            &mut state.pages,
350            cluster_table_detector::detect_cluster_tables,
351        );
352    });
353    log::info!(
354        "Stage 7b (Cluster Table Detection) complete: {} elements",
355        state.total_elements()
356    );
357
358    // Stage 7b2: Reject table-shaped layout artifacts produced by the border
359    // and cluster detectors, releasing their text back into the page flow.
360    timed_stage!(timer, "Stage 7b2 (Suspicious Table Filter)", state, {
361        par_map_pages(&mut state.pages, table_detector::filter_suspicious_tables);
362    });
363    log::info!(
364        "Stage 7b2 (Suspicious Table Filter) complete: {} elements",
365        state.total_elements()
366    );
367
368    // Stage 8: Header/Footer Detection (cross-page)
369    // Use median page height from page info, or fallback to A4.
370    let page_height = if !state.page_info.is_empty() {
371        let mut heights: Vec<f64> = state.page_info.iter().map(|p| p.height).collect();
372        heights.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
373        heights[heights.len() / 2]
374    } else {
375        842.0
376    };
377    timed_stage!(timer, "Stage 8 (Header/Footer Detection)", state, {
378        header_footer::detect_headers_footers(&mut state.pages, page_height);
379    });
380    log::info!(
381        "Stage 8 (Header/Footer Detection) complete: {} elements",
382        state.total_elements()
383    );
384
385    // Stage 9: List Detection Pass 1 (TextBlock/TextLine level)
386    // Also runs at Stage 6.5 on TextLines; this second pass catches patterns
387    // from TextBlocks that the block grouper broke apart (e.g., numbered lists).
388    timed_stage!(timer, "Stage 9 (List Detection)", state, {
389        par_map_pages(&mut state.pages, list_detector::detect_lists);
390    });
391    log::info!(
392        "Stage 9 (List Detection) complete: {} elements",
393        state.total_elements()
394    );
395
396    // Stage 10: Paragraph Detection
397    timed_stage!(timer, "Stage 10 (Paragraph Detection)", state, {
398        par_map_pages(&mut state.pages, paragraph_detector::detect_paragraphs);
399    });
400    log::info!(
401        "Stage 10 (Paragraph Detection) complete: {} elements",
402        state.total_elements()
403    );
404
405    // Stage 10b: Figure Detection
406    timed_stage!(timer, "Stage 10b (Figure Detection)", state, {
407        par_map_pages(&mut state.pages, figure_detector::detect_figures);
408    });
409    log::info!(
410        "Stage 10b (Figure Detection) complete: {} elements",
411        state.total_elements()
412    );
413
414    // Stage 12: Heading Detection (moved before List Pass 2 so headings are
415    // tagged before list body-continuation filtering)
416    timed_stage!(timer, "Stage 12 (Heading Detection)", state, {
417        heading_detector::detect_headings(&mut state.pages, state.mcid_map.as_ref());
418    });
419    log::info!(
420        "Stage 12 (Heading Detection) complete: {} elements",
421        state.total_elements()
422    );
423
424    // Stage 18 (pre-pass): Reading Order Sorting before List Pass 2
425    // so elements are in correct reading order for sequential list detection.
426    timed_stage!(timer, "Stage 18-pre (Reading Order pre-pass)", state, {
427        reading_order::sort_reading_order(&mut state.pages, &state.page_info);
428    });
429    log::info!(
430        "Stage 18-pre (Reading Order pre-pass) complete: {} elements",
431        state.total_elements()
432    );
433
434    // Stage 11: List Detection Pass 2 (Paragraph Level)
435    timed_stage!(timer, "Stage 11 (List Detection Pass 2)", state, {
436        par_map_pages(&mut state.pages, list_pass2::detect_paragraph_lists);
437    });
438    log::info!(
439        "Stage 11 (List Detection Pass 2) complete: {} elements",
440        state.total_elements()
441    );
442
443    // Stage 11b: Document-level common-prefix list detection (Figure N, Table N)
444    timed_stage!(timer, "Stage 11b (Common-prefix Lists)", state, {
445        list_pass2::detect_common_prefix_lists_document(&mut state.pages);
446    });
447    log::info!(
448        "Stage 11b (Common-prefix Lists) complete: {} elements",
449        state.total_elements()
450    );
451
452    // Stage 13: ID Assignment
453    timed_stage!(timer, "Stage 13 (ID Assignment)", state, {
454        id_assignment::assign_ids(&mut state.pages);
455    });
456    log::info!(
457        "Stage 13 (ID Assignment) complete: {} elements",
458        state.total_elements()
459    );
460
461    // Stage 14: Caption Linking
462    timed_stage!(timer, "Stage 14 (Caption Linking)", state, {
463        caption_linker::link_captions(&mut state.pages);
464    });
465    log::info!(
466        "Stage 14 (Caption Linking) complete: {} elements",
467        state.total_elements()
468    );
469
470    // Stage 14b: Footnote Detection
471    timed_stage!(timer, "Stage 14b (Footnote Detection)", state, {
472        footnote_detector::detect_footnotes(&mut state.pages);
473    });
474    log::info!(
475        "Stage 14b (Footnote Detection) complete: {} elements",
476        state.total_elements()
477    );
478
479    // Stage 14c: TOC Detection
480    timed_stage!(timer, "Stage 14c (TOC Detection)", state, {
481        toc_detector::detect_toc(&mut state.pages);
482    });
483    log::info!(
484        "Stage 14c (TOC Detection) complete: {} elements",
485        state.total_elements()
486    );
487    // Stage 15: Cross-Page Table Linking
488    timed_stage!(timer, "Stage 15 (Cross-Page Table Linking)", state, {
489        cross_page_linker::link_cross_page_tables(&mut state.pages);
490    });
491    log::info!(
492        "Stage 15 (Cross-Page Table Linking) complete: {} elements",
493        state.total_elements()
494    );
495    // Stage 16: Heading Level Assignment — handled by Stage 12 (heading_detector already assigns global levels)
496    // Stage 17: Nesting Level Assignment
497    timed_stage!(timer, "Stage 17 (Nesting Level Assignment)", state, {
498        nesting_level::assign_nesting_levels(&mut state.pages);
499    });
500    log::info!(
501        "Stage 17 (Nesting Level Assignment) complete: {} elements",
502        state.total_elements()
503    );
504
505    // Stage 18: Final Reading Order Sorting (after all semantic classification)
506    timed_stage!(timer, "Stage 18 (Reading Order)", state, {
507        reading_order::sort_reading_order(&mut state.pages, &state.page_info);
508    });
509    log::info!(
510        "Stage 18 (Reading Order) complete: {} elements",
511        state.total_elements()
512    );
513
514    // Stage 19: Content Sanitization
515    timed_stage!(timer, "Stage 19 (Content Sanitization)", state, {
516        content_sanitizer::sanitize_content(&mut state.pages, state.config.sanitize);
517    });
518    log::info!(
519        "Stage 19 (Content Sanitization) complete: {} elements",
520        state.total_elements()
521    );
522    // Stage 20: Output Generation — to be implemented
523
524    if let Some(timer) = timer.as_ref() {
525        timer.log_summary();
526    }
527
528    log::info!("Pipeline complete");
529    Ok(())
530}
531
/// Report whether per-stage timing was requested through the environment.
///
/// Returns `true` only when `EDGEPARSE_PIPELINE_TIMING` is set to `1`,
/// `true`, `yes` or `on` (ASCII case-insensitive). An unset or unreadable
/// variable, or any other value, yields `false`.
fn pipeline_timing_enabled() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    match std::env::var("EDGEPARSE_PIPELINE_TIMING") {
        Ok(raw) => TRUTHY.iter().any(|flag| raw.eq_ignore_ascii_case(flag)),
        Err(_) => false,
    }
}
542
543/// Replace U+FFFD (Unicode replacement character) in a content element's text.
544/// Matches the reference `TextProcessor.replaceUndefinedCharacters()`.
545fn replace_fffd_in_element(elem: &mut ContentElement, replacement: &str) {
546    if let ContentElement::TextChunk(c) = elem {
547        if c.value.contains('\u{FFFD}') {
548            c.value = c.value.replace('\u{FFFD}', replacement);
549        }
550    } // Only TextChunks exist at Stage 2 (before line grouping)
551}
552
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::config::ProcessingConfig;
    use crate::models::chunks::TextChunk;
    use crate::models::enums::{PdfLayer, TextFormat, TextType};

    /// A state built from two empty pages reports two pages and no elements.
    #[test]
    fn test_pipeline_state() {
        let two_empty_pages: Vec<PageContent> = vec![Vec::new(), Vec::new()];
        let state = PipelineState::new(two_empty_pages, ProcessingConfig::default());

        assert_eq!(state.pages.len(), 2);
        assert_eq!(state.total_elements(), 0);
    }

    /// The pipeline accepts a document without any pages.
    #[test]
    fn test_run_empty_pipeline() {
        let mut state = PipelineState::new(Vec::new(), ProcessingConfig::default());

        let outcome = run_pipeline(&mut state);

        assert!(outcome.is_ok());
    }

    /// A chunk located beyond the A4 fallback width but inside the page's
    /// real 960x540 box must still be present after the pipeline has run.
    #[test]
    fn test_content_filter_uses_real_page_geometry() {
        let slide_box = || BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0);

        let right_column = TextChunk {
            value: "Right column".to_string(),
            bbox: BoundingBox::new(Some(1), 800.0, 400.0, 900.0, 420.0),
            font_name: "Helvetica".to_string(),
            font_size: 12.0,
            font_weight: 400.0,
            italic_angle: 0.0,
            font_color: "[0.0]".to_string(),
            contrast_ratio: 21.0,
            symbol_ends: vec![],
            text_format: TextFormat::Normal,
            text_type: TextType::Regular,
            pdf_layer: PdfLayer::Main,
            ocg_visible: true,
            index: None,
            page_number: Some(1),
            level: None,
            mcid: None,
        };
        let geometry = vec![PageInfo {
            index: 0,
            page_number: 1,
            media_box: slide_box(),
            crop_box: slide_box(),
            rotation: 0,
            width: 960.0,
            height: 540.0,
        }];

        let single_page = vec![vec![ContentElement::TextChunk(right_column)]];
        let mut state =
            PipelineState::new(single_page, ProcessingConfig::default()).with_page_info(geometry);
        run_pipeline(&mut state).unwrap();

        assert!(state.total_elements() > 0);
    }
}
611}