1use std::io::{BufRead, Write};
29use std::path::{Path, PathBuf};
30use std::sync::atomic::{AtomicU64, Ordering};
31
32#[cfg(not(target_family = "wasm"))]
33use fs2::FileExt;
34
35use crate::session::event::SessionEvent;
36
37#[derive(Debug)]
39pub enum EventLogError {
40 Io(std::io::Error),
41 Json(serde_json::Error),
42}
43
44impl std::fmt::Display for EventLogError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 Self::Io(e) => write!(f, "event log io: {e}"),
48 Self::Json(e) => write!(f, "event log json: {e}"),
49 }
50 }
51}
52
53impl std::error::Error for EventLogError {}
54impl From<std::io::Error> for EventLogError {
55 fn from(e: std::io::Error) -> Self { Self::Io(e) }
56}
57impl From<serde_json::Error> for EventLogError {
58 fn from(e: serde_json::Error) -> Self { Self::Json(e) }
59}
60
61pub struct EventLog {
63 path: PathBuf,
64 sequence: AtomicU64,
65}
66
67impl EventLog {
68 pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
73 std::fs::create_dir_all(session_dir)?;
74 let path = session_dir.join("events.jsonl");
75
76 let sequence = if path.exists() {
78 let file = std::fs::File::open(&path)?;
79 let reader = std::io::BufReader::new(file);
80 let count = reader.lines().filter(|l| l.is_ok()).count() as u64;
81 AtomicU64::new(count)
82 } else {
83 AtomicU64::new(0)
84 };
85
86 Ok(Self { path, sequence })
87 }
88
89 pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
96 self.append_locked(event)
97 }
98
99 #[cfg(not(target_family = "wasm"))]
115 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
116 use std::time::{Duration, Instant};
117
118 let lock_path = self.path.with_extension("jsonl.lock");
120
121 let lock_file = open_lock_file(&lock_path)?;
125
126 let mut acquired = false;
131 let start = Instant::now();
132 let deadline = Duration::from_millis(500);
133 loop {
134 match lock_file.try_lock_exclusive() {
135 Ok(()) => {
136 acquired = true;
137 break;
138 }
139 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
140 if start.elapsed() >= deadline {
141 eprintln!(
142 "[treeship] event_log: lock contention on {} \
143 exceeded {}ms; appending without sequence ordering guarantee",
144 lock_path.display(),
145 deadline.as_millis()
146 );
147 break;
148 }
149 std::thread::sleep(Duration::from_millis(10));
150 }
151 Err(e) => return Err(e.into()),
152 }
153 }
154
155 let count = if self.path.exists() {
160 let f = std::fs::File::open(&self.path)?;
161 let r = std::io::BufReader::new(f);
162 r.lines().filter(|l| l.is_ok()).count() as u64
163 } else {
164 0
165 };
166 event.sequence_no = count;
167
168 let mut line = serde_json::to_vec(event)?;
169 line.push(b'\n');
170
171 let mut file = std::fs::OpenOptions::new()
172 .create(true)
173 .append(true)
174 .open(&self.path)?;
175 file.write_all(&line)?;
176 file.flush()?;
177
178 self.sequence.store(count + 1, Ordering::SeqCst);
181
182 let _ = acquired;
184 Ok(())
186 }
187
188 #[cfg(target_family = "wasm")]
191 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
192 event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
193
194 let mut line = serde_json::to_vec(event)?;
195 line.push(b'\n');
196
197 let mut file = std::fs::OpenOptions::new()
198 .create(true)
199 .append(true)
200 .open(&self.path)?;
201 file.write_all(&line)?;
202 file.flush()?;
203
204 Ok(())
205 }
206
207 pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
209 if !self.path.exists() {
210 return Ok(Vec::new());
211 }
212 let file = std::fs::File::open(&self.path)?;
213 let reader = std::io::BufReader::new(file);
214 let mut events = Vec::new();
215 for line in reader.lines() {
216 let line = line?;
217 if line.trim().is_empty() {
218 continue;
219 }
220 let event: SessionEvent = serde_json::from_str(&line)?;
221 events.push(event);
222 }
223 Ok(events)
224 }
225
226 pub fn event_count(&self) -> u64 {
228 self.sequence.load(Ordering::SeqCst)
229 }
230
231 pub fn path(&self) -> &Path {
233 &self.path
234 }
235}
236
237#[cfg(all(not(target_family = "wasm"), unix))]
253fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
254 use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
255 use std::os::unix::io::AsRawFd;
256
257 let file = std::fs::OpenOptions::new()
258 .create(true)
259 .read(true)
260 .write(true)
261 .mode(0o600)
262 .open(path)?;
263
264 if let Ok(meta) = file.metadata() {
276 let mode = meta.permissions().mode() & 0o777;
277 let owned_by_us = meta.uid() == nix_uid();
278 if owned_by_us && mode != 0o600 {
279 let fd = file.as_raw_fd();
280 let rc = unsafe { libc_fchmod(fd, 0o600) };
283 if rc != 0 {
284 let err = std::io::Error::last_os_error();
285 eprintln!(
286 "[treeship] warning: could not tighten lock file perms on {} \
287 to 0o600 (current: 0o{:o}). Error: {}. Lock still functions; \
288 only the privacy of the sidecar is affected. Common cause: \
289 NFS mount or filesystem without full POSIX perm support.",
290 path.display(), mode, err
291 );
292 }
293 }
294 }
295
296 Ok(file)
297}
298
299#[cfg(all(not(target_family = "wasm"), unix))]
303fn libc_fchmod(fd: i32, mode: u32) -> i32 {
304 unsafe extern "C" {
307 fn fchmod(fd: i32, mode: u32) -> i32;
308 }
309 unsafe { fchmod(fd, mode) }
310}
311
312#[cfg(all(not(target_family = "wasm"), unix))]
316fn nix_uid() -> u32 {
317 unsafe extern "C" {
319 fn geteuid() -> u32;
320 }
321 unsafe { geteuid() }
322}
323
324#[cfg(all(not(target_family = "wasm"), not(unix)))]
325fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
326 std::fs::OpenOptions::new()
327 .create(true)
328 .read(true)
329 .write(true)
330 .open(path)
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use crate::session::event::*;
337
338 fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
339 SessionEvent {
340 session_id: session_id.into(),
341 event_id: generate_event_id(),
342 timestamp: "2026-04-05T08:00:00Z".into(),
343 sequence_no: 0,
344 trace_id: generate_trace_id(),
345 span_id: generate_span_id(),
346 parent_span_id: None,
347 agent_id: "agent://test".into(),
348 agent_instance_id: "ai_test_1".into(),
349 agent_name: "test-agent".into(),
350 agent_role: None,
351 host_id: "host_test".into(),
352 tool_runtime_id: None,
353 event_type,
354 artifact_ref: None,
355 meta: None,
356 }
357 }
358
359 #[test]
360 fn append_and_read_back() {
361 let dir = std::env::temp_dir().join(format!("treeship-evtlog-test-{}", rand::random::<u32>()));
362 let log = EventLog::open(&dir).unwrap();
363
364 let mut e1 = make_event("ssn_001", EventType::SessionStarted);
365 let mut e2 = make_event("ssn_001", EventType::AgentStarted {
366 parent_agent_instance_id: None,
367 });
368
369 log.append(&mut e1).unwrap();
370 log.append(&mut e2).unwrap();
371
372 assert_eq!(log.event_count(), 2);
373 assert_eq!(e1.sequence_no, 0);
374 assert_eq!(e2.sequence_no, 1);
375
376 let events = log.read_all().unwrap();
377 assert_eq!(events.len(), 2);
378 assert_eq!(events[0].sequence_no, 0);
379 assert_eq!(events[1].sequence_no, 1);
380
381 let _ = std::fs::remove_dir_all(&dir);
382 }
383
384 #[test]
385 fn reopen_preserves_sequence() {
386 let dir = std::env::temp_dir().join(format!("treeship-evtlog-reopen-{}", rand::random::<u32>()));
387
388 {
389 let log = EventLog::open(&dir).unwrap();
390 let mut e = make_event("ssn_001", EventType::SessionStarted);
391 log.append(&mut e).unwrap();
392 }
393
394 let log = EventLog::open(&dir).unwrap();
396 assert_eq!(log.event_count(), 1);
397
398 let mut e2 = make_event("ssn_001", EventType::AgentStarted {
399 parent_agent_instance_id: None,
400 });
401 log.append(&mut e2).unwrap();
402 assert_eq!(e2.sequence_no, 1);
403
404 let _ = std::fs::remove_dir_all(&dir);
405 }
406
407 #[cfg(not(target_family = "wasm"))]
417 #[test]
418 fn concurrent_appends_have_unique_sequence_numbers() {
419 use std::sync::Arc;
420 use std::thread;
421
422 let dir = std::env::temp_dir()
423 .join(format!("treeship-evtlog-race-{}", rand::random::<u32>()));
424 std::fs::create_dir_all(&dir).unwrap();
425
426 const WRITERS: usize = 16;
427 let dir = Arc::new(dir);
428 let mut handles = Vec::with_capacity(WRITERS);
429
430 for _ in 0..WRITERS {
431 let dir = Arc::clone(&dir);
432 handles.push(thread::spawn(move || {
433 let log = EventLog::open(&dir).unwrap();
437 let mut e = make_event("ssn_race", EventType::SessionStarted);
438 log.append(&mut e).unwrap();
439 e.sequence_no
440 }));
441 }
442
443 let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
444 seqs.sort();
445
446 let expected: Vec<u64> = (0..WRITERS as u64).collect();
448 assert_eq!(seqs, expected, "sequence_no collisions under contention");
449
450 let log = EventLog::open(&dir).unwrap();
452 let read = log.read_all().unwrap();
453 assert_eq!(read.len(), WRITERS);
454 let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
455 on_disk.sort();
456 assert_eq!(on_disk, expected);
457
458 let _ = std::fs::remove_dir_all(&*dir);
459 }
460
461 #[cfg(all(not(target_family = "wasm"), unix))]
464 #[test]
465 fn lock_file_has_owner_only_permissions() {
466 use std::os::unix::fs::PermissionsExt;
467
468 let dir = std::env::temp_dir()
469 .join(format!("treeship-evtlog-perms-{}", rand::random::<u32>()));
470 let log = EventLog::open(&dir).unwrap();
471
472 let mut e = make_event("ssn_perms", EventType::SessionStarted);
473 log.append(&mut e).unwrap();
474
475 let lock_path = log.path().with_extension("jsonl.lock");
476 let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
477 let mode = meta.permissions().mode() & 0o777;
478 assert_eq!(
479 mode, 0o600,
480 "lock file mode is {:o}, expected 0o600 (owner-only)",
481 mode
482 );
483
484 let _ = std::fs::remove_dir_all(&dir);
485 }
486
487 #[cfg(all(not(target_family = "wasm"), unix))]
491 #[test]
492 fn existing_lock_file_is_re_tightened() {
493 use std::os::unix::fs::PermissionsExt;
494
495 let dir = std::env::temp_dir()
496 .join(format!("treeship-evtlog-retighten-{}", rand::random::<u32>()));
497 std::fs::create_dir_all(&dir).unwrap();
498
499 let lock_path = dir.join("events.jsonl.lock");
502 std::fs::write(&lock_path, b"").unwrap();
503 std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
504 let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
505 assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");
506
507 let log = EventLog::open(&dir).unwrap();
509 let mut e = make_event("ssn_retighten", EventType::SessionStarted);
510 log.append(&mut e).unwrap();
511
512 let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
513 assert_eq!(
514 post_mode, 0o600,
515 "lock file should be re-tightened to 0o600 after open; got {:o}",
516 post_mode
517 );
518
519 let _ = std::fs::remove_dir_all(&dir);
520 }
521}