Skip to main content

fleischwolf_pdf/
lib.rs

1//! PDF backend for fleischwolf.
2//!
3//! A port of docling's standard PDF pipeline: pdfium extracts the text layer
4//! (cells with bounding boxes) and renders page images; a discriminative ONNX
5//! stack (layout detection, table structure, OCR) classifies regions; the cells
6//! are assembled in reading order into a [`DoclingDocument`].
7//!
//! Current stages: pdfium text-cell extraction + page rendering ([`pdfium_backend`]),
//! layout detection ([`layout`]), OCR for pages without a text layer, TableFormer
//! table structure ([`tableformer`]) and the deterministic text/reading-order
//! assembly ([`assemble`]) — all driven per page through [`Pipeline`].
11
12mod assemble;
13mod dp_lines;
14pub mod layout;
15mod mets;
16mod ocr;
17pub mod pdfium_backend;
18pub mod resample;
19pub mod tableformer;
20pub mod textparse;
21pub mod timing;
22
23use std::fmt;
24use std::sync::mpsc::{sync_channel, Receiver};
25use std::sync::{Arc, Mutex};
26
27use fleischwolf_core::{DoclingDocument, Node};
28
29pub use mets::convert_mets_gbs;
30pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
31
/// Failure modes of the PDF backend. Every failure is reported to the caller
/// with its detail message; nothing is silently skipped.
#[derive(Debug)]
pub enum PdfError {
    /// pdfium could not be bound, or could not open or read the document.
    Pdfium(String),
    /// Loading or running the layout ONNX model failed.
    Layout(String),
    /// Loading or running the OCR ONNX model failed.
    Ocr(String),
}

impl fmt::Display for PdfError {
    /// Renders the error with a `pdf:` prefix; pdfium failures additionally
    /// carry a `pdfium error:` label, model failures print their message as-is.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Pdfium(m) => write!(f, "pdf: pdfium error: {m}"),
            Self::Layout(m) | Self::Ocr(m) => write!(f, "pdf: {m}"),
        }
    }
}

impl std::error::Error for PdfError {}
54
55impl From<pdfium_render::prelude::PdfiumError> for PdfError {
56    fn from(e: pdfium_render::prelude::PdfiumError) -> Self {
57        PdfError::Pdfium(e.to_string())
58    }
59}
60
/// Number of threads ONNX inference may use.
///
/// A positive integer in `FLEISCHWOLF_PDF_THREADS` overrides the value; an
/// unset, unparsable or zero value is ignored. Otherwise the machine's
/// available parallelism is used (ort otherwise picks a low number), falling
/// back to 1 when that cannot be determined.
pub(crate) fn intra_threads() -> usize {
    let requested = std::env::var("FLEISCHWOLF_PDF_THREADS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .filter(|&threads| threads > 0);
    // Only query the OS when no usable override was supplied.
    requested.unwrap_or_else(|| {
        std::thread::available_parallelism().map_or(1, |cores| cores.get())
    })
}
75
/// One page's assembled output: typed nodes plus the page's hyperlinks, kept
/// separate so pages processed out of order can be stitched back in page order.
///
/// The first element is appended to `DoclingDocument::nodes` and the second to
/// `DoclingDocument::links`. Each link is a pair of strings whose meaning is
/// defined by the assembler (presumably text/target — confirm against
/// `assemble::assemble_page`).
type PageOut = (Vec<Node>, Vec<(String, String)>);
79
80/// A self-contained set of the per-page models (layout, OCR, TableFormer). Each
81/// parallel page-worker owns its own `Worker` so inference runs concurrently
82/// without sharing an ONNX session (`ort`'s `Session::run` is `&mut self`).
83struct Worker {
84    layout: layout::LayoutModel,
85    ocr: Option<ocr::OcrModel>,
86    /// TableFormer structure model; `None` when its ONNX graphs aren't present
87    /// (the assembler then falls back to geometric table reconstruction).
88    tables: Option<tableformer::TableFormer>,
89}
90
91impl Worker {
92    fn load(intra: usize) -> Result<Self, PdfError> {
93        Ok(Self {
94            layout: layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?,
95            ocr: None,
96            tables: tableformer::TableFormer::load_with(intra),
97        })
98    }
99
100    /// Run layout (+ OCR for cell-less pages) + TableFormer and assemble page `n`
101    /// into its nodes and links. Pure given the page (mutates only the worker's
102    /// lazily-loaded OCR model), so it is safe to run concurrently across pages.
103    fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
104        let regions = timing::timed("layout.predict", || {
105            self.layout.predict(&page.image, page.width, page.height)
106        })
107        .map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
108        // Resolve overlapping detections once, before OCR.
109        let mut regions = assemble::resolve(regions);
110        // Emit text the detector missed as orphan text regions (docling parity).
111        assemble::add_orphan_regions(&mut regions, &page.cells);
112        // Drop phantom empty low-confidence picture boxes (docling parity).
113        assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
114        // No text layer → recognise text from the page image via OCR.
115        if page.cells.is_empty() {
116            if self.ocr.is_none() {
117                self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
118            }
119            let cells = self
120                .ocr
121                .as_mut()
122                .unwrap()
123                .ocr_page(&page.image, &regions, page.scale)
124                .map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
125            page.cells = cells;
126        }
127        // TableFormer structure per table region (else geometric fallback).
128        let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
129        if let Some(tf) = self.tables.as_mut() {
130            timing::timed("tableformer", || {
131                for (i, r) in regions.iter().enumerate() {
132                    if r.label == "table" {
133                        table_rows[i] = tf.predict_table_rows(
134                            &page.image,
135                            page.height,
136                            [r.l, r.t, r.r, r.b],
137                            &page.word_cells,
138                        );
139                    }
140                }
141            });
142        }
143        Ok(timing::timed("assemble_page", || {
144            assemble::assemble_page(page, regions, &table_rows)
145        }))
146    }
147}
148
149/// Per-worker ONNX intra-op threads. The layout model is memory-bandwidth bound,
150/// so on a typical machine two threads per worker (sharing one in-cache copy of
151/// the weights) extracts more throughput than one fat model or many single-thread
152/// workers. `FLEISCHWOLF_PDF_INTRA` overrides for per-machine tuning.
153fn pdf_intra() -> usize {
154    if let Some(n) = std::env::var("FLEISCHWOLF_PDF_INTRA")
155        .ok()
156        .and_then(|v| v.parse::<usize>().ok())
157        .filter(|&n| n > 0)
158    {
159        return n;
160    }
161    if intra_threads() >= 2 {
162        2
163    } else {
164        1
165    }
166}
167
168/// How many page-workers to spin up for a multi-page PDF. `FLEISCHWOLF_PDF_WORKERS`
169/// overrides; otherwise size the pool so `workers × intra ≈ cores`, capped at 4 so
170/// a worst-case pool holds a bounded amount of model memory (~0.4 GB per worker)
171/// and does not oversaturate the memory bus with model-weight traffic.
172fn pdf_worker_count() -> usize {
173    if let Some(n) = std::env::var("FLEISCHWOLF_PDF_WORKERS")
174        .ok()
175        .and_then(|v| v.parse::<usize>().ok())
176        .filter(|&n| n > 0)
177    {
178        return n;
179    }
180    (intra_threads() / pdf_intra()).clamp(1, 4)
181}
182
/// Smallest page count for which the parallel worker pool is used.
///
/// Below this the serial primary (running its model on every core) is faster
/// than fanning out, because the helper pool's one-time model-load cost only
/// pays off once enough pages share it. A positive integer in
/// `FLEISCHWOLF_PDF_PARALLEL_MIN` overrides the default of 6.
fn pdf_parallel_min() -> usize {
    match std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(pages) if pages > 0 => pages,
            // Zero or unparsable: ignore the override.
            _ => 6,
        },
        // Unset (or not valid Unicode): use the default.
        Err(_) => 6,
    }
}
194
195/// A reusable PDF pipeline. The **primary** worker runs its models on every core,
196/// so a single-page / small / image / METS input is converted at full intra-op
197/// speed with no pool to load. A document with enough pages instead fans out
198/// across a **pool** of narrower workers processed concurrently. Both load lazily
199/// and are cached for reuse, so a one-shot conversion only pays for what it uses.
200pub struct Pipeline {
201    /// Full-intra worker for the serial path; loaded on first serial use.
202    primary: Option<Worker>,
203    /// Narrower workers (≈cores/`target_workers` threads each) for the parallel
204    /// path; loaded on first multi-page use and cached.
205    pool: Vec<Worker>,
206    /// Desired pool size for multi-page documents.
207    target_workers: usize,
208    /// Page count at/above which the parallel pool is worth its load cost.
209    parallel_min: usize,
210}
211
212impl Pipeline {
213    /// Construct the pipeline. Models load lazily on first use (full-intra primary
214    /// for serial inputs, the helper pool for multi-page PDFs), so nothing is
215    /// loaded that a given document doesn't need.
216    pub fn new() -> Result<Self, PdfError> {
217        Ok(Self {
218            primary: None,
219            pool: Vec::new(),
220            target_workers: pdf_worker_count(),
221            parallel_min: pdf_parallel_min(),
222        })
223    }
224
225    /// The full-intra serial worker, loaded on first use.
226    fn primary(&mut self) -> Result<&mut Worker, PdfError> {
227        if self.primary.is_none() {
228            self.primary = Some(Worker::load(intra_threads())?);
229        }
230        Ok(self.primary.as_mut().unwrap())
231    }
232
233    /// Convert a PDF (bytes) to a [`DoclingDocument`]. A document with fewer than
234    /// `parallel_min` pages (or a pool size of 1) streams through the full-intra
235    /// primary; a larger one renders on this thread (pdfium is not thread-safe) and
236    /// fans the pages out across the worker pool, reassembled in page order so the
237    /// output is byte-identical to the serial path.
238    pub fn convert(
239        &mut self,
240        bytes: &[u8],
241        password: Option<&str>,
242        name: &str,
243    ) -> Result<DoclingDocument, PdfError> {
244        let pages = pdfium_backend::page_count(bytes, password)?;
245        let doc = if self.target_workers >= 2 && pages >= self.parallel_min {
246            self.convert_parallel(bytes, password, name)?
247        } else {
248            self.convert_serial(bytes, password, name)?
249        };
250        timing::report();
251        Ok(doc)
252    }
253
254    /// Stream pages one at a time through the primary worker — render → process →
255    /// drop — so the document holds ~one page bitmap (~5 MB) at a time.
256    fn convert_serial(
257        &mut self,
258        bytes: &[u8],
259        password: Option<&str>,
260        name: &str,
261    ) -> Result<DoclingDocument, PdfError> {
262        let mut doc = DoclingDocument::new(name);
263        let worker = self.primary()?;
264        pdfium_backend::for_each_page(bytes, password, |n, _total, mut page| {
265            let (nodes, links) = worker.process(n, &mut page)?;
266            doc.nodes.extend(nodes);
267            doc.links.extend(links);
268            Ok::<(), PdfError>(())
269        })?;
270        assemble::merge_continuations(&mut doc.nodes);
271        Ok(doc)
272    }
273
274    /// Render pages serially on this thread (pdfium) and process them in parallel
275    /// across the worker pool. A bounded channel applies backpressure so only a
276    /// handful of page bitmaps are resident at once; results carry their page
277    /// index and are reassembled in order, so the output is byte-identical to the
278    /// serial path.
279    fn convert_parallel(
280        &mut self,
281        bytes: &[u8],
282        password: Option<&str>,
283        name: &str,
284    ) -> Result<DoclingDocument, PdfError> {
285        self.ensure_pool()?;
286        let n_workers = self.pool.len();
287        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
288        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
289        let results: Arc<Mutex<Vec<(usize, PageOut)>>> = Arc::new(Mutex::new(Vec::new()));
290        let first_err: Arc<Mutex<Option<PdfError>>> = Arc::new(Mutex::new(None));
291
292        // Move the pool into the scope so each worker gets an exclusive `&mut`.
293        let mut workers = std::mem::take(&mut self.pool);
294        std::thread::scope(|s| {
295            for worker in workers.iter_mut() {
296                let work_rx = Arc::clone(&work_rx);
297                let results = Arc::clone(&results);
298                let first_err = Arc::clone(&first_err);
299                s.spawn(move || loop {
300                    // Hold the receiver lock only for the recv; release before the
301                    // (long) per-page work so other workers can pull concurrently.
302                    let item = work_rx.lock().unwrap().recv();
303                    let Ok((idx, mut page)) = item else { break };
304                    match worker.process(idx, &mut page) {
305                        Ok(out) => results.lock().unwrap().push((idx, out)),
306                        Err(e) => {
307                            let mut slot = first_err.lock().unwrap();
308                            if slot.is_none() {
309                                *slot = Some(e);
310                            }
311                        }
312                    }
313                });
314            }
315            // Render on this thread and feed the workers; backpressure blocks here
316            // when the channel is full. Dropping `work_tx` afterwards signals the
317            // workers (recv → Err) to finish.
318            let render = pdfium_backend::for_each_page(bytes, password, |i, _total, page| {
319                work_tx
320                    .send((i, page))
321                    .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
322            });
323            drop(work_tx);
324            if let Err(e) = render {
325                let mut slot = first_err.lock().unwrap();
326                if slot.is_none() {
327                    *slot = Some(e);
328                }
329            }
330        });
331        // Threads have joined; restore the pool for the next conversion.
332        self.pool = workers;
333
334        if let Some(e) = first_err.lock().unwrap().take() {
335            return Err(e);
336        }
337        let mut results = Arc::try_unwrap(results)
338            .unwrap_or_else(|arc| Mutex::new(arc.lock().unwrap().clone()))
339            .into_inner()
340            .unwrap();
341        results.sort_by_key(|(idx, _)| *idx);
342        let mut doc = DoclingDocument::new(name);
343        for (_, (nodes, links)) in results {
344            doc.nodes.extend(nodes);
345            doc.links.extend(links);
346        }
347        assemble::merge_continuations(&mut doc.nodes);
348        Ok(doc)
349    }
350
351    /// Lazily grow the pool to `target_workers`, loading the new workers
352    /// concurrently (model load is mostly I/O + mmap, so N loads overlap to roughly
353    /// one load's wall-time). Cached for reuse across documents.
354    fn ensure_pool(&mut self) -> Result<(), PdfError> {
355        let need = self.target_workers.saturating_sub(self.pool.len());
356        if need == 0 {
357            return Ok(());
358        }
359        let intra = pdf_intra();
360        let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
361            let handles: Vec<_> = (0..need)
362                .map(|_| s.spawn(move || Worker::load(intra)))
363                .collect();
364            handles.into_iter().map(|h| h.join().unwrap()).collect()
365        });
366        for w in loaded {
367            self.pool.push(w?);
368        }
369        Ok(())
370    }
371
372    /// Convert a standalone image (PNG/JPEG/TIFF/WebP/…) as a single page —
373    /// docling routes images through the same layout+OCR pipeline as a PDF page.
374    pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
375        let image = image::load_from_memory(bytes)
376            .map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
377            .into_rgb8();
378        let (w, h) = image.dimensions();
379        // The image is its own page rendered at 1 px per "point" (scale 1.0); a
380        // standalone image has no text layer, so OCR supplies the cells.
381        let page = PdfPage {
382            width: w as f32,
383            height: h as f32,
384            scale: 1.0,
385            cells: Vec::new(),
386            code_cells: Vec::new(),
387            word_cells: Vec::new(),
388            image,
389            links: Vec::new(),
390        };
391        self.process_pages(vec![page], name)
392    }
393
394    /// Run layout (+ OCR for cell-less pages) and assemble each already-rendered
395    /// page (image / METS inputs, which are small and already materialised).
396    fn process_pages(
397        &mut self,
398        mut pages: Vec<PdfPage>,
399        name: &str,
400    ) -> Result<DoclingDocument, PdfError> {
401        let mut doc = DoclingDocument::new(name);
402        let worker = self.primary()?;
403        for (n, page) in pages.iter_mut().enumerate() {
404            let (nodes, links) = worker.process(n, page)?;
405            doc.nodes.extend(nodes);
406            doc.links.extend(links);
407        }
408        assemble::merge_continuations(&mut doc.nodes);
409        Ok(doc)
410    }
411}
412
413/// Convenience one-shot conversion (loads the pipeline per call). Errors are
414/// detailed and surfaced (never silently skipped).
415pub fn convert(
416    bytes: &[u8],
417    password: Option<&str>,
418    name: &str,
419) -> Result<DoclingDocument, PdfError> {
420    Pipeline::new()?.convert(bytes, password, name)
421}
422
423/// Convenience one-shot image conversion (loads the pipeline per call).
424pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
425    Pipeline::new()?.convert_image(bytes, name)
426}
427
428/// Convert pre-segmented pages (image + already-known text cells, e.g. METS/hOCR
429/// scans) through the shared layout + assembly pipeline.
430pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
431    Pipeline::new()?.process_pages(pages, name)
432}