fleischwolf_pdf/lib.rs
1//! PDF backend for fleischwolf.
2//!
3//! A port of docling's standard PDF pipeline: pdfium extracts the text layer
4//! (cells with bounding boxes) and renders page images; a discriminative ONNX
5//! stack (layout detection, table structure, OCR) classifies regions; the cells
6//! are assembled in reading order into a [`DoclingDocument`].
7//!
8//! Current stages: pdfium text-cell extraction + page rendering ([`pdfium_backend`])
9//! and the deterministic text/reading-order assembly ([`assemble`]). The layout,
10//! table-structure and OCR ONNX stages land behind [`Pipeline`] next.
11
12mod assemble;
13mod dp_lines;
14pub mod layout;
15mod mets;
16mod ocr;
17pub mod pdfium_backend;
18pub mod resample;
19pub mod tableformer;
20pub mod textparse;
21pub mod timing;
22
23use std::collections::BTreeMap;
24use std::fmt;
25use std::sync::mpsc::{sync_channel, Receiver};
26use std::sync::{Arc, Mutex};
27
28use fleischwolf_core::{DoclingDocument, Node};
29
30pub use mets::convert_mets_gbs;
31pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
32
33/// Errors from the PDF backend. Detailed and surfaced (never silently skipped).
34#[derive(Debug)]
35pub enum PdfError {
36 /// pdfium failed to bind, open, or read the document.
37 Pdfium(String),
38 /// The layout ONNX model failed to load or run.
39 Layout(String),
40 /// The OCR ONNX model failed to load or run.
41 Ocr(String),
42}
43
44impl fmt::Display for PdfError {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 match self {
47 PdfError::Pdfium(m) => write!(f, "pdf: pdfium error: {m}"),
48 PdfError::Layout(m) => write!(f, "pdf: {m}"),
49 PdfError::Ocr(m) => write!(f, "pdf: {m}"),
50 }
51 }
52}
53
54impl std::error::Error for PdfError {}
55
56impl From<pdfium_render::prelude::PdfiumError> for PdfError {
57 fn from(e: pdfium_render::prelude::PdfiumError) -> Self {
58 PdfError::Pdfium(e.to_string())
59 }
60}
61
62/// Threads ONNX inference may use, capped by `FLEISCHWOLF_PDF_THREADS` if set.
63/// Defaults to the available parallelism (ort otherwise picks a low number).
64pub(crate) fn intra_threads() -> usize {
65 if let Some(n) = std::env::var("FLEISCHWOLF_PDF_THREADS")
66 .ok()
67 .and_then(|v| v.parse::<usize>().ok())
68 .filter(|&n| n > 0)
69 {
70 return n;
71 }
72 std::thread::available_parallelism()
73 .map(|n| n.get())
74 .unwrap_or(1)
75}
76
77/// One page's assembled output: typed nodes plus the page's hyperlinks, kept
78/// separate so pages processed out of order can be stitched back in page order.
79type PageOut = (Vec<Node>, Vec<(String, String)>);
80
81/// A self-contained set of the per-page models (layout, OCR, TableFormer). Each
82/// parallel page-worker owns its own `Worker` so inference runs concurrently
83/// without sharing an ONNX session (`ort`'s `Session::run` is `&mut self`).
84struct Worker {
85 layout: layout::LayoutModel,
86 ocr: Option<ocr::OcrModel>,
87 /// TableFormer structure model; `None` when its ONNX graphs aren't present
88 /// (the assembler then falls back to geometric table reconstruction).
89 tables: Option<tableformer::TableFormer>,
90}
91
92impl Worker {
93 fn load(intra: usize) -> Result<Self, PdfError> {
94 Ok(Self {
95 layout: layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?,
96 ocr: None,
97 tables: tableformer::TableFormer::load_with(intra),
98 })
99 }
100
101 /// Run layout (+ OCR for cell-less pages) + TableFormer and assemble page `n`
102 /// into its nodes and links. Pure given the page (mutates only the worker's
103 /// lazily-loaded OCR model), so it is safe to run concurrently across pages.
104 fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
105 let regions = timing::timed("layout.predict", || {
106 self.layout.predict(&page.image, page.width, page.height)
107 })
108 .map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
109 // Resolve overlapping detections once, before OCR.
110 let mut regions = assemble::resolve(regions);
111 // Emit text the detector missed as orphan text regions (docling parity).
112 assemble::add_orphan_regions(&mut regions, &page.cells);
113 // Drop phantom empty low-confidence picture boxes (docling parity).
114 assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
115 // No text layer → recognise text from the page image via OCR.
116 if page.cells.is_empty() {
117 if self.ocr.is_none() {
118 self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
119 }
120 let cells = self
121 .ocr
122 .as_mut()
123 .unwrap()
124 .ocr_page(&page.image, ®ions, page.scale)
125 .map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
126 page.cells = cells;
127 }
128 // TableFormer structure per table region (else geometric fallback).
129 let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
130 if let Some(tf) = self.tables.as_mut() {
131 timing::timed("tableformer", || {
132 for (i, r) in regions.iter().enumerate() {
133 if r.label == "table" {
134 table_rows[i] = tf.predict_table_rows(
135 &page.image,
136 page.height,
137 [r.l, r.t, r.r, r.b],
138 &page.word_cells,
139 );
140 }
141 }
142 });
143 }
144 Ok(timing::timed("assemble_page", || {
145 assemble::assemble_page(page, regions, &table_rows)
146 }))
147 }
148}
149
150/// Per-worker ONNX intra-op threads. The layout model is memory-bandwidth bound,
151/// so on a typical machine two threads per worker (sharing one in-cache copy of
152/// the weights) extracts more throughput than one fat model or many single-thread
153/// workers. `FLEISCHWOLF_PDF_INTRA` overrides for per-machine tuning.
154fn pdf_intra() -> usize {
155 if let Some(n) = std::env::var("FLEISCHWOLF_PDF_INTRA")
156 .ok()
157 .and_then(|v| v.parse::<usize>().ok())
158 .filter(|&n| n > 0)
159 {
160 return n;
161 }
162 if intra_threads() >= 2 {
163 2
164 } else {
165 1
166 }
167}
168
169/// How many page-workers to spin up for a multi-page PDF. `FLEISCHWOLF_PDF_WORKERS`
170/// overrides; otherwise size the pool so `workers × intra ≈ cores`, capped at 4 so
171/// a worst-case pool holds a bounded amount of model memory (~0.4 GB per worker)
172/// and does not oversaturate the memory bus with model-weight traffic.
173fn pdf_worker_count() -> usize {
174 if let Some(n) = std::env::var("FLEISCHWOLF_PDF_WORKERS")
175 .ok()
176 .and_then(|v| v.parse::<usize>().ok())
177 .filter(|&n| n > 0)
178 {
179 return n;
180 }
181 (intra_threads() / pdf_intra()).clamp(1, 4)
182}
183
184/// Minimum page count before a PDF is worth the parallel worker pool. Below this,
185/// the serial primary (running its model on every core) is faster than fanning out
186/// — the helper pool's one-time model-load cost only pays off once enough pages
187/// share it. `FLEISCHWOLF_PDF_PARALLEL_MIN` overrides.
188fn pdf_parallel_min() -> usize {
189 std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN")
190 .ok()
191 .and_then(|v| v.parse::<usize>().ok())
192 .filter(|&n| n > 0)
193 .unwrap_or(6)
194}
195
196/// A reusable PDF pipeline. The **primary** worker runs its models on every core,
197/// so a single-page / small / image / METS input is converted at full intra-op
198/// speed with no pool to load. A document with enough pages instead fans out
199/// across a **pool** of narrower workers processed concurrently. Both load lazily
200/// and are cached for reuse, so a one-shot conversion only pays for what it uses.
201pub struct Pipeline {
202 /// Full-intra worker for the serial path; loaded on first serial use.
203 primary: Option<Worker>,
204 /// Narrower workers (≈cores/`target_workers` threads each) for the parallel
205 /// path; loaded on first multi-page use and cached.
206 pool: Vec<Worker>,
207 /// Desired pool size for multi-page documents.
208 target_workers: usize,
209 /// Page count at/above which the parallel pool is worth its load cost.
210 parallel_min: usize,
211}
212
213impl Pipeline {
214 /// Construct the pipeline. Models load lazily on first use (full-intra primary
215 /// for serial inputs, the helper pool for multi-page PDFs), so nothing is
216 /// loaded that a given document doesn't need.
217 pub fn new() -> Result<Self, PdfError> {
218 Ok(Self {
219 primary: None,
220 pool: Vec::new(),
221 target_workers: pdf_worker_count(),
222 parallel_min: pdf_parallel_min(),
223 })
224 }
225
226 /// The full-intra serial worker, loaded on first use.
227 fn primary(&mut self) -> Result<&mut Worker, PdfError> {
228 if self.primary.is_none() {
229 self.primary = Some(Worker::load(intra_threads())?);
230 }
231 Ok(self.primary.as_mut().unwrap())
232 }
233
234 /// Convert a PDF (bytes) to a [`DoclingDocument`]. A document with fewer than
235 /// `parallel_min` pages (or a pool size of 1) streams through the full-intra
236 /// primary; a larger one renders on this thread (pdfium is not thread-safe) and
237 /// fans the pages out across the worker pool, reassembled in page order so the
238 /// output is byte-identical to the serial path.
239 pub fn convert(
240 &mut self,
241 bytes: &[u8],
242 password: Option<&str>,
243 name: &str,
244 ) -> Result<DoclingDocument, PdfError> {
245 let pages = pdfium_backend::page_count(bytes, password)?;
246 let doc = if self.target_workers >= 2 && pages >= self.parallel_min {
247 self.convert_parallel(bytes, password, name)?
248 } else {
249 self.convert_serial(bytes, password, name)?
250 };
251 timing::report();
252 Ok(doc)
253 }
254
255 /// Stream pages one at a time through the primary worker — render → process →
256 /// drop — so the document holds ~one page bitmap (~5 MB) at a time.
257 fn convert_serial(
258 &mut self,
259 bytes: &[u8],
260 password: Option<&str>,
261 name: &str,
262 ) -> Result<DoclingDocument, PdfError> {
263 let mut doc = DoclingDocument::new(name);
264 let worker = self.primary()?;
265 pdfium_backend::for_each_page(bytes, password, |n, _total, mut page| {
266 let (nodes, links) = worker.process(n, &mut page)?;
267 doc.nodes.extend(nodes);
268 doc.links.extend(links);
269 Ok::<(), PdfError>(())
270 })?;
271 assemble::merge_continuations(&mut doc.nodes);
272 Ok(doc)
273 }
274
275 /// Render pages serially on this thread (pdfium) and process them in parallel
276 /// across the worker pool. A bounded channel applies backpressure so only a
277 /// handful of page bitmaps are resident at once; results carry their page
278 /// index and are reassembled in order, so the output is byte-identical to the
279 /// serial path.
280 fn convert_parallel(
281 &mut self,
282 bytes: &[u8],
283 password: Option<&str>,
284 name: &str,
285 ) -> Result<DoclingDocument, PdfError> {
286 self.ensure_pool()?;
287 let n_workers = self.pool.len();
288 let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
289 let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
290 let results: Arc<Mutex<Vec<(usize, PageOut)>>> = Arc::new(Mutex::new(Vec::new()));
291 let first_err: Arc<Mutex<Option<PdfError>>> = Arc::new(Mutex::new(None));
292
293 // Move the pool into the scope so each worker gets an exclusive `&mut`.
294 let mut workers = std::mem::take(&mut self.pool);
295 std::thread::scope(|s| {
296 for worker in workers.iter_mut() {
297 let work_rx = Arc::clone(&work_rx);
298 let results = Arc::clone(&results);
299 let first_err = Arc::clone(&first_err);
300 s.spawn(move || loop {
301 // Hold the receiver lock only for the recv; release before the
302 // (long) per-page work so other workers can pull concurrently.
303 let item = work_rx.lock().unwrap().recv();
304 let Ok((idx, mut page)) = item else { break };
305 match worker.process(idx, &mut page) {
306 Ok(out) => results.lock().unwrap().push((idx, out)),
307 Err(e) => {
308 let mut slot = first_err.lock().unwrap();
309 if slot.is_none() {
310 *slot = Some(e);
311 }
312 }
313 }
314 });
315 }
316 // Render on this thread and feed the workers; backpressure blocks here
317 // when the channel is full. Dropping `work_tx` afterwards signals the
318 // workers (recv → Err) to finish.
319 let render = pdfium_backend::for_each_page(bytes, password, |i, _total, page| {
320 work_tx
321 .send((i, page))
322 .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
323 });
324 drop(work_tx);
325 if let Err(e) = render {
326 let mut slot = first_err.lock().unwrap();
327 if slot.is_none() {
328 *slot = Some(e);
329 }
330 }
331 });
332 // Threads have joined; restore the pool for the next conversion.
333 self.pool = workers;
334
335 if let Some(e) = first_err.lock().unwrap().take() {
336 return Err(e);
337 }
338 let mut results = Arc::try_unwrap(results)
339 .unwrap_or_else(|arc| Mutex::new(arc.lock().unwrap().clone()))
340 .into_inner()
341 .unwrap();
342 results.sort_by_key(|(idx, _)| *idx);
343 let mut doc = DoclingDocument::new(name);
344 for (_, (nodes, links)) in results {
345 doc.nodes.extend(nodes);
346 doc.links.extend(links);
347 }
348 assemble::merge_continuations(&mut doc.nodes);
349 Ok(doc)
350 }
351
352 /// Convert a PDF in **streaming** mode: `emit` is called with each finalized,
353 /// in-document-order batch of nodes (and that span's recovered links) as pages
354 /// complete, so a caller can serialize Markdown page by page instead of waiting
355 /// for the whole document. The batches are exactly the buffered [`convert`]'s
356 /// nodes, split at safe block boundaries by [`assemble::StreamAssembler`] — the
357 /// parallel path reorders pages back into document order before emitting, so
358 /// the output is identical regardless of worker scheduling.
359 ///
360 /// `emit` runs on the calling thread (never a worker), so it needn't be `Send`
361 /// and its backpressure throttles the whole pipeline. Returning `Err` from
362 /// `emit` aborts the conversion with that error.
363 pub fn convert_streaming<F>(
364 &mut self,
365 bytes: &[u8],
366 password: Option<&str>,
367 name: &str,
368 emit: F,
369 ) -> Result<(), PdfError>
370 where
371 F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
372 {
373 let _ = name; // page nodes carry no name; the caller owns the document name.
374 let pages = pdfium_backend::page_count(bytes, password)?;
375 let r = if self.target_workers >= 2 && pages >= self.parallel_min {
376 self.convert_streaming_parallel(bytes, password, emit)
377 } else {
378 self.convert_streaming_serial(bytes, password, emit)
379 };
380 timing::report();
381 r
382 }
383
384 /// Serial streaming: render → process → emit, one page at a time, holding back
385 /// only the tail that might still merge into the next page.
386 fn convert_streaming_serial<F>(
387 &mut self,
388 bytes: &[u8],
389 password: Option<&str>,
390 mut emit: F,
391 ) -> Result<(), PdfError>
392 where
393 F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
394 {
395 let mut asm = assemble::StreamAssembler::new();
396 let worker = self.primary()?;
397 pdfium_backend::for_each_page(bytes, password, |n, _total, mut page| {
398 let (nodes, links) = worker.process(n, &mut page)?;
399 emit(asm.push(nodes), links)
400 })?;
401 emit(asm.finish(), Vec::new())
402 }
403
404 /// Parallel streaming: pages render serially on a dedicated thread (pdfium is
405 /// not thread-safe) and process across the worker pool; results carry their
406 /// page index and are reordered on the calling thread into a
407 /// [`assemble::StreamAssembler`], which emits each page in document order as
408 /// soon as its predecessors have arrived. Bounded channels keep only a handful
409 /// of pages resident and let `emit`'s backpressure reach the renderer.
410 fn convert_streaming_parallel<F>(
411 &mut self,
412 bytes: &[u8],
413 password: Option<&str>,
414 mut emit: F,
415 ) -> Result<(), PdfError>
416 where
417 F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
418 {
419 self.ensure_pool()?;
420 let n_workers = self.pool.len();
421 let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
422 let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
423 // Workers and the renderer report here; the calling thread drains it in
424 // page order. Bounded so workers block (bounding resident bitmaps) when the
425 // consumer falls behind.
426 let (res_tx, res_rx) = sync_channel::<Result<(usize, PageOut), PdfError>>(n_workers * 2);
427
428 let mut workers = std::mem::take(&mut self.pool);
429 let mut asm = assemble::StreamAssembler::new();
430 let mut first_err: Option<PdfError> = None;
431
432 std::thread::scope(|s| {
433 // Workers: pull a page, process it, report (index-tagged) result.
434 for worker in workers.iter_mut() {
435 let work_rx = Arc::clone(&work_rx);
436 let res_tx = res_tx.clone();
437 s.spawn(move || loop {
438 let item = work_rx.lock().unwrap().recv();
439 let Ok((idx, mut page)) = item else { break };
440 let out = worker.process(idx, &mut page).map(|o| (idx, o));
441 if res_tx.send(out).is_err() {
442 break; // consumer gone
443 }
444 });
445 }
446 // Renderer: feed pages to the pool on its own thread (pdfium stays on a
447 // single thread); report a render error through the same channel.
448 {
449 let res_tx = res_tx.clone();
450 s.spawn(move || {
451 let render =
452 pdfium_backend::for_each_page(bytes, password, |i, _total, page| {
453 work_tx
454 .send((i, page))
455 .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
456 });
457 drop(work_tx); // signal workers to finish
458 if let Err(e) = render {
459 let _ = res_tx.send(Err(e));
460 }
461 });
462 }
463 // Drop our own sender so the channel closes once the threads finish.
464 drop(res_tx);
465
466 // Collector (this thread): reorder into document order and emit.
467 let mut buffer: BTreeMap<usize, PageOut> = BTreeMap::new();
468 let mut next = 0usize;
469 for msg in res_rx.iter() {
470 match msg {
471 Err(e) => {
472 if first_err.is_none() {
473 first_err = Some(e);
474 }
475 }
476 Ok((idx, out)) => {
477 buffer.insert(idx, out);
478 if first_err.is_some() {
479 continue; // keep draining so the threads can exit
480 }
481 while let Some((nodes, links)) = buffer.remove(&next) {
482 if let Err(e) = emit(asm.push(nodes), links) {
483 first_err = Some(e);
484 break;
485 }
486 next += 1;
487 }
488 }
489 }
490 }
491 });
492 // Threads have joined; restore the pool for the next conversion.
493 self.pool = workers;
494
495 if let Some(e) = first_err {
496 return Err(e);
497 }
498 emit(asm.finish(), Vec::new())
499 }
500
501 /// Lazily grow the pool to `target_workers`, loading the new workers
502 /// concurrently (model load is mostly I/O + mmap, so N loads overlap to roughly
503 /// one load's wall-time). Cached for reuse across documents.
504 fn ensure_pool(&mut self) -> Result<(), PdfError> {
505 let need = self.target_workers.saturating_sub(self.pool.len());
506 if need == 0 {
507 return Ok(());
508 }
509 let intra = pdf_intra();
510 let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
511 let handles: Vec<_> = (0..need)
512 .map(|_| s.spawn(move || Worker::load(intra)))
513 .collect();
514 handles.into_iter().map(|h| h.join().unwrap()).collect()
515 });
516 for w in loaded {
517 self.pool.push(w?);
518 }
519 Ok(())
520 }
521
522 /// Convert a standalone image (PNG/JPEG/TIFF/WebP/…) as a single page —
523 /// docling routes images through the same layout+OCR pipeline as a PDF page.
524 pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
525 let image = image::load_from_memory(bytes)
526 .map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
527 .into_rgb8();
528 let (w, h) = image.dimensions();
529 // The image is its own page rendered at 1 px per "point" (scale 1.0); a
530 // standalone image has no text layer, so OCR supplies the cells.
531 let page = PdfPage {
532 width: w as f32,
533 height: h as f32,
534 scale: 1.0,
535 cells: Vec::new(),
536 code_cells: Vec::new(),
537 word_cells: Vec::new(),
538 image,
539 links: Vec::new(),
540 };
541 self.process_pages(vec![page], name)
542 }
543
544 /// Run layout (+ OCR for cell-less pages) and assemble each already-rendered
545 /// page (image / METS inputs, which are small and already materialised).
546 fn process_pages(
547 &mut self,
548 mut pages: Vec<PdfPage>,
549 name: &str,
550 ) -> Result<DoclingDocument, PdfError> {
551 let mut doc = DoclingDocument::new(name);
552 let worker = self.primary()?;
553 for (n, page) in pages.iter_mut().enumerate() {
554 let (nodes, links) = worker.process(n, page)?;
555 doc.nodes.extend(nodes);
556 doc.links.extend(links);
557 }
558 assemble::merge_continuations(&mut doc.nodes);
559 Ok(doc)
560 }
561}
562
563/// Convenience one-shot conversion (loads the pipeline per call). Errors are
564/// detailed and surfaced (never silently skipped).
565pub fn convert(
566 bytes: &[u8],
567 password: Option<&str>,
568 name: &str,
569) -> Result<DoclingDocument, PdfError> {
570 Pipeline::new()?.convert(bytes, password, name)
571}
572
573/// Convenience one-shot image conversion (loads the pipeline per call).
574pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
575 Pipeline::new()?.convert_image(bytes, name)
576}
577
578/// Convert pre-segmented pages (image + already-known text cells, e.g. METS/hOCR
579/// scans) through the shared layout + assembly pipeline.
580pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
581 Pipeline::new()?.process_pages(pages, name)
582}