#![allow(clippy::print_stdout, reason = "CLI tool needs to output to stdout")]
#![allow(clippy::print_stderr, reason = "CLI tool needs to output to stderr")]
use anyhow::Result;
use chrono::{Local, Utc};
use regex::Regex;
use std::time::Duration;
use crate::cli::{self, ColorConfig, ColumnWidths, QueryFormat};
use crate::session::{self, SessionMessage};
pub fn run_list() -> Result<()> {
let conn = crate::db::open_and_migrate()?;
let sessions = session::list_sessions_with_conn(&conn)?;
if sessions.is_empty() {
println!("No active Catenary sessions");
return Ok(());
}
let term_width = cli::terminal_width();
let widths = ColumnWidths::calculate(term_width);
let colors = cli::ColorConfig::new(false);
println!(
"{:>width_num$} {:<width_id$} {:<width_pid$} {:<width_client$} {:<width_ws$} STARTED",
"#",
"ID",
"PID",
"CLIENT",
"WORKSPACE",
width_num = widths.row_num,
width_id = widths.id,
width_pid = widths.pid,
width_client = widths.client,
width_ws = widths.workspace,
);
println!("{}", "-".repeat(term_width.min(120)));
let indent = " ".repeat(widths.row_num + 1);
for (idx, (s, alive)) in sessions.iter().enumerate() {
let client = match (&s.client_name, &s.client_version) {
(Some(name), Some(ver)) => format!("{name} v{ver}"),
(Some(name), None) => name.clone(),
_ => "-".to_string(),
};
let ago = format_duration_ago(s.started_at);
let id = cli::truncate(&s.id, widths.id);
let workspace = cli::truncate(&s.workspace, widths.workspace);
let client = cli::truncate(&client, widths.client);
let row_str = format!(
"{:>width_num$} {:<width_id$} {:<width_pid$} {:<width_client$} {:<width_ws$} {}",
idx + 1,
id,
s.pid,
client,
workspace,
ago,
width_num = widths.row_num,
width_id = widths.id,
width_pid = widths.pid,
width_client = widths.client,
width_ws = widths.workspace,
);
if *alive {
println!("{row_str}");
let languages = session::active_languages_with_conn(&conn, &s.id).unwrap_or_default();
if !languages.is_empty() {
let lang_str = languages.join(", ");
println!(
"{}",
colors.dim(&format!("{indent}language servers: {lang_str}"))
);
}
} else {
println!("{} (dead)", colors.dim(&row_str));
}
}
Ok(())
}
pub fn resolve_session_id(conn: &rusqlite::Connection, id: &str) -> Result<session::SessionInfo> {
if let Ok(row_num) = id.parse::<usize>()
&& row_num > 0
{
let sessions = session::list_sessions_with_conn(conn)?;
if let Some((s, _)) = sessions.get(row_num - 1) {
return Ok(s.clone());
}
if let Ok(session) = find_session(conn, id) {
return Ok(session);
}
anyhow::bail!("Row number {} out of range (1-{})", row_num, sessions.len());
}
find_session(conn, id)
}
pub fn run_monitor(id: &str, raw: bool, nocolor: bool, filter: Option<&str>) -> Result<()> {
let conn = crate::db::open_and_migrate()?;
let session = resolve_session_id(&conn, id)?;
let full_id = session.id;
let colors = ColorConfig::new(nocolor);
let term_width = cli::terminal_width();
let filter_regex = filter
.as_ref()
.map(|f| Regex::new(f))
.transpose()
.map_err(|e| anyhow::anyhow!("Invalid filter regex: {e}"))?;
println!("Monitoring session {full_id} (Ctrl+C to stop)\n");
let mut reader = session::tail_messages_new(&full_id)?;
let mut last_progress: Option<(String, String)> = None;
loop {
match reader.try_next_message() {
Ok(Some(msg)) => {
if let Some(ref re) = filter_regex {
let msg_str = format!("{} {} {}", msg.r#type, msg.method, msg.server);
if !re.is_match(&msg_str) {
continue;
}
}
if raw {
print_message_raw(&msg);
} else {
if msg.r#type == "lsp" && msg.method == "$/progress" {
let title = msg
.payload
.get("value")
.and_then(|v| v.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("")
.to_string();
let key = (msg.server.clone(), title);
if last_progress.as_ref() == Some(&key) {
print!("\x1b[A\x1b[2K");
}
last_progress = Some(key);
} else {
last_progress = None;
}
print_message_annotated(&msg, &colors, term_width);
}
}
Ok(None) => {
std::thread::sleep(Duration::from_millis(100));
if let Ok(Some((_, alive))) = session::get_session_with_conn(&conn, &full_id) {
if !alive {
println!("\nSession ended (process dead)");
break;
}
} else {
println!("\nSession ended (files removed)");
break;
}
}
Err(_) => {
println!("\nSession ended");
break;
}
}
}
Ok(())
}
pub fn run_status(id: &str) -> Result<()> {
let conn = crate::db::open_and_migrate()?;
let session = find_session(&conn, id)?;
println!("Session: {}", session.id);
println!("PID: {}", session.pid);
println!("Workspace: {}", session.workspace);
println!(
"Started: {} ({})",
session
.started_at
.with_timezone(&Local)
.format("%Y-%m-%d %H:%M:%S"),
format_duration_ago(session.started_at)
);
if let Some(name) = &session.client_name {
print!("Client: {name}");
if let Some(ver) = &session.client_version {
print!(" v{ver}");
}
println!();
}
println!("\nRecent messages:");
let messages = session::monitor_messages_with_conn(&conn, &session.id)?;
let recent: Vec<_> = messages.iter().rev().take(10).collect();
let colors = ColorConfig::new(false);
let term_width = cli::terminal_width();
for msg in recent.iter().rev() {
print_message_annotated(msg, &colors, term_width);
}
Ok(())
}
pub(crate) fn parse_since(s: &str) -> Result<chrono::DateTime<Utc>> {
if s == "today" {
let today = Local::now()
.date_naive()
.and_hms_opt(0, 0, 0)
.ok_or_else(|| anyhow::anyhow!("failed to compute midnight"))?;
let local_midnight = today
.and_local_timezone(Local)
.single()
.ok_or_else(|| anyhow::anyhow!("ambiguous local midnight"))?;
return Ok(local_midnight.with_timezone(&Utc));
}
let (digits, unit) = s
.strip_suffix('m')
.map(|d| (d, "m"))
.or_else(|| s.strip_suffix('h').map(|d| (d, "h")))
.or_else(|| s.strip_suffix('d').map(|d| (d, "d")))
.ok_or_else(|| {
anyhow::anyhow!("unrecognised duration: {s} (expected Nm, Nh, Nd, or today)")
})?;
let n: i64 = digits
.parse()
.map_err(|_| anyhow::anyhow!("invalid number in duration: {s}"))?;
let duration = match unit {
"m" => chrono::Duration::minutes(n),
"h" => chrono::Duration::hours(n),
"d" => chrono::Duration::days(n),
_ => unreachable!(),
};
Ok(Utc::now() - duration)
}
#[allow(
clippy::too_many_lines,
reason = "Sequential query building and output formatting"
)]
pub fn run_query(
conn: &rusqlite::Connection,
session_filter: Option<&str>,
since: Option<&str>,
kind: Option<&str>,
search: Option<&str>,
raw_sql: Option<&str>,
format: QueryFormat,
) -> Result<()> {
if let Some(sql) = raw_sql {
let mut stmt = conn.prepare(sql)?;
let col_count = stmt.column_count();
let col_names: Vec<String> = (0..col_count)
.map(|i| stmt.column_name(i).unwrap_or("?").to_string())
.collect();
let mut rows_out: Vec<Vec<String>> = Vec::new();
let mut db_rows = stmt.query([])?;
while let Some(row) = db_rows.next()? {
let mut vals = Vec::with_capacity(col_count);
for i in 0..col_count {
let val: String = row
.get::<_, rusqlite::types::Value>(i)
.map(|v| format_sql_value(&v))
.unwrap_or_default();
vals.push(val);
}
rows_out.push(vals);
}
drop(db_rows);
print_query_results(&col_names, &rows_out, format);
return Ok(());
}
let mut conditions: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
if let Some(sid) = session_filter {
let resolved = resolve_session_id(conn, sid)?;
conditions.push(format!("m.session_id = ?{}", params.len() + 1));
params.push(Box::new(resolved.id));
}
if let Some(since_str) = since {
let cutoff = parse_since(since_str)?;
conditions.push(format!("m.timestamp > ?{}", params.len() + 1));
params.push(Box::new(cutoff.to_rfc3339()));
}
if let Some(k) = kind {
conditions.push(format!("m.type = ?{}", params.len() + 1));
params.push(Box::new(k.to_string()));
}
if let Some(s) = search {
conditions.push(format!("m.payload LIKE ?{}", params.len() + 1));
params.push(Box::new(format!("%{s}%")));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!(" WHERE {}", conditions.join(" AND "))
};
let sql = format!(
"SELECT m.id, m.session_id, m.timestamp, m.type, m.method, m.server, \
m.payload \
FROM messages m{where_clause} ORDER BY m.id DESC LIMIT 100"
);
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(AsRef::as_ref).collect();
let mut stmt = conn.prepare(&sql)?;
let mut db_rows = stmt.query(param_refs.as_slice())?;
let col_names = vec![
"ID".to_string(),
"SESSION".to_string(),
"TIME".to_string(),
"TYPE".to_string(),
"METHOD".to_string(),
"SERVER".to_string(),
"PAYLOAD".to_string(),
];
let mut rows_out: Vec<Vec<String>> = Vec::new();
while let Some(row) = db_rows.next()? {
let id: i64 = row.get(0)?;
let sid: String = row.get(1)?;
let ts: String = row.get(2)?;
let r#type: String = row.get(3)?;
let method: String = row.get(4)?;
let server: String = row.get(5)?;
let payload: String = row.get(6)?;
let short_sid = if sid.len() > 8 { &sid[..8] } else { &sid };
let short_ts = chrono::DateTime::parse_from_rfc3339(&ts)
.map(|dt| dt.with_timezone(&Local).format("%H:%M:%S").to_string())
.unwrap_or(ts);
rows_out.push(vec![
id.to_string(),
short_sid.to_string(),
short_ts,
r#type,
method,
server,
payload,
]);
}
print_query_results(&col_names, &rows_out, format);
Ok(())
}
fn format_sql_value(val: &rusqlite::types::Value) -> String {
match val {
rusqlite::types::Value::Null => "NULL".to_string(),
rusqlite::types::Value::Integer(i) => i.to_string(),
rusqlite::types::Value::Real(f) => f.to_string(),
rusqlite::types::Value::Text(s) => s.clone(),
rusqlite::types::Value::Blob(b) => format!("<blob {} bytes>", b.len()),
}
}
fn print_query_results(col_names: &[String], rows: &[Vec<String>], format: QueryFormat) {
if rows.is_empty() {
println!("No results");
return;
}
match format {
QueryFormat::Table => {
let mut widths: Vec<usize> = col_names.iter().map(String::len).collect();
for row in rows {
for (i, val) in row.iter().enumerate() {
if i < widths.len() {
widths[i] = widths[i].max(val.len());
}
}
}
if let Some(last) = widths.last_mut() {
*last = (*last).min(80);
}
let header: Vec<String> = col_names
.iter()
.zip(&widths)
.map(|(name, w)| format!("{name:<w$}"))
.collect();
println!("{}", header.join(" "));
println!(
"{}",
widths
.iter()
.map(|w| "-".repeat(*w))
.collect::<Vec<_>>()
.join(" ")
);
for row in rows {
let formatted: Vec<String> = row
.iter()
.zip(&widths)
.map(|(val, w)| {
if val.len() > *w {
format!("{}...", &val[..w.saturating_sub(3)])
} else {
format!("{val:<w$}")
}
})
.collect();
println!("{}", formatted.join(" "));
}
}
QueryFormat::Json => {
let arr: Vec<serde_json::Value> = rows
.iter()
.map(|row| {
let mut obj = serde_json::Map::new();
for (name, val) in col_names.iter().zip(row) {
obj.insert(name.to_lowercase(), serde_json::Value::String(val.clone()));
}
serde_json::Value::Object(obj)
})
.collect();
let json = serde_json::to_string_pretty(&arr).unwrap_or_default();
println!("{json}");
}
QueryFormat::Csv => {
println!("{}", col_names.join(","));
for row in rows {
let escaped: Vec<String> = row
.iter()
.map(|v| {
if v.contains(',') || v.contains('"') || v.contains('\n') {
format!("\"{}\"", v.replace('"', "\"\""))
} else {
v.clone()
}
})
.collect();
println!("{}", escaped.join(","));
}
}
}
}
pub fn run_gc(
conn: &rusqlite::Connection,
older_than: Option<&str>,
dead: bool,
session_id: Option<&str>,
) -> Result<()> {
let mut total_events_deleted: usize = 0;
let mut sessions_deleted: usize = 0;
if let Some(duration_str) = older_than {
let cutoff = parse_since(duration_str)?;
let cutoff_str = cutoff.to_rfc3339();
let events_removed =
conn.execute("DELETE FROM events WHERE timestamp < ?1", [&cutoff_str])?;
total_events_deleted += events_removed;
let filters_removed = conn.execute(
"DELETE FROM filter_history WHERE created_at < ?1",
[&cutoff_str],
)?;
println!(
"Deleted {events_removed} event{}{} older than {duration_str}",
if events_removed == 1 { "" } else { "s" },
if filters_removed > 0 {
format!(", {filters_removed} filter history entries")
} else {
String::new()
},
);
}
if dead {
let crashed: Vec<String> = {
let mut stmt = conn.prepare("SELECT id, pid FROM sessions WHERE alive = 1")?;
let mut rows = stmt.query([])?;
let mut ids = Vec::new();
while let Some(row) = rows.next()? {
let id: String = row.get(0)?;
let pid: u32 = row.get(1)?;
if !session::is_process_alive(pid) {
ids.push(id);
}
}
ids
};
let ended_at = Utc::now().to_rfc3339();
for id in &crashed {
let _ = conn.execute(
"UPDATE sessions SET alive = 0, ended_at = ?1 WHERE id = ?2",
rusqlite::params![&ended_at, id],
);
}
let dead_events: usize = conn
.query_row(
"SELECT COUNT(*) FROM events WHERE session_id IN \
(SELECT id FROM sessions WHERE alive = 0)",
[],
|row| row.get(0),
)
.unwrap_or(0);
let removed = conn.execute("DELETE FROM sessions WHERE alive = 0", [])?;
sessions_deleted += removed;
total_events_deleted += dead_events;
println!(
"Deleted {removed} dead session{} ({dead_events} event{})",
if removed == 1 { "" } else { "s" },
if dead_events == 1 { "" } else { "s" },
);
}
if let Some(sid) = session_id {
let resolved = resolve_session_id(conn, sid)?;
let event_count: usize = conn
.query_row(
"SELECT COUNT(*) FROM events WHERE session_id = ?1",
[&resolved.id],
|row| row.get(0),
)
.unwrap_or(0);
conn.execute("DELETE FROM sessions WHERE id = ?1", [&resolved.id])?;
sessions_deleted += 1;
total_events_deleted += event_count;
let socket_dir = session::sessions_dir().join(&resolved.id);
let _ = std::fs::remove_dir_all(&socket_dir);
println!(
"Deleted session {} ({event_count} event{})",
&resolved.id[..8.min(resolved.id.len())],
if event_count == 1 { "" } else { "s" },
);
}
if older_than.is_none() && !dead && session_id.is_none() {
println!("Nothing to do. Use --older-than, --dead, or --session.");
}
if total_events_deleted > 1000 || sessions_deleted > 0 {
let size_before = db_file_size();
conn.execute_batch("VACUUM")?;
let size_after = db_file_size();
if let (Some(before), Some(after)) = (size_before, size_after) {
let saved = before.saturating_sub(after);
if saved > 0 {
println!("Database vacuumed (saved {})", format_bytes(saved));
}
}
}
Ok(())
}
fn db_file_size() -> Option<u64> {
std::fs::metadata(crate::db::db_path())
.ok()
.map(|m| m.len())
}
#[allow(
clippy::cast_precision_loss,
reason = "byte counts are small enough for f64"
)]
fn format_bytes(bytes: u64) -> String {
if bytes >= 1_048_576 {
format!("{:.1} MB", bytes as f64 / 1_048_576.0)
} else if bytes >= 1024 {
format!("{:.1} KB", bytes as f64 / 1024.0)
} else {
format!("{bytes} B")
}
}
pub fn find_session(conn: &rusqlite::Connection, id: &str) -> Result<session::SessionInfo> {
if let Some((s, _)) = session::get_session_with_conn(conn, id)? {
return Ok(s);
}
let sessions = session::list_sessions_with_conn(conn)?;
let matches: Vec<_> = sessions
.iter()
.filter(|(s, _)| {
s.id.starts_with(id)
|| s.client_session_id
.as_deref()
.is_some_and(|csid| csid.starts_with(id))
})
.collect();
match matches.len() {
0 => anyhow::bail!("No session found matching '{id}'"),
1 => Ok(matches[0].0.clone()),
_ => {
eprintln!("Multiple sessions match '{id}':");
for (s, _) in matches {
eprintln!(" {}", s.id);
}
anyhow::bail!("Please specify a more complete session ID")
}
}
}
#[must_use]
pub fn format_duration_ago(timestamp: chrono::DateTime<Utc>) -> String {
let now = Utc::now();
let duration = now.signed_duration_since(timestamp);
if duration.num_hours() > 0 {
format!(
"{}h {}m ago",
duration.num_hours(),
duration.num_minutes() % 60
)
} else if duration.num_minutes() > 0 {
format!("{}m ago", duration.num_minutes())
} else {
format!("{}s ago", duration.num_seconds())
}
}
fn print_message_raw(msg: &SessionMessage) {
let time = msg.timestamp.with_timezone(&Local).format("%H:%M:%S");
let tag = match msg.r#type.as_str() {
"lsp" => format!("[{}]", msg.server),
"mcp" => "[mcp]".to_string(),
"hook" => "[hook]".to_string(),
other => format!("[{other}]"),
};
let arrow = if msg.payload.get("result").is_some() || msg.payload.get("error").is_some() {
"\u{2190}" } else {
"\u{2192}" };
println!("[{time}] {tag} {arrow}");
let pretty = serde_json::to_string_pretty(&msg.payload).unwrap_or_default();
println!("{pretty}");
}
#[allow(clippy::too_many_lines, reason = "match arms for each message type")]
fn print_message_annotated(msg: &SessionMessage, colors: &ColorConfig, term_width: usize) {
let time = msg.timestamp.with_timezone(&Local).format("%H:%M:%S");
let time_str = colors.dim(&format!("[{time}]"));
let is_response = msg.payload.get("result").is_some() || msg.payload.get("error").is_some();
match msg.r#type.as_str() {
"lsp" => {
let lang = colors.cyan(&msg.server);
if msg.method == "$/progress" {
let title = msg
.payload
.get("value")
.and_then(|v| v.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("");
let pct = msg
.payload
.get("value")
.and_then(|v| v.get("percentage"))
.and_then(serde_json::Value::as_u64)
.map(|p| format!(" {p}%"))
.unwrap_or_default();
let detail = msg
.payload
.get("value")
.and_then(|v| v.get("message"))
.and_then(|m| m.as_str())
.map(|m| format!(" ({m})"))
.unwrap_or_default();
println!("{time_str} {lang}: {title}{pct}{detail}");
} else if is_response {
let arrow = colors.blue("\u{2190}");
if msg.payload.get("error").is_some() {
let err_msg = msg
.payload
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
println!(
"{time_str} {lang} {arrow} {} {}",
msg.method,
colors.red(err_msg)
);
} else {
println!("{time_str} {lang} {arrow} {}", msg.method);
}
} else {
let arrow = colors.green("\u{2192}");
println!("{time_str} {lang} {arrow} {}", msg.method);
}
}
"mcp" => {
let summary = extract_message_summary(&msg.payload, colors);
let prefix_len = 10 + 5 + 2 + 2; let max_summary_len = term_width.saturating_sub(prefix_len);
if is_response {
let arrow = colors.blue("\u{2190}");
let summary = cli::truncate(&summary, max_summary_len);
println!("{time_str} [mcp] {arrow} {summary}");
} else {
let arrow = colors.green("\u{2192}");
let summary = cli::truncate(&summary, max_summary_len);
println!("{time_str} [mcp] {arrow} {summary}");
}
if let Some(error) = msg.payload.get("error") {
let err_msg = error
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
println!(" {}", colors.red(&format!("Error: {err_msg}")));
}
}
"hook" => {
if let Some(count_val) = msg.payload.get("count") {
let count = count_val.as_u64().unwrap_or(0);
let file = msg
.payload
.get("file")
.and_then(|f| f.as_str())
.unwrap_or(&msg.method);
let basename = std::path::Path::new(file)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(file);
if count == 0 {
let check = colors.green("ok");
println!("{time_str} {basename}: {check}");
} else {
let label = colors.yellow(&format!(
"{count} diagnostic{}",
if count == 1 { "" } else { "s" }
));
let preview = msg
.payload
.get("preview")
.and_then(|p| p.as_str())
.unwrap_or("");
let detail = if preview.is_empty() {
String::new()
} else {
let max_len = term_width.saturating_sub(14 + basename.len() + 20);
format!(" -- {}", cli::truncate(preview, max_len))
};
println!("{time_str} {basename}: {label}{detail}");
}
} else {
println!("{time_str} [hook] {}", msg.method);
}
}
other => {
println!("{time_str} [{other}] {}", msg.method);
}
}
}
fn extract_message_summary(payload: &serde_json::Value, colors: &ColorConfig) -> String {
let Some(obj) = payload.as_object() else {
return payload.to_string();
};
obj.get("method").and_then(|m| m.as_str()).map_or_else(
|| {
if obj.contains_key("result") || obj.contains_key("error") {
let id = obj.get("id").map(|i| format!("#{i}")).unwrap_or_default();
if obj.contains_key("error") {
format!("{} {}", colors.red("error"), id)
} else {
format!("result {id}")
}
} else {
serde_json::to_string(payload).unwrap_or_default()
}
},
|method| {
let id = obj.get("id").map(|i| format!("#{i}")).unwrap_or_default();
let params_summary = match method {
"tools/call" => {
if let Some(params) = obj.get("params")
&& let Some(name) = params.get("name").and_then(|n| n.as_str())
{
let file_info = params
.get("arguments")
.and_then(|a| a.get("file_path").or_else(|| a.get("path")))
.and_then(|f| f.as_str())
.map(|f| {
std::path::Path::new(f)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(f)
})
.map(|f| format!(" ({f})"))
.unwrap_or_default();
format!("{}{}", colors.cyan(name), file_info)
} else {
String::new()
}
}
"initialize" => {
if let Some(params) = obj.get("params")
&& let Some(info) = params.get("clientInfo")
&& let Some(name) = info.get("name").and_then(|n| n.as_str())
{
format!("from {name}")
} else {
String::new()
}
}
_ => String::new(),
};
if params_summary.is_empty() {
format!("{method} {id}")
} else {
format!("{method} {params_summary} {id}")
}
},
)
}
#[cfg(test)]
#[allow(
clippy::expect_used,
reason = "tests use expect for readable assertions"
)]
mod tests {
use super::*;
use crate::session;
fn test_db() -> (tempfile::TempDir, std::path::PathBuf, rusqlite::Connection) {
let dir = tempfile::tempdir().expect("failed to create tempdir for test DB");
let path = dir.path().join("catenary").join("catenary.db");
let conn = crate::db::open_and_migrate_at(&path).expect("failed to open test DB");
(dir, path, conn)
}
fn create_session(
db_path: &std::path::Path,
workspace: &str,
) -> anyhow::Result<session::Session> {
let arc = std::sync::Arc::new(std::sync::Mutex::new(crate::db::open_and_migrate_at(
db_path,
)?));
session::Session::create_with_conn(workspace, arc)
}
#[test]
fn test_parse_since_hours() -> anyhow::Result<()> {
let cutoff = parse_since("1h")?;
let diff = Utc::now().signed_duration_since(cutoff);
assert!(
diff.num_seconds() >= 3595 && diff.num_seconds() <= 3605,
"expected ~3600s, got {}s",
diff.num_seconds()
);
Ok(())
}
#[test]
fn test_parse_since_days() -> anyhow::Result<()> {
let cutoff = parse_since("7d")?;
let diff = Utc::now().signed_duration_since(cutoff);
let expected = 7 * 86400;
assert!(
diff.num_seconds() >= expected - 5 && diff.num_seconds() <= expected + 5,
"expected ~{expected}s, got {}s",
diff.num_seconds()
);
Ok(())
}
#[test]
fn test_parse_since_minutes() -> anyhow::Result<()> {
let cutoff = parse_since("30m")?;
let diff = Utc::now().signed_duration_since(cutoff);
assert!(
diff.num_seconds() >= 1795 && diff.num_seconds() <= 1805,
"expected ~1800s, got {}s",
diff.num_seconds()
);
Ok(())
}
#[test]
fn test_parse_since_today() -> anyhow::Result<()> {
let cutoff = parse_since("today")?;
let now = Utc::now();
assert!(cutoff <= now);
assert!(now.signed_duration_since(cutoff).num_hours() < 24);
Ok(())
}
#[test]
fn test_parse_since_invalid() {
assert!(parse_since("abc").is_err());
assert!(parse_since("").is_err());
assert!(parse_since("5x").is_err());
}
#[test]
fn test_query_with_type_filter() -> anyhow::Result<()> {
let (_dir, path, conn) = test_db();
let conn_arc = std::sync::Arc::new(std::sync::Mutex::new(crate::db::open_and_migrate_at(
&path,
)?));
conn.execute(
"INSERT INTO sessions (id, pid, display_name, started_at) \
VALUES ('s1', 1, 'test', '2026-01-01T00:00:00Z')",
[],
)?;
let log = session::MessageLog::new(conn_arc, "s1".to_string());
log.log(
"mcp",
"tools/call",
"catenary",
"claude-code",
None,
None,
&serde_json::json!({"params": {"name": "grep"}}),
);
log.log(
"lsp",
"textDocument/hover",
"rust-analyzer",
"catenary",
None,
None,
&serde_json::json!({}),
);
run_query(
&conn,
Some("s1"),
None,
Some("mcp"),
None,
None,
QueryFormat::Table,
)?;
Ok(())
}
#[test]
fn test_query_with_search() -> anyhow::Result<()> {
let (_dir, path, conn) = test_db();
let conn_arc = std::sync::Arc::new(std::sync::Mutex::new(crate::db::open_and_migrate_at(
&path,
)?));
conn.execute(
"INSERT INTO sessions (id, pid, display_name, started_at) \
VALUES ('s1', 1, 'test', '2026-01-01T00:00:00Z')",
[],
)?;
let log = session::MessageLog::new(conn_arc, "s1".to_string());
log.log(
"mcp",
"tools/call",
"catenary",
"claude-code",
None,
None,
&serde_json::json!({"params": {"name": "grep"}}),
);
run_query(
&conn,
Some("s1"),
None,
None,
Some("grep"),
None,
QueryFormat::Table,
)?;
Ok(())
}
#[test]
fn test_gc_dead_sessions() -> anyhow::Result<()> {
let (_dir, path, conn) = test_db();
let session = create_session(&path, "/tmp/test-gc-dead")?;
let id = session.info.id.clone();
drop(session);
let found = session::get_session_with_conn(&conn, &id)?;
assert!(found.is_some(), "session should exist");
assert!(!found.expect("checked above").1, "session should be dead");
run_gc(&conn, None, true, None)?;
assert!(
session::get_session_with_conn(&conn, &id)?.is_none(),
"dead session should be deleted"
);
Ok(())
}
#[test]
fn test_gc_specific_session() -> anyhow::Result<()> {
let (_dir, path, conn) = test_db();
let s1 = create_session(&path, "/tmp/test-gc-specific-1")?;
let id1 = s1.info.id.clone();
let s2 = create_session(&path, "/tmp/test-gc-specific-2")?;
let id2 = s2.info.id.clone();
drop(s1);
drop(s2);
run_gc(&conn, None, false, Some(&id1))?;
assert!(
session::get_session_with_conn(&conn, &id1)?.is_none(),
"targeted session should be deleted"
);
assert!(
session::get_session_with_conn(&conn, &id2)?.is_some(),
"other session should survive"
);
session::delete_session_data_with_conn(&conn, &id2)?;
Ok(())
}
#[test]
fn test_gc_no_flags_is_noop() -> anyhow::Result<()> {
let (_dir, _path, conn) = test_db();
run_gc(&conn, None, false, None)?;
Ok(())
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(500), "500 B");
assert_eq!(format_bytes(1536), "1.5 KB");
assert_eq!(format_bytes(2_621_440), "2.5 MB");
}
}