use crate::api::config::ProcessingConfig;
use crate::models::bbox::BoundingBox;
use crate::models::content::ContentElement;
use crate::pdf::page_info::PageInfo;
use crate::pipeline::logging::PipelineTimer;
use crate::pipeline::parallel::{par_map_pages, par_map_pages_indexed};
use crate::pipeline::stages::boxed_heading_promoter;
use crate::pipeline::stages::caption_linker;
use crate::pipeline::stages::cluster_table_detector;
use crate::pipeline::stages::column_detector;
use crate::pipeline::stages::content_filter;
use crate::pipeline::stages::content_sanitizer;
use crate::pipeline::stages::cross_page_linker;
use crate::pipeline::stages::figure_detector;
use crate::pipeline::stages::footnote_detector;
use crate::pipeline::stages::header_footer;
use crate::pipeline::stages::heading_detector;
use crate::pipeline::stages::id_assignment;
use crate::pipeline::stages::list_detector;
use crate::pipeline::stages::list_pass2;
use crate::pipeline::stages::nesting_level;
use crate::pipeline::stages::paragraph_detector;
use crate::pipeline::stages::reading_order;
use crate::pipeline::stages::table_content_assigner;
use crate::pipeline::stages::table_detector;
use crate::pipeline::stages::text_block_grouper;
use crate::pipeline::stages::text_line_grouper;
use crate::pipeline::stages::toc_detector;
use crate::pipeline::stages::watermark_detector;
use crate::tagged::struct_tree::McidMap;
use crate::utils::page_range;
use crate::EdgePdfError;
/// The content elements that belong to a single page.
pub type PageContent = Vec<ContentElement>;
/// Mutable state handed to `run_pipeline`; the stages rewrite `pages` in place.
pub struct PipelineState {
/// One element list per page; every pipeline stage reads and rewrites these.
pub pages: Vec<PageContent>,
/// Processing options. `run_pipeline` reads `pages`, `filter_config`,
/// `replace_invalid_chars` and `sanitize` from it.
pub config: ProcessingConfig,
/// Optional MCID map, forwarded to heading detection. `with_mcid_map`
/// stores `None` here when the supplied map is empty.
pub mcid_map: Option<McidMap>,
/// Page geometry; empty unless supplied through `with_page_info`.
/// `run_pipeline` looks entries up by page index.
pub page_info: Vec<PageInfo>,
}
impl PipelineState {
    /// Creates a state from raw page contents and a configuration.
    ///
    /// The result carries no MCID map and no page geometry.
    pub fn new(pages: Vec<PageContent>, config: ProcessingConfig) -> Self {
        let mcid_map = None;
        let page_info = Vec::new();
        Self {
            pages,
            config,
            mcid_map,
            page_info,
        }
    }

    /// Creates a state that also carries an MCID map.
    ///
    /// An empty map is normalised to `None`, so downstream code only ever
    /// sees `Some` for a map that has entries. Page geometry starts empty.
    pub fn with_mcid_map(
        pages: Vec<PageContent>,
        config: ProcessingConfig,
        mcid_map: McidMap,
    ) -> Self {
        let mut state = Self::new(pages, config);
        state.mcid_map = Some(mcid_map).filter(|map| !map.is_empty());
        state
    }

    /// Builder-style setter: replaces the page geometry and returns the state.
    pub fn with_page_info(mut self, page_info: Vec<PageInfo>) -> Self {
        // The previously stored geometry is discarded.
        drop(std::mem::replace(&mut self.page_info, page_info));
        self
    }

    /// Counts the elements across all pages.
    pub fn total_elements(&self) -> usize {
        self.pages
            .iter()
            .fold(0, |count, page| count + page.len())
    }
}
/// Runs every pipeline stage over `state`, in a fixed order, mutating
/// `state.pages` (and, for page-range filtering, `state.page_info`) in place.
///
/// Each stage is wrapped in `timed_stage!`, which records timing only when
/// `pipeline_timing_enabled()` returned true at the start of the run. After
/// most stages the current element count is logged at `info` level.
///
/// # Errors
///
/// The signature allows an `EdgePdfError`, but as written no stage result is
/// propagated with `?`; the body always falls through to `Ok(())`.
pub fn run_pipeline(state: &mut PipelineState) -> Result<(), EdgePdfError> {
// Wraps one stage: when a timer is present, `start_stage` is called before the
// body and `end_stage` (with the post-stage element count) after it. The macro
// evaluates to whatever the body evaluates to.
macro_rules! timed_stage {
($timer:expr, $name:expr, $state:expr, $body:block) => {{
if let Some(timer) = $timer.as_mut() {
timer.start_stage($name);
}
let result = { $body };
if let Some(timer) = $timer.as_mut() {
timer.end_stage($state.total_elements());
}
result
}};
}
// `timer` is `Some` only when timing was requested; otherwise every
// `timed_stage!` just runs its body.
let mut timer = pipeline_timing_enabled().then(PipelineTimer::new);
log::info!(
"Starting pipeline with {} pages, {} elements",
state.pages.len(),
state.total_elements()
);
// Stage 0b: optional page selection. Nothing is filtered when `config.pages`
// is unset or when `parse_page_range` returns `None`.
timed_stage!(timer, "Stage 0b (Page Range Filtering)", state, {
if let Some(ref range_str) = state.config.pages {
let total = state.pages.len();
if let Some(selected) = page_range::parse_page_range(range_str, total) {
state.pages = page_range::filter_pages(std::mem::take(&mut state.pages), &selected);
// Apply the same selection to `page_info`, treating entry index + 1 as the
// page number, so later index-based lookups still line up with `pages`.
// NOTE(review): assumes `page_info` had one entry per page in page order;
// that is not checked here.
if !state.page_info.is_empty() {
state.page_info = state
.page_info
.drain(..)
.enumerate()
.filter_map(|(idx, info)| {
let page_num = idx + 1;
if selected.contains(&page_num) {
Some(info)
} else {
None
}
})
.collect();
}
log::info!(
"Page range filter: kept {} of {} pages",
state.pages.len(),
total
);
}
}
});
timed_stage!(timer, "Stage 1b (Watermark Removal)", state, {
watermark_detector::remove_watermarks(&mut state.pages);
});
log::info!(
"Stage 1b (Watermark Removal) complete: {} elements",
state.total_elements()
);
let filter_config = &state.config.filter_config;
// Fallback page box used when `page_info` has no entry for a page index.
// NOTE(review): 595 x 842 looks like A4 in PDF points; confirm.
let default_page = BoundingBox::new(None, 0.0, 0.0, 595.0, 842.0);
// Stage 2: filter each page's elements against that page's crop box.
timed_stage!(timer, "Stage 2 (Content Filtering)", state, {
par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
let page_bbox = state
.page_info
.get(page_idx)
.map(|info| info.crop_box.clone())
.unwrap_or_else(|| default_page.clone());
content_filter::filter_content(elements, filter_config, &page_bbox)
});
});
log::info!(
"Stage 2 (Content Filtering) complete: {} elements",
state.total_elements()
);
let replacement = &state.config.replace_invalid_chars;
// Stage 2b: substitute U+FFFD in text chunks with the configured string.
// Skipped when the configured replacement is U+FFFD itself, since that
// substitution would change nothing.
timed_stage!(timer, "Stage 2b (Replace Undefined Chars)", state, {
if replacement != "\u{FFFD}" {
par_map_pages(&mut state.pages, |mut elements| {
for elem in &mut elements {
replace_fffd_in_element(elem, replacement);
}
elements
});
}
});
log::info!("Stage 2b (Replace Undefined Chars) complete");
timed_stage!(timer, "Stage 3-4 (Table Border Detection)", state, {
par_map_pages(&mut state.pages, table_detector::detect_table_borders);
});
log::info!(
"Stage 3-4 (Table Border Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 4b (Table Content Assignment)", state, {
par_map_pages(
&mut state.pages,
table_content_assigner::assign_content_to_tables,
);
});
log::info!(
"Stage 4b (Table Content Assignment) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 4b2 (Empty Table Filter)", state, {
par_map_pages(&mut state.pages, table_detector::filter_empty_tables);
});
log::info!(
"Stage 4b2 (Empty Table Filter) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 4c (Boxed Heading Promoter)", state, {
par_map_pages(
&mut state.pages,
boxed_heading_promoter::promote_boxed_headings,
);
});
log::info!(
"Stage 4c (Boxed Heading Promoter) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 4d (Pre-Cluster Table Release)", state, {
par_map_pages(&mut state.pages, table_detector::release_pre_cluster_tables);
});
log::info!(
"Stage 4d (Pre-Cluster Table Release) complete: {} elements",
state.total_elements()
);
// Stage 5b: the value returned by `detect_columns` is kept and looked up by
// page index in Stage 6.
let column_layouts = timed_stage!(timer, "Stage 5b (Column Detection)", state, {
column_detector::detect_columns(&mut state.pages)
});
log::info!(
"Stage 5b (Column Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 6 (Text Line Grouping)", state, {
par_map_pages_indexed(&mut state.pages, |page_idx, elements| {
// `layout` is `None` when no column layout exists for this page index.
let layout = column_layouts.get(page_idx);
text_line_grouper::group_text_lines(elements, layout)
});
});
log::info!(
"Stage 6 (Text Line Grouping) complete: {} elements",
state.total_elements()
);
// First of two `list_detector::detect_lists` passes; the second is Stage 9.
timed_stage!(timer, "Stage 6.5 (List Detection Pass 1)", state, {
par_map_pages(&mut state.pages, list_detector::detect_lists);
});
log::info!(
"Stage 6.5 (List Detection Pass 1) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 7 (Text Block Grouping)", state, {
par_map_pages(&mut state.pages, text_block_grouper::group_text_blocks);
});
log::info!(
"Stage 7 (Text Block Grouping) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 7b (Cluster Table Detection)", state, {
par_map_pages(
&mut state.pages,
cluster_table_detector::detect_cluster_tables,
);
});
log::info!(
"Stage 7b (Cluster Table Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 7b2 (Suspicious Table Filter)", state, {
par_map_pages(&mut state.pages, table_detector::filter_suspicious_tables);
});
log::info!(
"Stage 7b2 (Suspicious Table Filter) complete: {} elements",
state.total_elements()
);
// Representative page height for header/footer detection: the median of the
// known page heights (the upper of the two middle values for an even count),
// or 842.0 when no page geometry is available.
let page_height = if !state.page_info.is_empty() {
let mut heights: Vec<f64> = state.page_info.iter().map(|p| p.height).collect();
// Incomparable pairs (NaN) are treated as equal rather than panicking.
heights.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
// In bounds: this branch only runs when `page_info` is non-empty.
heights[heights.len() / 2]
} else {
842.0
};
timed_stage!(timer, "Stage 8 (Header/Footer Detection)", state, {
header_footer::detect_headers_footers(&mut state.pages, page_height);
});
log::info!(
"Stage 8 (Header/Footer Detection) complete: {} elements",
state.total_elements()
);
// Second `list_detector::detect_lists` pass (the first ran as Stage 6.5).
timed_stage!(timer, "Stage 9 (List Detection)", state, {
par_map_pages(&mut state.pages, list_detector::detect_lists);
});
log::info!(
"Stage 9 (List Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 10 (Paragraph Detection)", state, {
par_map_pages(&mut state.pages, paragraph_detector::detect_paragraphs);
});
log::info!(
"Stage 10 (Paragraph Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 10b (Figure Detection)", state, {
par_map_pages(&mut state.pages, figure_detector::detect_figures);
});
log::info!(
"Stage 10b (Figure Detection) complete: {} elements",
state.total_elements()
);
// Note the ordering: Stage 12 and the reading-order pre-pass run before
// Stage 11.
timed_stage!(timer, "Stage 12 (Heading Detection)", state, {
heading_detector::detect_headings(&mut state.pages, state.mcid_map.as_ref());
});
log::info!(
"Stage 12 (Heading Detection) complete: {} elements",
state.total_elements()
);
// Same call as Stage 18 below, issued early so the following list stages see
// sorted pages.
// NOTE(review): the reason for the pre-pass is inferred from the stage name
// and position; confirm.
timed_stage!(timer, "Stage 18-pre (Reading Order pre-pass)", state, {
reading_order::sort_reading_order(&mut state.pages, &state.page_info);
});
log::info!(
"Stage 18-pre (Reading Order pre-pass) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 11 (List Detection Pass 2)", state, {
par_map_pages(&mut state.pages, list_pass2::detect_paragraph_lists);
});
log::info!(
"Stage 11 (List Detection Pass 2) complete: {} elements",
state.total_elements()
);
// From here on the stages receive the whole `pages` vector in one call
// instead of being mapped page by page.
timed_stage!(timer, "Stage 11b (Common-prefix Lists)", state, {
list_pass2::detect_common_prefix_lists_document(&mut state.pages);
});
log::info!(
"Stage 11b (Common-prefix Lists) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 13 (ID Assignment)", state, {
id_assignment::assign_ids(&mut state.pages);
});
log::info!(
"Stage 13 (ID Assignment) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 14 (Caption Linking)", state, {
caption_linker::link_captions(&mut state.pages);
});
log::info!(
"Stage 14 (Caption Linking) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 14b (Footnote Detection)", state, {
footnote_detector::detect_footnotes(&mut state.pages);
});
log::info!(
"Stage 14b (Footnote Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 14c (TOC Detection)", state, {
toc_detector::detect_toc(&mut state.pages);
});
log::info!(
"Stage 14c (TOC Detection) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 15 (Cross-Page Table Linking)", state, {
cross_page_linker::link_cross_page_tables(&mut state.pages);
});
log::info!(
"Stage 15 (Cross-Page Table Linking) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 17 (Nesting Level Assignment)", state, {
nesting_level::assign_nesting_levels(&mut state.pages);
});
log::info!(
"Stage 17 (Nesting Level Assignment) complete: {} elements",
state.total_elements()
);
// Final reading-order sort (the same call already ran once as "Stage 18-pre").
timed_stage!(timer, "Stage 18 (Reading Order)", state, {
reading_order::sort_reading_order(&mut state.pages, &state.page_info);
});
log::info!(
"Stage 18 (Reading Order) complete: {} elements",
state.total_elements()
);
timed_stage!(timer, "Stage 19 (Content Sanitization)", state, {
content_sanitizer::sanitize_content(&mut state.pages, state.config.sanitize);
});
log::info!(
"Stage 19 (Content Sanitization) complete: {} elements",
state.total_elements()
);
// Emit the per-stage timing summary only when timing was enabled.
if let Some(timer) = timer.as_ref() {
timer.log_summary();
}
log::info!("Pipeline complete");
Ok(())
}
/// Reports whether per-stage timing was requested through the environment.
///
/// Reads `EDGEPARSE_PIPELINE_TIMING` and returns `true` when its value is,
/// ignoring ASCII case, exactly one of `1`, `true`, `yes` or `on`. An unset
/// or non-Unicode variable, or any other value, yields `false`.
fn pipeline_timing_enabled() -> bool {
    // All entries are lowercase ASCII, so a case-insensitive comparison is
    // equivalent to lowercasing the value and matching it exactly.
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    match std::env::var("EDGEPARSE_PIPELINE_TIMING") {
        Ok(raw) => TRUTHY.iter().any(|flag| raw.eq_ignore_ascii_case(flag)),
        Err(_) => false,
    }
}
/// Substitutes every U+FFFD in a text chunk's value with `replacement`.
///
/// Only `ContentElement::TextChunk` is touched; every other variant is left
/// as it is. A chunk without U+FFFD is not rewritten, so no new string is
/// allocated for it.
fn replace_fffd_in_element(elem: &mut ContentElement, replacement: &str) {
    const UNDEFINED_CHAR: char = '\u{FFFD}';

    match elem {
        ContentElement::TextChunk(chunk) if chunk.value.contains(UNDEFINED_CHAR) => {
            chunk.value = chunk.value.replace(UNDEFINED_CHAR, replacement);
        }
        _ => {}
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::config::ProcessingConfig;
    use crate::models::chunks::TextChunk;
    use crate::models::enums::{PdfLayer, TextFormat, TextType};

    /// Builds a single text chunk on page 1 with bbox coordinates
    /// (800, 400, 900, 420).
    // NOTE(review): presumably this places the chunk beyond the 595-wide
    // fallback page but inside a 960 x 540 page; depends on the argument
    // order of `BoundingBox::new`.
    fn right_column_chunk() -> ContentElement {
        let text = TextChunk {
            value: "Right column".to_string(),
            bbox: BoundingBox::new(Some(1), 800.0, 400.0, 900.0, 420.0),
            font_name: "Helvetica".to_string(),
            font_size: 12.0,
            font_weight: 400.0,
            italic_angle: 0.0,
            font_color: "[0.0]".to_string(),
            contrast_ratio: 21.0,
            symbol_ends: vec![],
            text_format: TextFormat::Normal,
            text_type: TextType::Regular,
            pdf_layer: PdfLayer::Main,
            ocg_visible: true,
            index: None,
            page_number: Some(1),
            level: None,
            mcid: None,
        };
        ContentElement::TextChunk(text)
    }

    /// Geometry for one unrotated 960 x 540 page whose media and crop boxes
    /// are identical.
    fn wide_page_info() -> Vec<PageInfo> {
        let page = PageInfo {
            index: 0,
            page_number: 1,
            media_box: BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0),
            crop_box: BoundingBox::new(None, 0.0, 0.0, 960.0, 540.0),
            rotation: 0,
            width: 960.0,
            height: 540.0,
        };
        vec![page]
    }

    /// Two empty pages are counted as two pages holding zero elements.
    #[test]
    fn test_pipeline_state() {
        let empty_pages: Vec<PageContent> = vec![Vec::new(), Vec::new()];
        let state = PipelineState::new(empty_pages, ProcessingConfig::default());
        assert_eq!(state.pages.len(), 2);
        assert_eq!(state.total_elements(), 0);
    }

    /// The pipeline succeeds on a document with no pages at all.
    #[test]
    fn test_run_empty_pipeline() {
        let mut state = PipelineState::new(Vec::new(), ProcessingConfig::default());
        let outcome = run_pipeline(&mut state);
        assert!(outcome.is_ok());
    }

    /// With real page geometry supplied, the chunk survives the whole
    /// pipeline instead of being filtered out.
    #[test]
    fn test_content_filter_uses_real_page_geometry() {
        let pages = vec![vec![right_column_chunk()]];
        let geometry = wide_page_info();
        let mut state =
            PipelineState::new(pages, ProcessingConfig::default()).with_page_info(geometry);
        run_pipeline(&mut state).unwrap();
        assert!(state.total_elements() > 0);
    }
}