mod commands;
mod detect;
mod doctor;
pub(crate) use trusty_search::{core, mcp, service};
pub(crate) use doctor::run_doctor_checks;
use anyhow::Result;
use clap::{CommandFactory, Parser, Subcommand, ValueEnum};
use clap_complete::{generate, Shell};
use colored::Colorize;
use detect::{detect_project, DetectionMethod};
use eventsource_stream::Eventsource;
use futures_util::stream::StreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::io;
use std::time::Duration;
// Top-level command-line definition for the `trusty-search` binary (clap derive API).
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn `///`
// doc comments into `--help` text, so adding doc comments would change program output.
#[derive(Parser)]
#[command(
name = "trusty-search",
version,
author,
propagate_version = true,
// A subcommand is mandatory; running with no arguments prints help instead.
subcommand_required = true,
arg_required_else_help = true
)]
struct Cli {
// Explicit index id (`-i` / `--index`), also read from the `TRUSTY_INDEX` environment
// variable. `global = true` makes it accepted after any subcommand.
#[arg(short = 'i', long, global = true, env = "TRUSTY_INDEX")]
index: Option<String>,
// `--json` flag, accepted after any subcommand.
// NOTE(review): presumably switches output to machine-readable JSON (see `run_status`) — confirm per command.
#[arg(long, global = true)]
json: bool,
// `-v` / `--verbose` flag, accepted after any subcommand; its consumers are not visible in this section.
#[arg(short, long, global = true)]
verbose: bool,
// The selected subcommand.
#[command(subcommand)]
command: Commands,
}
// All subcommands of the CLI. `display_order` controls the ordering in `--help`;
// `alias` adds a short alternative name for the subcommand.
//
// NOTE: plain `//` comments are used on purpose — clap would turn `///` doc comments into
// help text and thereby change program output. The handlers for these variants are not
// part of this section, so the notes below describe only the declared arguments.
// NOTE(review): `Index` and `Init` share display_order 4, and `Dashboard` and `Ui` share 23 — confirm the
// intended help ordering.
#[derive(Subcommand)]
enum Commands {
// `search` (alias `s`): a query string plus result-shaping options.
#[command(alias = "s", display_order = 1)]
Search {
query: String,
// `-k` / `--top-k`, default 10.
#[arg(short = 'k', long, default_value = "10")]
top_k: usize,
#[arg(short, long)]
full: bool,
// Optional `--intent`, restricted to the `IntentArg` values.
#[arg(long, value_enum)]
intent: Option<IntentArg>,
#[arg(long)]
no_kg: bool,
// `--offset`, default 0.
#[arg(long, default_value = "0")]
offset: usize,
// `--budget`, default 8000 (unit not stated in this section).
#[arg(long, default_value = "8000")]
budget: u32,
},
// `watch` (alias `w`): optional path.
#[command(alias = "w", display_order = 2)]
Watch {
path: Option<std::path::PathBuf>,
},
// `status` (alias `st`): no arguments.
#[command(alias = "st", display_order = 3)]
Status,
// `index` (alias `idx`): optional path, optional `--name`, and a `--force` flag.
#[command(alias = "idx", display_order = 4)]
Index {
path: Option<std::path::PathBuf>,
#[arg(short, long)]
name: Option<String>,
#[arg(short, long)]
force: bool,
},
// `init` (alias `i`): optional path, optional `--name`, repeatable `--exclude`.
#[command(alias = "i", display_order = 4)]
Init {
path: Option<std::path::PathBuf>,
#[arg(short, long)]
name: Option<String>,
#[arg(long)]
exclude: Vec<String>,
},
// `add`: a required file path.
#[command(display_order = 5)]
Add {
file: std::path::PathBuf,
},
// `remove` (alias `rm`): a required file path.
#[command(alias = "rm", display_order = 6)]
Remove {
file: std::path::PathBuf,
},
// `reindex`: optional path.
#[command(display_order = 7)]
Reindex {
path: Option<std::path::PathBuf>,
},
// `list` (alias `ls`): no arguments.
#[command(alias = "ls", display_order = 10)]
List,
// `query` (alias `q`): a query string evaluated against `--indexes` (default "*").
#[command(alias = "q", display_order = 11)]
Query {
query: String,
#[arg(long, default_value = "*")]
indexes: String,
#[arg(short = 'k', long, default_value = "10")]
top_k: usize,
#[arg(short, long)]
full: bool,
},
// `health`: no arguments.
#[command(display_order = 12)]
Health,
// `start`: `--port` (default 7878) and a `--foreground` flag.
#[command(display_order = 20)]
Start {
#[arg(long, default_value = "7878")]
port: u16,
#[arg(long, default_value_t = false)]
foreground: bool,
},
// `stop`: no arguments.
#[command(display_order = 21)]
Stop,
// `serve`: `--no-http` flag, `--port` (default 0) and an optional `--http` string.
#[command(display_order = 22)]
Serve {
#[arg(long, default_value_t = false)]
no_http: bool,
#[arg(long, default_value_t = 0)]
port: u16,
#[arg(long)]
http: Option<String>,
},
// `service`: takes a nested `ServiceAction` subcommand.
#[command(display_order = 24)]
Service {
#[command(subcommand)]
action: ServiceAction,
},
// `dashboard`: no arguments.
#[command(display_order = 23)]
Dashboard,
// `convert`: positional TARGET (`ConvertTarget`), `--dry-run`, and `--concurrency` (default 4).
#[command(display_order = 25)]
Convert {
#[arg(value_name = "TARGET")]
target: ConvertTarget,
#[arg(long)]
dry_run: bool,
#[arg(long, default_value = "4")]
concurrency: usize,
},
// `ui`: optional `--port`.
#[command(display_order = 23)]
Ui {
#[arg(long)]
port: Option<u16>,
},
// `doctor`: optional `--fix` flag.
#[command(display_order = 28)]
Doctor {
#[arg(long)]
fix: bool,
},
// `completions`: the target shell (a `clap_complete::Shell` value).
#[command(display_order = 30)]
Completions {
#[arg(value_enum)]
shell: Shell,
},
}
// Accepted values for `search --intent`.
// `//` comments are used deliberately: clap would turn `///` doc comments on a `ValueEnum` into help text.
// NOTE(review): how each value maps onto the search core's intent type is not visible in this section.
#[derive(Debug, Clone, ValueEnum)]
enum IntentArg {
Definition,
Usage,
Conceptual,
Bugdebt,
Unknown,
}
// Nested subcommands of `service` (see `Commands::Service`).
// `//` comments are used deliberately: clap would turn `///` doc comments into help text.
// The handlers for these actions are not part of this section.
#[derive(Debug, Clone, Subcommand)]
enum ServiceAction {
Install,
Uninstall,
Status,
Logs,
}
// Accepted values for the positional TARGET of `convert` (see `Commands::Convert`).
// `//` comments are used deliberately: clap would turn `///` doc comments into help text.
// NOTE(review): presumably `Project` uses `find_mvs_config` and `All` uses `find_all_mvs_configs` — confirm in the handler.
#[derive(Debug, Clone, ValueEnum)]
enum ConvertTarget {
Project,
All,
}
/// Determines which index id a command should operate on.
///
/// Returns `(index_id, warned)`. An explicitly supplied id is used verbatim and never
/// warns. Otherwise the project is detected from the current working directory, and
/// `warned` is `true` when detection had to use `DetectionMethod::Fallback`.
fn resolve_index(explicit: &Option<String>) -> (String, bool) {
    match explicit {
        Some(id) => (id.clone(), false),
        None => {
            // An unreadable working directory degrades to an empty path rather than failing.
            let working_dir = std::env::current_dir().unwrap_or_default();
            let project = detect_project(&working_dir);
            let used_fallback = matches!(project.detection_method, DetectionMethod::Fallback);
            (project.index_id, used_fallback)
        }
    }
}
/// Emits a warning on stderr when the index id was derived by fallback detection.
///
/// Does nothing when `warned` is `false`.
fn print_index_header(index_id: &str, warned: bool) {
    if !warned {
        return;
    }
    let marker = "⚠".yellow();
    eprintln!(
        "{} No .git or .trusty-search found — using directory name '{}'. \
         Run `trusty-search init` to register this project.",
        marker, index_id
    );
}
fn daemon_base_url() -> String {
if let Some(addr) = read_http_addr_file() {
return format!("http://{addr}");
}
let port = daemon_port_path()
.and_then(|p| std::fs::read_to_string(p).ok())
.and_then(|s| s.trim().parse::<u16>().ok())
.unwrap_or(7878);
format!("http://127.0.0.1:{port}")
}
/// Reads the daemon's recorded HTTP address from the file at [`http_addr_path`].
///
/// Returns `None` when the path cannot be determined, the file cannot be read, or the
/// file contains only whitespace. Surrounding whitespace is trimmed from the result.
fn read_http_addr_file() -> Option<String> {
    let contents = std::fs::read_to_string(http_addr_path()?).ok()?;
    match contents.trim() {
        "" => None,
        addr => Some(addr.to_string()),
    }
}
/// Location of the `http_addr` file: `<home>/.trusty-search/http_addr`.
///
/// Returns `None` when the home directory cannot be determined.
fn http_addr_path() -> Option<std::path::PathBuf> {
    let home = dirs::home_dir()?;
    Some(home.join(".trusty-search").join("http_addr"))
}
/// Location of the `daemon.port` file: `<local data dir>/trusty-search/daemon.port`.
///
/// Returns `None` when the local data directory cannot be determined.
fn daemon_port_path() -> Option<std::path::PathBuf> {
    let data_root = dirs::data_local_dir()?;
    Some(data_root.join("trusty-search").join("daemon.port"))
}
/// Uploads one file's contents to the daemon's `index-file` endpoint for `index_id`.
///
/// # Errors
/// Fails when the file cannot be read as UTF-8 text, when the HTTP request cannot be
/// sent, or when the daemon answers with a non-success status.
async fn index_single_file(
    client: &reqwest::Client,
    base: &str,
    index_id: &str,
    file: &std::path::Path,
) -> Result<()> {
    let content = match tokio::fs::read_to_string(file).await {
        Ok(text) => text,
        Err(e) => return Err(anyhow::anyhow!("read {}: {e}", file.display())),
    };

    let endpoint = format!("{}/indexes/{}/index-file", base, index_id);
    let payload = serde_json::json!({
        "path": file.display().to_string(),
        "content": content,
    });

    let status = client.post(&endpoint).json(&payload).send().await?.status();
    if status.is_success() {
        Ok(())
    } else {
        anyhow::bail!("daemon returned {} for {}", status, endpoint)
    }
}
/// Indexes a single file, or every source file found under a directory, into `index_id`.
///
/// For a directory, files are uploaded one at a time; a failure on one file is reported
/// on stderr and counted, but does not abort the run or fail the call. For a single
/// file, any failure is returned to the caller.
async fn add_path(index_id: &str, path: &std::path::Path) -> Result<()> {
    let base = daemon_base_url();
    let client = trusty_common::server::daemon_http_client()?;

    if !path.is_dir() {
        index_single_file(&client, &base, index_id, path).await?;
        println!("{} [{}] {}", "→".cyan(), index_id, path.display());
        return Ok(());
    }

    let walk = crate::service::walker::walk_source_files(path);
    println!(
        "{} [{}] indexing {} files under {}",
        "→".cyan(),
        index_id,
        walk.files.len(),
        path.display()
    );

    let (mut succeeded, mut failed) = (0usize, 0usize);
    for source in &walk.files {
        if let Err(e) = index_single_file(&client, &base, index_id, source).await {
            eprintln!(" {} {}: {e}", "⚠".yellow(), source.display());
            failed += 1;
        } else {
            succeeded += 1;
        }
    }
    println!(
        "{} indexed {} files ({} errors)",
        "✓".green(),
        succeeded,
        failed
    );
    Ok(())
}
/// Formats a millisecond duration for display.
///
/// Under one second the raw milliseconds are shown (`"250ms"`), under one minute whole
/// seconds (`"42s"`), and otherwise minutes with zero-padded seconds (`"2m 05s"`).
/// Sub-second remainders are truncated, not rounded.
fn fmt_elapsed(ms: u64) -> String {
    match ms / 1000 {
        0 => format!("{}ms", ms),
        whole_secs if whole_secs < 60 => format!("{}s", whole_secs),
        whole_secs => format!("{}m {:02}s", whole_secs / 60, whole_secs % 60),
    }
}
/// Formats a duration given in whole seconds: `"42s"` below one minute, otherwise
/// minutes with zero-padded seconds (`"2m 05s"`). Minutes are not folded into hours.
fn fmt_secs(secs: u64) -> String {
    let (minutes, remainder) = (secs / 60, secs % 60);
    if minutes == 0 {
        format!("{}s", secs)
    } else {
        format!("{}m {:02}s", minutes, remainder)
    }
}
/// Terminal progress display for a reindex run: three progress bars registered on one
/// `MultiProgress` (see `ReindexUi::new`).
struct ReindexUi {
/// Container the three bars were added to. Never read after construction, hence the
/// `dead_code` allowance.
/// NOTE(review): presumably retained so the container lives as long as the bars — confirm.
#[allow(dead_code)]
multi: MultiProgress,
/// Spinner line showing "Indexing <index>"; receives the final message on finish/abandon.
header: ProgressBar,
/// File-count bar (position / length, percentage, ETA).
files: ProgressBar,
/// Free-text statistics line (chunks, skipped, speed, elapsed).
stats: ProgressBar,
}
impl ReindexUi {
    /// Builds the three-line progress display for `index_id`.
    ///
    /// Bars are added in display order: header spinner, file bar, stats line. A style
    /// template that fails to parse is skipped, leaving that bar with its default style.
    fn new(index_id: &str) -> Self {
        let multi = MultiProgress::new();

        let header = multi.add(ProgressBar::new(1));
        if let Ok(style) = ProgressStyle::with_template("{spinner:.cyan} {msg}") {
            header.set_style(style);
        }
        header.set_message(format!("Indexing {}", index_id.bold()));
        header.enable_steady_tick(Duration::from_millis(120));

        let files = multi.add(ProgressBar::new(1));
        let files_template = " [{bar:40.cyan/blue}] {pos}/{len} files ({percent}%) — ETA {eta}";
        if let Ok(style) = ProgressStyle::with_template(files_template) {
            files.set_style(style.progress_chars("█░ "));
        }

        let stats = multi.add(ProgressBar::new(1));
        if let Ok(style) = ProgressStyle::with_template(" {msg}") {
            stats.set_style(style);
        }
        stats.set_message("Waiting for daemon…".to_string());

        Self {
            multi,
            header,
            files,
            stats,
        }
    }

    /// Sets the file bar's length, clamped to at least 1.
    fn set_total(&self, total: u64) {
        let clamped = total.max(1);
        self.files.set_length(clamped);
    }

    /// Moves the file bar to `indexed` files processed.
    fn set_position(&self, indexed: u64) {
        self.files.set_position(indexed);
    }

    /// Refreshes the stats line. Speed is integer files-per-second and reads 0 while
    /// less than one second has elapsed.
    fn update_stats(&self, indexed: u64, total_chunks: u64, skipped: u64, elapsed_secs: u64) {
        let files_per_sec = if elapsed_secs == 0 {
            0
        } else {
            indexed / elapsed_secs
        };
        let line = format!(
            "Chunks: {chunks} Skipped: {skipped} Speed: {fps} files/s Elapsed: {elapsed}",
            chunks = format_with_commas(total_chunks),
            skipped = format_with_commas(skipped),
            fps = files_per_sec,
            elapsed = fmt_secs(elapsed_secs),
        );
        self.stats.set_message(line);
    }

    /// Clears the file and stats bars and leaves `final_msg` on the header line.
    fn finish(self, final_msg: String) {
        self.files.finish_and_clear();
        self.stats.finish_and_clear();
        self.header.finish_with_message(final_msg);
    }

    /// Abandons all bars in their current state and puts `final_msg` on the header line.
    fn abandon(self, final_msg: String) {
        self.files.abandon();
        self.stats.abandon();
        self.header.abandon_with_message(final_msg);
    }
}
/// Options for `run_reindex_with`. The `Default` value (all false / `None`) is what
/// `run_reindex` uses.
#[derive(Debug, Clone, Copy, Default)]
struct ReindexOptions {
/// When true, `verify_reindex_health` is run after the reindex completes.
verify_after: bool,
/// Chunk count captured before the reindex; passed to `verify_reindex_health`, which
/// prints it as "(was N)".
prior_chunk_count: Option<u64>,
/// Sent to the daemon as the `force` field of the reindex kickoff request.
force: bool,
}
/// Result of a reindex run. `run_reindex_with` fills this from the daemon's `complete`
/// stream event; numeric fields missing from that event default to 0 (except `skipped`,
/// which falls back to the locally counted skip events).
#[derive(Debug, Default, Clone, Copy)]
struct ReindexOutcome {
/// `indexed` value reported by the `complete` event.
indexed: u64,
/// `total_chunks` value reported by the `complete` event.
total_chunks: u64,
/// `skipped` value reported by the `complete` event.
skipped: u64,
/// `errors` value reported by the `complete` event.
errors: u64,
/// `elapsed_ms` value reported by the `complete` event.
elapsed_ms: u64,
/// True only once a `complete` event has been received.
completed: bool,
}
/// Runs a reindex of `index_id` rooted at `root_path` with default options (no force,
/// no post-run verification) and discards the outcome details.
async fn run_reindex(index_id: &str, root_path: &std::path::Path) -> Result<()> {
    let _outcome = run_reindex_with(index_id, root_path, ReindexOptions::default()).await?;
    Ok(())
}
/// Runs a forced reindex of `index_id` and verifies the index afterwards.
///
/// The chunk count is fetched before the reindex starts so the verification step can
/// report the previous size; a failed fetch simply leaves it unknown.
async fn run_reindex_force(index_id: &str, root_path: &std::path::Path) -> Result<()> {
    let opts = ReindexOptions {
        verify_after: true,
        prior_chunk_count: fetch_chunk_count(index_id).await,
        force: true,
    };
    run_reindex_with(index_id, root_path, opts).await?;
    Ok(())
}
/// Kicks off a reindex on the daemon, follows its server-sent-event progress stream while
/// rendering a terminal progress UI, and returns the final counters.
///
/// Steps: POST `/indexes/{id}/reindex`, open the SSE stream, process `start` / `batch` /
/// `skip` / `complete` / `error` events until `complete` (or the stream ends), print a
/// summary line, and optionally run `verify_reindex_health` when `opts.verify_after` is set.
///
/// # Errors
/// Returns an error when the daemon cannot be reached, the kickoff returns a non-success
/// status other than 404, the SSE client or connection fails, or the stream ends without a
/// `complete` event.
///
/// NOTE: a 404 on kickoff and a non-success status on the stream both terminate the whole
/// process via `std::process::exit(1)` instead of returning an error.
async fn run_reindex_with(
index_id: &str,
root_path: &std::path::Path,
opts: ReindexOptions,
) -> Result<ReindexOutcome> {
let base = daemon_base_url();
let client = trusty_common::server::daemon_http_client()?;
// --- Kickoff request -------------------------------------------------------------
let kickoff_url = format!("{}/indexes/{}/reindex", base, index_id);
let kickoff_body = serde_json::json!({
"root_path": root_path,
"force": opts.force,
});
let kickoff = client
.post(&kickoff_url)
.json(&kickoff_body)
.send()
.await
.map_err(|e| anyhow::anyhow!("could not reach daemon at {base}: {e}"))?;
// 404 is reported as "index not registered" and ends the process.
if kickoff.status() == reqwest::StatusCode::NOT_FOUND {
eprintln!(
"{} index '{}' is not registered on the daemon — run `trusty-search index` first",
"✗".red(),
index_id
);
std::process::exit(1);
}
if !kickoff.status().is_success() {
anyhow::bail!("daemon returned {} for reindex kickoff", kickoff.status());
}
// An unparseable kickoff body is treated as an empty object, so the default stream
// path below is used.
let kickoff_body: serde_json::Value = kickoff
.json()
.await
.unwrap_or_else(|_| serde_json::json!({}));
// Prefer the stream path advertised by the daemon; otherwise use the conventional one.
let stream_path = kickoff_body
.get("stream_url")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| format!("/indexes/{}/reindex/stream", index_id));
let stream_url = format!("{}{}", base, stream_path);
// --- Open the SSE stream ---------------------------------------------------------
// A dedicated client is built for the stream: 5 s connect timeout, and the overall
// request timeout set to `Duration::MAX` so a long-running stream is not cut off.
let sse_client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::MAX)
.build()
.map_err(|e| anyhow::anyhow!("could not build SSE client: {e}"))?;
let resp = sse_client
.get(&stream_url)
.send()
.await
.map_err(|e| anyhow::anyhow!("could not connect to SSE stream {stream_url}: {e}"))?;
if !resp.status().is_success() {
eprintln!(
"{} reindex stream returned {} — daemon may be an older version that doesn't support /reindex/stream",
"✗".red(),
resp.status()
);
std::process::exit(1);
}
// --- Progress UI and shared counters ---------------------------------------------
let ui = ReindexUi::new(index_id);
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc as StdArc;
let started = std::time::Instant::now();
// Counters written by the event loop below and read by the ticker task.
let indexed_now = StdArc::new(AtomicU64::new(0));
let chunks_now = StdArc::new(AtomicU64::new(0));
let skipped_now = StdArc::new(AtomicU64::new(0));
// Set to true after the event loop ends to make the ticker task exit.
let tick_done = StdArc::new(AtomicBool::new(false));
// Background task that rewrites the stats line once per second (including an ETA
// estimate), so the display keeps moving between stream events.
let ticker = {
let indexed_now = indexed_now.clone();
let chunks_now = chunks_now.clone();
let skipped_now = skipped_now.clone();
let tick_done = tick_done.clone();
let stats_bar = ui.stats.clone();
let files_bar = ui.files.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
// A tokio interval's first tick completes immediately; consume it before looping.
interval.tick().await; loop {
interval.tick().await;
if tick_done.load(Ordering::Acquire) {
break;
}
let elapsed = started.elapsed().as_secs();
let indexed = indexed_now.load(Ordering::Acquire);
let chunks = chunks_now.load(Ordering::Acquire);
let skipped = skipped_now.load(Ordering::Acquire);
// Integer files/second; `checked_div` yields 0 while elapsed is still 0.
let fps = indexed.checked_div(elapsed).unwrap_or(0);
let total = files_bar.length().unwrap_or(0);
// ETA = remaining files / current speed; "?" when speed is 0 or nothing remains.
let eta = if fps > 0 && total > indexed {
fmt_secs((total - indexed) / fps)
} else {
"?".to_string()
};
stats_bar.set_message(format!(
"Chunks: {chunks} Skipped: {skipped} Speed: {fps} files/s Elapsed: {elapsed}s ETA: ~{eta}",
chunks = format_with_commas(chunks),
skipped = format_with_commas(skipped),
fps = fps,
elapsed = elapsed,
eta = eta,
));
}
})
};
// --- Event loop --------------------------------------------------------------------
let mut outcome = ReindexOutcome::default();
let mut done = false;
let byte_stream = resp.bytes_stream();
let stream = byte_stream.eventsource();
tokio::pin!(stream);
while !done {
// A read error or end-of-stream leaves the loop with `outcome.completed` still false.
let event = match stream.next().await {
Some(Ok(e)) => e,
Some(Err(e)) => {
ui.stats
.println(format!("{} stream read error: {e}", "⚠".yellow()));
break;
}
None => break,
};
// Events whose data is not valid JSON are ignored.
let evt: serde_json::Value = match serde_json::from_str(event.data.trim()) {
Ok(v) => v,
Err(_) => continue,
};
// The event kind is carried in the JSON payload's "event" field.
match evt.get("event").and_then(|v| v.as_str()) {
Some("start") => {
let total = evt.get("total_files").and_then(|v| v.as_u64()).unwrap_or(0);
ui.set_total(total);
}
Some("batch") => {
let indexed = evt.get("indexed").and_then(|v| v.as_u64()).unwrap_or(0);
let batch_chunks = evt
.get("batch_chunks")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let total = evt.get("total_files").and_then(|v| v.as_u64()).unwrap_or(0);
// Adopt the total from a batch event if it differs from the bar's current length.
if total > 0 && ui.files.length() != Some(total.max(1)) {
ui.set_total(total);
}
indexed_now.store(indexed, Ordering::Release);
// `fetch_add` returns the previous value, so add the batch again for the new total.
let new_chunks =
chunks_now.fetch_add(batch_chunks, Ordering::AcqRel) + batch_chunks;
ui.set_position(indexed);
ui.update_stats(
indexed,
new_chunks,
skipped_now.load(Ordering::Acquire),
started.elapsed().as_secs(),
);
}
Some("skip") => {
let indexed = evt.get("indexed").and_then(|v| v.as_u64()).unwrap_or(0);
indexed_now.store(indexed, Ordering::Release);
// Each skip event counts as one skipped file.
let skipped = skipped_now.fetch_add(1, Ordering::AcqRel) + 1;
ui.set_position(indexed);
ui.update_stats(
indexed,
chunks_now.load(Ordering::Acquire),
skipped,
started.elapsed().as_secs(),
);
}
Some("complete") => {
outcome.indexed = evt.get("indexed").and_then(|v| v.as_u64()).unwrap_or(0);
outcome.total_chunks = evt
.get("total_chunks")
.and_then(|v| v.as_u64())
.unwrap_or(0);
// Fall back to the locally counted skips when the event omits the field.
outcome.skipped = evt
.get("skipped")
.and_then(|v| v.as_u64())
.unwrap_or_else(|| skipped_now.load(Ordering::Acquire));
outcome.errors = evt.get("errors").and_then(|v| v.as_u64()).unwrap_or(0);
outcome.elapsed_ms = evt.get("elapsed_ms").and_then(|v| v.as_u64()).unwrap_or(0);
outcome.completed = true;
ui.set_position(outcome.indexed);
done = true;
}
// Per-file errors are printed above the bars; the loop keeps going.
Some("error") => {
let msg = evt
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let file = evt.get("file").and_then(|v| v.as_str()).unwrap_or("");
ui.stats
.println(format!("{} {}: {}", "⚠".yellow(), file, msg));
}
// Unknown or missing event kinds are ignored.
_ => {}
}
}
// --- Shutdown and summary ----------------------------------------------------------
// Stop the ticker and wait for it so it cannot overwrite the final output.
tick_done.store(true, Ordering::Release);
let _ = ticker.await;
if !outcome.completed {
ui.abandon(format!(
"{} Reindex stream ended without completion event",
"⚠".yellow()
));
anyhow::bail!("reindex did not complete");
}
let elapsed = fmt_elapsed(outcome.elapsed_ms);
// Files actually (re)indexed = reported indexed count minus skipped (unchanged) files.
let changed = outcome.indexed.saturating_sub(outcome.skipped);
// Three summary shapes: with errors, nothing changed, or the normal "N changed files".
let final_msg = if outcome.errors > 0 {
format!(
"{} Indexed {} files → {} chunks [took {}, {} errors, {} unchanged]",
"✓".green(),
format_with_commas(changed),
format_with_commas(outcome.total_chunks),
elapsed,
outcome.errors,
format_with_commas(outcome.skipped),
)
} else if changed == 0 && outcome.indexed > 0 {
format!(
"{} '{}' is up to date ({} chunks, {} files — no changes detected) [took {}]",
"✓".green(),
index_id,
format_with_commas(outcome.total_chunks),
format_with_commas(outcome.indexed),
elapsed,
)
} else {
format!(
"{} Indexed {} changed file{} → {} chunks [took {}, {} unchanged]",
"✓".green(),
format_with_commas(changed),
if changed == 1 { "" } else { "s" },
format_with_commas(outcome.total_chunks),
elapsed,
format_with_commas(outcome.skipped),
)
};
ui.finish(final_msg);
// Optional post-run health check (used by the forced reindex path).
if opts.verify_after {
verify_reindex_health(&client, &base, index_id, &outcome, opts.prior_chunk_count).await?;
}
Ok(outcome)
}
/// Sanity-checks an index after a reindex and reports the result.
///
/// The index counts as healthy when all of these hold:
/// * the daemon's status endpoint reports a non-zero `chunk_count`,
/// * at least one of a few generic probe queries returns a result, and
/// * the reindex itself reported zero errors.
///
/// On success a confirmation line is printed and `Ok(())` is returned. On failure a
/// diagnostic is printed to stderr and the process exits with status 1. The diagnostic
/// names the error count when indexing errors occurred; without that, a run whose only
/// problem was indexing errors printed a message in which every listed check looked fine.
///
/// `prior` is the chunk count before the reindex, shown as "(was N)" when known.
///
/// Request and parse failures are deliberately tolerated: a failed status lookup counts
/// as 0 chunks and a failed probe counts as "no hit".
async fn verify_reindex_health(
    client: &reqwest::Client,
    base: &str,
    index_id: &str,
    outcome: &ReindexOutcome,
    prior: Option<u64>,
) -> Result<()> {
    let status_url = format!("{}/indexes/{}/status", base, index_id);
    let new_chunks = match client.get(&status_url).send().await {
        Ok(r) if r.status().is_success() => r
            .json::<serde_json::Value>()
            .await
            .ok()
            .and_then(|v| v.get("chunk_count").and_then(|n| n.as_u64()))
            .unwrap_or(0),
        _ => 0,
    };

    // Probe with very common tokens; a single hit is enough to show search works.
    let search_url = format!("{}/indexes/{}/search", base, index_id);
    let probes = ["fn", "function", "def", "class", "the"];
    let mut got_hit = false;
    for probe in probes {
        let body = serde_json::json!({ "text": probe, "top_k": 1 });
        let Ok(resp) = client.post(&search_url).json(&body).send().await else {
            continue;
        };
        if !resp.status().is_success() {
            continue;
        }
        let Ok(json) = resp.json::<serde_json::Value>().await else {
            continue;
        };
        let hits = json
            .get("results")
            .and_then(|r| r.as_array())
            .map_or(0, |a| a.len());
        if hits > 0 {
            got_hit = true;
            break;
        }
    }

    let healthy = new_chunks > 0 && got_hit && outcome.errors == 0;
    let was = prior
        .map(|p| format!(" (was {})", format_with_commas(p)))
        .unwrap_or_default();

    if healthy {
        println!(
            "{} Reindex complete: {} chunks{}",
            "✓".green(),
            format_with_commas(new_chunks),
            was
        );
        return Ok(());
    }

    // Empty when there were no indexing errors, so the message is unchanged in that case.
    let error_note = if outcome.errors > 0 {
        format!(", {} indexing errors", format_with_commas(outcome.errors))
    } else {
        String::new()
    };
    eprintln!(
        "{} Reindex produced unhealthy index: {} chunks{}, sanity query {}{} — old index NOT preserved (daemon reindex is in-place; see crates/trusty-search-service/src/reindex.rs)",
        "✗".red(),
        format_with_commas(new_chunks),
        was,
        if got_hit { "ok" } else { "returned 0 results" },
        error_note
    );
    std::process::exit(1);
}
/// Registers `index_name` (rooted at `project_path`) with the daemon via `POST /indexes`.
///
/// Returns `(created, daemon_reachable)`:
/// * `(_, false)` — the request could not be sent at all (treated as "daemon not reachable").
/// * `(created, true)` — the daemon accepted the request; `created` mirrors the response's
///   `created` field and is `false` when the field is missing or the body is unreadable.
///
/// # Errors
/// Fails when the HTTP client cannot be built or the daemon answers with a non-success status.
async fn register_index_with_daemon(
    index_name: &str,
    project_path: &std::path::Path,
) -> Result<(bool, bool)> {
    let base = daemon_base_url();
    let client = trusty_common::server::daemon_http_client()?;
    let create_url = format!("{}/indexes", base);
    let create_body = serde_json::json!({
        "id": index_name,
        "root_path": project_path,
    });

    let response = match client.post(&create_url).json(&create_body).send().await {
        Ok(r) => r,
        // Transport failure: report "not reachable" rather than an error.
        Err(_) => return Ok((false, false)),
    };
    if !response.status().is_success() {
        anyhow::bail!("daemon returned {} for POST /indexes", response.status());
    }

    let body: serde_json::Value = response
        .json()
        .await
        .unwrap_or_else(|_| serde_json::json!({}));
    let created = body
        .get("created")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    Ok((created, true))
}
/// Asks the daemon for the current `chunk_count` of `index_id`.
///
/// Returns `None` on any failure: client construction, transport error, non-success
/// status, unparseable body, or a missing / non-integer `chunk_count` field.
async fn fetch_chunk_count(index_id: &str) -> Option<u64> {
    let status_url = format!("{}/indexes/{}/status", daemon_base_url(), index_id);
    let client = trusty_common::server::daemon_http_client().ok()?;
    let resp = client.get(&status_url).send().await.ok()?;
    if resp.status().is_success() {
        let body = resp.json::<serde_json::Value>().await.ok()?;
        let count = body.get("chunk_count").and_then(|v| v.as_u64());
        count
    } else {
        None
    }
}
/// The subset of a `.mcp-vector-search/config.json` file that this tool reads
/// (see `parse_mvs_config`).
#[derive(Debug, serde::Deserialize)]
struct MvsConfig {
/// Value of the config's `project_root` key; its final path component is used to
/// derive the index name.
project_root: std::path::PathBuf,
}
/// Searches `start` and each of its ancestor directories, nearest first, for a
/// `.mcp-vector-search/config.json` file and returns the first one that exists.
///
/// Returns `None` when no ancestor contains such a file.
fn find_mvs_config(start: &std::path::Path) -> Option<std::path::PathBuf> {
    start
        .ancestors()
        .map(|dir| dir.join(".mcp-vector-search").join("config.json"))
        .find(|candidate| candidate.exists())
}
/// Scans the user's home directory (up to 6 levels deep, without following symlinks)
/// for every `.mcp-vector-search/config.json` file.
///
/// Directories whose name appears in the skip list below (dependency caches, VCS
/// metadata, build output, toolchain directories) are not descended into. Entries that
/// cannot be read are silently skipped. Returns an empty list when the home directory
/// cannot be determined.
fn find_all_mvs_configs() -> Vec<std::path::PathBuf> {
    const SKIPPED_DIRS: [&str; 14] = [
        "node_modules",
        ".git",
        "target",
        "Library",
        ".cache",
        ".cargo",
        ".rustup",
        ".npm",
        ".pnpm",
        ".pyenv",
        ".nvm",
        "venv",
        ".venv",
        "__pycache__",
    ];

    let Some(home) = dirs::home_dir() else {
        return Vec::new();
    };

    walkdir::WalkDir::new(&home)
        .max_depth(6)
        .follow_links(false)
        .into_iter()
        .filter_entry(|e| {
            let name = e.file_name().to_string_lossy();
            !SKIPPED_DIRS.iter().any(|skipped| name == *skipped)
        })
        .filter_map(|e| e.ok())
        .filter(|entry| {
            // Keep only `config.json` files whose direct parent is `.mcp-vector-search`.
            let parent_matches = entry
                .path()
                .parent()
                .and_then(|p| p.file_name())
                .map(|n| n == ".mcp-vector-search")
                .unwrap_or(false);
            entry.file_name() == "config.json" && parent_matches
        })
        .map(|entry| entry.path().to_path_buf())
        .collect()
}
/// Loads a `.mcp-vector-search/config.json` file and derives an index name from it.
///
/// Returns `(project_root, index_name)`. The name is the final component of
/// `project_root`, lower-cased with spaces replaced by `-`; `"project"` is used when the
/// path has no final component.
///
/// # Errors
/// Fails when the file cannot be read or is not valid JSON containing `project_root`.
fn parse_mvs_config(config_path: &std::path::Path) -> Result<(std::path::PathBuf, String)> {
    let raw = std::fs::read_to_string(config_path)
        .map_err(|e| anyhow::anyhow!("read {}: {e}", config_path.display()))?;
    let parsed = serde_json::from_str::<MvsConfig>(&raw)
        .map_err(|e| anyhow::anyhow!("parse {}: {e}", config_path.display()))?;

    let name = match parsed.project_root.file_name() {
        Some(leaf) => leaf.to_string_lossy().to_lowercase().replace(' ', "-"),
        None => "project".to_string(),
    };
    Ok((parsed.project_root, name))
}
/// Outcome of converting one project (see `convert_one`).
#[derive(Debug)]
enum ConvertStatus {
/// The index was newly created and a reindex request was accepted.
Queued,
/// The daemon reported the index as not newly created; a reindex request was accepted anyway.
AlreadyRegistered,
/// Dry-run mode: nothing was sent to the daemon.
DryRun,
/// A step failed; the string describes which one and why.
Failed(String),
}
/// Per-project result record produced by `convert_one` and rendered by `print_convert_line`.
#[derive(Debug)]
struct ConvertResult {
/// Index name the project was (or would be) registered under.
name: String,
/// Project root directory.
path: std::path::PathBuf,
/// What happened for this project.
status: ConvertStatus,
}
async fn convert_one(
project_root: std::path::PathBuf,
index_name: String,
base_url: &str,
dry_run: bool,
) -> ConvertResult {
if dry_run {
return ConvertResult {
name: index_name,
path: project_root,
status: ConvertStatus::DryRun,
};
}
let client = match trusty_common::server::daemon_http_client() {
Ok(c) => c,
Err(e) => {
return ConvertResult {
name: index_name,
path: project_root,
status: ConvertStatus::Failed(format!("failed to build HTTP client: {e}")),
};
}
};
let create_url = format!("{base_url}/indexes");
let create_resp = client
.post(&create_url)
.json(&serde_json::json!({
"id": index_name,
"root_path": project_root,
}))
.send()
.await;
let already_existed = match create_resp {
Ok(resp) if resp.status().is_success() => {
let body: serde_json::Value =
resp.json().await.unwrap_or_else(|_| serde_json::json!({}));
!body
.get("created")
.and_then(|v| v.as_bool())
.unwrap_or(true)
}
Ok(resp) => {
return ConvertResult {
name: index_name,
path: project_root,
status: ConvertStatus::Failed(format!("create returned {}", resp.status())),
};
}
Err(e) => {
return ConvertResult {
name: index_name,
path: project_root,
status: ConvertStatus::Failed(format!("create error: {e}")),
};
}
};
let reindex_url = format!("{base_url}/indexes/{index_name}/reindex");
let reindex_resp = client
.post(&reindex_url)
.json(&serde_json::json!({ "root_path": project_root }))
.send()
.await;
match reindex_resp {
Ok(resp) if resp.status().is_success() => ConvertResult {
name: index_name,
path: project_root,
status: if already_existed {
ConvertStatus::AlreadyRegistered
} else {
ConvertStatus::Queued
},
},
Ok(resp) => ConvertResult {
name: index_name,
path: project_root,
status: ConvertStatus::Failed(format!("reindex returned {}", resp.status())),
},
Err(e) => ConvertResult {
name: index_name,
path: project_root,
status: ConvertStatus::Failed(format!("reindex error: {e}")),
},
}
}
/// Prints one line describing a conversion result, prefixed with `[idx/total]`.
///
/// The marker and trailing note depend on the status: `✓` queued, `↻` already
/// registered, no marker for dry runs, `✗` plus the failure reason for failures.
fn print_convert_line(idx: usize, total: usize, r: &ConvertResult) {
    let counter = format!("[{}/{}]", idx, total).dimmed();
    let location = r.path.display().to_string().dimmed();

    let line = match &r.status {
        ConvertStatus::Queued => format!(
            " {} {} {:<24} → {}",
            counter,
            "✓".green(),
            r.name,
            location
        ),
        ConvertStatus::AlreadyRegistered => format!(
            " {} {} {:<24} → {} {}",
            counter,
            "↻".cyan(),
            r.name,
            location,
            "(already registered, reindexing)".dimmed()
        ),
        ConvertStatus::DryRun => format!(" {} {:<24} {}", counter, r.name, location),
        ConvertStatus::Failed(msg) => format!(
            " {} {} {:<24} → {} {}",
            counter,
            "✗".red(),
            r.name,
            location,
            format!("({})", msg).red()
        ),
    };
    println!("{}", line);
}
/// Renders `n` in decimal with a comma between each group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
fn format_with_commas(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);
    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before every digit that starts a full group of three,
        // counted from the right — but never before the first digit.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
async fn run_status(json: bool) -> Result<()> {
let base = daemon_base_url();
let client = trusty_common::server::daemon_http_client()?;
let health = client.get(format!("{}/health", base)).send().await;
let health_body: serde_json::Value = match health {
Ok(r) if r.status().is_success() => {
r.json().await.unwrap_or_else(|_| serde_json::json!({}))
}
_ => {
if json {
println!(r#"{{"daemon":"not_running"}}"#);
} else {
eprintln!(
"{} Daemon not running (start with `trusty-search start`)",
"✗".red()
);
}
std::process::exit(1);
}
};
let list = client.get(format!("{}/indexes", base)).send().await;
let list_body: serde_json::Value = match list {
Ok(r) if r.status().is_success() => {
r.json().await.unwrap_or_else(|_| serde_json::json!({}))
}
_ => serde_json::json!({"indexes": []}),
};
let empty: Vec<serde_json::Value> = Vec::new();
let names: Vec<String> = list_body
.get("indexes")
.and_then(|v| v.as_array())
.unwrap_or(&empty)
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
let mut joinset = tokio::task::JoinSet::new();
for name in &names {
let n = name.clone();
let url = format!("{}/indexes/{}/status", base, n);
let c = client.clone();
joinset.spawn(async move {
let body: serde_json::Value = match c.get(&url).send().await {
Ok(r) if r.status().is_success() => {
r.json().await.unwrap_or_else(|_| serde_json::json!({}))
}
_ => serde_json::json!({}),
};
(n, body)
});
}
let mut per_index: Vec<(String, serde_json::Value)> = Vec::new();
while let Some(j) = joinset.join_next().await {
if let Ok(pair) = j {
per_index.push(pair);
}
}
per_index.sort_by(|a, b| a.0.cmp(&b.0));
if json {
let arr: Vec<serde_json::Value> = per_index
.iter()
.map(|(n, b)| serde_json::json!({"id": n, "status": b}))
.collect();
println!(
"{}",
serde_json::json!({
"daemon": "running",
"url": base,
"version": health_body.get("version").cloned().unwrap_or(serde_json::json!(null)),
"indexes": arr,
})
);
} else {
let version = health_body
.get("version")
.and_then(|v| v.as_str())
.unwrap_or("?");
println!(
"{} Daemon running {} v{}",
"✓".green(),
base.cyan(),
version
);
if per_index.is_empty() {
println!("{}", "Indexes:".bold());
println!(" {}", "(none)".dimmed());
} else {
println!("{}", "Indexes:".bold());
for (name, body) in &per_index {
let chunks = body
.get("chunk_count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let root = body.get("root_path").and_then(|v| v.as_str()).unwrap_or("");
let chunks_fmt = format_with_commas(chunks);
if root.is_empty() {
println!(" {:<16} {:>12} chunks", name.bold(), chunks_fmt,);
} else {
println!(
" {:<16} {:>12} chunks {}",
name.bold(),
chunks_fmt,
root.dimmed()
);
}
}
}
}
Ok(())
}
/// Resolves the directory used as the fastembed model cache.
///
/// Resolution order:
/// 1. `FASTEMBED_CACHE_DIR` environment variable (used as-is, even if it does not exist);
/// 2. `<local data dir>/trusty-search/.fastembed_cache`, if it exists;
/// 3. `.fastembed_cache` next to the running executable, if it exists;
/// 4. the relative path `.fastembed_cache`.
fn fastembed_cache_dir() -> std::path::PathBuf {
    if let Ok(overridden) = std::env::var("FASTEMBED_CACHE_DIR") {
        return std::path::PathBuf::from(overridden);
    }

    dirs::data_local_dir()
        .map(|d| d.join("trusty-search").join(".fastembed_cache"))
        .filter(|candidate| candidate.exists())
        .or_else(|| {
            // Only consulted when the data-dir candidate is absent.
            std::env::current_exe()
                .ok()
                .map(|exe| {
                    exe.parent()
                        .unwrap_or(std::path::Path::new("."))
                        .join(".fastembed_cache")
                })
                .filter(|candidate| candidate.exists())
        })
        .unwrap_or_else(|| std::path::PathBuf::from(".fastembed_cache"))
}
/// Recursively sums the sizes, in bytes, of all regular files under `path`.
///
/// Best-effort: a directory that cannot be listed contributes 0, unreadable entries are
/// skipped, and a file whose metadata cannot be read counts as 0 bytes.
fn dir_size_bytes(path: &std::path::Path) -> u64 {
    let Ok(listing) = std::fs::read_dir(path) else {
        return 0;
    };
    listing
        .flatten()
        .map(|entry| {
            let child = entry.path();
            if child.is_file() {
                std::fs::metadata(&child).map(|m| m.len()).unwrap_or(0)
            } else if child.is_dir() {
                dir_size_bytes(&child)
            } else {
                0
            }
        })
        .sum()
}
/// Formats a byte count using decimal units, rounded to a whole number:
/// `"NMB"` from one million bytes up, `"NKB"` from one thousand, otherwise `"NB"`.
///
/// There is no unit above MB, and because of rounding a value just under a million
/// bytes renders as `"1000KB"`.
fn fmt_bytes(bytes: u64) -> String {
    const KILO: u64 = 1_000;
    const MEGA: u64 = 1_000_000;
    match bytes {
        b if b >= MEGA => format!("{:.0}MB", b as f64 / 1_000_000.0),
        b if b >= KILO => format!("{:.0}KB", b as f64 / 1_000.0),
        b => format!("{}B", b),
    }
}
/// Returns `true` when a TCP connection to `host:port` succeeds within 500 ms.
///
/// Both a connection error and a timeout count as "not reachable".
async fn port_reachable(host: &str, port: u16) -> bool {
    let target = format!("{}:{}", host, port);
    let attempt = tokio::time::timeout(
        std::time::Duration::from_millis(500),
        tokio::net::TcpStream::connect(&target),
    )
    .await;
    // Outer `Ok` = finished before the timeout, inner `Ok` = connection established.
    matches!(attempt, Ok(Ok(_)))
}
/// Reads the daemon's port from the `daemon.port` file.
///
/// Falls back to 7878 when the file's location is unknown, the file cannot be read, or
/// its trimmed contents are not a valid `u16`.
fn read_daemon_port() -> u16 {
    const DEFAULT_PORT: u16 = 7878;
    let Some(port_file) = daemon_port_path() else {
        return DEFAULT_PORT;
    };
    match std::fs::read_to_string(port_file) {
        Ok(text) => text.trim().parse::<u16>().unwrap_or(DEFAULT_PORT),
        Err(_) => DEFAULT_PORT,
    }
}
/// Outcome of a single diagnostic check; each variant carries the message to display.
#[derive(Debug, Clone, PartialEq)]
enum CheckResult {
/// The check passed (printed with a green `✓`).
Ok(String),
/// The check found something worth attention but not fatal (printed with a yellow `⚠`).
Warn(String),
/// The check failed (printed with a red `✗`).
Error(String),
}
impl CheckResult {
fn print(&self) {
match self {
CheckResult::Ok(msg) => println!("{} {}", "✓".green(), msg),
CheckResult::Warn(msg) => println!("{} {}", "⚠".yellow(), msg),
CheckResult::Error(msg) => println!("{} {}", "✗".red(), msg),
}
}
fn is_error(&self) -> bool {
matches!(self, CheckResult::Error(_))
}
fn is_warn(&self) -> bool {
matches!(self, CheckResult::Warn(_))
}
}
/// An index that reported zero chunks; collected by `print_index_breakdown`.
#[derive(Debug)]
struct EmptyIndex {
/// Index name.
name: String,
/// The index's `root_path` as reported by the daemon (empty string when absent).
root_path: String,
}
/// Calls the daemon's `/health` endpoint.
///
/// Returns `(true, version)` on a success response, where `version` is the response's
/// `version` string or `"?"` when it is missing or the body is unreadable. Returns
/// `(false, "")` on a transport error or non-success status.
async fn probe_daemon_health(client: &reqwest::Client, base: &str) -> (bool, String) {
    let response = match client.get(format!("{}/health", base)).send().await {
        Ok(r) if r.status().is_success() => r,
        _ => return (false, String::new()),
    };
    let body: serde_json::Value = response
        .json()
        .await
        .unwrap_or_else(|_| serde_json::json!({}));
    let version = body
        .get("version")
        .and_then(|v| v.as_str())
        .unwrap_or("?")
        .to_string();
    (true, version)
}
fn check_daemon_running(running: bool, base: &str, version: &str) -> CheckResult {
if running {
CheckResult::Ok(format!("Daemon running at {} (v{})", base, version))
} else {
CheckResult::Error("Daemon not running — run `trusty-search start`".to_string())
}
}
/// Checks whether the embedding model is present in the fastembed cache directory.
///
/// * `Ok` — the model's subdirectory exists; reports the cache path and its total size.
/// * `Warn` — the cache directory exists without the model, or does not exist at all.
fn check_model_cache() -> CheckResult {
    let model_name = "all-MiniLM-L6-v2";
    let cache_root = fastembed_cache_dir();

    if cache_root
        .join("models--Qdrant--all-MiniLM-L6-v2-onnx")
        .exists()
    {
        // The reported size covers the whole cache directory, not just this model.
        let size = dir_size_bytes(&cache_root);
        return CheckResult::Ok(format!(
            "Model cache: {} ({}, {})",
            cache_root.display(),
            fmt_bytes(size),
            model_name
        ));
    }

    if cache_root.exists() {
        return CheckResult::Warn(format!(
            "Model cache directory exists ({}) but {} not found — will download on first start",
            cache_root.display(),
            model_name
        ));
    }

    CheckResult::Warn("Model not cached — will download on first `trusty-search start`".to_string())
}
/// Data directory inspected by the doctor checks: `<local data dir>/trusty-search`.
///
/// When the local data directory cannot be determined, the literal path
/// `~/.local/share/trusty-search` is returned. The `~` is not expanded here.
fn doctor_data_dir() -> std::path::PathBuf {
    match dirs::data_local_dir() {
        Some(data_root) => data_root.join("trusty-search"),
        None => std::path::PathBuf::from("~/.local/share/trusty-search"),
    }
}
fn check_data_dir(data_dir: &std::path::Path) -> CheckResult {
if !data_dir.exists() {
return CheckResult::Warn(format!(
"Data directory {} does not exist (will be created on first start)",
data_dir.display()
));
}
let probe = data_dir.join(".write_probe");
let writable = std::fs::write(&probe, b"").is_ok();
let _ = std::fs::remove_file(&probe);
if writable {
CheckResult::Ok(format!("Data directory: {} (writable)", data_dir.display()))
} else {
CheckResult::Error(format!(
"Data directory {} is not writable",
data_dir.display()
))
}
}
fn check_lock_file(data_dir: &std::path::Path, daemon_running: bool) -> CheckResult {
let lock_path = data_dir.join("daemon.lock");
if !lock_path.exists() {
return CheckResult::Ok("Lock file: healthy (no stale lock)".into());
}
let pid_opt = std::fs::read_to_string(&lock_path)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok());
let Some(pid) = pid_opt else {
return CheckResult::Warn(format!(
"Lock file exists but contains no valid PID ({})",
lock_path.display()
));
};
let alive = unsafe { libc::kill(pid as libc::pid_t, 0) } == 0;
if !alive {
return CheckResult::Warn(format!(
"Stale lock file: PID {} is not running ({})",
pid,
lock_path.display()
));
}
if daemon_running {
CheckResult::Ok(format!("Lock file: healthy (PID {} is running)", pid))
} else {
CheckResult::Warn(format!(
"Lock file contains PID {} which is alive but /health failed",
pid
))
}
}
/// Fetches the names of all indexes registered on the daemon (`GET /indexes`).
///
/// Best-effort: a transport error, non-success status, unreadable body, or missing
/// `indexes` array yields an empty list. Array entries that are not strings are skipped.
async fn fetch_index_names(client: &reqwest::Client, base: &str) -> Vec<String> {
    let listing: serde_json::Value = match client.get(format!("{}/indexes", base)).send().await {
        Ok(r) if r.status().is_success() => {
            r.json().await.unwrap_or_else(|_| serde_json::json!({}))
        }
        _ => serde_json::json!({"indexes": []}),
    };

    match listing.get("indexes").and_then(|v| v.as_array()) {
        Some(entries) => entries
            .iter()
            .filter_map(|v| v.as_str().map(|s| s.to_string()))
            .collect(),
        None => Vec::new(),
    }
}
/// Fetches `/indexes/{name}/status` for every name concurrently.
///
/// Returns `(name, status_json)` pairs sorted by name. A failed or unparseable status
/// request yields an empty JSON object for that index; a task that fails to join is
/// dropped from the result.
async fn fetch_index_statuses(
    client: &reqwest::Client,
    base: &str,
    names: &[String],
) -> Vec<(String, serde_json::Value)> {
    let mut tasks = tokio::task::JoinSet::new();
    for name in names {
        let index_name = name.clone();
        let status_url = format!("{}/indexes/{}/status", base, index_name);
        let http = client.clone();
        tasks.spawn(async move {
            let status: serde_json::Value = match http.get(&status_url).send().await {
                Ok(r) if r.status().is_success() => {
                    r.json().await.unwrap_or_else(|_| serde_json::json!({}))
                }
                _ => serde_json::json!({}),
            };
            (index_name, status)
        });
    }

    let mut collected: Vec<(String, serde_json::Value)> = Vec::new();
    while let Some(joined) = tasks.join_next().await {
        // `Result::ok` turns a join failure into "nothing to add".
        collected.extend(joined.ok());
    }
    // Tasks finish in arbitrary order; sort for stable output.
    collected.sort_by(|left, right| left.0.cmp(&right.0));
    collected
}
/// Builds the one-line doctor summary for the registered indexes.
///
/// Returns `Ok` when no index is empty, otherwise a `Warn` whose message ends
/// with a colon (the caller is expected to print details after it). `total`
/// is the number of registered indexes, `zero_count` how many report no chunks.
fn summarize_indexes(total: usize, zero_count: usize) -> CheckResult {
    let plural = if total == 1 { "" } else { "es" };
    if zero_count > 0 {
        let verb = if zero_count == 1 { "has" } else { "have" };
        return CheckResult::Warn(format!(
            "{} index{} registered, {} {} no chunks yet:",
            total, plural, zero_count, verb
        ));
    }
    CheckResult::Ok(format!(
        "{} index{} registered, all have chunks",
        total, plural
    ))
}
/// Prints one line per index with its chunk count and root path, and records
/// every index that has zero chunks in `empty_indexes`.
///
/// `chunk_count` and `root_path` are read from each status body; a missing or
/// wrongly-typed field is treated as `0` / an empty string respectively.
fn print_index_breakdown(
    per_index: &[(String, serde_json::Value)],
    empty_indexes: &mut Vec<EmptyIndex>,
) {
    for (name, body) in per_index {
        let chunk_total = body
            .get("chunk_count")
            .and_then(serde_json::Value::as_u64)
            .unwrap_or(0);
        let root_path = body
            .get("root_path")
            .and_then(serde_json::Value::as_str)
            .unwrap_or("")
            .to_string();
        let chunks_fmt = format_with_commas(chunk_total);

        if chunk_total != 0 {
            println!(
                " {} {:<16} {:>12} chunks {}",
                "✓".green(),
                name.bold(),
                chunks_fmt,
                root_path.dimmed()
            );
            continue;
        }

        // Empty index: warn, and remember it so the caller can act on it later.
        println!(
            " {} {:<16} {:>12} chunks {} — run `trusty-search index` to populate",
            "⚠".yellow(),
            name.bold(),
            chunks_fmt,
            root_path.dimmed()
        );
        empty_indexes.push(EmptyIndex {
            name: name.clone(),
            root_path,
        });
    }
}
async fn check_port_reachable(port: u16) -> CheckResult {
if port_reachable("127.0.0.1", port).await {
CheckResult::Ok(format!("Port {} is reachable", port))
} else {
CheckResult::Error(format!("Port {} is not reachable", port))
}
}
fn fix_stale_lock(data_dir: &std::path::Path) {
let lock_path = data_dir.join("daemon.lock");
if lock_path.exists() {
let pid_opt = std::fs::read_to_string(&lock_path)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok());
let stale = pid_opt
.map(|pid| unsafe { libc::kill(pid as libc::pid_t, 0) } != 0)
.unwrap_or(true);
if stale {
match std::fs::remove_file(&lock_path) {
Ok(()) => println!(
" {} Removed stale lock file {}",
"✓".green(),
lock_path.display()
),
Err(e) => println!(
" {} Could not remove lock file {}: {e}",
"✗".red(),
lock_path.display()
),
}
} else {
println!(
" {} Lock file is held by a live process — not removing",
"⚠".yellow()
);
}
}
}
/// Opens the daemon's web UI (`http://<addr>/ui`) in the default browser.
///
/// The address is read from the file returned by `http_addr_path()`. If that
/// file cannot be read the process prints a hint and exits with status 1; an
/// empty file or an unresolved path is returned as an error. A failure to
/// launch the browser is only reported, with the URL to open manually.
fn run_dashboard() -> Result<()> {
    let path = match http_addr_path() {
        Some(p) => p,
        None => anyhow::bail!("could not resolve $HOME — set HOME and try again"),
    };

    let Ok(contents) = std::fs::read_to_string(&path) else {
        eprintln!(
            "{} No daemon running ({} not found). Start one with {}.",
            "✗".red(),
            path.display(),
            "trusty-search start".cyan()
        );
        std::process::exit(1);
    };

    let addr = contents.trim().to_string();
    if addr.is_empty() {
        anyhow::bail!("{} is empty — daemon may be shutting down", path.display());
    }

    let url = format!("http://{addr}/ui");
    println!("{} Opening {} …", "◉".green(), url.cyan());
    match open::that(&url) {
        Ok(_) => {}
        Err(e) => eprintln!(
            "{} could not launch browser ({e}). Open this URL manually: {}",
            "⚠".yellow(),
            url
        ),
    }
    Ok(())
}
/// launchd job label for the daemon (macOS only). Used as the `Label` value in
/// the generated plist, as the plist file stem, and as the last component of
/// the `gui/<uid>/<label>` target passed to `launchctl print`.
#[cfg(target_os = "macos")]
const LAUNCHD_LABEL: &str = "com.trusty.trusty-search";
/// Dispatches a `service` sub-action to its launchd implementation.
///
/// On macOS each action maps to the matching `service_*` function. On every
/// other platform the command prints an "unsupported" message to stderr and
/// exits the process with status 1.
fn run_service_action(action: &ServiceAction) -> Result<()> {
    #[cfg(target_os = "macos")]
    {
        // Pick the handler first, then invoke it once.
        let handler: fn() -> Result<()> = match action {
            ServiceAction::Install => service_install,
            ServiceAction::Uninstall => service_uninstall,
            ServiceAction::Status => service_status,
            ServiceAction::Logs => service_logs,
        };
        handler()
    }
    #[cfg(not(target_os = "macos"))]
    {
        // `action` is only consumed on macOS; mark it used here.
        let _ = action;
        eprintln!(
            "{} `trusty-search service` is not supported on this platform — \
             use your distro's service manager (systemd, OpenRC, etc.) directly.",
            "✗".red()
        );
        std::process::exit(1);
    }
}
/// Returns `~/Library/LaunchAgents/<LAUNCHD_LABEL>.plist`.
///
/// # Errors
/// Fails when the home directory cannot be resolved.
#[cfg(target_os = "macos")]
fn launchd_plist_path() -> Result<std::path::PathBuf> {
    let Some(mut plist) = dirs::home_dir() else {
        return Err(anyhow::anyhow!("could not resolve $HOME"));
    };
    plist.push("Library");
    plist.push("LaunchAgents");
    plist.push(format!("{LAUNCHD_LABEL}.plist"));
    Ok(plist)
}
/// Returns `~/Library/Logs/trusty-search`, creating the directory (and any
/// missing parents) first.
///
/// # Errors
/// Fails when the home directory cannot be resolved or the directory cannot
/// be created.
#[cfg(target_os = "macos")]
fn launchd_log_dir() -> Result<std::path::PathBuf> {
    let dir = match dirs::home_dir() {
        Some(home) => home.join("Library").join("Logs").join("trusty-search"),
        None => return Err(anyhow::anyhow!("could not resolve $HOME")),
    };
    std::fs::create_dir_all(&dir)?;
    Ok(dir)
}
/// Renders the LaunchAgent plist that runs `<exe> start --foreground` at load,
/// with stdout/stderr redirected to `stdout.log` / `stderr.log` inside
/// `log_dir`.
///
/// The executable and log paths are XML-escaped before interpolation so that a
/// path containing `&`, `<` or `>` still produces a well-formed plist. Paths
/// are rendered with `Path::display`, so non-UTF-8 bytes are replaced lossily.
#[cfg(target_os = "macos")]
fn launchd_plist_body(exe: &std::path::Path, log_dir: &std::path::Path) -> String {
    /// Escapes the characters that are markup-significant in XML element content.
    fn xml_escape(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '&' => escaped.push_str("&amp;"),
                '<' => escaped.push_str("&lt;"),
                '>' => escaped.push_str("&gt;"),
                other => escaped.push(other),
            }
        }
        escaped
    }

    let exe = xml_escape(&exe.display().to_string());
    let stdout = xml_escape(&log_dir.join("stdout.log").display().to_string());
    let stderr = xml_escape(&log_dir.join("stderr.log").display().to_string());
    format!(
        r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{LAUNCHD_LABEL}</string>
<key>ProgramArguments</key>
<array>
<string>{exe}</string>
<string>start</string>
<string>--foreground</string>
</array>
<key>RunAtLoad</key>
<true/>
<!-- KeepAlive=SuccessfulExit:false means launchd only restarts the daemon
on a non-zero exit. The `start` command exits 0 when a live daemon is
already running (idempotent fast-path); without this, launchd would
immediately re-spawn and crash-loop on the existing lockfile. -->
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>ThrottleInterval</key>
<integer>30</integer>
<key>StandardOutPath</key>
<string>{stdout}</string>
<key>StandardErrorPath</key>
<string>{stderr}</string>
<key>ProcessType</key>
<string>Interactive</string>
</dict>
</plist>
"#
    )
}
/// Installs the daemon as a per-user LaunchAgent.
///
/// Writes the plist for the current executable, then reloads it into the
/// caller's `gui/<uid>` launchd domain (`bootout` followed by `bootstrap`).
///
/// # Errors
/// Fails when the executable, plist path or log directory cannot be resolved,
/// when the plist cannot be written, or when `launchctl bootstrap` cannot be
/// run or exits unsuccessfully.
#[cfg(target_os = "macos")]
fn service_install() -> Result<()> {
    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(e) => anyhow::bail!("could not resolve current exe: {e}"),
    };
    let plist_path = launchd_plist_path()?;
    if let Some(agents_dir) = plist_path.parent() {
        std::fs::create_dir_all(agents_dir)?;
    }
    let log_dir = launchd_log_dir()?;
    let plist_xml = launchd_plist_body(&exe, &log_dir);
    if let Err(e) = std::fs::write(&plist_path, plist_xml) {
        anyhow::bail!("write {}: {e}", plist_path.display());
    }
    println!(
        "{} Wrote LaunchAgent plist: {}",
        "✓".green(),
        plist_path.display()
    );

    // SAFETY: `getuid` takes no arguments and cannot fail.
    let uid = unsafe { libc::getuid() };
    let domain = format!("gui/{uid}");
    let launchctl = |verb: &str| {
        std::process::Command::new("launchctl")
            .args([verb, domain.as_str()])
            .arg(&plist_path)
            .status()
    };

    // Best-effort unload of any previous copy; its result is deliberately ignored.
    let _ = launchctl("bootout");
    let status = match launchctl("bootstrap") {
        Ok(exit) => exit,
        Err(e) => anyhow::bail!("launchctl bootstrap failed: {e}"),
    };
    if !status.success() {
        anyhow::bail!("launchctl bootstrap exited with {status}");
    }

    println!(
        "{} Loaded {} into {} — daemon will start automatically.",
        "✓".green(),
        LAUNCHD_LABEL,
        domain
    );
    println!(
        " Logs: {}\n Status: {}",
        log_dir.display().to_string().dimmed(),
        "trusty-search service status".cyan(),
    );
    Ok(())
}
/// Unloads the LaunchAgent from the caller's `gui/<uid>` domain and deletes
/// its plist. Prints a notice and succeeds if the plist is not installed.
///
/// # Errors
/// Fails when the plist path cannot be resolved or the plist cannot be removed.
/// The result of `launchctl bootout` itself is ignored.
#[cfg(target_os = "macos")]
fn service_uninstall() -> Result<()> {
    let plist_path = launchd_plist_path()?;
    // SAFETY: `getuid` takes no arguments and cannot fail.
    let uid = unsafe { libc::getuid() };
    let domain = format!("gui/{uid}");

    if !plist_path.exists() {
        println!(
            "{} {} not installed — nothing to do",
            "·".dimmed(),
            plist_path.display()
        );
        return Ok(());
    }

    let _ = std::process::Command::new("launchctl")
        .args(["bootout", domain.as_str()])
        .arg(&plist_path)
        .status();
    if let Err(e) = std::fs::remove_file(&plist_path) {
        anyhow::bail!("remove {}: {e}", plist_path.display());
    }
    println!(
        "{} Unloaded and removed {}",
        "✓".green(),
        plist_path.display()
    );
    Ok(())
}
/// Prints the output of `launchctl print gui/<uid>/<LAUNCHD_LABEL>`.
///
/// If `launchctl` reports failure, the service is described as not loaded on
/// stderr together with an install hint, and the process exits with status 1.
///
/// # Errors
/// Fails when `launchctl` itself cannot be run.
#[cfg(target_os = "macos")]
fn service_status() -> Result<()> {
    // SAFETY: `getuid` takes no arguments and cannot fail.
    let uid = unsafe { libc::getuid() };
    let target = format!("gui/{uid}/{LAUNCHD_LABEL}");
    let output = match std::process::Command::new("launchctl")
        .args(["print", target.as_str()])
        .output()
    {
        Ok(out) => out,
        Err(e) => anyhow::bail!("launchctl print failed: {e}"),
    };

    if !output.status.success() {
        let reason = String::from_utf8_lossy(&output.stderr);
        eprintln!(
            "{} {} is not loaded ({})",
            "✗".red(),
            target,
            reason.trim()
        );
        eprintln!(" Install with: {}", "trusty-search service install".cyan());
        std::process::exit(1);
    }

    println!("{}", String::from_utf8_lossy(&output.stdout));
    Ok(())
}
/// Follows the service's `stdout.log` and `stderr.log` with `tail -F`,
/// blocking until `tail` exits. Prints a notice and returns `Ok` when neither
/// log file exists yet.
///
/// # Errors
/// Fails when the log directory cannot be resolved or created, when `tail`
/// cannot be run, or when it exits unsuccessfully.
#[cfg(target_os = "macos")]
fn service_logs() -> Result<()> {
    let log_dir = launchd_log_dir()?;
    let [stdout, stderr] = ["stdout.log", "stderr.log"].map(|file| log_dir.join(file));

    if !(stdout.exists() || stderr.exists()) {
        eprintln!(
            "{} No logs at {} yet — start the service first.",
            "·".dimmed(),
            log_dir.display()
        );
        return Ok(());
    }

    let status = match std::process::Command::new("tail")
        .arg("-F")
        .args([&stdout, &stderr])
        .status()
    {
        Ok(exit) => exit,
        Err(e) => anyhow::bail!("tail failed: {e}"),
    };
    if !status.success() {
        anyhow::bail!("tail exited with {status}");
    }
    Ok(())
}
/// CLI entry point: loads optional local environment overrides, parses the
/// command line, initialises tracing/colour handling, then dispatches the
/// chosen subcommand to its handler under `commands::*`.
///
/// Any error returned by a handler is propagated out of `main` via `?`.
#[tokio::main]
async fn main() -> Result<()> {
// Best-effort load of `.env.local`; a failure to load it is ignored.
// NOTE(review): this runs before `Cli::parse()`, presumably so env-backed
// arguments (e.g. `TRUSTY_INDEX`) can pick up values from it — confirm.
dotenvy::from_filename(".env.local").ok();
let cli = Cli::parse();
// Tracing is initialised with 2 when `--verbose` is set and 0 otherwise.
trusty_common::init_tracing(if cli.verbose { 2 } else { 0 });
trusty_common::maybe_disable_color(false);
match cli.command {
// Only `query` and `top_k` are forwarded; `full`, `intent`, `no_kg`,
// `offset` and `budget` are accepted on the command line but discarded here.
Commands::Search {
query,
top_k,
full: _,
intent: _,
no_kg: _,
offset: _,
budget: _,
} => {
commands::search::handle_search(&cli.index, query, top_k).await?;
}
Commands::Watch { path } => {
commands::watch::handle_watch(&cli.index, path).await?;
}
Commands::Status => {
commands::status::handle_status(cli.json).await?;
}
Commands::Init {
path,
name,
exclude,
} => {
commands::init::handle_init(path, name, exclude).await?;
}
Commands::Index { path, name, force } => {
commands::index::handle_index(path, name, force).await?;
}
Commands::Add { file } => {
commands::add::handle_add(&cli.index, file).await?;
}
Commands::Remove { file } => {
commands::remove::handle_remove(&cli.index, file).await?;
}
Commands::Reindex { path } => {
commands::reindex::handle_reindex(&cli.index, path).await?;
}
Commands::List => {
commands::list::handle_list(cli.json).await?;
}
Commands::Query {
query,
indexes,
top_k,
full,
} => {
commands::query::handle_query(&cli.index, cli.json, query, indexes, top_k, full)
.await?;
}
// `health` is routed to the same handler as `status`.
Commands::Health => {
commands::status::handle_status(cli.json).await?;
}
Commands::Start { port, foreground } => {
commands::start::handle_start(port, foreground).await?;
}
Commands::Stop => {
commands::stop::handle_stop().await?;
}
Commands::Serve {
no_http,
port,
http,
} => {
commands::serve::handle_serve(no_http, port, http).await?;
}
// The service handler is called without `.await`, unlike the other handlers.
Commands::Service { action } => {
commands::service::handle_service(&action)?;
}
Commands::Dashboard => {
commands::dashboard::handle_dashboard().await?;
}
Commands::Convert {
target,
dry_run,
concurrency,
} => {
commands::convert::handle_convert(target, dry_run, concurrency).await?;
}
Commands::Ui { port } => {
commands::ui::handle_ui(port).await?;
}
Commands::Doctor { fix } => {
commands::doctor::handle_doctor(fix).await?;
}
// Emit a shell-completion script for the requested shell on stdout,
// generated from the clap command definition.
Commands::Completions { shell } => {
let mut cmd = Cli::command();
let name = cmd.get_name().to_string();
generate(shell, &mut cmd, name, &mut io::stdout());
}
}
Ok(())
}