#![allow(clippy::print_stdout, reason = "CLI tool needs to output to stdout")]
#![allow(clippy::print_stderr, reason = "CLI tool needs to output to stderr")]
use anyhow::Result;
use chrono::{Local, Utc};
use clap::{Parser, Subcommand, ValueEnum};
use regex::Regex;
use std::io::IsTerminal;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
use catenary_mcp::bridge::{DocumentManager, LspBridgeHandler, PathValidator};
use catenary_mcp::cli::{self, ColorConfig, ColumnWidths};
use catenary_mcp::install;
use catenary_mcp::lsp;
use catenary_mcp::mcp::McpServer;
use catenary_mcp::session::{self, Direction, EventKind, Protocol, Session, SessionEvent};
#[derive(Clone, Copy, Debug, ValueEnum)]
enum HostFormat {
Claude,
Gemini,
}
#[derive(Parser, Debug)]
#[command(name = "catenary")]
#[command(about = "Multiplexing bridge between MCP and multiple LSP servers")]
#[command(version = env!("CATENARY_VERSION"))]
struct Args {
#[command(subcommand)]
command: Option<Command>,
#[arg(short, long = "lsp", global = true)]
lsps: Vec<String>,
#[arg(long, global = true)]
config: Option<PathBuf>,
#[arg(short, long, global = true)]
root: Vec<PathBuf>,
#[arg(long, global = true)]
idle_timeout: Option<u64>,
}
#[derive(Subcommand, Debug)]
enum Command {
List,
Monitor {
id: String,
#[arg(long)]
raw: bool,
#[arg(long)]
nocolor: bool,
#[arg(long, short)]
filter: Option<String>,
},
Status {
id: String,
},
Doctor {
#[arg(long)]
nocolor: bool,
#[arg(long)]
diff: bool,
},
Install {
spec: Option<String>,
#[arg(long)]
list: bool,
#[arg(long)]
remove: Option<String>,
},
Hook {
#[command(subcommand)]
command: HookCommand,
},
#[command(hide = true)]
SyncRoots {
#[arg(long, value_enum)]
format: HostFormat,
},
#[command(hide = true)]
Notify {
#[arg(long, value_enum)]
format: HostFormat,
},
Query {
#[arg(long)]
session: Option<String>,
#[arg(long)]
since: Option<String>,
#[arg(long)]
kind: Option<String>,
#[arg(long)]
search: Option<String>,
#[arg(long)]
sql: Option<String>,
#[arg(long, value_enum, default_value = "table")]
format: QueryFormat,
},
Gc {
#[arg(long)]
older_than: Option<String>,
#[arg(long)]
dead: bool,
#[arg(long)]
session: Option<String>,
#[arg(long)]
sidecars: bool,
},
Restore {
file: Option<String>,
#[arg(long)]
list: bool,
#[arg(long)]
id: Option<i64>,
},
}
#[derive(Subcommand, Debug)]
enum HookCommand {
#[command(name = "post-tool")]
PostTool {
#[arg(long, value_enum)]
format: HostFormat,
},
#[command(name = "pre-tool")]
PreTool {
#[arg(long, value_enum)]
format: HostFormat,
},
}
#[derive(Clone, Copy, Debug, ValueEnum)]
enum QueryFormat {
Table,
Json,
Csv,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
match args.command {
None => {
if std::io::stdin().is_terminal() && std::io::stdout().is_terminal() {
run_dashboard(&args)
} else {
run_server(args).await
}
}
Some(Command::List) => run_list(),
Some(Command::Monitor {
id,
raw,
nocolor,
filter,
}) => run_monitor(&id, raw, nocolor, filter.as_deref()),
Some(Command::Status { id }) => run_status(&id),
Some(Command::Doctor { nocolor, diff }) => run_doctor(args, nocolor, diff).await,
Some(Command::Install { spec, list, remove }) => {
let conn = catenary_mcp::db::open_and_migrate()?;
if list {
catenary_mcp::install::list_grammars(&conn)
} else if let Some(scope) = remove {
catenary_mcp::install::remove_grammar(&scope, &conn)
} else if let Some(spec) = spec {
catenary_mcp::install::install_grammar(&spec, &conn)
} else {
catenary_mcp::install::list_grammars(&conn)
}
}
Some(Command::Hook { command }) => {
match command {
HookCommand::PostTool { format } => run_notify(format),
HookCommand::PreTool { format } => run_sync_roots(format),
}
Ok(())
}
Some(Command::SyncRoots { format }) => {
run_sync_roots(format);
Ok(())
}
Some(Command::Notify { format }) => {
run_notify(format);
Ok(())
}
Some(Command::Query {
session,
since,
kind,
search,
sql,
format,
}) => {
let conn = catenary_mcp::db::open_and_migrate()?;
run_query(
&conn,
session.as_deref(),
since.as_deref(),
kind.as_deref(),
search.as_deref(),
sql.as_deref(),
format,
)
}
Some(Command::Gc {
older_than,
dead,
session,
sidecars,
}) => {
let conn = catenary_mcp::db::open_and_migrate()?;
run_gc(
&conn,
older_than.as_deref(),
dead,
session.as_deref(),
sidecars,
)
}
Some(Command::Restore { file, list, id }) => {
let conn = catenary_mcp::db::open_and_migrate()?;
let output = if list {
catenary_mcp::restore::list_snapshots(&conn, file.as_deref())?
} else if let Some(id) = id {
catenary_mcp::restore::restore_by_id(&conn, id)?
} else if let Some(ref file) = file {
catenary_mcp::restore::restore_most_recent(&conn, file)?
} else {
anyhow::bail!(
"usage: catenary restore <file> | catenary restore --id=N | catenary restore --list"
);
};
println!("{output}");
Ok(())
}
}
}
fn run_dashboard(args: &Args) -> Result<()> {
let config = catenary_mcp::config::Config::load(args.config.clone())?;
let conn = catenary_mcp::db::open_and_migrate()?;
if let Err(e) = session::prune_sessions_with_conn(&conn, config.log_retention_days) {
warn!("session pruning failed: {e}");
}
catenary_mcp::tui::run(config.icons)
}
#[allow(
clippy::too_many_lines,
reason = "Server setup requires sequential initialization steps"
)]
async fn run_server(args: Args) -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("catenary=info".parse()?))
.with_writer(std::io::stderr)
.init();
let mut config = catenary_mcp::config::Config::load(args.config.clone())?;
if let Some(timeout) = args.idle_timeout {
config.idle_timeout = timeout;
}
for lsp_spec in args.lsps {
let (lang, command_str) = lsp_spec.split_once(':').ok_or_else(|| {
anyhow::anyhow!("Invalid LSP spec: {lsp_spec}. Expected 'lang:command'")
})?;
let lang = lang.trim().to_string();
let command_str = command_str.trim();
let mut parts = command_str.split_whitespace();
let program = parts
.next()
.ok_or_else(|| anyhow::anyhow!("command cannot be empty"))?
.to_string();
let cmd_args: Vec<String> = parts.map(std::string::ToString::to_string).collect();
config.server.insert(
lang,
catenary_mcp::config::ServerConfig {
command: program,
args: cmd_args,
initialization_options: None,
min_severity: None,
settings: None,
},
);
}
let raw_roots = if args.root.is_empty() {
vec![PathBuf::from(".")]
} else {
args.root
};
let roots: Vec<PathBuf> = raw_roots
.into_iter()
.map(|r| r.canonicalize())
.collect::<std::io::Result<Vec<_>>>()?;
let workspace_display = roots
.iter()
.map(|r| r.to_string_lossy().into_owned())
.collect::<Vec<_>>()
.join(", ");
let session = Arc::new(std::sync::Mutex::new(Session::create(&workspace_display)?));
let broadcaster = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.broadcaster();
info!("Starting catenary multiplexing bridge");
info!(
"Session ID: {}",
session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.info
.id
);
info!("Workspace roots: {}", workspace_display);
let message_log = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.message_log()
.clone();
let client_manager = Arc::new(lsp::ClientManager::new(
config.clone(),
roots,
broadcaster.clone(),
message_log.clone(),
));
client_manager.spawn_all().await;
let doc_manager = Arc::new(Mutex::new(DocumentManager::new()));
let runtime = tokio::runtime::Handle::current();
let current_roots = client_manager.roots().await;
let path_validator = Arc::new(tokio::sync::RwLock::new(PathValidator::new(
current_roots.clone(),
)));
let diagnostics_server = Arc::new(catenary_mcp::bridge::DiagnosticsServer::new(
client_manager.clone(),
doc_manager.clone(),
path_validator.clone(),
));
let sync_roots_server =
catenary_mcp::bridge::SyncRootsServer::new(client_manager.clone(), path_validator.clone());
let hook_server = catenary_mcp::hook::HookServer::new(
diagnostics_server.clone(),
sync_roots_server,
broadcaster.clone(),
message_log.clone(),
"host".to_string(),
);
let socket_path = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.socket_path();
let notify_handle = hook_server.start(&socket_path)?;
session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.set_socket_active();
let session_id = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.info
.id
.clone();
let handler = LspBridgeHandler::new(
client_manager.clone(),
doc_manager,
runtime,
broadcaster.clone(),
config.tui.capture_tool_output,
diagnostics_server,
Some(session_id),
);
let session_for_callback = session.clone();
let client_manager_for_roots = client_manager.clone();
let path_validator_for_roots = path_validator.clone();
let runtime_for_roots = tokio::runtime::Handle::current();
let message_log = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.message_log()
.clone();
let mut mcp_server = McpServer::new(handler, broadcaster, message_log)
.on_client_info(Box::new(move |name: &str, version: &str| {
if let Ok(mut session) = session_for_callback.lock() {
session.set_client_info(name, version);
}
}))
.on_roots_changed(Box::new(move |roots| {
let paths: Vec<PathBuf> = roots
.iter()
.filter_map(|root| {
root.uri.strip_prefix("file://").and_then(|p| {
let path = PathBuf::from(p);
match path.canonicalize() {
Ok(canonical) => Some(canonical),
Err(e) => {
warn!("Skipping root {p}: {e}");
None
}
}
})
})
.collect();
runtime_for_roots
.block_on(path_validator_for_roots.write())
.update_roots(paths.clone());
runtime_for_roots.block_on(client_manager_for_roots.sync_roots(paths))?;
runtime_for_roots.block_on(client_manager_for_roots.spawn_all());
Ok(())
}));
let mcp_task = tokio::task::spawn_blocking(move || mcp_server.run());
#[cfg(unix)]
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
let mcp_result = tokio::select! {
res = mcp_task => {
res?
}
_ = tokio::signal::ctrl_c() => {
info!("Received shutdown signal");
Ok(())
}
_ = async {
#[cfg(unix)]
{ sigterm.recv().await }
#[cfg(not(unix))]
{ std::future::pending::<Option<()>>().await }
} => {
info!("Received SIGTERM");
Ok(())
}
};
notify_handle.abort();
let _ = notify_handle.await;
info!("Shutting down LSP servers");
client_manager.shutdown_all().await;
mcp_result
}
fn run_list() -> Result<()> {
let conn = catenary_mcp::db::open_and_migrate()?;
let sessions = session::list_sessions_with_conn(&conn)?;
if sessions.is_empty() {
println!("No active Catenary sessions");
return Ok(());
}
let term_width = cli::terminal_width();
let widths = ColumnWidths::calculate(term_width);
let colors = cli::ColorConfig::new(false);
println!(
"{:>width_num$} {:<width_id$} {:<width_pid$} {:<width_client$} {:<width_ws$} STARTED",
"#",
"ID",
"PID",
"CLIENT",
"WORKSPACE",
width_num = widths.row_num,
width_id = widths.id,
width_pid = widths.pid,
width_client = widths.client,
width_ws = widths.workspace,
);
println!("{}", "-".repeat(term_width.min(120)));
let indent = " ".repeat(widths.row_num + 1);
for (idx, (s, alive)) in sessions.iter().enumerate() {
let client = match (&s.client_name, &s.client_version) {
(Some(name), Some(ver)) => format!("{name} v{ver}"),
(Some(name), None) => name.clone(),
_ => "-".to_string(),
};
let ago = format_duration_ago(s.started_at);
let display_id = s.client_session_id.as_deref().unwrap_or(&s.id);
let id = cli::truncate(display_id, widths.id);
let workspace = cli::truncate(&s.workspace, widths.workspace);
let client = cli::truncate(&client, widths.client);
let row_str = format!(
"{:>width_num$} {:<width_id$} {:<width_pid$} {:<width_client$} {:<width_ws$} {}",
idx + 1,
id,
s.pid,
client,
workspace,
ago,
width_num = widths.row_num,
width_id = widths.id,
width_pid = widths.pid,
width_client = widths.client,
width_ws = widths.workspace,
);
if *alive {
println!("{row_str}");
let languages = session::active_languages_with_conn(&conn, &s.id).unwrap_or_default();
if !languages.is_empty() {
let lang_str = languages.join(", ");
println!(
"{}",
colors.dim(&format!("{indent}language servers: {lang_str}"))
);
}
} else {
println!("{} (dead)", colors.dim(&row_str));
}
}
Ok(())
}
fn resolve_session_id(conn: &rusqlite::Connection, id: &str) -> Result<session::SessionInfo> {
if let Ok(row_num) = id.parse::<usize>()
&& row_num > 0
{
let sessions = session::list_sessions_with_conn(conn)?;
if let Some((s, _)) = sessions.get(row_num - 1) {
return Ok(s.clone());
}
if let Ok(session) = find_session(conn, id) {
return Ok(session);
}
anyhow::bail!("Row number {} out of range (1-{})", row_num, sessions.len());
}
find_session(conn, id)
}
fn run_monitor(id: &str, raw: bool, nocolor: bool, filter: Option<&str>) -> Result<()> {
let conn = catenary_mcp::db::open_and_migrate()?;
let session = resolve_session_id(&conn, id)?;
let full_id = session.id;
let colors = ColorConfig::new(nocolor);
let term_width = cli::terminal_width();
let filter_regex = filter
.as_ref()
.map(|f| Regex::new(f))
.transpose()
.map_err(|e| anyhow::anyhow!("Invalid filter regex: {e}"))?;
println!("Monitoring session {full_id} (Ctrl+C to stop)\n");
let mut reader = session::tail_events(&full_id)?;
let mut last_progress: Option<(String, String)> = None;
loop {
match reader.try_next_event() {
Ok(Some(event)) => {
if let Some(ref re) = filter_regex {
let event_str = format!("{:?}", event.kind);
if !re.is_match(&event_str) {
continue;
}
}
if raw {
print_event_raw(&event);
} else {
if let EventKind::Progress {
ref language,
ref title,
..
} = event.kind
{
let key = (language.clone(), title.clone());
if last_progress.as_ref() == Some(&key) {
print!("\x1b[A\x1b[2K");
}
last_progress = Some(key);
} else {
last_progress = None;
}
print_event_annotated(&event, &colors, term_width);
}
}
Ok(None) => {
std::thread::sleep(Duration::from_millis(100));
if let Ok(Some((_, alive))) = session::get_session_with_conn(&conn, &full_id) {
if !alive {
println!("\nSession ended (process dead)");
break;
}
} else {
println!("\nSession ended (files removed)");
break;
}
}
Err(_) => {
println!("\nSession ended");
break;
}
}
}
Ok(())
}
fn run_status(id: &str) -> Result<()> {
let conn = catenary_mcp::db::open_and_migrate()?;
let session = find_session(&conn, id)?;
println!("Session: {}", session.id);
println!("PID: {}", session.pid);
println!("Workspace: {}", session.workspace);
println!(
"Started: {} ({})",
session
.started_at
.with_timezone(&Local)
.format("%Y-%m-%d %H:%M:%S"),
format_duration_ago(session.started_at)
);
if let Some(name) = &session.client_name {
print!("Client: {name}");
if let Some(ver) = &session.client_version {
print!(" v{ver}");
}
println!();
}
println!("\nRecent events:");
let events = session::monitor_events_with_conn(&conn, &session.id)?;
let recent: Vec<_> = events.iter().rev().take(10).collect();
for event in recent.iter().rev() {
print_event(event);
}
Ok(())
}
fn parse_since(s: &str) -> Result<chrono::DateTime<Utc>> {
if s == "today" {
let today = Local::now()
.date_naive()
.and_hms_opt(0, 0, 0)
.ok_or_else(|| anyhow::anyhow!("failed to compute midnight"))?;
let local_midnight = today
.and_local_timezone(Local)
.single()
.ok_or_else(|| anyhow::anyhow!("ambiguous local midnight"))?;
return Ok(local_midnight.with_timezone(&Utc));
}
let (digits, unit) = s
.strip_suffix('m')
.map(|d| (d, "m"))
.or_else(|| s.strip_suffix('h').map(|d| (d, "h")))
.or_else(|| s.strip_suffix('d').map(|d| (d, "d")))
.ok_or_else(|| {
anyhow::anyhow!("unrecognised duration: {s} (expected Nm, Nh, Nd, or today)")
})?;
let n: i64 = digits
.parse()
.map_err(|_| anyhow::anyhow!("invalid number in duration: {s}"))?;
let duration = match unit {
"m" => chrono::Duration::minutes(n),
"h" => chrono::Duration::hours(n),
"d" => chrono::Duration::days(n),
_ => unreachable!(),
};
Ok(Utc::now() - duration)
}
#[allow(
clippy::too_many_lines,
reason = "Sequential query building and output formatting"
)]
fn run_query(
conn: &rusqlite::Connection,
session_filter: Option<&str>,
since: Option<&str>,
kind: Option<&str>,
search: Option<&str>,
raw_sql: Option<&str>,
format: QueryFormat,
) -> Result<()> {
if let Some(sql) = raw_sql {
let mut stmt = conn.prepare(sql)?;
let col_count = stmt.column_count();
let col_names: Vec<String> = (0..col_count)
.map(|i| stmt.column_name(i).unwrap_or("?").to_string())
.collect();
let mut rows_out: Vec<Vec<String>> = Vec::new();
let mut db_rows = stmt.query([])?;
while let Some(row) = db_rows.next()? {
let mut vals = Vec::with_capacity(col_count);
for i in 0..col_count {
let val: String = row
.get::<_, rusqlite::types::Value>(i)
.map(|v| format_sql_value(&v))
.unwrap_or_default();
vals.push(val);
}
rows_out.push(vals);
}
drop(db_rows);
print_query_results(&col_names, &rows_out, format);
return Ok(());
}
let mut conditions: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if let Some(sid) = session_filter {
let resolved = resolve_session_id(conn, sid)?;
conditions.push(format!("e.session_id = ?{}", params.len() + 1));
params.push(Box::new(resolved.id));
}
if let Some(since_str) = since {
let cutoff = parse_since(since_str)?;
conditions.push(format!("e.timestamp > ?{}", params.len() + 1));
params.push(Box::new(cutoff.to_rfc3339()));
}
if let Some(k) = kind {
conditions.push(format!("e.kind = ?{}", params.len() + 1));
params.push(Box::new(k.to_string()));
}
if let Some(s) = search {
conditions.push(format!("e.payload LIKE ?{}", params.len() + 1));
params.push(Box::new(format!("%{s}%")));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!(" WHERE {}", conditions.join(" AND "))
};
let sql = format!(
"SELECT e.id, e.session_id, e.timestamp, e.kind, e.payload \
FROM events e{where_clause} ORDER BY e.id DESC LIMIT 100"
);
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(AsRef::as_ref).collect();
let mut stmt = conn.prepare(&sql)?;
let mut db_rows = stmt.query(param_refs.as_slice())?;
let col_names = vec![
"ID".to_string(),
"SESSION".to_string(),
"TIME".to_string(),
"KIND".to_string(),
"PAYLOAD".to_string(),
];
let mut rows_out: Vec<Vec<String>> = Vec::new();
while let Some(row) = db_rows.next()? {
let id: i64 = row.get(0)?;
let sid: String = row.get(1)?;
let ts: String = row.get(2)?;
let k: String = row.get(3)?;
let payload: String = row.get(4)?;
let short_sid = if sid.len() > 8 { &sid[..8] } else { &sid };
let short_ts = chrono::DateTime::parse_from_rfc3339(&ts)
.map(|dt| dt.with_timezone(&Local).format("%H:%M:%S").to_string())
.unwrap_or(ts);
rows_out.push(vec![
id.to_string(),
short_sid.to_string(),
short_ts,
k,
payload,
]);
}
print_query_results(&col_names, &rows_out, format);
Ok(())
}
fn format_sql_value(val: &rusqlite::types::Value) -> String {
match val {
rusqlite::types::Value::Null => "NULL".to_string(),
rusqlite::types::Value::Integer(i) => i.to_string(),
rusqlite::types::Value::Real(f) => f.to_string(),
rusqlite::types::Value::Text(s) => s.clone(),
rusqlite::types::Value::Blob(b) => format!("<blob {} bytes>", b.len()),
}
}
fn print_query_results(col_names: &[String], rows: &[Vec<String>], format: QueryFormat) {
if rows.is_empty() {
println!("No results");
return;
}
match format {
QueryFormat::Table => {
let mut widths: Vec<usize> = col_names.iter().map(String::len).collect();
for row in rows {
for (i, val) in row.iter().enumerate() {
if i < widths.len() {
widths[i] = widths[i].max(val.len());
}
}
}
if let Some(last) = widths.last_mut() {
*last = (*last).min(80);
}
let header: Vec<String> = col_names
.iter()
.zip(&widths)
.map(|(name, w)| format!("{name:<w$}"))
.collect();
println!("{}", header.join(" "));
println!(
"{}",
widths
.iter()
.map(|w| "-".repeat(*w))
.collect::<Vec<_>>()
.join(" ")
);
for row in rows {
let formatted: Vec<String> = row
.iter()
.zip(&widths)
.map(|(val, w)| {
if val.len() > *w {
format!("{}...", &val[..w.saturating_sub(3)])
} else {
format!("{val:<w$}")
}
})
.collect();
println!("{}", formatted.join(" "));
}
}
QueryFormat::Json => {
let arr: Vec<serde_json::Value> = rows
.iter()
.map(|row| {
let mut obj = serde_json::Map::new();
for (name, val) in col_names.iter().zip(row) {
obj.insert(name.to_lowercase(), serde_json::Value::String(val.clone()));
}
serde_json::Value::Object(obj)
})
.collect();
let json = serde_json::to_string_pretty(&arr).unwrap_or_default();
println!("{json}");
}
QueryFormat::Csv => {
println!("{}", col_names.join(","));
for row in rows {
let escaped: Vec<String> = row
.iter()
.map(|v| {
if v.contains(',') || v.contains('"') || v.contains('\n') {
format!("\"{}\"", v.replace('"', "\"\""))
} else {
v.clone()
}
})
.collect();
println!("{}", escaped.join(","));
}
}
}
}
fn run_gc(
conn: &rusqlite::Connection,
older_than: Option<&str>,
dead: bool,
session_id: Option<&str>,
sidecars: bool,
) -> Result<()> {
let mut total_events_deleted: usize = 0;
let mut sessions_deleted: usize = 0;
if let Some(duration_str) = older_than {
let cutoff = parse_since(duration_str)?;
let cutoff_str = cutoff.to_rfc3339();
let events_removed =
conn.execute("DELETE FROM events WHERE timestamp < ?1", [&cutoff_str])?;
total_events_deleted += events_removed;
let filters_removed = conn.execute(
"DELETE FROM filter_history WHERE created_at < ?1",
[&cutoff_str],
)?;
println!(
"Deleted {events_removed} event{}{} older than {duration_str}",
if events_removed == 1 { "" } else { "s" },
if filters_removed > 0 {
format!(", {filters_removed} filter history entries")
} else {
String::new()
},
);
}
if dead {
let crashed: Vec<String> = {
let mut stmt = conn.prepare("SELECT id, pid FROM sessions WHERE alive = 1")?;
let mut rows = stmt.query([])?;
let mut ids = Vec::new();
while let Some(row) = rows.next()? {
let id: String = row.get(0)?;
let pid: u32 = row.get(1)?;
if !session::is_process_alive(pid) {
ids.push(id);
}
}
ids
};
let ended_at = Utc::now().to_rfc3339();
for id in &crashed {
let _ = conn.execute(
"UPDATE sessions SET alive = 0, ended_at = ?1 WHERE id = ?2",
rusqlite::params![&ended_at, id],
);
}
let dead_events: usize = conn
.query_row(
"SELECT COUNT(*) FROM events WHERE session_id IN \
(SELECT id FROM sessions WHERE alive = 0)",
[],
|row| row.get(0),
)
.unwrap_or(0);
let removed = conn.execute("DELETE FROM sessions WHERE alive = 0", [])?;
sessions_deleted += removed;
total_events_deleted += dead_events;
println!(
"Deleted {removed} dead session{} ({dead_events} event{})",
if removed == 1 { "" } else { "s" },
if dead_events == 1 { "" } else { "s" },
);
}
if let Some(sid) = session_id {
let resolved = resolve_session_id(conn, sid)?;
let event_count: usize = conn
.query_row(
"SELECT COUNT(*) FROM events WHERE session_id = ?1",
[&resolved.id],
|row| row.get(0),
)
.unwrap_or(0);
conn.execute("DELETE FROM sessions WHERE id = ?1", [&resolved.id])?;
sessions_deleted += 1;
total_events_deleted += event_count;
let socket_dir = session::sessions_dir().join(&resolved.id);
let _ = std::fs::remove_dir_all(&socket_dir);
println!(
"Deleted session {} ({event_count} event{})",
&resolved.id[..8.min(resolved.id.len())],
if event_count == 1 { "" } else { "s" },
);
}
if sidecars {
gc_restore_sidecars(conn)?;
} let snapshots_deleted = gc_expired_snapshots(conn)?;
if older_than.is_none() && !dead && session_id.is_none() && !sidecars && snapshots_deleted == 0
{
println!("Nothing to do. Use --older-than, --dead, --session, or --sidecars.");
}
if total_events_deleted > 1000 || sessions_deleted > 0 {
let size_before = db_file_size();
conn.execute_batch("VACUUM")?;
let size_after = db_file_size();
if let (Some(before), Some(after)) = (size_before, size_after) {
let saved = before.saturating_sub(after);
if saved > 0 {
println!("Database vacuumed (saved {})", format_bytes(saved));
}
}
}
Ok(())
}
fn gc_restore_sidecars(conn: &rusqlite::Connection) -> Result<usize> {
let mut stmt =
conn.prepare("SELECT id, file_path, content FROM snapshots WHERE source = 'restore'")?;
let rows: Vec<(i64, String, Vec<u8>)> = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
.collect::<Result<Vec<_>, _>>()?;
let mut removed: usize = 0;
for (id, file_path, content) in &rows {
let sidecar = catenary_mcp::restore::sidecar_path(Path::new(file_path), *id);
if sidecar.exists() {
match std::fs::read(&sidecar) {
Ok(ref disk_content) if disk_content == content => {
let _ = std::fs::remove_file(&sidecar);
removed += 1;
println!("Removed {}", sidecar.display());
}
Ok(_) => {
println!(
"sidecar {} differs from snapshot #{id} — not deleted.",
sidecar.display(),
);
}
Err(_) => {}
}
}
}
if removed > 0 {
println!(
"Removed {removed} sidecar{}.",
if removed == 1 { "" } else { "s" },
);
}
Ok(removed)
}
fn gc_expired_snapshots(conn: &rusqlite::Connection) -> Result<usize> {
let mut stmt = conn.prepare(
"SELECT id, file_path, content FROM snapshots \
WHERE source = 'restore' AND created_at < datetime('now', '-7 days')",
)?;
let restore_rows: Vec<(i64, String, Vec<u8>)> = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
.collect::<Result<Vec<_>, _>>()?;
let mut sidecars_deleted: usize = 0;
for (id, file_path, content) in &restore_rows {
let sidecar = catenary_mcp::restore::sidecar_path(Path::new(file_path), *id);
if sidecar.exists() {
match std::fs::read(&sidecar) {
Ok(ref disk_content) if disk_content == content => {
let _ = std::fs::remove_file(&sidecar);
sidecars_deleted += 1;
}
Ok(_) => {
println!(
"sidecar {} differs from snapshot #{id} — not deleted.",
sidecar.display(),
);
}
Err(_) => {}
}
}
}
let snapshots_deleted = conn.execute(
"DELETE FROM snapshots WHERE created_at < datetime('now', '-7 days')",
[],
)?;
if snapshots_deleted > 0 {
print!(
"Deleted {snapshots_deleted} expired snapshot{}",
if snapshots_deleted == 1 { "" } else { "s" },
);
if sidecars_deleted > 0 {
print!(
" ({sidecars_deleted} sidecar{})",
if sidecars_deleted == 1 { "" } else { "s" },
);
}
println!();
}
Ok(snapshots_deleted)
}
fn db_file_size() -> Option<u64> {
std::fs::metadata(catenary_mcp::db::db_path())
.ok()
.map(|m| m.len())
}
#[allow(
clippy::cast_precision_loss,
reason = "byte counts are small enough for f64"
)]
fn format_bytes(bytes: u64) -> String {
if bytes >= 1_048_576 {
format!("{:.1} MB", bytes as f64 / 1_048_576.0)
} else if bytes >= 1024 {
format!("{:.1} KB", bytes as f64 / 1024.0)
} else {
format!("{bytes} B")
}
}
fn notify_endpoint(session_id: &str) -> PathBuf {
#[cfg(unix)]
{
session::sessions_dir().join(session_id).join("notify.sock")
}
#[cfg(windows)]
{
PathBuf::from(format!(r"\\.\pipe\catenary-{session_id}"))
}
}
#[cfg(unix)]
fn notify_connect(endpoint: &std::path::Path) -> Option<std::os::unix::net::UnixStream> {
if !endpoint.exists() {
return None;
}
let stream = std::os::unix::net::UnixStream::connect(endpoint).ok()?;
let _ = stream.set_read_timeout(Some(Duration::from_secs(60)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(5)));
Some(stream)
}
#[cfg(windows)]
fn notify_connect(endpoint: &std::path::Path) -> Option<std::fs::File> {
use std::os::windows::fs::OpenOptionsExt;
std::fs::OpenOptions::new()
.read(true)
.write(true)
.security_qos_flags(0x0001_0000)
.open(endpoint)
.ok()
}
fn ipc_exchange(
mut stream: impl std::io::Read + std::io::Write,
request: &serde_json::Value,
) -> Vec<String> {
use std::io::BufRead;
if serde_json::to_writer(&mut stream, request).is_err() {
return Vec::new();
}
if stream.write_all(b"\n").is_err() || stream.flush().is_err() {
return Vec::new();
}
let reader = std::io::BufReader::new(stream);
let mut lines = Vec::new();
for line in reader.lines() {
match line {
Ok(text) if !text.is_empty() => lines.push(text),
_ => break,
}
}
lines
}
fn run_notify(format: HostFormat) {
let Ok(stdin_data) = std::io::read_to_string(std::io::stdin()) else {
print!(
"{}",
notify_error(
"hook input unavailable — try restarting your session",
format
)
);
return;
};
let Ok(hook_json) = serde_json::from_str::<serde_json::Value>(&stdin_data) else {
print!(
"{}",
notify_error(
"unexpected hook input — try restarting your session",
format
)
);
return;
};
let Some(file_path) = extract_file_path(&hook_json) else {
print!(
"{}",
notify_error(
"missing file path in hook input — diagnostics skipped",
format
)
);
return;
};
let abs_path = PathBuf::from(&file_path);
let Ok(conn) = catenary_mcp::db::open_and_migrate() else {
print!(
"{}",
notify_error(
"state database unavailable — try running: catenary list",
format
)
);
return;
};
let sessions = session::list_sessions_with_conn(&conn).unwrap_or_default();
let session = sessions
.iter()
.find(|(s, alive)| *alive && abs_path.to_string_lossy().starts_with(&s.workspace));
let Some((session, _)) = session else {
return;
};
if let Some(client_sid) = hook_json.get("session_id").and_then(|v| v.as_str()) {
let _ = conn.execute(
"UPDATE sessions SET client_session_id = ?1 \
WHERE id = ?2 AND client_session_id IS NULL",
rusqlite::params![client_sid, &session.id],
);
}
let endpoint = notify_endpoint(&session.id);
let Some(stream) = notify_connect(&endpoint) else {
print!(
"{}",
notify_error(
&format!(
"session {} is not responding — it may have crashed",
session.id
),
format,
)
);
return;
};
let tool_name = hook_json.get("tool_name").and_then(|v| v.as_str());
let mut request = serde_json::json!({ "file": abs_path.to_string_lossy() });
if let Some(tool) = tool_name {
request["tool"] = serde_json::json!(tool);
}
let lines = ipc_exchange(stream, &request);
let result = lines
.first()
.and_then(|line| serde_json::from_str::<catenary_mcp::hook::NotifyResult>(line).ok())
.unwrap_or_else(|| catenary_mcp::hook::NotifyResult::Content(lines.join("\n")));
match result {
catenary_mcp::hook::NotifyResult::Content(content) => {
let filename = std::path::Path::new(&file_path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(&file_path);
let full_content = format!("{filename}\n\t{content}");
let output = format_diagnostics(&full_content, format, "PostToolUse");
print!("{output}");
}
catenary_mcp::hook::NotifyResult::Error(msg) => {
print!("{}", notify_error(&msg, format));
}
}
}
#[allow(
clippy::too_many_lines,
reason = "Sequential hook processing with early returns"
)]
fn run_sync_roots(format: HostFormat) {
use std::io::{BufRead, Seek, SeekFrom};
let Ok(stdin_data) = std::io::read_to_string(std::io::stdin()) else {
return;
};
let Ok(hook_json) = serde_json::from_str::<serde_json::Value>(&stdin_data) else {
return;
};
let Some(transcript_path) = hook_json.get("transcript_path").and_then(|v| v.as_str()) else {
return;
};
let cwd = hook_json.get("cwd").and_then(|v| v.as_str()).map_or_else(
|| std::env::current_dir().unwrap_or_default(),
PathBuf::from,
);
let Ok(db) = catenary_mcp::db::open_and_migrate() else {
return;
};
let sessions = session::list_sessions_with_conn(&db).unwrap_or_default();
let cwd_str = cwd.to_string_lossy();
let session = sessions
.iter()
.find(|(s, alive)| *alive && cwd_str.starts_with(&s.workspace));
let Some((session, _)) = session else {
return;
};
if let Some(client_sid) = hook_json.get("session_id").and_then(|v| v.as_str()) {
let _ = db.execute(
"UPDATE sessions SET client_session_id = ?1 \
WHERE id = ?2 AND client_session_id IS NULL",
rusqlite::params![client_sid, &session.id],
);
}
let (start_offset, mut known_roots) = db
.query_row(
"SELECT offset, roots FROM root_sync_state WHERE session_id = ?1",
[&session.id],
|row| {
let offset: i64 = row.get(0)?;
let roots_json: String = row.get(1)?;
Ok((offset, roots_json))
},
)
.map_or_else(
|_| (0u64, Vec::new()),
|(offset, roots_json)| {
let roots: Vec<String> = serde_json::from_str(&roots_json).unwrap_or_default();
(
u64::try_from(offset).unwrap_or(0),
roots.into_iter().map(PathBuf::from).collect(),
)
},
);
let Ok(mut file) = std::fs::File::open(transcript_path) else {
return;
};
if file.seek(SeekFrom::Start(start_offset)).is_err() {
return;
}
let add_prefix = "Added \\u001b[1m";
let add_suffix = "\\u001b[22m as a working directory";
let remove_prefix = "Removed directory \\u001b[1m";
let remove_suffix = "\\u001b[22m from workspace";
let mut changed = false;
let reader = std::io::BufReader::new(&mut file);
for line in reader.lines() {
let Ok(line) = line else {
break;
};
if line.contains(add_prefix) {
let mut search_from = 0;
while let Some(start) = line[search_from..].find(add_prefix) {
let abs_start = search_from + start + add_prefix.len();
if let Some(end) = line[abs_start..].find(add_suffix) {
let path_str = unescape_json_path(&line[abs_start..abs_start + end]);
let resolved = resolve_transcript_path(&path_str, &cwd);
if !known_roots.contains(&resolved) {
known_roots.push(resolved);
changed = true;
}
search_from = abs_start + end + add_suffix.len();
} else {
break;
}
}
}
if line.contains(remove_prefix) {
let mut search_from = 0;
while let Some(start) = line[search_from..].find(remove_prefix) {
let abs_start = search_from + start + remove_prefix.len();
if let Some(end) = line[abs_start..].find(remove_suffix) {
let path_str = unescape_json_path(&line[abs_start..abs_start + end]);
let resolved = resolve_transcript_path(&path_str, &cwd);
if let Some(pos) = known_roots.iter().position(|r| r == &resolved) {
known_roots.remove(pos);
changed = true;
}
search_from = abs_start + end + remove_suffix.len();
} else {
break;
}
}
}
}
let new_offset = file.stream_position().unwrap_or(start_offset);
let roots_strs: Vec<String> = known_roots
.iter()
.map(|p| p.to_string_lossy().into_owned())
.collect();
let roots_json = serde_json::to_string(&roots_strs).unwrap_or_else(|_| "[]".to_string());
let _ = db.execute(
"INSERT OR REPLACE INTO root_sync_state (session_id, offset, roots) VALUES (?1, ?2, ?3)",
rusqlite::params![
&session.id,
i64::try_from(new_offset).unwrap_or(0),
roots_json
],
);
if !changed {
return;
}
let mut full_roots = vec![cwd];
for root in &known_roots {
if !full_roots.contains(root) {
full_roots.push(root.clone());
}
}
let endpoint = notify_endpoint(&session.id);
let Some(stream) = notify_connect(&endpoint) else {
return;
};
let root_strings: Vec<String> = full_roots
.iter()
.map(|p| p.to_string_lossy().into_owned())
.collect();
let request = serde_json::json!({ "sync_roots": root_strings });
let lines = ipc_exchange(stream, &request);
if lines.is_empty() {
return;
}
let content = lines.join("\n");
let output = format_diagnostics(&content, format, "PreToolUse");
print!("{output}");
}
fn unescape_json_path(raw: &str) -> String {
raw.replace("\\\\", "\\")
.replace("\\/", "/")
.replace("\\\"", "\"")
}
fn resolve_transcript_path(path_str: &str, cwd: &Path) -> PathBuf {
let path = PathBuf::from(path_str);
if path.is_absolute() {
path
} else {
cwd.join(path)
}
}
fn extract_file_path(hook_json: &serde_json::Value) -> Option<String> {
let file_path = hook_json
.get("tool_input")
.and_then(|ti| ti.get("file_path").or_else(|| ti.get("file")))
.and_then(|fp| fp.as_str())?;
let abs_path = if std::path::Path::new(file_path).is_absolute() {
PathBuf::from(file_path)
} else {
let cwd = hook_json.get("cwd").and_then(|v| v.as_str()).map_or_else(
|| std::env::current_dir().unwrap_or_default(),
PathBuf::from,
);
cwd.join(file_path)
};
Some(abs_path.to_string_lossy().into_owned())
}
fn format_diagnostics(content: &str, format: HostFormat, hook_event: &str) -> String {
match format {
HostFormat::Gemini => serde_json::json!({
"hookSpecificOutput": {
"additionalContext": content
}
})
.to_string(),
HostFormat::Claude => serde_json::json!({
"hookSpecificOutput": {
"hookEventName": hook_event,
"additionalContext": content
}
})
.to_string(),
}
}
const BUG_REPORT_URL: &str = "https://github.com/MarkWells-Dev/Catenary/issues";
fn notify_error(message: &str, format: HostFormat) -> String {
let full =
format!("Catenary: {message}. If this persists, please file a bug: {BUG_REPORT_URL}");
format_error(&full, format)
}
fn format_error(message: &str, format: HostFormat) -> String {
match format {
HostFormat::Claude => serde_json::json!({
"hookSpecificOutput": {
"hookEventName": "PostToolUse",
},
"systemMessage": message
})
.to_string(),
HostFormat::Gemini => serde_json::json!({
"hookSpecificOutput": {},
"systemMessage": message
})
.to_string(),
}
}
const CLAUDE_HOOKS_EXPECTED: &str = include_str!("../plugins/catenary/hooks/hooks.json");
const GEMINI_HOOKS_EXPECTED: &str = include_str!("../hooks/hooks.json");
const CONSTRAINED_BASH_EXPECTED: &str = include_str!("../scripts/constrained_bash.py");
fn check_claude_hooks(colors: &ColorConfig, show_diff: bool) {
let label = format!("{:<14}", "Claude Code");
let Ok(home_str) = std::env::var("HOME") else {
println!(
" {label}{}",
colors.dim("- cannot determine home directory"),
);
return;
};
let home = PathBuf::from(home_str);
let plugins_file = home.join(".claude/plugins/installed_plugins.json");
let Ok(plugins_json) = std::fs::read_to_string(&plugins_file) else {
println!(" {label}{}", colors.dim("- not installed"));
return;
};
let Ok(plugins) = serde_json::from_str::<serde_json::Value>(&plugins_json) else {
println!(
" {label}{}",
colors.yellow("? cannot parse installed_plugins.json"),
);
return;
};
let entries = match plugins
.get("plugins")
.and_then(|p| p.get("catenary@catenary"))
.and_then(serde_json::Value::as_array)
{
Some(arr) if !arr.is_empty() => arr,
_ => {
println!(" {label}{}", colors.dim("- not installed"));
return;
}
};
let entry = &entries[0];
let version = entry
.get("version")
.and_then(serde_json::Value::as_str)
.unwrap_or("?");
let Some(install_path_str) = entry.get("installPath").and_then(serde_json::Value::as_str)
else {
println!(
" {label}{version:<8}{}",
colors.yellow("? missing installPath"),
);
return;
};
let install_path = PathBuf::from(install_path_str);
let source_type = read_marketplace_source(&home);
let version_display = source_type
.as_deref()
.map_or_else(|| version.to_string(), |src| format!("{version} ({src})"));
let ver_col = format!("{version_display:<20}");
let hooks_path = install_path.join("hooks/hooks.json");
match std::fs::read_to_string(&hooks_path) {
Ok(installed) => {
if normalize_json(&installed) == normalize_json(CLAUDE_HOOKS_EXPECTED) {
println!(" {label}{ver_col}{}", colors.green("✓ hooks match"));
} else {
println!(
" {label}{ver_col}{}",
colors.red("✗ stale hooks (reinstall: claude plugin uninstall catenary@catenary && claude plugin install catenary@catenary)"),
);
if show_diff {
show_unified_diff(
&pretty_json(&installed),
&pretty_json(CLAUDE_HOOKS_EXPECTED),
"installed",
"expected",
);
}
}
}
Err(_) => {
println!(
" {label}{ver_col}{}",
colors.red("✗ hooks.json not found in plugin cache"),
);
}
}
}
fn read_marketplace_source(home: &Path) -> Option<String> {
let path = home.join(".claude/plugins/known_marketplaces.json");
let contents = std::fs::read_to_string(path).ok()?;
let json: serde_json::Value = serde_json::from_str(&contents).ok()?;
json.get("catenary")
.and_then(|c| c.get("source"))
.and_then(|s| s.get("source"))
.and_then(serde_json::Value::as_str)
.map(std::string::ToString::to_string)
}
fn check_gemini_hooks(colors: &ColorConfig, show_diff: bool) {
let label = format!("{:<14}", "Gemini CLI");
let Ok(home_str) = std::env::var("HOME") else {
println!(
" {label}{}",
colors.dim("- cannot determine home directory"),
);
return;
};
let home = PathBuf::from(home_str);
let ext_dir = home.join(".gemini/extensions");
let candidates = ["Catenary", "catenary"];
let ext_path = candidates
.iter()
.map(|name| ext_dir.join(name))
.find(|p| p.is_dir());
let Some(ext_path) = ext_path else {
println!(" {label}{}", colors.dim("- not installed"));
return;
};
let install_meta_path = ext_path.join(".gemini-extension-install.json");
let install_meta = std::fs::read_to_string(&install_meta_path)
.ok()
.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok());
let install_type = install_meta
.as_ref()
.and_then(|m| m.get("type").and_then(serde_json::Value::as_str))
.unwrap_or("unknown");
let resolved = if install_type == "link" {
install_meta
.as_ref()
.and_then(|m| m.get("source").and_then(serde_json::Value::as_str))
.map_or_else(|| ext_path.clone(), PathBuf::from)
} else {
ext_path
};
let manifest_path = resolved.join("gemini-extension.json");
let version = std::fs::read_to_string(&manifest_path)
.ok()
.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok())
.and_then(|v| {
v.get("version")
.and_then(serde_json::Value::as_str)
.map(std::string::ToString::to_string)
});
let type_label = if install_type == "link" {
"linked"
} else {
"installed"
};
let version_display = version
.as_deref()
.map_or_else(|| type_label.to_string(), |v| format!("{v} ({type_label})"));
let ver_col = format!("{version_display:<20}");
let hooks_path = resolved.join("hooks/hooks.json");
match std::fs::read_to_string(&hooks_path) {
Ok(installed) => {
if normalize_json(&installed) == normalize_json(GEMINI_HOOKS_EXPECTED) {
println!(" {label}{ver_col}{}", colors.green("✓ hooks match"));
} else {
println!(
" {label}{ver_col}{}",
colors.red("✗ stale hooks (update extension)"),
);
if show_diff {
show_unified_diff(
&pretty_json(&installed),
&pretty_json(GEMINI_HOOKS_EXPECTED),
"installed",
"expected",
);
}
}
}
Err(_) => {
println!(
" {label}{ver_col}{}",
colors.yellow("? hooks.json not found"),
);
}
}
}
fn check_path_binary(colors: &ColorConfig) {
let label = format!("{:<14}", "PATH");
let spacer = " ".repeat(20);
let Some(current_exe) = std::env::current_exe()
.ok()
.and_then(|p| std::fs::canonicalize(p).ok())
else {
println!(
" {label}{}",
colors.yellow("? cannot determine current executable"),
);
return;
};
let path_var = std::env::var("PATH").unwrap_or_default();
let Some(path_binary) = std::env::split_paths(&path_var)
.map(|dir| dir.join("catenary"))
.find(|p| p.is_file())
else {
println!(
" {label}{spacer}{}",
colors.red("✗ catenary not found on PATH"),
);
return;
};
let resolved_path = std::fs::canonicalize(&path_binary).unwrap_or(path_binary);
if current_exe == resolved_path {
println!(
" {label}{spacer}{}",
colors.green(&format!("✓ {}", resolved_path.display())),
);
} else {
println!(
" {label}{spacer}{}",
colors.red(&format!(
"✗ {} differs from {}",
resolved_path.display(),
current_exe.display(),
)),
);
}
}
fn check_constrained_bash_claude(colors: &ColorConfig, show_diff: bool) {
let label = format!("{:<14}", "Claude Code");
let Ok(home_str) = std::env::var("HOME") else {
println!(
" {label}{}",
colors.dim("- cannot determine home directory")
);
return;
};
let home = PathBuf::from(home_str);
let settings_path = home.join(".claude/settings.json");
let Ok(settings_json) = std::fs::read_to_string(&settings_path) else {
println!(" {label}{}", colors.dim("- not configured"));
return;
};
let Ok(settings) = serde_json::from_str::<serde_json::Value>(&settings_json) else {
println!(" {label}{}", colors.yellow("? cannot parse settings.json"));
return;
};
let Some(script_token) = find_script_path_in_json(&settings, "constrained_bash.py") else {
println!(" {label}{}", colors.dim("- not configured"));
return;
};
let script_path = expand_home(&script_token, &home);
match std::fs::read_to_string(&script_path) {
Ok(installed) => {
if installed == CONSTRAINED_BASH_EXPECTED {
println!(" {label}{}", colors.green("✓ up to date"));
} else if show_diff {
println!(" {label}{}", colors.red("✗ out of date"));
show_unified_diff(
&installed,
CONSTRAINED_BASH_EXPECTED,
"installed",
"expected",
);
} else {
println!(
" {label}{}",
colors.red("✗ out of date (run catenary doctor --diff to see changes)"),
);
}
}
Err(_) => {
println!(
" {label}{}",
colors.red(&format!("✗ not found at {}", script_path.display())),
);
}
}
}
fn check_constrained_bash_gemini(colors: &ColorConfig, show_diff: bool) {
let label = format!("{:<14}", "Gemini CLI");
let Ok(home_str) = std::env::var("HOME") else {
println!(
" {label}{}",
colors.dim("- cannot determine home directory")
);
return;
};
let home = PathBuf::from(home_str);
let settings_path = home.join(".gemini/settings.json");
let Ok(settings_json) = std::fs::read_to_string(&settings_path) else {
println!(" {label}{}", colors.dim("- not configured"));
return;
};
let Ok(settings) = serde_json::from_str::<serde_json::Value>(&settings_json) else {
println!(" {label}{}", colors.yellow("? cannot parse settings.json"));
return;
};
let Some(script_token) = find_script_path_in_json(&settings, "constrained_bash.py") else {
println!(" {label}{}", colors.dim("- not configured"));
return;
};
let script_path = expand_home(&script_token, &home);
match std::fs::read_to_string(&script_path) {
Ok(installed) => {
if installed == CONSTRAINED_BASH_EXPECTED {
println!(" {label}{}", colors.green("✓ up to date"));
} else if show_diff {
println!(" {label}{}", colors.red("✗ out of date"));
show_unified_diff(
&installed,
CONSTRAINED_BASH_EXPECTED,
"installed",
"expected",
);
} else {
println!(
" {label}{}",
colors.red("✗ out of date (run catenary doctor --diff to see changes)"),
);
}
}
Err(_) => {
println!(
" {label}{}",
colors.red(&format!("✗ not found at {}", script_path.display())),
);
}
}
}
fn normalize_json(s: &str) -> String {
serde_json::from_str::<serde_json::Value>(s)
.ok()
.and_then(|v| serde_json::to_string(&v).ok())
.unwrap_or_else(|| s.trim().to_string())
}
fn pretty_json(s: &str) -> String {
serde_json::from_str::<serde_json::Value>(s)
.ok()
.and_then(|v| serde_json::to_string_pretty(&v).ok())
.unwrap_or_else(|| s.to_string())
}
fn show_unified_diff(old: &str, new: &str, old_label: &str, new_label: &str) {
use similar::TextDiff;
let diff = TextDiff::from_lines(old, new);
print!(
"{}",
diff.unified_diff()
.context_radius(3)
.header(old_label, new_label)
);
}
fn find_script_path_in_json(json: &serde_json::Value, needle: &str) -> Option<String> {
match json {
serde_json::Value::String(s) if s.contains(needle) => s
.split_whitespace()
.find(|token| token.contains(needle))
.map(std::string::ToString::to_string),
serde_json::Value::Object(map) => map
.values()
.find_map(|v| find_script_path_in_json(v, needle)),
serde_json::Value::Array(arr) => {
arr.iter().find_map(|v| find_script_path_in_json(v, needle))
}
_ => None,
}
}
fn expand_home(path_str: &str, home: &Path) -> PathBuf {
path_str
.strip_prefix("$HOME/")
.or_else(|| path_str.strip_prefix("~/"))
.map_or_else(
|| {
if path_str == "$HOME" || path_str == "~" {
home.to_path_buf()
} else {
PathBuf::from(path_str)
}
},
|rest| home.join(rest),
)
}
fn check_grammars(colors: &ColorConfig) {
check_grammars_compiler(colors);
check_grammars_dir(colors);
let Ok(db) = catenary_mcp::db::open_and_migrate() else {
println!(" {}", colors.red("✗ failed to open database"));
return;
};
check_grammars_installed(colors, &db);
}
fn check_grammars_compiler(colors: &ColorConfig) {
let cc_name = install::c_compiler_name();
if binary_exists(&cc_name) {
println!(" {}", colors.green(&format!("✓ {cc_name} found")));
} else {
println!(
" {}",
colors.red("✗ C compiler not found — catenary install requires a C compiler"),
);
}
}
fn check_grammars_dir(colors: &ColorConfig) {
let gdir = install::grammar_dir();
if gdir.exists() {
println!(" {}", gdir.display());
} else {
println!(
" {}",
colors.dim(&format!("{} (not yet created)", gdir.display())),
);
}
}
fn check_grammars_installed(colors: &ColorConfig, db: &rusqlite::Connection) {
let Ok(mut stmt) = db.prepare("SELECT scope, lib_path, tags_path FROM grammars ORDER BY scope")
else {
println!(" {}", colors.red("✗ failed to query grammars"));
return;
};
let rows: Vec<(String, String, String)> = stmt
.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
))
})
.ok()
.map(|iter| iter.filter_map(Result::ok).collect())
.unwrap_or_default();
if rows.is_empty() {
println!(" {}", colors.dim("(none installed)"));
return;
}
for (scope, lib_path, tags_path) in &rows {
let lib_ok = Path::new(lib_path).exists();
let tags_ok = Path::new(tags_path).exists();
if lib_ok && tags_ok {
println!(" {}", colors.green(&format!("✓ {scope}")));
} else if !lib_ok {
let lib_name = Path::new(lib_path)
.file_name()
.map_or("parser.so", |n| n.to_str().unwrap_or("parser.so"));
println!(
" {}",
colors.red(&format!("✗ {scope} — missing {lib_name}")),
);
} else {
println!(" {}", colors.red(&format!("✗ {scope} — missing tags.scm")),);
}
}
}
#[allow(
clippy::too_many_lines,
reason = "Doctor command has sequential output logic"
)]
async fn run_doctor(args: Args, nocolor: bool, show_diff: bool) -> Result<()> {
let colors = ColorConfig::new(nocolor);
println!("Catenary {}", env!("CATENARY_VERSION"));
println!();
let mut config = catenary_mcp::config::Config::load(args.config.clone())?;
for lsp_spec in &args.lsps {
let (lang, command_str) = lsp_spec.split_once(':').ok_or_else(|| {
anyhow::anyhow!("Invalid LSP spec: {lsp_spec}. Expected 'lang:command'")
})?;
let lang = lang.trim().to_string();
let command_str = command_str.trim();
let mut parts = command_str.split_whitespace();
let program = parts
.next()
.ok_or_else(|| anyhow::anyhow!("command cannot be empty"))?
.to_string();
let cmd_args: Vec<String> = parts.map(std::string::ToString::to_string).collect();
config.server.insert(
lang,
catenary_mcp::config::ServerConfig {
command: program,
args: cmd_args,
initialization_options: None,
min_severity: None,
settings: None,
},
);
}
let raw_roots = if args.root.is_empty() {
vec![PathBuf::from(".")]
} else {
args.root
};
let roots: Vec<PathBuf> = raw_roots
.into_iter()
.map(|r| r.canonicalize())
.collect::<std::io::Result<Vec<_>>>()?;
let config_source = args
.config
.as_ref()
.map_or_else(|| "default paths".to_string(), |p| p.display().to_string());
println!("{} {}", colors.bold("Config:"), config_source);
println!(
"{} {}",
colors.bold("Roots: "),
roots
.iter()
.map(|r| r.to_string_lossy().into_owned())
.collect::<Vec<_>>()
.join(", ")
);
println!();
if config.server.is_empty() {
println!("No language servers configured.");
return Ok(());
}
let configured_keys: std::collections::HashSet<&str> =
config.server.keys().map(String::as_str).collect();
let detected = lsp::detect_workspace_languages(&roots, &configured_keys);
let mut servers: Vec<(&String, &catenary_mcp::config::ServerConfig)> =
config.server.iter().collect();
servers.sort_by_key(|(lang, _)| *lang);
let max_lang_width = servers.iter().map(|(l, _)| l.len()).max().unwrap_or(10);
let max_cmd_width = servers
.iter()
.map(|(_, s)| s.command.len())
.max()
.unwrap_or(10);
let broadcaster = catenary_mcp::session::EventBroadcaster::noop();
for (lang, server_config) in &servers {
let lang_display = format!("{lang:<max_lang_width$}");
let cmd_display = format!("{cmd:<max_cmd_width$}", cmd = server_config.command);
if !detected.contains(lang.as_str()) {
println!(
"{} {} {}",
colors.dim(&lang_display),
colors.dim(&cmd_display),
colors.dim("- skipped (no matching files)"),
);
continue;
}
if !binary_exists(&server_config.command) {
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.red("✗ command not found"),
);
continue;
}
let args_refs: Vec<&str> = server_config.args.iter().map(String::as_str).collect();
let spawn_result = lsp::LspClient::spawn_quiet(
&server_config.command,
&args_refs,
lang,
broadcaster.clone(),
Arc::new(catenary_mcp::session::MessageLog::noop()),
);
let mut client = match spawn_result {
Ok(client) => client,
Err(e) => {
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.red(&format!("✗ spawn failed: {e}")),
);
continue;
}
};
match client
.initialize(&roots, server_config.initialization_options.clone())
.await
{
Ok(result) => {
let tools =
extract_capabilities(&result["capabilities"], client.supports_type_hierarchy());
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.green("✓ ready"),
);
if !tools.is_empty() {
println!(
"{} {}",
" ".repeat(max_lang_width + max_cmd_width + 4),
colors.dim(&tools.join(" ")),
);
}
}
Err(e) => {
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.red(&format!("✗ initialize failed: {e}")),
);
}
}
let _ = client.shutdown().await;
}
println!();
println!("{}:", colors.bold("Hooks"));
check_claude_hooks(&colors, show_diff);
check_gemini_hooks(&colors, show_diff);
check_path_binary(&colors);
println!();
println!("{}:", colors.bold("Scripts"));
check_constrained_bash_claude(&colors, show_diff);
check_constrained_bash_gemini(&colors, show_diff);
println!();
println!("{}:", colors.bold("Grammars"));
check_grammars(&colors);
Ok(())
}
fn binary_exists(command: &str) -> bool {
if command.contains('/') {
return std::path::Path::new(command).exists();
}
let path_var = std::env::var("PATH").unwrap_or_default();
std::env::split_paths(&path_var).any(|dir| dir.join(command).is_file())
}
fn extract_capabilities(caps: &serde_json::Value, type_hierarchy: bool) -> Vec<&'static str> {
let has = |key: &str| caps.get(key).is_some_and(|v| !v.is_null());
let mut tools = Vec::new();
if has("hoverProvider") {
tools.push("hover");
}
if has("definitionProvider") {
tools.push("definition");
}
if has("typeDefinitionProvider") {
tools.push("type_definition");
}
if has("implementationProvider") {
tools.push("implementation");
}
if has("referencesProvider") {
tools.push("references");
}
if has("documentSymbolProvider") {
tools.push("document_symbols");
}
if has("workspaceSymbolProvider") {
tools.push("search");
}
if has("codeActionProvider") {
tools.push("code_actions");
}
if has("callHierarchyProvider") {
tools.push("call_hierarchy");
}
if type_hierarchy {
tools.push("type_hierarchy");
}
tools
}
fn find_session(conn: &rusqlite::Connection, id: &str) -> Result<session::SessionInfo> {
if let Some((s, _)) = session::get_session_with_conn(conn, id)? {
return Ok(s);
}
let sessions = session::list_sessions_with_conn(conn)?;
let matches: Vec<_> = sessions
.iter()
.filter(|(s, _)| s.id.starts_with(id))
.collect();
match matches.len() {
0 => anyhow::bail!("No session found matching '{id}'"),
1 => Ok(matches[0].0.clone()),
_ => {
eprintln!("Multiple sessions match '{id}':");
for (s, _) in matches {
eprintln!(" {}", s.id);
}
anyhow::bail!("Please specify a more complete session ID")
}
}
}
fn format_duration_ago(timestamp: chrono::DateTime<Utc>) -> String {
let now = Utc::now();
let duration = now.signed_duration_since(timestamp);
if duration.num_hours() > 0 {
format!(
"{}h {}m ago",
duration.num_hours(),
duration.num_minutes() % 60
)
} else if duration.num_minutes() > 0 {
format!("{}m ago", duration.num_minutes())
} else {
format!("{}s ago", duration.num_seconds())
}
}
fn print_event_raw(event: &SessionEvent) {
let time = event.timestamp.with_timezone(&Local).format("%H:%M:%S");
if let EventKind::ProtocolMessage {
protocol,
language,
direction,
message,
} = &event.kind
{
let tag = match protocol {
Protocol::Mcp => "[mcp]".to_string(),
Protocol::Lsp => format!("[{}]", language.as_deref().unwrap_or("lsp")),
};
let arrow = match direction {
Direction::Recv => "\u{2192}",
Direction::Send => "\u{2190}",
};
println!("[{time}] {tag} {arrow}");
let pretty = serde_json::to_string_pretty(message).unwrap_or_default();
println!("{pretty}");
} else {
let json = serde_json::to_string_pretty(&event.kind).unwrap_or_default();
println!("[{time}] {json}");
}
}
#[allow(clippy::too_many_lines, reason = "Match arms for each event kind")]
fn print_event_annotated(event: &SessionEvent, colors: &ColorConfig, term_width: usize) {
let time = event.timestamp.with_timezone(&Local).format("%H:%M:%S");
let time_str = colors.dim(&format!("[{time}]"));
match &event.kind {
EventKind::Started => {
println!("{time_str} Session started");
}
EventKind::Shutdown => {
println!("{time_str} Session shutting down");
}
EventKind::ServerState { language, state } => {
let lang = colors.cyan(language);
println!("{time_str} {lang}: {state}");
}
EventKind::Progress {
language,
title,
message,
percentage,
} => {
let lang = colors.cyan(language);
let pct = percentage.map(|p| format!(" {p}%")).unwrap_or_default();
let msg = message
.as_ref()
.map(|m| format!(" ({m})"))
.unwrap_or_default();
println!("{time_str} {lang}: {title}{pct}{msg}");
}
EventKind::ProgressEnd { language } => {
let lang = colors.cyan(language);
println!("{time_str} {lang}: Ready");
}
EventKind::ToolCall { tool, file, .. } => {
let arrow = colors.green("→");
let file_str = file
.as_ref()
.map(|f| format!(" on {f}"))
.unwrap_or_default();
println!("{time_str} {arrow} {tool}{file_str}");
}
EventKind::ToolResult {
tool,
success,
duration_ms,
..
} => {
let arrow = colors.blue("←");
let status = if *success {
"ok".to_string()
} else {
colors.red("error")
};
println!("{time_str} {arrow} {tool} -> {status} ({duration_ms}ms)");
}
EventKind::Diagnostics {
file,
count,
preview,
} => {
let basename = std::path::Path::new(file)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(file);
if *count == 0 {
let check = colors.green("ok");
println!("{time_str} {basename}: {check}");
} else {
let label = colors.yellow(&format!(
"{count} diagnostic{}",
if *count == 1 { "" } else { "s" }
));
let detail = if preview.is_empty() {
String::new()
} else {
let max_len = term_width.saturating_sub(14 + basename.len() + 20);
format!(" -- {}", cli::truncate(preview, max_len))
};
println!("{time_str} {basename}: {label}{detail}");
}
}
EventKind::ProtocolMessage {
protocol,
language,
direction,
message,
} => {
let tag = match protocol {
Protocol::Mcp => "[mcp]".to_string(),
Protocol::Lsp => format!("[{}]", language.as_deref().unwrap_or("lsp")),
};
let arrow_colored = match direction {
Direction::Recv => colors.green("\u{2192}"),
Direction::Send => colors.blue("\u{2190}"),
};
let summary = extract_mcp_summary(message, colors);
let prefix_len = 10 + tag.len() + 2 + 2; let max_summary_len = term_width.saturating_sub(prefix_len);
let summary = cli::truncate(&summary, max_summary_len);
println!("{time_str} {tag} {arrow_colored} {summary}");
if matches!(direction, Direction::Send)
&& let Some(obj) = message.as_object()
&& obj.contains_key("error")
&& let Some(error) = obj.get("error")
{
let err_msg = error
.get("message")
.and_then(|m: &serde_json::Value| m.as_str())
.unwrap_or("Unknown error");
println!(" {}", colors.red(&format!("Error: {err_msg}")));
}
}
}
}
fn extract_mcp_summary(message: &serde_json::Value, colors: &ColorConfig) -> String {
let Some(obj) = message.as_object() else {
return message.to_string();
};
obj.get("method").and_then(|m| m.as_str()).map_or_else(
|| {
if obj.contains_key("result") || obj.contains_key("error") {
let id = obj.get("id").map(|i| format!("#{i}")).unwrap_or_default();
if obj.contains_key("error") {
format!("{} {}", colors.red("error"), id)
} else {
format!("result {id}")
}
} else {
serde_json::to_string(message).unwrap_or_default()
}
},
|method| {
let id = obj.get("id").map(|i| format!("#{i}")).unwrap_or_default();
let params_summary = match method {
"tools/call" => {
if let Some(params) = obj.get("params")
&& let Some(name) = params.get("name").and_then(|n| n.as_str())
{
let file_info = params
.get("arguments")
.and_then(|a| a.get("file_path").or_else(|| a.get("path")))
.and_then(|f| f.as_str())
.map(|f| {
std::path::Path::new(f)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(f)
})
.map(|f| format!(" ({f})"))
.unwrap_or_default();
format!("{}{}", colors.cyan(name), file_info)
} else {
String::new()
}
}
"initialize" => {
if let Some(params) = obj.get("params")
&& let Some(info) = params.get("clientInfo")
&& let Some(name) = info.get("name").and_then(|n| n.as_str())
{
format!("from {name}")
} else {
String::new()
}
}
_ => String::new(),
};
if params_summary.is_empty() {
format!("{method} {id}")
} else {
format!("{method} {params_summary} {id}")
}
},
)
}
fn print_event(event: &SessionEvent) {
let colors = ColorConfig::new(false);
let term_width = cli::terminal_width();
print_event_annotated(event, &colors, term_width);
}
#[cfg(test)]
#[allow(
clippy::expect_used,
reason = "tests use expect for readable assertions"
)]
#[allow(
clippy::similar_names,
reason = "content/context are distinct concepts in hook output tests"
)]
mod tests {
use super::*;
use anyhow::Context;
fn test_db() -> (tempfile::TempDir, std::path::PathBuf, rusqlite::Connection) {
let dir = tempfile::tempdir().expect("failed to create tempdir for test DB");
let path = dir.path().join("catenary").join("catenary.db");
let conn = catenary_mcp::db::open_and_migrate_at(&path).expect("failed to open test DB");
(dir, path, conn)
}
fn create_session(db_path: &std::path::Path, workspace: &str) -> Result<session::Session> {
let arc = std::sync::Arc::new(std::sync::Mutex::new(
catenary_mcp::db::open_and_migrate_at(db_path)?,
));
session::Session::create_with_conn(workspace, arc)
}
#[test]
fn test_format_diagnostics_claude() -> Result<()> {
let content = "error[E0308]: mismatched types\n --> src/main.rs:5:10";
let output = format_diagnostics(content, HostFormat::Claude, "PostToolUse");
let parsed: serde_json::Value =
serde_json::from_str(&output).context("claude format should produce valid JSON")?;
let hook_output = &parsed["hookSpecificOutput"];
assert_eq!(hook_output["hookEventName"], "PostToolUse");
let context = hook_output["additionalContext"]
.as_str()
.expect("additionalContext should be a string");
assert!(context.contains("error[E0308]: mismatched types"));
assert!(context.contains(" --> src/main.rs:5:10"));
Ok(())
}
#[test]
fn test_format_diagnostics_gemini() -> Result<()> {
let content = "error[E0308]: mismatched types";
let output = format_diagnostics(content, HostFormat::Gemini, "PostToolUse");
let parsed: serde_json::Value =
serde_json::from_str(&output).context("gemini format should produce valid JSON")?;
let context = parsed["hookSpecificOutput"]["additionalContext"]
.as_str()
.expect("additionalContext should be a string");
assert_eq!(context, content);
assert!(parsed["hookSpecificOutput"]["hookEventName"].is_null());
Ok(())
}
#[test]
fn test_format_diagnostics_gemini_multiline() -> Result<()> {
let content = "warning: unused variable\n --> lib.rs:3:9";
let output = format_diagnostics(content, HostFormat::Gemini, "PostToolUse");
let parsed: serde_json::Value =
serde_json::from_str(&output).context("should produce valid JSON")?;
let context = parsed["hookSpecificOutput"]["additionalContext"]
.as_str()
.expect("additionalContext should be a string");
assert!(context.contains("warning: unused variable\n --> lib.rs:3:9"));
Ok(())
}
#[test]
fn test_format_diagnostics_claude_propagates_hook_event() -> Result<()> {
let content = "Added roots: /tmp/foo";
let output = format_diagnostics(content, HostFormat::Claude, "PreToolUse");
let parsed: serde_json::Value =
serde_json::from_str(&output).context("should produce valid JSON")?;
assert_eq!(parsed["hookSpecificOutput"]["hookEventName"], "PreToolUse");
Ok(())
}
#[test]
fn test_format_error_claude() -> Result<()> {
let output = format_error("Catenary: database unavailable", HostFormat::Claude);
let parsed: serde_json::Value =
serde_json::from_str(&output).context("should produce valid JSON")?;
assert_eq!(parsed["systemMessage"], "Catenary: database unavailable");
assert_eq!(parsed["hookSpecificOutput"]["hookEventName"], "PostToolUse");
assert!(parsed["hookSpecificOutput"]["additionalContext"].is_null());
Ok(())
}
#[test]
fn test_format_error_gemini() -> Result<()> {
let output = format_error("Catenary: database unavailable", HostFormat::Gemini);
let parsed: serde_json::Value =
serde_json::from_str(&output).context("should produce valid JSON")?;
assert_eq!(parsed["systemMessage"], "Catenary: database unavailable");
assert!(parsed["hookSpecificOutput"]["hookEventName"].is_null());
assert!(parsed["hookSpecificOutput"]["additionalContext"].is_null());
Ok(())
}
#[test]
fn test_parse_since_hours() -> Result<()> {
let cutoff = parse_since("1h")?;
let diff = Utc::now().signed_duration_since(cutoff);
assert!(
diff.num_seconds() >= 3595 && diff.num_seconds() <= 3605,
"expected ~3600s, got {}s",
diff.num_seconds()
);
Ok(())
}
#[test]
fn test_parse_since_days() -> Result<()> {
let cutoff = parse_since("7d")?;
let diff = Utc::now().signed_duration_since(cutoff);
let expected = 7 * 86400;
assert!(
diff.num_seconds() >= expected - 5 && diff.num_seconds() <= expected + 5,
"expected ~{expected}s, got {}s",
diff.num_seconds()
);
Ok(())
}
#[test]
fn test_parse_since_minutes() -> Result<()> {
let cutoff = parse_since("30m")?;
let diff = Utc::now().signed_duration_since(cutoff);
assert!(
diff.num_seconds() >= 1795 && diff.num_seconds() <= 1805,
"expected ~1800s, got {}s",
diff.num_seconds()
);
Ok(())
}
#[test]
fn test_parse_since_today() -> Result<()> {
let cutoff = parse_since("today")?;
let now = Utc::now();
assert!(cutoff <= now);
assert!(now.signed_duration_since(cutoff).num_hours() < 24);
Ok(())
}
#[test]
fn test_parse_since_invalid() {
assert!(parse_since("abc").is_err());
assert!(parse_since("").is_err());
assert!(parse_since("5x").is_err());
}
#[test]
fn test_query_with_kind_filter() -> Result<()> {
use catenary_mcp::session::EventKind;
let (_dir, path, conn) = test_db();
let session = create_session(&path, "/tmp/test-query-kind")?;
let id = session.info.id.clone();
session.broadcast(EventKind::ToolCall {
tool: "grep".to_string(),
file: Some("/tmp/a.rs".to_string()),
params: None,
});
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
let count: usize = conn.query_row(
"SELECT COUNT(*) FROM events WHERE session_id = ?1 AND kind = 'tool_call'",
[&id],
|row| row.get(0),
)?;
assert!(count >= 1, "should have at least one tool_call event");
let all_count: usize = conn.query_row(
"SELECT COUNT(*) FROM events WHERE session_id = ?1",
[&id],
|row| row.get(0),
)?;
assert!(
all_count > count,
"should have more total events than tool_call events"
);
drop(session);
session::delete_session_data_with_conn(&conn, &id)?;
Ok(())
}
#[test]
fn test_query_with_search() -> Result<()> {
use catenary_mcp::session::EventKind;
let (_dir, path, conn) = test_db();
let session = create_session(&path, "/tmp/test-query-search")?;
let id = session.info.id.clone();
session.broadcast(EventKind::ToolCall {
tool: "hover".to_string(),
file: Some("/tmp/unique_marker_file.rs".to_string()),
params: None,
});
let count: usize = conn.query_row(
"SELECT COUNT(*) FROM events WHERE session_id = ?1 AND payload LIKE '%unique_marker_file%'",
[&id],
|row| row.get(0),
)?;
assert!(count >= 1, "should find event by payload search");
drop(session);
session::delete_session_data_with_conn(&conn, &id)?;
Ok(())
}
#[test]
fn test_gc_dead_sessions() -> Result<()> {
let (_dir, path, conn) = test_db();
let session = create_session(&path, "/tmp/test-gc-dead")?;
let id = session.info.id.clone();
drop(session);
let found = session::get_session_with_conn(&conn, &id)?;
assert!(found.is_some(), "session should exist");
assert!(!found.expect("checked above").1, "session should be dead");
run_gc(&conn, None, true, None, false)?;
assert!(
session::get_session_with_conn(&conn, &id)?.is_none(),
"dead session should be deleted"
);
Ok(())
}
#[test]
fn test_gc_specific_session() -> Result<()> {
let (_dir, path, conn) = test_db();
let s1 = create_session(&path, "/tmp/test-gc-specific-1")?;
let id1 = s1.info.id.clone();
let s2 = create_session(&path, "/tmp/test-gc-specific-2")?;
let id2 = s2.info.id.clone();
drop(s1);
drop(s2);
run_gc(&conn, None, false, Some(&id1), false)?;
assert!(
session::get_session_with_conn(&conn, &id1)?.is_none(),
"targeted session should be deleted"
);
assert!(
session::get_session_with_conn(&conn, &id2)?.is_some(),
"other session should survive"
);
session::delete_session_data_with_conn(&conn, &id2)?;
Ok(())
}
#[test]
fn test_gc_no_flags_is_noop() -> Result<()> {
let (_dir, _path, conn) = test_db();
run_gc(&conn, None, false, None, false)?;
Ok(())
}
#[test]
fn test_gc_sidecar_identical() -> Result<()> {
let (_db_dir, _path, conn) = test_db();
let dir = tempfile::tempdir().expect("tempdir");
let file = dir.path().join("test.rs");
let file_str = file.to_string_lossy().to_string();
let content = b"original content";
conn.execute(
"INSERT INTO snapshots \
(file_path, content, source, created_at) \
VALUES (?1, ?2, 'restore', datetime('now', '-8 days'))",
rusqlite::params![&file_str, content.as_slice()],
)?;
let id = conn.last_insert_rowid();
let sidecar = catenary_mcp::restore::sidecar_path(&file, id);
std::fs::write(&sidecar, content).expect("write sidecar");
gc_expired_snapshots(&conn)?;
assert!(!sidecar.exists(), "sidecar should be deleted");
let count: usize = conn.query_row(
"SELECT COUNT(*) FROM snapshots WHERE id = ?1",
[id],
|row| row.get(0),
)?;
assert_eq!(count, 0, "snapshot row should be deleted");
Ok(())
}
#[test]
fn test_gc_sidecar_modified() -> Result<()> {
let (_db_dir, _path, conn) = test_db();
let dir = tempfile::tempdir().expect("tempdir");
let file = dir.path().join("test.rs");
let file_str = file.to_string_lossy().to_string();
let content = b"original content";
conn.execute(
"INSERT INTO snapshots \
(file_path, content, source, created_at) \
VALUES (?1, ?2, 'restore', datetime('now', '-8 days'))",
rusqlite::params![&file_str, content.as_slice()],
)?;
let id = conn.last_insert_rowid();
let sidecar = catenary_mcp::restore::sidecar_path(&file, id);
std::fs::write(&sidecar, b"modified by user").expect("write sidecar");
gc_expired_snapshots(&conn)?;
assert!(sidecar.exists(), "modified sidecar should survive");
let count: usize = conn.query_row(
"SELECT COUNT(*) FROM snapshots WHERE id = ?1",
[id],
|row| row.get(0),
)?;
assert_eq!(count, 0, "snapshot row should be deleted regardless");
Ok(())
}
#[test]
fn test_gc_sidecar_missing() -> Result<()> {
let (_db_dir, _path, conn) = test_db();
let dir = tempfile::tempdir().expect("tempdir");
let file = dir.path().join("test.rs");
let file_str = file.to_string_lossy().to_string();
conn.execute(
"INSERT INTO snapshots \
(file_path, content, source, created_at) \
VALUES (?1, ?2, 'restore', datetime('now', '-8 days'))",
rusqlite::params![&file_str, b"content".as_slice()],
)?;
let id = conn.last_insert_rowid();
gc_expired_snapshots(&conn)?;
let count: usize = conn.query_row(
"SELECT COUNT(*) FROM snapshots WHERE id = ?1",
[id],
|row| row.get(0),
)?;
assert_eq!(count, 0, "snapshot row should be deleted");
Ok(())
}
#[test]
fn test_gc_non_restore_snapshot() -> Result<()> {
let (_db_dir, _path, conn) = test_db();
conn.execute(
"INSERT INTO snapshots \
(file_path, content, source, pattern, count, created_at) \
VALUES ('src/test.rs', X'00', 'replace', '1 edits', 1, datetime('now', '-8 days'))",
[],
)?;
let id = conn.last_insert_rowid();
gc_expired_snapshots(&conn)?;
let count: usize = conn.query_row(
"SELECT COUNT(*) FROM snapshots WHERE id = ?1",
[id],
|row| row.get(0),
)?;
assert_eq!(count, 0, "replace snapshot row should be deleted");
Ok(())
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(500), "500 B");
assert_eq!(format_bytes(1536), "1.5 KB");
assert_eq!(format_bytes(2_621_440), "2.5 MB");
}
#[test]
fn test_doctor_grammar_section_no_grammars() {
let (_dir, _path, conn) = test_db();
let colors = ColorConfig::new(true);
check_grammars_installed(&colors, &conn);
}
#[test]
fn test_cli_hook_post_tool() {
use clap::Parser;
let args = Args::try_parse_from(["catenary", "hook", "post-tool", "--format=claude"]);
let args = args.expect("hook post-tool should parse");
let Some(Command::Hook { command }) = args.command else {
unreachable!("expected Hook command");
};
assert!(matches!(command, HookCommand::PostTool { .. }));
}
#[test]
fn test_cli_hook_pre_tool() {
use clap::Parser;
let args = Args::try_parse_from(["catenary", "hook", "pre-tool", "--format=gemini"]);
let args = args.expect("hook pre-tool should parse");
let Some(Command::Hook { command }) = args.command else {
unreachable!("expected Hook command");
};
assert!(matches!(command, HookCommand::PreTool { .. }));
}
#[test]
fn test_cli_legacy_notify_still_parses() {
use clap::Parser;
let args = Args::try_parse_from(["catenary", "notify", "--format=claude"]);
let args = args.expect("legacy notify should still parse");
assert!(matches!(args.command, Some(Command::Notify { .. })));
}
#[test]
fn test_cli_legacy_sync_roots_still_parses() {
use clap::Parser;
let args = Args::try_parse_from(["catenary", "sync-roots", "--format=claude"]);
let args = args.expect("legacy sync-roots should still parse");
assert!(matches!(args.command, Some(Command::SyncRoots { .. })));
}
#[test]
fn test_doctor_grammar_section_with_grammar() {
let (_dir, _path, conn) = test_db();
let colors = ColorConfig::new(true);
conn.execute(
"INSERT INTO grammars (scope, file_types, lib_path, tags_path, repo_url, installed_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![
"source.mock",
r#"["mock"]"#,
"/nonexistent/parser.so",
"/nonexistent/tags.scm",
"https://github.com/test/mock",
"2026-03-07T12:00:00Z",
],
)
.expect("insert grammar row");
check_grammars_installed(&colors, &conn);
}
}