use crate::config::ConversionConfig;
use crate::error::{PageError, Pdf2MdError};
use crate::output::PageResult;
use crate::pipeline::{encode, input, llm, postprocess, render};
use edgequake_llm::{LLMProvider, ProviderFactory};
use futures::stream::{self, StreamExt};
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::Stream;
use tracing::{info, warn};
/// Boxed, pinned stream of per-page conversion results.
///
/// Each item is either a successfully converted [`PageResult`] or the
/// [`PageError`] reported for that page. The stream is `Send`, so it can be
/// moved across threads.
pub type PageStream = Pin<Box<dyn Stream<Item = Result<PageResult, PageError>> + Send>>;
pub async fn convert_stream(
input_str: impl AsRef<str>,
config: &ConversionConfig,
) -> Result<PageStream, Pdf2MdError> {
let input_str = input_str.as_ref();
info!("Starting streaming conversion: {}", input_str);
let resolved = input::resolve_input(input_str, config.download_timeout_secs).await?;
let pdf_path = resolved.path().to_path_buf();
let provider = resolve_provider(config)?;
let metadata = render::extract_metadata(&pdf_path, config.password.as_deref()).await?;
let total_pages = metadata.page_count;
let page_indices = config.pages.to_indices(total_pages);
if page_indices.is_empty() {
return Err(Pdf2MdError::PageOutOfRange {
page: 0,
total: total_pages,
});
}
let rendered = render::render_pages(&pdf_path, config, &page_indices).await?;
let encoded: Vec<(usize, edgequake_llm::ImageData)> = rendered
.iter()
.filter_map(|(idx, img)| match encode::encode_page(img) {
Ok(data) => Some((*idx, data)),
Err(e) => {
warn!("Failed to encode page {}: {}", idx + 1, e);
None
}
})
.collect();
let concurrency = config.concurrency;
let config_clone = config.clone();
if config.maintain_format {
let s = stream::iter(encoded.into_iter()).then(move |(idx, img_data)| {
let provider = Arc::clone(&provider);
let cfg = config_clone.clone();
async move {
let page_num = idx + 1;
let mut result = llm::process_page(&provider, page_num, img_data, None, &cfg).await;
if result.error.is_none() {
result.markdown = postprocess::clean_markdown(&result.markdown);
Ok(result)
} else {
let err = result.error.take().unwrap();
Err(err)
}
}
});
Ok(Box::pin(s))
} else {
let s = stream::iter(encoded.into_iter().map(move |(idx, img_data)| {
let provider = Arc::clone(&provider);
let cfg = config_clone.clone();
async move {
let page_num = idx + 1;
let mut result = llm::process_page(&provider, page_num, img_data, None, &cfg).await;
if result.error.is_none() {
result.markdown = postprocess::clean_markdown(&result.markdown);
Ok(result)
} else {
let err = result.error.take().unwrap();
Err(err)
}
}
}))
.buffer_unordered(concurrency);
Ok(Box::pin(s))
}
}
/// Convert an in-memory PDF by spilling it to a temporary file first.
///
/// The bytes are written to a `tempfile::NamedTempFile`, and that file's path
/// is passed to [`convert_stream`]. `convert_stream` renders and encodes every
/// selected page before it returns, so the returned stream does not need the
/// file. The temporary file is dropped (and removed) right after the stream
/// has been built.
///
/// # Errors
///
/// Returns [`Pdf2MdError::Internal`] in three cases: the temporary file cannot
/// be created, it cannot be written, or its path is not valid UTF-8. Any other
/// error comes from [`convert_stream`].
pub async fn convert_stream_from_bytes(
    bytes: &[u8],
    config: &ConversionConfig,
) -> Result<PageStream, Pdf2MdError> {
    let mut tmp = tempfile::NamedTempFile::new()
        .map_err(|e| Pdf2MdError::Internal(format!("tempfile: {e}")))?;
    tmp.write_all(bytes)
        .map_err(|e| Pdf2MdError::Internal(format!("tempfile write: {e}")))?;

    // `convert_stream` takes the path as a string. A lossy conversion would
    // replace invalid bytes and hand it a *different* path, producing a
    // misleading "not found" style error, so reject such paths up front.
    let path = tmp.path().to_str().ok_or_else(|| {
        Pdf2MdError::Internal(format!(
            "tempfile path is not valid UTF-8: {}",
            tmp.path().display()
        ))
    })?;

    let stream = convert_stream(path, config).await?;
    // All file access happened inside `convert_stream`; the file can go now.
    drop(tmp);
    Ok(stream)
}
/// Decide which LLM provider a conversion should use.
///
/// Candidates are tried in this order; the first that applies wins:
///
/// 1. `config.provider` — an already-built provider, shared via `Arc::clone`.
/// 2. `config.provider_name` — built with `config.model`, defaulting to
///    `"gpt-4.1-nano"`.
/// 3. `EDGEQUAKE_LLM_PROVIDER` + `EDGEQUAKE_MODEL` — only when both are set
///    and non-empty; `config.model` is not consulted on this path.
/// 4. A non-empty `OPENAI_API_KEY` — the `"openai"` provider with
///    `config.model`, defaulting to `"gpt-4.1-nano"`.
/// 5. `ProviderFactory::from_env()` auto-detection.
///
/// # Errors
///
/// Returns [`Pdf2MdError::ProviderNotConfigured`] in two cases. Either the
/// selected provider cannot be built, or step 5 detects nothing, which is
/// reported under provider name `"auto"`.
fn resolve_provider(config: &ConversionConfig) -> Result<Arc<dyn LLMProvider>, Pdf2MdError> {
    const DEFAULT_MODEL: &str = "gpt-4.1-nano";

    // An env var counts only if it is set, valid unicode, and non-empty.
    let non_empty_env = |key: &str| std::env::var(key).ok().filter(|value| !value.is_empty());

    if let Some(shared) = &config.provider {
        return Ok(Arc::clone(shared));
    }

    let model_or_default = config.model.as_deref().unwrap_or(DEFAULT_MODEL);

    if let Some(name) = &config.provider_name {
        return create_vision_provider(name, model_or_default);
    }

    if let (Some(env_provider), Some(env_model)) = (
        non_empty_env("EDGEQUAKE_LLM_PROVIDER"),
        non_empty_env("EDGEQUAKE_MODEL"),
    ) {
        return create_vision_provider(&env_provider, &env_model);
    }

    if non_empty_env("OPENAI_API_KEY").is_some() {
        return create_vision_provider("openai", model_or_default);
    }

    ProviderFactory::from_env()
        .map(|(llm_provider, _)| llm_provider)
        .map_err(|e| Pdf2MdError::ProviderNotConfigured {
            provider: "auto".to_string(),
            hint: format!("No LLM provider auto-detected: {}", e),
        })
}
/// Build a provider by name and model via `ProviderFactory::create_llm_provider`.
///
/// NOTE(review): despite the name, no vision-capability check is made here;
/// the factory result is returned as-is.
///
/// # Errors
///
/// A factory failure becomes [`Pdf2MdError::ProviderNotConfigured`]. It
/// carries `provider_name` and the factory error's `Display` text as the hint.
fn create_vision_provider(
    provider_name: &str,
    model: &str,
) -> Result<Arc<dyn LLMProvider>, Pdf2MdError> {
    match ProviderFactory::create_llm_provider(provider_name, model) {
        Ok(provider) => Ok(provider),
        Err(err) => Err(Pdf2MdError::ProviderNotConfigured {
            provider: provider_name.to_owned(),
            hint: err.to_string(),
        }),
    }
}