Skip to main content

fleischwolf_pdf/
lib.rs

//! PDF backend for fleischwolf.
//!
//! A port of docling's standard PDF pipeline: pdfium extracts the text layer
//! (cells with bounding boxes) and renders page images; a discriminative ONNX
//! stack (layout detection, table structure, OCR) classifies regions; the cells
//! are assembled in reading order into a [`DoclingDocument`].
//!
//! Stages: pdfium text-cell extraction + page rendering ([`pdfium_backend`]),
//! the layout, table-structure and OCR ONNX models driven by [`Pipeline`], and
//! the deterministic text/reading-order assembly ([`assemble`]).
11
12mod assemble;
13mod dp_lines;
14pub mod layout;
15mod mets;
16mod ocr;
17pub mod pdfium_backend;
18pub mod resample;
19pub mod tableformer;
20pub mod textparse;
21pub mod timing;
22
23use std::collections::BTreeMap;
24use std::fmt;
25use std::sync::mpsc::{sync_channel, Receiver};
26use std::sync::{Arc, Mutex};
27
28use fleischwolf_core::{DoclingDocument, Node};
29
30pub use mets::{convert_mets_gbs, convert_mets_gbs_with_options};
31pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
32
/// Errors from the PDF backend. Detailed and surfaced (never silently skipped).
#[derive(Debug)]
pub enum PdfError {
    /// pdfium failed to bind, open, or read the document.
    Pdfium(String),
    /// The layout ONNX model failed to load or run.
    Layout(String),
    /// The OCR ONNX model failed to load or run.
    Ocr(String),
}

impl fmt::Display for PdfError {
    /// Every variant is rendered with a `pdf: ` prefix. pdfium failures are
    /// additionally tagged `pdfium error: `; layout and OCR messages are passed
    /// through unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PdfError::Pdfium(m) => write!(f, "pdf: pdfium error: {m}"),
            PdfError::Layout(m) | PdfError::Ocr(m) => write!(f, "pdf: {m}"),
        }
    }
}

impl std::error::Error for PdfError {}
55
56impl From<pdfium_render::prelude::PdfiumError> for PdfError {
57    fn from(e: pdfium_render::prelude::PdfiumError) -> Self {
58        PdfError::Pdfium(e.to_string())
59    }
60}
61
/// Threads ONNX inference may use.
///
/// A positive integer in `FLEISCHWOLF_PDF_THREADS` overrides the budget
/// outright (the value is used as-is, not as an upper bound). When the variable
/// is unset, unparsable or `0`, the machine's available parallelism is used
/// instead (ort otherwise picks a low number), falling back to `1` if that
/// cannot be queried. The result is therefore always at least `1`.
pub(crate) fn intra_threads() -> usize {
    std::env::var("FLEISCHWOLF_PDF_THREADS")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
        })
}
76
/// True when `FLEISCHWOLF_FP32` (any value but `0`) forces the full-precision
/// models even where an INT8 variant sits next to the fp32 default.
///
/// An unset (or non-Unicode) variable yields `false`; note that an empty value
/// is "not `0`" and therefore counts as forced.
pub(crate) fn fp32_forced() -> bool {
    matches!(std::env::var("FLEISCHWOLF_FP32"), Ok(v) if v != "0")
}
84
/// Resolve a default (CWD-relative) asset path. If it doesn't exist relative
/// to the current directory, try next to the executable and one level above
/// it (following symlinks — the layout `scripts/install.sh` produces:
/// `/usr/local/bin/fleischwolf` → `/usr/local/fleischwolf/bin/fleischwolf`
/// with `models/` and `.pdfium/` in `/usr/local/fleischwolf`). Lets an
/// installed binary run from any working directory with no env vars; explicit
/// env overrides never reach this. Returns `rel` unchanged when nothing
/// exists anywhere, so callers' error messages keep the familiar path.
pub(crate) fn resolve_asset(rel: &str) -> String {
    if std::path::Path::new(rel).exists() {
        return rel.to_string();
    }
    // `canonicalize` resolves symlinks, so the search starts from the directory
    // that holds the real binary rather than the link pointing at it.
    let exe_dir = std::env::current_exe()
        .ok()
        .and_then(|p| p.canonicalize().ok())
        .and_then(|p| p.parent().map(std::path::Path::to_path_buf));
    if let Some(dir) = exe_dir {
        // Candidates, in priority order: the executable's directory, then its parent.
        let found = [Some(dir.as_path()), dir.parent()]
            .into_iter()
            .flatten()
            .map(|base| base.join(rel))
            .find(|candidate| candidate.exists());
        if let Some(path) = found {
            return path.to_string_lossy().into_owned();
        }
    }
    rel.to_string()
}
111
112/// Resolve a model path: an explicit env override always wins; otherwise the
113/// INT8 variant of the default path when it exists on disk (the quantized
114/// models are conformance-validated — see PDF_PERFORMANCE.md — and load/run
115/// markedly faster on CPU), unless `FLEISCHWOLF_FP32` opts back into full
116/// precision; else the fp32 default.
117pub(crate) fn model_path(env: &str, fp32_default: &str, int8_default: &str) -> String {
118    if let Ok(p) = std::env::var(env) {
119        return p;
120    }
121    if !fp32_forced() {
122        let p = resolve_asset(int8_default);
123        if std::path::Path::new(&p).exists() {
124            return p;
125        }
126    }
127    resolve_asset(fp32_default)
128}
129
130/// One page's assembled output: typed nodes plus the page's hyperlinks, kept
131/// separate so pages processed out of order can be stitched back in page order.
132type PageOut = (Vec<Node>, Vec<(String, String)>);
133
134/// The pool-wide TableFormer slot: one instance shared by every worker, loaded
135/// lazily on the first table region any worker sees. Tables appear on a
136/// minority of pages, so per-worker copies mostly multiplied ~0.4 GB of
137/// weights+arenas by the pool size for nothing; a single shared instance keeps
138/// the peak flat regardless of pool width, and a table's structure prediction
139/// is independent of which worker runs it, so output is byte-identical. The
140/// mutex serialises concurrent tables — the shared instance is loaded with the
141/// full intra-op thread budget to compensate (one wide TableFormer instead of
142/// several narrow ones).
143enum TfSlot {
144    /// Not attempted yet (no table seen so far).
145    Unloaded,
146    /// Load attempted, graphs absent — geometric fallback (warned once).
147    Missing,
148    Ready(tableformer::TableFormer),
149}
150
151type SharedTables = Arc<Mutex<TfSlot>>;
152
153/// A self-contained set of the per-page models (layout, OCR). Each parallel
154/// page-worker owns its own `Worker` so inference runs concurrently without
155/// sharing an ONNX session (`ort`'s `Session::run` is `&mut self`); only the
156/// rarely-hit TableFormer is shared (see [`TfSlot`]).
157struct Worker {
158    /// `None` when `no_ocr` skips layout entirely — no model load, no inference.
159    layout: Option<layout::LayoutModel>,
160    ocr: Option<ocr::OcrModel>,
161    /// Shared TableFormer slot; `None` when `no_table_former`/`no_ocr` skip it.
162    tables: Option<SharedTables>,
163    /// Skip layout, OCR, and TableFormer; reconstruct text purely from the PDF's
164    /// embedded text layer. See [`Pipeline::no_ocr`].
165    no_ocr: bool,
166}
167
168impl Worker {
169    fn load(intra: usize, tables: Option<SharedTables>, no_ocr: bool) -> Result<Self, PdfError> {
170        Ok(Self {
171            layout: if no_ocr {
172                None
173            } else {
174                Some(layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?)
175            },
176            ocr: None,
177            tables,
178            no_ocr,
179        })
180    }
181
182    /// Run layout (+ OCR for cell-less pages) + TableFormer and assemble page `n`
183    /// into its nodes and links. Pure given the page (mutates only the worker's
184    /// lazily-loaded OCR model), so it is safe to run concurrently across pages.
185    fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
186        if self.no_ocr {
187            // Fastest path: no layout/OCR/TableFormer inference at all. The PDF's
188            // embedded text cells (if any) become flat, line-grouped paragraphs in
189            // reading order via the same orphan-region machinery that normally
190            // rescues text the detector missed — here it rescues *all* of it.
191            // Pages with no embedded text layer (scanned/image-only) yield nothing;
192            // convert those without `no_ocr`.
193            let mut regions = Vec::new();
194            assemble::add_orphan_regions(&mut regions, &page.cells);
195            let table_rows = vec![None; regions.len()];
196            return Ok(timing::timed("assemble_page", || {
197                assemble::assemble_page(page, regions, &table_rows)
198            }));
199        }
200        let regions = timing::timed("layout.predict", || {
201            self.layout
202                .as_mut()
203                .expect("layout model loaded unless no_ocr")
204                .predict(&page.image, page.width, page.height)
205        })
206        .map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
207        // Resolve overlapping detections once, before OCR.
208        let mut regions = assemble::resolve(regions);
209        // Emit text the detector missed as orphan text regions (docling parity).
210        assemble::add_orphan_regions(&mut regions, &page.cells);
211        // Drop phantom empty low-confidence picture boxes (docling parity).
212        assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
213        // No text layer → recognise text from the page image via OCR.
214        if page.cells.is_empty() {
215            if self.ocr.is_none() {
216                self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
217            }
218            let cells = timing::timed("ocr.page", || {
219                self.ocr
220                    .as_mut()
221                    .unwrap()
222                    .ocr_page(&page.image, &regions, page.scale)
223            })
224            .map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
225            page.cells = cells;
226        }
227        // TableFormer structure per table region (else geometric fallback). The
228        // shared slot is only locked (and lazily loaded) when the page actually
229        // has a table, so table-free documents never pay for TableFormer at all.
230        let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
231        if let Some(slot) = self.tables.as_ref() {
232            if regions.iter().any(|r| r.label == "table") {
233                timing::timed("tableformer", || {
234                    let mut guard = slot.lock().unwrap();
235                    if matches!(*guard, TfSlot::Unloaded) {
236                        // Full intra-op width: tables serialise on this mutex, so
237                        // the one instance gets the whole thread budget.
238                        *guard = match tableformer::TableFormer::load_with(intra_threads()) {
239                            Some(tf) => TfSlot::Ready(tf),
240                            None => TfSlot::Missing,
241                        };
242                    }
243                    if let TfSlot::Ready(tf) = &mut *guard {
244                        for (i, r) in regions.iter().enumerate() {
245                            if r.label == "table" {
246                                table_rows[i] = tf.predict_table_rows(
247                                    &page.image,
248                                    page.height,
249                                    [r.l, r.t, r.r, r.b],
250                                    &page.word_cells,
251                                );
252                            }
253                        }
254                    }
255                });
256            }
257        }
258        Ok(timing::timed("assemble_page", || {
259            assemble::assemble_page(page, regions, &table_rows)
260        }))
261    }
262}
263
264/// Per-worker ONNX intra-op threads. The layout model is memory-bandwidth bound,
265/// so on a typical machine two threads per worker (sharing one in-cache copy of
266/// the weights) extracts more throughput than one fat model or many single-thread
267/// workers. `FLEISCHWOLF_PDF_INTRA` overrides for per-machine tuning.
268fn pdf_intra() -> usize {
269    if let Some(n) = std::env::var("FLEISCHWOLF_PDF_INTRA")
270        .ok()
271        .and_then(|v| v.parse::<usize>().ok())
272        .filter(|&n| n > 0)
273    {
274        return n;
275    }
276    if intra_threads() >= 2 {
277        2
278    } else {
279        1
280    }
281}
282
283/// How many page-workers to spin up for a multi-page PDF. `FLEISCHWOLF_PDF_WORKERS`
284/// overrides; otherwise size the pool so `workers × intra ≈ cores`, capped at 4 so
285/// a worst-case pool holds a bounded amount of model memory (~0.4 GB per worker)
286/// and does not oversaturate the memory bus with model-weight traffic.
287fn pdf_worker_count() -> usize {
288    if let Some(n) = std::env::var("FLEISCHWOLF_PDF_WORKERS")
289        .ok()
290        .and_then(|v| v.parse::<usize>().ok())
291        .filter(|&n| n > 0)
292    {
293        return n;
294    }
295    (intra_threads() / pdf_intra()).clamp(1, 4)
296}
297
/// Minimum page count before a PDF is worth the parallel worker pool. Below this,
/// the serial primary (running its model on every core) is faster than fanning out
/// — the helper pool's one-time model-load cost only pays off once enough pages
/// share it. `FLEISCHWOLF_PDF_PARALLEL_MIN` overrides.
///
/// Defaults to `6` when the variable is unset, unparsable or `0`.
fn pdf_parallel_min() -> usize {
    std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(6)
}
309
310/// A reusable PDF pipeline. The **primary** worker runs its models on every core,
311/// so a single-page / small / image / METS input is converted at full intra-op
312/// speed with no pool to load. A document with enough pages instead fans out
313/// across a **pool** of narrower workers processed concurrently. Both load lazily
314/// and are cached for reuse, so a one-shot conversion only pays for what it uses.
315pub struct Pipeline {
316    /// Full-intra worker for the serial path; loaded on first serial use.
317    primary: Option<Worker>,
318    /// Narrower workers (≈cores/`target_workers` threads each) for the parallel
319    /// path; loaded on first multi-page use and cached.
320    pool: Vec<Worker>,
321    /// The single TableFormer instance every worker shares (see [`TfSlot`]).
322    tables: SharedTables,
323    /// Desired pool size for multi-page documents.
324    target_workers: usize,
325    /// Page count at/above which the parallel pool is worth its load cost.
326    parallel_min: usize,
327    /// Skip loading/running TableFormer; table regions fall back to geometric
328    /// reconstruction. See [`Pipeline::no_table_former`].
329    no_table_former: bool,
330    /// Skip layout, OCR, and TableFormer entirely. See [`Pipeline::no_ocr`].
331    no_ocr: bool,
332}
333
334impl Pipeline {
335    /// Construct the pipeline. Models load lazily on first use (full-intra primary
336    /// for serial inputs, the helper pool for multi-page PDFs), so nothing is
337    /// loaded that a given document doesn't need.
338    pub fn new() -> Result<Self, PdfError> {
339        Ok(Self {
340            primary: None,
341            pool: Vec::new(),
342            tables: Arc::new(Mutex::new(TfSlot::Unloaded)),
343            target_workers: pdf_worker_count(),
344            parallel_min: pdf_parallel_min(),
345            no_table_former: false,
346            no_ocr: false,
347        })
348    }
349
350    /// Skip loading and running the TableFormer table-structure model. Table
351    /// regions still get emitted, but reconstructed geometrically from cell
352    /// positions instead of via the ONNX model's predicted structure — faster
353    /// (no model load, no per-table inference) at the cost of table fidelity.
354    /// No effect if a worker is already loaded; set this before the first
355    /// conversion.
356    pub fn no_table_former(mut self, disable: bool) -> Self {
357        self.no_table_former = disable;
358        self
359    }
360
361    /// Skip layout detection, OCR, and TableFormer entirely — no model load, no
362    /// inference of any kind. The PDF's embedded text cells are grouped by line
363    /// and emitted as plain paragraphs in reading order: no headings, lists,
364    /// tables, code blocks, or pictures, since that structure comes from the
365    /// layout model. The fastest possible PDF path, but pages with no embedded
366    /// text layer (scanned/image-only PDFs) yield no text at all — convert those
367    /// without this flag. Implies `no_table_former`. No effect if a worker is
368    /// already loaded; set this before the first conversion.
369    pub fn no_ocr(mut self, disable: bool) -> Self {
370        self.no_ocr = disable;
371        self
372    }
373
374    /// The shared TableFormer slot handed to each worker, or `None` when the
375    /// pipeline options skip TableFormer entirely.
376    fn tables_slot(&self) -> Option<SharedTables> {
377        if self.no_table_former || self.no_ocr {
378            None
379        } else {
380            Some(Arc::clone(&self.tables))
381        }
382    }
383
384    /// The full-intra serial worker, loaded on first use.
385    fn primary(&mut self) -> Result<&mut Worker, PdfError> {
386        if self.primary.is_none() {
387            self.primary = Some(Worker::load(
388                intra_threads(),
389                self.tables_slot(),
390                self.no_ocr,
391            )?);
392        }
393        Ok(self.primary.as_mut().unwrap())
394    }
395
396    /// Convert a PDF (bytes) to a [`DoclingDocument`]. A document with fewer than
397    /// `parallel_min` pages (or a pool size of 1) streams through the full-intra
398    /// primary; a larger one renders on this thread (pdfium is not thread-safe) and
399    /// fans the pages out across the worker pool, reassembled in page order so the
400    /// output is byte-identical to the serial path.
401    pub fn convert(
402        &mut self,
403        bytes: &[u8],
404        password: Option<&str>,
405        name: &str,
406    ) -> Result<DoclingDocument, PdfError> {
407        let pages = pdfium_backend::page_count(bytes, password)?;
408        let doc = if self.target_workers >= 2 && pages >= self.parallel_min {
409            self.convert_parallel(bytes, password, name)?
410        } else {
411            self.convert_serial(bytes, password, name)?
412        };
413        timing::report();
414        Ok(doc)
415    }
416
417    /// Stream pages one at a time through the primary worker — render → process →
418    /// drop — so the document holds ~one page bitmap (~5 MB) at a time.
419    fn convert_serial(
420        &mut self,
421        bytes: &[u8],
422        password: Option<&str>,
423        name: &str,
424    ) -> Result<DoclingDocument, PdfError> {
425        let mut doc = DoclingDocument::new(name);
426        let render_image = !self.no_ocr;
427        let worker = self.primary()?;
428        pdfium_backend::for_each_page(bytes, password, render_image, |n, _total, mut page| {
429            let (nodes, links) = worker.process(n, &mut page)?;
430            doc.nodes.extend(nodes);
431            doc.links.extend(links);
432            Ok::<(), PdfError>(())
433        })?;
434        assemble::merge_continuations(&mut doc.nodes);
435        Ok(doc)
436    }
437
438    /// Render pages serially on this thread (pdfium) and process them in parallel
439    /// across the worker pool. A bounded channel applies backpressure so only a
440    /// handful of page bitmaps are resident at once; results carry their page
441    /// index and are reassembled in order, so the output is byte-identical to the
442    /// serial path.
443    fn convert_parallel(
444        &mut self,
445        bytes: &[u8],
446        password: Option<&str>,
447        name: &str,
448    ) -> Result<DoclingDocument, PdfError> {
449        self.ensure_pool()?;
450        let n_workers = self.pool.len();
451        let render_image = !self.no_ocr;
452        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
453        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
454        let results: Arc<Mutex<Vec<(usize, PageOut)>>> = Arc::new(Mutex::new(Vec::new()));
455        let first_err: Arc<Mutex<Option<PdfError>>> = Arc::new(Mutex::new(None));
456
457        // Move the pool into the scope so each worker gets an exclusive `&mut`.
458        let mut workers = std::mem::take(&mut self.pool);
459        std::thread::scope(|s| {
460            for worker in workers.iter_mut() {
461                let work_rx = Arc::clone(&work_rx);
462                let results = Arc::clone(&results);
463                let first_err = Arc::clone(&first_err);
464                s.spawn(move || loop {
465                    // Hold the receiver lock only for the recv; release before the
466                    // (long) per-page work so other workers can pull concurrently.
467                    let item = work_rx.lock().unwrap().recv();
468                    let Ok((idx, mut page)) = item else { break };
469                    match worker.process(idx, &mut page) {
470                        Ok(out) => results.lock().unwrap().push((idx, out)),
471                        Err(e) => {
472                            let mut slot = first_err.lock().unwrap();
473                            if slot.is_none() {
474                                *slot = Some(e);
475                            }
476                        }
477                    }
478                });
479            }
480            // Render on this thread and feed the workers; backpressure blocks here
481            // when the channel is full. Dropping `work_tx` afterwards signals the
482            // workers (recv → Err) to finish.
483            let render =
484                pdfium_backend::for_each_page(bytes, password, render_image, |i, _total, page| {
485                    work_tx
486                        .send((i, page))
487                        .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
488                });
489            drop(work_tx);
490            if let Err(e) = render {
491                let mut slot = first_err.lock().unwrap();
492                if slot.is_none() {
493                    *slot = Some(e);
494                }
495            }
496        });
497        // Threads have joined; restore the pool for the next conversion.
498        self.pool = workers;
499
500        if let Some(e) = first_err.lock().unwrap().take() {
501            return Err(e);
502        }
503        let mut results = Arc::try_unwrap(results)
504            .unwrap_or_else(|arc| Mutex::new(arc.lock().unwrap().clone()))
505            .into_inner()
506            .unwrap();
507        results.sort_by_key(|(idx, _)| *idx);
508        let mut doc = DoclingDocument::new(name);
509        for (_, (nodes, links)) in results {
510            doc.nodes.extend(nodes);
511            doc.links.extend(links);
512        }
513        assemble::merge_continuations(&mut doc.nodes);
514        Ok(doc)
515    }
516
517    /// Convert a PDF in **streaming** mode: `emit` is called with each finalized,
518    /// in-document-order batch of nodes (and that span's recovered links) as pages
519    /// complete, so a caller can serialize Markdown page by page instead of waiting
520    /// for the whole document. The batches are exactly the buffered [`convert`]'s
521    /// nodes, split at safe block boundaries by [`assemble::StreamAssembler`] — the
522    /// parallel path reorders pages back into document order before emitting, so
523    /// the output is identical regardless of worker scheduling.
524    ///
525    /// `emit` runs on the calling thread (never a worker), so it needn't be `Send`
526    /// and its backpressure throttles the whole pipeline. Returning `Err` from
527    /// `emit` aborts the conversion with that error.
528    pub fn convert_streaming<F>(
529        &mut self,
530        bytes: &[u8],
531        password: Option<&str>,
532        name: &str,
533        emit: F,
534    ) -> Result<(), PdfError>
535    where
536        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
537    {
538        let _ = name; // page nodes carry no name; the caller owns the document name.
539        let pages = pdfium_backend::page_count(bytes, password)?;
540        let r = if self.target_workers >= 2 && pages >= self.parallel_min {
541            self.convert_streaming_parallel(bytes, password, emit)
542        } else {
543            self.convert_streaming_serial(bytes, password, emit)
544        };
545        timing::report();
546        r
547    }
548
549    /// Serial streaming: render → process → emit, one page at a time, holding back
550    /// only the tail that might still merge into the next page.
551    fn convert_streaming_serial<F>(
552        &mut self,
553        bytes: &[u8],
554        password: Option<&str>,
555        mut emit: F,
556    ) -> Result<(), PdfError>
557    where
558        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
559    {
560        let mut asm = assemble::StreamAssembler::new();
561        let render_image = !self.no_ocr;
562        let worker = self.primary()?;
563        pdfium_backend::for_each_page(bytes, password, render_image, |n, _total, mut page| {
564            let (nodes, links) = worker.process(n, &mut page)?;
565            emit(asm.push(nodes), links)
566        })?;
567        emit(asm.finish(), Vec::new())
568    }
569
570    /// Parallel streaming: pages render serially on a dedicated thread (pdfium is
571    /// not thread-safe) and process across the worker pool; results carry their
572    /// page index and are reordered on the calling thread into a
573    /// [`assemble::StreamAssembler`], which emits each page in document order as
574    /// soon as its predecessors have arrived. Bounded channels keep only a handful
575    /// of pages resident and let `emit`'s backpressure reach the renderer.
576    fn convert_streaming_parallel<F>(
577        &mut self,
578        bytes: &[u8],
579        password: Option<&str>,
580        mut emit: F,
581    ) -> Result<(), PdfError>
582    where
583        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
584    {
585        self.ensure_pool()?;
586        let n_workers = self.pool.len();
587        let render_image = !self.no_ocr;
588        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
589        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
590        // Workers and the renderer report here; the calling thread drains it in
591        // page order. Bounded so workers block (bounding resident bitmaps) when the
592        // consumer falls behind.
593        let (res_tx, res_rx) = sync_channel::<Result<(usize, PageOut), PdfError>>(n_workers * 2);
594
595        let mut workers = std::mem::take(&mut self.pool);
596        let mut asm = assemble::StreamAssembler::new();
597        let mut first_err: Option<PdfError> = None;
598
599        std::thread::scope(|s| {
600            // Workers: pull a page, process it, report (index-tagged) result.
601            for worker in workers.iter_mut() {
602                let work_rx = Arc::clone(&work_rx);
603                let res_tx = res_tx.clone();
604                s.spawn(move || loop {
605                    let item = work_rx.lock().unwrap().recv();
606                    let Ok((idx, mut page)) = item else { break };
607                    let out = worker.process(idx, &mut page).map(|o| (idx, o));
608                    if res_tx.send(out).is_err() {
609                        break; // consumer gone
610                    }
611                });
612            }
613            // Renderer: feed pages to the pool on its own thread (pdfium stays on a
614            // single thread); report a render error through the same channel.
615            {
616                let res_tx = res_tx.clone();
617                s.spawn(move || {
618                    let render = pdfium_backend::for_each_page(
619                        bytes,
620                        password,
621                        render_image,
622                        |i, _total, page| {
623                            work_tx
624                                .send((i, page))
625                                .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
626                        },
627                    );
628                    drop(work_tx); // signal workers to finish
629                    if let Err(e) = render {
630                        let _ = res_tx.send(Err(e));
631                    }
632                });
633            }
634            // Drop our own sender so the channel closes once the threads finish.
635            drop(res_tx);
636
637            // Collector (this thread): reorder into document order and emit.
638            let mut buffer: BTreeMap<usize, PageOut> = BTreeMap::new();
639            let mut next = 0usize;
640            for msg in res_rx.iter() {
641                match msg {
642                    Err(e) => {
643                        if first_err.is_none() {
644                            first_err = Some(e);
645                        }
646                    }
647                    Ok((idx, out)) => {
648                        buffer.insert(idx, out);
649                        if first_err.is_some() {
650                            continue; // keep draining so the threads can exit
651                        }
652                        while let Some((nodes, links)) = buffer.remove(&next) {
653                            if let Err(e) = emit(asm.push(nodes), links) {
654                                first_err = Some(e);
655                                break;
656                            }
657                            next += 1;
658                        }
659                    }
660                }
661            }
662        });
663        // Threads have joined; restore the pool for the next conversion.
664        self.pool = workers;
665
666        if let Some(e) = first_err {
667            return Err(e);
668        }
669        emit(asm.finish(), Vec::new())
670    }
671
672    /// Lazily grow the pool to `target_workers`, loading the new workers
673    /// concurrently (model load is mostly I/O + mmap, so N loads overlap to roughly
674    /// one load's wall-time). Cached for reuse across documents.
675    fn ensure_pool(&mut self) -> Result<(), PdfError> {
676        let need = self.target_workers.saturating_sub(self.pool.len());
677        if need == 0 {
678            return Ok(());
679        }
680        let intra = pdf_intra();
681        let no_ocr = self.no_ocr;
682        let tables = self.tables_slot();
683        let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
684            let handles: Vec<_> = (0..need)
685                .map(|_| {
686                    let tables = tables.clone();
687                    s.spawn(move || Worker::load(intra, tables, no_ocr))
688                })
689                .collect();
690            handles.into_iter().map(|h| h.join().unwrap()).collect()
691        });
692        for w in loaded {
693            self.pool.push(w?);
694        }
695        Ok(())
696    }
697
698    /// Convert a standalone image (PNG/JPEG/TIFF/WebP/…) as a single page —
699    /// docling routes images through the same layout+OCR pipeline as a PDF page.
700    pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
701        let image = image::load_from_memory(bytes)
702            .map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
703            .into_rgb8();
704        let (w, h) = image.dimensions();
705        // The image is its own page rendered at 1 px per "point" (scale 1.0); a
706        // standalone image has no text layer, so OCR supplies the cells.
707        let page = PdfPage {
708            width: w as f32,
709            height: h as f32,
710            scale: 1.0,
711            cells: Vec::new(),
712            code_cells: Vec::new(),
713            word_cells: Vec::new(),
714            image,
715            links: Vec::new(),
716        };
717        self.process_pages(vec![page], name)
718    }
719
720    /// Run layout (+ OCR for cell-less pages) and assemble each already-rendered
721    /// page (image / METS inputs, which are small and already materialised).
722    fn process_pages(
723        &mut self,
724        mut pages: Vec<PdfPage>,
725        name: &str,
726    ) -> Result<DoclingDocument, PdfError> {
727        let mut doc = DoclingDocument::new(name);
728        let worker = self.primary()?;
729        for (n, page) in pages.iter_mut().enumerate() {
730            let (nodes, links) = worker.process(n, page)?;
731            doc.nodes.extend(nodes);
732            doc.links.extend(links);
733        }
734        assemble::merge_continuations(&mut doc.nodes);
735        Ok(doc)
736    }
737}
738
739/// Convenience one-shot conversion (loads the pipeline per call). Errors are
740/// detailed and surfaced (never silently skipped).
741pub fn convert(
742    bytes: &[u8],
743    password: Option<&str>,
744    name: &str,
745) -> Result<DoclingDocument, PdfError> {
746    convert_with_options(bytes, password, name, false, false)
747}
748
749/// Like [`convert`], but optionally skips loading/running TableFormer (see
750/// [`Pipeline::no_table_former`]) and/or layout+OCR+TableFormer entirely (see
751/// [`Pipeline::no_ocr`]).
752pub fn convert_with_options(
753    bytes: &[u8],
754    password: Option<&str>,
755    name: &str,
756    no_table_former: bool,
757    no_ocr: bool,
758) -> Result<DoclingDocument, PdfError> {
759    Pipeline::new()?
760        .no_table_former(no_table_former)
761        .no_ocr(no_ocr)
762        .convert(bytes, password, name)
763}
764
765/// Convenience one-shot image conversion (loads the pipeline per call).
766pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
767    convert_image_with_options(bytes, name, false, false)
768}
769
770/// Like [`convert_image`], but optionally skips loading/running TableFormer (see
771/// [`Pipeline::no_table_former`]) and/or layout+OCR+TableFormer entirely (see
772/// [`Pipeline::no_ocr`]).
773pub fn convert_image_with_options(
774    bytes: &[u8],
775    name: &str,
776    no_table_former: bool,
777    no_ocr: bool,
778) -> Result<DoclingDocument, PdfError> {
779    Pipeline::new()?
780        .no_table_former(no_table_former)
781        .no_ocr(no_ocr)
782        .convert_image(bytes, name)
783}
784
785/// Convert pre-segmented pages (image + already-known text cells, e.g. METS/hOCR
786/// scans) through the shared layout + assembly pipeline.
787pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
788    convert_pages_with_options(pages, name, false, false)
789}
790
791/// Like [`convert_pages`], but optionally skips loading/running TableFormer (see
792/// [`Pipeline::no_table_former`]) and/or layout+OCR+TableFormer entirely (see
793/// [`Pipeline::no_ocr`]).
794pub fn convert_pages_with_options(
795    pages: Vec<PdfPage>,
796    name: &str,
797    no_table_former: bool,
798    no_ocr: bool,
799) -> Result<DoclingDocument, PdfError> {
800    Pipeline::new()?
801        .no_table_former(no_table_former)
802        .no_ocr(no_ocr)
803        .process_pages(pages, name)
804}