use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use rmcp_memex::{EmbeddingClient, EmbeddingConfig, StorageManager, path_utils};
/// Splits a comma-separated list into its trimmed, non-empty entries.
///
/// For example `" a, b ,,c "` yields `["a", "b", "c"]`; an empty or all-separator input
/// yields an empty vector.
// NOTE(review): no caller is visible in this module; `dead_code` is allowed so the build
// stays warning-free — confirm whether it is still needed.
#[allow(dead_code)]
fn parse_features(raw: &str) -> Vec<String> {
    let mut features = Vec::new();
    for piece in raw.split(',') {
        let trimmed = piece.trim();
        if trimmed.is_empty() {
            continue;
        }
        features.push(trimmed.to_owned());
    }
    features
}
/// Fallback config file locations, probed in this order by `discover_config`.
///
/// Entries are written with a leading `~`; `discover_config` expands it before checking
/// whether each file exists.
#[allow(dead_code)]
const CONFIG_SEARCH_PATHS: &[&str] = &[
    "~/.rmcp-servers/rmcp-memex/config.toml",
    "~/.config/rmcp-memex/config.toml",
    "~/.rmcp_servers/rmcp_memex/config.toml",
];
/// Locates a config file when the caller did not name one explicitly.
///
/// Precedence: the `RMCP_MEMEX_CONFIG` environment variable (used only if it points at an
/// existing file after `~` expansion), then each entry of `CONFIG_SEARCH_PATHS` in order.
/// The returned string is the path as written — still `~`-prefixed if it was — not the
/// expanded form. Returns `None` when no candidate exists.
#[allow(dead_code)]
fn discover_config() -> Option<String> {
    let exists =
        |candidate: &str| std::path::Path::new(&*shellexpand::tilde(candidate)).exists();

    let from_env = std::env::var("RMCP_MEMEX_CONFIG")
        .ok()
        .filter(|candidate| exists(candidate.as_str()));
    if from_env.is_some() {
        return from_env;
    }

    CONFIG_SEARCH_PATHS
        .iter()
        .copied()
        .find(|candidate| exists(candidate))
        .map(str::to_string)
}
#[allow(dead_code)]
fn load_file_config(path: &str) -> Result<FileConfig> {
let (_canonical, contents) = path_utils::safe_read_to_string(path)
.map_err(|e| anyhow::anyhow!("Cannot load config '{}': {}", path, e))?;
toml::from_str(&contents).map_err(Into::into)
}
/// Loads the config from `explicit_path` if given, otherwise from the first discovered file.
///
/// Returns the parsed config together with the path it was loaded from. When no path is
/// given and `discover_config` finds nothing, returns `FileConfig::default()` and `None`.
///
/// # Errors
/// Propagates read/parse errors from `load_file_config` for the explicit or discovered path.
#[allow(dead_code)]
fn load_or_discover_config(explicit_path: Option<&str>) -> Result<(FileConfig, Option<String>)> {
    let source = match explicit_path {
        Some(given) => Some(given.to_string()),
        None => discover_config(),
    };
    match source {
        Some(path) => {
            let config = load_file_config(&path)?;
            Ok((config, Some(path)))
        }
        None => Ok((FileConfig::default(), None)),
    }
}
use crate::cli::config::*;
/// One line of the JSONL format written by `run_export` and read by `run_import`.
///
/// Fields serialize in declaration order. `content_hash` is always emitted (as `null` when
/// absent); `embeddings` is omitted entirely when `None`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportRecord {
    /// Document id (exported from the stored document's `id`).
    pub id: String,
    /// Document text (exported from the stored document's `document` field).
    pub text: String,
    /// Arbitrary JSON metadata attached to the document.
    pub metadata: serde_json::Value,
    /// Content hash recorded by the store, if any; `run_import` checks it for `skip_existing`.
    pub content_hash: Option<String>,
    /// Embedding vector, present only when exported with `include_embeddings`.
    /// On import, records without it are re-embedded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embeddings: Option<Vec<f32>>,
}
/// Exports the documents of `namespace` as JSON Lines, one [`ExportRecord`] per line.
///
/// Writes to `output` when given, otherwise to stdout; progress and summary messages go to
/// stderr. Every record, including the last, is newline-terminated, so the file and stdout
/// outputs are byte-identical. Embedding vectors are included only when
/// `include_embeddings` is set.
///
/// At most `EXPORT_LIMIT` documents are fetched. If that cap is reached, a warning is
/// printed, because the export may be incomplete.
///
/// # Errors
/// Fails if the store cannot be opened or queried, a record cannot be serialized, or the
/// output file cannot be written.
pub async fn run_export(
    namespace: String,
    output: Option<PathBuf>,
    include_embeddings: bool,
    db_path: String,
) -> Result<()> {
    // Upper bound passed to `all_documents`; larger namespaces are truncated to this many.
    const EXPORT_LIMIT: usize = 100_000;

    let storage = StorageManager::new_lance_only(&db_path).await?;
    let docs = storage
        .all_documents(Some(&namespace), EXPORT_LIMIT)
        .await?;
    if docs.is_empty() {
        eprintln!("No documents found in namespace '{}'", namespace);
        return Ok(());
    }
    if docs.len() >= EXPORT_LIMIT {
        eprintln!(
            "Warning: export capped at {} documents; namespace '{}' may contain more",
            EXPORT_LIMIT, namespace
        );
    }
    eprintln!(
        "Exporting {} documents from namespace '{}'...",
        docs.len(),
        namespace
    );

    let mut jsonl_content = String::new();
    for doc in &docs {
        let record = ExportRecord {
            id: doc.id.clone(),
            text: doc.document.clone(),
            metadata: doc.metadata.clone(),
            content_hash: doc.content_hash.clone(),
            embeddings: include_embeddings.then(|| doc.embedding.clone()),
        };
        jsonl_content.push_str(&serde_json::to_string(&record)?);
        jsonl_content.push('\n');
    }

    match output {
        Some(path) => {
            tokio::fs::write(&path, &jsonl_content).await?;
            eprintln!(
                "Exported {} documents from '{}' to {:?}",
                docs.len(),
                namespace,
                path
            );
            if include_embeddings {
                eprintln!(" (embeddings included - file may be large)");
            }
        }
        None => {
            // Content is already newline-terminated; `print!` avoids an extra blank line.
            print!("{}", jsonl_content);
        }
    }
    Ok(())
}
/// Imports a JSONL file (the format written by `run_export`) into `namespace`.
///
/// Each non-blank line is parsed as an [`ExportRecord`]. Lines that fail to parse are
/// reported on stderr with their 1-based line number in the file and counted as errors; they
/// do not abort the import. With `skip_existing`, records whose `content_hash` is already
/// present in `namespace` are skipped. Records that carry `embeddings` are stored as-is; the
/// rest are re-embedded in one `embed_batch` call. All records are stored under
/// `namespace`. A record without `content_hash` is stored with an empty hash.
///
/// # Errors
/// Fails if the input cannot be read, the store or embedding client cannot be created, a
/// content-hash lookup or store write fails, embedding fails, or the embedding client
/// returns a different number of vectors than the texts it was given.
pub async fn run_import(
    namespace: String,
    input: PathBuf,
    skip_existing: bool,
    db_path: String,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let (_validated_input, content) = path_utils::safe_read_to_string_async(&input).await?;
    // Number lines before dropping blanks so parse errors point at the real file line.
    let lines: Vec<(usize, &str)> = content
        .lines()
        .enumerate()
        .filter(|(_, line)| !line.trim().is_empty())
        .map(|(idx, line)| (idx + 1, line))
        .collect();
    if lines.is_empty() {
        eprintln!("No records found in input file");
        return Ok(());
    }
    eprintln!(
        "Importing {} records into namespace '{}'...",
        lines.len(),
        namespace
    );
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(embedding_config).await?));
    let mut imported_count = 0usize;
    let mut skipped_count = 0usize;
    let mut error_count = 0usize;
    let mut records_to_embed: Vec<ExportRecord> = Vec::new();
    let mut records_with_embeddings: Vec<(ExportRecord, Vec<f32>)> = Vec::new();

    for &(line_no, line) in &lines {
        let mut record: ExportRecord = match serde_json::from_str(line) {
            Ok(r) => r,
            Err(e) => {
                eprintln!(" Line {}: parse error - {}", line_no, e);
                error_count += 1;
                continue;
            }
        };
        if skip_existing
            && let Some(ref hash) = record.content_hash
            && storage.has_content_hash(&namespace, hash).await?
        {
            skipped_count += 1;
            continue;
        }
        // Move the vector out rather than cloning it; the record's copy is never read again.
        match record.embeddings.take() {
            Some(embedding) => records_with_embeddings.push((record, embedding)),
            None => records_to_embed.push(record),
        }
    }

    // Both import paths build stored documents the same way.
    let to_document = |record: ExportRecord, embedding: Vec<f32>| {
        rmcp_memex::ChromaDocument::new_flat_with_hash(
            record.id,
            namespace.clone(),
            embedding,
            record.metadata,
            record.text,
            record.content_hash.unwrap_or_default(),
        )
    };

    if !records_with_embeddings.is_empty() {
        eprintln!(
            " Storing {} records with existing embeddings...",
            records_with_embeddings.len()
        );
        let docs: Vec<_> = records_with_embeddings
            .into_iter()
            .map(|(record, embedding)| to_document(record, embedding))
            .collect();
        let stored = docs.len();
        storage.add_to_store(docs).await?;
        imported_count += stored;
    }

    if !records_to_embed.is_empty() {
        eprintln!(
            " Re-embedding {} records without embeddings...",
            records_to_embed.len()
        );
        let texts: Vec<String> = records_to_embed.iter().map(|r| r.text.clone()).collect();
        let embeddings = embedding_client.lock().await.embed_batch(&texts).await?;
        // `zip` below would silently drop records if fewer vectors came back.
        if embeddings.len() != records_to_embed.len() {
            return Err(anyhow!(
                "embedding client returned {} vectors for {} records",
                embeddings.len(),
                records_to_embed.len()
            ));
        }
        let docs: Vec<_> = records_to_embed
            .into_iter()
            .zip(embeddings)
            .map(|(record, embedding)| to_document(record, embedding))
            .collect();
        let stored = docs.len();
        storage.add_to_store(docs).await?;
        imported_count += stored;
    }

    eprintln!();
    eprintln!("Import complete:");
    eprintln!(" Imported: {} documents", imported_count);
    if skipped_count > 0 {
        eprintln!(" Skipped: {} (already exist)", skipped_count);
    }
    if error_count > 0 {
        eprintln!(" Errors: {}", error_count);
    }
    Ok(())
}
/// Per-namespace result produced by `run_audit` and serialized into its JSON output.
///
/// The score fields are copied from the `TextIntegrityMetrics` computed over the
/// namespace's chunks. For a namespace with no documents they are all zero and
/// `recommendation` is `"EMPTY"`.
#[derive(Debug, Serialize)]
pub struct NamespaceAuditResult {
    /// Namespace name.
    pub namespace: String,
    /// Number of documents (chunks) found in the namespace.
    pub document_count: usize,
    /// Average chunk length as reported by `TextIntegrityMetrics` (unit not shown here).
    pub avg_chunk_length: usize,
    /// Sentence-integrity component of the metrics.
    pub sentence_integrity: f32,
    /// Word-integrity component of the metrics.
    pub word_integrity: f32,
    /// Chunk-quality component of the metrics.
    pub chunk_quality: f32,
    /// Overall score; `run_audit` compares it with `threshold / 100` and prints it ×100 as a
    /// percentage (presumably in 0.0..=1.0 — confirm against `TextIntegrityMetrics`).
    pub overall_score: f32,
    /// "EXCELLENT", "GOOD", "WARN", "PURGE", or "EMPTY" for a namespace with no documents.
    pub recommendation: String,
    /// Whether `overall_score >= threshold / 100`; always false for an empty namespace.
    pub passes_threshold: bool,
}
/// Scores the text quality of one namespace (or of every namespace) and reports the results.
///
/// When `namespace` is `None`, every namespace returned by `list_namespaces` is audited.
/// For each namespace, all documents are fetched and scored with
/// `TextIntegrityMetrics::compute` over the space-joined text and the individual chunks. A
/// namespace passes when its overall score is at least `threshold / 100`. Empty namespaces
/// are reported as `"EMPTY"` and never pass.
///
/// With `json`, a pretty-printed JSON document (`namespaces` + `summary`) is printed on
/// stdout. Otherwise a table and summary go to stdout, while progress lines and, with
/// `verbose`, per-namespace metrics go to stderr.
///
/// # Errors
/// Fails if the store cannot be opened or queried, or the JSON output cannot be serialized.
pub async fn run_audit(
    namespace: Option<String>,
    threshold: u8,
    verbose: bool,
    json: bool,
    db_path: String,
) -> Result<()> {
    use rmcp_memex::{IntegrityRecommendation, TextIntegrityMetrics};

    // Names longer than this many chars are shortened for the 30-column table cell.
    const MAX_NAME_CHARS: usize = 28;
    // Chars kept (before the "...") when a name is shortened.
    const KEPT_NAME_CHARS: usize = 25;

    let storage = StorageManager::new_lance_only(&db_path).await?;
    let namespaces: Vec<String> = match namespace {
        Some(ns) => vec![ns],
        None => storage
            .list_namespaces()
            .await?
            .into_iter()
            .map(|(name, _count)| name)
            .collect(),
    };
    if namespaces.is_empty() {
        if json {
            // Same summary keys as the non-empty report, so consumers can rely on them.
            let output = serde_json::json!({
                "namespaces": [],
                "summary": {
                    "total": 0,
                    "passing": 0,
                    "failing": 0,
                    "threshold": threshold
                }
            });
            println!("{}", output);
        } else {
            eprintln!("No namespaces found in database");
        }
        return Ok(());
    }

    let threshold_f32 = f32::from(threshold) / 100.0;
    let mut results: Vec<NamespaceAuditResult> = Vec::new();
    if !json {
        eprintln!(
            "Auditing {} namespace(s) with {}% quality threshold...\n",
            namespaces.len(),
            threshold
        );
    }
    for ns in &namespaces {
        let docs = storage.get_all_in_namespace(ns).await?;
        if docs.is_empty() {
            results.push(NamespaceAuditResult {
                namespace: ns.clone(),
                document_count: 0,
                avg_chunk_length: 0,
                sentence_integrity: 0.0,
                word_integrity: 0.0,
                chunk_quality: 0.0,
                overall_score: 0.0,
                recommendation: "EMPTY".to_string(),
                passes_threshold: false,
            });
            continue;
        }
        let chunks: Vec<String> = docs.iter().map(|d| d.document.clone()).collect();
        let combined_text = chunks.join(" ");
        let metrics = TextIntegrityMetrics::compute(&combined_text, &chunks);
        let passes = metrics.overall >= threshold_f32;
        let recommendation = match metrics.recommendation() {
            IntegrityRecommendation::Excellent => "EXCELLENT",
            IntegrityRecommendation::Good => "GOOD",
            IntegrityRecommendation::Warn => "WARN",
            IntegrityRecommendation::Purge => "PURGE",
        };
        results.push(NamespaceAuditResult {
            namespace: ns.clone(),
            document_count: docs.len(),
            avg_chunk_length: metrics.avg_chunk_length,
            sentence_integrity: metrics.sentence_integrity,
            word_integrity: metrics.word_integrity,
            chunk_quality: metrics.chunk_quality,
            overall_score: metrics.overall,
            recommendation: recommendation.to_string(),
            passes_threshold: passes,
        });
        if verbose && !json {
            eprintln!("Namespace: {}", ns);
            eprintln!(" Documents: {}", docs.len());
            eprintln!(" {}", metrics);
            eprintln!();
        }
    }

    let passing = results.iter().filter(|r| r.passes_threshold).count();
    let failing = results.len() - passing;

    if json {
        let output = serde_json::json!({
            "namespaces": results,
            "summary": {
                "total": results.len(),
                "passing": passing,
                "failing": failing,
                "threshold": threshold
            }
        });
        println!("{}", serde_json::to_string_pretty(&output)?);
        return Ok(());
    }

    println!("╔════════════════════════════════════════════════════════════════╗");
    println!("║ NAMESPACE QUALITY AUDIT ║");
    println!("╠════════════════════════════════════════════════════════════════╣");
    println!(
        "║ {:30} │ {:>6} │ {:>6} │ {:>8} ║",
        "Namespace", "Docs", "Score", "Status"
    );
    println!("╠════════════════════════════════════════════════════════════════╣");
    for result in &results {
        let status_icon = if result.passes_threshold {
            "✅"
        } else {
            "❌"
        };
        // Truncate by chars, not bytes: byte-slicing a `String` panics when the cut falls
        // inside a multi-byte UTF-8 character.
        let ns_display = if result.namespace.chars().count() > MAX_NAME_CHARS {
            let head: String = result.namespace.chars().take(KEPT_NAME_CHARS).collect();
            format!("{}...", head)
        } else {
            result.namespace.clone()
        };
        println!(
            "║ {:30} │ {:>6} │ {:>5.1}% │ {} {:>6} ║",
            ns_display,
            result.document_count,
            result.overall_score * 100.0,
            status_icon,
            result.recommendation
        );
    }
    println!("╚════════════════════════════════════════════════════════════════╝");
    println!();
    println!(
        "Summary: {} passing, {} failing (threshold: {}%)",
        passing, failing, threshold
    );
    if failing > 0 {
        println!();
        println!("Namespaces below threshold:");
        for result in results.iter().filter(|r| !r.passes_threshold) {
            println!(
                " - {} ({:.1}% quality, {} docs)",
                result.namespace,
                result.overall_score * 100.0,
                result.document_count
            );
        }
        println!();
        println!(
            "Run 'rmcp-memex purge-quality --threshold {}' to remove low-quality namespaces",
            threshold
        );
    }
    Ok(())
}
pub async fn run_purge_quality(
threshold: u8,
confirm: bool,
json: bool,
db_path: String,
) -> Result<()> {
use rmcp_memex::TextIntegrityMetrics;
let storage = StorageManager::new_lance_only(&db_path).await?;
let namespace_list = storage.list_namespaces().await?;
if namespace_list.is_empty() {
if json {
println!(r#"{{"purged": [], "dry_run": {}}}"#, !confirm);
} else {
eprintln!("No namespaces found in database");
}
return Ok(());
}
let threshold_f32 = threshold as f32 / 100.0;
let mut to_purge: Vec<(String, f32, usize)> = Vec::new();
if !json {
eprintln!(
"Analyzing {} namespace(s) with {}% quality threshold...\n",
namespace_list.len(),
threshold
);
}
for (ns, _count) in &namespace_list {
let docs = storage.get_all_in_namespace(ns).await?;
if docs.is_empty() {
to_purge.push((ns.clone(), 0.0, 0));
continue;
}
let chunks: Vec<String> = docs.iter().map(|d| d.document.clone()).collect();
let combined_text = chunks.join(" ");
let metrics = TextIntegrityMetrics::compute(&combined_text, &chunks);
if metrics.overall < threshold_f32 {
to_purge.push((ns.clone(), metrics.overall, docs.len()));
}
}
if to_purge.is_empty() {
if json {
println!(r#"{{"purged": [], "message": "All namespaces pass quality threshold"}}"#);
} else {
println!(
"All namespaces pass the {}% quality threshold. Nothing to purge.",
threshold
);
}
return Ok(());
}
if json {
let output = serde_json::json!({
"dry_run": !confirm,
"threshold": threshold,
"to_purge": to_purge.iter().map(|(ns, score, count)| {
serde_json::json!({
"namespace": ns,
"quality_score": score,
"document_count": count
})
}).collect::<Vec<_>>()
});
println!("{}", serde_json::to_string_pretty(&output)?);
if !confirm {
return Ok(());
}
} else {
println!(
"Found {} namespace(s) below {}% quality threshold:",
to_purge.len(),
threshold
);
for (ns, score, count) in &to_purge {
println!(" - {} ({:.1}% quality, {} docs)", ns, score * 100.0, count);
}
println!();
if !confirm {
println!("DRY RUN - No changes made.");
println!("Run with --confirm to actually delete these namespaces.");
return Ok(());
}
}
let mut purged_count = 0;
for (ns, _score, count) in &to_purge {
if !json {
eprint!("Purging '{}' ({} docs)... ", ns, count);
}
match storage.delete_namespace_documents(ns).await {
Ok(_) => {
purged_count += 1;
if !json {
eprintln!("done");
}
}
Err(e) => {
if !json {
eprintln!("ERROR: {}", e);
}
}
}
}
if !json {
println!();
println!(
"Purged {} namespace(s) with quality below {}%",
purged_count, threshold
);
}
Ok(())
}