Skip to main content

pdf_engine/
batch.rs

1//! Batch helpers for processing many PDFs with a bounded worker pool.
2
3use crate::{PdfDocument, Result};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7use std::sync::{mpsc, Arc};
8use std::thread;
9use std::time::{Duration, Instant};
10
/// Policy deciding where a per-file failure is recorded and whether the batch
/// keeps scheduling work afterwards.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ErrorStrategy {
    /// Record the failure in [`BatchResult::failures`] and keep processing the
    /// remaining inputs. This is the default strategy.
    #[default]
    Collect,
    /// Record the failure in [`BatchResult::skipped`] (instead of `failures`)
    /// and keep processing the remaining inputs.
    Skip,
    /// Record the failure in [`BatchResult::failures`], then stop: no new
    /// inputs are scheduled and results still in flight are discarded.
    FailFast,
}
22
23/// Configuration for [`process_batch`] and [`PdfBatch`] helpers.
24pub struct BatchConfig {
25    /// Number of worker threads to use.
26    pub workers: usize,
27    /// Maximum time allowed per input PDF.
28    pub timeout: Duration,
29    /// How per-file failures should be recorded.
30    pub on_error: ErrorStrategy,
31    /// Optional progress callback invoked as `(done, total)`.
32    pub on_progress: Option<Box<dyn Fn(usize, usize) + Send + Sync + 'static>>,
33}
34
35impl Default for BatchConfig {
36    fn default() -> Self {
37        Self {
38            workers: thread::available_parallelism()
39                .map(|n| n.get())
40                .unwrap_or(1),
41            timeout: Duration::from_secs(30),
42            on_error: ErrorStrategy::Collect,
43            on_progress: None,
44        }
45    }
46}
47
/// Aggregated outcome of a batch run.
///
/// Every entry pairs an input path with either the processor's output or an
/// error message describing why that input did not succeed.
#[derive(Debug)]
pub struct BatchResult<T> {
    /// Inputs that were processed successfully, each with the processor's output.
    pub successes: Vec<(PathBuf, T)>,
    /// Inputs that failed, each with its error message.
    pub failures: Vec<(PathBuf, String)>,
    /// Inputs whose failure was recorded under [`ErrorStrategy::Skip`].
    pub skipped: Vec<(PathBuf, String)>,
    /// Wall-clock time taken by the whole batch.
    pub duration: Duration,
}

// Written by hand rather than derived: `#[derive(Default)]` would require
// `T: Default`, but an empty result never needs a value of `T`.
impl<T> Default for BatchResult<T> {
    fn default() -> Self {
        Self {
            duration: Duration::default(),
            skipped: Vec::new(),
            failures: Vec::new(),
            successes: Vec::new(),
        }
    }
}

impl<T> BatchResult<T> {
    /// Number of inputs with a recorded outcome, across successes, failures,
    /// and skips.
    pub fn total(&self) -> usize {
        let bucket_sizes = [
            self.successes.len(),
            self.failures.len(),
            self.skipped.len(),
        ];
        bucket_sizes.iter().sum()
    }
}
78
79/// Convenience helpers built on top of [`process_batch`].
80#[derive(Debug, Default, Clone, Copy)]
81pub struct PdfBatch;
82
83impl PdfBatch {
84    /// Recursively collect `.pdf` files from a directory tree.
85    pub fn collect_pdfs(root: impl AsRef<Path>) -> std::io::Result<Vec<PathBuf>> {
86        let mut paths = Vec::new();
87        collect_pdfs_recursive(root.as_ref(), &mut paths)?;
88        paths.sort();
89        Ok(paths)
90    }
91
92    /// Extract combined text from every PDF under `root`.
93    pub fn extract_text(root: impl AsRef<Path>, config: BatchConfig) -> BatchResult<String> {
94        let root = root.as_ref().to_path_buf();
95        match Self::collect_pdfs(&root) {
96            Ok(paths) => process_batch(&paths, config, |doc| Ok(doc.extract_all_text())),
97            Err(err) => {
98                let mut result = BatchResult::default();
99                result.failures.push((root, err.to_string()));
100                result
101            }
102        }
103    }
104
105    /// Count pages for every PDF under `root`.
106    pub fn page_counts(root: impl AsRef<Path>, config: BatchConfig) -> BatchResult<usize> {
107        let root = root.as_ref().to_path_buf();
108        match Self::collect_pdfs(&root) {
109            Ok(paths) => process_batch(&paths, config, |doc| Ok(doc.page_count())),
110            Err(err) => {
111                let mut result = BatchResult::default();
112                result.failures.push((root, err.to_string()));
113                result
114            }
115        }
116    }
117}
118
119/// Process a list of PDF paths with a fixed-size worker pool.
120pub fn process_batch<P, F, T>(paths: &[P], config: BatchConfig, processor: F) -> BatchResult<T>
121where
122    P: AsRef<Path>,
123    F: Fn(&PdfDocument) -> Result<T> + Send + Sync + 'static,
124    T: Send + 'static,
125{
126    let started = Instant::now();
127    let input_paths = paths
128        .iter()
129        .map(|path| path.as_ref().to_path_buf())
130        .collect::<Vec<_>>();
131    let total = input_paths.len();
132
133    if total == 0 {
134        return BatchResult {
135            duration: started.elapsed(),
136            ..Default::default()
137        };
138    }
139
140    let workers = config.workers.max(1).min(total);
141    let next_index = Arc::new(AtomicUsize::new(0));
142    let stop = Arc::new(AtomicBool::new(false));
143    let input_paths = Arc::new(input_paths);
144    let processor = Arc::new(processor);
145    let (tx, rx) = mpsc::channel();
146    let mut handles = Vec::with_capacity(workers);
147
148    for _ in 0..workers {
149        let tx = tx.clone();
150        let next_index = Arc::clone(&next_index);
151        let stop = Arc::clone(&stop);
152        let input_paths = Arc::clone(&input_paths);
153        let processor = Arc::clone(&processor);
154        let timeout = config.timeout;
155
156        handles.push(thread::spawn(move || loop {
157            if stop.load(Ordering::Relaxed) {
158                break;
159            }
160
161            let index = next_index.fetch_add(1, Ordering::Relaxed);
162            if index >= input_paths.len() {
163                break;
164            }
165
166            let path = input_paths[index].clone();
167            let outcome = process_one(path, timeout, Arc::clone(&processor));
168            if tx.send(outcome).is_err() {
169                break;
170            }
171        }));
172    }
173    drop(tx);
174
175    let mut result = BatchResult::default();
176    let mut done = 0usize;
177
178    while done < total {
179        let Ok(outcome) = rx.recv() else {
180            break;
181        };
182        done += 1;
183
184        match outcome {
185            BatchOutcome::Success(path, value) => result.successes.push((path, value)),
186            BatchOutcome::Failure(path, err) => match config.on_error {
187                ErrorStrategy::Collect => result.failures.push((path, err)),
188                ErrorStrategy::Skip => result.skipped.push((path, err)),
189                ErrorStrategy::FailFast => {
190                    result.failures.push((path, err));
191                    stop.store(true, Ordering::Relaxed);
192                }
193            },
194        }
195
196        if let Some(callback) = config.on_progress.as_ref() {
197            callback(done, total);
198        }
199
200        if config.on_error == ErrorStrategy::FailFast && !result.failures.is_empty() {
201            break;
202        }
203    }
204
205    stop.store(true, Ordering::Relaxed);
206    for handle in handles {
207        let _ = handle.join();
208    }
209
210    result.duration = started.elapsed();
211    result
212}
213
/// Result for one input, sent from a pool worker back to [`process_batch`].
#[derive(Debug)]
enum BatchOutcome<T> {
    /// The processor produced a value for this path.
    Success(PathBuf, T),
    /// Handling this path failed; carries a human-readable error message.
    Failure(PathBuf, String),
}
219
220fn process_one<F, T>(path: PathBuf, timeout: Duration, processor: Arc<F>) -> BatchOutcome<T>
221where
222    F: Fn(&PdfDocument) -> Result<T> + Send + Sync + 'static,
223    T: Send + 'static,
224{
225    let bytes = match fs::read(&path) {
226        Ok(bytes) => bytes,
227        Err(err) => return BatchOutcome::Failure(path, err.to_string()),
228    };
229
230    let (tx, rx) = mpsc::sync_channel(1);
231    thread::spawn(move || {
232        let outcome = (|| {
233            let doc = PdfDocument::open(bytes).map_err(|err| err.to_string())?;
234            processor(&doc).map_err(|err| err.to_string())
235        })();
236        let _ = tx.send(outcome);
237    });
238
239    match rx.recv_timeout(timeout) {
240        Ok(Ok(value)) => BatchOutcome::Success(path, value),
241        Ok(Err(err)) => BatchOutcome::Failure(path, err),
242        Err(mpsc::RecvTimeoutError::Timeout) => {
243            BatchOutcome::Failure(path, format!("timed out after {}s", timeout.as_secs_f64()))
244        }
245        Err(mpsc::RecvTimeoutError::Disconnected) => {
246            BatchOutcome::Failure(path, "worker disconnected".into())
247        }
248    }
249}
250
/// Recursively append every `.pdf` file (extension compared ASCII
/// case-insensitively) found under `root` to `out`, in directory-iteration
/// order.
///
/// Symbolic links are followed, but each directory is entered at most once,
/// keyed by its canonical path. Without that check, a link pointing at one of
/// its own ancestors sends the walk around the cycle again and again, and the
/// collection ends in an I/O error instead of a file list.
///
/// # Errors
///
/// Returns the first I/O error from canonicalizing or reading a directory or
/// one of its entries.
fn collect_pdfs_recursive(root: &Path, out: &mut Vec<PathBuf>) -> std::io::Result<()> {
    let mut visited = std::collections::HashSet::new();
    walk_pdf_tree(root, out, &mut visited)
}

/// Depth-first walk behind `collect_pdfs_recursive`; `visited` holds the
/// canonical paths of directories already entered.
fn walk_pdf_tree(
    dir: &Path,
    out: &mut Vec<PathBuf>,
    visited: &mut std::collections::HashSet<PathBuf>,
) -> std::io::Result<()> {
    if !visited.insert(fs::canonicalize(dir)?) {
        return Ok(());
    }

    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            walk_pdf_tree(&path, out, visited)?;
        } else if has_pdf_extension(&path) {
            out.push(path);
        }
    }

    Ok(())
}

/// True when `path` has a `pdf` extension, compared ASCII case-insensitively.
fn has_pdf_extension(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext.eq_ignore_ascii_case("pdf"))
}
271
#[cfg(test)]
mod tests {
    use super::{process_batch, BatchConfig, ErrorStrategy, PdfBatch};
    use std::fs;
    use std::path::PathBuf;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Two valid one-page PDFs processed by two workers should both succeed
    /// with a page count of 1.
    #[test]
    fn batch_process_counts_pages() {
        let dir = make_temp_dir("counts");
        let first = dir.join("first.pdf");
        let second = dir.join("second.pdf");
        fs::write(&first, minimal_pdf_bytes()).expect("write fixture");
        fs::write(&second, minimal_pdf_bytes()).expect("write fixture");

        let result = process_batch(
            &[first.clone(), second.clone()],
            BatchConfig {
                workers: 2,
                timeout: Duration::from_secs(5),
                on_error: ErrorStrategy::Collect,
                on_progress: None,
            },
            |doc| Ok(doc.page_count()),
        );

        assert_eq!(result.successes.len(), 2);
        assert!(result.failures.is_empty());
        assert!(result.skipped.is_empty());
        assert!(result.successes.iter().all(|(_, count)| *count == 1));

        fs::remove_dir_all(dir).ok();
    }

    /// Under `ErrorStrategy::Skip`, an unparseable input is recorded in
    /// `skipped`, not `failures`, and the valid input still succeeds.
    #[test]
    fn batch_process_skip_strategy_records_skips() {
        let dir = make_temp_dir("skip");
        let good = dir.join("good.pdf");
        let bad = dir.join("bad.pdf");
        fs::write(&good, minimal_pdf_bytes()).expect("write fixture");
        fs::write(&bad, b"not a pdf").expect("write invalid fixture");

        let result = process_batch(
            &[good, bad],
            BatchConfig {
                workers: 1,
                timeout: Duration::from_secs(5),
                on_error: ErrorStrategy::Skip,
                on_progress: None,
            },
            |doc| Ok(doc.page_count()),
        );

        assert_eq!(result.successes.len(), 1);
        assert!(result.failures.is_empty());
        assert_eq!(result.skipped.len(), 1);

        fs::remove_dir_all(dir).ok();
    }

    /// `collect_pdfs` descends into subdirectories and ignores non-PDF files.
    #[test]
    fn pdf_batch_collects_nested_pdfs() {
        let dir = make_temp_dir("walk");
        let nested = dir.join("nested");
        fs::create_dir_all(&nested).expect("create nested dir");
        fs::write(dir.join("root.pdf"), minimal_pdf_bytes()).expect("write root pdf");
        fs::write(nested.join("child.pdf"), minimal_pdf_bytes()).expect("write child pdf");
        fs::write(nested.join("ignore.txt"), b"hello").expect("write non-pdf");

        let pdfs = PdfBatch::collect_pdfs(&dir).expect("collect pdfs");
        assert_eq!(pdfs.len(), 2);

        fs::remove_dir_all(dir).ok();
    }

    /// Create a fresh directory under the system temp dir. The label, process
    /// id, and current timestamp are combined in the name so that tests running
    /// in parallel threads do not share a directory.
    fn make_temp_dir(label: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "pdf-engine-batch-{label}-{}-{nanos}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("create temp dir");
        dir
    }

    /// Hand-written one-page PDF fixture.
    ///
    /// The xref offsets (9, 58, 115) and `startxref 186` are byte offsets into
    /// this exact literal. Editing any line above `xref`, including whitespace,
    /// requires recomputing them. Each xref entry keeps its trailing space so
    /// that, with the newline, it is the 20 bytes the PDF format expects.
    fn minimal_pdf_bytes() -> Vec<u8> {
        br#"%PDF-1.4
1 0 obj
<< /Type /Catalog /Pages 2 0 R >>
endobj
2 0 obj
<< /Type /Pages /Kids [3 0 R] /Count 1 >>
endobj
3 0 obj
<< /Type /Page /Parent 2 0 R /MediaBox [0 0 200 200] >>
endobj
xref
0 4
0000000000 65535 f 
0000000009 00000 n 
0000000058 00000 n 
0000000115 00000 n 
trailer
<< /Size 4 /Root 1 0 R >>
startxref
186
%%EOF
"#
        .to_vec()
    }
}