1use crate::{PdfDocument, Result};
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7use std::sync::{mpsc, Arc};
8use std::thread;
9use std::time::{Duration, Instant};
10
/// How `process_batch` reacts when processing a single document fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ErrorStrategy {
    /// Record the failure in `BatchResult::failures` and keep going (default).
    #[default]
    Collect,
    /// Record the failure in `BatchResult::skipped` and keep going.
    Skip,
    /// Record the first failure and stop dispatching further documents.
    FailFast,
}
22
/// Tuning knobs for a batch run.
pub struct BatchConfig {
    /// Number of worker threads to use; `process_batch` clamps this to at
    /// least 1 and at most the number of input paths.
    pub workers: usize,
    /// Per-document processing timeout (reading + parsing + processor call).
    pub timeout: Duration,
    /// Strategy applied when an individual document fails.
    pub on_error: ErrorStrategy,
    /// Optional progress callback invoked after each completed document
    /// as `(done, total)`.
    pub on_progress: Option<Box<dyn Fn(usize, usize) + Send + Sync + 'static>>,
}
34
35impl Default for BatchConfig {
36 fn default() -> Self {
37 Self {
38 workers: thread::available_parallelism()
39 .map(|n| n.get())
40 .unwrap_or(1),
41 timeout: Duration::from_secs(30),
42 on_error: ErrorStrategy::Collect,
43 on_progress: None,
44 }
45 }
46}
47
/// Aggregated outcome of a batch run.
#[derive(Debug)]
pub struct BatchResult<T> {
    /// Documents processed successfully, paired with the processor's value.
    pub successes: Vec<(PathBuf, T)>,
    /// Documents that failed, paired with the error text
    /// (populated under the `Collect` and `FailFast` strategies).
    pub failures: Vec<(PathBuf, String)>,
    /// Documents skipped due to errors (populated under the `Skip` strategy).
    pub skipped: Vec<(PathBuf, String)>,
    /// Wall-clock time of the whole batch run.
    pub duration: Duration,
}
60
61impl<T> Default for BatchResult<T> {
62 fn default() -> Self {
63 Self {
64 successes: Vec::new(),
65 failures: Vec::new(),
66 skipped: Vec::new(),
67 duration: Duration::default(),
68 }
69 }
70}
71
72impl<T> BatchResult<T> {
73 pub fn total(&self) -> usize {
75 self.successes.len() + self.failures.len() + self.skipped.len()
76 }
77}
78
/// Namespace type for directory-oriented batch helpers; carries no state.
#[derive(Debug, Default, Clone, Copy)]
pub struct PdfBatch;
82
83impl PdfBatch {
84 pub fn collect_pdfs(root: impl AsRef<Path>) -> std::io::Result<Vec<PathBuf>> {
86 let mut paths = Vec::new();
87 collect_pdfs_recursive(root.as_ref(), &mut paths)?;
88 paths.sort();
89 Ok(paths)
90 }
91
92 pub fn extract_text(root: impl AsRef<Path>, config: BatchConfig) -> BatchResult<String> {
94 let root = root.as_ref().to_path_buf();
95 match Self::collect_pdfs(&root) {
96 Ok(paths) => process_batch(&paths, config, |doc| Ok(doc.extract_all_text())),
97 Err(err) => {
98 let mut result = BatchResult::default();
99 result.failures.push((root, err.to_string()));
100 result
101 }
102 }
103 }
104
105 pub fn page_counts(root: impl AsRef<Path>, config: BatchConfig) -> BatchResult<usize> {
107 let root = root.as_ref().to_path_buf();
108 match Self::collect_pdfs(&root) {
109 Ok(paths) => process_batch(&paths, config, |doc| Ok(doc.page_count())),
110 Err(err) => {
111 let mut result = BatchResult::default();
112 result.failures.push((root, err.to_string()));
113 result
114 }
115 }
116 }
117}
118
/// Processes `paths` in parallel with a pool of worker threads, applying
/// `processor` to each opened document and bucketing outcomes according to
/// `config.on_error`.
///
/// Outcomes arrive in completion order, so the vectors inside the returned
/// `BatchResult` are not guaranteed to match the input order.
pub fn process_batch<P, F, T>(paths: &[P], config: BatchConfig, processor: F) -> BatchResult<T>
where
    P: AsRef<Path>,
    F: Fn(&PdfDocument) -> Result<T> + Send + Sync + 'static,
    T: Send + 'static,
{
    let started = Instant::now();
    // Own the paths so they can be shared with worker threads via Arc.
    let input_paths = paths
        .iter()
        .map(|path| path.as_ref().to_path_buf())
        .collect::<Vec<_>>();
    let total = input_paths.len();

    // Nothing to do: return an empty result, but still record elapsed time.
    if total == 0 {
        return BatchResult {
            duration: started.elapsed(),
            ..Default::default()
        };
    }

    // Never spawn zero workers, and never more workers than inputs.
    let workers = config.workers.max(1).min(total);
    // Shared cursor: each worker atomically claims the next unprocessed index.
    let next_index = Arc::new(AtomicUsize::new(0));
    // Cooperative cancellation flag, set on FailFast and when collection ends.
    let stop = Arc::new(AtomicBool::new(false));
    let input_paths = Arc::new(input_paths);
    let processor = Arc::new(processor);
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::with_capacity(workers);

    for _ in 0..workers {
        let tx = tx.clone();
        let next_index = Arc::clone(&next_index);
        let stop = Arc::clone(&stop);
        let input_paths = Arc::clone(&input_paths);
        let processor = Arc::clone(&processor);
        let timeout = config.timeout;

        handles.push(thread::spawn(move || loop {
            // Checked before claiming work so FailFast stops workers promptly.
            if stop.load(Ordering::Relaxed) {
                break;
            }

            let index = next_index.fetch_add(1, Ordering::Relaxed);
            if index >= input_paths.len() {
                break;
            }

            let path = input_paths[index].clone();
            let outcome = process_one(path, timeout, Arc::clone(&processor));
            // Send fails only if the receiver was dropped; stop working then.
            if tx.send(outcome).is_err() {
                break;
            }
        }));
    }
    // Drop the original sender so `rx` disconnects once all workers exit.
    drop(tx);

    let mut result = BatchResult::default();
    let mut done = 0usize;

    while done < total {
        // recv() errors once every worker has exited and dropped its sender
        // (e.g. after a FailFast stop), which ends collection early.
        let Ok(outcome) = rx.recv() else {
            break;
        };
        done += 1;

        match outcome {
            BatchOutcome::Success(path, value) => result.successes.push((path, value)),
            BatchOutcome::Failure(path, err) => match config.on_error {
                ErrorStrategy::Collect => result.failures.push((path, err)),
                ErrorStrategy::Skip => result.skipped.push((path, err)),
                ErrorStrategy::FailFast => {
                    result.failures.push((path, err));
                    stop.store(true, Ordering::Relaxed);
                }
            },
        }

        if let Some(callback) = config.on_progress.as_ref() {
            callback(done, total);
        }

        // FailFast: stop collecting as soon as the first failure is recorded.
        if config.on_error == ErrorStrategy::FailFast && !result.failures.is_empty() {
            break;
        }
    }

    // Ensure any still-looping workers exit, then reap every thread so no
    // worker outlives this call.
    stop.store(true, Ordering::Relaxed);
    for handle in handles {
        let _ = handle.join();
    }

    result.duration = started.elapsed();
    result
}
213
/// Per-document outcome sent from worker threads back to the collector loop.
#[derive(Debug)]
enum BatchOutcome<T> {
    /// The document was processed; carries the processor's return value.
    Success(PathBuf, T),
    /// Reading, parsing, or processing failed; carries the error text.
    Failure(PathBuf, String),
}
219
220fn process_one<F, T>(path: PathBuf, timeout: Duration, processor: Arc<F>) -> BatchOutcome<T>
221where
222 F: Fn(&PdfDocument) -> Result<T> + Send + Sync + 'static,
223 T: Send + 'static,
224{
225 let bytes = match fs::read(&path) {
226 Ok(bytes) => bytes,
227 Err(err) => return BatchOutcome::Failure(path, err.to_string()),
228 };
229
230 let (tx, rx) = mpsc::sync_channel(1);
231 thread::spawn(move || {
232 let outcome = (|| {
233 let doc = PdfDocument::open(bytes).map_err(|err| err.to_string())?;
234 processor(&doc).map_err(|err| err.to_string())
235 })();
236 let _ = tx.send(outcome);
237 });
238
239 match rx.recv_timeout(timeout) {
240 Ok(Ok(value)) => BatchOutcome::Success(path, value),
241 Ok(Err(err)) => BatchOutcome::Failure(path, err),
242 Err(mpsc::RecvTimeoutError::Timeout) => {
243 BatchOutcome::Failure(path, format!("timed out after {}s", timeout.as_secs_f64()))
244 }
245 Err(mpsc::RecvTimeoutError::Disconnected) => {
246 BatchOutcome::Failure(path, "worker disconnected".into())
247 }
248 }
249}
250
/// Depth-first walk of `root`, appending every file whose extension is `pdf`
/// (ASCII case-insensitive) to `out`. Order is whatever `read_dir` yields.
fn collect_pdfs_recursive(root: &Path, out: &mut Vec<PathBuf>) -> std::io::Result<()> {
    for entry in fs::read_dir(root)? {
        let path = entry?.path();
        if path.is_dir() {
            // NOTE(review): `is_dir` follows symlinks, so a symlink cycle
            // would recurse forever — confirm inputs cannot contain one.
            collect_pdfs_recursive(&path, out)?;
        } else {
            let is_pdf = matches!(
                path.extension().and_then(|ext| ext.to_str()),
                Some(ext) if ext.eq_ignore_ascii_case("pdf")
            );
            if is_pdf {
                out.push(path);
            }
        }
    }
    Ok(())
}
271
#[cfg(test)]
mod tests {
    use super::{process_batch, BatchConfig, ErrorStrategy, PdfBatch};
    use std::fs;
    use std::path::PathBuf;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    // Two valid single-page PDFs processed with two workers should both
    // succeed and report exactly one page each.
    #[test]
    fn batch_process_counts_pages() {
        let dir = make_temp_dir("counts");
        let first = dir.join("first.pdf");
        let second = dir.join("second.pdf");
        fs::write(&first, minimal_pdf_bytes()).expect("write fixture");
        fs::write(&second, minimal_pdf_bytes()).expect("write fixture");

        let result = process_batch(
            &[first.clone(), second.clone()],
            BatchConfig {
                workers: 2,
                timeout: Duration::from_secs(5),
                on_error: ErrorStrategy::Collect,
                on_progress: None,
            },
            |doc| Ok(doc.page_count()),
        );

        assert_eq!(result.successes.len(), 2);
        assert!(result.failures.is_empty());
        assert!(result.skipped.is_empty());
        assert!(result.successes.iter().all(|(_, count)| *count == 1));

        fs::remove_dir_all(dir).ok();
    }

    // Under the Skip strategy an unparsable file must land in `skipped`,
    // not `failures`, while the valid file still succeeds.
    #[test]
    fn batch_process_skip_strategy_records_skips() {
        let dir = make_temp_dir("skip");
        let good = dir.join("good.pdf");
        let bad = dir.join("bad.pdf");
        fs::write(&good, minimal_pdf_bytes()).expect("write fixture");
        fs::write(&bad, b"not a pdf").expect("write invalid fixture");

        let result = process_batch(
            &[good, bad],
            BatchConfig {
                workers: 1,
                timeout: Duration::from_secs(5),
                on_error: ErrorStrategy::Skip,
                on_progress: None,
            },
            |doc| Ok(doc.page_count()),
        );

        assert_eq!(result.successes.len(), 1);
        assert!(result.failures.is_empty());
        assert_eq!(result.skipped.len(), 1);

        fs::remove_dir_all(dir).ok();
    }

    // collect_pdfs must walk subdirectories and ignore non-PDF files.
    #[test]
    fn pdf_batch_collects_nested_pdfs() {
        let dir = make_temp_dir("walk");
        let nested = dir.join("nested");
        fs::create_dir_all(&nested).expect("create nested dir");
        fs::write(dir.join("root.pdf"), minimal_pdf_bytes()).expect("write root pdf");
        fs::write(nested.join("child.pdf"), minimal_pdf_bytes()).expect("write child pdf");
        fs::write(nested.join("ignore.txt"), b"hello").expect("write non-pdf");

        let pdfs = PdfBatch::collect_pdfs(&dir).expect("collect pdfs");
        assert_eq!(pdfs.len(), 2);

        fs::remove_dir_all(dir).ok();
    }

    // Creates a unique temp directory (label + pid + nanosecond timestamp)
    // so concurrently running tests cannot collide on the same path.
    fn make_temp_dir(label: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "pdf-engine-batch-{label}-{}-{nanos}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("create temp dir");
        dir
    }

    // Smallest well-formed PDF fixture: catalog, page tree, and one empty
    // page. The xref offsets correspond to this exact byte layout, so do
    // not reformat the literal.
    fn minimal_pdf_bytes() -> Vec<u8> {
        br#"%PDF-1.4
1 0 obj
<< /Type /Catalog /Pages 2 0 R >>
endobj
2 0 obj
<< /Type /Pages /Kids [3 0 R] /Count 1 >>
endobj
3 0 obj
<< /Type /Page /Parent 2 0 R /MediaBox [0 0 200 200] >>
endobj
xref
0 4
0000000000 65535 f
0000000009 00000 n
0000000058 00000 n
0000000115 00000 n
trailer
<< /Size 4 /Root 1 0 R >>
startxref
186
%%EOF
"#
        .to_vec()
    }
}