mod assemble;
mod dp_lines;
pub mod layout;
mod mets;
mod ocr;
pub mod pdfium_backend;
pub mod resample;
pub mod tableformer;
pub mod textparse;
pub mod timing;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use fleischwolf_core::{DoclingDocument, Node};
pub use mets::{convert_mets_gbs, convert_mets_gbs_with_options};
pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
/// Errors produced while converting PDFs or images into a `DoclingDocument`.
#[derive(Debug)]
pub enum PdfError {
    /// Failure reported by the pdfium backend. This module also uses it for image-decoding
    /// failures and for a closed page-worker channel.
    Pdfium(String),
    /// Failure loading or running the layout model.
    Layout(String),
    /// Failure loading or running the OCR model.
    Ocr(String),
}
impl fmt::Display for PdfError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Pdfium(msg) => write!(f, "pdf: pdfium error: {msg}"),
            // Layout and OCR messages are shown with only the common `pdf:` prefix.
            Self::Layout(msg) | Self::Ocr(msg) => write!(f, "pdf: {msg}"),
        }
    }
}
impl std::error::Error for PdfError {}
// Lets `?` lift pdfium-render errors into `PdfError::Pdfium`, keeping only their display text.
impl From<pdfium_render::prelude::PdfiumError> for PdfError {
    fn from(err: pdfium_render::prelude::PdfiumError) -> Self {
        Self::Pdfium(err.to_string())
    }
}
/// Total inference thread budget.
///
/// Uses `FLEISCHWOLF_PDF_THREADS` when it parses as a positive integer. Otherwise it uses
/// [`std::thread::available_parallelism`], falling back to 1 when that is unavailable.
pub(crate) fn intra_threads() -> usize {
    let configured = std::env::var("FLEISCHWOLF_PDF_THREADS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(count) if count > 0 => count,
        _ => std::thread::available_parallelism().map_or(1, |n| n.get()),
    }
}
/// Whether FP32 models must be used instead of the INT8 variants.
///
/// True when `FLEISCHWOLF_FP32` is set to any value other than `"0"`, including the empty
/// string. False when the variable is unset or is not valid Unicode.
pub(crate) fn fp32_forced() -> bool {
    match std::env::var("FLEISCHWOLF_FP32") {
        Ok(flag) => flag != "0",
        Err(_) => false,
    }
}
/// Locate a bundled asset by relative path.
///
/// Lookup order:
/// 1. `rel` itself, relative to the current working directory.
/// 2. `rel` under the directory of the canonicalised current executable.
/// 3. `rel` under that directory's parent.
///
/// The first existing candidate is returned. If none exists, `rel` is returned unchanged.
pub(crate) fn resolve_asset(rel: &str) -> String {
    if std::path::Path::new(rel).exists() {
        return rel.to_string();
    }
    let exe_dir = std::env::current_exe()
        .and_then(|exe| exe.canonicalize())
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.to_path_buf()));
    let Some(exe_dir) = exe_dir else {
        return rel.to_string();
    };
    std::iter::once(exe_dir.as_path())
        .chain(exe_dir.parent())
        .map(|base| base.join(rel))
        .find(|candidate| candidate.exists())
        .map_or_else(|| rel.to_string(), |hit| hit.to_string_lossy().into_owned())
}
/// Pick the model file to load.
///
/// An explicit path in the environment variable `env` always wins. Otherwise the INT8 model
/// (`int8_default`, resolved via `resolve_asset`) is preferred when it exists and FP32 is not
/// forced. The FP32 model (`fp32_default`) is the final fallback.
pub(crate) fn model_path(env: &str, fp32_default: &str, int8_default: &str) -> String {
    match std::env::var(env) {
        Ok(explicit) => explicit,
        Err(_) => {
            let int8 = (!fp32_forced())
                .then(|| resolve_asset(int8_default))
                .filter(|candidate| std::path::Path::new(candidate).exists());
            int8.unwrap_or_else(|| resolve_asset(fp32_default))
        }
    }
}
/// Result of processing one page: the assembled nodes plus the `(String, String)` link pairs
/// that the conversion paths append to `DoclingDocument::links`.
type PageOut = (Vec<Node>, Vec<(String, String)>);
/// Lazily initialised TableFormer state, shared by every worker of a pipeline.
enum TfSlot {
    /// No load has been attempted yet.
    Unloaded,
    /// A load was attempted and `TableFormer::load_with` returned `None`; it is not retried.
    Missing,
    /// Loaded model, ready for table-structure prediction.
    Ready(tableformer::TableFormer),
}
/// TableFormer slot shared between workers. The mutex serialises both the one-time load and
/// prediction, so only one worker uses the model at a time.
type SharedTables = Arc<Mutex<TfSlot>>;
/// Set of models that turns one page at a time into document nodes.
struct Worker {
    /// Layout model; `None` in no-OCR mode, where it is never loaded.
    layout: Option<layout::LayoutModel>,
    /// OCR model, loaded on the first page that has no embedded text cells.
    ocr: Option<ocr::OcrModel>,
    /// Shared TableFormer slot; `None` when table-structure recognition is disabled.
    tables: Option<SharedTables>,
    /// Skip layout and OCR entirely and assemble pages from their embedded text cells only.
    no_ocr: bool,
}
impl Worker {
    /// Load the models one worker needs.
    ///
    /// `intra` is the thread count handed to the layout model. The OCR model is not loaded here;
    /// `process` loads it on the first page without embedded text cells. In no-OCR mode the
    /// layout model is skipped too, because `process` never consults it.
    ///
    /// # Errors
    /// Returns [`PdfError::Layout`] if the layout model fails to load.
    fn load(intra: usize, tables: Option<SharedTables>, no_ocr: bool) -> Result<Self, PdfError> {
        let layout = if no_ocr {
            None
        } else {
            Some(layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?)
        };
        Ok(Self {
            layout,
            ocr: None,
            tables,
            no_ocr,
        })
    }

    /// Turn page `n` (zero-based) into document nodes and link pairs.
    ///
    /// In no-OCR mode only orphan regions built from the page's text cells are assembled.
    /// Otherwise the page goes through these steps:
    /// 1. Layout prediction.
    /// 2. Region cleanup.
    /// 3. OCR, only when the page has no text cells; the result replaces `page.cells`.
    /// 4. Optional TableFormer table-structure prediction for `"table"` regions.
    /// 5. Final assembly.
    ///
    /// # Errors
    /// [`PdfError::Layout`] or [`PdfError::Ocr`] when the respective model fails to load or to
    /// predict. Prediction errors are prefixed with the one-based page number.
    ///
    /// # Panics
    /// If the shared TableFormer mutex was poisoned by a panic in another worker.
    fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
        if self.no_ocr {
            let mut regions = Vec::new();
            assemble::add_orphan_regions(&mut regions, &page.cells);
            let table_rows = vec![None; regions.len()];
            return Ok(timing::timed("assemble_page", || {
                assemble::assemble_page(page, regions, &table_rows)
            }));
        }
        let regions = timing::timed("layout.predict", || {
            self.layout
                .as_mut()
                .expect("layout model loaded unless no_ocr")
                .predict(&page.image, page.width, page.height)
        })
        .map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
        let mut regions = assemble::resolve(regions);
        assemble::add_orphan_regions(&mut regions, &page.cells);
        assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
        if page.cells.is_empty() {
            // No embedded text on this page: recover cells via OCR, loading the model on demand.
            if self.ocr.is_none() {
                self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
            }
            let ocr_model = self.ocr.as_mut().expect("OCR model loaded just above");
            let cells = timing::timed("ocr.page", || {
                ocr_model.ocr_page(&page.image, &regions, page.scale)
            })
            .map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
            page.cells = cells;
        }
        let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
        if let Some(slot) = self.tables.as_ref() {
            if regions.iter().any(|r| r.label == "table") {
                timing::timed("tableformer", || {
                    // One lock covers the lazy load and every prediction on this page, so the
                    // shared model is loaded once and used by a single worker at a time.
                    let mut guard = slot.lock().expect("TableFormer mutex poisoned");
                    if matches!(*guard, TfSlot::Unloaded) {
                        // A failed load is remembered as `Missing` so later pages skip it.
                        *guard = match tableformer::TableFormer::load_with(intra_threads()) {
                            Some(tf) => TfSlot::Ready(tf),
                            None => TfSlot::Missing,
                        };
                    }
                    if let TfSlot::Ready(tf) = &mut *guard {
                        for (rows, r) in table_rows.iter_mut().zip(regions.iter()) {
                            if r.label == "table" {
                                *rows = tf.predict_table_rows(
                                    &page.image,
                                    page.height,
                                    [r.l, r.t, r.r, r.b],
                                    &page.word_cells,
                                );
                            }
                        }
                    }
                });
            }
        }
        Ok(timing::timed("assemble_page", || {
            assemble::assemble_page(page, regions, &table_rows)
        }))
    }
}
/// Threads given to each pooled worker's models in parallel mode.
///
/// `FLEISCHWOLF_PDF_INTRA` (a positive integer) wins. Otherwise the value is `intra_threads()`
/// capped at 2, never below 1.
fn pdf_intra() -> usize {
    let configured = std::env::var("FLEISCHWOLF_PDF_INTRA")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(count) if count > 0 => count,
        _ => intra_threads().clamp(1, 2),
    }
}
/// Number of pooled page workers used for parallel conversion.
///
/// `FLEISCHWOLF_PDF_WORKERS` (a positive integer) wins. Otherwise the total thread budget
/// from `intra_threads()` is divided by the per-worker count from `pdf_intra()`, and the
/// result is kept within `1..=4`.
fn pdf_worker_count() -> usize {
    let requested = std::env::var("FLEISCHWOLF_PDF_WORKERS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match requested {
        Some(count) if count > 0 => count,
        _ => (intra_threads() / pdf_intra()).clamp(1, 4),
    }
}
/// Minimum page count before a document is converted with the worker pool.
///
/// Read from `FLEISCHWOLF_PDF_PARALLEL_MIN` when it is a positive integer; defaults to 6.
fn pdf_parallel_min() -> usize {
    let configured = std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(pages) if pages > 0 => pages,
        _ => 6,
    }
}
/// Reusable conversion pipeline.
///
/// Models are loaded lazily and kept between calls, so converting several documents with one
/// `Pipeline` avoids reloading them.
pub struct Pipeline {
    /// Worker used by the serial paths and by `process_pages`; loaded on first use with the
    /// full `intra_threads()` budget.
    primary: Option<Worker>,
    /// Workers used by the parallel paths; grown to `target_workers` by `ensure_pool`.
    pool: Vec<Worker>,
    /// TableFormer slot shared by the primary and pooled workers.
    tables: SharedTables,
    /// Desired pool size; parallel conversion is only considered when this is at least 2.
    target_workers: usize,
    /// Minimum page count for parallel conversion.
    parallel_min: usize,
    /// Disable TableFormer table-structure recognition.
    no_table_former: bool,
    /// Skip layout, OCR, TableFormer and page-image rendering.
    no_ocr: bool,
}
impl Pipeline {
    /// Create a pipeline with default options. No model is loaded until it is first needed.
    ///
    /// The pool size and the parallel page threshold are read from the environment here
    /// (`FLEISCHWOLF_PDF_WORKERS`, `FLEISCHWOLF_PDF_THREADS`, `FLEISCHWOLF_PDF_INTRA`,
    /// `FLEISCHWOLF_PDF_PARALLEL_MIN`).
    ///
    /// # Errors
    /// Currently never fails; the `Result` is part of the public signature.
    pub fn new() -> Result<Self, PdfError> {
        Ok(Self {
            primary: None,
            pool: Vec::new(),
            tables: Arc::new(Mutex::new(TfSlot::Unloaded)),
            target_workers: pdf_worker_count(),
            parallel_min: pdf_parallel_min(),
            no_table_former: false,
            no_ocr: false,
        })
    }

    /// Builder option: disable TableFormer table-structure recognition when `disable` is true.
    pub fn no_table_former(mut self, disable: bool) -> Self {
        self.no_table_former = disable;
        self
    }

    /// Builder option: when `disable` is true, skip layout analysis, OCR and TableFormer and
    /// assemble pages from their embedded text cells only.
    pub fn no_ocr(mut self, disable: bool) -> Self {
        self.no_ocr = disable;
        self
    }

    /// TableFormer slot handed to new workers, or `None` when tables are disabled.
    fn tables_slot(&self) -> Option<SharedTables> {
        if self.no_table_former || self.no_ocr {
            None
        } else {
            Some(Arc::clone(&self.tables))
        }
    }

    /// The serial-path worker, loaded on first use.
    fn primary(&mut self) -> Result<&mut Worker, PdfError> {
        if self.primary.is_none() {
            let worker = Worker::load(intra_threads(), self.tables_slot(), self.no_ocr)?;
            self.primary = Some(worker);
        }
        Ok(self.primary.as_mut().expect("primary worker initialised above"))
    }

    /// Whether a document with `pages` pages should be spread over the worker pool.
    fn use_parallel(&self, pages: usize) -> bool {
        self.target_workers >= 2 && pages >= self.parallel_min
    }

    /// Store `err` in `slot` unless an earlier error has already been recorded.
    fn record_first_error(slot: &Mutex<Option<PdfError>>, err: PdfError) {
        slot.lock().unwrap().get_or_insert(err);
    }

    /// Convert a (possibly password-protected) PDF into a `DoclingDocument` named `name`.
    ///
    /// Uses the worker pool when enough workers are configured and the document has at least
    /// `parallel_min` pages; otherwise pages are processed serially. Timing statistics are
    /// reported whether or not conversion succeeds.
    ///
    /// # Errors
    /// The first pdfium, layout or OCR error encountered.
    pub fn convert(
        &mut self,
        bytes: &[u8],
        password: Option<&str>,
        name: &str,
    ) -> Result<DoclingDocument, PdfError> {
        let pages = pdfium_backend::page_count(bytes, password)?;
        let doc = if self.use_parallel(pages) {
            self.convert_parallel(bytes, password, name)
        } else {
            self.convert_serial(bytes, password, name)
        };
        timing::report();
        doc
    }

    /// Process pages one by one on the primary worker and merge continuations at the end.
    fn convert_serial(
        &mut self,
        bytes: &[u8],
        password: Option<&str>,
        name: &str,
    ) -> Result<DoclingDocument, PdfError> {
        let mut doc = DoclingDocument::new(name);
        let render_image = !self.no_ocr;
        let worker = self.primary()?;
        pdfium_backend::for_each_page(bytes, password, render_image, |n, _total, mut page| {
            let (nodes, links) = worker.process(n, &mut page)?;
            doc.nodes.extend(nodes);
            doc.links.extend(links);
            Ok::<(), PdfError>(())
        })?;
        assemble::merge_continuations(&mut doc.nodes);
        Ok(doc)
    }

    /// Render pages on this thread and fan them out to the pooled workers.
    ///
    /// Results are reordered by page index before the document is built. After the first
    /// error, rendering stops and queued pages are drained without processing.
    fn convert_parallel(
        &mut self,
        bytes: &[u8],
        password: Option<&str>,
        name: &str,
    ) -> Result<DoclingDocument, PdfError> {
        self.ensure_pool()?;
        let n_workers = self.pool.len();
        let render_image = !self.no_ocr;
        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
        let results: Mutex<Vec<(usize, PageOut)>> = Mutex::new(Vec::new());
        let first_err: Mutex<Option<PdfError>> = Mutex::new(None);
        let mut workers = std::mem::take(&mut self.pool);
        std::thread::scope(|s| {
            for worker in workers.iter_mut() {
                let work_rx = Arc::clone(&work_rx);
                let results = &results;
                let first_err = &first_err;
                s.spawn(move || loop {
                    // The guard is a temporary, so the queue lock is released before processing.
                    let item = work_rx.lock().unwrap().recv();
                    let Ok((idx, mut page)) = item else { break };
                    // After a failure, keep draining the queue but skip the expensive work.
                    if first_err.lock().unwrap().is_some() {
                        continue;
                    }
                    match worker.process(idx, &mut page) {
                        Ok(out) => results.lock().unwrap().push((idx, out)),
                        Err(e) => Self::record_first_error(first_err, e),
                    }
                });
            }
            // Only the workers keep the receiver alive. If they all exit (e.g. by panicking),
            // `send` below fails instead of blocking forever on the bounded channel.
            drop(work_rx);
            let render =
                pdfium_backend::for_each_page(bytes, password, render_image, |i, _total, page| {
                    // Stop rendering once a worker has failed. This error is never surfaced,
                    // because the recorded first error takes precedence.
                    if first_err.lock().unwrap().is_some() {
                        return Err(PdfError::Pdfium(
                            "page rendering stopped after an earlier error".into(),
                        ));
                    }
                    work_tx
                        .send((i, page))
                        .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
                });
            drop(work_tx);
            if let Err(e) = render {
                Self::record_first_error(&first_err, e);
            }
        });
        self.pool = workers;
        if let Some(e) = first_err.into_inner().unwrap() {
            return Err(e);
        }
        let mut pages = results.into_inner().unwrap();
        pages.sort_unstable_by_key(|(idx, _)| *idx);
        let mut doc = DoclingDocument::new(name);
        for (_, (nodes, links)) in pages {
            doc.nodes.extend(nodes);
            doc.links.extend(links);
        }
        assemble::merge_continuations(&mut doc.nodes);
        Ok(doc)
    }

    /// Convert a PDF and hand its content to `emit` page by page, in page order.
    ///
    /// Each page's nodes pass through a stream assembler before `emit` sees them. A final
    /// `emit` call delivers the assembler's remaining nodes with no links. `name` is accepted
    /// for symmetry with [`Pipeline::convert`] but is not used. Timing statistics are
    /// reported whether or not conversion succeeds.
    ///
    /// # Errors
    /// The first pdfium, layout, OCR or `emit` error encountered.
    pub fn convert_streaming<F>(
        &mut self,
        bytes: &[u8],
        password: Option<&str>,
        name: &str,
        emit: F,
    ) -> Result<(), PdfError>
    where
        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
    {
        let _ = name;
        let pages = pdfium_backend::page_count(bytes, password)?;
        let result = if self.use_parallel(pages) {
            self.convert_streaming_parallel(bytes, password, emit)
        } else {
            self.convert_streaming_serial(bytes, password, emit)
        };
        timing::report();
        result
    }

    /// Serial streaming: process and emit each page as soon as it is rendered.
    fn convert_streaming_serial<F>(
        &mut self,
        bytes: &[u8],
        password: Option<&str>,
        mut emit: F,
    ) -> Result<(), PdfError>
    where
        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
    {
        let mut asm = assemble::StreamAssembler::new();
        let render_image = !self.no_ocr;
        let worker = self.primary()?;
        pdfium_backend::for_each_page(bytes, password, render_image, |n, _total, mut page| {
            let (nodes, links) = worker.process(n, &mut page)?;
            emit(asm.push(nodes), links)
        })?;
        emit(asm.finish(), Vec::new())
    }

    /// Parallel streaming.
    ///
    /// A render thread feeds the pooled workers. This thread reorders their results and
    /// emits pages strictly in page order. After the first error (from rendering, a worker,
    /// or `emit`), rendering stops, queued pages are skipped, and buffered output is dropped.
    fn convert_streaming_parallel<F>(
        &mut self,
        bytes: &[u8],
        password: Option<&str>,
        mut emit: F,
    ) -> Result<(), PdfError>
    where
        F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
    {
        use std::sync::atomic::{AtomicBool, Ordering};
        self.ensure_pool()?;
        let n_workers = self.pool.len();
        let render_image = !self.no_ocr;
        let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
        let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
        let (res_tx, res_rx) = sync_channel::<Result<(usize, PageOut), PdfError>>(n_workers * 2);
        // Set only by this (consuming) thread after it records the first error. Because of
        // that, abort-induced render errors always arrive after the real one and are ignored.
        let abort = AtomicBool::new(false);
        let mut workers = std::mem::take(&mut self.pool);
        let mut asm = assemble::StreamAssembler::new();
        let mut first_err: Option<PdfError> = None;
        std::thread::scope(|s| {
            for worker in workers.iter_mut() {
                let work_rx = Arc::clone(&work_rx);
                let res_tx = res_tx.clone();
                let abort = &abort;
                s.spawn(move || loop {
                    let item = work_rx.lock().unwrap().recv();
                    let Ok((idx, mut page)) = item else { break };
                    if abort.load(Ordering::SeqCst) {
                        continue;
                    }
                    let out = worker.process(idx, &mut page).map(|o| (idx, o));
                    if res_tx.send(out).is_err() {
                        break;
                    }
                });
            }
            // Only the workers keep the receiver alive, so the renderer cannot block forever
            // on the bounded channel if every worker exits.
            drop(work_rx);
            {
                let res_tx = res_tx.clone();
                let abort = &abort;
                s.spawn(move || {
                    let render = pdfium_backend::for_each_page(
                        bytes,
                        password,
                        render_image,
                        |i, _total, page| {
                            if abort.load(Ordering::SeqCst) {
                                return Err(PdfError::Pdfium(
                                    "page rendering stopped after an earlier error".into(),
                                ));
                            }
                            work_tx
                                .send((i, page))
                                .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
                        },
                    );
                    drop(work_tx);
                    if let Err(e) = render {
                        let _ = res_tx.send(Err(e));
                    }
                });
            }
            drop(res_tx);
            let mut buffer: BTreeMap<usize, PageOut> = BTreeMap::new();
            let mut next = 0usize;
            // Consume the receiver by value: if `emit` panics, unwinding drops it, and blocked
            // senders fail instead of hanging the scope.
            for msg in res_rx {
                if first_err.is_some() {
                    // Keep draining so producers can finish, but discard everything.
                    continue;
                }
                let failure = match msg {
                    Err(e) => Some(e),
                    Ok((idx, out)) => {
                        buffer.insert(idx, out);
                        let mut failure = None;
                        while let Some((nodes, links)) = buffer.remove(&next) {
                            if let Err(e) = emit(asm.push(nodes), links) {
                                failure = Some(e);
                                break;
                            }
                            next += 1;
                        }
                        failure
                    }
                };
                if let Some(e) = failure {
                    first_err = Some(e);
                    abort.store(true, Ordering::SeqCst);
                    buffer.clear();
                }
            }
        });
        self.pool = workers;
        match first_err {
            Some(e) => Err(e),
            None => emit(asm.finish(), Vec::new()),
        }
    }

    /// Grow the pool to `target_workers`, loading the missing workers concurrently.
    ///
    /// Every successfully loaded worker is kept even if another one fails, so a later call
    /// only has to load the remainder.
    ///
    /// # Errors
    /// The first load error, in spawn order. A panic in a loader thread is re-raised with its
    /// original payload.
    fn ensure_pool(&mut self) -> Result<(), PdfError> {
        let need = self.target_workers.saturating_sub(self.pool.len());
        if need == 0 {
            return Ok(());
        }
        let intra = pdf_intra();
        let no_ocr = self.no_ocr;
        let tables = self.tables_slot();
        let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
            let handles: Vec<_> = (0..need)
                .map(|_| {
                    let tables = tables.clone();
                    s.spawn(move || Worker::load(intra, tables, no_ocr))
                })
                .collect();
            handles
                .into_iter()
                .map(|h| h.join().unwrap_or_else(|panic| std::panic::resume_unwind(panic)))
                .collect()
        });
        let mut first_err: Option<PdfError> = None;
        for result in loaded {
            match result {
                Ok(worker) => self.pool.push(worker),
                Err(e) => {
                    first_err.get_or_insert(e);
                }
            }
        }
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Convert one raster image (decoded with `image::load_from_memory`) as a single-page
    /// document at scale 1.0 with no embedded text cells.
    ///
    /// # Errors
    /// [`PdfError::Pdfium`] with an `image:` prefix if decoding fails, or any processing error.
    pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
        let image = image::load_from_memory(bytes)
            .map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
            .into_rgb8();
        let (w, h) = image.dimensions();
        let page = PdfPage {
            width: w as f32,
            height: h as f32,
            scale: 1.0,
            cells: Vec::new(),
            code_cells: Vec::new(),
            word_cells: Vec::new(),
            image,
            links: Vec::new(),
        };
        self.process_pages(vec![page], name)
    }

    /// Process already-prepared pages serially on the primary worker, in order.
    fn process_pages(
        &mut self,
        mut pages: Vec<PdfPage>,
        name: &str,
    ) -> Result<DoclingDocument, PdfError> {
        let mut doc = DoclingDocument::new(name);
        let worker = self.primary()?;
        for (n, page) in pages.iter_mut().enumerate() {
            let (nodes, links) = worker.process(n, page)?;
            doc.nodes.extend(nodes);
            doc.links.extend(links);
        }
        assemble::merge_continuations(&mut doc.nodes);
        Ok(doc)
    }
}
/// Convert a PDF with default options (TableFormer and OCR enabled).
pub fn convert(
    bytes: &[u8],
    password: Option<&str>,
    name: &str,
) -> Result<DoclingDocument, PdfError> {
    let (no_table_former, no_ocr) = (false, false);
    convert_with_options(bytes, password, name, no_table_former, no_ocr)
}
/// Convert a PDF with a one-off [`Pipeline`] configured by the given switches.
///
/// The pipeline, and any models it loads, is dropped when this call returns. Keep a
/// `Pipeline` around to reuse loaded models across documents.
pub fn convert_with_options(
    bytes: &[u8],
    password: Option<&str>,
    name: &str,
    no_table_former: bool,
    no_ocr: bool,
) -> Result<DoclingDocument, PdfError> {
    let mut pipeline = Pipeline::new()?
        .no_table_former(no_table_former)
        .no_ocr(no_ocr);
    pipeline.convert(bytes, password, name)
}
/// Convert a raster image with default options (TableFormer and OCR enabled).
pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
    let (no_table_former, no_ocr) = (false, false);
    convert_image_with_options(bytes, name, no_table_former, no_ocr)
}
/// Convert a raster image with a one-off [`Pipeline`] configured by the given switches.
pub fn convert_image_with_options(
    bytes: &[u8],
    name: &str,
    no_table_former: bool,
    no_ocr: bool,
) -> Result<DoclingDocument, PdfError> {
    let mut pipeline = Pipeline::new()?
        .no_table_former(no_table_former)
        .no_ocr(no_ocr);
    pipeline.convert_image(bytes, name)
}
/// Convert pre-rendered pages with default options (TableFormer and OCR enabled).
pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
    let (no_table_former, no_ocr) = (false, false);
    convert_pages_with_options(pages, name, no_table_former, no_ocr)
}
/// Convert pre-rendered pages serially with a one-off [`Pipeline`] configured by the given
/// switches. Pages are numbered by their position in `pages`.
pub fn convert_pages_with_options(
    pages: Vec<PdfPage>,
    name: &str,
    no_table_former: bool,
    no_ocr: bool,
) -> Result<DoclingDocument, PdfError> {
    let mut pipeline = Pipeline::new()?
        .no_table_former(no_table_former)
        .no_ocr(no_ocr);
    pipeline.process_pages(pages, name)
}