use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionInfo {
pub id: String,
pub pid: u32,
pub workspace: String,
pub started_at: DateTime<Utc>,
#[serde(default)]
pub client_name: Option<String>,
#[serde(default)]
pub client_version: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionEvent {
pub timestamp: DateTime<Utc>,
#[serde(flatten)]
pub kind: EventKind,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum EventKind {
ServerState {
language: String,
state: String,
},
Progress {
language: String,
title: String,
message: Option<String>,
percentage: Option<u32>,
},
ProgressEnd {
language: String,
},
ToolCall {
tool: String,
file: Option<String>,
},
ToolResult {
tool: String,
success: bool,
duration_ms: u64,
},
Diagnostics {
file: String,
count: usize,
preview: String,
},
Started,
Shutdown,
McpMessage {
direction: String,
message: serde_json::Value,
},
LockAcquired {
file: String,
owner: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
tool: Option<String>,
},
LockReleased {
file: String,
owner: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
tool: Option<String>,
},
LockDenied {
file: String,
owner: String,
held_by: String,
},
}
pub fn sessions_dir() -> PathBuf {
let state_dir = dirs::state_dir()
.or_else(dirs::data_local_dir)
.unwrap_or_else(|| PathBuf::from("/tmp"));
state_dir.join("catenary").join("sessions")
}
pub struct Session {
pub info: SessionInfo,
dir: PathBuf,
events_file: Arc<Mutex<File>>,
socket_path: Option<PathBuf>,
}
impl Session {
pub fn create(workspace: &str) -> Result<Self> {
let id = Self::generate_id();
let sessions_base = sessions_dir();
let session_dir = sessions_base.join(&id);
fs::create_dir_all(&session_dir)
.with_context(|| format!("Failed to create session dir: {}", session_dir.display()))?;
let info = SessionInfo {
id,
pid: std::process::id(),
workspace: workspace.to_string(),
started_at: Utc::now(),
client_name: None,
client_version: None,
};
let info_path = session_dir.join("info.json");
let info_file = File::create(&info_path)?;
serde_json::to_writer_pretty(info_file, &info)?;
let events_path = session_dir.join("events.jsonl");
let events_file = OpenOptions::new()
.create(true)
.append(true)
.open(&events_path)?;
let session = Self {
info,
dir: session_dir,
events_file: Arc::new(Mutex::new(events_file)),
socket_path: None,
};
session.broadcast(EventKind::Started);
Ok(session)
}
fn generate_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(std::time::Duration::ZERO)
.as_millis();
let pid = std::process::id();
let tid = format!("{:?}", std::thread::current().id());
let tid_hash = tid
.bytes()
.fold(0u32, |acc, x| acc.wrapping_add(u32::from(x)));
format!(
"{:x}{:x}{:x}",
u32::try_from(now).unwrap_or(0),
pid,
tid_hash
)
}
#[must_use]
pub fn socket_path(&self) -> PathBuf {
#[cfg(unix)]
{
self.dir.join("notify.sock")
}
#[cfg(windows)]
{
PathBuf::from(format!(r"\\.\pipe\catenary-{}", self.info.id))
}
}
pub fn set_socket_active(&mut self) {
self.socket_path = Some(self.socket_path());
}
pub fn set_client_info(&mut self, name: &str, version: &str) {
self.info.client_name = Some(name.to_string());
self.info.client_version = Some(version.to_string());
let info_path = self.dir.join("info.json");
if let Ok(file) = File::create(&info_path) {
let _ = serde_json::to_writer_pretty(file, &self.info);
}
}
pub fn broadcast(&self, kind: EventKind) {
let event = SessionEvent {
timestamp: Utc::now(),
kind,
};
if let Ok(mut file) = self.events_file.lock()
&& let Ok(json) = serde_json::to_string(&event)
{
let _ = writeln!(file, "{json}");
let _ = file.flush();
}
}
#[must_use]
pub fn broadcaster(&self) -> EventBroadcaster {
EventBroadcaster {
events_file: self.events_file.clone(),
}
}
}
impl Drop for Session {
fn drop(&mut self) {
self.broadcast(EventKind::Shutdown);
#[cfg(unix)]
if let Some(ref sock) = self.socket_path {
let _ = fs::remove_file(sock);
}
if let Err(e) = fs::remove_dir_all(&self.dir) {
warn!("Failed to clean up session directory: {}", e);
}
}
}
#[derive(Clone)]
pub struct EventBroadcaster {
events_file: Arc<Mutex<File>>,
}
impl EventBroadcaster {
pub fn send(&self, kind: EventKind) {
let event = SessionEvent {
timestamp: Utc::now(),
kind,
};
if let Ok(mut file) = self.events_file.lock()
&& let Ok(json) = serde_json::to_string(&event)
{
let _ = writeln!(file, "{json}");
let _ = file.flush();
}
}
pub fn noop() -> Result<Self> {
let file = OpenOptions::new()
.write(true)
.open("/dev/null")
.or_else(|_| {
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(std::env::temp_dir().join(".catenary_null"))
})?;
Ok(Self {
events_file: Arc::new(Mutex::new(file)),
})
}
}
pub fn list_sessions() -> Result<Vec<SessionInfo>> {
let sessions_base = sessions_dir();
if !sessions_base.exists() {
return Ok(vec![]);
}
let mut sessions = Vec::new();
for entry in fs::read_dir(&sessions_base)? {
let entry = entry?;
let info_path = entry.path().join("info.json");
if info_path.exists()
&& let Ok(file) = File::open(&info_path)
&& let Ok(info) = serde_json::from_reader::<_, SessionInfo>(file)
{
if is_process_alive(info.pid) {
sessions.push(info);
} else {
warn!("Cleaning up dead session {} (pid {})", info.id, info.pid);
let _ = fs::remove_dir_all(entry.path());
}
}
}
sessions.sort_by(|a, b| b.started_at.cmp(&a.started_at));
Ok(sessions)
}
pub fn get_session(id: &str) -> Result<Option<SessionInfo>> {
let sessions_base = sessions_dir();
let info_path = sessions_base.join(id).join("info.json");
if !info_path.exists() {
return Ok(None);
}
let file = File::open(&info_path)?;
let info: SessionInfo = serde_json::from_reader(file)?;
if is_process_alive(info.pid) {
Ok(Some(info))
} else {
let _ = fs::remove_dir_all(sessions_base.join(id));
Ok(None)
}
}
pub fn monitor_events(id: &str) -> Result<impl Iterator<Item = SessionEvent>> {
let sessions_base = sessions_dir();
let events_path = sessions_base.join(id).join("events.jsonl");
if !events_path.exists() {
anyhow::bail!("Session not found: {id}");
}
let file = File::open(&events_path)?;
let reader = BufReader::new(file);
Ok(reader.lines().filter_map(|line| {
line.ok()
.and_then(|l| serde_json::from_str::<SessionEvent>(&l).ok())
}))
}
pub fn tail_events(id: &str) -> Result<TailReader> {
let sessions_base = sessions_dir();
let events_path = sessions_base.join(id).join("events.jsonl");
if !events_path.exists() {
anyhow::bail!("Session not found: {id}");
}
TailReader::new(events_path)
}
pub struct TailReader {
path: PathBuf,
reader: BufReader<File>,
last_size: u64,
}
impl TailReader {
fn new(path: PathBuf) -> Result<Self> {
let file = File::open(&path)?;
let metadata = file.metadata()?;
let reader = BufReader::new(file);
Ok(Self {
path,
reader,
last_size: metadata.len(),
})
}
pub fn next_event(&mut self) -> Result<Option<SessionEvent>> {
use std::io::Seek;
loop {
let mut line = String::new();
let bytes_read = self.reader.read_line(&mut line)?;
if bytes_read > 0 {
let line = line.trim();
if !line.is_empty()
&& let Ok(event) = serde_json::from_str::<SessionEvent>(line)
{
return Ok(Some(event));
}
} else {
if let Ok(metadata) = fs::metadata(&self.path) {
if metadata.len() < self.last_size {
let file = File::open(&self.path)?;
self.reader = BufReader::new(file);
self.last_size = 0;
continue;
}
if metadata.len() > self.last_size {
self.reader.stream_position()?;
}
self.last_size = metadata.len();
} else {
return Ok(None);
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
}
}
pub fn active_languages(id: &str) -> Result<Vec<String>> {
use std::collections::HashMap;
let sessions_base = sessions_dir();
let events_path = sessions_base.join(id).join("events.jsonl");
if !events_path.exists() {
return Ok(vec![]);
}
let file = File::open(&events_path)?;
let reader = BufReader::new(file);
let mut states: HashMap<String, String> = HashMap::new();
for line in reader.lines().map_while(Result::ok) {
if let Ok(event) = serde_json::from_str::<SessionEvent>(&line)
&& let EventKind::ServerState { language, state } = event.kind
{
if state == "Dead" {
states.remove(&language);
} else {
states.insert(language, state);
}
}
}
let mut languages: Vec<String> = states.keys().cloned().collect();
languages.sort();
Ok(languages)
}
fn is_process_alive(pid: u32) -> bool {
#[cfg(target_os = "linux")]
{
std::path::Path::new("/proc").join(pid.to_string()).exists()
}
#[cfg(all(unix, not(target_os = "linux")))]
{
std::process::Command::new("kill")
.arg("-0")
.arg(pid.to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
#[cfg(not(unix))]
{
let _ = pid;
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::{Context, Result};
#[test]
fn test_session_create_and_list() -> Result<()> {
let session = Session::create("/tmp/test-workspace")?;
let id = session.info.id.clone();
let sessions = list_sessions()?;
assert!(sessions.iter().any(|s| s.id == id));
let found = get_session(&id)?;
let found_session = found.context("missing session")?;
assert_eq!(found_session.workspace, "/tmp/test-workspace");
drop(session);
let found = get_session(&id)?;
assert!(found.is_none());
Ok(())
}
#[test]
fn test_event_broadcast() -> Result<()> {
let session = Session::create("/tmp/test-events")?;
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Indexing".to_string(),
});
session.broadcast(EventKind::Progress {
language: "rust".to_string(),
title: "Loading".to_string(),
message: Some("crates".to_string()),
percentage: Some(50),
});
assert!(monitor_events(&id)?.count() >= 2);
drop(session);
Ok(())
}
#[test]
fn test_active_languages_empty() -> Result<()> {
let session = Session::create("/tmp/test-langs-empty")?;
let id = session.info.id.clone();
let langs = active_languages(&id)?;
assert!(langs.is_empty());
drop(session);
Ok(())
}
#[test]
fn test_active_languages_tracks_server_state() -> Result<()> {
let session = Session::create("/tmp/test-langs-state")?;
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Initializing".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
let langs = active_languages(&id)?;
assert_eq!(langs, vec!["rust"]);
drop(session);
Ok(())
}
#[test]
fn test_active_languages_removes_dead() -> Result<()> {
let session = Session::create("/tmp/test-langs-dead")?;
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Dead".to_string(),
});
let langs = active_languages(&id)?;
assert!(langs.is_empty());
drop(session);
Ok(())
}
#[test]
fn test_active_languages_multiple_languages() -> Result<()> {
let session = Session::create("/tmp/test-langs-multi")?;
let id = session.info.id.clone();
session.broadcast(EventKind::ServerState {
language: "rust".to_string(),
state: "Ready".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "python".to_string(),
state: "Ready".to_string(),
});
session.broadcast(EventKind::ServerState {
language: "typescript".to_string(),
state: "Initializing".to_string(),
});
let langs = active_languages(&id)?;
assert_eq!(langs, vec!["python", "rust", "typescript"]);
drop(session);
Ok(())
}
}