use std::path::Path;
use std::sync::Arc;
use tracing::info;
use uuid::Uuid;
use crate::error::{Error, Result};
use crate::index::parse::DocumentFormat;
use crate::index::{IndexInput, IndexMode, PipelineExecutor, PipelineOptions, SummaryStrategy};
use crate::llm::LlmClient;
use crate::storage::{DocumentMeta, PersistedDocument};
use super::events::{EventEmitter, IndexEvent};
use super::index_context::IndexSource;
use super::types::{IndexOptions, IndexedDocument};
/// Orchestrates document indexing: detects the input format, runs a
/// pipeline executor, emits progress events, and assembles the result
/// into an [`IndexedDocument`].
pub(crate) struct IndexerClient {
// Produces a fresh `PipelineExecutor` for every indexing run; stored as
// a factory so the client stays cheaply cloneable (`Arc`-shared).
executor_factory: Arc<dyn Fn() -> PipelineExecutor + Send + Sync>,
// Sink for `IndexEvent`s emitted while indexing.
events: EventEmitter,
// Tunables (NOTE(review): not read anywhere in this chunk — confirm
// they are consumed elsewhere).
config: IndexerConfig,
}
/// Tuning knobs for the indexer.
#[derive(Debug, Clone)]
pub struct IndexerConfig {
/// Minimum token budget for generated summaries (default: 20).
pub min_summary_tokens: usize,
/// Whether stable node ids should be generated (default: true).
pub generate_ids: bool,
/// Whether document descriptions should be generated (default: false).
pub generate_descriptions: bool,
}
impl Default for IndexerConfig {
fn default() -> Self {
Self {
min_summary_tokens: 20,
generate_ids: true,
generate_descriptions: false,
}
}
}
impl IndexerClient {
pub fn new(_executor: PipelineExecutor) -> Self {
Self {
executor_factory: Arc::new(PipelineExecutor::new),
events: EventEmitter::new(),
config: IndexerConfig::default(),
}
}
/// Creates a client whose executors are backed by the given LLM client.
///
/// The closure takes ownership of `client` and hands every new executor
/// its own copy; the previous `Arc<LlmClient>` wrapper was redundant
/// because the inner client was cloned per call anyway.
pub fn with_llm(client: LlmClient) -> Self {
    Self {
        executor_factory: Arc::new(move || PipelineExecutor::with_llm(client.clone())),
        events: EventEmitter::new(),
        config: IndexerConfig::default(),
    }
}
pub fn with_events(mut self, events: EventEmitter) -> Self {
self.events = events;
self
}
pub fn with_config(mut self, config: IndexerConfig) -> Self {
self.config = config;
self
}
/// Crate-internal constructor wiring an executor factory, event emitter,
/// and config together directly; used by the builder helpers above.
pub(crate) fn from_factory(
factory: Arc<dyn Fn() -> PipelineExecutor + Send + Sync>,
events: EventEmitter,
config: IndexerConfig,
) -> Self {
Self {
executor_factory: factory,
events,
config,
}
}
/// Indexes a document from `source` with no pre-existing tree.
///
/// Convenience wrapper around [`Self::index_with_existing`] passing
/// `None` for the existing tree.
pub async fn index(
&self,
source: &IndexSource,
name: Option<&str>,
options: &IndexOptions,
) -> Result<IndexedDocument> {
self.index_with_existing(source, name, options, None).await
}
/// Indexes a document, optionally reusing a previously built tree
/// (presumably for incremental re-indexing — confirm with the pipeline).
///
/// Dispatches to the path/content/bytes variants based on the source.
pub async fn index_with_existing(
&self,
source: &IndexSource,
name: Option<&str>,
options: &IndexOptions,
existing_tree: Option<&crate::DocumentTree>,
) -> Result<IndexedDocument> {
match source {
IndexSource::Path(path) => {
self.index_from_path(path, name, options, existing_tree)
.await
}
IndexSource::Content { data, format } => {
self.index_from_content(data, *format, name, options, existing_tree)
.await
}
IndexSource::Bytes { data, format } => {
self.index_from_bytes(data, *format, name, options, existing_tree)
.await
}
}
}
/// Indexes a document read from the filesystem.
///
/// Emits `Started` and `FormatDetected` events, runs a fresh executor,
/// and packages the result. The format is inferred from the extension.
///
/// # Errors
/// `Error::Parse` when the file is missing or the extension is
/// unsupported; pipeline failures are propagated.
async fn index_from_path(
&self,
path: &Path,
name: Option<&str>,
options: &IndexOptions,
existing_tree: Option<&crate::DocumentTree>,
) -> Result<IndexedDocument> {
// Prefer the canonical path; fall back to the caller's path when
// canonicalization fails (e.g. the file does not exist yet).
let path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
if !path.exists() {
return Err(Error::Parse(format!("File not found: {}", path.display())));
}
self.events.emit_index(IndexEvent::Started {
path: path.display().to_string(),
});
// Every indexed document gets a fresh random id.
let doc_id = Uuid::new_v4().to_string();
let format = self.detect_format_from_path(&path)?;
self.events
.emit_index(IndexEvent::FormatDetected { format });
info!("Indexing {:?} document: {}", format, path.display());
let pipeline_options =
self.build_pipeline_options_with_existing(options, format, existing_tree.cloned());
let input = IndexInput::file(&path);
// One executor per run; the factory decides whether it carries an LLM client.
let mut executor = (self.executor_factory)();
let result = executor.execute(input, pipeline_options).await?;
self.build_indexed_document(doc_id, result, format, name, Some(&path))
}
/// Indexes a document supplied as an in-memory string; `format` is
/// taken at face value (no detection). `name` doubles as the event
/// label, defaulting to "content".
async fn index_from_content(
&self,
content: &str,
format: DocumentFormat,
name: Option<&str>,
options: &IndexOptions,
existing_tree: Option<&crate::DocumentTree>,
) -> Result<IndexedDocument> {
self.events.emit_index(IndexEvent::Started {
path: name.unwrap_or("content").to_string(),
});
let doc_id = Uuid::new_v4().to_string();
self.events
.emit_index(IndexEvent::FormatDetected { format });
info!("Indexing {:?} document from content", format);
let pipeline_options =
self.build_pipeline_options_with_existing(options, format, existing_tree.cloned());
let input = IndexInput::content(content);
let mut executor = (self.executor_factory)();
let result = executor.execute(input, pipeline_options).await?;
// No source path: the name falls back to `name` or the pipeline's.
self.build_indexed_document(doc_id, result, format, name, None)
}
/// Indexes a document supplied as raw bytes (e.g. a PDF blob); `format`
/// is taken at face value. Mirrors [`Self::index_from_content`].
async fn index_from_bytes(
&self,
bytes: &[u8],
format: DocumentFormat,
name: Option<&str>,
options: &IndexOptions,
existing_tree: Option<&crate::DocumentTree>,
) -> Result<IndexedDocument> {
self.events.emit_index(IndexEvent::Started {
path: name.unwrap_or("bytes").to_string(),
});
let doc_id = Uuid::new_v4().to_string();
self.events
.emit_index(IndexEvent::FormatDetected { format });
info!(
"Indexing {:?} document from bytes ({} bytes)",
format,
bytes.len()
);
let pipeline_options =
self.build_pipeline_options_with_existing(options, format, existing_tree.cloned());
let input = IndexInput::bytes(bytes);
let mut executor = (self.executor_factory)();
let result = executor.execute(input, pipeline_options).await?;
self.build_indexed_document(doc_id, result, format, name, None)
}
/// Builds pipeline options without an existing tree.
/// NOTE(review): not called within this chunk — confirm it is used
/// elsewhere before removing.
fn build_pipeline_options(
&self,
options: &IndexOptions,
format: DocumentFormat,
) -> PipelineOptions {
self.build_pipeline_options_with_existing(options, format, None)
}
/// Translates user-facing `IndexOptions` into pipeline-level options.
///
/// The index mode follows the document format one-to-one; unspecified
/// pipeline fields keep their defaults.
fn build_pipeline_options_with_existing(
    &self,
    options: &IndexOptions,
    format: DocumentFormat,
    existing_tree: Option<crate::DocumentTree>,
) -> PipelineOptions {
    let mode = match format {
        DocumentFormat::Markdown => IndexMode::Markdown,
        DocumentFormat::Pdf => IndexMode::Pdf,
    };
    let summary_strategy = if options.generate_summaries {
        SummaryStrategy::full()
    } else {
        SummaryStrategy::none()
    };
    PipelineOptions {
        mode,
        summary_strategy,
        generate_ids: options.generate_ids,
        generate_description: options.generate_description,
        existing_tree,
        ..Default::default()
    }
}
fn build_indexed_document(
&self,
doc_id: String,
result: crate::index::PipelineResult,
format: DocumentFormat,
name: Option<&str>,
path: Option<&Path>,
) -> Result<IndexedDocument> {
let tree = result
.tree
.ok_or_else(|| Error::Parse("Document tree not generated".to_string()))?;
let node_count = tree.node_count();
self.events.emit_index(IndexEvent::TreeBuilt { node_count });
let doc_name = name
.map(str::to_string)
.or_else(|| {
path.and_then(|p| p.file_stem())
.map(|s| s.to_string_lossy().to_string())
})
.unwrap_or_else(|| result.name.clone());
let mut doc = IndexedDocument::new(&doc_id, format)
.with_name(&doc_name)
.with_tree(tree)
.with_metrics(result.metrics);
doc.reasoning_index = result.reasoning_index;
if let Some(p) = path {
doc = doc.with_source_path(p);
}
if let Some(desc) = &result.description {
doc = doc.with_description(desc);
}
if let Some(page_count) = result.page_count {
doc = doc.with_page_count(page_count);
}
info!("Indexing complete: {} ({} nodes)", doc_id, node_count);
self.events.emit_index(IndexEvent::Complete { doc_id });
Ok(doc)
}
fn detect_format_from_path(&self, path: &Path) -> Result<DocumentFormat> {
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
DocumentFormat::from_extension(ext)
.ok_or_else(|| Error::Parse(format!("Unsupported format: {}", ext)))
}
pub fn validate(&self, path: impl AsRef<Path>) -> Result<ValidationResult> {
let path = path.as_ref();
if !path.exists() {
return Ok(ValidationResult {
valid: false,
errors: vec![format!("File not found: {}", path.display())],
warnings: vec![],
format: None,
estimated_size: 0,
});
}
let metadata = std::fs::metadata(path)
.map_err(|e| Error::Parse(format!("Cannot read file metadata: {}", e)))?;
let estimated_size = metadata.len() as usize;
let mut warnings = Vec::new();
if estimated_size > 100 * 1024 * 1024 {
warnings.push("Large file (>100MB) may take longer to index".to_string());
}
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
let format = DocumentFormat::from_extension(ext);
if format.is_none() {
return Ok(ValidationResult {
valid: false,
errors: vec![format!("Unsupported format: {}", ext)],
warnings,
format: None,
estimated_size,
});
}
Ok(ValidationResult {
valid: true,
errors: vec![],
warnings,
format,
estimated_size,
})
}
/// Converts an indexed document into its persistable form using default
/// pipeline options (affects only the stored logic fingerprint).
pub fn to_persisted(&self, doc: IndexedDocument) -> PersistedDocument {
self.to_persisted_with_options(doc, &PipelineOptions::default())
}
/// Converts an indexed document into its persistable form, attaching
/// metadata, fingerprints, pages, and processing statistics.
///
/// # Panics
/// Panics when `doc` carries no tree (see the `expect` below).
pub fn to_persisted_with_options(
&self,
doc: IndexedDocument,
pipeline_options: &PipelineOptions,
) -> PersistedDocument {
let mut meta = DocumentMeta::new(&doc.id, &doc.name, doc.format.extension())
.with_source_path(
doc.source_path
.as_ref()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_default(),
)
.with_description(doc.description.clone().unwrap_or_default());
// Content fingerprint is best-effort: silently skipped if the source
// file cannot be re-read from disk.
if let Some(ref path) = doc.source_path {
if let Ok(bytes) = std::fs::read(path) {
let fp = crate::utils::fingerprint::Fingerprint::from_bytes(&bytes);
meta = meta.with_fingerprint(fp);
}
}
// Logic fingerprint captures the pipeline configuration used.
let logic_fp = pipeline_options.logic_fingerprint();
meta = meta.with_logic_fingerprint(logic_fp);
let tree = doc.tree.expect("IndexedDocument must have a tree");
let node_count = tree.node_count();
// Stats default to zero when the pipeline recorded no metrics.
let (summary_tokens, duration_ms) = if let Some(ref m) = doc.metrics {
(m.total_tokens_generated, m.total_time_ms())
} else {
(0, 0)
};
let mut persisted = PersistedDocument::new(meta, tree);
for page in doc.pages {
persisted.add_page(page.page, &page.content);
}
persisted.reasoning_index = doc.reasoning_index;
persisted.meta.update_processing_stats(node_count, summary_tokens, duration_ms);
persisted
}
}
impl Clone for IndexerClient {
fn clone(&self) -> Self {
Self {
executor_factory: Arc::clone(&self.executor_factory),
events: self.events.clone(),
config: self.config.clone(),
}
}
}
/// Outcome of [`IndexerClient::validate`]: a pre-flight report on a
/// candidate file, without running the indexing pipeline.
#[derive(Debug, Clone)]
pub(crate) struct ValidationResult {
/// Whether the file can be indexed as-is.
pub valid: bool,
/// Fatal problems (missing file, unsupported extension).
pub errors: Vec<String>,
/// Non-fatal advisories (e.g. large file size).
pub warnings: Vec<String>,
/// Detected format, when the extension is supported.
pub format: Option<DocumentFormat>,
/// File size in bytes (0 when the file does not exist).
pub estimated_size: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed client carries the default config.
    #[test]
    fn test_indexer_client_creation() {
        let client = IndexerClient::new(PipelineExecutor::new());
        assert_eq!(client.config.min_summary_tokens, 20);
    }

    /// Validating a missing path yields an invalid report, not an Err.
    #[test]
    fn test_validate_missing_file() {
        let client = IndexerClient::new(PipelineExecutor::new());
        let report = client.validate("./nonexistent.md").unwrap();
        assert!(!report.valid);
        assert!(!report.errors.is_empty());
    }
}