use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::IsTerminal;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{Mutex, Semaphore};
use rmcp_memex::{
EmbeddingClient, EmbeddingConfig, IndexProgressTracker, PreprocessingConfig, RAGPipeline,
SliceMode, StorageManager, path_utils,
};
/// Split a comma-separated list into its trimmed, non-empty entries.
///
/// `" a, ,b "` yields `["a", "b"]`; an empty or all-separator input yields `[]`.
// Currently unreferenced in this module; silenced rather than removed.
#[allow(dead_code)]
fn parse_features(raw: &str) -> Vec<String> {
    let mut features = Vec::new();
    for piece in raw.split(',') {
        let name = piece.trim();
        if name.is_empty() {
            continue;
        }
        features.push(name.to_owned());
    }
    features
}
/// Fallback config-file locations, probed in this order by `discover_config`
/// after the `RMCP_MEMEX_CONFIG` environment variable.
///
/// Entries may begin with `~`; callers expand it before touching the filesystem.
// Only referenced by other currently-unused config helpers.
#[allow(dead_code)]
const CONFIG_SEARCH_PATHS: &[&str] = &[
    "~/.rmcp-servers/rmcp-memex/config.toml",
    "~/.config/rmcp-memex/config.toml",
    "~/.rmcp_servers/rmcp_memex/config.toml",
];
#[allow(dead_code)]
fn discover_config() -> Option<String> {
if let Ok(path) = std::env::var("RMCP_MEMEX_CONFIG") {
let expanded = shellexpand::tilde(&path).to_string();
if std::path::Path::new(&expanded).exists() {
return Some(path);
}
}
for path in CONFIG_SEARCH_PATHS {
let expanded = shellexpand::tilde(path).to_string();
if std::path::Path::new(&expanded).exists() {
return Some(path.to_string());
}
}
None
}
/// Read the file at `path` and parse it as a TOML [`FileConfig`].
///
/// The file is read through `path_utils::safe_read_to_string`. A read failure is
/// wrapped as `Cannot load config '<path>': <cause>`. A TOML parse error is
/// converted into `anyhow::Error` unchanged.
// Currently unreferenced; kept alongside the other config helpers.
#[allow(dead_code)]
fn load_file_config(path: &str) -> Result<FileConfig> {
    let contents = match path_utils::safe_read_to_string(path) {
        Ok((_canonical, text)) => text,
        Err(err) => return Err(anyhow::anyhow!("Cannot load config '{}': {}", path, err)),
    };
    let parsed: FileConfig = toml::from_str(&contents)?;
    Ok(parsed)
}
/// Resolve the effective file configuration.
///
/// Uses `explicit_path` when given, otherwise the first location found by
/// `discover_config`, otherwise `FileConfig::default()`. The second tuple element
/// is the path the config came from (`None` for defaults). If an explicit path
/// fails to load, that is an error; discovery is not tried as a fallback.
// Currently unreferenced; kept alongside the other config helpers.
#[allow(dead_code)]
fn load_or_discover_config(explicit_path: Option<&str>) -> Result<(FileConfig, Option<String>)> {
    let source = match explicit_path {
        Some(given) => Some(given.to_string()),
        None => discover_config(),
    };
    match source {
        Some(location) => {
            let config = load_file_config(&location)?;
            Ok((config, Some(location)))
        }
        None => Ok((FileConfig::default(), None)),
    }
}
use crate::cli::config::*;
use crate::cli::definition::*;
/// Resume state for non-pipeline batch indexing, persisted as JSON next to the DB.
///
/// Field order here is also the key order of the serialized JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct IndexCheckpoint {
    /// Namespace this checkpoint belongs to; also embedded in the checkpoint file name.
    pub namespace: String,
    /// Files already processed, stored as lossy UTF-8 renderings of their paths.
    pub indexed_files: HashSet<String>,
    /// RFC 3339 timestamp (UTC) of creation or of the last save.
    pub updated_at: String,
}
impl IndexCheckpoint {
    /// Create an empty checkpoint for `namespace`, stamped with the current UTC time.
    pub fn new(namespace: &str) -> Self {
        Self {
            namespace: namespace.to_string(),
            indexed_files: HashSet::new(),
            updated_at: chrono::Utc::now().to_rfc3339(),
        }
    }

    /// Path of the checkpoint file for `namespace`.
    ///
    /// The file is `.index-checkpoint-<namespace>.json`, placed in the parent
    /// directory of `db_path` after `~` expansion (or `.` when there is no parent).
    pub fn checkpoint_path(db_path: &str, namespace: &str) -> PathBuf {
        let expanded = shellexpand::tilde(db_path).to_string();
        Path::new(&expanded)
            .parent()
            .unwrap_or(Path::new("."))
            .join(format!(".index-checkpoint-{}.json", namespace))
    }

    /// Load a previously saved checkpoint.
    ///
    /// Returns `None` when the file is missing, unreadable, or not valid JSON for
    /// this type.
    pub fn load(db_path: &str, namespace: &str) -> Option<Self> {
        let path = Self::checkpoint_path(db_path, namespace);
        let raw = std::fs::read_to_string(path).ok()?;
        serde_json::from_str(&raw).ok()
    }

    /// Refresh `updated_at` and persist the checkpoint as pretty-printed JSON.
    ///
    /// The JSON is first written to a sibling `<name>.tmp` file and then renamed over
    /// the real checkpoint. An interrupted write therefore cannot leave a truncated
    /// checkpoint behind. `load` would otherwise treat such a file as "no checkpoint"
    /// and throw away all resume progress.
    ///
    /// # Errors
    /// Propagates JSON serialization and filesystem errors. The temp file is removed
    /// (best effort) if the final rename fails.
    pub fn save(&mut self, db_path: &str) -> Result<()> {
        self.updated_at = chrono::Utc::now().to_rfc3339();
        let path = Self::checkpoint_path(db_path, &self.namespace);
        let json = serde_json::to_string_pretty(self)?;

        let mut tmp_name = path.clone().into_os_string();
        tmp_name.push(".tmp");
        let tmp_path = PathBuf::from(tmp_name);

        std::fs::write(&tmp_path, json)?;
        if let Err(e) = std::fs::rename(&tmp_path, &path) {
            let _ = std::fs::remove_file(&tmp_path);
            return Err(e.into());
        }
        Ok(())
    }

    /// Remove the checkpoint file for `namespace`. A missing file is not an error
    /// (best effort; all removal errors are ignored).
    pub fn delete(db_path: &str, namespace: &str) {
        let path = Self::checkpoint_path(db_path, namespace);
        let _ = std::fs::remove_file(path);
    }

    /// Record `file_path` as processed (in memory only; call [`Self::save`] to persist).
    pub fn mark_indexed(&mut self, file_path: &Path) {
        self.indexed_files
            .insert(file_path.to_string_lossy().into_owned());
    }

    /// Whether `file_path` was recorded by [`Self::mark_indexed`].
    ///
    /// Looks up by `&str` so no `String` is allocated per query.
    pub fn is_indexed(&self, file_path: &Path) -> bool {
        self.indexed_files
            .contains(&*file_path.to_string_lossy())
    }
}
/// All options for one `run_batch_index` invocation.
pub struct BatchIndexConfig {
    /// File or directory to index; `~` is expanded and the result canonicalized.
    pub path: PathBuf,
    /// Namespace passed to the indexing calls. Checkpoints and pipeline mode use "rag" when `None`.
    pub namespace: Option<String>,
    /// Passed to `collect_files`: descend into subdirectories.
    pub recursive: bool,
    /// Passed to `collect_files`: optional glob filter for file selection.
    pub glob_pattern: Option<String>,
    /// Passed to `collect_files`: maximum directory depth.
    pub max_depth: usize,
    /// Database location (`~` expanded). Its parent dir is created, and checkpoints live there too.
    pub db_path: String,
    /// Use the preprocessing indexing variants (forces flat slicing). Ignored in pipeline mode.
    pub preprocess: bool,
    /// Sets `PreprocessingConfig::remove_metadata`; only relevant when `preprocess` is set.
    pub sanitize_metadata: bool,
    /// Chunking strategy (onion, onion-fast, or flat).
    pub slice_mode: SliceMode,
    /// Use the dedup-aware indexing variants, which may report files as skipped.
    pub dedup: bool,
    /// Embedding setup. The first provider's model name is shown during calibration.
    pub embedding_config: EmbeddingConfig,
    /// Request a progress bar; honored only when stderr is a terminal and not in pipeline mode.
    pub show_progress: bool,
    /// Skip files recorded in the on-disk checkpoint, and record new ones. Ignored in pipeline mode.
    pub resume: bool,
    /// Use the staged concurrent pipeline (`rmcp_memex::run_pipeline`) instead of per-file tasks.
    pub pipeline: bool,
    /// Maximum number of files indexed concurrently in non-pipeline mode.
    pub parallel: u8,
}
/// Outcome of indexing a single file in `run_batch_index` (non-pipeline mode).
///
/// `Copy`/`Eq` are derived so outcomes can be compared and tallied directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileIndexResult {
    /// The indexing call succeeded and produced chunks.
    Indexed,
    /// The indexing call returned `IndexResult::Skipped` (reported as "already indexed" with dedup).
    Skipped,
    /// The file was already recorded in the resume checkpoint and was not re-indexed.
    SkippedResume,
    /// The indexing call returned an error.
    Failed,
}
pub async fn run_batch_index(config: BatchIndexConfig) -> Result<()> {
let BatchIndexConfig {
path,
namespace,
recursive,
glob_pattern,
max_depth,
db_path,
preprocess,
sanitize_metadata,
slice_mode,
dedup,
embedding_config,
show_progress,
resume,
pipeline,
parallel,
} = config;
let expanded = shellexpand::tilde(path.to_str().unwrap_or("")).to_string();
let canonical = Path::new(&expanded).canonicalize()?;
let files = collect_files(&canonical, recursive, glob_pattern.as_deref(), max_depth)?;
let total = files.len();
if total == 0 {
eprintln!("No files found matching criteria");
return Ok(());
}
let mode_name = match slice_mode {
SliceMode::Onion => "onion (hierarchical, 4 layers)",
SliceMode::OnionFast => "onion-fast (outer+core, 2 layers)",
SliceMode::Flat => "flat (traditional chunks)",
};
let use_progress_bar = show_progress && std::io::stderr().is_terminal();
if show_progress && !use_progress_bar {
eprintln!("Warning: --progress requires an interactive terminal (using line logs)");
}
let tracker = if use_progress_bar {
let t = IndexProgressTracker::pre_scan(&files);
t.display_pre_scan();
Some(t)
} else {
eprintln!("Found {} files to index (slice mode: {})", total, mode_name);
if preprocess {
eprintln!("Preprocessing enabled: filtering tool artifacts, CLI output, and metadata");
}
if dedup {
eprintln!("Deduplication enabled: skipping files with identical content");
}
None
};
let expanded_db = shellexpand::tilde(&db_path).to_string();
let db_dir = Path::new(&expanded_db);
if let Some(parent) = db_dir.parent() {
std::fs::create_dir_all(parent)?;
}
let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(&embedding_config).await?));
let storage = Arc::new(StorageManager::new(&expanded_db).await?);
let ns_name = namespace.as_deref().unwrap_or("rag");
if pipeline {
if preprocess {
eprintln!("Warning: --preprocess is not supported in pipeline mode (ignoring)");
}
if resume {
eprintln!("Warning: --resume is not supported in pipeline mode (ignoring)");
}
if show_progress {
eprintln!("Warning: --progress is not supported in pipeline mode (ignoring)");
}
eprintln!(
"Pipeline mode: {} files, slice mode: {:?}",
total, slice_mode
);
eprintln!("Running concurrent stages: reader -> chunker -> embedder -> storage");
let pipeline_config = rmcp_memex::PipelineConfig {
slice_mode,
dedup_enabled: dedup,
..Default::default()
};
let result = rmcp_memex::run_pipeline(
files,
ns_name.to_string(),
storage,
embedding_client,
pipeline_config,
)
.await?;
eprintln!();
eprintln!("Pipeline complete:");
eprintln!(" Files read: {}", result.stats.files_read);
if result.stats.files_skipped > 0 {
eprintln!(" Files skipped: {}", result.stats.files_skipped);
}
eprintln!(" Chunks created: {}", result.stats.chunks_created);
eprintln!(" Chunks embedded: {}", result.stats.chunks_embedded);
eprintln!(" Chunks stored: {}", result.stats.chunks_stored);
if result.stats.errors > 0 {
eprintln!(" Errors: {}", result.stats.errors);
}
eprintln!(" Namespace: {}", ns_name);
eprintln!(" DB path: {}", expanded_db);
return Ok(());
}
let rag = Arc::new(RAGPipeline::new(embedding_client, storage).await?);
let effective_mode = if preprocess {
SliceMode::Flat
} else {
slice_mode
};
let checkpoint = if resume {
if let Some(cp) = IndexCheckpoint::load(&db_path, ns_name) {
let resumed_count = cp.indexed_files.len();
eprintln!(
"Resuming from checkpoint: {} files already indexed",
resumed_count
);
Arc::new(Mutex::new(cp))
} else {
Arc::new(Mutex::new(IndexCheckpoint::new(ns_name)))
}
} else {
IndexCheckpoint::delete(&db_path, ns_name);
Arc::new(Mutex::new(IndexCheckpoint::new(ns_name)))
};
let indexed_count = Arc::new(AtomicUsize::new(0));
let skipped_count = Arc::new(AtomicUsize::new(0));
let skipped_resume_count = Arc::new(AtomicUsize::new(0));
let failed_count = Arc::new(AtomicUsize::new(0));
let total_chunks_count = Arc::new(AtomicUsize::new(0));
let processed_count = Arc::new(AtomicUsize::new(0));
let semaphore = Arc::new(Semaphore::new(parallel as usize));
let embedder_model = embedding_config
.providers
.first()
.map(|p| p.model.clone())
.unwrap_or_else(|| "unknown".to_string());
let calibration_done = Arc::new(AtomicBool::new(false));
let tracker = tracker.map(|t| Arc::new(Mutex::new(t)));
if let Some(ref t) = tracker {
t.lock().await.start_calibration();
}
let mut handles = Vec::with_capacity(files.len());
for file_path in files.into_iter() {
let semaphore = Arc::clone(&semaphore);
let rag = Arc::clone(&rag);
let checkpoint = Arc::clone(&checkpoint);
let tracker = tracker.clone();
let indexed_count = Arc::clone(&indexed_count);
let skipped_count = Arc::clone(&skipped_count);
let skipped_resume_count = Arc::clone(&skipped_resume_count);
let failed_count = Arc::clone(&failed_count);
let total_chunks_count = Arc::clone(&total_chunks_count);
let processed_count = Arc::clone(&processed_count);
let calibration_done = Arc::clone(&calibration_done);
let db_path = db_path.clone();
let ns = namespace.clone();
let canonical = canonical.clone();
let embedder_model = embedder_model.clone();
let _ns_name = ns_name.to_string();
let handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await.expect("semaphore closed");
let display_path = file_path
.strip_prefix(&canonical)
.unwrap_or(&file_path)
.display()
.to_string();
if resume {
let cp = checkpoint.lock().await;
if cp.is_indexed(&file_path) {
drop(cp);
skipped_resume_count.fetch_add(1, Ordering::SeqCst);
processed_count.fetch_add(1, Ordering::SeqCst);
if let Some(ref t) = tracker {
t.lock().await.file_skipped();
}
return FileIndexResult::SkippedResume;
}
}
let file_bytes = std::fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
let current_processed = processed_count.load(Ordering::SeqCst);
if let Some(ref t) = tracker {
t.lock().await.set_message(&display_path);
} else {
let progress = format!("[{}/{}]", current_processed + 1, total);
eprintln!("{} Indexing {}... ", progress, display_path);
}
let preprocess_config = PreprocessingConfig {
remove_metadata: sanitize_metadata,
..Default::default()
};
let result = if dedup {
if preprocess {
rag.index_document_with_preprocessing_and_dedup(
&file_path,
ns.as_deref(),
preprocess_config,
)
.await
} else {
rag.index_document_with_dedup(&file_path, ns.as_deref(), effective_mode)
.await
}
} else {
if preprocess {
rag.index_document_with_preprocessing(
&file_path,
ns.as_deref(),
preprocess_config,
)
.await
.map(|()| rmcp_memex::IndexResult::Indexed {
chunks_indexed: (file_bytes as usize / 500).max(1),
content_hash: String::new(),
})
} else {
rag.index_document_with_mode(&file_path, ns.as_deref(), effective_mode)
.await
.map(|()| rmcp_memex::IndexResult::Indexed {
chunks_indexed: (file_bytes as usize / 500).max(1),
content_hash: String::new(),
})
}
};
let file_result = match result {
Ok(rmcp_memex::IndexResult::Indexed { chunks_indexed, .. }) => {
if !calibration_done.swap(true, Ordering::SeqCst)
&& let Some(ref t) = tracker
{
let mut guard = t.lock().await;
guard.finish_calibration(chunks_indexed, &embedder_model);
guard.adjust_estimate(file_bytes, chunks_indexed);
guard.start_progress_bar();
}
indexed_count.fetch_add(1, Ordering::SeqCst);
total_chunks_count.fetch_add(chunks_indexed, Ordering::SeqCst);
if let Some(ref t) = tracker {
t.lock().await.file_indexed(chunks_indexed);
} else {
eprintln!(" -> {} done ({} chunks)", display_path, chunks_indexed);
}
if resume {
let mut cp = checkpoint.lock().await;
cp.mark_indexed(&file_path);
let _ = cp.save(&db_path);
}
FileIndexResult::Indexed
}
Ok(rmcp_memex::IndexResult::Skipped { reason, .. }) => {
if !calibration_done.swap(true, Ordering::SeqCst)
&& let Some(ref t) = tracker
{
let mut guard = t.lock().await;
guard.finish_calibration(0, &embedder_model);
guard.start_progress_bar();
}
skipped_count.fetch_add(1, Ordering::SeqCst);
if let Some(ref t) = tracker {
t.lock().await.file_skipped();
} else {
eprintln!(" -> {} SKIPPED ({})", display_path, reason);
}
if resume {
let mut cp = checkpoint.lock().await;
cp.mark_indexed(&file_path);
let _ = cp.save(&db_path);
}
FileIndexResult::Skipped
}
Err(e) => {
if !calibration_done.swap(true, Ordering::SeqCst)
&& let Some(ref t) = tracker
{
let mut guard = t.lock().await;
guard.finish_calibration(0, &embedder_model);
guard.start_progress_bar();
}
failed_count.fetch_add(1, Ordering::SeqCst);
if let Some(ref t) = tracker {
t.lock().await.file_failed();
} else {
eprintln!(" -> {} FAILED: {}", display_path, e);
}
FileIndexResult::Failed
}
};
processed_count.fetch_add(1, Ordering::SeqCst);
file_result
});
handles.push(handle);
}
let mut results = Vec::with_capacity(handles.len());
for handle in handles {
match handle.await {
Ok(result) => results.push(result),
Err(e) => {
failed_count.fetch_add(1, Ordering::SeqCst);
eprintln!("Task panicked: {}", e);
}
}
}
let indexed = indexed_count.load(Ordering::SeqCst);
let skipped = skipped_count.load(Ordering::SeqCst);
let skipped_resume = skipped_resume_count.load(Ordering::SeqCst);
let failed = failed_count.load(Ordering::SeqCst);
let total_chunks = total_chunks_count.load(Ordering::SeqCst);
if let Some(ref t) = tracker {
let mut guard = t.lock().await;
guard.finish();
guard.display_summary();
if skipped_resume > 0 {
eprintln!(" Skipped (resumed): {}", skipped_resume);
}
} else {
eprintln!();
let all_skipped = indexed == 0 && skipped > 0 && failed == 0;
let all_failed = indexed == 0 && skipped == 0 && failed > 0;
if all_skipped {
eprintln!("Indexing complete: All content already indexed");
eprintln!();
eprintln!(" Files checked: {}", total);
eprintln!(" Already indexed: {} (skipped)", skipped);
if skipped_resume > 0 {
eprintln!(" Resumed from: {} (checkpoint)", skipped_resume);
}
eprintln!();
eprintln!(" [OK] No new content to index - your memory is up to date!");
} else if all_failed {
eprintln!("Indexing FAILED: No files were indexed");
eprintln!();
eprintln!(" Files attempted: {}", total);
eprintln!(" Failed: {}", failed);
eprintln!();
eprintln!(" [!] Check file permissions and embedding server connectivity");
} else {
eprintln!("Indexing complete:");
eprintln!();
eprintln!(" New chunks: {}", total_chunks);
eprintln!(" Files indexed: {}", indexed);
if dedup && skipped > 0 {
eprintln!(" Already indexed: {} (skipped)", skipped);
}
if skipped_resume > 0 {
eprintln!(" Resumed from: {} (checkpoint)", skipped_resume);
}
if failed > 0 {
eprintln!(" Failed: {}", failed);
}
eprintln!(" Total processed: {}", total);
}
eprintln!();
eprintln!("Config:");
if let Some(ref ns) = namespace {
eprintln!(" Namespace: {}", ns);
}
eprintln!(" Slice mode: {}", mode_name);
eprintln!(" Parallel workers: {}", parallel);
eprintln!(
" Deduplication: {}",
if dedup { "enabled" } else { "disabled" }
);
eprintln!(" DB path: {}", expanded_db);
}
if resume && failed == 0 {
IndexCheckpoint::delete(&db_path, ns_name);
eprintln!("Checkpoint cleared (all files indexed successfully)");
} else if resume && failed > 0 {
eprintln!(
"Checkpoint preserved ({} files failed - rerun with --resume to retry)",
failed
);
}
Ok(())
}
/// Which document survives when `run_dedup` collapses a group of duplicates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeepStrategy {
    /// Keep the document whose `id` sorts first.
    Oldest,
    /// Keep the document whose `id` sorts last.
    Newest,
    /// Intended to keep the best-scoring document; `run_dedup` applies no scoring for it.
    HighestScore,
}

impl KeepStrategy {
    /// Parse a CLI value.
    ///
    /// `"newest"` and `"highest-score"` are recognized exactly (case-sensitive). Any
    /// other string, including `"oldest"`, falls back to [`KeepStrategy::Oldest`].
    // Infallible inherent parser kept for caller compatibility; `FromStr` would need an error type.
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Self {
        if s == "newest" {
            Self::Newest
        } else if s == "highest-score" {
            Self::HighestScore
        } else {
            Self::Oldest
        }
    }
}
/// Summary of one `run_dedup` run (also emitted as the `result` object in JSON mode).
#[derive(Debug, Clone, Serialize)]
pub struct DedupResult {
    /// Number of documents fetched from storage for the scan.
    pub total_docs: usize,
    /// Number of distinct hash groups: each contributes one kept document.
    /// Documents without a hash are not included.
    pub unique_docs: usize,
    /// Number of hash groups that contained more than one document.
    pub duplicate_groups: usize,
    /// Documents deleted (or, in a dry run, that would be deleted).
    pub duplicates_removed: usize,
    /// Documents with a missing or empty `content_hash`; these are never deduplicated.
    pub docs_without_hash: usize,
    /// Details for each duplicate group.
    pub groups: Vec<DedupGroup>,
}
/// One set of documents sharing a content hash, as resolved by `run_dedup`.
#[derive(Debug, Clone, Serialize)]
pub struct DedupGroup {
    /// The content hash shared by every document in the group.
    pub content_hash: String,
    /// ID of the document that was kept.
    pub kept_id: String,
    /// Namespace of the kept document.
    pub kept_namespace: String,
    /// `(id, namespace)` of every other document in the group: removed, or in a dry run
    /// reported as removable. Public like the sibling fields, so callers can read it.
    pub removed_ids: Vec<(String, String)>,
}
pub async fn run_dedup(
namespace: Option<String>,
dry_run: bool,
keep_strategy: KeepStrategy,
cross_namespace: bool,
json_output: bool,
db_path: String,
) -> Result<()> {
let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
let all_docs = storage
.all_documents(namespace.as_deref(), 1_000_000)
.await?;
if all_docs.is_empty() {
if json_output {
println!(
"{}",
serde_json::to_string_pretty(&serde_json::json!({
"status": "empty",
"message": "No documents found",
"namespace": namespace,
}))?
);
} else {
eprintln!("No documents found in database.");
}
return Ok(());
}
if !json_output {
eprintln!("Scanning {} documents for duplicates...", all_docs.len());
if dry_run {
eprintln!("(dry-run mode: no changes will be made)");
}
}
let mut hash_groups: std::collections::HashMap<String, Vec<_>> =
std::collections::HashMap::new();
let mut docs_without_hash = 0;
for doc in &all_docs {
match &doc.content_hash {
Some(hash) if !hash.is_empty() => {
let key = if cross_namespace {
hash.clone()
} else {
format!("{}:{}", doc.namespace, hash)
};
hash_groups.entry(key).or_default().push(doc);
}
_ => {
docs_without_hash += 1;
}
}
}
let mut result = DedupResult {
total_docs: all_docs.len(),
unique_docs: 0,
duplicate_groups: 0,
duplicates_removed: 0,
docs_without_hash,
groups: Vec::new(),
};
for (_key, mut docs) in hash_groups {
if docs.len() == 1 {
result.unique_docs += 1;
continue;
}
match keep_strategy {
KeepStrategy::Oldest => {
docs.sort_by(|a, b| a.id.cmp(&b.id));
}
KeepStrategy::Newest => {
docs.sort_by(|a, b| b.id.cmp(&a.id));
}
KeepStrategy::HighestScore => {
}
}
let kept = &docs[0];
let to_remove: Vec<_> = docs[1..].to_vec();
let group = DedupGroup {
content_hash: kept.content_hash.clone().unwrap_or_default(),
kept_id: kept.id.clone(),
kept_namespace: kept.namespace.clone(),
removed_ids: to_remove
.iter()
.map(|d| (d.id.clone(), d.namespace.clone()))
.collect(),
};
result.duplicate_groups += 1;
result.duplicates_removed += to_remove.len();
result.unique_docs += 1;
if !dry_run {
for doc in &to_remove {
storage.delete_document(&doc.namespace, &doc.id).await?;
}
}
result.groups.push(group);
}
if json_output {
let output = serde_json::json!({
"dry_run": dry_run,
"namespace": namespace,
"cross_namespace": cross_namespace,
"keep_strategy": format!("{:?}", keep_strategy).to_lowercase(),
"result": result,
});
println!("{}", serde_json::to_string_pretty(&output)?);
} else {
eprintln!();
eprintln!(
"Deduplication {}:",
if dry_run { "report" } else { "complete" }
);
eprintln!(" Total documents: {}", result.total_docs);
eprintln!(" Unique documents: {}", result.unique_docs);
eprintln!(" Duplicate groups: {}", result.duplicate_groups);
eprintln!(
" Duplicates {}: {}",
if dry_run { "found" } else { "removed" },
result.duplicates_removed
);
if result.docs_without_hash > 0 {
eprintln!(
" Without hash: {} (cannot deduplicate)",
result.docs_without_hash
);
}
if !result.groups.is_empty() {
eprintln!();
let show_count = result.groups.len().min(5);
eprintln!(
"Sample duplicate groups ({} of {}):",
show_count,
result.groups.len()
);
for group in result.groups.iter().take(show_count) {
eprintln!();
eprintln!(
" Hash: {}...",
&group.content_hash[..group.content_hash.len().min(16)]
);
eprintln!(" Kept: {} (ns: {})", group.kept_id, group.kept_namespace);
for (id, ns) in &group.removed_ids {
eprintln!(
" {} {} (ns: {})",
if dry_run { "Would remove:" } else { "Removed:" },
id,
ns
);
}
}
if result.groups.len() > 5 {
eprintln!();
eprintln!(" ... and {} more groups", result.groups.len() - 5);
}
}
if dry_run && result.duplicates_removed > 0 {
eprintln!();
eprintln!("To actually remove duplicates, run with: --dry-run false");
}
}
Ok(())
}
/// Outcome of `run_migrate_namespace`, emitted as the `result` object in JSON mode.
#[derive(Debug, Clone, Serialize)]
pub struct MigrationResult {
    /// Namespace documents were (or would be) moved out of.
    pub from_namespace: String,
    /// Namespace documents were (or would be) moved into.
    pub to_namespace: String,
    /// Number of source documents written (or to be written) into the target.
    pub docs_migrated: usize,
    /// Documents already present in the target before migration (0 if it did not exist).
    pub docs_merged: usize,
    /// Whether the source documents were deleted; in a dry run, whether deletion was requested.
    pub source_deleted: bool,
    /// True when nothing was written.
    pub dry_run: bool,
}
/// Move every document from namespace `from` into namespace `to`.
///
/// - `from == to` is a no-op, detected before the database at `db_path` is opened.
/// - It is an error if `from` does not exist (per `namespace_exists`).
/// - If `to` already exists, `merge` must be set; otherwise this is an error.
/// - Documents are re-written with `namespace = to` via `add_to_store`. The originals are
///   removed only with `delete_source`; without it, the content exists in both namespaces.
/// - `dry_run` reports what would happen without writing anything.
///
/// Results go to stdout as JSON when `json_output`, otherwise to stderr as text.
///
/// # Errors
/// Fails on storage failures, a missing source namespace, or an existing target without `merge`.
pub async fn run_migrate_namespace(
    from: String,
    to: String,
    db_path: String,
    merge: bool,
    delete_source: bool,
    dry_run: bool,
    json_output: bool,
) -> Result<()> {
    // Pure argument check first: a no-op request should never open (or create) the database.
    if from == to {
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "no-op",
                    "message": "Source and target namespaces are the same",
                    "namespace": from
                }))?
            );
        } else {
            eprintln!(
                "Warning: Source and target namespaces are the same ('{}').",
                from
            );
            eprintln!("No migration needed.");
        }
        return Ok(());
    }
    let db_path = shellexpand::tilde(&db_path).to_string();
    let storage = StorageManager::new_lance_only(&db_path).await?;
    let source_exists = storage.namespace_exists(&from).await?;
    if !source_exists {
        let msg = format!("Source namespace '{}' does not exist or is empty", from);
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "error",
                    "message": msg
                }))?
            );
        } else {
            eprintln!("Error: {}", msg);
        }
        return Err(anyhow::anyhow!(msg));
    }
    let target_exists = storage.namespace_exists(&to).await?;
    if target_exists && !merge {
        let msg = format!(
            "Target namespace '{}' already exists. Use --merge to merge documents.",
            to
        );
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "error",
                    "message": msg,
                    "hint": "Use --merge flag to merge into existing namespace"
                }))?
            );
        } else {
            eprintln!("Error: {}", msg);
        }
        return Err(anyhow::anyhow!(msg));
    }
    let source_docs = storage.get_all_in_namespace(&from).await?;
    let source_count = source_docs.len();
    if source_count == 0 {
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "no-op",
                    "message": "Source namespace is empty",
                    "namespace": from
                }))?
            );
        } else {
            eprintln!("Source namespace '{}' is empty. Nothing to migrate.", from);
        }
        return Ok(());
    }
    // Already 0 when the target does not exist, so it can be reported directly.
    let target_count_before = if target_exists {
        storage.count_namespace(&to).await?
    } else {
        0
    };
    if dry_run {
        let result = MigrationResult {
            from_namespace: from.clone(),
            to_namespace: to.clone(),
            docs_migrated: source_count,
            docs_merged: target_count_before,
            source_deleted: delete_source,
            dry_run: true,
        };
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "dry-run",
                    "result": result,
                    "message": "No changes made"
                }))?
            );
        } else {
            eprintln!("\n-> Dry Run: Namespace Migration\n");
            eprintln!(" From: '{}'", from);
            eprintln!(" To: '{}'", to);
            eprintln!(" Docs to move: {}", source_count);
            if target_exists {
                eprintln!(" Existing docs: {} (will be merged)", target_count_before);
            }
            eprintln!(
                " Delete source: {}",
                if delete_source { "yes" } else { "no" }
            );
            eprintln!("\nNo changes made (dry run).");
        }
        return Ok(());
    }
    let migrated_docs: Vec<rmcp_memex::ChromaDocument> = source_docs
        .into_iter()
        .map(|mut doc| {
            doc.namespace = to.clone();
            doc
        })
        .collect();
    storage.add_to_store(migrated_docs).await?;
    // The source is removed only after the copy succeeded.
    let source_deleted = if delete_source {
        storage.delete_namespace_documents(&from).await?;
        true
    } else {
        false
    };
    let result = MigrationResult {
        from_namespace: from.clone(),
        to_namespace: to.clone(),
        docs_migrated: source_count,
        docs_merged: target_count_before,
        source_deleted,
        dry_run: false,
    };
    if json_output {
        println!(
            "{}",
            serde_json::to_string_pretty(&serde_json::json!({
                "status": "success",
                "result": result
            }))?
        );
    } else {
        eprintln!("\n-> Namespace Migration Complete\n");
        eprintln!(" From: '{}'", from);
        eprintln!(" To: '{}'", to);
        eprintln!(" Docs migrated: {}", source_count);
        if target_exists {
            eprintln!(" Merged with: {} existing docs", target_count_before);
            eprintln!(
                " Total in '{}': {}",
                to,
                source_count + target_count_before
            );
        }
        if source_deleted {
            eprintln!(" Source '{}': deleted", from);
        } else {
            eprintln!(
                " Source '{}': preserved (use --delete-source to remove)",
                from
            );
        }
        eprintln!("\n DB path: {}", db_path);
    }
    Ok(())
}
/// Permanently delete every document in `namespace`.
///
/// Without `confirm`, text mode asks the user to type `yes` on stdin. JSON mode cannot
/// prompt, so it requires `confirm`: without it the command refuses with an error instead
/// of deleting unconfirmed.
///
/// # Errors
/// Fails if the namespace does not exist, if JSON mode is used without `confirm`, or on
/// storage / I/O failures.
pub async fn run_purge_namespace(
    namespace: String,
    db_path: String,
    confirm: bool,
    json_output: bool,
) -> Result<()> {
    let db_path = shellexpand::tilde(&db_path).to_string();
    let storage = StorageManager::new_lance_only(&db_path).await?;
    let exists = storage.namespace_exists(&namespace).await?;
    if !exists {
        let msg = format!("Namespace '{}' does not exist or is empty", namespace);
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "error",
                    "message": msg
                }))?
            );
        } else {
            eprintln!("Error: {}", msg);
        }
        return Err(anyhow::anyhow!(msg));
    }
    let docs = storage.get_all_in_namespace(&namespace).await?;
    let doc_count = docs.len();
    if !confirm {
        if json_output {
            // An irreversible delete must never run unconfirmed just because output is JSON.
            let msg = format!(
                "Refusing to purge namespace '{}' ({} documents) without confirmation",
                namespace, doc_count
            );
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "error",
                    "message": msg,
                    "hint": "Use --confirm to purge without an interactive prompt"
                }))?
            );
            return Err(anyhow::anyhow!(msg));
        }
        eprintln!(
            "\n⚠️ WARNING: This will permanently delete {} documents from namespace '{}'",
            doc_count, namespace
        );
        eprintln!(" This action cannot be undone!\n");
        eprint!(" Type 'yes' to confirm: ");
        use std::io::{self, BufRead, Write};
        io::stderr().flush()?;
        let stdin = io::stdin();
        let mut input = String::new();
        stdin.lock().read_line(&mut input)?;
        if input.trim().to_lowercase() != "yes" {
            eprintln!("\n Aborted. No changes made.");
            return Ok(());
        }
    }
    let deleted = storage.delete_namespace_documents(&namespace).await?;
    if json_output {
        println!(
            "{}",
            serde_json::to_string_pretty(&serde_json::json!({
                "status": "success",
                "namespace": namespace,
                "documents_deleted": doc_count,
                "rows_deleted": deleted
            }))?
        );
    } else {
        eprintln!("\n✓ Purged namespace '{}'", namespace);
        eprintln!(" Documents deleted: {}", doc_count);
        eprintln!(" Rows deleted: {}", deleted);
        eprintln!(" DB path: {}", db_path);
    }
    Ok(())
}
/// Counters accumulated by `run_merge`.
#[derive(Debug, Clone, Default, Serialize)]
pub struct MergeStats {
    /// Documents read from all successfully opened source databases.
    pub total_docs: usize,
    /// Documents counted as copied into the target (in a dry run: that would be copied).
    pub docs_copied: usize,
    /// Documents skipped because their content hash was already seen (dedup mode).
    pub docs_skipped: usize,
    /// Target namespace names written to (after applying any prefix).
    pub namespaces: HashSet<String>,
    /// Source databases fully processed, including empty ones.
    pub sources_processed: usize,
    /// Invalid sources plus open/read/write failures.
    pub errors: usize,
}
/// Copy all documents from one or more source databases into a target database.
///
/// Behavior:
/// - Each source namespace is written to `namespace_prefix + namespace`, or unchanged
///   when no prefix is given.
/// - With `dedup`, a document is skipped when its content hash was already seen. Hashes
///   come from the target (read up front) and from documents written earlier in this run.
/// - A hash is only marked as seen once its batch was actually written. A failed write
///   therefore does not cause later copies of the same content to be dropped.
/// - In a dry run the target is not opened. Dedup then only considers documents across
///   the sources, not ones already in the target.
///
/// Per-source and per-batch failures are counted in `errors` and reported; they do not
/// abort the merge.
///
/// # Errors
/// Fails if no source path is valid, the target path is invalid, the target cannot be
/// created/opened, or JSON serialization fails.
pub async fn run_merge(
    source_paths: Vec<PathBuf>,
    target_path: PathBuf,
    dedup: bool,
    namespace_prefix: Option<String>,
    dry_run: bool,
    json_output: bool,
) -> Result<()> {
    // Documents read from one database per call. Reaching the limit means the read was
    // probably truncated, so the user is warned instead of silently merging partial data.
    const READ_LIMIT: usize = 100_000;

    let mut stats = MergeStats::default();
    let mut validated_sources: Vec<PathBuf> = Vec::new();
    for source in &source_paths {
        let source_str = source.to_str().unwrap_or("");
        match path_utils::sanitize_existing_path(source_str) {
            Ok(validated) => validated_sources.push(validated),
            Err(e) => {
                if !json_output {
                    eprintln!("Warning: Source database invalid: {} - {}", source_str, e);
                }
                stats.errors += 1;
            }
        }
    }
    if validated_sources.is_empty() {
        return Err(anyhow::anyhow!("No valid source databases found"));
    }
    let target_str = target_path.to_str().unwrap_or("");
    let validated_target = path_utils::sanitize_new_path(target_str)?;
    if !json_output {
        eprintln!("\n=== RMCP-MEMEX MERGE ===\n");
        eprintln!("Sources: {} database(s)", validated_sources.len());
        for src in &validated_sources {
            eprintln!(" - {}", src.display());
        }
        eprintln!("Target: {}", validated_target.display());
        if let Some(ref prefix) = namespace_prefix {
            eprintln!("Prefix: {}", prefix);
        }
        eprintln!("Dedup: {}", if dedup { "enabled" } else { "disabled" });
        if dry_run {
            eprintln!("\n[DRY RUN - no changes will be made]\n");
        }
        eprintln!();
    }
    let target_storage = if !dry_run {
        if let Some(parent) = validated_target.parent() {
            std::fs::create_dir_all(parent)?;
        }
        Some(StorageManager::new_lance_only(validated_target.to_str().unwrap_or("")).await?)
    } else {
        None
    };
    let mut seen_hashes: HashSet<String> = HashSet::new();
    if dedup
        && !dry_run
        && let Some(ref target) = target_storage
    {
        match target.all_documents(None, READ_LIMIT).await {
            Ok(existing_docs) => {
                if !json_output && existing_docs.len() >= READ_LIMIT {
                    eprintln!(
                        "Warning: target read limit of {} documents reached; dedup may miss existing content",
                        READ_LIMIT
                    );
                }
                seen_hashes.extend(
                    existing_docs
                        .into_iter()
                        .filter_map(|doc| doc.content_hash),
                );
                if !json_output && !seen_hashes.is_empty() {
                    eprintln!(
                        "Found {} unique content hashes in target for dedup\n",
                        seen_hashes.len()
                    );
                }
            }
            Err(e) => {
                // Best effort: continue, but make the weaker dedup guarantee visible.
                if !json_output {
                    eprintln!(
                        "Warning: could not read target for dedup ({}); content already in the target may be duplicated\n",
                        e
                    );
                }
            }
        }
    }
    for source_path in &validated_sources {
        if !json_output {
            eprintln!("Processing: {}", source_path.display());
        }
        let source_path_str = source_path.to_str().unwrap_or("");
        let source_storage = match StorageManager::new_lance_only(source_path_str).await {
            Ok(s) => s,
            Err(e) => {
                if !json_output {
                    eprintln!(" Error opening source: {}", e);
                }
                stats.errors += 1;
                continue;
            }
        };
        let source_docs = match source_storage.all_documents(None, READ_LIMIT).await {
            Ok(docs) => docs,
            Err(e) => {
                if !json_output {
                    eprintln!(" Error reading source: {}", e);
                }
                stats.errors += 1;
                continue;
            }
        };
        if source_docs.is_empty() {
            if !json_output {
                eprintln!(" (empty database)\n");
            }
            stats.sources_processed += 1;
            continue;
        }
        if !json_output && source_docs.len() >= READ_LIMIT {
            eprintln!(
                " Warning: read limit of {} documents reached; this source may be only partially merged",
                READ_LIMIT
            );
        }
        let source_doc_count = source_docs.len();
        stats.total_docs += source_doc_count;
        let mut by_namespace: std::collections::HashMap<String, Vec<_>> =
            std::collections::HashMap::new();
        for doc in source_docs {
            by_namespace
                .entry(doc.namespace.clone())
                .or_default()
                .push(doc);
        }
        if !json_output {
            eprintln!(
                " Found {} documents in {} namespace(s)",
                source_doc_count,
                by_namespace.len()
            );
        }
        for (ns_name, docs) in by_namespace {
            let target_namespace = if let Some(ref prefix) = namespace_prefix {
                format!("{}{}", prefix, ns_name)
            } else {
                ns_name.clone()
            };
            stats.namespaces.insert(target_namespace.clone());
            let mut ns_skipped = 0;
            // Hashes first seen in this batch. They are promoted into `seen_hashes` only
            // after the batch is written.
            let mut batch_hashes: HashSet<String> = HashSet::new();
            let mut batch: Vec<rmcp_memex::ChromaDocument> = Vec::new();
            for doc in docs {
                if dedup
                    && let Some(ref hash) = doc.content_hash
                    && (seen_hashes.contains(hash) || !batch_hashes.insert(hash.clone()))
                {
                    ns_skipped += 1;
                    stats.docs_skipped += 1;
                    continue;
                }
                batch.push(rmcp_memex::ChromaDocument {
                    id: doc.id,
                    namespace: target_namespace.clone(),
                    embedding: doc.embedding,
                    metadata: doc.metadata,
                    document: doc.document,
                    layer: doc.layer,
                    parent_id: doc.parent_id,
                    children_ids: doc.children_ids,
                    keywords: doc.keywords,
                    content_hash: doc.content_hash,
                });
            }
            let mut ns_copied = batch.len();
            if !dry_run
                && !batch.is_empty()
                && let Some(ref target) = target_storage
                && let Err(e) = target.add_to_store(batch).await
            {
                if !json_output {
                    eprintln!(" Error writing to target: {}", e);
                }
                stats.errors += 1;
                // Nothing from this batch reached the target: do not count it as copied,
                // and do not let its hashes suppress later copies of the same content.
                ns_copied = 0;
                batch_hashes.clear();
            }
            stats.docs_copied += ns_copied;
            seen_hashes.extend(batch_hashes);
            if !json_output {
                let prefix_info = if namespace_prefix.is_some() {
                    format!(" -> {}", target_namespace)
                } else {
                    String::new()
                };
                if ns_skipped > 0 {
                    eprintln!(
                        " [{}{}] {} copied, {} skipped (duplicate)",
                        ns_name, prefix_info, ns_copied, ns_skipped
                    );
                } else {
                    eprintln!(" [{}{}] {} copied", ns_name, prefix_info, ns_copied);
                }
            }
        }
        stats.sources_processed += 1;
        if !json_output {
            eprintln!();
        }
    }
    if json_output {
        let output = serde_json::json!({
            "status": if dry_run { "dry_run" } else { "completed" },
            "sources_processed": stats.sources_processed,
            "total_docs": stats.total_docs,
            "docs_copied": stats.docs_copied,
            "docs_skipped": stats.docs_skipped,
            "namespaces": stats.namespaces.iter().collect::<Vec<_>>(),
            "namespace_count": stats.namespaces.len(),
            "errors": stats.errors,
            "target": validated_target.display().to_string(),
            "dedup_enabled": dedup,
            "namespace_prefix": namespace_prefix,
        });
        println!("{}", serde_json::to_string_pretty(&output)?);
    } else {
        eprintln!(
            "=== MERGE {} ===\n",
            if dry_run { "PREVIEW" } else { "COMPLETE" }
        );
        eprintln!(" Sources processed: {}", stats.sources_processed);
        eprintln!(" Total documents: {}", stats.total_docs);
        eprintln!(" Documents copied: {}", stats.docs_copied);
        if dedup && stats.docs_skipped > 0 {
            eprintln!(" Skipped (dedup): {}", stats.docs_skipped);
        }
        eprintln!(" Namespaces: {}", stats.namespaces.len());
        if stats.errors > 0 {
            eprintln!(" Errors: {}", stats.errors);
        }
        eprintln!(" Target database: {}", validated_target.display());
        if dry_run {
            eprintln!("\n[DRY RUN - run without --dry-run to apply changes]");
        }
    }
    Ok(())
}