use anyhow::Result;
use futures::{StreamExt, pin_mut};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
pub use rust_memex::contracts::audit::{
AuditRecommendation, AuditResult as NamespaceAuditResult, ChunkQuality, QualityTier,
};
use rust_memex::{
ChunkerKind, EmbeddingClient, EmbeddingConfig, RAGPipeline, ReindexJob, ReindexOutcome,
ReprocessJob, ReprocessOutcome, SchemaVersion, SliceMode, StorageManager, diagnostics,
export_namespace_jsonl_stream, import_jsonl_file, reindex_namespace, reprocess_jsonl_file,
};
use crate::cli::batch_policy::{BatchFailurePolicy, BatchRunSummary};
/// Options consumed by [`run_reprocess`].
///
/// Most fields are forwarded unchanged into the `ReprocessJob` handed to
/// `reprocess_jsonl_file`; the rest control failure handling and reporting.
pub struct ReprocessConfig {
/// Namespace the job writes into (becomes the job's `target_namespace`).
pub namespace: String,
/// Path of the input file (becomes the job's `input_path`).
pub input: PathBuf,
/// Slicing mode forwarded to the job and echoed in the run report.
pub slice_mode: SliceMode,
/// Optional chunker kind forwarded to the job.
// NOTE(review): `None` presumably selects the library default chunker - confirm.
pub chunker: Option<ChunkerKind>,
/// Preprocessing toggle forwarded to the job; reported as enabled/disabled.
pub preprocess: bool,
/// Forwarded to the job; exact semantics are defined by `rust_memex`.
pub skip_existing: bool,
/// Forwarded to the job; exact semantics are defined by `rust_memex`.
pub allow_duplicates: bool,
/// First argument to `BatchFailurePolicy::new`.
pub strict: bool,
/// Second argument to `BatchFailurePolicy::new`.
pub max_failure_rate: f64,
/// When true the batch summary is emitted as JSON instead of a warning.
pub json: bool,
/// Forwarded to the job; when true the report states that nothing was written.
pub dry_run: bool,
/// Database location passed to `StorageManager::new_lance_only`.
pub db_path: String,
}
/// Options consumed by [`run_reindex`].
///
/// Most fields are forwarded unchanged into the `ReindexJob` handed to
/// `reindex_namespace`; the rest control failure handling and reporting.
pub struct ReindexConfig {
/// Namespace read from (becomes the job's `source_namespace`).
pub source_namespace: String,
/// Namespace written to (becomes the job's `target_namespace`).
pub target_namespace: String,
/// Slicing mode forwarded to the job.
pub slice_mode: SliceMode,
/// Optional chunker kind forwarded to the job.
// NOTE(review): `None` presumably selects the library default chunker - confirm.
pub chunker: Option<ChunkerKind>,
/// Preprocessing toggle forwarded to the job.
pub preprocess: bool,
/// Forwarded to the job; exact semantics are defined by `rust_memex`.
pub skip_existing: bool,
/// Forwarded to the job; exact semantics are defined by `rust_memex`.
pub allow_duplicates: bool,
/// First argument to `BatchFailurePolicy::new`.
pub strict: bool,
/// Second argument to `BatchFailurePolicy::new`.
pub max_failure_rate: f64,
/// When true the batch summary is emitted as JSON instead of a warning.
pub json: bool,
/// Forwarded to the job; when true the report states that nothing was written.
pub dry_run: bool,
/// Database location passed to `StorageManager::new_lance_only`.
pub db_path: String,
}
/// Maps a [`SliceMode`] to the lowercase, hyphenated label used in CLI output.
fn slice_mode_name(slice_mode: SliceMode) -> &'static str {
    // Exhaustive on purpose: a new variant must fail to compile here.
    match slice_mode {
        SliceMode::Flat => "flat",
        SliceMode::OnionFast => "onion-fast",
        SliceMode::Onion => "onion",
    }
}
/// Condenses a [`ReprocessOutcome`] into a [`BatchRunSummary`].
///
/// Unparseable input records are counted both as failures and as part of the
/// total, next to the canonical documents. One message is produced per failed
/// document id, plus a single aggregate message when any record failed to parse.
fn reprocess_summary(outcome: &ReprocessOutcome) -> BatchRunSummary {
    let parse_errors = outcome.parse_errors;
    let failed_documents = outcome.failed_ids.len();

    // One slot per failed document plus room for the aggregate parse-error line.
    let mut messages: Vec<String> = Vec::with_capacity(failed_documents + 1);
    for id in &outcome.failed_ids {
        messages.push(format!("document {id} failed"));
    }
    if parse_errors > 0 {
        messages.push(format!(
            "{} input record(s) failed to parse",
            parse_errors
        ));
    }

    let succeeded = outcome.indexed_documents + outcome.replaced_documents;
    let failed = failed_documents + parse_errors;
    let total = outcome.canonical_documents + parse_errors;
    BatchRunSummary::new(succeeded, failed, total, messages)
}
/// Condenses a [`ReindexOutcome`] into a [`BatchRunSummary`].
///
/// Successes are indexed plus replaced documents, failures are the failed ids,
/// and the total is the number of canonical documents.
fn reindex_summary(outcome: &ReindexOutcome) -> BatchRunSummary {
    let failed_ids = &outcome.failed_ids;

    let mut messages = Vec::with_capacity(failed_ids.len());
    for id in failed_ids {
        messages.push(format!("document {id} failed"));
    }

    let succeeded = outcome.indexed_documents + outcome.replaced_documents;
    BatchRunSummary::new(
        succeeded,
        failed_ids.len(),
        outcome.canonical_documents,
        messages,
    )
}
/// Streams every record of `namespace` as JSONL to `output`, or to stdout when
/// `output` is `None`.
///
/// * `namespace` - namespace to export.
/// * `output` - destination file; created (or truncated) if given.
/// * `include_embeddings` - forwarded to the exporter; also triggers a size hint
///   on stderr when at least one record was written.
/// * `db_path` - database location passed to `StorageManager::new_lance_only`.
///
/// # Errors
///
/// Fails if the store cannot be opened, the output file cannot be created, the
/// export stream yields an error, or a write/flush fails.
pub async fn run_export(
    namespace: String,
    output: Option<PathBuf>,
    include_embeddings: bool,
    db_path: String,
) -> Result<()> {
    use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let stream = export_namespace_jsonl_stream(storage, namespace.clone(), include_embeddings);
    pin_mut!(stream);

    // One sink for both destinations so the write loop exists only once.
    // The file is buffered: an unbuffered `tokio::fs::File` dispatches a
    // blocking-pool write for every exported line. Stdout is left as-is so
    // piped consumers keep receiving records as they are produced.
    let mut sink: Box<dyn AsyncWrite + Unpin + Send> = match &output {
        Some(path) => Box::new(BufWriter::new(tokio::fs::File::create(path).await?)),
        None => Box::new(tokio::io::stdout()),
    };

    let mut exported_count = 0usize;
    while let Some(line) = stream.next().await {
        let line = line?;
        sink.write_all(line.as_bytes()).await?;
        exported_count += 1;
    }
    // Explicit flush: dropping the writer would swallow any late I/O error.
    sink.flush().await?;

    if let Some(path) = &output {
        eprintln!(
            "Exported {} documents from '{}' to {:?}",
            exported_count, namespace, path
        );
    }
    if include_embeddings && exported_count > 0 {
        eprintln!(" (embeddings included - file may be large)");
    }
    Ok(())
}
/// Imports the JSONL file at `input` into `namespace` and prints a short report
/// to stderr.
///
/// `skip_existing` is forwarded to `import_jsonl_file`; `db_path` and
/// `embedding_config` are used to build the storage manager and embedding
/// client behind the RAG pipeline.
///
/// # Errors
///
/// Fails if the store, embedding client, or pipeline cannot be created, or if
/// the import itself returns an error.
pub async fn run_import(
    namespace: String,
    input: PathBuf,
    skip_existing: bool,
    db_path: String,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let embedder = EmbeddingClient::new(embedding_config).await?;
    let pipeline = RAGPipeline::new(Arc::new(Mutex::new(embedder)), storage).await?;

    let report = import_jsonl_file(Arc::new(pipeline), namespace, &input, skip_existing).await?;

    eprintln!();
    eprintln!("Import complete:");
    eprintln!(" Imported: {} documents", report.imported_count);

    // Zero-valued counters are omitted to keep the report short.
    let skipped = report.skipped_count;
    if skipped > 0 {
        eprintln!(" Skipped: {} (already exist)", skipped);
    }
    let errors = report.error_count;
    if errors > 0 {
        eprintln!(" Errors: {}", errors);
    }
    Ok(())
}
/// Runs a reprocess job described by `config`, prints a report to stderr, and
/// applies the batch failure policy to the result.
///
/// # Errors
///
/// Fails if the failure policy is rejected by `BatchFailurePolicy::new`, the
/// store/embedding client/pipeline cannot be built, the job itself errors,
/// JSON emission fails, or `BatchRunSummary::enforce` rejects the outcome.
pub async fn run_reprocess(
    config: ReprocessConfig,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    // Validate the policy first so bad flags fail before any I/O happens.
    let failure_policy = BatchFailurePolicy::new(config.strict, config.max_failure_rate)?;

    let storage = Arc::new(StorageManager::new_lance_only(&config.db_path).await?);
    let embedder = EmbeddingClient::new(embedding_config).await?;
    let pipeline = RAGPipeline::new(Arc::new(Mutex::new(embedder)), storage).await?;

    let job = ReprocessJob {
        input_path: config.input,
        target_namespace: config.namespace.clone(),
        slice_mode: config.slice_mode,
        chunker: config.chunker,
        preprocess: config.preprocess,
        skip_existing: config.skip_existing,
        allow_duplicates: config.allow_duplicates,
        dry_run: config.dry_run,
    };
    // Progress events are ignored here; only the final outcome is reported.
    let outcome = reprocess_jsonl_file(Arc::new(pipeline), job, |_| {}).await?;

    let source_records = outcome.source_records;
    let canonical_documents = outcome.canonical_documents;
    eprintln!(
        "Reprocessing {} source records into {} canonical documents for namespace '{}'...",
        source_records, canonical_documents, config.namespace
    );
    eprintln!(" Source: {}", outcome.source_label);
    eprintln!(" Slice mode: {}", slice_mode_name(config.slice_mode));
    let preprocess_label = if config.preprocess {
        "enabled"
    } else {
        "disabled"
    };
    eprintln!(" Preprocess: {}", preprocess_label);
    eprintln!(
        " Collapsed: {} duplicate slice records",
        source_records.saturating_sub(canonical_documents)
    );

    if config.dry_run {
        eprintln!();
        eprintln!("Dry run only: no documents were written.");
    }

    eprintln!();
    eprintln!("Reprocess complete:");
    eprintln!(" Indexed: {}", outcome.indexed_documents);
    if outcome.replaced_documents > 0 {
        eprintln!(" Replaced: {}", outcome.replaced_documents);
    }
    if outcome.skipped_existing_documents > 0 {
        eprintln!(" Skipped existing: {}", outcome.skipped_existing_documents);
    }
    if outcome.skipped_empty_documents > 0 {
        eprintln!(" Skipped empty: {}", outcome.skipped_empty_documents);
    }
    if outcome.skipped_preprocess_short_documents > 0 {
        eprintln!(
            " Skipped too short: {}",
            outcome.skipped_preprocess_short_documents
        );
    }

    let failed_ids = &outcome.failed_ids;
    if !failed_ids.is_empty() {
        // List at most ten ids; summarise the remainder as a count.
        let listing = if failed_ids.len() > 10 {
            let (shown, hidden) = failed_ids.split_at(10);
            format!("{}... and {} more", shown.join(", "), hidden.len())
        } else {
            failed_ids.join(", ")
        };
        eprintln!(" FAILED: {} (IDs: {})", failed_ids.len(), listing);
    }
    if outcome.parse_errors > 0 {
        eprintln!(" Parse errors: {}", outcome.parse_errors);
    }

    let summary = reprocess_summary(&outcome);
    if json_requested(config.json) {
        summary.emit_json()?;
    } else {
        summary.emit_warning("documents");
    }
    summary.enforce(failure_policy, "documents")?;
    return Ok(());

    /// Identity helper that names the intent of the `json` flag at the call site.
    fn json_requested(flag: bool) -> bool {
        flag
    }
}
/// Runs a reindex job described by `config`, prints a report to stderr, and
/// applies the batch failure policy to the result.
///
/// # Errors
///
/// Fails if the failure policy is rejected by `BatchFailurePolicy::new`, the
/// store/embedding client/pipeline cannot be built, the job itself errors,
/// JSON emission fails, or `BatchRunSummary::enforce` rejects the outcome.
pub async fn run_reindex(config: ReindexConfig, embedding_config: &EmbeddingConfig) -> Result<()> {
    // Validate the policy first so bad flags fail before any I/O happens.
    let failure_policy = BatchFailurePolicy::new(config.strict, config.max_failure_rate)?;

    let storage = Arc::new(StorageManager::new_lance_only(&config.db_path).await?);
    let embedder = EmbeddingClient::new(embedding_config).await?;
    let pipeline = RAGPipeline::new(Arc::new(Mutex::new(embedder)), storage).await?;

    let job = ReindexJob {
        source_namespace: config.source_namespace,
        target_namespace: config.target_namespace,
        slice_mode: config.slice_mode,
        chunker: config.chunker,
        preprocess: config.preprocess,
        skip_existing: config.skip_existing,
        allow_duplicates: config.allow_duplicates,
        dry_run: config.dry_run,
    };
    // Progress events are ignored here; only the final outcome is reported.
    let outcome = reindex_namespace(Arc::new(pipeline), job, |_| {}).await?;

    if config.dry_run {
        eprintln!("Dry run only: no documents were written.");
    }
    eprintln!(
        "Reindexed {} source rows into {} canonical documents from '{}' to '{}'",
        outcome.source_records,
        outcome.canonical_documents,
        outcome.source_namespace,
        outcome.target_namespace
    );
    eprintln!(" Indexed: {}", outcome.indexed_documents);

    // Zero-valued counters are omitted to keep the report short.
    if outcome.replaced_documents > 0 {
        eprintln!(" Replaced: {}", outcome.replaced_documents);
    }
    if outcome.skipped_documents > 0 {
        eprintln!(" Skipped: {}", outcome.skipped_documents);
    }
    let failed_count = outcome.failed_ids.len();
    if failed_count != 0 {
        eprintln!(" Failed: {}", failed_count);
    }

    let summary = reindex_summary(&outcome);
    match config.json {
        true => summary.emit_json()?,
        false => {
            summary.emit_warning("documents");
        }
    }
    summary.enforce(failure_policy, "documents")?;
    Ok(())
}
/// Shortens a namespace name so it fits the 30-column table cell.
///
/// Names longer than 28 characters are cut to their first 25 characters plus
/// `...`. Counting is done in `char`s, not bytes, so multi-byte UTF-8 names
/// can never be sliced in the middle of a character (which would panic).
fn truncate_namespace_label(name: &str) -> String {
    const MAX_CHARS: usize = 28;
    const KEPT_CHARS: usize = 25;

    // A character at index MAX_CHARS exists only if the name is too long.
    if name.chars().nth(MAX_CHARS).is_some() {
        let head: String = name.chars().take(KEPT_CHARS).collect();
        format!("{head}...")
    } else {
        name.to_owned()
    }
}

/// Audits one namespace, or all of them when `namespace` is `None`, and prints
/// the result as JSON or as a table.
///
/// * `namespace` - restrict the audit to this namespace.
/// * `threshold` - quality threshold in percent, forwarded to the audit and
///   echoed in the output.
/// * `verbose` - in table mode, additionally print per-namespace metrics to
///   stderr before the table.
/// * `json` - print a JSON document on stdout instead of the table.
/// * `db_path` - database location passed to `StorageManager::new_lance_only`.
///
/// # Errors
///
/// Fails if the store cannot be opened, namespaces cannot be listed, the audit
/// itself errors, or the JSON output cannot be serialized.
pub async fn run_audit(
    namespace: Option<String>,
    threshold: u8,
    verbose: bool,
    json: bool,
    db_path: String,
) -> Result<()> {
    let storage = StorageManager::new_lance_only(&db_path).await?;

    let namespaces: Vec<String> = if let Some(ns) = namespace.as_deref() {
        vec![ns.to_string()]
    } else {
        storage
            .list_namespaces()
            .await?
            .into_iter()
            .map(|(name, _count)| name)
            .collect()
    };

    if namespaces.is_empty() {
        if json {
            println!(r#"{{"namespaces": [], "summary": {{"total": 0}}}}"#);
        } else {
            eprintln!("No namespaces found in database");
        }
        return Ok(());
    }

    // Announce the work before it starts, so the line acts as a progress hint
    // rather than appearing only after the audit has already finished.
    if !json {
        eprintln!(
            "Auditing {} namespace(s) with {}% quality threshold...\n",
            namespaces.len(),
            threshold
        );
    }

    let results = diagnostics::audit_namespaces(&storage, namespace.as_deref(), threshold).await?;

    if verbose && !json {
        for result in &results {
            eprintln!("Namespace: {}", result.namespace);
            eprintln!(" Documents: {}", result.document_count);
            eprintln!(
                " avg_chunk_length={} sentence_integrity={:.2} word_integrity={:.2} chunk_quality={:.2} overall={:.2}",
                result.avg_chunk_length,
                result.sentence_integrity,
                result.word_integrity,
                result.chunk_quality,
                result.overall_score
            );
            eprintln!();
        }
    }

    // Shared by both output formats.
    let passing = results.iter().filter(|r| r.passes_threshold).count();
    let failing = results.len() - passing;

    if json {
        let output = serde_json::json!({
            "namespaces": results,
            "summary": {
                "total": results.len(),
                "passing": passing,
                "failing": failing,
                "threshold": threshold
            }
        });
        println!("{}", serde_json::to_string_pretty(&output)?);
        return Ok(());
    }

    println!("╔════════════════════════════════════════════════════════════════╗");
    println!("║ NAMESPACE QUALITY AUDIT ║");
    println!("╠════════════════════════════════════════════════════════════════╣");
    println!(
        "║ {:30} │ {:>6} │ {:>6} │ {:>8} ║",
        "Namespace", "Docs", "Score", "Status"
    );
    println!("╠════════════════════════════════════════════════════════════════╣");
    for result in &results {
        let status_icon = if result.passes_threshold {
            "✅"
        } else {
            "❌"
        };
        let ns_display = truncate_namespace_label(&result.namespace);
        println!(
            "║ {:30} │ {:>6} │ {:>5.1}% │ {} {:>6} ║",
            ns_display,
            result.document_count,
            result.overall_score * 100.0,
            status_icon,
            result.recommendation
        );
    }
    println!("╚════════════════════════════════════════════════════════════════╝");

    println!();
    println!(
        "Summary: {} passing, {} failing (threshold: {}%)",
        passing, failing, threshold
    );

    if failing > 0 {
        println!();
        println!("Namespaces below threshold:");
        for result in results.iter().filter(|r| !r.passes_threshold) {
            println!(
                " - {} ({:.1}% quality, {} docs)",
                result.namespace,
                result.overall_score * 100.0,
                result.document_count
            );
        }
        println!();
        println!(
            "Run 'rust-memex purge-quality --threshold {}' to remove low-quality namespaces",
            threshold
        );
    }
    Ok(())
}
/// Backfills `content_hash` and `source_hash` for one namespace, or for all
/// namespaces when `namespace` is `None`, then reports and enforces the batch
/// failure policy.
///
/// * `dry_run` - forwarded to the backfill routine.
/// * `json` - emit only the JSON batch summary instead of the table.
/// * `strict`, `max_failure_rate` - inputs to `BatchFailurePolicy::new`.
/// * `db_path` - database location passed to `StorageManager::new_lance_only`.
///
/// Documents skipped for lack of an embedding are counted as failures in the
/// batch summary.
///
/// # Errors
///
/// Fails if the policy is rejected, the store cannot be opened, the backfill
/// errors, JSON emission fails, or `BatchRunSummary::enforce` rejects the run.
pub async fn run_backfill_hashes(
    namespace: Option<String>,
    dry_run: bool,
    json: bool,
    strict: bool,
    max_failure_rate: f64,
    db_path: String,
) -> Result<()> {
    /// Prints one label/value row of the summary table.
    fn print_row(label: &str, value: impl std::fmt::Display) {
        println!("║ {:48} {:>13} ║", label, value);
    }

    let failure_policy = BatchFailurePolicy::new(strict, max_failure_rate)?;
    let storage = StorageManager::new_lance_only(&db_path).await?;

    if !json {
        let scope = match namespace.as_deref() {
            Some(ns) => format!("namespace '{}'", ns),
            None => "all namespaces".to_string(),
        };
        eprintln!(
            "Backfilling content_hash + source_hash across {} (dry_run={})...",
            scope, dry_run
        );
    }

    let result =
        diagnostics::backfill_chunk_and_source_hashes(&storage, namespace.as_deref(), dry_run)
            .await?;

    let touched = result.content_hash_backfilled + result.source_hash_backfilled;
    let mut messages = Vec::new();
    if result.skipped_no_embedding > 0 {
        messages.push(format!(
            "{} document(s) could not be backfilled because embedding was missing",
            result.skipped_no_embedding
        ));
    }
    let summary = BatchRunSummary::new(
        touched,
        result.skipped_no_embedding,
        result.total_docs,
        messages,
    );

    if json {
        summary.emit_json()?;
    } else {
        println!("╔════════════════════════════════════════════════════════════════╗");
        println!("║ BACKFILL HASHES SUMMARY ║");
        println!("╠════════════════════════════════════════════════════════════════╣");
        print_row("Total documents inspected", result.total_docs);
        print_row(
            "content_hash backfilled (per-chunk SHA256)",
            result.content_hash_backfilled,
        );
        print_row(
            "source_hash backfilled (recovered legacy)",
            result.source_hash_backfilled,
        );
        print_row("Already consistent (skipped)", result.already_consistent);
        print_row("Skipped (no embedding)", result.skipped_no_embedding);
        println!("╚════════════════════════════════════════════════════════════════╝");

        // Every closing message is preceded by one blank line.
        println!();
        // The flag reported by the backfill result decides the wording.
        if !result.dry_run {
            println!(
                "Wrote {} content_hash + {} source_hash updates.",
                result.content_hash_backfilled, result.source_hash_backfilled
            );
        } else if touched > 0 {
            println!(
                "DRY RUN - {} document(s) would be rewritten. Re-run with --dry-run false to apply.",
                touched
            );
        } else {
            println!("DRY RUN - nothing would change. Backfill is a no-op for this scope.");
        }

        summary.emit_warning("documents");
    }

    summary.enforce(failure_policy, "documents")?;
    Ok(())
}
/// Calls `StorageManager::migrate_lance_schema` for `target` and reports the
/// outcome on stdout.
///
/// When `check_only` is set and columns are missing, the process exits with
/// status 1 after listing them, instead of returning.
///
/// # Errors
///
/// Fails if `migrate_lance_schema` returns an error.
pub async fn run_migrate_schema(
    target: SchemaVersion,
    check_only: bool,
    db_path: String,
) -> Result<()> {
    let report = StorageManager::migrate_lance_schema(&db_path, target, check_only).await?;
    let missing_names = report.missing_column_names();

    if !missing_names.is_empty() {
        if check_only {
            println!("Migration needed. Missing columns: {missing_names:?}");
            // Non-zero exit status signals "migration required" to the caller.
            std::process::exit(1);
        }

        let added = &report.missing_columns;
        println!(
            "Migrating schema to {}: adding {} columns",
            report.target,
            added.len()
        );
        for column in added {
            println!(
                " + adding column: {} ({:?})",
                column.name(),
                column.data_type()
            );
        }
        println!("Migration complete. Schema is now {}.", report.target);
    } else {
        println!(
            "Schema is up-to-date (target={}). No migration needed.",
            report.target
        );
    }
    Ok(())
}
/// Finds namespaces whose quality is below `threshold` percent and reports
/// them; the purge routine is called in dry-run mode unless `confirm` is set.
///
/// * `json` - print JSON on stdout instead of human-readable text.
/// * `db_path` - database location passed to `StorageManager::new_lance_only`.
///
/// # Errors
///
/// Fails if the store cannot be opened, namespaces cannot be listed, the purge
/// routine errors, or the JSON output cannot be serialized.
pub async fn run_purge_quality(
    threshold: u8,
    confirm: bool,
    json: bool,
    db_path: String,
) -> Result<()> {
    // Without explicit confirmation everything runs as a dry run.
    let dry_run = !confirm;

    let storage = StorageManager::new_lance_only(&db_path).await?;
    let namespace_list = storage.list_namespaces().await?;

    if namespace_list.is_empty() {
        match json {
            true => println!(r#"{{"purged": [], "dry_run": {}}}"#, dry_run),
            false => eprintln!("No namespaces found in database"),
        }
        return Ok(());
    }

    if !json {
        eprintln!(
            "Analyzing {} namespace(s) with {}% quality threshold...\n",
            namespace_list.len(),
            threshold
        );
    }

    let result = diagnostics::purge_quality_namespaces(&storage, None, threshold, dry_run).await?;
    let candidates = &result.candidates;

    if candidates.is_empty() {
        if json {
            println!(r#"{{"purged": [], "message": "All namespaces pass quality threshold"}}"#);
        } else {
            println!(
                "All namespaces pass the {}% quality threshold. Nothing to purge.",
                threshold
            );
        }
        return Ok(());
    }

    if json {
        // JSON mode prints the candidate list only, for dry and real runs alike.
        let mut to_purge = Vec::with_capacity(candidates.len());
        for candidate in candidates {
            to_purge.push(serde_json::json!({
                "namespace": candidate.namespace,
                "quality_score": candidate.quality_score,
                "document_count": candidate.document_count
            }));
        }
        let output = serde_json::json!({
            "dry_run": dry_run,
            "threshold": threshold,
            "to_purge": to_purge
        });
        println!("{}", serde_json::to_string_pretty(&output)?);
        return Ok(());
    }

    println!(
        "Found {} namespace(s) below {}% quality threshold:",
        candidates.len(),
        threshold
    );
    for candidate in candidates {
        println!(
            " - {} ({:.1}% quality, {} docs)",
            candidate.namespace,
            candidate.quality_score * 100.0,
            candidate.document_count
        );
    }
    println!();

    if dry_run {
        println!("DRY RUN - No changes made.");
        println!("Run with --confirm to actually delete these namespaces.");
        return Ok(());
    }

    println!();
    println!(
        "Purged {} namespace(s) with quality below {}%",
        result.purged_namespaces, threshold
    );
    Ok(())
}