use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionInfo {
pub id: String,
pub pid: u32,
pub workspace: String,
pub started_at: DateTime<Utc>,
#[serde(default)]
pub client_name: Option<String>,
#[serde(default)]
pub client_version: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionEvent {
pub timestamp: DateTime<Utc>,
#[serde(flatten)]
pub kind: EventKind,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum EventKind {
ServerState { language: String, state: String },
Progress {
language: String,
title: String,
message: Option<String>,
percentage: Option<u32>,
},
ProgressEnd { language: String },
ToolCall { tool: String, file: Option<String> },
ToolResult {
tool: String,
success: bool,
duration_ms: u64,
},
Started,
Shutdown,
McpMessage {
direction: String, message: serde_json::Value,
},
}
fn sessions_dir() -> Result<PathBuf> {
let state_dir = dirs::state_dir()
.or_else(dirs::data_local_dir)
.unwrap_or_else(|| PathBuf::from("/tmp"));
Ok(state_dir.join("catenary").join("sessions"))
}
pub struct Session {
pub info: SessionInfo,
session_dir: PathBuf,
events_file: Arc<Mutex<File>>,
}
impl Session {
pub fn create(workspace: &str) -> Result<Self> {
let id = Self::generate_id();
let sessions_base = sessions_dir()?;
let session_dir = sessions_base.join(&id);
fs::create_dir_all(&session_dir)
.with_context(|| format!("Failed to create session dir: {}", session_dir.display()))?;
let info = SessionInfo {
id: id.clone(),
pid: std::process::id(),
workspace: workspace.to_string(),
started_at: Utc::now(),
client_name: None,
client_version: None,
};
let info_path = session_dir.join("info.json");
let info_file = File::create(&info_path)?;
serde_json::to_writer_pretty(info_file, &info)?;
let events_path = session_dir.join("events.jsonl");
let events_file = OpenOptions::new()
.create(true)
.append(true)
.open(&events_path)?;
let session = Self {
info,
session_dir,
events_file: Arc::new(Mutex::new(events_file)),
};
session.broadcast(EventKind::Started);
Ok(session)
}
fn generate_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let pid = std::process::id();
let tid = format!("{:?}", std::thread::current().id());
let tid_hash = tid.bytes().fold(0u32, |acc, x| acc.wrapping_add(x as u32));
format!("{:x}{:x}{:x}", now as u32, pid, tid_hash)
}
pub fn set_client_info(&mut self, name: &str, version: &str) {
self.info.client_name = Some(name.to_string());
self.info.client_version = Some(version.to_string());
let info_path = self.session_dir.join("info.json");
if let Ok(file) = File::create(&info_path) {
let _ = serde_json::to_writer_pretty(file, &self.info);
}
}
pub fn broadcast(&self, kind: EventKind) {
let event = SessionEvent {
timestamp: Utc::now(),
kind,
};
if let Ok(mut file) = self.events_file.lock()
&& let Ok(json) = serde_json::to_string(&event)
{
let _ = writeln!(file, "{}", json);
let _ = file.flush();
}
}
pub fn broadcaster(&self) -> EventBroadcaster {
EventBroadcaster {
events_file: self.events_file.clone(),
}
}
}
impl Drop for Session {
fn drop(&mut self) {
self.broadcast(EventKind::Shutdown);
if let Err(e) = fs::remove_dir_all(&self.session_dir) {
warn!("Failed to clean up session directory: {}", e);
}
}
}
#[derive(Clone)]
pub struct EventBroadcaster {
events_file: Arc<Mutex<File>>,
}
impl EventBroadcaster {
pub fn send(&self, kind: EventKind) {
let event = SessionEvent {
timestamp: Utc::now(),
kind,
};
if let Ok(mut file) = self.events_file.lock()
&& let Ok(json) = serde_json::to_string(&event)
{
let _ = writeln!(file, "{}", json);
let _ = file.flush();
}
}
pub fn noop() -> Self {
let file = OpenOptions::new()
.write(true)
.open("/dev/null")
.unwrap_or_else(|_| {
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(std::env::temp_dir().join(".catenary_null"))
.unwrap()
});
Self {
events_file: Arc::new(Mutex::new(file)),
}
}
}
pub fn list_sessions() -> Result<Vec<SessionInfo>> {
let sessions_base = sessions_dir()?;
if !sessions_base.exists() {
return Ok(vec![]);
}
let mut sessions = Vec::new();
for entry in fs::read_dir(&sessions_base)? {
let entry = entry?;
let info_path = entry.path().join("info.json");
if info_path.exists()
&& let Ok(file) = File::open(&info_path)
&& let Ok(info) = serde_json::from_reader::<_, SessionInfo>(file)
{
if is_process_alive(info.pid) {
sessions.push(info);
} else {
warn!("Cleaning up dead session {} (pid {})", info.id, info.pid);
let _ = fs::remove_dir_all(entry.path());
}
}
}
sessions.sort_by(|a, b| b.started_at.cmp(&a.started_at));
Ok(sessions)
}
pub fn get_session(id: &str) -> Result<Option<SessionInfo>> {
let sessions_base = sessions_dir()?;
let info_path = sessions_base.join(id).join("info.json");
if !info_path.exists() {
return Ok(None);
}
let file = File::open(&info_path)?;
let info: SessionInfo = serde_json::from_reader(file)?;
if is_process_alive(info.pid) {
Ok(Some(info))
} else {
let _ = fs::remove_dir_all(sessions_base.join(id));
Ok(None)
}
}
pub fn monitor_events(id: &str) -> Result<impl Iterator<Item = SessionEvent>> {
let sessions_base = sessions_dir()?;
let events_path = sessions_base.join(id).join("events.jsonl");
if !events_path.exists() {
anyhow::bail!("Session not found: {}", id);
}
let file = File::open(&events_path)?;
let reader = BufReader::new(file);
Ok(reader.lines().filter_map(|line| {
line.ok()
.and_then(|l| serde_json::from_str::<SessionEvent>(&l).ok())
}))
}
pub fn tail_events(id: &str) -> Result<TailReader> {
let sessions_base = sessions_dir()?;
let events_path = sessions_base.join(id).join("events.jsonl");
if !events_path.exists() {
anyhow::bail!("Session not found: {}", id);
}
TailReader::new(events_path)
}
pub struct TailReader {
path: PathBuf,
reader: BufReader<File>,
last_size: u64,
}
impl TailReader {
fn new(path: PathBuf) -> Result<Self> {
let file = File::open(&path)?;
let metadata = file.metadata()?;
let reader = BufReader::new(file);
Ok(Self {
path,
reader,
last_size: metadata.len(),
})
}
pub fn next_event(&mut self) -> Result<Option<SessionEvent>> {
loop {
let mut line = String::new();
let bytes_read = self.reader.read_line(&mut line)?;
if bytes_read > 0 {
let line = line.trim();
if !line.is_empty()
&& let Ok(event) = serde_json::from_str::<SessionEvent>(line)
{
return Ok(Some(event));
}
} else {
if let Ok(metadata) = fs::metadata(&self.path) {
if metadata.len() < self.last_size {
let file = File::open(&self.path)?;
self.reader = BufReader::new(file);
self.last_size = 0;
continue;
}
self.last_size = metadata.len();
} else {
return Ok(None);
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
}
}
pub fn active_languages(id: &str) -> Result<Vec<String>> {
use std::collections::HashMap;
let sessions_base = sessions_dir()?;
let events_path = sessions_base.join(id).join("events.jsonl");
if !events_path.exists() {
return Ok(vec![]);
}
let file = File::open(&events_path)?;
let reader = BufReader::new(file);
let mut states: HashMap<String, String> = HashMap::new();
for line in reader.lines() {
if let Ok(line) = line
&& let Ok(event) = serde_json::from_str::<SessionEvent>(&line)
&& let EventKind::ServerState { language, state } = event.kind
{
if state == "Dead" {
states.remove(&language);
} else {
states.insert(language, state);
}
}
}
let mut languages: Vec<String> = states.keys().cloned().collect();
languages.sort();
Ok(languages)
}
fn is_process_alive(pid: u32) -> bool {
#[cfg(unix)]
{
unsafe { libc::kill(pid as i32, 0) == 0 }
}
#[cfg(not(unix))]
{
true
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_session_create_and_list() {
let session = Session::create("/tmp/test-workspace").unwrap();
let id = session.info.id.clone();
let sessions = list_sessions().unwrap();
assert!(sessions.iter().any(|s| s.id == id));
let found = get_session(&id).unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().workspace, "/tmp/test-workspace");
drop(session);
let found = get_session(&id).unwrap();
assert!(found.is_none());
}
#[test]
fn test_event_broadcast() {
let session = Session::create("/tmp/test-events").unwrap();
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Indexing".to_string(),
});
session.broadcast(EventKind::Progress {
language: "rust".to_string(),
title: "Loading".to_string(),
message: Some("crates".to_string()),
percentage: Some(50),
});
let events: Vec<_> = monitor_events(&id).unwrap().collect();
assert!(events.len() >= 2);
drop(session);
}
#[test]
fn test_active_languages_empty() {
let session = Session::create("/tmp/test-langs-empty").unwrap();
let id = session.info.id.clone();
let langs = active_languages(&id).unwrap();
assert!(langs.is_empty());
drop(session);
}
#[test]
fn test_active_languages_tracks_server_state() {
let session = Session::create("/tmp/test-langs-state").unwrap();
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Initializing".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
let langs = active_languages(&id).unwrap();
assert_eq!(langs, vec!["rust"]);
drop(session);
}
#[test]
fn test_active_languages_removes_dead() {
let session = Session::create("/tmp/test-langs-dead").unwrap();
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Dead".to_string(),
});
let langs = active_languages(&id).unwrap();
assert!(langs.is_empty());
drop(session);
}
#[test]
fn test_active_languages_multiple_languages() {
let session = Session::create("/tmp/test-langs-multi").unwrap();
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "python".to_string(),
state: "Ready".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "typescript".to_string(),
state: "Initializing".to_string(),
});
let langs = active_languages(&id).unwrap();
assert_eq!(langs, vec!["python", "rust", "typescript"]);
drop(session);
}
}