mod assemble;
mod dp_lines;
pub mod layout;
mod mets;
mod ocr;
pub mod pdfium_backend;
pub mod resample;
pub mod tableformer;
pub mod textparse;
pub mod timing;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use fleischwolf_core::{DoclingDocument, Node};
pub use mets::convert_mets_gbs;
pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
#[derive(Debug)]
pub enum PdfError {
Pdfium(String),
Layout(String),
Ocr(String),
}
impl fmt::Display for PdfError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PdfError::Pdfium(m) => write!(f, "pdf: pdfium error: {m}"),
PdfError::Layout(m) => write!(f, "pdf: {m}"),
PdfError::Ocr(m) => write!(f, "pdf: {m}"),
}
}
}
impl std::error::Error for PdfError {}
impl From<pdfium_render::prelude::PdfiumError> for PdfError {
fn from(e: pdfium_render::prelude::PdfiumError) -> Self {
PdfError::Pdfium(e.to_string())
}
}
pub(crate) fn intra_threads() -> usize {
if let Some(n) = std::env::var("FLEISCHWOLF_PDF_THREADS")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|&n| n > 0)
{
return n;
}
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
}
type PageOut = (Vec<Node>, Vec<(String, String)>);
struct Worker {
layout: layout::LayoutModel,
ocr: Option<ocr::OcrModel>,
tables: Option<tableformer::TableFormer>,
}
impl Worker {
fn load(intra: usize) -> Result<Self, PdfError> {
Ok(Self {
layout: layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?,
ocr: None,
tables: tableformer::TableFormer::load_with(intra),
})
}
fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
let regions = timing::timed("layout.predict", || {
self.layout.predict(&page.image, page.width, page.height)
})
.map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
let mut regions = assemble::resolve(regions);
assemble::add_orphan_regions(&mut regions, &page.cells);
assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
if page.cells.is_empty() {
if self.ocr.is_none() {
self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
}
let cells = self
.ocr
.as_mut()
.unwrap()
.ocr_page(&page.image, ®ions, page.scale)
.map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
page.cells = cells;
}
let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
if let Some(tf) = self.tables.as_mut() {
timing::timed("tableformer", || {
for (i, r) in regions.iter().enumerate() {
if r.label == "table" {
table_rows[i] = tf.predict_table_rows(
&page.image,
page.height,
[r.l, r.t, r.r, r.b],
&page.word_cells,
);
}
}
});
}
Ok(timing::timed("assemble_page", || {
assemble::assemble_page(page, regions, &table_rows)
}))
}
}
fn pdf_intra() -> usize {
if let Some(n) = std::env::var("FLEISCHWOLF_PDF_INTRA")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|&n| n > 0)
{
return n;
}
if intra_threads() >= 2 {
2
} else {
1
}
}
fn pdf_worker_count() -> usize {
if let Some(n) = std::env::var("FLEISCHWOLF_PDF_WORKERS")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|&n| n > 0)
{
return n;
}
(intra_threads() / pdf_intra()).clamp(1, 4)
}
fn pdf_parallel_min() -> usize {
std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|&n| n > 0)
.unwrap_or(6)
}
pub struct Pipeline {
primary: Option<Worker>,
pool: Vec<Worker>,
target_workers: usize,
parallel_min: usize,
}
impl Pipeline {
pub fn new() -> Result<Self, PdfError> {
Ok(Self {
primary: None,
pool: Vec::new(),
target_workers: pdf_worker_count(),
parallel_min: pdf_parallel_min(),
})
}
fn primary(&mut self) -> Result<&mut Worker, PdfError> {
if self.primary.is_none() {
self.primary = Some(Worker::load(intra_threads())?);
}
Ok(self.primary.as_mut().unwrap())
}
pub fn convert(
&mut self,
bytes: &[u8],
password: Option<&str>,
name: &str,
) -> Result<DoclingDocument, PdfError> {
let pages = pdfium_backend::page_count(bytes, password)?;
let doc = if self.target_workers >= 2 && pages >= self.parallel_min {
self.convert_parallel(bytes, password, name)?
} else {
self.convert_serial(bytes, password, name)?
};
timing::report();
Ok(doc)
}
fn convert_serial(
&mut self,
bytes: &[u8],
password: Option<&str>,
name: &str,
) -> Result<DoclingDocument, PdfError> {
let mut doc = DoclingDocument::new(name);
let worker = self.primary()?;
pdfium_backend::for_each_page(bytes, password, |n, _total, mut page| {
let (nodes, links) = worker.process(n, &mut page)?;
doc.nodes.extend(nodes);
doc.links.extend(links);
Ok::<(), PdfError>(())
})?;
assemble::merge_continuations(&mut doc.nodes);
Ok(doc)
}
fn convert_parallel(
&mut self,
bytes: &[u8],
password: Option<&str>,
name: &str,
) -> Result<DoclingDocument, PdfError> {
self.ensure_pool()?;
let n_workers = self.pool.len();
let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
let results: Arc<Mutex<Vec<(usize, PageOut)>>> = Arc::new(Mutex::new(Vec::new()));
let first_err: Arc<Mutex<Option<PdfError>>> = Arc::new(Mutex::new(None));
let mut workers = std::mem::take(&mut self.pool);
std::thread::scope(|s| {
for worker in workers.iter_mut() {
let work_rx = Arc::clone(&work_rx);
let results = Arc::clone(&results);
let first_err = Arc::clone(&first_err);
s.spawn(move || loop {
let item = work_rx.lock().unwrap().recv();
let Ok((idx, mut page)) = item else { break };
match worker.process(idx, &mut page) {
Ok(out) => results.lock().unwrap().push((idx, out)),
Err(e) => {
let mut slot = first_err.lock().unwrap();
if slot.is_none() {
*slot = Some(e);
}
}
}
});
}
let render = pdfium_backend::for_each_page(bytes, password, |i, _total, page| {
work_tx
.send((i, page))
.map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
});
drop(work_tx);
if let Err(e) = render {
let mut slot = first_err.lock().unwrap();
if slot.is_none() {
*slot = Some(e);
}
}
});
self.pool = workers;
if let Some(e) = first_err.lock().unwrap().take() {
return Err(e);
}
let mut results = Arc::try_unwrap(results)
.unwrap_or_else(|arc| Mutex::new(arc.lock().unwrap().clone()))
.into_inner()
.unwrap();
results.sort_by_key(|(idx, _)| *idx);
let mut doc = DoclingDocument::new(name);
for (_, (nodes, links)) in results {
doc.nodes.extend(nodes);
doc.links.extend(links);
}
assemble::merge_continuations(&mut doc.nodes);
Ok(doc)
}
pub fn convert_streaming<F>(
&mut self,
bytes: &[u8],
password: Option<&str>,
name: &str,
emit: F,
) -> Result<(), PdfError>
where
F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
{
let _ = name; let pages = pdfium_backend::page_count(bytes, password)?;
let r = if self.target_workers >= 2 && pages >= self.parallel_min {
self.convert_streaming_parallel(bytes, password, emit)
} else {
self.convert_streaming_serial(bytes, password, emit)
};
timing::report();
r
}
fn convert_streaming_serial<F>(
&mut self,
bytes: &[u8],
password: Option<&str>,
mut emit: F,
) -> Result<(), PdfError>
where
F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
{
let mut asm = assemble::StreamAssembler::new();
let worker = self.primary()?;
pdfium_backend::for_each_page(bytes, password, |n, _total, mut page| {
let (nodes, links) = worker.process(n, &mut page)?;
emit(asm.push(nodes), links)
})?;
emit(asm.finish(), Vec::new())
}
fn convert_streaming_parallel<F>(
&mut self,
bytes: &[u8],
password: Option<&str>,
mut emit: F,
) -> Result<(), PdfError>
where
F: FnMut(Vec<Node>, Vec<(String, String)>) -> Result<(), PdfError>,
{
self.ensure_pool()?;
let n_workers = self.pool.len();
let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
let (res_tx, res_rx) = sync_channel::<Result<(usize, PageOut), PdfError>>(n_workers * 2);
let mut workers = std::mem::take(&mut self.pool);
let mut asm = assemble::StreamAssembler::new();
let mut first_err: Option<PdfError> = None;
std::thread::scope(|s| {
for worker in workers.iter_mut() {
let work_rx = Arc::clone(&work_rx);
let res_tx = res_tx.clone();
s.spawn(move || loop {
let item = work_rx.lock().unwrap().recv();
let Ok((idx, mut page)) = item else { break };
let out = worker.process(idx, &mut page).map(|o| (idx, o));
if res_tx.send(out).is_err() {
break; }
});
}
{
let res_tx = res_tx.clone();
s.spawn(move || {
let render =
pdfium_backend::for_each_page(bytes, password, |i, _total, page| {
work_tx
.send((i, page))
.map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
});
drop(work_tx); if let Err(e) = render {
let _ = res_tx.send(Err(e));
}
});
}
drop(res_tx);
let mut buffer: BTreeMap<usize, PageOut> = BTreeMap::new();
let mut next = 0usize;
for msg in res_rx.iter() {
match msg {
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
}
Ok((idx, out)) => {
buffer.insert(idx, out);
if first_err.is_some() {
continue; }
while let Some((nodes, links)) = buffer.remove(&next) {
if let Err(e) = emit(asm.push(nodes), links) {
first_err = Some(e);
break;
}
next += 1;
}
}
}
}
});
self.pool = workers;
if let Some(e) = first_err {
return Err(e);
}
emit(asm.finish(), Vec::new())
}
fn ensure_pool(&mut self) -> Result<(), PdfError> {
let need = self.target_workers.saturating_sub(self.pool.len());
if need == 0 {
return Ok(());
}
let intra = pdf_intra();
let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
let handles: Vec<_> = (0..need)
.map(|_| s.spawn(move || Worker::load(intra)))
.collect();
handles.into_iter().map(|h| h.join().unwrap()).collect()
});
for w in loaded {
self.pool.push(w?);
}
Ok(())
}
pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
let image = image::load_from_memory(bytes)
.map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
.into_rgb8();
let (w, h) = image.dimensions();
let page = PdfPage {
width: w as f32,
height: h as f32,
scale: 1.0,
cells: Vec::new(),
code_cells: Vec::new(),
word_cells: Vec::new(),
image,
links: Vec::new(),
};
self.process_pages(vec![page], name)
}
fn process_pages(
&mut self,
mut pages: Vec<PdfPage>,
name: &str,
) -> Result<DoclingDocument, PdfError> {
let mut doc = DoclingDocument::new(name);
let worker = self.primary()?;
for (n, page) in pages.iter_mut().enumerate() {
let (nodes, links) = worker.process(n, page)?;
doc.nodes.extend(nodes);
doc.links.extend(links);
}
assemble::merge_continuations(&mut doc.nodes);
Ok(doc)
}
}
pub fn convert(
bytes: &[u8],
password: Option<&str>,
name: &str,
) -> Result<DoclingDocument, PdfError> {
Pipeline::new()?.convert(bytes, password, name)
}
pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
Pipeline::new()?.convert_image(bytes, name)
}
pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
Pipeline::new()?.process_pages(pages, name)
}