use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(not(target_family = "wasm"))]
use fs2::FileExt;
use crate::session::event::SessionEvent;
#[derive(Debug)]
pub enum EventLogError {
Io(std::io::Error),
Json(serde_json::Error),
}
impl std::fmt::Display for EventLogError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(e) => write!(f, "event log io: {e}"),
Self::Json(e) => write!(f, "event log json: {e}"),
}
}
}
impl std::error::Error for EventLogError {}
impl From<std::io::Error> for EventLogError {
fn from(e: std::io::Error) -> Self { Self::Io(e) }
}
impl From<serde_json::Error> for EventLogError {
fn from(e: serde_json::Error) -> Self { Self::Json(e) }
}
pub struct EventLog {
path: PathBuf,
sequence: AtomicU64,
}
impl EventLog {
pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
std::fs::create_dir_all(session_dir)?;
let path = session_dir.join("events.jsonl");
let sequence = if path.exists() {
let file = std::fs::File::open(&path)?;
let reader = std::io::BufReader::new(file);
let count = reader.lines().filter(|l| l.is_ok()).count() as u64;
AtomicU64::new(count)
} else {
AtomicU64::new(0)
};
Ok(Self { path, sequence })
}
pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
self.append_locked(event)
}
#[cfg(not(target_family = "wasm"))]
fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
use std::time::{Duration, Instant};
let lock_path = self.path.with_extension("jsonl.lock");
let lock_file = open_lock_file(&lock_path)?;
let mut acquired = false;
let start = Instant::now();
let deadline = Duration::from_millis(500);
loop {
match lock_file.try_lock_exclusive() {
Ok(()) => {
acquired = true;
break;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
if start.elapsed() >= deadline {
eprintln!(
"[treeship] event_log: lock contention on {} \
exceeded {}ms; appending without sequence ordering guarantee",
lock_path.display(),
deadline.as_millis()
);
break;
}
std::thread::sleep(Duration::from_millis(10));
}
Err(e) => return Err(e.into()),
}
}
let count = if self.path.exists() {
let f = std::fs::File::open(&self.path)?;
let r = std::io::BufReader::new(f);
r.lines().filter(|l| l.is_ok()).count() as u64
} else {
0
};
event.sequence_no = count;
let mut line = serde_json::to_vec(event)?;
line.push(b'\n');
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
file.write_all(&line)?;
file.flush()?;
self.sequence.store(count + 1, Ordering::SeqCst);
let _ = acquired;
Ok(())
}
#[cfg(target_family = "wasm")]
fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
let mut line = serde_json::to_vec(event)?;
line.push(b'\n');
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
file.write_all(&line)?;
file.flush()?;
Ok(())
}
pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
if !self.path.exists() {
return Ok(Vec::new());
}
let file = std::fs::File::open(&self.path)?;
let reader = std::io::BufReader::new(file);
let mut events = Vec::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let event: SessionEvent = serde_json::from_str(&line)?;
events.push(event);
}
Ok(events)
}
pub fn event_count(&self) -> u64 {
self.sequence.load(Ordering::SeqCst)
}
pub fn path(&self) -> &Path {
&self.path
}
}
#[cfg(all(not(target_family = "wasm"), unix))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
let file = std::fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.mode(0o600)
.open(path)?;
if let Ok(meta) = file.metadata() {
let mode = meta.permissions().mode() & 0o777;
let owned_by_us = meta.uid() == nix_uid();
if owned_by_us && mode != 0o600 {
let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
}
}
Ok(file)
}
#[cfg(all(not(target_family = "wasm"), unix))]
fn nix_uid() -> u32 {
unsafe extern "C" {
fn geteuid() -> u32;
}
unsafe { geteuid() }
}
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
std::fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(path)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::session::event::*;
fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
SessionEvent {
session_id: session_id.into(),
event_id: generate_event_id(),
timestamp: "2026-04-05T08:00:00Z".into(),
sequence_no: 0,
trace_id: generate_trace_id(),
span_id: generate_span_id(),
parent_span_id: None,
agent_id: "agent://test".into(),
agent_instance_id: "ai_test_1".into(),
agent_name: "test-agent".into(),
agent_role: None,
host_id: "host_test".into(),
tool_runtime_id: None,
event_type,
artifact_ref: None,
meta: None,
}
}
#[test]
fn append_and_read_back() {
let dir = std::env::temp_dir().join(format!("treeship-evtlog-test-{}", rand::random::<u32>()));
let log = EventLog::open(&dir).unwrap();
let mut e1 = make_event("ssn_001", EventType::SessionStarted);
let mut e2 = make_event("ssn_001", EventType::AgentStarted {
parent_agent_instance_id: None,
});
log.append(&mut e1).unwrap();
log.append(&mut e2).unwrap();
assert_eq!(log.event_count(), 2);
assert_eq!(e1.sequence_no, 0);
assert_eq!(e2.sequence_no, 1);
let events = log.read_all().unwrap();
assert_eq!(events.len(), 2);
assert_eq!(events[0].sequence_no, 0);
assert_eq!(events[1].sequence_no, 1);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn reopen_preserves_sequence() {
let dir = std::env::temp_dir().join(format!("treeship-evtlog-reopen-{}", rand::random::<u32>()));
{
let log = EventLog::open(&dir).unwrap();
let mut e = make_event("ssn_001", EventType::SessionStarted);
log.append(&mut e).unwrap();
}
let log = EventLog::open(&dir).unwrap();
assert_eq!(log.event_count(), 1);
let mut e2 = make_event("ssn_001", EventType::AgentStarted {
parent_agent_instance_id: None,
});
log.append(&mut e2).unwrap();
assert_eq!(e2.sequence_no, 1);
let _ = std::fs::remove_dir_all(&dir);
}
#[cfg(not(target_family = "wasm"))]
#[test]
fn concurrent_appends_have_unique_sequence_numbers() {
use std::sync::Arc;
use std::thread;
let dir = std::env::temp_dir()
.join(format!("treeship-evtlog-race-{}", rand::random::<u32>()));
std::fs::create_dir_all(&dir).unwrap();
const WRITERS: usize = 16;
let dir = Arc::new(dir);
let mut handles = Vec::with_capacity(WRITERS);
for _ in 0..WRITERS {
let dir = Arc::clone(&dir);
handles.push(thread::spawn(move || {
let log = EventLog::open(&dir).unwrap();
let mut e = make_event("ssn_race", EventType::SessionStarted);
log.append(&mut e).unwrap();
e.sequence_no
}));
}
let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
seqs.sort();
let expected: Vec<u64> = (0..WRITERS as u64).collect();
assert_eq!(seqs, expected, "sequence_no collisions under contention");
let log = EventLog::open(&dir).unwrap();
let read = log.read_all().unwrap();
assert_eq!(read.len(), WRITERS);
let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
on_disk.sort();
assert_eq!(on_disk, expected);
let _ = std::fs::remove_dir_all(&*dir);
}
#[cfg(all(not(target_family = "wasm"), unix))]
#[test]
fn lock_file_has_owner_only_permissions() {
use std::os::unix::fs::PermissionsExt;
let dir = std::env::temp_dir()
.join(format!("treeship-evtlog-perms-{}", rand::random::<u32>()));
let log = EventLog::open(&dir).unwrap();
let mut e = make_event("ssn_perms", EventType::SessionStarted);
log.append(&mut e).unwrap();
let lock_path = log.path().with_extension("jsonl.lock");
let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
let mode = meta.permissions().mode() & 0o777;
assert_eq!(
mode, 0o600,
"lock file mode is {:o}, expected 0o600 (owner-only)",
mode
);
let _ = std::fs::remove_dir_all(&dir);
}
#[cfg(all(not(target_family = "wasm"), unix))]
#[test]
fn existing_lock_file_is_re_tightened() {
use std::os::unix::fs::PermissionsExt;
let dir = std::env::temp_dir()
.join(format!("treeship-evtlog-retighten-{}", rand::random::<u32>()));
std::fs::create_dir_all(&dir).unwrap();
let lock_path = dir.join("events.jsonl.lock");
std::fs::write(&lock_path, b"").unwrap();
std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");
let log = EventLog::open(&dir).unwrap();
let mut e = make_event("ssn_retighten", EventType::SessionStarted);
log.append(&mut e).unwrap();
let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
assert_eq!(
post_mode, 0o600,
"lock file should be re-tightened to 0o600 after open; got {:o}",
post_mode
);
let _ = std::fs::remove_dir_all(&dir);
}
}