use crate::checkpoint::{compute_conversion_id, CheckpointMeta, PageStats};
use crate::config::ConversionConfig;
use crate::error::Pdf2MdError;
use crate::output::{ConversionOutput, ConversionStats, DocumentMetadata, PageResult};
use crate::pipeline::render::EncodedPage;
use crate::pipeline::{input, llm, postprocess, render};
use edgequake_llm::{LLMProvider, ProviderFactory};
use futures::StreamExt;
use std::collections::HashSet;
use std::io::Write;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, warn};
/// Convert a PDF (local path or URL) into Markdown via a vision LLM.
///
/// High-level flow:
/// 1. Resolve the input (downloading remote URLs) and pick an LLM provider.
/// 2. Extract PDF metadata and expand the configured page selection.
/// 3. If a checkpoint store is configured, resume already-completed pages
///    (or clear them when `no_resume` is set).
/// 4. Lazily render/encode the remaining pages and send them to the VLM —
///    sequentially when `maintain_format` is set, otherwise concurrently.
/// 5. Post-process fresh pages, merge with resumed ones in page order, and
///    assemble the final document plus stats.
///
/// # Errors
/// * `PageOutOfRange` — the page selection resolves to zero pages.
/// * `AllPagesFailed` — no selected page produced Markdown.
/// * Any input-resolution, rendering, checkpoint-store, or provider error.
pub async fn convert(
    input_str: impl AsRef<str>,
    config: &ConversionConfig,
) -> Result<ConversionOutput, Pdf2MdError> {
    let total_start = Instant::now();
    let input_str = input_str.as_ref();
    info!("Starting conversion: {}", input_str);
    // Resolve a local path, or download a URL to a temp file kept alive by `resolved`.
    let resolved = input::resolve_input(input_str, config.download_timeout_secs).await?;
    let pdf_path = resolved.path().to_path_buf();
    let provider = resolve_provider(config).await?;
    let metadata = render::extract_metadata(&pdf_path, config.password.as_deref()).await?;
    let total_pages = metadata.page_count;
    info!("PDF has {} pages", total_pages);
    // 0-based page indices selected by the configured page range.
    let page_indices = config.pages.to_indices(total_pages);
    if page_indices.is_empty() {
        return Err(Pdf2MdError::PageOutOfRange {
            page: 0,
            total: total_pages,
        });
    }
    debug!("Selected {} pages for conversion", page_indices.len());
    // The conversion id keys checkpoints; it is derived from the PDF path,
    // provider/model, fidelity, and DPI so incompatible runs never mix.
    let conversion_id = if config.checkpoint_store.is_some() {
        let provider_name = resolve_provider_name(config);
        let model_name = resolve_model_name(config, &provider_name);
        Some(compute_conversion_id(
            &pdf_path,
            &provider_name,
            &model_name,
            config.fidelity,
            config.dpi,
        )?)
    } else {
        None
    };
    // Pages recovered from checkpoints vs. pages that still need LLM calls.
    let mut resumed_results: Vec<PageResult> = Vec::new();
    let mut pages_to_process: Vec<usize> = page_indices.clone();
    if let (Some(ref store), Some(ref conv_id)) = (&config.checkpoint_store, &conversion_id) {
        if config.no_resume {
            info!("--no-resume: clearing existing checkpoints for {}", conv_id);
            store.clear_checkpoints(conv_id)?;
        } else {
            // Checkpoint page numbers are 1-based; page_indices are 0-based.
            let completed = store.list_completed_pages(conv_id)?;
            if !completed.is_empty() {
                let completed_set: HashSet<usize> = completed.iter().copied().collect();
                info!(
                    "Checkpoint: {} pages already completed for conversion {}",
                    completed.len(),
                    conv_id
                );
                for &page_num_1based in &completed {
                    let page_idx = page_num_1based.saturating_sub(1);
                    // Only resume pages that are inside the current selection.
                    if page_indices.contains(&page_idx) {
                        if let Ok(Some(cp)) = store.load_page_checkpoint(conv_id, page_num_1based) {
                            resumed_results.push(PageResult {
                                page_num: cp.page_number,
                                markdown: cp.markdown,
                                input_tokens: cp.stats.input_tokens,
                                output_tokens: cp.stats.output_tokens,
                                duration_ms: cp.stats.duration_ms,
                                retries: cp.stats.retries,
                                error: None,
                            });
                        }
                    }
                }
                pages_to_process.retain(|idx| !completed_set.contains(&(idx + 1)));
            }
        }
        // Metadata is (re)written every run so a later resume can inspect the
        // settings this conversion ran with.
        let provider_name = resolve_provider_name(config);
        let model_name = resolve_model_name(config, &provider_name);
        let meta = CheckpointMeta {
            conversion_id: conv_id.clone(),
            pdf_path: pdf_path.display().to_string(),
            provider_name,
            model_name,
            fidelity: format!("{:?}", config.fidelity),
            dpi: config.dpi,
            maintain_format: config.maintain_format,
            created_at: chrono_now_iso(),
        };
        // Meta save failure is non-fatal: conversion can proceed without it.
        if let Err(e) = store.save_meta(conv_id, &meta) {
            warn!("Failed to save checkpoint metadata: {}", e);
        }
    }
    let resumed_count = resumed_results.len();
    if let Some(ref cb) = config.progress_callback {
        cb.on_conversion_start(page_indices.len());
    }
    // Report resumed pages to the progress callback before fresh work starts.
    if let Some(ref cb) = config.progress_callback {
        for pr in &resumed_results {
            cb.on_page_resumed(pr.page_num, page_indices.len());
        }
    }
    let pipeline_start = Instant::now();
    let selected_count = page_indices.len();
    let (fresh_results, cumulative_render_ms) = if pages_to_process.is_empty() {
        info!(
            "All {} pages loaded from checkpoint — no VLM calls needed",
            resumed_count
        );
        (Vec::new(), 0u64)
    } else {
        info!(
            "Processing {} fresh pages ({} resumed from checkpoint)",
            pages_to_process.len(),
            resumed_count
        );
        // The renderer streams encoded pages through a channel so rendering
        // and LLM calls overlap instead of running in two full passes.
        let rx = render::spawn_lazy_render_encode(
            &pdf_path,
            config,
            &pages_to_process,
            config.concurrency,
        )
        .await?;
        info!(
            "Lazy pipeline started for {} pages (concurrency={})",
            pages_to_process.len(),
            config.concurrency
        );
        // maintain_format feeds each page the previous page's Markdown, which
        // forces sequential processing.
        if config.maintain_format {
            process_sequential_lazy(rx, &provider, config, selected_count, &conversion_id).await
        } else {
            process_concurrent_lazy(rx, &provider, config, selected_count, &conversion_id).await
        }
    };
    let pipeline_duration_ms = pipeline_start.elapsed().as_millis() as u64;
    let render_duration_ms = cumulative_render_ms;
    // Render and LLM work overlap in the lazy pipeline, so LLM time is
    // approximated by the whole pipeline wall time.
    let llm_duration_ms = pipeline_duration_ms;
    info!(
        "Pipeline complete: {} fresh + {} resumed results in {}ms (render={}ms)",
        fresh_results.len(),
        resumed_count,
        pipeline_duration_ms,
        render_duration_ms
    );
    // Clean only freshly generated Markdown; resumed pages were cleaned when
    // they were first produced and checkpointed.
    let fresh_results: Vec<PageResult> = fresh_results
        .into_iter()
        .map(|mut pr| {
            if pr.error.is_none() {
                pr.markdown = postprocess::clean_markdown(&pr.markdown);
            }
            pr
        })
        .collect();
    let mut pages: Vec<PageResult> = resumed_results;
    pages.extend(fresh_results);
    // Concurrent processing returns pages out of order; restore page order.
    pages.sort_by_key(|p| p.page_num);
    let markdown = assemble_document(&pages, config, &metadata);
    let processed = pages.iter().filter(|p| p.error.is_none()).count();
    let failed = pages.iter().filter(|p| p.error.is_some()).count();
    // Pages selected but never yielding any result (e.g. render-stage drop).
    let skipped = page_indices.len().saturating_sub(pages.len());
    if processed == 0 {
        let first_error = pages
            .iter()
            .find_map(|p| p.error.as_ref())
            .map(|e| format!("{}", e))
            .unwrap_or_else(|| "Unknown error".to_string());
        return Err(Pdf2MdError::AllPagesFailed {
            total: pages.len(),
            retries: config.max_retries,
            first_error,
        });
    }
    let stats = ConversionStats {
        total_pages,
        processed_pages: processed,
        failed_pages: failed,
        skipped_pages: skipped,
        resumed_pages: resumed_count,
        total_input_tokens: pages.iter().map(|p| p.input_tokens as u64).sum(),
        total_output_tokens: pages.iter().map(|p| p.output_tokens as u64).sum(),
        total_duration_ms: total_start.elapsed().as_millis() as u64,
        render_duration_ms,
        llm_duration_ms,
    };
    info!(
        "Conversion complete: {}/{} pages ({} resumed), {}ms total",
        processed, total_pages, resumed_count, stats.total_duration_ms
    );
    // Fully successful runs clear their checkpoints; partial failures keep
    // them so a re-run resumes instead of repeating finished pages.
    if let (Some(ref store), Some(ref conv_id)) = (&config.checkpoint_store, &conversion_id) {
        if failed == 0 {
            info!("All pages succeeded — clearing checkpoints for {}", conv_id);
            if let Err(e) = store.clear_checkpoints(conv_id) {
                warn!("Failed to clear checkpoints: {}", e);
            }
        } else {
            info!(
                "{} pages failed — keeping checkpoints for resume ({})",
                failed, conv_id
            );
        }
    }
    if let Some(ref cb) = config.progress_callback {
        cb.on_conversion_complete(page_indices.len(), processed);
    }
    Ok(ConversionOutput {
        markdown,
        pages,
        metadata,
        stats,
    })
}
pub async fn convert_to_file(
input_str: impl AsRef<str>,
output_path: impl AsRef<Path>,
config: &ConversionConfig,
) -> Result<ConversionStats, Pdf2MdError> {
let output = convert(input_str, config).await?;
write_markdown_to_file(output_path.as_ref(), &output.markdown).await?;
Ok(output.stats)
}
pub(crate) async fn write_markdown_to_file(path: &Path, content: &str) -> Result<(), Pdf2MdError> {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
Pdf2MdError::OutputWriteFailed {
path: path.to_path_buf(),
source: e,
}
})?;
}
}
if path.exists() {
info!("Overwriting existing output file: {}", path.display());
}
let tmp_name = format!(
".{}.{}.tmp",
path.file_name().unwrap_or_default().to_string_lossy(),
std::process::id()
);
let tmp_path = path.with_file_name(&tmp_name);
tokio::fs::write(&tmp_path, content)
.await
.map_err(|e| Pdf2MdError::OutputWriteFailed {
path: path.to_path_buf(),
source: e,
})?;
match tokio::fs::rename(&tmp_path, path).await {
Ok(()) => {
debug!("Atomic rename succeeded: {} → {}", tmp_name, path.display());
}
Err(rename_err) => {
debug!(
"Atomic rename failed ({}), falling back to direct write",
rename_err
);
tokio::fs::write(path, content)
.await
.map_err(|e| Pdf2MdError::OutputWriteFailed {
path: path.to_path_buf(),
source: e,
})?;
tokio::fs::remove_file(&tmp_path).await.ok();
}
}
Ok(())
}
/// Blocking wrapper around [`convert`] for callers without an async runtime.
///
/// Spins up a fresh multi-threaded tokio runtime per call; do not invoke from
/// inside an existing tokio runtime.
pub fn convert_sync(
    input_str: impl AsRef<str>,
    config: &ConversionConfig,
) -> Result<ConversionOutput, Pdf2MdError> {
    let runtime = tokio::runtime::Runtime::new().map_err(|e| {
        Pdf2MdError::Internal(format!("Failed to create tokio runtime: {}", e))
    })?;
    runtime.block_on(convert(input_str, config))
}
/// Read a PDF's document metadata without running any conversion.
///
/// Resolves the input (downloading URLs with a fixed 120 s timeout) and
/// extracts metadata with no password.
pub async fn inspect(input_str: impl AsRef<str>) -> Result<DocumentMetadata, Pdf2MdError> {
    let source = input::resolve_input(input_str.as_ref(), 120).await?;
    let local_path = source.path().to_path_buf();
    render::extract_metadata(&local_path, None).await
}
/// Convert an in-memory PDF by spilling the bytes to a named temp file and
/// delegating to [`convert`].
///
/// The temp file handle stays alive until the conversion finishes, so the
/// file is not deleted out from under the pipeline.
pub async fn convert_from_bytes(
    bytes: &[u8],
    config: &ConversionConfig,
) -> Result<ConversionOutput, Pdf2MdError> {
    let mut scratch = tempfile::NamedTempFile::new()
        .map_err(|e| Pdf2MdError::Internal(format!("tempfile: {e}")))?;
    scratch
        .write_all(bytes)
        .map_err(|e| Pdf2MdError::Internal(format!("tempfile write: {e}")))?;
    let scratch_path = scratch.path().to_string_lossy().into_owned();
    convert(&scratch_path, config).await
}
/// Default vision-capable model for a given provider name.
///
/// Recognizes several spelling variants per provider family; anything
/// unrecognized falls back to OpenAI's `gpt-4.1-nano`.
fn default_vision_model_for_provider(provider_name: &str) -> &'static str {
    if matches!(provider_name, "bedrock" | "aws-bedrock" | "aws_bedrock") {
        "amazon.nova-lite-v1:0"
    } else if matches!(provider_name, "mistral" | "mistral-ai" | "mistralai") {
        "pixtral-12b-2409"
    } else if matches!(provider_name, "ollama" | "lmstudio" | "lm-studio" | "lm_studio") {
        // Local runtimes default to the llava vision model.
        "llava"
    } else {
        "gpt-4.1-nano"
    }
}
/// Instantiate an LLM provider by name/model via the provider factory,
/// translating factory failures into `ProviderNotConfigured` with the raw
/// factory error as the hint.
fn create_vision_provider(
    provider_name: &str,
    model: &str,
) -> Result<Arc<dyn LLMProvider>, Pdf2MdError> {
    match ProviderFactory::create_llm_provider(provider_name, model) {
        Ok(provider) => Ok(provider),
        Err(e) => Err(Pdf2MdError::ProviderNotConfigured {
            provider: provider_name.to_string(),
            hint: format!("{e}"),
        }),
    }
}
/// Pick the LLM provider instance to use, in strict priority order:
///
/// 1. An explicit provider instance on the config.
/// 2. An explicit provider name on the config (model defaults per provider).
/// 3. The `EDGEQUAKE_LLM_PROVIDER` + `EDGEQUAKE_MODEL` env-var pair.
/// 4. API-key detection: AWS (Bedrock), then OpenAI, then Mistral.
/// 5. The provider factory's own environment auto-detection.
///
/// # Errors
/// `ProviderNotConfigured` when nothing above yields a usable provider.
async fn resolve_provider(config: &ConversionConfig) -> Result<Arc<dyn LLMProvider>, Pdf2MdError> {
    if let Some(ref provider) = config.provider {
        return Ok(Arc::clone(provider));
    }
    if let Some(ref name) = config.provider_name {
        let model = config
            .model
            .as_deref()
            .unwrap_or_else(|| default_vision_model_for_provider(name));
        return create_vision_provider(name, model);
    }
    if let (Ok(prov), Ok(model)) = (
        std::env::var("EDGEQUAKE_LLM_PROVIDER"),
        std::env::var("EDGEQUAKE_MODEL"),
    ) {
        if !prov.is_empty() && !model.is_empty() {
            return create_vision_provider(&prov, &model);
        }
    }
    // Key-based detection table: (env var, provider name, default model),
    // checked in priority order; config.model overrides the default.
    let detections = [
        ("AWS_ACCESS_KEY_ID", "bedrock", "amazon.nova-lite-v1:0"),
        ("OPENAI_API_KEY", "openai", "gpt-4.1-nano"),
        ("MISTRAL_API_KEY", "mistral", "pixtral-12b-2409"),
    ];
    for (env_key, provider_name, default_model) in detections {
        if let Ok(value) = std::env::var(env_key) {
            if !value.is_empty() {
                let model = config.model.as_deref().unwrap_or(default_model);
                return create_vision_provider(provider_name, model);
            }
        }
    }
    // Last resort: let the factory probe the environment itself.
    let (llm_provider, _embedding) =
        ProviderFactory::from_env().map_err(|e| Pdf2MdError::ProviderNotConfigured {
            provider: "auto".to_string(),
            hint: format!(
                "No LLM provider could be auto-detected from environment.\n\
                 Set AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY (Bedrock, recommended),\n\
                 OPENAI_API_KEY, ANTHROPIC_API_KEY, or configure a provider.\n\
                 Error: {}",
                e
            ),
        })?;
    Ok(llm_provider)
}
/// Drive the lazy pipeline concurrently: encoded pages arrive on `rx` as the
/// renderer produces them, and up to `config.concurrency` LLM calls run at
/// once via `buffer_unordered`.
///
/// Returns the page results (in completion order, not page order — the caller
/// sorts) together with the summed per-page render+encode milliseconds
/// reported by the producer. Each successful page is checkpointed immediately
/// so an interrupted run can resume; checkpoint write failures are logged but
/// non-fatal.
async fn process_concurrent_lazy(
    rx: mpsc::Receiver<EncodedPage>,
    provider: &Arc<dyn LLMProvider>,
    config: &ConversionConfig,
    total_selected_pages: usize,
    conversion_id: &Option<String>,
) -> (Vec<PageResult>, u64) {
    // Render time is accumulated atomically because page futures run concurrently.
    let render_ms = Arc::new(AtomicU64::new(0));
    let provider_ref = Arc::clone(provider);
    let cfg_ref = config.clone();
    let concurrency = config.concurrency;
    let render_ms_clone = Arc::clone(&render_ms);
    let conv_id = conversion_id.clone();
    let results: Vec<PageResult> = ReceiverStream::new(rx)
        .map(move |page| {
            // Record render/encode cost as each page is dequeued, then build
            // the async LLM task for it; clones keep the future 'static.
            render_ms_clone.fetch_add(page.render_encode_ms, Ordering::Relaxed);
            let prov = Arc::clone(&provider_ref);
            let cfg = cfg_ref.clone();
            let total = total_selected_pages;
            let cid = conv_id.clone();
            async move {
                // page_index is 0-based; user-facing page numbers are 1-based.
                let page_num = page.page_index + 1;
                if let Some(ref cb) = cfg.progress_callback {
                    cb.on_page_start(page_num, total);
                }
                // No prior-page context in concurrent mode (hence `None`).
                let result = llm::process_page(&prov, page_num, page.image_data, None, &cfg).await;
                if let Some(ref cb) = cfg.progress_callback {
                    match &result.error {
                        None => cb.on_page_complete(page_num, total, result.markdown.len()),
                        Some(e) => cb.on_page_error(page_num, total, e.to_string()),
                    }
                }
                // Only successful pages are checkpointed; failures will be retried
                // on a future resume.
                if result.error.is_none() {
                    if let (Some(ref store), Some(ref id)) = (&cfg.checkpoint_store, &cid) {
                        let stats = PageStats {
                            input_tokens: result.input_tokens,
                            output_tokens: result.output_tokens,
                            duration_ms: result.duration_ms,
                            retries: result.retries,
                        };
                        if let Err(e) =
                            store.save_page_checkpoint(id, page_num, &result.markdown, &stats)
                        {
                            warn!("Failed to save checkpoint for page {}: {}", page_num, e);
                        }
                    }
                }
                result
            }
        })
        // At most `concurrency` page futures are polled simultaneously.
        .buffer_unordered(concurrency)
        .collect()
        .await;
    (results, render_ms.load(Ordering::Relaxed))
}
/// Drive the lazy pipeline strictly in page order, feeding each page the
/// previous successful page's Markdown so the model can maintain formatting
/// continuity (`maintain_format` mode).
///
/// Returns results in arrival order plus the summed render+encode
/// milliseconds. Successful pages are checkpointed as they complete; failed
/// pages do not update the prior-Markdown context.
async fn process_sequential_lazy(
    rx: mpsc::Receiver<EncodedPage>,
    provider: &Arc<dyn LLMProvider>,
    config: &ConversionConfig,
    total_selected_pages: usize,
    conversion_id: &Option<String>,
) -> (Vec<PageResult>, u64) {
    let mut results = Vec::new();
    // Markdown of the last successful page, passed as context to the next one.
    let mut prior_markdown: Option<String> = None;
    let mut total_render_ms: u64 = 0;
    let mut rx = rx;
    while let Some(page) = rx.recv().await {
        total_render_ms += page.render_encode_ms;
        // page_index is 0-based; user-facing page numbers are 1-based.
        let page_num = page.page_index + 1;
        if let Some(ref cb) = config.progress_callback {
            cb.on_page_start(page_num, total_selected_pages);
        }
        let result = llm::process_page(
            provider,
            page_num,
            page.image_data,
            prior_markdown.as_deref(),
            config,
        )
        .await;
        if let Some(ref cb) = config.progress_callback {
            match &result.error {
                None => cb.on_page_complete(page_num, total_selected_pages, result.markdown.len()),
                Some(e) => cb.on_page_error(page_num, total_selected_pages, e.to_string()),
            }
        }
        if result.error.is_none() {
            // Update the formatting context only on success.
            prior_markdown = Some(result.markdown.clone());
            // Checkpoint write failures are logged but non-fatal.
            if let (Some(ref store), Some(ref conv_id)) = (&config.checkpoint_store, conversion_id)
            {
                let stats = PageStats {
                    input_tokens: result.input_tokens,
                    output_tokens: result.output_tokens,
                    duration_ms: result.duration_ms,
                    retries: result.retries,
                };
                if let Err(e) =
                    store.save_page_checkpoint(conv_id, page_num, &result.markdown, &stats)
                {
                    warn!("Failed to save checkpoint for page {}: {}", page_num, e);
                }
            }
        }
        results.push(result);
    }
    (results, total_render_ms)
}
/// Resolve the provider *name* for conversion-id hashing, mirroring the
/// priority order of `resolve_provider`:
/// explicit instance ("custom") → explicit name → `EDGEQUAKE_*` pair →
/// API-key detection (AWS, OpenAI, Mistral) → "auto".
fn resolve_provider_name(config: &ConversionConfig) -> String {
    if config.provider.is_some() {
        return "custom".to_string();
    }
    if let Some(ref name) = config.provider_name {
        return name.clone();
    }
    if let (Ok(prov), Ok(model)) = (
        std::env::var("EDGEQUAKE_LLM_PROVIDER"),
        std::env::var("EDGEQUAKE_MODEL"),
    ) {
        if !prov.is_empty() && !model.is_empty() {
            return prov;
        }
    }
    // Same key-detection order as resolve_provider; must stay in sync so the
    // conversion id matches the provider actually used.
    let key_to_name = [
        ("AWS_ACCESS_KEY_ID", "bedrock"),
        ("OPENAI_API_KEY", "openai"),
        ("MISTRAL_API_KEY", "mistral"),
    ];
    for (env_key, name) in key_to_name {
        if std::env::var(env_key).is_ok_and(|v| !v.is_empty()) {
            return name.to_string();
        }
    }
    "auto".to_string()
}
fn resolve_model_name(config: &ConversionConfig, provider_name: &str) -> String {
if let Some(ref model) = config.model {
return model.clone();
}
default_vision_model_for_provider(provider_name).to_string()
}
/// Current wall-clock time as an ISO-8601 UTC timestamp, e.g.
/// `2024-01-31T08:05:09Z`.
///
/// The previous implementation returned a non-standard `epoch:<secs>` string
/// despite the function's name; this now emits real ISO 8601 (used for the
/// checkpoint `created_at` field, which is informational and not parsed
/// elsewhere in this crate) while still avoiding a chrono dependency.
fn chrono_now_iso() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    epoch_secs_to_iso8601(secs)
}

/// Convert seconds since the Unix epoch to `YYYY-MM-DDTHH:MM:SSZ` (UTC).
///
/// Date math follows Howard Hinnant's `civil_from_days` algorithm for the
/// proleptic Gregorian calendar; `secs` is non-negative so no pre-1970
/// handling is needed.
fn epoch_secs_to_iso8601(secs: u64) -> String {
    let days = (secs / 86_400) as i64;
    let rem = secs % 86_400;
    let (hh, mm, ss) = (rem / 3_600, (rem % 3_600) / 60, rem % 60);
    // civil_from_days: shift epoch to 0000-03-01 so leap days fall at year end.
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day-of-era in [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    let mp = (5 * doy + 2) / 153; // month index with March = 0
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + i64::from(month <= 2);
    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hh, mm, ss
    )
}
/// Assemble the final Markdown document from per-page results.
///
/// Prepends YAML front matter when `include_metadata` is set, then
/// concatenates the Markdown of every *successful* page, inserting the
/// configured page separator before each page after the first. Failed pages
/// are omitted entirely.
fn assemble_document(
    pages: &[PageResult],
    config: &ConversionConfig,
    metadata: &DocumentMetadata,
) -> String {
    let mut document = String::new();
    if config.include_metadata {
        document.push_str(&format_yaml_front_matter(metadata));
    }
    let mut first_page = true;
    for page in pages.iter().filter(|p| p.error.is_none()) {
        if !first_page {
            // Separator carries the page number of the page it precedes.
            document.push_str(&config.page_separator.render(page.page_num));
        }
        document.push_str(&page.markdown);
        first_page = false;
    }
    document
}
fn format_yaml_front_matter(meta: &DocumentMetadata) -> String {
let mut yaml = String::from("---\n");
if let Some(ref t) = meta.title {
yaml.push_str(&format!("title: \"{}\"\n", t));
}
if let Some(ref a) = meta.author {
yaml.push_str(&format!("author: \"{}\"\n", a));
}
if let Some(ref s) = meta.subject {
yaml.push_str(&format!("subject: \"{}\"\n", s));
}
if let Some(ref c) = meta.creator {
yaml.push_str(&format!("creator: \"{}\"\n", c));
}
if let Some(ref p) = meta.producer {
yaml.push_str(&format!("producer: \"{}\"\n", p));
}
yaml.push_str(&format!("pages: {}\n", meta.page_count));
if !meta.pdf_version.is_empty() {
yaml.push_str(&format!("pdf_version: \"{}\"\n", meta.pdf_version));
}
yaml.push_str("---\n\n");
yaml
}
#[cfg(test)]
mod tests {
    //! Unit tests for the atomic Markdown writer and default-model selection.
    use super::*;

    // A brand-new path is created with the exact content.
    #[tokio::test]
    async fn test_write_creates_new_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("output.md");
        assert!(!path.exists());
        write_markdown_to_file(&path, "# Hello\n").await.unwrap();
        assert!(path.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "# Hello\n");
    }

    // An existing file is fully replaced, not appended to.
    #[tokio::test]
    async fn test_write_overwrites_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("paper.md");
        std::fs::write(&path, "old content from first run").unwrap();
        assert_eq!(
            std::fs::read_to_string(&path).unwrap(),
            "old content from first run"
        );
        write_markdown_to_file(&path, "new content from second run\n")
            .await
            .unwrap();
        assert_eq!(
            std::fs::read_to_string(&path).unwrap(),
            "new content from second run\n"
        );
    }

    // Overwriting with shorter content leaves no stale trailing bytes.
    #[tokio::test]
    async fn test_write_overwrites_larger_file_with_smaller() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("output.md");
        let large = "x".repeat(10_000);
        std::fs::write(&path, &large).unwrap();
        assert_eq!(std::fs::read_to_string(&path).unwrap().len(), 10_000);
        write_markdown_to_file(&path, "small\n").await.unwrap();
        let content = std::fs::read_to_string(&path).unwrap();
        assert_eq!(content, "small\n");
        assert_eq!(content.len(), 6);
    }

    // Missing intermediate directories are created automatically.
    #[tokio::test]
    async fn test_write_creates_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("sub").join("dir").join("output.md");
        assert!(!path.parent().unwrap().exists());
        write_markdown_to_file(&path, "# Nested\n").await.unwrap();
        assert!(path.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "# Nested\n");
    }

    // The pid-tagged staging file must not survive a successful write.
    #[tokio::test]
    async fn test_write_no_leftover_temp_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("output.md");
        write_markdown_to_file(&path, "# Clean\n").await.unwrap();
        let entries: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_name().to_string_lossy().ends_with(".tmp"))
            .collect();
        assert!(
            entries.is_empty(),
            "Temp files should be cleaned up, found: {:?}",
            entries.iter().map(|e| e.file_name()).collect::<Vec<_>>()
        );
    }

    // Writing an empty document still creates the file.
    #[tokio::test]
    async fn test_write_empty_content() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.md");
        write_markdown_to_file(&path, "").await.unwrap();
        assert!(path.exists());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "");
    }

    // Repeated writes to the same path each leave exactly the latest content.
    #[tokio::test]
    async fn test_write_overwrites_multiple_times() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("multi.md");
        for i in 0..5 {
            let content = format!("# Version {i}\n");
            write_markdown_to_file(&path, &content).await.unwrap();
            assert_eq!(std::fs::read_to_string(&path).unwrap(), content);
        }
    }

    // All accepted Mistral spellings map to the pixtral default.
    #[test]
    fn test_default_vision_model_mistral_variants() {
        for name in &["mistral", "mistral-ai", "mistralai"] {
            assert_eq!(
                default_vision_model_for_provider(name),
                "pixtral-12b-2409",
                "provider '{}' should default to pixtral-12b-2409",
                name
            );
        }
    }

    // Unrecognized providers fall back to the OpenAI default.
    #[test]
    fn test_default_vision_model_other_providers() {
        for name in &["openai", "anthropic", "gemini", "azure", "unknown"] {
            assert_eq!(
                default_vision_model_for_provider(name),
                "gpt-4.1-nano",
                "provider '{}' should default to gpt-4.1-nano",
                name
            );
        }
    }

    // Local runtimes (ollama / LM Studio spellings) default to llava.
    #[test]
    fn test_default_vision_model_local_providers() {
        for name in &["ollama", "lmstudio", "lm-studio", "lm_studio"] {
            assert_eq!(
                default_vision_model_for_provider(name),
                "llava",
                "provider '{}' should default to llava (vision-capable local model)",
                name
            );
        }
    }
}