use std::path::Path;
use rayon::prelude::*;
use crate::converters::ConversionOptions;
use crate::document::PdfDocument;
use crate::error::{Error, Result};
pub struct ParallelExtractor;
impl ParallelExtractor {
pub fn extract_all_text(path: &Path) -> Result<Vec<String>> {
let page_count = Self::get_page_count(path)?;
if page_count == 0 {
return Ok(Vec::new());
}
let path_buf = path.to_path_buf();
let num_threads = rayon::current_num_threads().max(1);
let batch_size = page_count.div_ceil(num_threads);
let batches: Vec<(usize, usize)> = (0..page_count)
.step_by(batch_size)
.map(|start| (start, (start + batch_size).min(page_count)))
.collect();
let batch_results: std::result::Result<Vec<Vec<(usize, String)>>, Error> = batches
.into_par_iter()
.map(|(start, end)| {
let mut doc = PdfDocument::open(&path_buf)?;
let mut results = Vec::with_capacity(end - start);
for page_index in start..end {
let text = doc.extract_text(page_index)?;
results.push((page_index, text));
}
Ok(results)
})
.collect();
let mut all_results: Vec<(usize, String)> = batch_results?.into_iter().flatten().collect();
all_results.sort_unstable_by_key(|(idx, _)| *idx);
Ok(all_results.into_iter().map(|(_, text)| text).collect())
}
pub fn extract_all_markdown(path: &Path, options: &ConversionOptions) -> Result<Vec<String>> {
let page_count = Self::get_page_count(path)?;
if page_count == 0 {
return Ok(Vec::new());
}
let path_buf = path.to_path_buf();
let options = options.clone();
let num_threads = rayon::current_num_threads().max(1);
let batch_size = page_count.div_ceil(num_threads);
let batches: Vec<(usize, usize)> = (0..page_count)
.step_by(batch_size)
.map(|start| (start, (start + batch_size).min(page_count)))
.collect();
let batch_results: std::result::Result<Vec<Vec<(usize, String)>>, Error> = batches
.into_par_iter()
.map(|(start, end)| {
let mut doc = PdfDocument::open(&path_buf)?;
let mut results = Vec::with_capacity(end - start);
for page_index in start..end {
let md = doc.to_markdown(page_index, &options)?;
results.push((page_index, md));
}
Ok(results)
})
.collect();
let mut all_results: Vec<(usize, String)> = batch_results?.into_iter().flatten().collect();
all_results.sort_unstable_by_key(|(idx, _)| *idx);
Ok(all_results.into_iter().map(|(_, md)| md).collect())
}
fn get_page_count(path: &Path) -> Result<usize> {
let mut doc = PdfDocument::open(path)?;
doc.page_count()
}
}
/// Free-function form of [`ParallelExtractor::extract_all_text`].
///
/// Forwards `path` unchanged and returns that call's result as-is.
pub fn extract_all_text_parallel(path: &Path) -> Result<Vec<String>> {
ParallelExtractor::extract_all_text(path)
}
/// Free-function form of [`ParallelExtractor::extract_all_markdown`].
///
/// Forwards `path` and `options` unchanged and returns that call's result
/// as-is.
pub fn extract_all_markdown_parallel(
path: &Path,
options: &ConversionOptions,
) -> Result<Vec<String>> {
ParallelExtractor::extract_all_markdown(path, options)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Builds the absolute path of `name` inside the crate's `tests/fixtures`
    /// directory.
    fn fixture_path(name: &str) -> PathBuf {
        let mut location = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        location.push("tests");
        location.push("fixtures");
        location.push(name);
        location
    }

    /// Returns the fixture's path when it exists on disk; otherwise prints the
    /// skip notice and returns `None` so the calling test can bail out early.
    fn available_fixture(name: &str) -> Option<PathBuf> {
        let location = fixture_path(name);
        if location.exists() {
            Some(location)
        } else {
            eprintln!("Skipping test: fixture not found at {:?}", location);
            None
        }
    }

    /// Opens the document at `path` and returns its reported page count,
    /// panicking on any failure.
    fn page_count_of(path: &Path) -> usize {
        let mut doc = PdfDocument::open(path).unwrap();
        doc.page_count().unwrap()
    }

    /// Parallel text extraction yields exactly one entry per page.
    #[test]
    fn test_parallel_text_extraction_simple() {
        let Some(path) = available_fixture("simple.pdf") else {
            return;
        };

        let pages = ParallelExtractor::extract_all_text(&path).expect("extraction should succeed");
        assert!(!pages.is_empty(), "should extract at least one page");

        let expected_count = page_count_of(&path);
        assert_eq!(pages.len(), expected_count);
    }

    /// Parallel text extraction agrees page-by-page with a serial pass.
    #[test]
    fn test_parallel_text_matches_serial() {
        let Some(path) = available_fixture("simple.pdf") else {
            return;
        };

        let mut reference = PdfDocument::open(&path).unwrap();
        let total = reference.page_count().unwrap();
        let mut serial: Vec<String> = Vec::with_capacity(total);
        for index in 0..total {
            serial.push(reference.extract_text(index).unwrap());
        }

        let parallel =
            ParallelExtractor::extract_all_text(&path).expect("parallel extraction should succeed");

        assert_eq!(serial.len(), parallel.len());
        for (page, (expected, actual)) in serial.iter().zip(parallel.iter()).enumerate() {
            assert_eq!(
                expected, actual,
                "page {} text differs between serial and parallel",
                page
            );
        }
    }

    /// Parallel Markdown conversion yields exactly one entry per page.
    #[test]
    fn test_parallel_markdown_extraction() {
        let Some(path) = available_fixture("simple.pdf") else {
            return;
        };

        let opts = ConversionOptions::default();
        let pages = ParallelExtractor::extract_all_markdown(&path, &opts)
            .expect("markdown extraction should succeed");
        assert!(!pages.is_empty(), "should extract at least one page");

        let expected_count = page_count_of(&path);
        assert_eq!(pages.len(), expected_count);
    }

    /// Parallel Markdown conversion agrees page-by-page with a serial pass.
    #[test]
    fn test_parallel_markdown_matches_serial() {
        let Some(path) = available_fixture("simple.pdf") else {
            return;
        };

        let opts = ConversionOptions::default();
        let mut reference = PdfDocument::open(&path).unwrap();
        let total = reference.page_count().unwrap();
        let mut serial: Vec<String> = Vec::with_capacity(total);
        for index in 0..total {
            serial.push(reference.to_markdown(index, &opts).unwrap());
        }

        let parallel = ParallelExtractor::extract_all_markdown(&path, &opts)
            .expect("parallel extraction should succeed");

        assert_eq!(serial.len(), parallel.len());
        for (page, (expected, actual)) in serial.iter().zip(parallel.iter()).enumerate() {
            assert_eq!(
                expected, actual,
                "page {} markdown differs between serial and parallel",
                page
            );
        }
    }

    /// A path that does not exist is reported as an error, not a panic.
    #[test]
    fn test_parallel_nonexistent_file() {
        let missing = Path::new("/nonexistent/file.pdf");
        let outcome = ParallelExtractor::extract_all_text(missing);
        assert!(outcome.is_err(), "should error on missing file");
    }

    /// The free-function wrappers produce non-empty output for the fixture.
    #[test]
    fn test_convenience_functions() {
        let Some(path) = available_fixture("simple.pdf") else {
            return;
        };

        let text_pages = extract_all_text_parallel(&path).expect("convenience fn should work");
        assert!(!text_pages.is_empty());

        let opts = ConversionOptions::default();
        let md_pages =
            extract_all_markdown_parallel(&path, &opts).expect("convenience fn should work");
        assert!(!md_pages.is_empty());
    }

    /// Repeated parallel runs over a multi-page document always return pages
    /// in the same order as a serial pass.
    #[test]
    fn test_parallel_preserves_page_order() {
        let Some(path) = available_fixture("outline.pdf") else {
            return;
        };

        let mut reference = PdfDocument::open(&path).unwrap();
        let total = reference.page_count().unwrap();
        if total < 2 {
            eprintln!("Skipping order test: need multi-page PDF");
            return;
        }

        let mut serial: Vec<String> = Vec::with_capacity(total);
        for index in 0..total {
            serial.push(reference.extract_text(index).unwrap());
        }

        // Run several times so that an ordering problem that depends on
        // thread scheduling has more than one chance to show up.
        for _ in 0..5 {
            let parallel = ParallelExtractor::extract_all_text(&path).unwrap();
            assert_eq!(serial.len(), parallel.len());
            for (page, (expected, actual)) in serial.iter().zip(parallel.iter()).enumerate() {
                assert_eq!(
                    expected, actual,
                    "page {} text differs on repeated parallel run",
                    page
                );
            }
        }
    }
}