use crate::cli::PdfParserArg;
use crate::config::{
ensure_store_layout, load_or_create_config, normalize_chunk_settings, resolve_model_name,
store_dir,
};
use crate::ingest::{ingest_path, ingest_url, is_web_url, PdfParser};
use crate::models::{Embedder, VisionCaptioner};
use crate::store::{connect_db, ensure_metadata, load_source_fingerprints, replace_source_rows};
use crate::ui::{self, Panel};
use anyhow::Result;
use std::path::PathBuf;
use std::time::Instant;
use tracing::{field, Instrument};
/// Entry point for the `index` command: ingests a local path or a web URL
/// into a named vector store, embedding new/changed sources and printing a
/// summary panel (plus an error panel when ingestion reported errors).
///
/// # Arguments
/// * `name` - store name; `None` selects the "default" store.
/// * `path` - filesystem path, or a web URL if it parses as one (see dispatch below).
/// * `chunk_size` / `chunk_overlap` - optional CLI overrides for the configured
///   chunking; the effective pair is produced by `normalize_chunk_settings`.
/// * `embed_model` - optional override for the configured embedding model.
/// * `pdf_parser` - PDF backend choice; defaults to `Native`.
/// * `exclude` - exclusion patterns forwarded to path ingestion.
/// * `include_hidden` - whether hidden files are considered during path ingestion.
/// * `force` - when true, existing fingerprints are not loaded, so every
///   source is treated as new and re-ingested.
///
/// # Errors
/// Propagates failures from store/config setup, DB access, ingestion, and
/// metadata/row persistence.
pub async fn run(
    name: Option<&str>,
    path: PathBuf,
    chunk_size: Option<usize>,
    chunk_overlap: Option<usize>,
    embed_model: Option<String>,
    pdf_parser: Option<PdfParserArg>,
    exclude: Vec<String>,
    include_hidden: bool,
    force: bool,
) -> Result<()> {
    // Top-level span covering the whole command. Fields declared with
    // `field::Empty` are placeholders that get recorded once results exist.
    let command_span = tracing::info_span!(
        "index_command",
        store_name = name.unwrap_or("default"),
        path = %path.display(),
        requested_chunk_size = field::debug(chunk_size),
        requested_chunk_overlap = field::debug(chunk_overlap),
        requested_embed_model = field::debug(embed_model.as_deref()),
        pdf_parser = field::debug(pdf_parser),
        exclude_count = exclude.len(),
        include_hidden,
        force,
        indexed_files = field::Empty,
        total_chunks = field::Empty,
        skipped_files = field::Empty,
        error_count = field::Empty,
        elapsed_ms = field::Empty,
    );
    // Clone the span handle: the original is consumed by `.instrument()` at
    // the bottom, while the clone is used inside the async block to record
    // field values. Both handles refer to the same span.
    let command_span_inner = command_span.clone();
    async move {
        let started = Instant::now();
        // Resolve the store directory and make sure its layout and config exist.
        let store = store_dir(name)?;
        ensure_store_layout(&store)?;
        let cfg = load_or_create_config(&store)?;
        // CLI overrides win over config values; the pair is then normalized.
        let (size, overlap) = normalize_chunk_settings(
            chunk_size.unwrap_or(cfg.chunk.size),
            chunk_overlap.unwrap_or(cfg.chunk.overlap),
        );
        let embed_model_name =
            embed_model.unwrap_or_else(|| resolve_model_name(&store, &cfg.models.embed));
        let vision_model_name = resolve_model_name(&store, &cfg.models.vision);
        // Re-record the "requested_*" span fields with the effective
        // (normalized/resolved) values.
        command_span_inner.record("requested_chunk_size", size);
        command_span_inner.record("requested_chunk_overlap", overlap);
        command_span_inner.record("requested_embed_model", field::debug(&embed_model_name));
        let embedder = Embedder::new(cfg.ollama.base_url.clone(), embed_model_name.clone());
        let vision = VisionCaptioner::new(cfg.ollama.base_url.clone(), vision_model_name);
        let db = connect_db(&store).await?;
        // With `force`, skip loading fingerprints entirely so nothing is
        // recognized as already indexed.
        let existing_fingerprints = if force {
            Default::default()
        } else {
            load_source_fingerprints(&db).await?
        };
        let parser = match pdf_parser.unwrap_or(PdfParserArg::Native) {
            PdfParserArg::Native => PdfParser::Native,
            PdfParserArg::Liteparse => PdfParser::Liteparse,
        };
        // Dispatch on the input form: web URLs use `ingest_url` (no vision
        // captioner, no excludes); anything else goes through `ingest_path`.
        let input = path.to_string_lossy();
        let result = if is_web_url(&input) {
            let ingest_span = tracing::info_span!(
                "ingest_url",
                url = %input,
                chunk_size = size,
                chunk_overlap = overlap,
                force,
            );
            ingest_url(
                &input,
                size,
                overlap,
                &embedder,
                &existing_fingerprints,
                force,
            )
            .instrument(ingest_span)
            .await?
        } else {
            let ingest_span = tracing::info_span!(
                "ingest_path",
                path = %path.display(),
                chunk_size = size,
                chunk_overlap = overlap,
                parser = ?parser,
                include_hidden,
                force,
            );
            ingest_path(
                &path,
                size,
                overlap,
                &embedder,
                Some(&vision),
                parser,
                &exclude,
                include_hidden,
                &existing_fingerprints,
                force,
            )
            .instrument(ingest_span)
            .await?
        };
        // Persist store metadata only when ingestion produced an embedding
        // dimension (i.e. something was actually embedded).
        if let Some(dim) = result.embedding_dim {
            ensure_metadata(&store, &embed_model_name, dim, size, overlap)?;
        }
        // Replace rows only for sources that were (re)ingested; untouched
        // sources keep their existing rows.
        if !result.source_paths.is_empty() {
            replace_source_rows(&db, &result.rows, &result.source_paths).await?;
        }
        let elapsed_ms = started.elapsed().as_millis();
        // Fill in the result fields declared `field::Empty` above.
        command_span_inner.record("indexed_files", result.stats.indexed_files);
        command_span_inner.record("total_chunks", result.stats.total_chunks);
        command_span_inner.record("skipped_files", result.stats.skipped_files);
        command_span_inner.record("error_count", result.stats.errors.len());
        command_span_inner.record("elapsed_ms", elapsed_ms as u64);
        // Render the user-facing summary.
        ui::command_header("ragcli index", format!("{}ms", elapsed_ms));
        let mut summary = Panel::new("Index Summary");
        summary.kv("path", path.display().to_string(), 14);
        summary.kv("store", store.display().to_string(), 14);
        summary.kv("embed model", &embed_model_name, 14);
        summary.kv("pdf parser", format!("{parser:?}"), 14);
        summary.status(
            "status",
            result.stats.errors.is_empty(),
            "complete",
            "completed with errors",
            14,
        );
        summary.kv(
            "result",
            format!(
                "Index complete: {} files, {} chunks, {} skipped",
                result.stats.indexed_files, result.stats.total_chunks, result.stats.skipped_files
            ),
            14,
        );
        summary.kv("indexed files", result.stats.indexed_files.to_string(), 14);
        summary.kv("chunks", result.stats.total_chunks.to_string(), 14);
        summary.kv("skipped", result.stats.skipped_files.to_string(), 14);
        summary.render();
        // Show a dedicated panel listing every ingestion error, if any.
        if !result.stats.errors.is_empty() {
            println!();
            let mut errors = Panel::new("Index Errors");
            for err in &result.stats.errors {
                errors.prose("error", err, 8);
            }
            errors.render();
        }
        Ok(())
    }
    .instrument(command_span)
    .await
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::stat;
    use crate::store::{connect_db, extract_contexts, DEFAULT_TABLE_NAME};
    use crate::test_support::{sequential_json_server, with_test_env};
    use futures::TryStreamExt;
    use lancedb::query::ExecutableQuery;

    /// Creates `docs/note.txt` under `dir` with `content` and returns its path.
    fn write_note(dir: &std::path::Path, content: &str) -> PathBuf {
        let docs = dir.join("docs");
        std::fs::create_dir_all(&docs).unwrap();
        let input = docs.join("note.txt");
        std::fs::write(&input, content).unwrap();
        input
    }

    /// Indexes `input` into the "e2e" store with the settings every test in
    /// this module uses: chunk size 200, no overlap, default models, native
    /// PDF parser, no excludes, hidden files skipped, no force. Panics if
    /// indexing fails.
    async fn index_into_e2e(input: PathBuf) {
        run(
            Some("e2e"),
            input,
            Some(200),
            Some(0),
            None,
            None,
            Vec::new(),
            false,
            false,
        )
        .await
        .unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_run_indexes_file_with_mock_ollama() {
        let dir = tempfile::tempdir().unwrap();
        let input = write_note(dir.path(), "The project is a local RAG CLI.");
        // One canned embedding response covers the single small chunk.
        let server = sequential_json_server(vec![r#"{"embeddings":[[0.1,0.2]]}"#]);
        with_test_env(dir.path(), Some(&server), || async {
            index_into_e2e(input.clone()).await;
            // `stat` must succeed against the freshly created store.
            stat::run(Some("e2e"), false).await.unwrap();
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_run_skips_unchanged_file_without_reembedding() {
        let dir = tempfile::tempdir().unwrap();
        let input = write_note(dir.path(), "The project is a local RAG CLI.");
        let first_server = sequential_json_server(vec![r#"{"embeddings":[[0.1,0.2]]}"#]);
        with_test_env(dir.path(), Some(&first_server), || async {
            index_into_e2e(input.clone()).await;
        })
        .await;
        // Second pass points at an unroutable endpoint: if the unchanged file
        // were re-embedded, the embedding request would fail and the unwrap
        // inside the helper would panic.
        with_test_env(dir.path(), Some("http://127.0.0.1:9"), || async {
            index_into_e2e(input.clone()).await;
        })
        .await;
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_run_replaces_rows_when_file_changes() {
        let dir = tempfile::tempdir().unwrap();
        let input = write_note(dir.path(), "old content");
        let first_server = sequential_json_server(vec![r#"{"embeddings":[[0.1,0.2]]}"#]);
        with_test_env(dir.path(), Some(&first_server), || async {
            index_into_e2e(input.clone()).await;
        })
        .await;
        // Sleep briefly so the rewrite gets a distinct timestamp (presumably
        // the source fingerprint includes mtime — confirm in ingest). The
        // blocking sleep is acceptable: the current_thread runtime is idle
        // here, with no other tasks to starve.
        std::thread::sleep(std::time::Duration::from_millis(2));
        std::fs::write(&input, "new content").unwrap();
        let second_server = sequential_json_server(vec![r#"{"embeddings":[[0.3,0.4]]}"#]);
        with_test_env(dir.path(), Some(&second_server), || async {
            index_into_e2e(input.clone()).await;
        })
        .await;
        // Verify the old rows were replaced, not appended to: exactly one
        // chunk remains and it reflects only the new content.
        with_test_env(dir.path(), None, || async {
            let store = store_dir(Some("e2e")).unwrap();
            let db = connect_db(&store).await.unwrap();
            let table = db.open_table(DEFAULT_TABLE_NAME).execute().await.unwrap();
            let batches = table
                .query()
                .execute()
                .await
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap();
            let contexts = extract_contexts(&batches).unwrap();
            assert_eq!(contexts.len(), 1);
            assert!(contexts[0].contains("new content"));
            assert!(!contexts[0].contains("old content"));
        })
        .await;
    }
}