use crate::config::ConversionConfig;
use crate::error::{PageError, Pdf2MdError};
use crate::output::PageResult;
use crate::pipeline::{input, llm, postprocess, render};
use edgequake_llm::{LLMProvider, ProviderFactory};
use futures::StreamExt;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tracing::info;
/// Heap-allocated, pinned, `Send` stream of page outcomes.
///
/// Each item is either a successfully converted [`PageResult`] or the
/// [`PageError`] reported for that page.
pub type PageStream = Pin<Box<dyn Stream<Item = Result<PageResult, PageError>> + Send>>;
pub async fn convert_stream(
input_str: impl AsRef<str>,
config: &ConversionConfig,
) -> Result<PageStream, Pdf2MdError> {
let input_str = input_str.as_ref();
info!("Starting streaming conversion: {}", input_str);
let resolved = input::resolve_input(input_str, config.download_timeout_secs).await?;
let pdf_path = resolved.path().to_path_buf();
let provider = resolve_provider(config)?;
let metadata = render::extract_metadata(&pdf_path, config.password.as_deref()).await?;
let total_pages = metadata.page_count;
let page_indices = config.pages.to_indices(total_pages);
if page_indices.is_empty() {
return Err(Pdf2MdError::PageOutOfRange {
page: 0,
total: total_pages,
});
}
let rx = render::spawn_lazy_render_encode(&pdf_path, config, &page_indices, config.concurrency)
.await?;
let concurrency = config.concurrency;
let config_clone = config.clone();
if config.maintain_format {
let s = futures::stream::unfold(
(rx, provider, config_clone, None::<String>),
|(mut rx, provider, cfg, prior_markdown)| async move {
let page = rx.recv().await?;
let page_num = page.page_index + 1;
let mut result = llm::process_page(
&provider,
page_num,
page.image_data,
prior_markdown.as_deref(),
&cfg,
)
.await;
if result.error.is_none() {
result.markdown = postprocess::clean_markdown(&result.markdown);
let new_prior = Some(result.markdown.clone());
Some((Ok(result), (rx, provider, cfg, new_prior)))
} else {
let err = result.error.take().unwrap();
Some((Err(err), (rx, provider, cfg, prior_markdown)))
}
},
);
Ok(Box::pin(s))
} else {
let s = ReceiverStream::new(rx)
.map(move |page| {
let provider = Arc::clone(&provider);
let cfg = config_clone.clone();
async move {
let page_num = page.page_index + 1;
let mut result =
llm::process_page(&provider, page_num, page.image_data, None, &cfg).await;
if result.error.is_none() {
result.markdown = postprocess::clean_markdown(&result.markdown);
Ok(result)
} else {
let err = result.error.take().unwrap();
Err(err)
}
}
})
.buffer_unordered(concurrency);
Ok(Box::pin(s))
}
}
/// Streams a conversion of an in-memory PDF.
///
/// The bytes are written to a named temporary file, and that file's path is
/// handed to [`convert_stream`]. The temporary file handle is stored in the
/// returned stream's state so that it is not dropped while pages are still
/// being produced.
///
/// # Errors
///
/// Returns [`Pdf2MdError::Internal`] if the temporary file cannot be created
/// or written, or if its path is not valid UTF-8 (the path has to be passed on
/// as a string). Any error from [`convert_stream`] is propagated unchanged.
pub async fn convert_stream_from_bytes(
    bytes: &[u8],
    config: &ConversionConfig,
) -> Result<PageStream, Pdf2MdError> {
    let mut tmp = tempfile::NamedTempFile::new()
        .map_err(|e| Pdf2MdError::Internal(format!("tempfile: {e}")))?;
    tmp.write_all(bytes)
        .map_err(|e| Pdf2MdError::Internal(format!("tempfile write: {e}")))?;

    // A lossy conversion would silently substitute U+FFFD and point at a
    // different path, so fail loudly instead of passing on a mangled name.
    let path = tmp
        .path()
        .to_str()
        .ok_or_else(|| {
            Pdf2MdError::Internal(format!(
                "tempfile path is not valid UTF-8: {}",
                tmp.path().display()
            ))
        })?
        .to_owned();

    let inner = convert_stream(&path, config).await?;

    // Thread `tmp` through the unfold state purely to keep it alive for as
    // long as the inner stream is being driven.
    let held = futures::stream::unfold((inner, tmp), |(mut stream, tmp)| async move {
        let item = stream.next().await?;
        Some((item, (stream, tmp)))
    });
    Ok(Box::pin(held))
}
/// Chooses the LLM provider to use for a conversion.
///
/// Sources are tried in this order, and the first that applies wins:
///
/// 1. a provider instance set on `config.provider`;
/// 2. `config.provider_name`, with `config.model` or the default model;
/// 3. the `EDGEQUAKE_LLM_PROVIDER` and `EDGEQUAKE_MODEL` environment
///    variables, when both are set and non-empty;
/// 4. a non-empty `OPENAI_API_KEY`, using the `openai` provider with
///    `config.model` or the default model;
/// 5. whatever `ProviderFactory::from_env` detects.
///
/// # Errors
///
/// Returns [`Pdf2MdError::ProviderNotConfigured`] when the selected source
/// fails to build a provider or when nothing can be auto-detected.
fn resolve_provider(config: &ConversionConfig) -> Result<Arc<dyn LLMProvider>, Pdf2MdError> {
    // Model used whenever the configuration does not name one.
    const FALLBACK_MODEL: &str = "gpt-4.1-nano";

    // Reads an environment variable, treating unset/invalid and empty alike.
    let non_empty_env = |key: &str| std::env::var(key).ok().filter(|value| !value.is_empty());

    if let Some(explicit) = &config.provider {
        return Ok(Arc::clone(explicit));
    }

    if let Some(name) = &config.provider_name {
        let model = config.model.as_deref().unwrap_or(FALLBACK_MODEL);
        return create_vision_provider(name, model);
    }

    let env_provider = non_empty_env("EDGEQUAKE_LLM_PROVIDER");
    let env_model = non_empty_env("EDGEQUAKE_MODEL");
    if let (Some(prov), Some(model)) = (env_provider, env_model) {
        return create_vision_provider(&prov, &model);
    }

    if non_empty_env("OPENAI_API_KEY").is_some() {
        let model = config.model.as_deref().unwrap_or(FALLBACK_MODEL);
        return create_vision_provider("openai", model);
    }

    // Last resort: let the factory inspect the environment on its own.
    let (llm_provider, _) = match ProviderFactory::from_env() {
        Ok(detected) => detected,
        Err(e) => {
            return Err(Pdf2MdError::ProviderNotConfigured {
                provider: "auto".to_string(),
                hint: format!("No LLM provider auto-detected: {}", e),
            });
        }
    };
    Ok(llm_provider)
}
/// Builds a provider through `ProviderFactory::create_llm_provider`.
///
/// # Errors
///
/// A factory failure is reported as [`Pdf2MdError::ProviderNotConfigured`],
/// carrying the requested provider name and the factory error's display text
/// as the hint.
fn create_vision_provider(
    provider_name: &str,
    model: &str,
) -> Result<Arc<dyn LLMProvider>, Pdf2MdError> {
    let created = ProviderFactory::create_llm_provider(provider_name, model).map_err(|cause| {
        Pdf2MdError::ProviderNotConfigured {
            provider: provider_name.to_owned(),
            hint: cause.to_string(),
        }
    })?;
    Ok(created)
}