// fleischwolf_pdf/lib.rs

//! PDF backend for fleischwolf.
//!
//! A port of docling's standard PDF pipeline: pdfium extracts the text layer
//! (cells with bounding boxes) and renders page images; a discriminative ONNX
//! stack (layout detection, table structure, OCR) classifies regions; the cells
//! are assembled in reading order into a [`DoclingDocument`].
//!
//! Stages: pdfium text-cell extraction + page rendering ([`pdfium_backend`]),
//! the layout, table-structure and OCR ONNX stages driven per page by
//! [`Pipeline`], and the deterministic text/reading-order assembly ([`assemble`]).
11
12mod assemble;
13mod dp_lines;
14pub mod layout;
15mod mets;
16mod ocr;
17pub mod pdfium_backend;
18pub mod resample;
19pub mod tableformer;
20pub mod textparse;
21pub mod timing;
22
23use std::collections::BTreeMap;
24use std::fmt;
25use std::sync::mpsc::{sync_channel, Receiver};
26use std::sync::{Arc, Mutex};
27
28use fleischwolf_core::{DoclingDocument, Node};
29
30pub use mets::{convert_mets_gbs, convert_mets_gbs_with_options};
31pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
32
/// Errors from the PDF backend. Detailed and surfaced (never silently skipped).
///
/// Every variant carries a human-readable message; the `Display` impl prefixes
/// it with `pdf: `.
#[derive(Debug)]
pub enum PdfError {
    /// pdfium failed to bind, open, or read the document. This crate also uses
    /// it for image-decoding failures and for a closed page-worker channel.
    Pdfium(String),
    /// The layout ONNX model failed to load or run.
    Layout(String),
    /// The OCR ONNX model failed to load or run.
    Ocr(String),
}
43
44impl fmt::Display for PdfError {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        match self {
47            PdfError::Pdfium(m) => write!(f, "pdf: pdfium error: {m}"),
48            PdfError::Layout(m) => write!(f, "pdf: {m}"),
49            PdfError::Ocr(m) => write!(f, "pdf: {m}"),
50        }
51    }
52}
53
54impl std::error::Error for PdfError {}
55
56impl From<pdfium_render::prelude::PdfiumError> for PdfError {
57    fn from(e: pdfium_render::prelude::PdfiumError) -> Self {
58        PdfError::Pdfium(e.to_string())
59    }
60}
61
/// Threads ONNX inference may use.
///
/// A positive integer in `FLEISCHWOLF_PDF_THREADS` overrides the count outright
/// (it is taken as-is, not clamped to the core count; surrounding whitespace is
/// ignored). Otherwise the available parallelism is used (ort otherwise picks a
/// low number), falling back to 1 when that cannot be determined. Unparsable or
/// zero values are treated as unset.
pub(crate) fn intra_threads() -> usize {
    std::env::var("FLEISCHWOLF_PDF_THREADS")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or_else(|| std::thread::available_parallelism().map_or(1, |n| n.get()))
}
76
/// True when `FLEISCHWOLF_FP32` is set to any value but `0` (surrounding
/// whitespace ignored), forcing the full-precision models even where an INT8
/// variant sits next to the fp32 default. Unset or non-Unicode values yield
/// `false`.
pub(crate) fn fp32_forced() -> bool {
    std::env::var("FLEISCHWOLF_FP32").map_or(false, |v| v.trim() != "0")
}
84
85/// Resolve a model path: an explicit env override always wins; otherwise the
86/// INT8 variant of the default path when it exists on disk (the quantized
87/// models are conformance-validated — see PDF_PERFORMANCE.md — and load/run
88/// markedly faster on CPU), unless `FLEISCHWOLF_FP32` opts back into full
89/// precision; else the fp32 default.
90pub(crate) fn model_path(env: &str, fp32_default: &str, int8_default: &str) -> String {
91    if let Ok(p) = std::env::var(env) {
92        return p;
93    }
94    if !fp32_forced() && std::path::Path::new(int8_default).exists() {
95        return int8_default.to_string();
96    }
97    fp32_default.to_string()
98}
99
100/// One page's assembled output: typed nodes plus the page's hyperlinks, kept
101/// separate so pages processed out of order can be stitched back in page order.
102type PageOut = (Vec<Node>, Vec<(String, String)>);
103
104/// A self-contained set of the per-page models (layout, OCR, TableFormer). Each
105/// parallel page-worker owns its own `Worker` so inference runs concurrently
106/// without sharing an ONNX session (`ort`'s `Session::run` is `&mut self`).
107struct Worker {
108    /// `None` when `no_ocr` skips layout entirely — no model load, no inference.
109    layout: Option<layout::LayoutModel>,
110    ocr: Option<ocr::OcrModel>,
111    /// TableFormer structure model; `None` when its ONNX graphs aren't present
112    /// (the assembler then falls back to geometric table reconstruction) or
113    /// when `no_table_former`/`no_ocr` skip it.
114    tables: Option<tableformer::TableFormer>,
115    /// Skip layout, OCR, and TableFormer; reconstruct text purely from the PDF's
116    /// embedded text layer. See [`Pipeline::no_ocr`].
117    no_ocr: bool,
118}
119
120impl Worker {
121    fn load(intra: usize, no_table_former: bool, no_ocr: bool) -> Result<Self, PdfError> {
122        Ok(Self {
123            layout: if no_ocr {
124                None
125            } else {
126                Some(layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?)
127            },
128            ocr: None,
129            tables: if no_table_former || no_ocr {
130                None
131            } else {
132                tableformer::TableFormer::load_with(intra)
133            },
134            no_ocr,
135        })
136    }
137
138    /// Run layout (+ OCR for cell-less pages) + TableFormer and assemble page `n`
139    /// into its nodes and links. Pure given the page (mutates only the worker's
140    /// lazily-loaded OCR model), so it is safe to run concurrently across pages.
141    fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
142        if self.no_ocr {
143            // Fastest path: no layout/OCR/TableFormer inference at all. The PDF's
144            // embedded text cells (if any) become flat, line-grouped paragraphs in
145            // reading order via the same orphan-region machinery that normally
146            // rescues text the detector missed — here it rescues *all* of it.
147            // Pages with no embedded text layer (scanned/image-only) yield nothing;
148            // convert those without `no_ocr`.
149            let mut regions = Vec::new();
150            assemble::add_orphan_regions(&mut regions, &page.cells);
151            let table_rows = vec![None; regions.len()];
152            return Ok(timing::timed("assemble_page", || {
153                assemble::assemble_page(page, regions, &table_rows)
154            }));
155        }
156        let regions = timing::timed("layout.predict", || {
157            self.layout
158                .as_mut()
159                .expect("layout model loaded unless no_ocr")
160                .predict(&page.image, page.width, page.height)
161        })
162        .map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
163        // Resolve overlapping detections once, before OCR.
164        let mut regions = assemble::resolve(regions);
165        // Emit text the detector missed as orphan text regions (docling parity).
166        assemble::add_orphan_regions(&mut regions, &page.cells);
167        // Drop phantom empty low-confidence picture boxes (docling parity).
168        assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
169        // No text layer → recognise text from the page image via OCR.
170        if page.cells.is_empty() {
171            if self.ocr.is_none() {
172                self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
173            }
174            let cells = timing::timed("ocr.page", || {
175                self.ocr
176                    .as_mut()
177                    .unwrap()
178                    .ocr_page(&page.image, &regions, page.scale)
179            })
180            .map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
181            page.cells = cells;
182        }
183        // TableFormer structure per table region (else geometric fallback).
184        let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
185        if let Some(tf) = self.tables.as_mut() {
186            timing::timed("tableformer", || {
187                for (i, r) in regions.iter().enumerate() {
188                    if r.label == "table" {
189                        table_rows[i] = tf.predict_table_rows(
190                            &page.image,
191                            page.height,
192                            [r.l, r.t, r.r, r.b],
193                            &page.word_cells,
194                        );
195                    }
196                }
197            });
198        }
199        Ok(timing::timed("assemble_page", || {
200            assemble::assemble_page(page, regions, &table_rows)
201        }))
202    }
203}
204
205/// Per-worker ONNX intra-op threads. The layout model is memory-bandwidth bound,
206/// so on a typical machine two threads per worker (sharing one in-cache copy of
207/// the weights) extracts more throughput than one fat model or many single-thread
208/// workers. `FLEISCHWOLF_PDF_INTRA` overrides for per-machine tuning.
209fn pdf_intra() -> usize {
210    if let Some(n) = std::env::var("FLEISCHWOLF_PDF_INTRA")
211        .ok()
212        .and_then(|v| v.parse::<usize>().ok())
213        .filter(|&n| n > 0)
214    {
215        return n;
216    }
217    if intra_threads() >= 2 {
218        2
219    } else {
220        1
221    }
222}
223
224/// How many page-workers to spin up for a multi-page PDF. `FLEISCHWOLF_PDF_WORKERS`
225/// overrides; otherwise size the pool so `workers × intra ≈ cores`, capped at 4 so
226/// a worst-case pool holds a bounded amount of model memory (~0.4 GB per worker)
227/// and does not oversaturate the memory bus with model-weight traffic.
228fn pdf_worker_count() -> usize {
229    if let Some(n) = std::env::var("FLEISCHWOLF_PDF_WORKERS")
230        .ok()
231        .and_then(|v| v.parse::<usize>().ok())
232        .filter(|&n| n > 0)
233    {
234        return n;
235    }
236    (intra_threads() / pdf_intra()).clamp(1, 4)
237}
238
/// Minimum page count before a PDF is worth the parallel worker pool. Below this,
/// the serial primary (running its model on every core) is faster than fanning out
/// — the helper pool's one-time model-load cost only pays off once enough pages
/// share it. Defaults to 6; a positive integer in `FLEISCHWOLF_PDF_PARALLEL_MIN`
/// (surrounding whitespace ignored) overrides.
fn pdf_parallel_min() -> usize {
    std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(6)
}
250
251/// A reusable PDF pipeline. The **primary** worker runs its models on every core,
252/// so a single-page / small / image / METS input is converted at full intra-op
253/// speed with no pool to load. A document with enough pages instead fans out
254/// across a **pool** of narrower workers processed concurrently. Both load lazily
255/// and are cached for reuse, so a one-shot conversion only pays for what it uses.
256pub struct Pipeline {
257    /// Full-intra worker for the serial path; loaded on first serial use.
258    primary: Option<Worker>,
259    /// Narrower workers (≈cores/`target_workers` threads each) for the parallel
260    /// path; loaded on first multi-page use and cached.
261    pool: Vec<Worker>,
262    /// Desired pool size for multi-page documents.
263    target_workers: usize,
264    /// Page count at/above which the parallel pool is worth its load cost.
265    parallel_min: usize,
266    /// Skip loading/running TableFormer; table regions fall back to geometric
267    /// reconstruction. See [`Pipeline::no_table_former`].
268    no_table_former: bool,
269    /// Skip layout, OCR, and TableFormer entirely. See [`Pipeline::no_ocr`].
270    no_ocr: bool,
271}
272
273impl Pipeline {
274    /// Construct the pipeline. Models load lazily on first use (full-intra primary
275    /// for serial inputs, the helper pool for multi-page PDFs), so nothing is
276    /// loaded that a given document doesn't need.
277    pub fn new() -> Result<Self, PdfError> {
278        Ok(Self {
279            primary: None,
280            pool: Vec::new(),
281            target_workers: pdf_worker_count(),
282            parallel_min: pdf_parallel_min(),
283            no_table_former: false,
284            no_ocr: false,
285        })
286    }
287
288    /// Skip loading and running the TableFormer table-structure model. Table
289    /// regions still get emitted, but reconstructed geometrically from cell
290    /// positions instead of via the ONNX model's predicted structure — faster
291    /// (no model load, no per-table inference) at the cost of table fidelity.
292    /// No effect if a worker is already loaded; set this before the first
293    /// conversion.
294    pub fn no_table_former(mut self, disable: bool) -> Self {
295        self.no_table_former = disable;
296        self
297    }
298
299    /// Skip layout detection, OCR, and TableFormer entirely — no model load, no
300    /// inference of any kind. The PDF's embedded text cells are grouped by line
301    /// and emitted as plain paragraphs in reading order: no headings, lists,
302    /// tables, code blocks, or pictures, since that structure comes from the
303    /// layout model. The fastest possible PDF path, but pages with no embedded
304    /// text layer (scanned/image-only PDFs) yield no text at all — convert those
305    /// without this flag. Implies `no_table_former`. No effect if a worker is
306    /// already loaded; set this before the first conversion.
307    pub fn no_ocr(mut self, disable: bool) -> Self {
308        self.no_ocr = disable;
309        self
310    }
311
312    /// The full-intra serial worker, loaded on first use.
313    fn primary(&mut self) -> Result<&mut Worker, PdfError> {
314        if self.primary.is_none() {
315            self.primary = Some(Worker::load(
316                intra_threads(),
317                self.no_table_former,
318                self.no_ocr,
319            )?);
320        }
321        Ok(self.primary.as_mut().unwrap())
322    }
323
324    /// Convert a PDF (bytes) to a [`DoclingDocument`]. A document with fewer than
325    /// `parallel_min` pages (or a pool size of 1) streams through the full-intra
326    /// primary; a larger one renders on this thread (pdfium is not thread-safe) and
327    /// fans the pages out across the worker pool, reassembled in page order so the
328    /// output is byte-identical to the serial path.
329    pub fn convert(
330        &mut self,
331        bytes: &[u8],
332        password: Option<&str>,
333        name: &str,
334    ) -> Result<DoclingDocument, PdfError> {
335        let pages = pdfium_backend::page_count(bytes, password)?;
336        let doc = if self.target_workers >= 2 && pages >= self.parallel_min {
337            self.convert_parallel(bytes, password, name)?
338        } else {
339            self.convert_serial(bytes, password, name)?
340        };
341        timing::report();
342        Ok(doc)
343    }
344
345    /// Stream pages one at a time through the primary worker — render → process →
346    /// drop — so the document holds ~one page bitmap (~5 MB) at a time.
347    fn convert_serial(
348        &mut self,
349        bytes: &[u8],
350        password: Option<&str>,
351        name: &str,
352    ) -> Result<DoclingDocument, PdfError> {
353        let mut doc = DoclingDocument::new(name);
354        let render_image = !self.no_ocr;
355        let worker = self.primary()?;
356        pdfium_backend::for_each_page(bytes, password, render_image, |n, _total, mut page| {
357            let (nodes, links) = worker.process(n, &mut page)?;
358            doc.nodes.extend(nodes);
359            doc.links.extend(links);
360            Ok::<(), PdfError>(())
361        })?;
362        assemble::merge_continuations(&mut doc.nodes);
363        Ok(doc)
364    }
365
366    /// Render pages serially on this thread (pdfium) and process them in parallel
367    /// across the worker pool. A bounded channel applies backpressure so only a
368    /// handful of page bitmaps are resident at once; results carry their page
369    /// index and are reassembled in order, so the output is byte-identical to the
370    /// serial path.
371    fn convert_parallel(
372        &mut self,
373        bytes: &[u8],
374        password: Option<&str>,
375        name: &str,
376    ) -> Result<DoclingDocument, PdfError> {
377        self.ensure_pool()?;
378        let n_workers = self.pool.len();
379        let render_image = !self.no_ocr;
380        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
381        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
382        let results: Arc<Mutex<Vec<(usize, PageOut)>>> = Arc::new(Mutex::new(Vec::new()));
383        let first_err: Arc<Mutex<Option<PdfError>>> = Arc::new(Mutex::new(None));
384
385        // Move the pool into the scope so each worker gets an exclusive `&mut`.
386        let mut workers = std::mem::take(&mut self.pool);
387        std::thread::scope(|s| {
388            for worker in workers.iter_mut() {
389                let work_rx = Arc::clone(&work_rx);
390                let results = Arc::clone(&results);
391                let first_err = Arc::clone(&first_err);
392                s.spawn(move || loop {
393                    // Hold the receiver lock only for the recv; release before the
394                    // (long) per-page work so other workers can pull concurrently.
395                    let item = work_rx.lock().unwrap().recv();
396                    let Ok((idx, mut page)) = item else { break };
397                    match worker.process(idx, &mut page) {
398                        Ok(out) => results.lock().unwrap().push((idx, out)),
399                        Err(e) => {
400                            let mut slot = first_err.lock().unwrap();
401                            if slot.is_none() {
402                                *slot = Some(e);
403                            }
404                        }
405                    }
406                });
407            }
408            // Render on this thread and feed the workers; backpressure blocks here
409            // when the channel is full. Dropping `work_tx` afterwards signals the
410            // workers (recv → Err) to finish.
411            let render =
412                pdfium_backend::for_each_page(bytes, password, render_image, |i, _total, page| {
413                    work_tx
414                        .send((i, page))
415                        .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
416                });
417            drop(work_tx);
418            if let Err(e) = render {
419                let mut slot = first_err.lock().unwrap();
420                if slot.is_none() {
421                    *slot = Some(e);
422                }
423            }
424        });
425        // Threads have joined; restore the pool for the next conversion.
426        self.pool = workers;
427
428        if let Some(e) = first_err.lock().unwrap().take() {
429            return Err(e);
430        }
431        let mut results = Arc::try_unwrap(results)
432            .unwrap_or_else(|arc| Mutex::new(arc.lock().unwrap().clone()))
433            .into_inner()
434            .unwrap();
435        results.sort_by_key(|(idx, _)| *idx);
436        let mut doc = DoclingDocument::new(name);
437        for (_, (nodes, links)) in results {
438            doc.nodes.extend(nodes);
439            doc.links.extend(links);
440        }
441        assemble::merge_continuations(&mut doc.nodes);
442        Ok(doc)
443    }
444
445    /// Convert a PDF in **streaming** mode: `emit` is called with each finalized,
446    /// in-document-order batch of nodes (and that span's recovered links) as pages
447    /// complete, so a caller can serialize Markdown page by page instead of waiting
448    /// for the whole document. The batches are exactly the buffered [`convert`]'s
449    /// nodes, split at safe block boundaries by [`assemble::StreamAssembler`] — the
450    /// parallel path reorders pages back into document order before emitting, so
451    /// the output is identical regardless of worker scheduling.
452    ///
453    /// `emit` runs on the calling thread (never a worker), so it needn't be `Send`
454    /// and its backpressure throttles the whole pipeline. Returning `Err` from
455    /// `emit` aborts the conversion with that error.
456    pub fn convert_streaming<F>(
457        &mut self,
458        bytes: &[u8],
459        password: Option<&str>,
460        name: &str,
461        emit: F,
462    ) -> Result<(), PdfError>
463    where
464        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
465    {
466        let _ = name; // page nodes carry no name; the caller owns the document name.
467        let pages = pdfium_backend::page_count(bytes, password)?;
468        let r = if self.target_workers >= 2 && pages >= self.parallel_min {
469            self.convert_streaming_parallel(bytes, password, emit)
470        } else {
471            self.convert_streaming_serial(bytes, password, emit)
472        };
473        timing::report();
474        r
475    }
476
477    /// Serial streaming: render → process → emit, one page at a time, holding back
478    /// only the tail that might still merge into the next page.
479    fn convert_streaming_serial<F>(
480        &mut self,
481        bytes: &[u8],
482        password: Option<&str>,
483        mut emit: F,
484    ) -> Result<(), PdfError>
485    where
486        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
487    {
488        let mut asm = assemble::StreamAssembler::new();
489        let render_image = !self.no_ocr;
490        let worker = self.primary()?;
491        pdfium_backend::for_each_page(bytes, password, render_image, |n, _total, mut page| {
492            let (nodes, links) = worker.process(n, &mut page)?;
493            emit(asm.push(nodes), links)
494        })?;
495        emit(asm.finish(), Vec::new())
496    }
497
498    /// Parallel streaming: pages render serially on a dedicated thread (pdfium is
499    /// not thread-safe) and process across the worker pool; results carry their
500    /// page index and are reordered on the calling thread into a
501    /// [`assemble::StreamAssembler`], which emits each page in document order as
502    /// soon as its predecessors have arrived. Bounded channels keep only a handful
503    /// of pages resident and let `emit`'s backpressure reach the renderer.
504    fn convert_streaming_parallel<F>(
505        &mut self,
506        bytes: &[u8],
507        password: Option<&str>,
508        mut emit: F,
509    ) -> Result<(), PdfError>
510    where
511        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
512    {
513        self.ensure_pool()?;
514        let n_workers = self.pool.len();
515        let render_image = !self.no_ocr;
516        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
517        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
518        // Workers and the renderer report here; the calling thread drains it in
519        // page order. Bounded so workers block (bounding resident bitmaps) when the
520        // consumer falls behind.
521        let (res_tx, res_rx) = sync_channel::<Result<(usize, PageOut), PdfError>>(n_workers * 2);
522
523        let mut workers = std::mem::take(&mut self.pool);
524        let mut asm = assemble::StreamAssembler::new();
525        let mut first_err: Option<PdfError> = None;
526
527        std::thread::scope(|s| {
528            // Workers: pull a page, process it, report (index-tagged) result.
529            for worker in workers.iter_mut() {
530                let work_rx = Arc::clone(&work_rx);
531                let res_tx = res_tx.clone();
532                s.spawn(move || loop {
533                    let item = work_rx.lock().unwrap().recv();
534                    let Ok((idx, mut page)) = item else { break };
535                    let out = worker.process(idx, &mut page).map(|o| (idx, o));
536                    if res_tx.send(out).is_err() {
537                        break; // consumer gone
538                    }
539                });
540            }
541            // Renderer: feed pages to the pool on its own thread (pdfium stays on a
542            // single thread); report a render error through the same channel.
543            {
544                let res_tx = res_tx.clone();
545                s.spawn(move || {
546                    let render = pdfium_backend::for_each_page(
547                        bytes,
548                        password,
549                        render_image,
550                        |i, _total, page| {
551                            work_tx
552                                .send((i, page))
553                                .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
554                        },
555                    );
556                    drop(work_tx); // signal workers to finish
557                    if let Err(e) = render {
558                        let _ = res_tx.send(Err(e));
559                    }
560                });
561            }
562            // Drop our own sender so the channel closes once the threads finish.
563            drop(res_tx);
564
565            // Collector (this thread): reorder into document order and emit.
566            let mut buffer: BTreeMap<usize, PageOut> = BTreeMap::new();
567            let mut next = 0usize;
568            for msg in res_rx.iter() {
569                match msg {
570                    Err(e) => {
571                        if first_err.is_none() {
572                            first_err = Some(e);
573                        }
574                    }
575                    Ok((idx, out)) => {
576                        buffer.insert(idx, out);
577                        if first_err.is_some() {
578                            continue; // keep draining so the threads can exit
579                        }
580                        while let Some((nodes, links)) = buffer.remove(&next) {
581                            if let Err(e) = emit(asm.push(nodes), links) {
582                                first_err = Some(e);
583                                break;
584                            }
585                            next += 1;
586                        }
587                    }
588                }
589            }
590        });
591        // Threads have joined; restore the pool for the next conversion.
592        self.pool = workers;
593
594        if let Some(e) = first_err {
595            return Err(e);
596        }
597        emit(asm.finish(), Vec::new())
598    }
599
600    /// Lazily grow the pool to `target_workers`, loading the new workers
601    /// concurrently (model load is mostly I/O + mmap, so N loads overlap to roughly
602    /// one load's wall-time). Cached for reuse across documents.
603    fn ensure_pool(&mut self) -> Result<(), PdfError> {
604        let need = self.target_workers.saturating_sub(self.pool.len());
605        if need == 0 {
606            return Ok(());
607        }
608        let intra = pdf_intra();
609        let no_table_former = self.no_table_former;
610        let no_ocr = self.no_ocr;
611        let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
612            let handles: Vec<_> = (0..need)
613                .map(|_| s.spawn(move || Worker::load(intra, no_table_former, no_ocr)))
614                .collect();
615            handles.into_iter().map(|h| h.join().unwrap()).collect()
616        });
617        for w in loaded {
618            self.pool.push(w?);
619        }
620        Ok(())
621    }
622
623    /// Convert a standalone image (PNG/JPEG/TIFF/WebP/…) as a single page —
624    /// docling routes images through the same layout+OCR pipeline as a PDF page.
625    pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
626        let image = image::load_from_memory(bytes)
627            .map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
628            .into_rgb8();
629        let (w, h) = image.dimensions();
630        // The image is its own page rendered at 1 px per "point" (scale 1.0); a
631        // standalone image has no text layer, so OCR supplies the cells.
632        let page = PdfPage {
633            width: w as f32,
634            height: h as f32,
635            scale: 1.0,
636            cells: Vec::new(),
637            code_cells: Vec::new(),
638            word_cells: Vec::new(),
639            image,
640            links: Vec::new(),
641        };
642        self.process_pages(vec![page], name)
643    }
644
645    /// Run layout (+ OCR for cell-less pages) and assemble each already-rendered
646    /// page (image / METS inputs, which are small and already materialised).
647    fn process_pages(
648        &mut self,
649        mut pages: Vec<PdfPage>,
650        name: &str,
651    ) -> Result<DoclingDocument, PdfError> {
652        let mut doc = DoclingDocument::new(name);
653        let worker = self.primary()?;
654        for (n, page) in pages.iter_mut().enumerate() {
655            let (nodes, links) = worker.process(n, page)?;
656            doc.nodes.extend(nodes);
657            doc.links.extend(links);
658        }
659        assemble::merge_continuations(&mut doc.nodes);
660        Ok(doc)
661    }
662}
663
664/// Convenience one-shot conversion (loads the pipeline per call). Errors are
665/// detailed and surfaced (never silently skipped).
666pub fn convert(
667    bytes: &[u8],
668    password: Option<&str>,
669    name: &str,
670) -> Result<DoclingDocument, PdfError> {
671    convert_with_options(bytes, password, name, false, false)
672}
673
674/// Like [`convert`], but optionally skips loading/running TableFormer (see
675/// [`Pipeline::no_table_former`]) and/or layout+OCR+TableFormer entirely (see
676/// [`Pipeline::no_ocr`]).
677pub fn convert_with_options(
678    bytes: &[u8],
679    password: Option<&str>,
680    name: &str,
681    no_table_former: bool,
682    no_ocr: bool,
683) -> Result<DoclingDocument, PdfError> {
684    Pipeline::new()?
685        .no_table_former(no_table_former)
686        .no_ocr(no_ocr)
687        .convert(bytes, password, name)
688}
689
690/// Convenience one-shot image conversion (loads the pipeline per call).
691pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
692    convert_image_with_options(bytes, name, false, false)
693}
694
695/// Like [`convert_image`], but optionally skips loading/running TableFormer (see
696/// [`Pipeline::no_table_former`]) and/or layout+OCR+TableFormer entirely (see
697/// [`Pipeline::no_ocr`]).
698pub fn convert_image_with_options(
699    bytes: &[u8],
700    name: &str,
701    no_table_former: bool,
702    no_ocr: bool,
703) -> Result<DoclingDocument, PdfError> {
704    Pipeline::new()?
705        .no_table_former(no_table_former)
706        .no_ocr(no_ocr)
707        .convert_image(bytes, name)
708}
709
710/// Convert pre-segmented pages (image + already-known text cells, e.g. METS/hOCR
711/// scans) through the shared layout + assembly pipeline.
712pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
713    convert_pages_with_options(pages, name, false, false)
714}
715
716/// Like [`convert_pages`], but optionally skips loading/running TableFormer (see
717/// [`Pipeline::no_table_former`]) and/or layout+OCR+TableFormer entirely (see
718/// [`Pipeline::no_ocr`]).
719pub fn convert_pages_with_options(
720    pages: Vec<PdfPage>,
721    name: &str,
722    no_table_former: bool,
723    no_ocr: bool,
724) -> Result<DoclingDocument, PdfError> {
725    Pipeline::new()?
726        .no_table_former(no_table_former)
727        .no_ocr(no_ocr)
728        .process_pages(pages, name)
729}