use anyhow::Result;
use clap::{Parser, Subcommand};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{Level, info};
use tracing_subscriber::FmtSubscriber;
use walkdir::WalkDir;
use rmcp_memex::{
EmbeddingClient, EmbeddingConfig, HybridConfig, HybridSearchResult, HybridSearcher,
IndexProgressTracker, MlxConfig, NamespaceSecurityConfig, PreprocessingConfig, ProviderConfig,
RAGPipeline, RerankerConfig, SearchMode, ServerConfig, SliceLayer, SliceMode, StorageManager,
WizardConfig, run_stdio_server, run_wizard,
};
/// Splits a comma-separated feature list into individual feature names.
///
/// Each entry is trimmed of surrounding whitespace; entries that are empty after trimming
/// (for example from `"a,,b"` or a trailing comma) are dropped.
fn parse_features(raw: &str) -> Vec<String> {
    let mut names = Vec::new();
    for piece in raw.split(',') {
        let piece = piece.trim();
        if !piece.is_empty() {
            names.push(piece.to_owned());
        }
    }
    names
}
/// Default config file locations, probed in order by `discover_config`.
///
/// A leading `~` is tilde-expanded before the existence check; the first existing path wins.
const CONFIG_SEARCH_PATHS: &[&str] = &[
"~/.rmcp-servers/rmcp-memex/config.toml",
"~/.config/rmcp-memex/config.toml",
"~/.rmcp_servers/rmcp_memex/config.toml", ];
/// Locates a config file when no explicit path was supplied.
///
/// The `RMCP_MEMEX_CONFIG` environment variable is consulted first. If it is unset, or names a
/// path that does not exist, each entry of `CONFIG_SEARCH_PATHS` is tried in order.
///
/// Returns the path exactly as written (a leading `~` is *not* expanded in the returned string),
/// or `None` when nothing was found.
fn discover_config() -> Option<String> {
    // Existence is always tested on the tilde-expanded form of the candidate.
    let exists = |candidate: &str| -> bool {
        let expanded = shellexpand::tilde(candidate).to_string();
        std::path::Path::new(&expanded).exists()
    };

    match std::env::var("RMCP_MEMEX_CONFIG") {
        Ok(from_env) if exists(&from_env) => return Some(from_env),
        _ => {}
    }

    CONFIG_SEARCH_PATHS
        .iter()
        .copied()
        .find(|&candidate| exists(candidate))
        .map(str::to_string)
}
/// Reads and parses the TOML config file at `path`.
///
/// A leading `~` is expanded and the path is canonicalized. The resolved path must lie under the
/// user's home directory (`HOME`, falling back to `USERPROFILE`) or under the current working
/// directory; anything else is refused.
///
/// # Errors
///
/// Returns an error if the path cannot be resolved, lies outside the allowed directories, cannot
/// be read, or does not deserialize into a `FileConfig`.
fn load_file_config(path: &str) -> Result<FileConfig> {
    let expanded = shellexpand::tilde(path).to_string();
    let canonical = std::path::Path::new(&expanded)
        .canonicalize()
        .map_err(|e| anyhow::anyhow!("Cannot resolve config path '{}': {}", path, e))?;

    // The allowed roots must be canonicalized too: `canonical` has every symlink resolved (and,
    // on Windows, carries a verbatim prefix), so comparing it against a raw `HOME`/cwd would
    // falsely reject configs when the root itself goes through a symlink.
    //
    // Roots that cannot be canonicalized are discarded. This also closes a hole: an empty
    // `HOME=""` would otherwise become an empty path, and `Path::starts_with("")` is true for
    // every path, silently disabling the check.
    let home = std::env::var("HOME")
        .or_else(|_| std::env::var("USERPROFILE"))
        .ok()
        .and_then(|h| std::path::Path::new(&h).canonicalize().ok());
    let cwd = std::env::current_dir()
        .ok()
        .and_then(|c| c.canonicalize().ok());

    let is_safe = [home, cwd]
        .iter()
        .flatten()
        .any(|root| canonical.starts_with(root));
    if !is_safe {
        return Err(anyhow::anyhow!(
            "Access denied: config path '{}' is outside allowed directories",
            path
        ));
    }

    let contents = std::fs::read_to_string(&canonical)?;
    toml::from_str(&contents).map_err(Into::into)
}
/// Loads the configuration from `explicit_path`, or from an auto-discovered location.
///
/// Returns the parsed config together with the path it was loaded from. When no explicit path is
/// given and discovery finds nothing, the default `FileConfig` is returned with `None` as path.
///
/// # Errors
///
/// Propagates any failure from `load_file_config` for the chosen path.
fn load_or_discover_config(explicit_path: Option<&str>) -> Result<(FileConfig, Option<String>)> {
    // An explicit path always wins; discovery is only attempted without one.
    let chosen = match explicit_path {
        Some(path) => Some(path.to_string()),
        None => discover_config(),
    };

    match chosen {
        Some(path) => {
            let cfg = load_file_config(&path)?;
            Ok((cfg, Some(path)))
        }
        None => Ok((FileConfig::default(), None)),
    }
}
/// Settings as deserialized from the TOML config file.
///
/// Every field is optional. `Cli::into_server_config` prefers the matching CLI flag when one is
/// given, then the value here, then the mode-specific `ServerConfig` default.
#[derive(serde::Deserialize, Default, Clone)]
struct FileConfig {
/// Server mode; `"memory"` and `"full"` select dedicated presets, anything else the default.
mode: Option<String>,
/// Comma-separated feature list (split by `parse_features`).
features: Option<String>,
/// Overrides `ServerConfig::cache_mb`.
cache_mb: Option<usize>,
/// Overrides `ServerConfig::db_path`.
db_path: Option<String>,
/// Overrides `ServerConfig::max_request_bytes`.
max_request_bytes: Option<usize>,
/// Log level name, interpreted by `parse_log_level`.
log_level: Option<String>,
/// Overrides `ServerConfig::allowed_paths`.
allowed_paths: Option<Vec<String>>,
/// Enables namespace security; OR-ed with the `--security-enabled` CLI flag.
security_enabled: Option<bool>,
/// Token store location passed into `NamespaceSecurityConfig`.
token_store_path: Option<String>,
/// NOTE(review): not read anywhere in the visible part of this file — confirm it is still used.
preprocessing_enabled: Option<bool>,
/// `[embeddings]` table; takes precedence over `[mlx]` when it lists at least one provider.
#[serde(default)]
embeddings: Option<EmbeddingsFileConfig>,
/// Legacy `[mlx]` table, used only when `[embeddings]` supplies no providers.
#[serde(default)]
mlx: Option<MlxFileConfig>,
}
/// The `[embeddings]` table of the config file.
#[derive(serde::Deserialize, Clone)]
struct EmbeddingsFileConfig {
/// Embedding dimension copied into `EmbeddingConfig::required_dimension`; defaults to
/// `default_dimension()` when absent.
#[serde(default = "default_dimension")]
required_dimension: usize,
/// Configured embedding providers; an empty list makes the loader fall back to MLX settings.
#[serde(default)]
providers: Vec<ProviderFileConfig>,
/// Optional reranker settings.
#[serde(default)]
reranker: Option<RerankerFileConfig>,
}
/// Serde default for `EmbeddingsFileConfig::required_dimension`.
fn default_dimension() -> usize {
    const DEFAULT_EMBEDDING_DIMENSION: usize = 4096;
    DEFAULT_EMBEDDING_DIMENSION
}
impl Default for EmbeddingsFileConfig {
fn default() -> Self {
Self {
required_dimension: 4096,
providers: vec![],
reranker: None,
}
}
}
/// One entry of `embeddings.providers`; copied field-for-field into `ProviderConfig`.
#[derive(serde::Deserialize, Clone)]
struct ProviderFileConfig {
/// Provider name.
name: String,
/// Base URL of the provider.
base_url: String,
/// Model identifier.
model: String,
/// Provider priority; defaults to `default_priority()`.
/// NOTE(review): whether lower or higher wins is not visible here — confirm in `ProviderConfig`.
#[serde(default = "default_priority")]
priority: u8,
/// Endpoint path; defaults to `default_endpoint()`.
#[serde(default = "default_endpoint")]
endpoint: String,
}
/// Serde default for `ProviderFileConfig::priority`.
fn default_priority() -> u8 {
    const DEFAULT_PROVIDER_PRIORITY: u8 = 10;
    DEFAULT_PROVIDER_PRIORITY
}
/// Serde default for `ProviderFileConfig::endpoint`.
fn default_endpoint() -> String {
    String::from("/v1/embeddings")
}
/// The `embeddings.reranker` table; converted into a `RerankerConfig`.
#[derive(serde::Deserialize, Clone)]
struct RerankerFileConfig {
/// Base URL of the reranker service.
base_url: String,
/// Reranker model identifier.
model: String,
/// Endpoint path; defaults to `default_rerank_endpoint()`.
#[serde(default = "default_rerank_endpoint")]
endpoint: String,
}
/// Serde default for `RerankerFileConfig::endpoint`.
fn default_rerank_endpoint() -> String {
    String::from("/v1/rerank")
}
/// The legacy `[mlx]` table of the config file.
///
/// All values are handed to `MlxConfig::merge_file_config` by `to_mlx_config`; their exact
/// meaning is defined by `MlxConfig`, which is not visible in this file.
#[derive(serde::Deserialize, Default, Clone)]
struct MlxFileConfig {
/// Defaults to `false` when the key is absent.
#[serde(default)]
disabled: bool,
local_port: Option<u16>,
dragon_url: Option<String>,
dragon_port: Option<u16>,
embedder_model: Option<String>,
reranker_model: Option<String>,
reranker_port_offset: Option<u16>,
}
impl MlxFileConfig {
    /// Builds an `MlxConfig` by starting from the environment (`MlxConfig::from_env`) and then
    /// merging the values of this file section into it.
    fn to_mlx_config(&self) -> MlxConfig {
        let Self {
            disabled,
            local_port,
            dragon_url,
            dragon_port,
            embedder_model,
            reranker_model,
            reranker_port_offset,
        } = self;

        let mut merged = MlxConfig::from_env();
        merged.merge_file_config(
            Some(*disabled),
            *local_port,
            dragon_url.clone(),
            *dragon_port,
            embedder_model.clone(),
            reranker_model.clone(),
            *reranker_port_offset,
        );
        merged
    }
}
impl FileConfig {
fn to_embedding_config(&self) -> EmbeddingConfig {
if let Some(ref emb) = self.embeddings
&& !emb.providers.is_empty()
{
let providers = emb
.providers
.iter()
.map(|p| ProviderConfig {
name: p.name.clone(),
base_url: p.base_url.clone(),
model: p.model.clone(),
priority: p.priority,
endpoint: p.endpoint.clone(),
})
.collect();
let reranker = emb
.reranker
.as_ref()
.map(|r| RerankerConfig {
base_url: Some(r.base_url.clone()),
model: Some(r.model.clone()),
endpoint: r.endpoint.clone(),
})
.unwrap_or_default();
return EmbeddingConfig {
required_dimension: emb.required_dimension,
max_batch_chars: 32000, max_batch_items: 16, providers,
reranker,
};
}
if let Some(ref mlx) = self.mlx {
tracing::warn!("Using legacy [mlx] config - please migrate to [embeddings.providers]");
return mlx.to_mlx_config().to_embedding_config();
}
MlxConfig::from_env().to_embedding_config()
}
}
// Top-level command-line interface.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into help
// text, which would change the program's `--help` output.
//
// Every option below is declared `global = true`, so it is accepted on any subcommand.
#[derive(Parser, Debug)]
#[command(author, version, about = "RAG/memory MCP server with LanceDB vector storage", long_about = None)]
struct Cli {
// Subcommand to run; optional.
#[command(subcommand)]
command: Option<Commands>,
// Explicit config file path; when absent, `discover_config` is used.
#[arg(long, global = true)]
config: Option<String>,
// Server mode preset; restricted to "memory" or "full".
#[arg(long, value_parser = ["memory", "full"], global = true)]
mode: Option<String>,
// Comma-separated feature list (see `parse_features`).
#[arg(long, global = true)]
features: Option<String>,
#[arg(long, global = true)]
cache_mb: Option<usize>,
#[arg(long, global = true)]
db_path: Option<String>,
#[arg(long, global = true)]
max_request_bytes: Option<usize>,
// Log level name (see `parse_log_level`).
#[arg(long, global = true)]
log_level: Option<String>,
// May be given multiple times; each occurrence appends one path.
#[arg(long, global = true, action = clap::ArgAction::Append)]
allowed_paths: Option<Vec<String>>,
// Presence flag; OR-ed with `security_enabled` from the config file.
#[arg(long, global = true)]
security_enabled: bool,
#[arg(long, global = true)]
token_store_path: Option<String>,
}
// Subcommands of the CLI.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into help
// text, which would change the program's `--help` output.
//
// NOTE(review): the dispatch from these variants to the `run_*` functions is not visible in this
// part of the file; the pairings mentioned below are inferred from matching names and argument
// lists and should be confirmed.
#[derive(Subcommand, Debug)]
enum Commands {
// Presumably starts the MCP server.
Serve,
// Also reachable under the alias `config`.
#[command(alias = "config")]
Wizard {
#[arg(long)]
dry_run: bool,
},
// Arguments match `run_overview`; `namespace` is an optional positional.
Overview {
namespace: Option<String>,
#[arg(long)]
json: bool,
},
// Arguments match `run_dive`.
Dive {
#[arg(long, short = 'n', required = true)]
namespace: String,
#[arg(long, short = 'q', required = true)]
query: String,
#[arg(long, short = 'l', default_value = "5")]
limit: usize,
#[arg(long, short = 'v')]
verbose: bool,
#[arg(long)]
json: bool,
},
// Arguments match the fields of `BatchIndexConfig`.
Index {
#[arg(required = true)]
path: PathBuf,
#[arg(long, short = 'n')]
namespace: Option<String>,
#[arg(long, short = 'r')]
recursive: bool,
#[arg(long, short = 'g')]
glob: Option<String>,
// 0 means "no depth limit" in `collect_files`.
#[arg(long, default_value = "0")]
max_depth: usize,
#[arg(long, short = 'p')]
preprocess: bool,
#[arg(long)]
sanitize_metadata: bool,
#[arg(long, short = 's', default_value = "onion", value_parser = ["onion", "flat"])]
slice_mode: String,
// `ArgAction::Set` means the flag takes an explicit value (`--dedup false`).
#[arg(long, default_value = "true", action = clap::ArgAction::Set)]
dedup: bool,
#[arg(long)]
progress: bool,
},
// Arguments largely match `run_search`.
Search {
#[arg(long, short = 'n', required = true)]
namespace: String,
#[arg(long, short = 'q', required = true)]
query: String,
#[arg(long, short = 'l', default_value = "10")]
limit: usize,
#[arg(long)]
json: bool,
#[arg(long)]
deep: bool,
#[arg(long, value_parser = ["outer", "middle", "inner", "core"])]
layer: Option<String>,
#[arg(long, short = 'm', default_value = "hybrid", value_parser = ["vector", "keyword", "bm25", "hybrid"])]
mode: String,
// `ArgAction::Set` means the flag takes an explicit value (`--scores false`).
#[arg(long, default_value = "true", action = clap::ArgAction::Set)]
scores: bool,
},
// Arguments match `run_expand`.
Expand {
#[arg(long, short = 'n', required = true)]
namespace: String,
#[arg(long, short = 'i', required = true)]
id: String,
#[arg(long)]
json: bool,
},
// Arguments match `run_get`.
Get {
#[arg(long, short = 'n', required = true)]
namespace: String,
#[arg(long, short = 'i', required = true)]
id: String,
#[arg(long)]
json: bool,
},
// Arguments match `run_rag_search`; the namespace is optional here.
RagSearch {
#[arg(long, short = 'q', required = true)]
query: String,
#[arg(long, short = 'l', default_value = "10")]
limit: usize,
#[arg(long, short = 'n')]
namespace: Option<String>,
#[arg(long)]
json: bool,
},
// Arguments match `run_list_namespaces`.
Namespaces {
#[arg(long, short = 's')]
stats: bool,
#[arg(long)]
json: bool,
},
// Arguments match `run_export`; without `--output` the export goes to stdout.
Export {
#[arg(long, short = 'n', required = true)]
namespace: String,
#[arg(long, short = 'o')]
output: Option<PathBuf>,
#[arg(long)]
include_embeddings: bool,
},
Upsert {
#[arg(long, short = 'n', required = true)]
namespace: String,
#[arg(long, short = 'i', required = true)]
id: String,
#[arg(long, short = 't')]
text: Option<String>,
// Defaults to the literal string "{}".
#[arg(long, short = 'm', default_value = "{}")]
metadata: String,
},
}
impl Cli {
    /// Resolves the final `ServerConfig` from CLI flags, the config file and built-in defaults.
    ///
    /// For each setting the CLI flag wins, then the config file value, then the default of the
    /// preset selected by `mode` (`"memory"`, `"full"`, or the generic default). Security is
    /// enabled if either the CLI flag or the config file asks for it.
    ///
    /// # Errors
    ///
    /// Fails when the config file cannot be loaded.
    fn into_server_config(self) -> Result<ServerConfig> {
        let (file_cfg, config_path) = load_or_discover_config(self.config.as_deref())?;
        if let Some(path) = config_path.as_deref() {
            eprintln!("Using config: {}", path);
        }

        let embeddings = file_cfg.to_embedding_config();

        // Pick the preset that supplies fallbacks for everything not set explicitly.
        let requested_mode = match self.mode.as_deref() {
            Some(cli_mode) => Some(cli_mode),
            None => file_cfg.mode.as_deref(),
        };
        let defaults = match requested_mode {
            Some("memory") => ServerConfig::for_memory_only(),
            Some("full") => ServerConfig::for_full_rag(),
            _ => ServerConfig::default(),
        };

        let features = match self.features.or(file_cfg.features) {
            Some(raw) => parse_features(&raw),
            None => defaults.features,
        };
        let enabled = self.security_enabled || file_cfg.security_enabled == Some(true);
        let token_store_path = self.token_store_path.or(file_cfg.token_store_path);

        let cache_mb = self
            .cache_mb
            .or(file_cfg.cache_mb)
            .unwrap_or(defaults.cache_mb);
        let db_path = self.db_path.or(file_cfg.db_path).unwrap_or(defaults.db_path);
        let max_request_bytes = self
            .max_request_bytes
            .or(file_cfg.max_request_bytes)
            .unwrap_or(defaults.max_request_bytes);
        let log_level = match self.log_level.or(file_cfg.log_level) {
            Some(name) => parse_log_level(&name),
            None => defaults.log_level,
        };
        let allowed_paths = self
            .allowed_paths
            .or(file_cfg.allowed_paths)
            .unwrap_or(defaults.allowed_paths);

        Ok(ServerConfig {
            features,
            cache_mb,
            db_path,
            max_request_bytes,
            log_level,
            allowed_paths,
            security: NamespaceSecurityConfig {
                enabled,
                token_store_path,
            },
            embeddings,
            hybrid: defaults.hybrid,
        })
    }
}
fn parse_log_level(level: &str) -> Level {
match level.to_ascii_lowercase().as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
"info" => Level::INFO,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
}
}
/// Tests the file name (last path component) of `path` against a glob `pattern`.
///
/// Returns `false` when the path has no UTF-8 file name or the pattern is invalid.
fn matches_glob(path: &Path, pattern: &str) -> bool {
    let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
        return false;
    };

    match glob::Pattern::new(pattern) {
        Ok(compiled) => compiled.matches(file_name),
        Err(_) => false,
    }
}
/// Collects the files to process under `path`.
///
/// * If `path` is a file, the result is that single file (or nothing if it fails the glob).
/// * Otherwise the directory is walked: one level deep when `recursive` is false, without limit
///   when `recursive` is true and `max_depth` is 0, and up to `max_depth` levels otherwise.
///
/// Entries that cannot be read during the walk are skipped silently. When `glob_pattern` is set,
/// only files whose name matches it are kept.
fn collect_files(
    path: &Path,
    recursive: bool,
    glob_pattern: Option<&str>,
    max_depth: usize,
) -> Result<Vec<PathBuf>> {
    let wanted = |candidate: &Path| match glob_pattern {
        Some(pattern) => matches_glob(candidate, pattern),
        None => true,
    };

    if path.is_file() {
        let single = if wanted(path) {
            vec![path.to_path_buf()]
        } else {
            Vec::new()
        };
        return Ok(single);
    }

    let walker = WalkDir::new(path);
    let walker = match (recursive, max_depth) {
        (false, _) => walker.max_depth(1),
        (true, 0) => walker,
        (true, depth) => walker.max_depth(depth),
    };

    let files = walker
        .into_iter()
        .filter_map(|entry| entry.ok())
        .filter(|entry| entry.path().is_file() && wanted(entry.path()))
        .map(|entry| entry.path().to_path_buf())
        .collect();
    Ok(files)
}
/// Prints search results in human-readable form to stderr.
///
/// Each result shows its rank, score, namespace, optional layer, a preview of at most 100
/// characters (newlines replaced by spaces), its id, and — when present — keywords, the number
/// of expandable children and non-empty metadata.
///
/// `namespace` of `None` is shown as "all namespaces".
fn display_search_results(
    query: &str,
    namespace: Option<&str>,
    results: &[rmcp_memex::SearchResult],
    layer_filter: Option<SliceLayer>,
) {
    let ns_display = namespace.unwrap_or("all namespaces");
    let layer_display = layer_filter
        .map(|l| format!(" (layer: {})", l.name()))
        .unwrap_or_default();
    eprintln!(
        "\n-> Search Results for \"{}\" in [{}]{}\n",
        query, ns_display, layer_display
    );
    if results.is_empty() {
        eprintln!("No results found.");
        return;
    }
    for (i, result) in results.iter().enumerate() {
        // Take the preview and decide on the ellipsis from the same char iterator. Comparing
        // `text.len()` (a byte count) against 100 would add "..." to multi-byte text that was
        // not actually truncated.
        let mut chars = result.text.chars();
        let preview: String = chars
            .by_ref()
            .take(100)
            .collect::<String>()
            .replace('\n', " ");
        let ellipsis = if chars.next().is_some() { "..." } else { "" };
        let layer_str = result
            .layer
            .map(|l| format!("[{}]", l.name()))
            .unwrap_or_default();
        eprintln!(
            "{}. [{:.2}] {} {}",
            i + 1,
            result.score,
            result.namespace,
            layer_str
        );
        eprintln!(" \"{}{ellipsis}\"", preview);
        eprintln!(" ID: {}", result.id);
        if !result.keywords.is_empty() {
            eprintln!(" Keywords: {}", result.keywords.join(", "));
        }
        if result.can_expand() {
            eprintln!(" [expandable: {} children]", result.children_ids.len());
        }
        if !result.metadata.is_null() && result.metadata != serde_json::json!({}) {
            eprintln!(" Metadata: {}", result.metadata);
        }
        eprintln!();
    }
}
/// Builds the JSON document describing a set of search results.
///
/// The top level carries the query, namespace, layer filter name and result count; every result
/// is serialized with its id, namespace, score, full text, layer name, keywords, parent/children
/// links, expandability and metadata.
fn json_search_results(
    query: &str,
    namespace: Option<&str>,
    results: &[rmcp_memex::SearchResult],
    layer_filter: Option<SliceLayer>,
) -> serde_json::Value {
    let entries: Vec<serde_json::Value> = results
        .iter()
        .map(|hit| {
            let layer_name = hit.layer.map(|l| l.name());
            serde_json::json!({
                "id": hit.id,
                "namespace": hit.namespace,
                "score": hit.score,
                "text": hit.text,
                "layer": layer_name,
                "keywords": hit.keywords,
                "parent_id": hit.parent_id,
                "children_ids": hit.children_ids,
                "can_expand": hit.can_expand(),
                "metadata": hit.metadata
            })
        })
        .collect();

    let filter_name = layer_filter.map(|l| l.name());
    serde_json::json!({
        "query": query,
        "namespace": namespace,
        "layer_filter": filter_name,
        "count": results.len(),
        "results": entries
    })
}
/// Prints hybrid search results in human-readable form to stderr.
///
/// The header names the query, namespace ("all namespaces" when `None`), optional layer filter
/// and the search mode. Each result shows whichever of the combined / vector / BM25 scores are
/// available, a preview of at most 100 characters (newlines replaced by spaces), its id, and —
/// when present — keywords, number of children and non-empty metadata.
fn display_hybrid_search_results(
    query: &str,
    namespace: Option<&str>,
    results: &[HybridSearchResult],
    layer_filter: Option<SliceLayer>,
    search_mode: SearchMode,
) {
    let ns_display = namespace.unwrap_or("all namespaces");
    let layer_display = layer_filter
        .map(|l| format!(" (layer: {})", l.name()))
        .unwrap_or_default();
    let mode_display = match search_mode {
        SearchMode::Hybrid => "hybrid",
        SearchMode::Keyword => "keyword/bm25",
        SearchMode::Vector => "vector",
    };
    eprintln!(
        "\n-> Search Results for \"{}\" in [{}]{} [mode: {}]\n",
        query, ns_display, layer_display, mode_display
    );
    if results.is_empty() {
        eprintln!("No results found.");
        return;
    }
    for (i, result) in results.iter().enumerate() {
        // Take the preview and decide on the ellipsis from the same char iterator. Comparing
        // `document.len()` (a byte count) against 100 would add "..." to multi-byte text that
        // was not actually truncated.
        let mut chars = result.document.chars();
        let preview: String = chars
            .by_ref()
            .take(100)
            .collect::<String>()
            .replace('\n', " ");
        let ellipsis = if chars.next().is_some() { "..." } else { "" };
        let layer_str = result
            .layer
            .map(|l| format!("[{}]", l.name()))
            .unwrap_or_default();
        let score_details = match (result.vector_score, result.bm25_score) {
            (Some(v), Some(b)) => format!(
                "[combined: {:.2}, vec: {:.2}, bm25: {:.2}]",
                result.combined_score, v, b
            ),
            (Some(v), None) => format!("[vec: {:.2}]", v),
            (None, Some(b)) => format!("[bm25: {:.2}]", b),
            (None, None) => format!("[score: {:.2}]", result.combined_score),
        };
        eprintln!(
            "{}. {} {} {}",
            i + 1,
            score_details,
            result.namespace,
            layer_str
        );
        eprintln!(" \"{}{ellipsis}\"", preview);
        eprintln!(" ID: {}", result.id);
        if !result.keywords.is_empty() {
            eprintln!(" Keywords: {}", result.keywords.join(", "));
        }
        if !result.children_ids.is_empty() {
            eprintln!(" [expandable: {} children]", result.children_ids.len());
        }
        if !result.metadata.is_null() && result.metadata != serde_json::json!({}) {
            eprintln!(" Metadata: {}", result.metadata);
        }
        eprintln!();
    }
}
fn json_hybrid_search_results(
query: &str,
namespace: Option<&str>,
results: &[HybridSearchResult],
layer_filter: Option<SliceLayer>,
search_mode: SearchMode,
) -> serde_json::Value {
serde_json::json!({
"query": query,
"namespace": namespace,
"layer_filter": layer_filter.map(|l| l.name()),
"search_mode": match search_mode {
SearchMode::Hybrid => "hybrid",
SearchMode::Keyword => "keyword",
SearchMode::Vector => "vector",
},
"count": results.len(),
"results": results.iter().map(|r| serde_json::json!({
"id": r.id,
"namespace": r.namespace,
"combined_score": r.combined_score,
"vector_score": r.vector_score,
"bm25_score": r.bm25_score,
"text": r.document,
"layer": r.layer.map(|l| l.name()),
"keywords": r.keywords,
"parent_id": r.parent_id,
"children_ids": r.children_ids,
"metadata": r.metadata
})).collect::<Vec<_>>()
})
}
/// Runs a search in `namespace` and prints the results.
///
/// `SearchMode::Vector` goes through `RAGPipeline::memory_search_with_layer`; every other mode
/// embeds the query and goes through `HybridSearcher`. With `json_output` the results are
/// printed as pretty JSON on stdout, otherwise as a human-readable listing on stderr.
///
/// # Errors
///
/// Propagates failures from creating the embedding client, opening storage, embedding the query,
/// searching, or serializing the output.
// The argument list mirrors the CLI options one-to-one.
#[allow(clippy::too_many_arguments)]
async fn run_search(
    namespace: String,
    query: String,
    limit: usize,
    json_output: bool,
    db_path: String,
    layer_filter: Option<SliceLayer>,
    search_mode: SearchMode,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(embedding_config).await?));
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);

    // Pure vector search: handled by the RAG pipeline.
    if search_mode == SearchMode::Vector {
        let pipeline = RAGPipeline::new(embedding_client, storage).await?;
        let hits = pipeline
            .memory_search_with_layer(&namespace, &query, limit, layer_filter)
            .await?;
        if json_output {
            let doc = json_search_results(&query, Some(&namespace), &hits, layer_filter);
            println!("{}", serde_json::to_string_pretty(&doc)?);
        } else {
            display_search_results(&query, Some(&namespace), &hits, layer_filter);
        }
        return Ok(());
    }

    // Keyword and hybrid modes: handled by the hybrid searcher.
    let searcher_config = HybridConfig {
        mode: search_mode,
        ..Default::default()
    };
    let searcher = HybridSearcher::new(storage, searcher_config).await?;
    let query_embedding = embedding_client.lock().await.embed(&query).await?;
    let hits = searcher
        .search(
            &query,
            query_embedding,
            Some(&namespace),
            limit,
            layer_filter,
        )
        .await?;

    if json_output {
        let doc =
            json_hybrid_search_results(&query, Some(&namespace), &hits, layer_filter, search_mode);
        println!("{}", serde_json::to_string_pretty(&doc)?);
    } else {
        display_hybrid_search_results(&query, Some(&namespace), &hits, layer_filter, search_mode);
    }
    Ok(())
}
/// Lists the children of slice `id` in `namespace`.
///
/// With `json_output` the children are printed as pretty JSON on stdout; otherwise each child is
/// listed on stderr with its layer (or "flat"), id, a preview of at most 100 characters and any
/// keywords.
///
/// # Errors
///
/// Propagates failures from creating the embedding client, opening storage, expanding the
/// result, or serializing the output.
async fn run_expand(
    namespace: String,
    id: String,
    json_output: bool,
    db_path: String,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(embedding_config).await?));
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let rag = RAGPipeline::new(embedding_client, storage).await?;
    let results = rag.expand_result(&namespace, &id).await?;
    if json_output {
        let json = serde_json::json!({
            "parent_id": id,
            "namespace": namespace,
            "children_count": results.len(),
            "children": results.iter().map(|r| serde_json::json!({
                "id": r.id,
                "layer": r.layer.map(|l| l.name()),
                "text": r.text,
                "keywords": r.keywords,
                "parent_id": r.parent_id,
                "children_ids": r.children_ids,
            })).collect::<Vec<_>>()
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        eprintln!("\n-> Children of slice \"{id}\" in [{namespace}]\n");
        if results.is_empty() {
            eprintln!("No children found (this may be a leaf/outer slice).");
        } else {
            for (i, result) in results.iter().enumerate() {
                let layer_str = result.layer.map(|l| l.name()).unwrap_or("flat");
                // Take the preview and decide on the ellipsis from the same char iterator.
                // Comparing `text.len()` (a byte count) against 100 would add "..." to
                // multi-byte text that was not actually truncated.
                let mut chars = result.text.chars();
                let preview: String = chars
                    .by_ref()
                    .take(100)
                    .collect::<String>()
                    .replace('\n', " ");
                let ellipsis = if chars.next().is_some() { "..." } else { "" };
                eprintln!("{}. [{}] {}", i + 1, layer_str, result.id);
                eprintln!(" \"{}{ellipsis}\"", preview);
                if !result.keywords.is_empty() {
                    eprintln!(" Keywords: {}", result.keywords.join(", "));
                }
                eprintln!();
            }
        }
    }
    Ok(())
}
/// Fetches a single chunk by `id` from `namespace` and prints it.
///
/// In JSON mode a `found` flag is always included. In text mode the chunk header goes to stderr
/// and the chunk text itself to stdout; a missing chunk produces a message on stderr.
///
/// # Errors
///
/// Propagates failures from creating the embedding client, opening storage, the lookup, or
/// serializing the output. A missing chunk is *not* an error.
async fn run_get(
    namespace: String,
    id: String,
    json_output: bool,
    db_path: String,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(embedding_config).await?));
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let pipeline = RAGPipeline::new(embedding_client, storage).await?;

    let Some(chunk) = pipeline.memory_get(&namespace, &id).await? else {
        // Not found: report it and finish successfully.
        if json_output {
            let doc = serde_json::json!({
                "found": false,
                "namespace": namespace,
                "id": id
            });
            println!("{}", serde_json::to_string_pretty(&doc)?);
        } else {
            eprintln!("Chunk '{}' not found in namespace '{}'", id, namespace);
        }
        return Ok(());
    };

    if json_output {
        let doc = serde_json::json!({
            "found": true,
            "id": chunk.id,
            "namespace": chunk.namespace,
            "text": chunk.text,
            "metadata": chunk.metadata
        });
        println!("{}", serde_json::to_string_pretty(&doc)?);
        return Ok(());
    }

    eprintln!("\n-> Found chunk in [{namespace}]\n");
    eprintln!("ID: {}", chunk.id);
    eprintln!("Namespace: {}", chunk.namespace);
    let has_metadata = !chunk.metadata.is_null() && chunk.metadata != serde_json::json!({});
    if has_metadata {
        eprintln!("Metadata: {}", chunk.metadata);
    }
    eprintln!("\n--- Content ---\n");
    println!("{}", chunk.text);
    Ok(())
}
/// Runs a RAG search, optionally restricted to `namespace`, and prints the results.
///
/// With `json_output` the results are printed as pretty JSON on stdout, otherwise as a
/// human-readable listing on stderr. No layer filter is applied.
///
/// # Errors
///
/// Propagates failures from creating the embedding client, opening storage, searching, or
/// serializing the output.
async fn run_rag_search(
    query: String,
    limit: usize,
    namespace: Option<String>,
    json_output: bool,
    db_path: String,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(embedding_config).await?));
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let pipeline = RAGPipeline::new(embedding_client, storage).await?;

    let ns = namespace.as_deref();
    let hits = pipeline.search_inner(ns, &query, limit).await?;

    if !json_output {
        display_search_results(&query, ns, &hits, None);
        return Ok(());
    }
    let doc = json_search_results(&query, ns, &hits, None);
    println!("{}", serde_json::to_string_pretty(&doc)?);
    Ok(())
}
async fn run_list_namespaces(stats: bool, json_output: bool, db_path: String) -> Result<()> {
let storage = StorageManager::new_lance_only(&db_path).await?;
let storage = Arc::new(storage);
let zero_embedding = vec![0.0_f32; 4096]; let all_docs = storage.search_store(None, zero_embedding, 10000).await?;
let mut namespace_counts: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
for doc in &all_docs {
*namespace_counts.entry(doc.namespace.clone()).or_insert(0) += 1;
}
let mut namespaces: Vec<_> = namespace_counts.into_iter().collect();
namespaces.sort_by(|a, b| a.0.cmp(&b.0));
if json_output {
let json = if stats {
serde_json::json!({
"namespaces": namespaces.iter().map(|(ns, count)| serde_json::json!({
"name": ns,
"document_count": count
})).collect::<Vec<_>>()
})
} else {
serde_json::json!({
"namespaces": namespaces.iter().map(|(ns, _)| ns).collect::<Vec<_>>()
})
};
println!("{}", serde_json::to_string_pretty(&json)?);
} else {
eprintln!("\n-> Namespaces in {}\n", storage.lance_path());
if namespaces.is_empty() {
eprintln!("No namespaces found (database may be empty).");
} else {
for (ns, count) in &namespaces {
if stats {
eprintln!(" {} ({} documents)", ns, count);
} else {
eprintln!(" {}", ns);
}
}
eprintln!();
eprintln!("Total: {} namespace(s)", namespaces.len());
}
}
Ok(())
}
/// Per-namespace summary produced by `run_overview`.
#[derive(Debug, Clone, serde::Serialize)]
struct NamespaceStats {
/// Namespace name.
name: String,
/// Number of documents found in the namespace.
total_chunks: usize,
/// Document count per layer name (`outer`, `middle`, `inner`, `core`, `flat`).
layer_counts: std::collections::HashMap<String, usize>,
/// Up to ten most frequent keywords with their occurrence counts.
top_keywords: Vec<(String, usize)>,
/// Whether any document's metadata mentions a timestamp-like key or value.
has_timestamps: bool,
/// Lexicographically smallest date-like metadata string, if any.
earliest_indexed: Option<String>,
/// Lexicographically largest date-like metadata string, if any.
latest_indexed: Option<String>,
}
/// Prints an overview of the database, optionally restricted to one `namespace`.
///
/// Documents are fetched with a single store search using an all-zero 4096-dimensional vector,
/// capped at 100000 documents, then grouped by namespace. For each namespace the overview
/// reports the chunk count, per-layer counts, the most frequent keywords and any date-like
/// metadata range. JSON goes to stdout, the text report to stderr.
///
/// Output is deterministic: keywords with equal counts are ordered alphabetically and the layer
/// summary is sorted by layer name, rather than following `HashMap` iteration order.
///
/// # Errors
///
/// Propagates failures from opening storage, the store search, or serializing the output.
async fn run_overview(namespace: Option<String>, json_output: bool, db_path: String) -> Result<()> {
    use std::collections::HashMap;

    let storage = StorageManager::new_lance_only(&db_path).await?;
    let storage = Arc::new(storage);
    // NOTE(review): the dimension and the document cap are hard-coded here.
    let zero_embedding = vec![0.0_f32; 4096];
    let all_docs = storage
        .search_store(namespace.as_deref(), zero_embedding, 100000)
        .await?;
    if all_docs.is_empty() {
        if json_output {
            println!(
                "{}",
                serde_json::to_string_pretty(&serde_json::json!({
                    "status": "empty",
                    "message": "No documents found",
                    "namespace": namespace,
                    "db_path": db_path
                }))?
            );
        } else {
            eprintln!("\n-> Overview for {}\n", storage.lance_path());
            if let Some(ns) = &namespace {
                eprintln!("No documents found in namespace '{}'", ns);
            } else {
                eprintln!("Database is empty. Use 'rmcp-memex index' to add documents.");
            }
        }
        return Ok(());
    }

    // Group document references by namespace.
    let mut by_namespace: HashMap<String, Vec<_>> = HashMap::new();
    for doc in &all_docs {
        by_namespace
            .entry(doc.namespace.clone())
            .or_default()
            .push(doc);
    }

    let mut stats_list: Vec<NamespaceStats> = Vec::new();
    for (ns_name, docs) in &by_namespace {
        let mut layer_counts: HashMap<String, usize> = HashMap::new();
        for doc in docs {
            let layer_name = match doc.layer {
                1 => "outer",
                2 => "middle",
                3 => "inner",
                4 => "core",
                _ => "flat",
            };
            *layer_counts.entry(layer_name.to_string()).or_insert(0) += 1;
        }

        let mut keyword_counts: HashMap<String, usize> = HashMap::new();
        for doc in docs {
            for kw in &doc.keywords {
                *keyword_counts.entry(kw.clone()).or_insert(0) += 1;
            }
        }
        let mut top_keywords: Vec<(String, usize)> = keyword_counts.into_iter().collect();
        // Most frequent first; ties broken alphabetically so the top-10 cut is reproducible.
        top_keywords.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        top_keywords.truncate(10);

        // Coarse check: looks for timestamp-like words anywhere in the serialized metadata.
        let has_timestamps = docs.iter().any(|d| {
            let meta_str = d.metadata.to_string();
            meta_str.contains("timestamp")
                || meta_str.contains("created_at")
                || meta_str.contains("indexed_at")
                || meta_str.contains("date")
        });

        // Collect string values of top-level metadata keys that look date-related.
        let mut dates: Vec<String> = Vec::new();
        for doc in docs {
            if let Some(obj) = doc.metadata.as_object() {
                for (k, v) in obj {
                    let date_like =
                        k.contains("date") || k.contains("timestamp") || k.contains("time");
                    if !date_like {
                        continue;
                    }
                    if let Some(s) = v.as_str() {
                        dates.push(s.to_string());
                    }
                }
            }
        }
        // Plain string ordering; chronological only if the values sort lexicographically.
        dates.sort();

        stats_list.push(NamespaceStats {
            name: ns_name.clone(),
            total_chunks: docs.len(),
            layer_counts,
            top_keywords,
            has_timestamps,
            earliest_indexed: dates.first().cloned(),
            latest_indexed: dates.last().cloned(),
        });
    }
    stats_list.sort_by(|a, b| a.name.cmp(&b.name));

    if json_output {
        let json = serde_json::json!({
            "db_path": db_path,
            "total_chunks": all_docs.len(),
            "namespace_count": stats_list.len(),
            "namespaces": stats_list
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        eprintln!("\n=== RMCP-MEMEX OVERVIEW ===\n");
        eprintln!("Database: {}", db_path);
        eprintln!("Total chunks: {}", all_docs.len());
        eprintln!("Namespaces: {}\n", stats_list.len());
        for stats in &stats_list {
            eprintln!("--- {} ---", stats.name);
            eprintln!(" Chunks: {}", stats.total_chunks);
            if !stats.layer_counts.is_empty() {
                // Sort by layer name so the line is stable across runs.
                let mut layers: Vec<(&String, &usize)> = stats.layer_counts.iter().collect();
                layers.sort();
                let layer_str: Vec<String> = layers
                    .into_iter()
                    .map(|(k, v)| format!("{}:{}", k, v))
                    .collect();
                eprintln!(" Layers: {}", layer_str.join(", "));
            }
            if !stats.top_keywords.is_empty() {
                let kw_str: Vec<String> = stats
                    .top_keywords
                    .iter()
                    .take(5)
                    .map(|(k, v)| format!("{}({})", k, v))
                    .collect();
                eprintln!(" Top topics: {}", kw_str.join(", "));
            }
            if let (Some(earliest), Some(latest)) = (&stats.earliest_indexed, &stats.latest_indexed)
            {
                if earliest != latest {
                    eprintln!(" Date range: {} -> {}", earliest, latest);
                } else {
                    eprintln!(" Date: {}", earliest);
                }
            }
            if !stats.has_timestamps {
                eprintln!(" [!] No timestamp metadata found");
            }
            eprintln!();
        }
        eprintln!("Tip: Use 'rmcp-memex search -n <namespace> -q <query>' to search");
        eprintln!(" Use 'rmcp-memex dive -n <namespace> -q <query>' for deep exploration");
    }
    Ok(())
}
/// Searches `namespace` for `query` once per slice layer (outer, middle, inner, core) and prints
/// the results grouped by layer.
///
/// `limit` applies per layer. With `verbose` the full text (first 20 lines in text mode) and
/// metadata are included; otherwise only a short preview. JSON goes to stdout, the text report
/// to stderr.
///
/// # Errors
///
/// Propagates failures from creating the embedding client, opening storage, searching, or
/// serializing the output.
async fn run_dive(
    namespace: String,
    query: String,
    limit: usize,
    verbose: bool,
    json_output: bool,
    db_path: String,
    embedding_config: &EmbeddingConfig,
) -> Result<()> {
    let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(embedding_config).await?));
    let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
    let rag = RAGPipeline::new(embedding_client, storage).await?;
    let layers = [
        (Some(SliceLayer::Outer), "OUTER"),
        (Some(SliceLayer::Middle), "MIDDLE"),
        (Some(SliceLayer::Inner), "INNER"),
        (Some(SliceLayer::Core), "CORE"),
    ];
    let mut all_results: Vec<serde_json::Value> = Vec::new();
    if !json_output {
        eprintln!("\n=== DEEP DIVE: \"{}\" in [{}] ===\n", query, namespace);
    }
    for (layer_filter, layer_name) in &layers {
        let results = rag
            .memory_search_with_layer(&namespace, &query, limit, *layer_filter)
            .await?;
        if json_output {
            let layer_results: Vec<serde_json::Value> = results
                .iter()
                .map(|r| {
                    let mut obj = serde_json::json!({
                        "id": r.id,
                        "score": r.score,
                        "keywords": r.keywords,
                        "layer": r.layer.map(|l| l.name()),
                        "can_expand": r.can_expand(),
                        "parent_id": r.parent_id,
                    });
                    if verbose {
                        obj["text"] = serde_json::json!(r.text);
                        obj["metadata"] = r.metadata.clone();
                        obj["children_ids"] = serde_json::json!(r.children_ids);
                    } else {
                        let preview: String = r.text.chars().take(200).collect();
                        obj["preview"] = serde_json::json!(preview);
                    }
                    obj
                })
                .collect();
            all_results.push(serde_json::json!({
                "layer": layer_name,
                "count": results.len(),
                "results": layer_results
            }));
        } else {
            eprintln!("--- {} LAYER ({} results) ---", layer_name, results.len());
            if results.is_empty() {
                eprintln!(" (no results)\n");
                continue;
            }
            for (i, result) in results.iter().enumerate() {
                eprintln!(" {}. [score: {:.3}] {}", i + 1, result.score, result.id);
                if !result.keywords.is_empty() {
                    eprintln!(" Keywords: {}", result.keywords.join(", "));
                }
                if verbose {
                    eprintln!(" ---");
                    for line in result.text.lines().take(20) {
                        eprintln!(" {}", line);
                    }
                    // Count the lines once instead of re-scanning the text twice.
                    let total_lines = result.text.lines().count();
                    if total_lines > 20 {
                        eprintln!(" ... ({} more lines)", total_lines - 20);
                    }
                    eprintln!(" ---");
                    if !result.metadata.is_null() && result.metadata != serde_json::json!({}) {
                        eprintln!(" Metadata: {}", result.metadata);
                    }
                } else {
                    // Take the preview and decide on the ellipsis from the same char iterator.
                    // Comparing `text.len()` (a byte count) against 100 would add "..." to
                    // multi-byte text that was not actually truncated.
                    let mut chars = result.text.chars();
                    let preview: String = chars
                        .by_ref()
                        .take(100)
                        .collect::<String>()
                        .replace('\n', " ");
                    let ellipsis = if chars.next().is_some() { "..." } else { "" };
                    eprintln!(" \"{}{}\"", preview, ellipsis);
                }
                if result.can_expand() {
                    eprintln!(" [expandable: {} children]", result.children_ids.len());
                }
                if result.parent_id.is_some() {
                    eprintln!(" [has parent: can drill up]");
                }
                eprintln!();
            }
        }
    }
    if json_output {
        let output = serde_json::json!({
            "query": query,
            "namespace": namespace,
            "limit_per_layer": limit,
            "verbose": verbose,
            "layers": all_results
        });
        println!("{}", serde_json::to_string_pretty(&output)?);
    } else {
        eprintln!("=== END DIVE ===\n");
        eprintln!(
            "Tip: Use 'rmcp-memex expand -n {} -i <id>' to expand a result",
            namespace
        );
    }
    Ok(())
}
/// Exports all documents of `namespace` as a pretty-printed JSON document.
///
/// Documents are fetched with a single store search using an all-zero 4096-dimensional vector,
/// capped at 100000 documents. The export is written to `output` when given, otherwise printed
/// on stdout. Embeddings are only included when `include_embeddings` is set. An empty namespace
/// produces a message on stderr and no export.
///
/// # Errors
///
/// Propagates failures from opening storage, the store search, serialization, or writing the
/// output file.
async fn run_export(
    namespace: String,
    output: Option<PathBuf>,
    include_embeddings: bool,
    db_path: String,
) -> Result<()> {
    let storage = StorageManager::new_lance_only(&db_path).await?;

    // NOTE(review): the dimension and the document cap are hard-coded here.
    let probe = vec![0.0_f32; 4096];
    let docs = storage
        .search_store(Some(&namespace), probe, 100000)
        .await?;
    if docs.is_empty() {
        eprintln!("No documents found in namespace '{}'", namespace);
        return Ok(());
    }

    let mut export_data: Vec<serde_json::Value> = Vec::with_capacity(docs.len());
    for doc in &docs {
        let mut entry = serde_json::json!({
            "id": doc.id,
            "namespace": doc.namespace,
            "text": doc.document,
            "metadata": doc.metadata
        });
        if include_embeddings {
            entry["embedding"] = serde_json::json!(doc.embedding);
        }
        export_data.push(entry);
    }

    let envelope = serde_json::json!({
        "namespace": namespace,
        "exported_at": chrono::Utc::now().to_rfc3339(),
        "document_count": export_data.len(),
        "include_embeddings": include_embeddings,
        "documents": export_data
    });
    let rendered = serde_json::to_string_pretty(&envelope)?;

    if let Some(path) = output {
        std::fs::write(&path, &rendered)?;
        eprintln!(
            "Exported {} documents from '{}' to {:?}",
            docs.len(),
            namespace,
            path
        );
    } else {
        println!("{}", rendered);
    }
    Ok(())
}
/// Bundled arguments for `run_batch_index`.
struct BatchIndexConfig {
/// File or directory to index; a leading `~` is expanded and the path canonicalized.
path: PathBuf,
/// Target namespace, if any.
namespace: Option<String>,
/// Whether to descend into subdirectories (see `collect_files`).
recursive: bool,
/// Optional glob applied to file names (see `matches_glob`).
glob_pattern: Option<String>,
/// Maximum walk depth when recursive; 0 means unlimited.
max_depth: usize,
/// Database path; a leading `~` is expanded.
db_path: String,
/// Enables preprocessing; when set, indexing uses `SliceMode::Flat` regardless of `slice_mode`.
preprocess: bool,
/// Passed as `PreprocessingConfig::remove_metadata`.
sanitize_metadata: bool,
/// Requested slicing mode.
slice_mode: SliceMode,
/// Use the deduplicating indexing calls.
dedup: bool,
/// Embedding configuration used to build the `EmbeddingClient`.
embedding_config: EmbeddingConfig,
/// Use `IndexProgressTracker` instead of plain per-file log lines.
show_progress: bool,
}
async fn run_batch_index(config: BatchIndexConfig) -> Result<()> {
let BatchIndexConfig {
path,
namespace,
recursive,
glob_pattern,
max_depth,
db_path,
preprocess,
sanitize_metadata,
slice_mode,
dedup,
embedding_config,
show_progress,
} = config;
let expanded = shellexpand::tilde(path.to_str().unwrap_or("")).to_string();
let canonical = Path::new(&expanded).canonicalize()?;
let files = collect_files(&canonical, recursive, glob_pattern.as_deref(), max_depth)?;
let total = files.len();
if total == 0 {
eprintln!("No files found matching criteria");
return Ok(());
}
let mode_name = match slice_mode {
SliceMode::Onion => "onion (hierarchical)",
SliceMode::Flat => "flat (traditional)",
};
let mut tracker = if show_progress {
let t = IndexProgressTracker::pre_scan(&files);
t.display_pre_scan();
Some(t)
} else {
eprintln!("Found {} files to index (slice mode: {})", total, mode_name);
if preprocess {
eprintln!("Preprocessing enabled: filtering tool artifacts, CLI output, and metadata");
}
if dedup {
eprintln!("Deduplication enabled: skipping files with identical content");
}
None
};
let expanded_db = shellexpand::tilde(&db_path).to_string();
let db_dir = Path::new(&expanded_db);
if let Some(parent) = db_dir.parent() {
std::fs::create_dir_all(parent)?;
}
let embedding_client = Arc::new(Mutex::new(EmbeddingClient::new(&embedding_config).await?));
let storage = Arc::new(StorageManager::new_lance_only(&expanded_db).await?);
let rag = RAGPipeline::new(embedding_client, storage).await?;
let effective_mode = if preprocess {
SliceMode::Flat
} else {
slice_mode
};
let ns = namespace.as_deref();
let mut indexed = 0;
let mut skipped = 0;
let mut failed = 0;
let mut total_chunks = 0;
let embedder_model = embedding_config
.providers
.first()
.map(|p| p.model.clone())
.unwrap_or_else(|| "unknown".to_string());
for (i, file_path) in files.iter().enumerate() {
if i == 0
&& let Some(ref mut t) = tracker
{
t.start_calibration();
}
let display_path = file_path
.strip_prefix(&canonical)
.unwrap_or(file_path)
.display();
if let Some(ref mut t) = tracker {
t.set_message(&format!("{}", display_path));
} else {
let progress = format!("[{}/{}]", i + 1, total);
eprint!("{} Indexing {}... ", progress, display_path);
}
let file_bytes = std::fs::metadata(file_path).map(|m| m.len()).unwrap_or(0);
let preprocess_config = PreprocessingConfig {
remove_metadata: sanitize_metadata,
..Default::default()
};
if dedup {
let result = if preprocess {
rag.index_document_with_preprocessing_and_dedup(
file_path,
ns,
preprocess_config.clone(),
)
.await
} else {
rag.index_document_with_dedup(file_path, ns, effective_mode)
.await
};
match result {
Ok(rmcp_memex::IndexResult::Indexed { chunks_indexed, .. }) => {
if let Some(ref mut t) = tracker {
if i == 0 {
t.finish_calibration(chunks_indexed, &embedder_model);
t.adjust_estimate(file_bytes, chunks_indexed);
t.start_progress_bar();
}
t.file_indexed(chunks_indexed);
} else {
eprintln!("done ({} chunks)", chunks_indexed);
}
indexed += 1;
total_chunks += chunks_indexed;
}
Ok(rmcp_memex::IndexResult::Skipped { reason, .. }) => {
if let Some(ref mut t) = tracker {
if i == 0 {
t.finish_calibration(0, &embedder_model);
t.start_progress_bar();
}
t.file_skipped();
} else {
eprintln!("SKIPPED ({})", reason);
}
skipped += 1;
}
Err(e) => {
if let Some(ref mut t) = tracker {
if i == 0 {
t.finish_calibration(0, &embedder_model);
t.start_progress_bar();
}
t.file_failed();
} else {
eprintln!("FAILED: {}", e);
}
failed += 1;
}
}
} else {
let result = if preprocess {
rag.index_document_with_preprocessing(file_path, ns, preprocess_config.clone())
.await
} else {
rag.index_document_with_mode(file_path, ns, effective_mode)
.await
};
match result {
Ok(()) => {
if let Some(ref mut t) = tracker {
let estimated_chunks = (file_bytes as usize) / 500;
if i == 0 {
t.finish_calibration(estimated_chunks.max(1), &embedder_model);
t.adjust_estimate(file_bytes, estimated_chunks.max(1));
t.start_progress_bar();
}
t.file_indexed(estimated_chunks.max(1));
} else {
eprintln!("done");
}
indexed += 1;
}
Err(e) => {
if let Some(ref mut t) = tracker {
if i == 0 {
t.finish_calibration(0, &embedder_model);
t.start_progress_bar();
}
t.file_failed();
} else {
eprintln!("FAILED: {}", e);
}
failed += 1;
}
}
}
}
if let Some(ref mut t) = tracker {
t.finish();
t.display_summary();
} else {
eprintln!();
eprintln!("Indexing complete:");
eprintln!(" New chunks: {}", total_chunks);
eprintln!(" Files indexed: {}", indexed);
if dedup && skipped > 0 {
eprintln!(" Skipped (duplicate): {}", skipped);
}
if failed > 0 {
eprintln!(" Failed: {}", failed);
}
eprintln!(" Total processed: {}", total);
if let Some(ns) = ns {
eprintln!(" Namespace: {}", ns);
}
eprintln!(" Slice mode: {}", mode_name);
eprintln!(
" Deduplication: {}",
if dedup { "enabled" } else { "disabled" }
);
eprintln!(" DB path: {}", expanded_db);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
match cli.command {
Some(Commands::Wizard { dry_run }) => {
let wizard_config = WizardConfig {
config_path: cli.config,
dry_run,
};
run_wizard(wizard_config)
}
Some(Commands::Index {
path,
namespace,
recursive,
glob,
max_depth,
preprocess,
sanitize_metadata,
slice_mode,
dedup,
progress,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let _cache_mb = cli.cache_mb.or(file_cfg.cache_mb).unwrap_or(4096);
let preprocess = preprocess || file_cfg.preprocessing_enabled.unwrap_or(false);
let slice_mode: SliceMode = slice_mode.parse().unwrap_or_default();
run_batch_index(BatchIndexConfig {
path,
namespace,
recursive,
glob_pattern: glob,
max_depth,
db_path,
preprocess,
sanitize_metadata,
slice_mode,
dedup,
embedding_config,
show_progress: progress,
})
.await
}
Some(Commands::Overview { namespace, json }) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_overview(namespace, json, db_path).await
}
Some(Commands::Dive {
namespace,
query,
limit,
verbose,
json,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_dive(
namespace,
query,
limit,
verbose,
json,
db_path,
&embedding_config,
)
.await
}
Some(Commands::Search {
namespace,
query,
limit,
json,
deep,
layer,
mode,
..
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
let layer_filter = if deep {
None } else if let Some(layer_str) = layer {
match layer_str.as_str() {
"outer" => Some(SliceLayer::Outer),
"middle" => Some(SliceLayer::Middle),
"inner" => Some(SliceLayer::Inner),
"core" => Some(SliceLayer::Core),
_ => None,
}
} else {
None };
let search_mode: SearchMode = mode.parse().unwrap_or_default();
run_search(
namespace,
query,
limit,
json,
db_path,
layer_filter,
search_mode,
&embedding_config,
)
.await
}
Some(Commands::Expand {
namespace,
id,
json,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_expand(namespace, id, json, db_path, &embedding_config).await
}
Some(Commands::Get {
namespace,
id,
json,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_get(namespace, id, json, db_path, &embedding_config).await
}
Some(Commands::RagSearch {
query,
limit,
namespace,
json,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_rag_search(query, limit, namespace, json, db_path, &embedding_config).await
}
Some(Commands::Namespaces { stats, json }) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_list_namespaces(stats, json, db_path).await
}
Some(Commands::Export {
namespace,
output,
include_embeddings,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
run_export(namespace, output, include_embeddings, db_path).await
}
Some(Commands::Upsert {
namespace,
id,
text,
metadata,
}) => {
let (file_cfg, config_path) = load_or_discover_config(cli.config.as_deref())?;
if let Some(ref path) = config_path {
eprintln!("Using config: {}", path);
}
let embedding_config = file_cfg.to_embedding_config();
let db_path = cli
.db_path
.or(file_cfg.db_path)
.unwrap_or_else(|| "~/.rmcp-servers/rmcp-memex/lancedb".to_string());
let db_path = shellexpand::tilde(&db_path).to_string();
let content = match text {
Some(t) => t,
None => {
use std::io::Read;
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
buffer
}
};
if content.trim().is_empty() {
return Err(anyhow::anyhow!(
"No text provided (use --text or pipe to stdin)"
));
}
let meta: serde_json::Value = serde_json::from_str(&metadata)
.map_err(|e| anyhow::anyhow!("Invalid metadata JSON: {}", e))?;
let embedding_client =
Arc::new(Mutex::new(EmbeddingClient::new(&embedding_config).await?));
let storage = Arc::new(StorageManager::new_lance_only(&db_path).await?);
let rag = RAGPipeline::new(embedding_client, storage).await?;
rag.memory_upsert(&namespace, id.clone(), content.clone(), meta)
.await?;
eprintln!("✓ Upserted chunk '{}' to namespace '{}'", id, namespace);
eprintln!(" Text: {} chars", content.len());
eprintln!(" DB: {}", db_path);
Ok(())
}
Some(Commands::Serve) | None => {
let config = cli.into_server_config()?;
let subscriber = FmtSubscriber::builder()
.with_max_level(config.log_level)
.with_writer(std::io::stderr)
.with_ansi(false)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
info!("Starting RMCP Memex");
info!("Features (informational): {:?}", config.features);
info!("Cache: {}MB", config.cache_mb);
info!("DB Path: {}", config.db_path);
run_stdio_server(config).await
}
}
}