1mod assemble;
13mod dp_lines;
14pub mod layout;
15mod mets;
16mod ocr;
17pub mod pdfium_backend;
18pub mod resample;
19pub mod tableformer;
20pub mod textparse;
21pub mod timing;
22
23use std::fmt;
24use std::sync::mpsc::{sync_channel, Receiver};
25use std::sync::{Arc, Mutex};
26
27use fleischwolf_core::{DoclingDocument, Node};
28
29pub use mets::convert_mets_gbs;
30pub use pdfium_backend::{PdfDocument, PdfPage, TextCell};
31
/// Error returned by every conversion entry point in this module.
///
/// Each variant carries only a human-readable message; no underlying error
/// value is kept, so `std::error::Error::source` is always `None`.
#[derive(Debug)]
pub enum PdfError {
    /// Failure reported by pdfium, plus other input-side failures such as
    /// image decoding or the internal page-worker channel closing.
    Pdfium(String),
    /// Failure loading or running the layout model.
    Layout(String),
    /// Failure loading or running the OCR model.
    Ocr(String),
}

impl fmt::Display for PdfError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every message shares the "pdf: " prefix; only pdfium failures add a label.
        f.write_str("pdf: ")?;
        match self {
            Self::Pdfium(msg) => write!(f, "pdfium error: {msg}"),
            Self::Layout(msg) | Self::Ocr(msg) => f.write_str(msg),
        }
    }
}

impl std::error::Error for PdfError {}
54
55impl From<pdfium_render::prelude::PdfiumError> for PdfError {
56 fn from(e: pdfium_render::prelude::PdfiumError) -> Self {
57 PdfError::Pdfium(e.to_string())
58 }
59}
60
/// Total thread budget for model inference.
///
/// Uses `FLEISCHWOLF_PDF_THREADS` when it parses as a positive integer.
/// Otherwise it falls back to `std::thread::available_parallelism`, or 1 if
/// that query fails. The result is therefore always at least 1.
pub(crate) fn intra_threads() -> usize {
    std::env::var("FLEISCHWOLF_PDF_THREADS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .filter(|&count| count > 0)
        .unwrap_or_else(|| std::thread::available_parallelism().map_or(1, |cpus| cpus.get()))
}
75
/// Output of processing one page: its document nodes plus its link entries.
///
/// NOTE(review): the meaning of the two `String`s in each link pair is not
/// visible here (presumably anchor text / target) — confirm against
/// `assemble::assemble_page`.
type PageOut = (Vec<Node>, Vec<(String, String)>);
79
/// One independent set of models that processes pages one at a time.
///
/// `Pipeline` keeps a single worker for serial conversion and a pool of them
/// for parallel conversion. Each worker is handed to only one thread at a
/// time (as `&mut Worker`).
struct Worker {
    /// Layout detection model; always loaded when the worker is created.
    layout: layout::LayoutModel,
    /// OCR model; `None` until the first page without text cells needs it.
    ocr: Option<ocr::OcrModel>,
    /// Table-structure model; when `None`, no table rows are predicted.
    tables: Option<tableformer::TableFormer>,
}
90
91impl Worker {
92 fn load(intra: usize) -> Result<Self, PdfError> {
93 Ok(Self {
94 layout: layout::LayoutModel::load_with(intra).map_err(PdfError::Layout)?,
95 ocr: None,
96 tables: tableformer::TableFormer::load_with(intra),
97 })
98 }
99
100 fn process(&mut self, n: usize, page: &mut PdfPage) -> Result<PageOut, PdfError> {
104 let regions = timing::timed("layout.predict", || {
105 self.layout.predict(&page.image, page.width, page.height)
106 })
107 .map_err(|e| PdfError::Layout(format!("page {}: {e}", n + 1)))?;
108 let mut regions = assemble::resolve(regions);
110 assemble::add_orphan_regions(&mut regions, &page.cells);
112 assemble::drop_false_pictures(&mut regions, &page.cells, page.width, page.height);
114 if page.cells.is_empty() {
116 if self.ocr.is_none() {
117 self.ocr = Some(ocr::OcrModel::load().map_err(PdfError::Ocr)?);
118 }
119 let cells = self
120 .ocr
121 .as_mut()
122 .unwrap()
123 .ocr_page(&page.image, ®ions, page.scale)
124 .map_err(|e| PdfError::Ocr(format!("page {}: {e}", n + 1)))?;
125 page.cells = cells;
126 }
127 let mut table_rows: Vec<Option<Vec<Vec<String>>>> = vec![None; regions.len()];
129 if let Some(tf) = self.tables.as_mut() {
130 timing::timed("tableformer", || {
131 for (i, r) in regions.iter().enumerate() {
132 if r.label == "table" {
133 table_rows[i] = tf.predict_table_rows(
134 &page.image,
135 page.height,
136 [r.l, r.t, r.r, r.b],
137 &page.word_cells,
138 );
139 }
140 }
141 });
142 }
143 Ok(timing::timed("assemble_page", || {
144 assemble::assemble_page(page, regions, &table_rows)
145 }))
146 }
147}
148
149fn pdf_intra() -> usize {
154 if let Some(n) = std::env::var("FLEISCHWOLF_PDF_INTRA")
155 .ok()
156 .and_then(|v| v.parse::<usize>().ok())
157 .filter(|&n| n > 0)
158 {
159 return n;
160 }
161 if intra_threads() >= 2 {
162 2
163 } else {
164 1
165 }
166}
167
168fn pdf_worker_count() -> usize {
173 if let Some(n) = std::env::var("FLEISCHWOLF_PDF_WORKERS")
174 .ok()
175 .and_then(|v| v.parse::<usize>().ok())
176 .filter(|&n| n > 0)
177 {
178 return n;
179 }
180 (intra_threads() / pdf_intra()).clamp(1, 4)
181}
182
/// Minimum page count before a PDF is converted on the parallel pool.
///
/// Uses `FLEISCHWOLF_PDF_PARALLEL_MIN` when it parses as a positive integer,
/// otherwise 6.
fn pdf_parallel_min() -> usize {
    const DEFAULT_PARALLEL_MIN: usize = 6;
    match std::env::var("FLEISCHWOLF_PDF_PARALLEL_MIN").map(|raw| raw.parse::<usize>()) {
        Ok(Ok(pages)) if pages > 0 => pages,
        _ => DEFAULT_PARALLEL_MIN,
    }
}
194
/// Reusable PDF/image conversion pipeline that keeps loaded models across calls.
pub struct Pipeline {
    /// Worker for serial PDF conversion and for image/page conversion.
    /// It is loaded on first use with `intra_threads()` intra-op threads.
    primary: Option<Worker>,
    /// Workers for the parallel path. `ensure_pool` grows this to
    /// `target_workers`, and the workers are reused by later conversions.
    pool: Vec<Worker>,
    /// Desired parallel pool size (from `pdf_worker_count()`). Below 2, the
    /// parallel path is never taken.
    target_workers: usize,
    /// Minimum page count for the parallel path (from `pdf_parallel_min()`).
    parallel_min: usize,
}
211
212impl Pipeline {
213 pub fn new() -> Result<Self, PdfError> {
217 Ok(Self {
218 primary: None,
219 pool: Vec::new(),
220 target_workers: pdf_worker_count(),
221 parallel_min: pdf_parallel_min(),
222 })
223 }
224
225 fn primary(&mut self) -> Result<&mut Worker, PdfError> {
227 if self.primary.is_none() {
228 self.primary = Some(Worker::load(intra_threads())?);
229 }
230 Ok(self.primary.as_mut().unwrap())
231 }
232
233 pub fn convert(
239 &mut self,
240 bytes: &[u8],
241 password: Option<&str>,
242 name: &str,
243 ) -> Result<DoclingDocument, PdfError> {
244 let pages = pdfium_backend::page_count(bytes, password)?;
245 let doc = if self.target_workers >= 2 && pages >= self.parallel_min {
246 self.convert_parallel(bytes, password, name)?
247 } else {
248 self.convert_serial(bytes, password, name)?
249 };
250 timing::report();
251 Ok(doc)
252 }
253
254 fn convert_serial(
257 &mut self,
258 bytes: &[u8],
259 password: Option<&str>,
260 name: &str,
261 ) -> Result<DoclingDocument, PdfError> {
262 let mut doc = DoclingDocument::new(name);
263 let worker = self.primary()?;
264 pdfium_backend::for_each_page(bytes, password, |n, _total, mut page| {
265 let (nodes, links) = worker.process(n, &mut page)?;
266 doc.nodes.extend(nodes);
267 doc.links.extend(links);
268 Ok::<(), PdfError>(())
269 })?;
270 assemble::merge_continuations(&mut doc.nodes);
271 Ok(doc)
272 }
273
274 fn convert_parallel(
280 &mut self,
281 bytes: &[u8],
282 password: Option<&str>,
283 name: &str,
284 ) -> Result<DoclingDocument, PdfError> {
285 self.ensure_pool()?;
286 let n_workers = self.pool.len();
287 let (work_tx, work_rx) = sync_channel::<(usize, PdfPage)>(n_workers * 2);
288 let work_rx: Arc<Mutex<Receiver<(usize, PdfPage)>>> = Arc::new(Mutex::new(work_rx));
289 let results: Arc<Mutex<Vec<(usize, PageOut)>>> = Arc::new(Mutex::new(Vec::new()));
290 let first_err: Arc<Mutex<Option<PdfError>>> = Arc::new(Mutex::new(None));
291
292 let mut workers = std::mem::take(&mut self.pool);
294 std::thread::scope(|s| {
295 for worker in workers.iter_mut() {
296 let work_rx = Arc::clone(&work_rx);
297 let results = Arc::clone(&results);
298 let first_err = Arc::clone(&first_err);
299 s.spawn(move || loop {
300 let item = work_rx.lock().unwrap().recv();
303 let Ok((idx, mut page)) = item else { break };
304 match worker.process(idx, &mut page) {
305 Ok(out) => results.lock().unwrap().push((idx, out)),
306 Err(e) => {
307 let mut slot = first_err.lock().unwrap();
308 if slot.is_none() {
309 *slot = Some(e);
310 }
311 }
312 }
313 });
314 }
315 let render = pdfium_backend::for_each_page(bytes, password, |i, _total, page| {
319 work_tx
320 .send((i, page))
321 .map_err(|_| PdfError::Pdfium("page-worker channel closed".into()))
322 });
323 drop(work_tx);
324 if let Err(e) = render {
325 let mut slot = first_err.lock().unwrap();
326 if slot.is_none() {
327 *slot = Some(e);
328 }
329 }
330 });
331 self.pool = workers;
333
334 if let Some(e) = first_err.lock().unwrap().take() {
335 return Err(e);
336 }
337 let mut results = Arc::try_unwrap(results)
338 .unwrap_or_else(|arc| Mutex::new(arc.lock().unwrap().clone()))
339 .into_inner()
340 .unwrap();
341 results.sort_by_key(|(idx, _)| *idx);
342 let mut doc = DoclingDocument::new(name);
343 for (_, (nodes, links)) in results {
344 doc.nodes.extend(nodes);
345 doc.links.extend(links);
346 }
347 assemble::merge_continuations(&mut doc.nodes);
348 Ok(doc)
349 }
350
351 fn ensure_pool(&mut self) -> Result<(), PdfError> {
355 let need = self.target_workers.saturating_sub(self.pool.len());
356 if need == 0 {
357 return Ok(());
358 }
359 let intra = pdf_intra();
360 let loaded: Vec<Result<Worker, PdfError>> = std::thread::scope(|s| {
361 let handles: Vec<_> = (0..need)
362 .map(|_| s.spawn(move || Worker::load(intra)))
363 .collect();
364 handles.into_iter().map(|h| h.join().unwrap()).collect()
365 });
366 for w in loaded {
367 self.pool.push(w?);
368 }
369 Ok(())
370 }
371
372 pub fn convert_image(&mut self, bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
375 let image = image::load_from_memory(bytes)
376 .map_err(|e| PdfError::Pdfium(format!("image: {e}")))?
377 .into_rgb8();
378 let (w, h) = image.dimensions();
379 let page = PdfPage {
382 width: w as f32,
383 height: h as f32,
384 scale: 1.0,
385 cells: Vec::new(),
386 code_cells: Vec::new(),
387 word_cells: Vec::new(),
388 image,
389 links: Vec::new(),
390 };
391 self.process_pages(vec![page], name)
392 }
393
394 fn process_pages(
397 &mut self,
398 mut pages: Vec<PdfPage>,
399 name: &str,
400 ) -> Result<DoclingDocument, PdfError> {
401 let mut doc = DoclingDocument::new(name);
402 let worker = self.primary()?;
403 for (n, page) in pages.iter_mut().enumerate() {
404 let (nodes, links) = worker.process(n, page)?;
405 doc.nodes.extend(nodes);
406 doc.links.extend(links);
407 }
408 assemble::merge_continuations(&mut doc.nodes);
409 Ok(doc)
410 }
411}
412
413pub fn convert(
416 bytes: &[u8],
417 password: Option<&str>,
418 name: &str,
419) -> Result<DoclingDocument, PdfError> {
420 Pipeline::new()?.convert(bytes, password, name)
421}
422
423pub fn convert_image(bytes: &[u8], name: &str) -> Result<DoclingDocument, PdfError> {
425 Pipeline::new()?.convert_image(bytes, name)
426}
427
428pub fn convert_pages(pages: Vec<PdfPage>, name: &str) -> Result<DoclingDocument, PdfError> {
431 Pipeline::new()?.process_pages(pages, name)
432}