Skip to main content

edgeparse_core/pipeline/
parallel.rs

1//! Parallel page-processing utilities using rayon.
2//!
3//! Provides helpers that apply a per-page transformation in parallel across all
4//! pages of a document. Designed as a drop-in replacement for the sequential
5//! `for page in &mut pages { ... }` loops in the orchestrator.
6
7use rayon::prelude::*;
8
9use crate::models::content::ContentElement;
10
11/// Per-page content alias (mirrors `orchestrator::PageContent`).
12type PageContent = Vec<ContentElement>;
13
14/// Apply `op` to each page in parallel, replacing each page's content.
15///
16/// This is the parallel equivalent of:
17/// ```ignore
18/// for page in &mut pages {
19///     let elements = std::mem::take(page);
20///     *page = op(elements);
21/// }
22/// ```
23pub fn par_map_pages<F>(pages: &mut Vec<PageContent>, op: F)
24where
25    F: Fn(Vec<ContentElement>) -> Vec<ContentElement> + Sync + Send,
26{
27    let results: Vec<PageContent> = std::mem::take(pages).into_par_iter().map(op).collect();
28    *pages = results;
29}
30
31/// Apply `op` to each page in parallel where the closure also receives a
32/// zero-based page index.
33pub fn par_map_pages_indexed<F>(pages: &mut Vec<PageContent>, op: F)
34where
35    F: Fn(usize, Vec<ContentElement>) -> Vec<ContentElement> + Sync + Send,
36{
37    let results: Vec<PageContent> = std::mem::take(pages)
38        .into_par_iter()
39        .enumerate()
40        .map(|(i, page)| op(i, page))
41        .collect();
42    *pages = results;
43}
44
45/// Parallel fold — map each page to a value of type `T` and collect results.
46///
47/// Useful for gathering per-page statistics or metadata without modifying pages.
48pub fn par_extract<T, F>(pages: &[PageContent], op: F) -> Vec<T>
49where
50    T: Send,
51    F: Fn(&[ContentElement]) -> T + Sync + Send,
52{
53    pages.par_iter().map(|page| op(page)).collect()
54}
55
56/// Configure the global rayon thread pool with the given number of threads.
57///
58/// Returns `Ok(())` on success. Calling this more than once (after the pool is
59/// already initialized) returns an error which callers may ignore.
60pub fn configure_thread_pool(num_threads: usize) -> Result<(), rayon::ThreadPoolBuildError> {
61    rayon::ThreadPoolBuilder::new()
62        .num_threads(num_threads)
63        .build_global()
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::bbox::BoundingBox;
    use crate::models::chunks::TextChunk;
    use crate::models::content::ContentElement;
    use crate::models::enums::{PdfLayer, TextFormat, TextType};

    /// Build a minimal text chunk; `val` is the only field that varies
    /// between fixtures.
    fn text_chunk(val: &str) -> ContentElement {
        ContentElement::TextChunk(TextChunk {
            value: val.to_string(),
            bbox: BoundingBox::new(None, 0.0, 0.0, 100.0, 10.0),
            font_name: String::new(),
            font_size: 12.0,
            font_weight: 400.0,
            italic_angle: 0.0,
            font_color: String::new(),
            contrast_ratio: 21.0,
            symbol_ends: vec![],
            text_format: TextFormat::Normal,
            text_type: TextType::Regular,
            pdf_layer: PdfLayer::Main,
            ocg_visible: true,
            index: None,
            page_number: None,
            level: None,
            mcid: None,
        })
    }

    /// Text carried by a fixture element. Fixtures in this module are always
    /// text chunks, so any other variant indicates a broken test.
    fn chunk_text(elem: &ContentElement) -> String {
        match elem {
            ContentElement::TextChunk(chunk) => chunk.value.clone(),
            _ => panic!("test fixtures only contain text chunks"),
        }
    }

    /// Text of every element on every page, so assertions can check content
    /// and ordering rather than just lengths.
    fn page_texts(pages: &[PageContent]) -> Vec<Vec<String>> {
        pages
            .iter()
            .map(|page| page.iter().map(chunk_text).collect())
            .collect()
    }

    #[test]
    fn test_par_map_pages_identity() {
        let mut pages = vec![
            vec![text_chunk("a"), text_chunk("b")],
            vec![text_chunk("c")],
        ];
        par_map_pages(&mut pages, |elems| elems);
        assert_eq!(pages.len(), 2);
        assert_eq!(pages[0].len(), 2);
        assert_eq!(pages[1].len(), 1);
        // Pages and their elements must come back in the original order.
        assert_eq!(page_texts(&pages), vec![vec!["a", "b"], vec!["c"]]);
    }

    #[test]
    fn test_par_map_pages_transform() {
        let mut pages = vec![
            vec![text_chunk("a"), text_chunk("b"), text_chunk("c")],
            vec![text_chunk("x")],
        ];
        // Keep only first element per page
        par_map_pages(&mut pages, |mut elems| {
            elems.truncate(1);
            elems
        });
        assert_eq!(pages[0].len(), 1);
        assert_eq!(pages[1].len(), 1);
        // Each result must land in the slot of the page it was computed from.
        assert_eq!(page_texts(&pages), vec![vec!["a"], vec!["x"]]);
    }

    #[test]
    fn test_par_map_pages_indexed() {
        let mut pages = vec![
            vec![text_chunk("a")],
            vec![text_chunk("b")],
            vec![text_chunk("c")],
        ];
        // Record which content each index was paired with; worker threads may
        // run in any order, so the log is sorted before comparing.
        let pairs_seen = std::sync::Mutex::new(Vec::new());
        par_map_pages_indexed(&mut pages, |i, elems| {
            pairs_seen.lock().unwrap().push((i, chunk_text(&elems[0])));
            elems
        });
        let mut seen = pairs_seen.into_inner().unwrap();
        seen.sort();
        assert_eq!(
            seen,
            vec![
                (0, "a".to_string()),
                (1, "b".to_string()),
                (2, "c".to_string()),
            ]
        );
        assert_eq!(page_texts(&pages), vec![vec!["a"], vec!["b"], vec!["c"]]);
    }

    #[test]
    fn test_par_extract() {
        let pages = vec![
            vec![text_chunk("a"), text_chunk("b")],
            vec![text_chunk("c")],
            vec![],
        ];
        let counts: Vec<usize> = par_extract(&pages, |elems| elems.len());
        assert_eq!(counts, vec![2, 1, 0]);
        // Extraction must not modify the pages it reads.
        assert_eq!(pages.len(), 3);
    }

    #[test]
    fn test_empty_pages() {
        let mut pages: Vec<PageContent> = vec![];
        par_map_pages(&mut pages, |e| e);
        assert!(pages.is_empty());

        par_map_pages_indexed(&mut pages, |_, e| e);
        assert!(pages.is_empty());

        let counts: Vec<usize> = par_extract(&pages, |e| e.len());
        assert!(counts.is_empty());
    }
}