#![allow(clippy::print_stdout, reason = "CLI tool needs to output to stdout")]
#![allow(clippy::print_stderr, reason = "CLI tool needs to output to stderr")]
use anyhow::Result;
use chrono::{Local, Utc};
use clap::{Parser, Subcommand};
use regex::Regex;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use tracing_subscriber::EnvFilter;
use catenary_mcp::bridge::{DocumentManager, LspBridgeHandler, PathValidator};
use catenary_mcp::cli::{self, ColorConfig, ColumnWidths};
use catenary_mcp::lsp;
use catenary_mcp::mcp::McpServer;
use catenary_mcp::session::{self, EventKind, Session, SessionEvent};
#[derive(Parser, Debug)]
#[command(name = "catenary")]
#[command(about = "Multiplexing bridge between MCP and multiple LSP servers")]
struct Args {
#[command(subcommand)]
command: Option<Command>,
#[arg(short, long = "lsp", global = true)]
lsps: Vec<String>,
#[arg(long, global = true)]
config: Option<PathBuf>,
#[arg(short, long, global = true)]
root: Vec<PathBuf>,
#[arg(long, global = true)]
idle_timeout: Option<u64>,
}
#[derive(Subcommand, Debug)]
enum Command {
Serve,
List,
Monitor {
id: String,
#[arg(long)]
raw: bool,
#[arg(long)]
nocolor: bool,
#[arg(long, short)]
filter: Option<String>,
},
Status {
id: String,
},
Notify {
#[arg(long, default_value = "plain")]
format: String,
},
Doctor {
#[arg(long)]
nocolor: bool,
},
SyncRoots {
#[arg(long, default_value = "plain")]
format: String,
},
Lock {
#[command(subcommand)]
action: LockAction,
},
}
#[derive(Subcommand, Debug)]
enum LockAction {
Acquire {
#[arg(long, default_value = "180")]
timeout: u64,
#[arg(long, default_value = "plain")]
format: String,
},
Release {
#[arg(long, default_value = "30")]
grace: u64,
#[arg(long, default_value = "plain")]
format: String,
},
TrackRead {
#[arg(long, default_value = "plain")]
format: String,
},
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
match args.command {
None | Some(Command::Serve) => run_server(args).await,
Some(Command::List) => run_list(),
Some(Command::Monitor {
id,
raw,
nocolor,
filter,
}) => run_monitor(&id, raw, nocolor, filter.as_deref()),
Some(Command::Status { id }) => run_status(&id),
Some(Command::Notify { format }) => {
run_notify(&format);
Ok(())
}
Some(Command::Doctor { nocolor }) => run_doctor(args, nocolor).await,
Some(Command::SyncRoots { format }) => {
run_sync_roots(&format);
Ok(())
}
Some(Command::Lock { action }) => {
run_lock(action);
Ok(())
}
}
}
#[allow(
clippy::too_many_lines,
reason = "Server setup requires sequential initialization steps"
)]
async fn run_server(args: Args) -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("catenary=info".parse()?))
.with_writer(std::io::stderr)
.init();
let mut config = catenary_mcp::config::Config::load(args.config.clone())?;
if let Some(timeout) = args.idle_timeout {
config.idle_timeout = timeout;
}
for lsp_spec in args.lsps {
let (lang, command_str) = lsp_spec.split_once(':').ok_or_else(|| {
anyhow::anyhow!("Invalid LSP spec: {lsp_spec}. Expected 'lang:command'")
})?;
let lang = lang.trim().to_string();
let command_str = command_str.trim();
let mut parts = command_str.split_whitespace();
let program = parts
.next()
.ok_or_else(|| anyhow::anyhow!("command cannot be empty"))?
.to_string();
let cmd_args: Vec<String> = parts.map(std::string::ToString::to_string).collect();
config.server.insert(
lang,
catenary_mcp::config::ServerConfig {
command: program,
args: cmd_args,
initialization_options: None,
},
);
}
let raw_roots = if args.root.is_empty() {
vec![PathBuf::from(".")]
} else {
args.root
};
let roots: Vec<PathBuf> = raw_roots
.into_iter()
.map(|r| r.canonicalize())
.collect::<std::io::Result<Vec<_>>>()?;
let workspace_display = roots
.iter()
.map(|r| r.to_string_lossy().into_owned())
.collect::<Vec<_>>()
.join(", ");
let session = Arc::new(std::sync::Mutex::new(Session::create(&workspace_display)?));
let broadcaster = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.broadcaster();
info!("Starting catenary multiplexing bridge");
info!(
"Session ID: {}",
session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.info
.id
);
info!("Workspace roots: {}", workspace_display);
info!("Document idle timeout: {}s", config.idle_timeout);
let client_manager = Arc::new(lsp::ClientManager::new(
config.clone(),
roots,
broadcaster.clone(),
));
client_manager.spawn_all().await;
let doc_manager = Arc::new(Mutex::new(DocumentManager::new()));
let runtime = tokio::runtime::Handle::current();
let cleanup_handle = if config.idle_timeout > 0 {
let client_manager_clone = client_manager.clone();
let doc_manager_clone = doc_manager.clone();
let idle_timeout = config.idle_timeout;
Some(tokio::spawn(async move {
document_cleanup_task(client_manager_clone, doc_manager_clone, idle_timeout).await;
}))
} else {
None
};
let current_roots = client_manager.roots().await;
let path_validator = Arc::new(tokio::sync::RwLock::new(PathValidator::new(
current_roots.clone(),
)));
let notify_server = catenary_mcp::notify::NotifyServer::new(
client_manager.clone(),
doc_manager.clone(),
path_validator.clone(),
broadcaster.clone(),
);
let socket_path = session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.socket_path();
let notify_handle = notify_server.start(&socket_path)?;
session
.lock()
.map_err(|_| anyhow::anyhow!("mutex poisoned"))?
.set_socket_active();
let handler = LspBridgeHandler::new(
client_manager.clone(),
doc_manager,
runtime,
broadcaster.clone(),
path_validator.clone(),
);
let session_for_callback = session.clone();
let client_manager_for_roots = client_manager.clone();
let path_validator_for_roots = path_validator.clone();
let runtime_for_roots = tokio::runtime::Handle::current();
let mut mcp_server = McpServer::new(handler, broadcaster)
.on_client_info(Box::new(move |name: &str, version: &str| {
if let Ok(mut session) = session_for_callback.lock() {
session.set_client_info(name, version);
}
}))
.on_roots_changed(Box::new(move |roots| {
let paths: Vec<PathBuf> = roots
.iter()
.filter_map(|root| {
root.uri.strip_prefix("file://").and_then(|p| {
let path = PathBuf::from(p);
match path.canonicalize() {
Ok(canonical) => Some(canonical),
Err(e) => {
warn!("Skipping root {p}: {e}");
None
}
}
})
})
.collect();
runtime_for_roots
.block_on(path_validator_for_roots.write())
.update_roots(paths.clone());
runtime_for_roots.block_on(client_manager_for_roots.sync_roots(paths))?;
runtime_for_roots.block_on(client_manager_for_roots.spawn_all());
Ok(())
}));
let mcp_task = tokio::task::spawn_blocking(move || mcp_server.run());
let mcp_result = tokio::select! {
res = mcp_task => {
res?
}
_ = tokio::signal::ctrl_c() => {
info!("Received shutdown signal");
Ok(())
}
};
notify_handle.abort();
let _ = notify_handle.await;
if let Some(handle) = cleanup_handle {
handle.abort();
let _ = handle.await;
}
info!("Shutting down LSP servers");
client_manager.shutdown_all().await;
mcp_result
}
fn run_list() -> Result<()> {
let sessions = session::list_sessions()?;
if sessions.is_empty() {
println!("No active Catenary sessions");
return Ok(());
}
let term_width = cli::terminal_width();
let widths = ColumnWidths::calculate(term_width);
println!(
"{:>width_num$} {:<width_id$} {:<width_pid$} {:<width_ws$} {:<width_client$} {:<width_lang$} STARTED",
"#",
"ID",
"PID",
"WORKSPACE",
"CLIENT",
"LANGUAGES",
width_num = widths.row_num,
width_id = widths.id,
width_pid = widths.pid,
width_ws = widths.workspace,
width_client = widths.client,
width_lang = widths.languages,
);
println!("{}", "-".repeat(term_width.min(120)));
for (idx, s) in sessions.iter().enumerate() {
let client = match (&s.client_name, &s.client_version) {
(Some(name), Some(ver)) => format!("{name} v{ver}"),
(Some(name), None) => name.clone(),
_ => "-".to_string(),
};
let ago = format_duration_ago(s.started_at);
let languages = session::active_languages(&s.id)
.unwrap_or_default()
.join(",");
let languages = if languages.is_empty() {
"-".to_string()
} else {
languages
};
let id = cli::truncate(&s.id, widths.id);
let workspace = cli::truncate(&s.workspace, widths.workspace);
let client = cli::truncate(&client, widths.client);
let languages = cli::truncate(&languages, widths.languages);
println!(
"{:>width_num$} {:<width_id$} {:<width_pid$} {:<width_ws$} {:<width_client$} {:<width_lang$} {}",
idx + 1,
id,
s.pid,
workspace,
client,
languages,
ago,
width_num = widths.row_num,
width_id = widths.id,
width_pid = widths.pid,
width_ws = widths.workspace,
width_client = widths.client,
width_lang = widths.languages,
);
}
Ok(())
}
fn resolve_session_id(id: &str) -> Result<session::SessionInfo> {
if let Ok(row_num) = id.parse::<usize>()
&& row_num > 0
{
let sessions = session::list_sessions()?;
if let Some(s) = sessions.get(row_num - 1) {
return Ok(s.clone());
}
if let Ok(session) = find_session(id) {
return Ok(session);
}
anyhow::bail!("Row number {} out of range (1-{})", row_num, sessions.len());
}
find_session(id)
}
fn run_monitor(id: &str, raw: bool, nocolor: bool, filter: Option<&str>) -> Result<()> {
let session = resolve_session_id(id)?;
let full_id = session.id;
let colors = ColorConfig::new(nocolor);
let term_width = cli::terminal_width();
let filter_regex = filter
.as_ref()
.map(|f| Regex::new(f))
.transpose()
.map_err(|e| anyhow::anyhow!("Invalid filter regex: {e}"))?;
println!("Monitoring session {full_id} (Ctrl+C to stop)\n");
let mut reader = session::tail_events(&full_id)?;
loop {
if let Some(event) = reader.next_event()? {
if let Some(ref re) = filter_regex {
let event_str = format!("{:?}", event.kind);
if !re.is_match(&event_str) {
continue;
}
}
if raw {
print_event_raw(&event);
} else {
print_event_annotated(&event, &colors, term_width);
}
} else {
println!("\nSession ended");
break;
}
}
Ok(())
}
fn run_status(id: &str) -> Result<()> {
let session = find_session(id)?;
println!("Session: {}", session.id);
println!("PID: {}", session.pid);
println!("Workspace: {}", session.workspace);
println!(
"Started: {} ({})",
session
.started_at
.with_timezone(&Local)
.format("%Y-%m-%d %H:%M:%S"),
format_duration_ago(session.started_at)
);
if let Some(name) = &session.client_name {
print!("Client: {name}");
if let Some(ver) = &session.client_version {
print!(" v{ver}");
}
println!();
}
println!("\nRecent events:");
let events: Vec<_> = session::monitor_events(&session.id)?.collect();
let recent: Vec<_> = events.iter().rev().take(10).collect();
for event in recent.iter().rev() {
print_event(event);
}
Ok(())
}
fn notify_endpoint(session_id: &str) -> PathBuf {
#[cfg(unix)]
{
session::sessions_dir().join(session_id).join("notify.sock")
}
#[cfg(windows)]
{
PathBuf::from(format!(r"\\.\pipe\catenary-{session_id}"))
}
}
#[cfg(unix)]
fn notify_connect(endpoint: &std::path::Path) -> Option<std::os::unix::net::UnixStream> {
if !endpoint.exists() {
return None;
}
let stream = std::os::unix::net::UnixStream::connect(endpoint).ok()?;
let _ = stream.set_read_timeout(Some(Duration::from_secs(60)));
let _ = stream.set_write_timeout(Some(Duration::from_secs(5)));
Some(stream)
}
#[cfg(windows)]
fn notify_connect(endpoint: &std::path::Path) -> Option<std::fs::File> {
use std::os::windows::fs::OpenOptionsExt;
std::fs::OpenOptions::new()
.read(true)
.write(true)
.security_qos_flags(0x0001_0000)
.open(endpoint)
.ok()
}
fn ipc_exchange(
mut stream: impl std::io::Read + std::io::Write,
request: &serde_json::Value,
) -> Vec<String> {
use std::io::BufRead;
if serde_json::to_writer(&mut stream, request).is_err() {
return Vec::new();
}
if stream.write_all(b"\n").is_err() || stream.flush().is_err() {
return Vec::new();
}
let reader = std::io::BufReader::new(stream);
let mut lines = Vec::new();
for line in reader.lines() {
match line {
Ok(text) if !text.is_empty() => lines.push(text),
_ => break,
}
}
lines
}
fn run_notify(format: &str) {
let Ok(stdin_data) = std::io::read_to_string(std::io::stdin()) else {
return;
};
let Ok(hook_json) = serde_json::from_str::<serde_json::Value>(&stdin_data) else {
return;
};
let file_path = hook_json
.get("tool_input")
.and_then(|ti| ti.get("file_path").or_else(|| ti.get("file")))
.and_then(|fp| fp.as_str());
let Some(file_path) = file_path else {
return;
};
let abs_path = if std::path::Path::new(file_path).is_absolute() {
std::path::PathBuf::from(file_path)
} else {
let cwd = hook_json.get("cwd").and_then(|v| v.as_str()).map_or_else(
|| std::env::current_dir().unwrap_or_default(),
PathBuf::from,
);
cwd.join(file_path)
};
let sessions = session::list_sessions().unwrap_or_default();
let session = sessions
.iter()
.find(|s| abs_path.to_string_lossy().starts_with(&s.workspace));
let Some(session) = session else {
return;
};
let endpoint = notify_endpoint(&session.id);
let Some(stream) = notify_connect(&endpoint) else {
return;
};
let request = serde_json::json!({ "file": abs_path.to_string_lossy() });
let lines = ipc_exchange(stream, &request);
if lines.is_empty() {
return;
}
let output = format_diagnostics(&lines, format);
print!("{output}");
}
#[allow(
clippy::too_many_lines,
reason = "Sequential hook processing with early returns"
)]
fn run_sync_roots(format: &str) {
use std::io::{BufRead, Seek, SeekFrom};
let Ok(stdin_data) = std::io::read_to_string(std::io::stdin()) else {
return;
};
let Ok(hook_json) = serde_json::from_str::<serde_json::Value>(&stdin_data) else {
return;
};
let Some(transcript_path) = hook_json.get("transcript_path").and_then(|v| v.as_str()) else {
return;
};
let cwd = hook_json.get("cwd").and_then(|v| v.as_str()).map_or_else(
|| std::env::current_dir().unwrap_or_default(),
PathBuf::from,
);
let sessions = session::list_sessions().unwrap_or_default();
let cwd_str = cwd.to_string_lossy();
let session = sessions.iter().find(|s| cwd_str.starts_with(&s.workspace));
let Some(session) = session else {
return;
};
let session_dir = session::sessions_dir().join(&session.id);
let state_path = session_dir.join("known_roots.json");
let (start_offset, mut known_roots) = load_root_state(&state_path);
if start_offset == 0 && known_roots.is_empty() {
let legacy_path = session_dir.join("transcript_offset");
if legacy_path.exists() {
let _ = std::fs::remove_file(&legacy_path);
}
}
let Ok(mut file) = std::fs::File::open(transcript_path) else {
return;
};
if file.seek(SeekFrom::Start(start_offset)).is_err() {
return;
}
let add_prefix = "Added \\u001b[1m";
let add_suffix = "\\u001b[22m as a working directory";
let remove_prefix = "Removed directory \\u001b[1m";
let remove_suffix = "\\u001b[22m from workspace";
let mut changed = false;
let reader = std::io::BufReader::new(&mut file);
for line in reader.lines() {
let Ok(line) = line else {
break;
};
if line.contains(add_prefix) {
let mut search_from = 0;
while let Some(start) = line[search_from..].find(add_prefix) {
let abs_start = search_from + start + add_prefix.len();
if let Some(end) = line[abs_start..].find(add_suffix) {
let path_str = unescape_json_path(&line[abs_start..abs_start + end]);
let resolved = resolve_transcript_path(&path_str, &cwd);
if !known_roots.contains(&resolved) {
known_roots.push(resolved);
changed = true;
}
search_from = abs_start + end + add_suffix.len();
} else {
break;
}
}
}
if line.contains(remove_prefix) {
let mut search_from = 0;
while let Some(start) = line[search_from..].find(remove_prefix) {
let abs_start = search_from + start + remove_prefix.len();
if let Some(end) = line[abs_start..].find(remove_suffix) {
let path_str = unescape_json_path(&line[abs_start..abs_start + end]);
let resolved = resolve_transcript_path(&path_str, &cwd);
if let Some(pos) = known_roots.iter().position(|r| r == &resolved) {
known_roots.remove(pos);
changed = true;
}
search_from = abs_start + end + remove_suffix.len();
} else {
break;
}
}
}
}
let new_offset = file.stream_position().unwrap_or(start_offset);
save_root_state(&state_path, new_offset, &known_roots);
if !changed {
return;
}
let mut full_roots = vec![cwd];
for root in &known_roots {
if !full_roots.contains(root) {
full_roots.push(root.clone());
}
}
let endpoint = notify_endpoint(&session.id);
let Some(stream) = notify_connect(&endpoint) else {
return;
};
let root_strings: Vec<String> = full_roots
.iter()
.map(|p| p.to_string_lossy().into_owned())
.collect();
let request = serde_json::json!({ "sync_roots": root_strings });
let lines = ipc_exchange(stream, &request);
if lines.is_empty() {
return;
}
let output = format_diagnostics(&lines, format);
print!("{output}");
}
fn unescape_json_path(raw: &str) -> String {
raw.replace("\\\\", "\\")
.replace("\\/", "/")
.replace("\\\"", "\"")
}
fn resolve_transcript_path(path_str: &str, cwd: &Path) -> PathBuf {
let path = PathBuf::from(path_str);
if path.is_absolute() {
path
} else {
cwd.join(path)
}
}
#[derive(serde::Serialize, serde::Deserialize)]
struct RootState {
offset: u64,
roots: Vec<String>,
}
fn load_root_state(path: &Path) -> (u64, Vec<PathBuf>) {
let Ok(data) = std::fs::read_to_string(path) else {
return (0, Vec::new());
};
let Ok(state) = serde_json::from_str::<RootState>(&data) else {
return (0, Vec::new());
};
let roots = state.roots.into_iter().map(PathBuf::from).collect();
(state.offset, roots)
}
fn save_root_state(path: &Path, offset: u64, roots: &[PathBuf]) {
let state = RootState {
offset,
roots: roots.iter().map(|p| p.to_string_lossy().into_owned()).collect(),
};
if let Ok(json) = serde_json::to_string(&state) {
let _ = std::fs::write(path, json);
}
}
fn run_lock(action: LockAction) {
let Ok(stdin_data) = std::io::read_to_string(std::io::stdin()) else {
return;
};
let Ok(hook_json) = serde_json::from_str::<serde_json::Value>(&stdin_data) else {
return;
};
let owner = extract_owner(&hook_json);
let Some(file_path) = extract_file_path(&hook_json) else {
return;
};
let Ok(mgr) = catenary_mcp::lock::FileLockManager::new() else {
return;
};
match action {
LockAction::Acquire { timeout, format } => {
run_lock_acquire(&mgr, &file_path, &owner, timeout, &format, &hook_json);
}
LockAction::Release { grace, format: _ } => {
run_lock_release(&mgr, &file_path, &owner, grace, &hook_json);
}
LockAction::TrackRead { format: _ } => {
run_lock_track_read(&mgr, &file_path, &owner);
}
}
}
fn run_lock_acquire(
mgr: &catenary_mcp::lock::FileLockManager,
file_path: &str,
owner: &str,
timeout: u64,
format: &str,
hook_json: &serde_json::Value,
) {
use catenary_mcp::lock::AcquireResult;
let result = mgr.acquire(file_path, owner, timeout);
match &result {
AcquireResult::Acquired | AcquireResult::AcquiredStaleRead { .. } => {
broadcast_lock_event(
hook_json,
EventKind::LockAcquired {
file: file_path.to_string(),
owner: owner.to_string(),
},
);
}
AcquireResult::Denied { .. } => {
let held_by = "unknown".to_string();
broadcast_lock_event(
hook_json,
EventKind::LockDenied {
file: file_path.to_string(),
owner: owner.to_string(),
held_by,
},
);
}
}
match result {
AcquireResult::Acquired => {
}
AcquireResult::AcquiredStaleRead { context } => {
let output = format_lock_output(format, Some(&context), None);
print!("{output}");
}
AcquireResult::Denied { reason } => {
let output = format_lock_output(format, None, Some(&reason));
print!("{output}");
}
}
}
fn run_lock_release(
mgr: &catenary_mcp::lock::FileLockManager,
file_path: &str,
owner: &str,
grace: u64,
hook_json: &serde_json::Value,
) {
if mgr.release(file_path, owner, grace).is_ok() {
broadcast_lock_event(
hook_json,
EventKind::LockReleased {
file: file_path.to_string(),
owner: owner.to_string(),
},
);
}
}
fn run_lock_track_read(mgr: &catenary_mcp::lock::FileLockManager, file_path: &str, owner: &str) {
let _ = mgr.track_read(file_path, owner);
}
fn extract_owner(hook_json: &serde_json::Value) -> String {
let session_id = hook_json
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let agent_id = hook_json.get("agent_id").and_then(|v| v.as_str());
agent_id.map_or_else(
|| session_id.to_string(),
|aid| format!("{session_id}:{aid}"),
)
}
fn extract_file_path(hook_json: &serde_json::Value) -> Option<String> {
let file_path = hook_json
.get("tool_input")
.and_then(|ti| ti.get("file_path").or_else(|| ti.get("file")))
.and_then(|fp| fp.as_str())?;
let abs_path = if std::path::Path::new(file_path).is_absolute() {
PathBuf::from(file_path)
} else {
let cwd = hook_json.get("cwd").and_then(|v| v.as_str()).map_or_else(
|| std::env::current_dir().unwrap_or_default(),
PathBuf::from,
);
cwd.join(file_path)
};
Some(abs_path.to_string_lossy().into_owned())
}
fn format_lock_output(
format: &str,
additional_context: Option<&str>,
deny_reason: Option<&str>,
) -> String {
let is_gemini = format == "gemini";
match (deny_reason, additional_context) {
(Some(reason), _) if is_gemini => serde_json::json!({
"decision": "deny",
"reason": reason
})
.to_string(),
(Some(reason), _) => serde_json::json!({
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": reason
}
})
.to_string(),
(None, Some(context)) if is_gemini => serde_json::json!({
"decision": "deny",
"reason": context
})
.to_string(),
(None, Some(context)) => serde_json::json!({
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "allow",
"additionalContext": context
}
})
.to_string(),
(None, None) => String::new(),
}
}
fn broadcast_lock_event(hook_json: &serde_json::Value, event: EventKind) {
use std::io::Write;
let cwd = hook_json.get("cwd").and_then(|v| v.as_str()).unwrap_or("");
let sessions = session::list_sessions().unwrap_or_default();
let Some(session_info) = sessions.iter().find(|s| cwd.starts_with(&s.workspace)) else {
return;
};
let events_path = session::sessions_dir()
.join(&session_info.id)
.join("events.jsonl");
let event = SessionEvent {
timestamp: chrono::Utc::now(),
kind: event,
};
if let Ok(mut line) = serde_json::to_string(&event) {
line.push('\n');
if let Ok(mut file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&events_path)
{
let _ = file.write_all(line.as_bytes());
}
}
}
fn format_diagnostics(lines: &[String], format: &str) -> String {
if format == "gemini" {
let diagnostics = lines.join("\n");
let envelope = serde_json::json!({
"hookSpecificOutput": {
"additionalContext": format!("LSP Diagnostics:\n{diagnostics}")
}
});
envelope.to_string()
} else {
let mut out = lines.join("\n");
out.push('\n');
out
}
}
#[allow(
clippy::too_many_lines,
reason = "Doctor command has sequential output logic"
)]
async fn run_doctor(args: Args, nocolor: bool) -> Result<()> {
let colors = ColorConfig::new(nocolor);
let mut config = catenary_mcp::config::Config::load(args.config.clone())?;
for lsp_spec in &args.lsps {
let (lang, command_str) = lsp_spec.split_once(':').ok_or_else(|| {
anyhow::anyhow!("Invalid LSP spec: {lsp_spec}. Expected 'lang:command'")
})?;
let lang = lang.trim().to_string();
let command_str = command_str.trim();
let mut parts = command_str.split_whitespace();
let program = parts
.next()
.ok_or_else(|| anyhow::anyhow!("command cannot be empty"))?
.to_string();
let cmd_args: Vec<String> = parts.map(std::string::ToString::to_string).collect();
config.server.insert(
lang,
catenary_mcp::config::ServerConfig {
command: program,
args: cmd_args,
initialization_options: None,
},
);
}
let raw_roots = if args.root.is_empty() {
vec![PathBuf::from(".")]
} else {
args.root
};
let roots: Vec<PathBuf> = raw_roots
.into_iter()
.map(|r| r.canonicalize())
.collect::<std::io::Result<Vec<_>>>()?;
let config_source = args
.config
.as_ref()
.map_or_else(|| "default paths".to_string(), |p| p.display().to_string());
println!("{} {}", colors.bold("Config:"), config_source);
println!(
"{} {}",
colors.bold("Roots: "),
roots
.iter()
.map(|r| r.to_string_lossy().into_owned())
.collect::<Vec<_>>()
.join(", ")
);
println!();
if config.server.is_empty() {
println!("No language servers configured.");
return Ok(());
}
let configured_keys: std::collections::HashSet<&str> =
config.server.keys().map(String::as_str).collect();
let detected = lsp::detect_workspace_languages(&roots, &configured_keys);
let mut servers: Vec<(&String, &catenary_mcp::config::ServerConfig)> =
config.server.iter().collect();
servers.sort_by_key(|(lang, _)| *lang);
let max_lang_width = servers.iter().map(|(l, _)| l.len()).max().unwrap_or(10);
let max_cmd_width = servers
.iter()
.map(|(_, s)| s.command.len())
.max()
.unwrap_or(10);
let broadcaster = catenary_mcp::session::EventBroadcaster::noop()?;
for (lang, server_config) in &servers {
let lang_display = format!("{lang:<max_lang_width$}");
let cmd_display = format!("{cmd:<max_cmd_width$}", cmd = server_config.command);
if !detected.contains(lang.as_str()) {
println!(
"{} {} {}",
colors.dim(&lang_display),
colors.dim(&cmd_display),
colors.dim("- skipped (no matching files)"),
);
continue;
}
if !binary_exists(&server_config.command) {
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.red("✗ command not found"),
);
continue;
}
let args_refs: Vec<&str> = server_config.args.iter().map(String::as_str).collect();
let spawn_result = lsp::LspClient::spawn_quiet(
&server_config.command,
&args_refs,
lang,
broadcaster.clone(),
);
let mut client = match spawn_result {
Ok(client) => client,
Err(e) => {
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.red(&format!("✗ spawn failed: {e}")),
);
continue;
}
};
match client
.initialize(&roots, server_config.initialization_options.clone())
.await
{
Ok(result) => {
let tools = extract_capabilities(&result.capabilities);
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.green("✓ ready"),
);
if !tools.is_empty() {
println!(
"{} {}",
" ".repeat(max_lang_width + max_cmd_width + 4),
colors.dim(&tools.join(" ")),
);
}
}
Err(e) => {
println!(
"{} {} {}",
lang_display,
cmd_display,
colors.red(&format!("✗ initialize failed: {e}")),
);
}
}
let _ = client.shutdown().await;
}
Ok(())
}
fn binary_exists(command: &str) -> bool {
if command.contains('/') {
return std::path::Path::new(command).exists();
}
let path_var = std::env::var("PATH").unwrap_or_default();
std::env::split_paths(&path_var).any(|dir| dir.join(command).is_file())
}
fn extract_capabilities(caps: &lsp_types::ServerCapabilities) -> Vec<&'static str> {
let mut tools = Vec::new();
if caps.hover_provider.is_some() {
tools.push("hover");
}
if caps.definition_provider.is_some() {
tools.push("definition");
}
if caps.type_definition_provider.is_some() {
tools.push("type_definition");
}
if caps.implementation_provider.is_some() {
tools.push("implementation");
}
if caps.references_provider.is_some() {
tools.push("references");
}
if caps.document_symbol_provider.is_some() {
tools.push("document_symbols");
}
if caps.workspace_symbol_provider.is_some() {
tools.push("search");
}
if caps.code_action_provider.is_some() {
tools.push("code_actions");
}
if caps.rename_provider.is_some() {
tools.push("rename");
}
if caps.call_hierarchy_provider.is_some() {
tools.push("call_hierarchy");
}
tools
}
fn find_session(id: &str) -> Result<session::SessionInfo> {
if let Some(s) = session::get_session(id)? {
return Ok(s);
}
let sessions = session::list_sessions()?;
let matches: Vec<_> = sessions.iter().filter(|s| s.id.starts_with(id)).collect();
match matches.len() {
0 => anyhow::bail!("No session found matching '{id}'"),
1 => Ok(matches[0].clone()),
_ => {
eprintln!("Multiple sessions match '{id}':");
for s in matches {
eprintln!(" {}", s.id);
}
anyhow::bail!("Please specify a more complete session ID")
}
}
}
fn format_duration_ago(timestamp: chrono::DateTime<Utc>) -> String {
let now = Utc::now();
let duration = now.signed_duration_since(timestamp);
if duration.num_hours() > 0 {
format!(
"{}h {}m ago",
duration.num_hours(),
duration.num_minutes() % 60
)
} else if duration.num_minutes() > 0 {
format!("{}m ago", duration.num_minutes())
} else {
format!("{}s ago", duration.num_seconds())
}
}
fn print_event_raw(event: &SessionEvent) {
let time = event.timestamp.with_timezone(&Local).format("%H:%M:%S");
if let EventKind::McpMessage { direction, message } = &event.kind {
let arrow = if direction == "in" { "→" } else { "←" };
println!("[{time}] {arrow}");
let pretty = serde_json::to_string_pretty(message).unwrap_or_default();
println!("{pretty}");
} else {
let json = serde_json::to_string_pretty(&event.kind).unwrap_or_default();
println!("[{time}] {json}");
}
}
#[allow(clippy::too_many_lines, reason = "Match arms for each event kind")]
fn print_event_annotated(event: &SessionEvent, colors: &ColorConfig, term_width: usize) {
let time = event.timestamp.with_timezone(&Local).format("%H:%M:%S");
let time_str = colors.dim(&format!("[{time}]"));
match &event.kind {
EventKind::Started => {
println!("{time_str} Session started");
}
EventKind::Shutdown => {
println!("{time_str} Session shutting down");
}
EventKind::ServerState { language, state } => {
let lang = colors.cyan(language);
println!("{time_str} {lang}: {state}");
}
EventKind::Progress {
language,
title,
message,
percentage,
} => {
let lang = colors.cyan(language);
let pct = percentage.map(|p| format!(" {p}%")).unwrap_or_default();
let msg = message
.as_ref()
.map(|m| format!(" ({m})"))
.unwrap_or_default();
println!("{time_str} {lang}: {title}{pct}{msg}");
}
EventKind::ProgressEnd { language } => {
let lang = colors.cyan(language);
println!("{time_str} {lang}: Ready");
}
EventKind::ToolCall { tool, file } => {
let arrow = colors.green("→");
let file_str = file
.as_ref()
.map(|f| format!(" on {f}"))
.unwrap_or_default();
println!("{time_str} {arrow} {tool}{file_str}");
}
EventKind::ToolResult {
tool,
success,
duration_ms,
} => {
let arrow = colors.blue("←");
let status = if *success {
"ok".to_string()
} else {
colors.red("error")
};
println!("{time_str} {arrow} {tool} -> {status} ({duration_ms}ms)");
}
EventKind::Diagnostics {
file,
count,
preview,
} => {
let basename = std::path::Path::new(file)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(file);
if *count == 0 {
let check = colors.green("ok");
println!("{time_str} {basename}: {check}");
} else {
let label = colors.yellow(&format!(
"{count} diagnostic{}",
if *count == 1 { "" } else { "s" }
));
let detail = if preview.is_empty() {
String::new()
} else {
let max_len = term_width.saturating_sub(14 + basename.len() + 20);
format!(" -- {}", cli::truncate(preview, max_len))
};
println!("{time_str} {basename}: {label}{detail}");
}
}
EventKind::LockAcquired { file, owner } => {
let basename = std::path::Path::new(file.as_str())
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(file);
let lock_icon = colors.green("locked");
let short_owner = cli::truncate(owner, 20);
println!("{time_str} {basename}: {lock_icon} by {short_owner}");
}
EventKind::LockReleased { file, owner } => {
let basename = std::path::Path::new(file.as_str())
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(file);
let unlock_icon = colors.dim("unlocked");
let short_owner = cli::truncate(owner, 20);
println!("{time_str} {basename}: {unlock_icon} by {short_owner}");
}
EventKind::LockDenied {
file,
owner,
held_by,
} => {
let basename = std::path::Path::new(file.as_str())
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(file);
let denied = colors.red("lock denied");
let short_owner = cli::truncate(owner, 20);
let short_held = cli::truncate(held_by, 20);
println!("{time_str} {basename}: {denied} for {short_owner} (held by {short_held})");
}
EventKind::McpMessage { direction, message } => {
let arrow_colored = if direction == "in" {
colors.green("→")
} else {
colors.blue("←")
};
let summary = extract_mcp_summary(message, colors);
let prefix_len = 10 + 2 + 2; let max_summary_len = term_width.saturating_sub(prefix_len);
let summary = cli::truncate(&summary, max_summary_len);
println!("{time_str} {arrow_colored} {summary}");
if direction == "out"
&& let Some(obj) = message.as_object()
&& obj.contains_key("error")
&& let Some(error) = obj.get("error")
{
let err_msg = error
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
println!(" {}", colors.red(&format!("Error: {err_msg}")));
}
}
}
}
fn extract_mcp_summary(message: &serde_json::Value, colors: &ColorConfig) -> String {
let Some(obj) = message.as_object() else {
return message.to_string();
};
obj.get("method").and_then(|m| m.as_str()).map_or_else(
|| {
if obj.contains_key("result") || obj.contains_key("error") {
let id = obj.get("id").map(|i| format!("#{i}")).unwrap_or_default();
if obj.contains_key("error") {
format!("{} {}", colors.red("error"), id)
} else {
format!("result {id}")
}
} else {
serde_json::to_string(message).unwrap_or_default()
}
},
|method| {
let id = obj.get("id").map(|i| format!("#{i}")).unwrap_or_default();
let params_summary = match method {
"tools/call" => {
if let Some(params) = obj.get("params")
&& let Some(name) = params.get("name").and_then(|n| n.as_str())
{
let file_info = params
.get("arguments")
.and_then(|a| a.get("file_path").or_else(|| a.get("path")))
.and_then(|f| f.as_str())
.map(|f| {
std::path::Path::new(f)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(f)
})
.map(|f| format!(" ({f})"))
.unwrap_or_default();
format!("{}{}", colors.cyan(name), file_info)
} else {
String::new()
}
}
"initialize" => {
if let Some(params) = obj.get("params")
&& let Some(info) = params.get("clientInfo")
&& let Some(name) = info.get("name").and_then(|n| n.as_str())
{
format!("from {name}")
} else {
String::new()
}
}
_ => String::new(),
};
if params_summary.is_empty() {
format!("{method} {id}")
} else {
format!("{method} {params_summary} {id}")
}
},
)
}
fn print_event(event: &SessionEvent) {
let colors = ColorConfig::new(false);
let term_width = cli::terminal_width();
print_event_annotated(event, &colors, term_width);
}
async fn document_cleanup_task(
client_manager: Arc<lsp::ClientManager>,
doc_manager: Arc<Mutex<DocumentManager>>,
idle_timeout_secs: u64,
) {
let check_interval = Duration::from_secs(idle_timeout_secs.min(60));
loop {
tokio::time::sleep(check_interval).await;
let stale_paths = {
let doc_manager = doc_manager.lock().await;
doc_manager.stale_documents(idle_timeout_secs)
};
if !stale_paths.is_empty() {
debug!("Closing {} stale documents", stale_paths.len());
for path in stale_paths {
let (lang, close_params) = {
let mut doc_manager = doc_manager.lock().await;
let lang = doc_manager.language_id_for_path(&path).to_string();
(lang, doc_manager.close(&path))
};
if let Ok(Some(params)) = close_params {
let active_clients = client_manager.active_clients().await;
if let Some(client_mutex) = active_clients.get(&lang) {
let client = client_mutex.lock().await;
if let Err(e) = client.did_close(params).await {
warn!("Failed to close document {}: {}", path.display(), e);
} else {
debug!("Closed stale document: {}", path.display());
}
}
}
}
}
let active_langs: Vec<String> = client_manager
.active_clients()
.await
.keys()
.cloned()
.collect();
for lang in active_langs {
let has_docs = {
let doc_manager = doc_manager.lock().await;
doc_manager.has_open_documents(&lang)
};
if !has_docs {
client_manager.shutdown_client(&lang).await;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;
#[test]
fn test_format_diagnostics_plain() {
let lines = vec![
"error[E0308]: mismatched types".into(),
" --> src/main.rs:5:10".into(),
];
let output = format_diagnostics(&lines, "plain");
assert_eq!(
output,
"error[E0308]: mismatched types\n --> src/main.rs:5:10\n"
);
}
#[test]
fn test_format_diagnostics_gemini() -> Result<()> {
let lines = vec!["error[E0308]: mismatched types".into()];
let output = format_diagnostics(&lines, "gemini");
let parsed: serde_json::Value =
serde_json::from_str(&output).context("gemini format should produce valid JSON")?;
let context = parsed["hookSpecificOutput"]["additionalContext"]
.as_str()
.context("additionalContext should be a string")?;
assert!(context.starts_with("LSP Diagnostics:\n"));
assert!(context.contains("error[E0308]: mismatched types"));
Ok(())
}
#[test]
fn test_format_diagnostics_gemini_multiline() -> Result<()> {
let lines = vec!["warning: unused variable".into(), " --> lib.rs:3:9".into()];
let output = format_diagnostics(&lines, "gemini");
let parsed: serde_json::Value =
serde_json::from_str(&output).context("should produce valid JSON")?;
let context = parsed["hookSpecificOutput"]["additionalContext"]
.as_str()
.context("additionalContext should be a string")?;
assert!(context.contains("warning: unused variable\n --> lib.rs:3:9"));
Ok(())
}
}