Skip to main content

edgeparse_core/pipeline/
orchestrator.rs

1//! Pipeline orchestrator — runs all processing stages in sequence.
2//!
3//! ```text
4//!   PDF bytes
5//!     │
6//!     ▼
7//!  ┌─────────────────────────────────────────────┐
8//!  │ Stage 0:  Page Range Filtering               │
9//!  │ Stage 1b: Watermark Removal                  │
10//!  │ Stage 2:  Content Filtering + FFFD Replace   │
11//!  └──────────────────┬──────────────────────────┘
12//!                     │  raw TextChunks, Lines, Images
13//!                     ▼
14//!  ┌─────────────────────────────────────────────┐
15//!  │ Stage 3-4: Border Table Detection            │
16//!  │ Stage 4b:  Content → Table Cells             │
17//!  │ Stage 4c:  Boxed Heading Promoter            │
18//!  │ Stage 4d:  Pre-Cluster Table Release         │
19//!  └──────────────────┬──────────────────────────┘
20//!                     │  TextChunks + TableBorders
21//!                     ▼
22//!  ┌─────────────────────────────────────────────┐
23//!  │ Stage 5b: Column Detection                   │
24//!  │ Stage 6:  TextChunk → TextLine Grouping      │
25//!  │ Stage 6.5: List Detection Pass 1 (TextLine)  │
26//!  │ Stage 7:  TextLine → TextBlock Grouping      │
27//!  │ Stage 7b: Cluster (Borderless) Tables        │
28//!  └──────────────────┬──────────────────────────┘
29//!                     │  TextBlocks + Tables + Lists
30//!                     ▼
31//!  ┌─────────────────────────────────────────────┐
32//!  │ Stage 8:  Header / Footer Detection          │
33//!  │ Stage 9:  List Detection Pass 1 (Block)      │
34//!  │ Stage 10: Paragraph Detection                │
35//!  │ Stage 10b: Figure Detection                  │
36//!  │ Stage 12: Heading Detection                  │
37//!  └──────────────────┬──────────────────────────┘
38//!                     │  Semantic elements
39//!                     ▼
40//!  ┌─────────────────────────────────────────────┐
41//!  │ Stage 11:  List Detection Pass 2 (Paragraph) │
42//!  │ Stage 13:  ID Assignment                     │
43//!  │ Stage 14:  Caption + Footnote + TOC Linking  │
44//!  │ Stage 15:  Cross-Page Table Linking          │
45//!  │ Stage 17:  Nesting Levels                    │
46//!  │ Stage 18:  Reading Order Sort                │
47//!  │ Stage 19:  Content Sanitization              │
48//!  └──────────────────┬──────────────────────────┘
49//!                     │
50//!                     ▼
51//!              PdfDocument (ready for output)
52//! ```
53
54use crate::api::config::ProcessingConfig;
55use crate::models::bbox::BoundingBox;
56use crate::models::content::ContentElement;
57use crate::pdf::page_info::PageInfo;
58use crate::pipeline::parallel::{par_map_pages, par_map_pages_indexed};
59use crate::pipeline::stages::boxed_heading_promoter;
60use crate::pipeline::stages::caption_linker;
61use crate::pipeline::stages::cluster_table_detector;
62use crate::pipeline::stages::column_detector;
63use crate::pipeline::stages::content_filter;
64use crate::pipeline::stages::content_sanitizer;
65use crate::pipeline::stages::cross_page_linker;
66use crate::pipeline::stages::figure_detector;
67use crate::pipeline::stages::footnote_detector;
68use crate::pipeline::stages::header_footer;
69use crate::pipeline::stages::heading_detector;
70use crate::pipeline::stages::id_assignment;
71use crate::pipeline::stages::list_detector;
72use crate::pipeline::stages::list_pass2;
73use crate::pipeline::stages::nesting_level;
74use crate::pipeline::stages::paragraph_detector;
75use crate::pipeline::stages::reading_order;
76use crate::pipeline::stages::table_content_assigner;
77use crate::pipeline::stages::table_detector;
78use crate::pipeline::stages::text_block_grouper;
79use crate::pipeline::stages::text_line_grouper;
80use crate::pipeline::stages::toc_detector;
81use crate::pipeline::stages::watermark_detector;
82use crate::tagged::struct_tree::McidMap;
83use crate::utils::page_range;
84use crate::EdgePdfError;
85
86/// Per-page content during pipeline processing.
87pub type PageContent = Vec<ContentElement>;
88
89/// Pipeline state passed between stages.
90pub struct PipelineState {
91    /// Per-page content elements
92    pub pages: Vec<PageContent>,
93    /// Processing configuration
94    pub config: ProcessingConfig,
95    /// MCID map from structure tree (tagged PDFs).
96    /// Maps (page_number, mcid) → tag info for heading detection.
97    pub mcid_map: Option<McidMap>,
98    /// Per-page geometry (MediaBox, width, height). Index matches pages.
99    pub page_info: Vec<PageInfo>,
100}
101
102impl PipelineState {
103    /// Create a new pipeline state from raw page content.
104    pub fn new(pages: Vec<PageContent>, config: ProcessingConfig) -> Self {
105        Self {
106            pages,
107            config,
108            mcid_map: None,
109            page_info: Vec::new(),
110        }
111    }
112
113    /// Create a new pipeline state with an MCID map from a tagged PDF.
114    pub fn with_mcid_map(
115        pages: Vec<PageContent>,
116        config: ProcessingConfig,
117        mcid_map: McidMap,
118    ) -> Self {
119        let mcid_map = if mcid_map.is_empty() {
120            None
121        } else {
122            Some(mcid_map)
123        };
124        Self {
125            pages,
126            config,
127            mcid_map,
128            page_info: Vec::new(),
129        }
130    }
131
132    /// Create a new pipeline state with page geometry.
133    pub fn with_page_info(mut self, page_info: Vec<PageInfo>) -> Self {
134        self.page_info = page_info;
135        self
136    }
137
138    /// Total number of content elements across all pages.
139    pub fn total_elements(&self) -> usize {
140        self.pages.iter().map(|p| p.len()).sum()
141    }
142}
143
144/// Run the full 20-stage pipeline.
145///
146/// # Errors
147/// Returns `EdgePdfError::PipelineError` if any stage fails.
148pub fn run_pipeline(state: &mut PipelineState) -> Result<(), EdgePdfError> {
149    log::info!(
150        "Starting pipeline with {} pages, {} elements",
151        state.pages.len(),
152        state.total_elements()
153    );
154
155    // Stage 1: PDF Loading (already done before pipeline)
156
157    // Stage 0b: Page Range Filtering
158    if let Some(ref range_str) = state.config.pages {
159        let total = state.pages.len();
160        if let Some(selected) = page_range::parse_page_range(range_str, total) {
161            state.pages = page_range::filter_pages(std::mem::take(&mut state.pages), &selected);
162            if !state.page_info.is_empty() {
163                state.page_info = state
164                    .page_info
165                    .drain(..)
166                    .enumerate()
167                    .filter_map(|(idx, info)| {
168                        let page_num = idx + 1;
169                        if selected.contains(&page_num) {
170                            Some(info)
171                        } else {
172                            None
173                        }
174                    })
175                    .collect();
176            }
177            log::info!(
178                "Page range filter: kept {} of {} pages",
179                state.pages.len(),
180                total
181            );
182        }
183    }
184
185    // Stage 1b: Watermark Detection & Removal
186    watermark_detector::remove_watermarks(&mut state.pages);
187    log::info!(
188        "Stage 1b (Watermark Removal) complete: {} elements",
189        state.total_elements()
190    );
191
192    // Stage 2: Content Filtering
193    let filter_config = &state.config.filter_config;
194    // Default A4 page bbox — will be refined when we track per-page MediaBox
195    let default_page = BoundingBox::new(None, 0.0, 0.0, 595.0, 842.0);
196
197    par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
198        let page_bbox = state
199            .page_info
200            .get(page_idx)
201            .map(|info| info.crop_box.clone())
202            .unwrap_or_else(|| default_page.clone());
203        content_filter::filter_content(elements, filter_config, &page_bbox)
204    });
205    log::info!(
206        "Stage 2 (Content Filtering) complete: {} elements",
207        state.total_elements()
208    );
209
210    // Stage 2b: Replace undefined characters (U+FFFD → replacement char)
211    // Matches the reference TextProcessor.replaceUndefinedCharacters() called from
212    // ContentFilterProcessor.  Default replacement is space " ".
213    let replacement = &state.config.replace_invalid_chars;
214    if replacement != "\u{FFFD}" {
215        par_map_pages(&mut state.pages, |mut elements| {
216            for elem in &mut elements {
217                replace_fffd_in_element(elem, replacement);
218            }
219            elements
220        });
221    }
222    log::info!("Stage 2b (Replace Undefined Chars) complete");
223
224    // Stage 3-4: Table Border Detection
225    par_map_pages(&mut state.pages, table_detector::detect_table_borders);
226    log::info!(
227        "Stage 3-4 (Table Border Detection) complete: {} elements",
228        state.total_elements()
229    );
230
231    // Stage 4b: Content Assignment to Table Cells
232    par_map_pages(
233        &mut state.pages,
234        table_content_assigner::assign_content_to_tables,
235    );
236    log::info!(
237        "Stage 4b (Table Content Assignment) complete: {} elements",
238        state.total_elements()
239    );
240
241    // Stage 4b2: Filter mostly-empty bordered tables (chart grid FPs)
242    par_map_pages(&mut state.pages, table_detector::filter_empty_tables);
243    log::info!(
244        "Stage 4b2 (Empty Table Filter) complete: {} elements",
245        state.total_elements()
246    );
247
248    // Stage 4c: Boxed Heading Promoter — single-cell tables with short heading text
249    // are released back as free TextChunks so heading_detector can see them.
250    par_map_pages(
251        &mut state.pages,
252        boxed_heading_promoter::promote_boxed_headings,
253    );
254    log::info!(
255        "Stage 4c (Boxed Heading Promoter) complete: {} elements",
256        state.total_elements()
257    );
258
259    // Stage 4d: Release page-wide single-cell pseudo-tables before line/block
260    // grouping so cluster detection can recover the underlying text layout.
261    par_map_pages(&mut state.pages, table_detector::release_pre_cluster_tables);
262    log::info!(
263        "Stage 4d (Pre-Cluster Table Release) complete: {} elements",
264        state.total_elements()
265    );
266
267    // Stage 5: Line Chunk Removal — handled by table detector (consumed lines removed)
268
269    // Stage 5b: Multi-Column Detection
270    let column_layouts = column_detector::detect_columns(&mut state.pages);
271    log::info!(
272        "Stage 5b (Column Detection) complete: {} elements",
273        state.total_elements()
274    );
275
276    // Stage 6: Text Line Grouping
277    par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
278        let layout = column_layouts.get(page_idx);
279        text_line_grouper::group_text_lines(elements, layout)
280    });
281    log::info!(
282        "Stage 6 (Text Line Grouping) complete: {} elements",
283        state.total_elements()
284    );
285
286    // Stage 6b: Re-run column detection on formed TextLines so downstream block
287    // grouping can rely on stable line-level column labels instead of raw-chunk
288    // geometry alone.
289    let _line_column_layouts = column_detector::detect_columns(&mut state.pages);
290    log::info!(
291        "Stage 6b (TextLine Column Detection) complete: {} elements",
292        state.total_elements()
293    );
294
295    // Stage 6.5: List Detection Pass 1 (TextLine level — before block grouping)
296    // Matches the reference pipeline: ListProcessor.processLists() runs on individual
297    // TextLines BEFORE ParagraphProcessor.processParagraphs().  This catches
298    // bibliography entries ([N] bracket notation) and other list patterns at the
299    // TextLine level before they get merged into TextBlocks by Stage 7.
300    par_map_pages(&mut state.pages, list_detector::detect_lists);
301    log::info!(
302        "Stage 6.5 (List Detection Pass 1) complete: {} elements",
303        state.total_elements()
304    );
305
306    // Stage 7: Text Block Grouping (paragraph detection)
307    par_map_pages(&mut state.pages, text_block_grouper::group_text_blocks);
308    log::info!(
309        "Stage 7 (Text Block Grouping) complete: {} elements",
310        state.total_elements()
311    );
312
313    // Stage 7b: Cluster (Borderless) Table Detection
314    par_map_pages(
315        &mut state.pages,
316        cluster_table_detector::detect_cluster_tables,
317    );
318    log::info!(
319        "Stage 7b (Cluster Table Detection) complete: {} elements",
320        state.total_elements()
321    );
322
323    // Stage 7b2: Reject table-shaped layout artifacts produced by the border
324    // and cluster detectors, releasing their text back into the page flow.
325    par_map_pages(&mut state.pages, table_detector::filter_suspicious_tables);
326    log::info!(
327        "Stage 7b2 (Suspicious Table Filter) complete: {} elements",
328        state.total_elements()
329    );
330
331    // Stage 8: Header/Footer Detection (cross-page)
332    // Use median page height from page info, or fallback to A4.
333    let page_height = if !state.page_info.is_empty() {
334        let mut heights: Vec<f64> = state.page_info.iter().map(|p| p.height).collect();
335        heights.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
336        heights[heights.len() / 2]
337    } else {
338        842.0
339    };
340    header_footer::detect_headers_footers(&mut state.pages, page_height);
341    log::info!(
342        "Stage 8 (Header/Footer Detection) complete: {} elements",
343        state.total_elements()
344    );
345
346    // Stage 9: List Detection Pass 1 (TextBlock/TextLine level)
347    // Also runs at Stage 6.5 on TextLines; this second pass catches patterns
348    // from TextBlocks that the block grouper broke apart (e.g., numbered lists).
349    par_map_pages(&mut state.pages, list_detector::detect_lists);
350    log::info!(
351        "Stage 9 (List Detection) complete: {} elements",
352        state.total_elements()
353    );
354
355    // Stage 10: Paragraph Detection
356    par_map_pages(&mut state.pages, paragraph_detector::detect_paragraphs);
357    log::info!(
358        "Stage 10 (Paragraph Detection) complete: {} elements",
359        state.total_elements()
360    );
361
362    // Stage 10b: Figure Detection
363    par_map_pages(&mut state.pages, figure_detector::detect_figures);
364    log::info!(
365        "Stage 10b (Figure Detection) complete: {} elements",
366        state.total_elements()
367    );
368
369    // Stage 12: Heading Detection (moved before List Pass 2 so headings are
370    // tagged before list body-continuation filtering)
371    heading_detector::detect_headings(&mut state.pages, state.mcid_map.as_ref());
372    log::info!(
373        "Stage 12 (Heading Detection) complete: {} elements",
374        state.total_elements()
375    );
376
377    // Stage 18 (pre-pass): Reading Order Sorting before List Pass 2
378    // so elements are in correct reading order for sequential list detection.
379    reading_order::sort_reading_order(&mut state.pages, &state.page_info);
380    log::info!(
381        "Stage 18-pre (Reading Order pre-pass) complete: {} elements",
382        state.total_elements()
383    );
384
385    // Stage 11: List Detection Pass 2 (Paragraph Level)
386    par_map_pages(&mut state.pages, list_pass2::detect_paragraph_lists);
387    log::info!(
388        "Stage 11 (List Detection Pass 2) complete: {} elements",
389        state.total_elements()
390    );
391
392    // Stage 11b: Document-level common-prefix list detection (Figure N, Table N)
393    list_pass2::detect_common_prefix_lists_document(&mut state.pages);
394    log::info!(
395        "Stage 11b (Common-prefix Lists) complete: {} elements",
396        state.total_elements()
397    );
398
399    // Stage 13: ID Assignment
400    id_assignment::assign_ids(&mut state.pages);
401    log::info!(
402        "Stage 13 (ID Assignment) complete: {} elements",
403        state.total_elements()
404    );
405
406    // Stage 14: Caption Linking
407    caption_linker::link_captions(&mut state.pages);
408    log::info!(
409        "Stage 14 (Caption Linking) complete: {} elements",
410        state.total_elements()
411    );
412
413    // Stage 14b: Footnote Detection
414    footnote_detector::detect_footnotes(&mut state.pages);
415    log::info!(
416        "Stage 14b (Footnote Detection) complete: {} elements",
417        state.total_elements()
418    );
419
420    // Stage 14c: TOC Detection
421    toc_detector::detect_toc(&mut state.pages);
422    log::info!(
423        "Stage 14c (TOC Detection) complete: {} elements",
424        state.total_elements()
425    );
426    // Stage 15: Cross-Page Table Linking
427    cross_page_linker::link_cross_page_tables(&mut state.pages);
428    log::info!(
429        "Stage 15 (Cross-Page Table Linking) complete: {} elements",
430        state.total_elements()
431    );
432    // Stage 16: Heading Level Assignment — handled by Stage 12 (heading_detector already assigns global levels)
433    // Stage 17: Nesting Level Assignment
434    nesting_level::assign_nesting_levels(&mut state.pages);
435    log::info!(
436        "Stage 17 (Nesting Level Assignment) complete: {} elements",
437        state.total_elements()
438    );
439
440    // Stage 18: Final Reading Order Sorting (after all semantic classification)
441    reading_order::sort_reading_order(&mut state.pages, &state.page_info);
442    log::info!(
443        "Stage 18 (Reading Order) complete: {} elements",
444        state.total_elements()
445    );
446
447    // Stage 19: Content Sanitization
448    content_sanitizer::sanitize_content(&mut state.pages, state.config.sanitize);
449    log::info!(
450        "Stage 19 (Content Sanitization) complete: {} elements",
451        state.total_elements()
452    );
453    // Stage 20: Output Generation — to be implemented
454
455    log::info!("Pipeline complete");
456    Ok(())
457}
458
459/// Replace U+FFFD (Unicode replacement character) in a content element's text.
460/// Matches the reference `TextProcessor.replaceUndefinedCharacters()`.
461fn replace_fffd_in_element(elem: &mut ContentElement, replacement: &str) {
462    if let ContentElement::TextChunk(c) = elem {
463        if c.value.contains('\u{FFFD}') {
464            c.value = c.value.replace('\u{FFFD}', replacement);
465        }
466    } // Only TextChunks exist at Stage 2 (before line grouping)
467}
468
469#[cfg(test)]
470mod tests {
471    use super::*;
472    use crate::api::config::ProcessingConfig;
473    use crate::models::chunks::TextChunk;
474    use crate::models::enums::{PdfLayer, TextFormat, TextType};
475
476    #[test]
477    fn test_pipeline_state() {
478        let state = PipelineState::new(vec![vec![], vec![]], ProcessingConfig::default());
479        assert_eq!(state.pages.len(), 2);
480        assert_eq!(state.total_elements(), 0);
481    }
482
483    #[test]
484    fn test_run_empty_pipeline() {
485        let mut state = PipelineState::new(vec![], ProcessingConfig::default());
486        let result = run_pipeline(&mut state);
487        assert!(result.is_ok());
488    }
489
490    #[test]
491    fn test_content_filter_uses_real_page_geometry() {
492        let chunk = ContentElement::TextChunk(TextChunk {
493            value: "Right column".to_string(),
494            bbox: BoundingBox::new(Some(1), 800.0, 400.0, 900.0, 420.0),
495            font_name: "Helvetica".to_string(),
496            font_size: 12.0,
497            font_weight: 400.0,
498            italic_angle: 0.0,
499            font_color: "[0.0]".to_string(),
500            contrast_ratio: 21.0,
501            symbol_ends: vec![],
502            text_format: TextFormat::Normal,
503            text_type: TextType::Regular,
504            pdf_layer: PdfLayer::Main,
505            ocg_visible: true,
506            index: None,
507            page_number: Some(1),
508            level: None,
509            mcid: None,
510        });
511        let page_info = vec![PageInfo {
512            index: 0,
513            page_number: 1,
514            media_box: BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0),
515            crop_box: BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0),
516            rotation: 0,
517            width: 960.0,
518            height: 540.0,
519        }];
520
521        let mut state = PipelineState::new(vec![vec![chunk]], ProcessingConfig::default())
522            .with_page_info(page_info);
523        run_pipeline(&mut state).unwrap();
524
525        assert!(state.total_elements() > 0);
526    }
527}