1use std::io::{BufRead, Write};
29use std::path::{Path, PathBuf};
30use std::sync::atomic::{AtomicU64, Ordering};
31
32#[cfg(not(target_family = "wasm"))]
33use fs2::FileExt;
34
35use crate::session::event::SessionEvent;
36
37#[derive(Debug)]
39pub enum EventLogError {
40 Io(std::io::Error),
41 Json(serde_json::Error),
42}
43
44impl std::fmt::Display for EventLogError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::Io(e) => write!(f, "event log io: {e}"),
48 Self::Json(e) => write!(f, "event log json: {e}"),
49 }
50 }
51}
52
53impl std::error::Error for EventLogError {}
54impl From<std::io::Error> for EventLogError {
55 fn from(e: std::io::Error) -> Self { Self::Io(e) }
56}
57impl From<serde_json::Error> for EventLogError {
58 fn from(e: serde_json::Error) -> Self { Self::Json(e) }
59}
60
61pub struct EventLog {
63 path: PathBuf,
64 sequence: AtomicU64,
65}
66
67impl EventLog {
68 pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
73 std::fs::create_dir_all(session_dir)?;
74 let path = session_dir.join("events.jsonl");
75
76 let sequence = if path.exists() {
78 let file = std::fs::File::open(&path)?;
79 let reader = std::io::BufReader::new(file);
80 let count = reader.lines().filter(|l| l.is_ok()).count() as u64;
81 AtomicU64::new(count)
82 } else {
83 AtomicU64::new(0)
84 };
85
86 Ok(Self { path, sequence })
87 }
88
89 pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
96 self.append_locked(event)
97 }
98
99 #[cfg(not(target_family = "wasm"))]
115 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
116 use std::time::{Duration, Instant};
117
118 let lock_path = self.path.with_extension("jsonl.lock");
120
121 let lock_file = open_lock_file(&lock_path)?;
125
126 let mut acquired = false;
131 let start = Instant::now();
132 let deadline = Duration::from_millis(500);
133 loop {
134 match lock_file.try_lock_exclusive() {
135 Ok(()) => {
136 acquired = true;
137 break;
138 }
139 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
140 if start.elapsed() >= deadline {
141 eprintln!(
142 "[treeship] event_log: lock contention on {} \
143 exceeded {}ms; appending without sequence ordering guarantee",
144 lock_path.display(),
145 deadline.as_millis()
146 );
147 break;
148 }
149 std::thread::sleep(Duration::from_millis(10));
150 }
151 Err(e) => return Err(e.into()),
152 }
153 }
154
155 let count = if self.path.exists() {
160 let f = std::fs::File::open(&self.path)?;
161 let r = std::io::BufReader::new(f);
162 r.lines().filter(|l| l.is_ok()).count() as u64
163 } else {
164 0
165 };
166 event.sequence_no = count;
167
168 let mut line = serde_json::to_vec(event)?;
169 line.push(b'\n');
170
171 let mut file = std::fs::OpenOptions::new()
172 .create(true)
173 .append(true)
174 .open(&self.path)?;
175 file.write_all(&line)?;
176 file.flush()?;
177
178 self.sequence.store(count + 1, Ordering::SeqCst);
181
182 let _ = acquired;
184 Ok(())
186 }
187
188 #[cfg(target_family = "wasm")]
191 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
192 event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
193
194 let mut line = serde_json::to_vec(event)?;
195 line.push(b'\n');
196
197 let mut file = std::fs::OpenOptions::new()
198 .create(true)
199 .append(true)
200 .open(&self.path)?;
201 file.write_all(&line)?;
202 file.flush()?;
203
204 Ok(())
205 }
206
207 pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
209 if !self.path.exists() {
210 return Ok(Vec::new());
211 }
212 let file = std::fs::File::open(&self.path)?;
213 let reader = std::io::BufReader::new(file);
214 let mut events = Vec::new();
215 for line in reader.lines() {
216 let line = line?;
217 if line.trim().is_empty() {
218 continue;
219 }
220 let event: SessionEvent = serde_json::from_str(&line)?;
221 events.push(event);
222 }
223 Ok(events)
224 }
225
226 pub fn event_count(&self) -> u64 {
228 self.sequence.load(Ordering::SeqCst)
229 }
230
231 pub fn path(&self) -> &Path {
233 &self.path
234 }
235}
236
237#[cfg(all(not(target_family = "wasm"), unix))]
253fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
254 use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
255
256 let file = std::fs::OpenOptions::new()
257 .create(true)
258 .read(true)
259 .write(true)
260 .mode(0o600)
261 .open(path)?;
262
263 if let Ok(meta) = file.metadata() {
267 let mode = meta.permissions().mode() & 0o777;
268 let owned_by_us = meta.uid() == nix_uid();
269 if owned_by_us && mode != 0o600 {
270 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
271 }
272 }
273
274 Ok(file)
275}
276
277#[cfg(all(not(target_family = "wasm"), unix))]
281fn nix_uid() -> u32 {
282 unsafe extern "C" {
284 fn geteuid() -> u32;
285 }
286 unsafe { geteuid() }
287}
288
289#[cfg(all(not(target_family = "wasm"), not(unix)))]
290fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
291 std::fs::OpenOptions::new()
292 .create(true)
293 .read(true)
294 .write(true)
295 .open(path)
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301 use crate::session::event::*;
302
303 fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
304 SessionEvent {
305 session_id: session_id.into(),
306 event_id: generate_event_id(),
307 timestamp: "2026-04-05T08:00:00Z".into(),
308 sequence_no: 0,
309 trace_id: generate_trace_id(),
310 span_id: generate_span_id(),
311 parent_span_id: None,
312 agent_id: "agent://test".into(),
313 agent_instance_id: "ai_test_1".into(),
314 agent_name: "test-agent".into(),
315 agent_role: None,
316 host_id: "host_test".into(),
317 tool_runtime_id: None,
318 event_type,
319 artifact_ref: None,
320 meta: None,
321 }
322 }
323
324 #[test]
325 fn append_and_read_back() {
326 let dir = std::env::temp_dir().join(format!("treeship-evtlog-test-{}", rand::random::<u32>()));
327 let log = EventLog::open(&dir).unwrap();
328
329 let mut e1 = make_event("ssn_001", EventType::SessionStarted);
330 let mut e2 = make_event("ssn_001", EventType::AgentStarted {
331 parent_agent_instance_id: None,
332 });
333
334 log.append(&mut e1).unwrap();
335 log.append(&mut e2).unwrap();
336
337 assert_eq!(log.event_count(), 2);
338 assert_eq!(e1.sequence_no, 0);
339 assert_eq!(e2.sequence_no, 1);
340
341 let events = log.read_all().unwrap();
342 assert_eq!(events.len(), 2);
343 assert_eq!(events[0].sequence_no, 0);
344 assert_eq!(events[1].sequence_no, 1);
345
346 let _ = std::fs::remove_dir_all(&dir);
347 }
348
349 #[test]
350 fn reopen_preserves_sequence() {
351 let dir = std::env::temp_dir().join(format!("treeship-evtlog-reopen-{}", rand::random::<u32>()));
352
353 {
354 let log = EventLog::open(&dir).unwrap();
355 let mut e = make_event("ssn_001", EventType::SessionStarted);
356 log.append(&mut e).unwrap();
357 }
358
359 let log = EventLog::open(&dir).unwrap();
361 assert_eq!(log.event_count(), 1);
362
363 let mut e2 = make_event("ssn_001", EventType::AgentStarted {
364 parent_agent_instance_id: None,
365 });
366 log.append(&mut e2).unwrap();
367 assert_eq!(e2.sequence_no, 1);
368
369 let _ = std::fs::remove_dir_all(&dir);
370 }
371
372 #[cfg(not(target_family = "wasm"))]
382 #[test]
383 fn concurrent_appends_have_unique_sequence_numbers() {
384 use std::sync::Arc;
385 use std::thread;
386
387 let dir = std::env::temp_dir()
388 .join(format!("treeship-evtlog-race-{}", rand::random::<u32>()));
389 std::fs::create_dir_all(&dir).unwrap();
390
391 const WRITERS: usize = 16;
392 let dir = Arc::new(dir);
393 let mut handles = Vec::with_capacity(WRITERS);
394
395 for _ in 0..WRITERS {
396 let dir = Arc::clone(&dir);
397 handles.push(thread::spawn(move || {
398 let log = EventLog::open(&dir).unwrap();
402 let mut e = make_event("ssn_race", EventType::SessionStarted);
403 log.append(&mut e).unwrap();
404 e.sequence_no
405 }));
406 }
407
408 let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
409 seqs.sort();
410
411 let expected: Vec<u64> = (0..WRITERS as u64).collect();
413 assert_eq!(seqs, expected, "sequence_no collisions under contention");
414
415 let log = EventLog::open(&dir).unwrap();
417 let read = log.read_all().unwrap();
418 assert_eq!(read.len(), WRITERS);
419 let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
420 on_disk.sort();
421 assert_eq!(on_disk, expected);
422
423 let _ = std::fs::remove_dir_all(&*dir);
424 }
425
426 #[cfg(all(not(target_family = "wasm"), unix))]
429 #[test]
430 fn lock_file_has_owner_only_permissions() {
431 use std::os::unix::fs::PermissionsExt;
432
433 let dir = std::env::temp_dir()
434 .join(format!("treeship-evtlog-perms-{}", rand::random::<u32>()));
435 let log = EventLog::open(&dir).unwrap();
436
437 let mut e = make_event("ssn_perms", EventType::SessionStarted);
438 log.append(&mut e).unwrap();
439
440 let lock_path = log.path().with_extension("jsonl.lock");
441 let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
442 let mode = meta.permissions().mode() & 0o777;
443 assert_eq!(
444 mode, 0o600,
445 "lock file mode is {:o}, expected 0o600 (owner-only)",
446 mode
447 );
448
449 let _ = std::fs::remove_dir_all(&dir);
450 }
451
452 #[cfg(all(not(target_family = "wasm"), unix))]
456 #[test]
457 fn existing_lock_file_is_re_tightened() {
458 use std::os::unix::fs::PermissionsExt;
459
460 let dir = std::env::temp_dir()
461 .join(format!("treeship-evtlog-retighten-{}", rand::random::<u32>()));
462 std::fs::create_dir_all(&dir).unwrap();
463
464 let lock_path = dir.join("events.jsonl.lock");
467 std::fs::write(&lock_path, b"").unwrap();
468 std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
469 let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
470 assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");
471
472 let log = EventLog::open(&dir).unwrap();
474 let mut e = make_event("ssn_retighten", EventType::SessionStarted);
475 log.append(&mut e).unwrap();
476
477 let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
478 assert_eq!(
479 post_mode, 0o600,
480 "lock file should be re-tightened to 0o600 after open; got {:o}",
481 post_mode
482 );
483
484 let _ = std::fs::remove_dir_all(&dir);
485 }
486}