1use std::io::{BufRead, Write};
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicU64, Ordering};
46
47#[cfg(not(target_family = "wasm"))]
48use fs2::FileExt;
49
50use crate::session::event::SessionEvent;
51
52#[derive(Debug)]
54pub enum EventLogError {
55 Io(std::io::Error),
56 Json(serde_json::Error),
57}
58
59impl std::fmt::Display for EventLogError {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 Self::Io(e) => write!(f, "event log io: {e}"),
63 Self::Json(e) => write!(f, "event log json: {e}"),
64 }
65 }
66}
67
68impl std::error::Error for EventLogError {}
69impl From<std::io::Error> for EventLogError {
70 fn from(e: std::io::Error) -> Self { Self::Io(e) }
71}
72impl From<serde_json::Error> for EventLogError {
73 fn from(e: serde_json::Error) -> Self { Self::Json(e) }
74}
75
76pub struct EventLog {
78 path: PathBuf,
79 sequence: AtomicU64,
80}
81
82impl EventLog {
83 pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
93 std::fs::create_dir_all(session_dir)?;
94 let path = session_dir.join("events.jsonl");
95 let count = read_counter_or_recount(&path)?;
96 Ok(Self { path, sequence: AtomicU64::new(count) })
97 }
98
99 pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
106 self.append_locked(event)
107 }
108
109 #[cfg(not(target_family = "wasm"))]
125 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
126 use std::time::{Duration, Instant};
127
128 let lock_path = self.path.with_extension("jsonl.lock");
130
131 let lock_file = open_lock_file(&lock_path)?;
135
136 let mut acquired = false;
141 let start = Instant::now();
142 let deadline = Duration::from_millis(500);
143 loop {
144 match lock_file.try_lock_exclusive() {
145 Ok(()) => {
146 acquired = true;
147 break;
148 }
149 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
150 if start.elapsed() >= deadline {
151 eprintln!(
152 "[treeship] event_log: lock contention on {} \
153 exceeded {}ms; appending without sequence ordering guarantee",
154 lock_path.display(),
155 deadline.as_millis()
156 );
157 break;
158 }
159 std::thread::sleep(Duration::from_millis(10));
160 }
161 Err(e) => return Err(e.into()),
162 }
163 }
164
165 let count = read_counter_or_recount(&self.path)?;
173 event.sequence_no = count;
174
175 let mut line = serde_json::to_vec(event)?;
176 line.push(b'\n');
177
178 let mut file = std::fs::OpenOptions::new()
179 .create(true)
180 .append(true)
181 .open(&self.path)?;
182 file.write_all(&line)?;
183 file.flush()?;
184
185 let new_size = file.metadata().map(|m| m.len()).unwrap_or(0);
190 let _ = write_counter(&self.path, count + 1, new_size);
191
192 self.sequence.store(count + 1, Ordering::SeqCst);
195
196 let _ = acquired;
198 Ok(())
200 }
201
202 #[cfg(target_family = "wasm")]
205 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
206 event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
207
208 let mut line = serde_json::to_vec(event)?;
209 line.push(b'\n');
210
211 let mut file = std::fs::OpenOptions::new()
212 .create(true)
213 .append(true)
214 .open(&self.path)?;
215 file.write_all(&line)?;
216 file.flush()?;
217
218 Ok(())
219 }
220
221 pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
223 if !self.path.exists() {
224 return Ok(Vec::new());
225 }
226 let file = std::fs::File::open(&self.path)?;
227 let reader = std::io::BufReader::new(file);
228 let mut events = Vec::new();
229 for line in reader.lines() {
230 let line = line?;
231 if line.trim().is_empty() {
232 continue;
233 }
234 let event: SessionEvent = serde_json::from_str(&line)?;
235 events.push(event);
236 }
237 Ok(events)
238 }
239
240 pub fn event_count(&self) -> u64 {
242 self.sequence.load(Ordering::SeqCst)
243 }
244
245 pub fn path(&self) -> &Path {
247 &self.path
248 }
249}
250
251#[cfg(all(not(target_family = "wasm"), unix))]
267fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
268 use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
269 use std::os::unix::io::AsRawFd;
270
271 let file = std::fs::OpenOptions::new()
272 .create(true)
273 .read(true)
274 .write(true)
275 .mode(0o600)
276 .open(path)?;
277
278 if let Ok(meta) = file.metadata() {
290 let mode = meta.permissions().mode() & 0o777;
291 let owned_by_us = meta.uid() == nix_uid();
292 if owned_by_us && mode != 0o600 {
293 let fd = file.as_raw_fd();
294 let rc = unsafe { libc_fchmod(fd, 0o600) };
297 if rc != 0 {
298 let err = std::io::Error::last_os_error();
299 eprintln!(
300 "[treeship] warning: could not tighten lock file perms on {} \
301 to 0o600 (current: 0o{:o}). Error: {}. Lock still functions; \
302 only the privacy of the sidecar is affected. Common cause: \
303 NFS mount or filesystem without full POSIX perm support.",
304 path.display(), mode, err
305 );
306 }
307 }
308 }
309
310 Ok(file)
311}
312
/// Thin binding to the C library's `fchmod(2)`.
///
/// Returns the raw C status: `0` on success, non-zero on failure (the cause is
/// then available through `std::io::Error::last_os_error()`).
// NOTE(review): `mode` is declared as `u32`; confirm this matches `mode_t` on
// every unix target this crate is built for.
#[cfg(all(not(target_family = "wasm"), unix))]
fn libc_fchmod(fd: i32, mode: u32) -> i32 {
    unsafe extern "C" {
        #[link_name = "fchmod"]
        fn c_fchmod(fd: i32, mode: u32) -> i32;
    }
    // SAFETY: `fchmod` receives both arguments by value and dereferences no
    // caller memory; an invalid descriptor is reported through the return code.
    unsafe { c_fchmod(fd, mode) }
}
325
/// Effective user id of the current process, obtained from the C library's
/// `geteuid(2)`.
#[cfg(all(not(target_family = "wasm"), unix))]
fn nix_uid() -> u32 {
    unsafe extern "C" {
        #[link_name = "geteuid"]
        fn c_geteuid() -> u32;
    }
    // SAFETY: `geteuid` takes no arguments and touches no caller memory.
    unsafe { c_geteuid() }
}
337
/// Opens (creating if necessary) the lock sidecar at `path` for read/write.
///
/// Non-unix variant: no permission handling is performed.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    options.open(path)
}
346
/// Path of the counter sidecar belonging to `events_path`: the same path with
/// its extension replaced by `jsonl.count` (so `events.jsonl` maps to
/// `events.jsonl.count`).
fn counter_path(events_path: &Path) -> PathBuf {
    let mut sidecar = events_path.to_path_buf();
    sidecar.set_extension("jsonl.count");
    sidecar
}
351
352#[cfg(not(target_family = "wasm"))]
358fn read_counter_consistent(events_path: &Path) -> Option<u64> {
359 let counter = counter_path(events_path);
360 let bytes = std::fs::read(&counter).ok()?;
361 if bytes.len() != 16 {
362 return None;
363 }
364 let count = u64::from_le_bytes(bytes[0..8].try_into().ok()?);
365 let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().ok()?);
366
367 let actual_size = match std::fs::metadata(events_path) {
369 Ok(m) => m.len(),
370 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
371 Err(_) => return None,
372 };
373 if actual_size != recorded_size {
374 return None;
375 }
376 Some(count)
377}
378
379#[cfg(not(target_family = "wasm"))]
383fn read_counter_or_recount(events_path: &Path) -> Result<u64, EventLogError> {
384 if let Some(count) = read_counter_consistent(events_path) {
385 return Ok(count);
386 }
387 let count = if events_path.exists() {
388 let f = std::fs::File::open(events_path)?;
389 let r = std::io::BufReader::new(f);
390 r.lines().filter(|l| l.is_ok()).count() as u64
391 } else {
392 0
393 };
394 let size = std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0);
395 let _ = write_counter(events_path, count, size);
396 Ok(count)
397}
398
/// wasm variant: the log file is never inspected; the count is always
/// reported as zero.
#[cfg(target_family = "wasm")]
fn read_counter_or_recount(_events_path: &Path) -> Result<u64, EventLogError> {
    let initial_count: u64 = 0;
    Ok(initial_count)
}
405
406#[cfg(not(target_family = "wasm"))]
413fn write_counter(events_path: &Path, count: u64, byte_size: u64) -> Result<(), std::io::Error> {
414 use std::io::Write as _;
415 let counter = counter_path(events_path);
416 let dir = counter.parent().ok_or_else(|| {
417 std::io::Error::new(std::io::ErrorKind::InvalidInput, "counter path has no parent")
418 })?;
419 std::fs::create_dir_all(dir)?;
420
421 let mut buf = [0u8; 16];
422 buf[0..8].copy_from_slice(&count.to_le_bytes());
423 buf[8..16].copy_from_slice(&byte_size.to_le_bytes());
424
425 let tmp = counter.with_extension("count.tmp");
426 {
427 let mut f = open_counter_tmp(&tmp)?;
428 f.write_all(&buf)?;
429 f.sync_all()?;
430 }
431 std::fs::rename(&tmp, &counter)?;
432 Ok(())
433}
434
/// Opens the temporary counter file at `path` for writing, creating it with
/// requested mode `0o600` if absent and truncating any existing content.
#[cfg(all(not(target_family = "wasm"), unix))]
fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true).mode(0o600);
    options.open(path)
}
445
/// Opens the temporary counter file at `path` for writing, creating it if
/// absent and truncating any existing content.
///
/// Non-unix variant: no permission handling is performed.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    options.open(path)
}
454
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::event::*;

    /// Uniquely named scratch directory that is removed when dropped, so a
    /// failing assertion does not leave directories behind in the temp dir.
    struct TestDir(std::path::PathBuf);

    impl TestDir {
        /// Picks a fresh `treeship-evtlog-<tag>-<random>` path under the
        /// system temp dir. The directory itself is NOT created here.
        fn new(tag: &str) -> Self {
            let name = format!("treeship-evtlog-{tag}-{}", rand::random::<u32>());
            Self(std::env::temp_dir().join(name))
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Builds a minimal event with fixed agent/host fields and `sequence_no` 0.
    fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
        SessionEvent {
            session_id: session_id.into(),
            event_id: generate_event_id(),
            timestamp: "2026-04-05T08:00:00Z".into(),
            sequence_no: 0,
            trace_id: generate_trace_id(),
            span_id: generate_span_id(),
            parent_span_id: None,
            agent_id: "agent://test".into(),
            agent_instance_id: "ai_test_1".into(),
            agent_name: "test-agent".into(),
            agent_role: None,
            host_id: "host_test".into(),
            tool_runtime_id: None,
            event_type,
            artifact_ref: None,
            meta: None,
        }
    }

    /// Spawns `writers` threads that each open their own handle on `dir` and
    /// append one event; returns the assigned sequence numbers, sorted.
    #[cfg(not(target_family = "wasm"))]
    fn append_concurrently(dir: &std::path::Path, session_id: &str, writers: usize) -> Vec<u64> {
        let mut seqs: Vec<u64> = std::thread::scope(|scope| {
            // Collect first so every writer is running before any is joined.
            let handles: Vec<_> = (0..writers)
                .map(|_| {
                    scope.spawn(move || {
                        let log = EventLog::open(dir).unwrap();
                        let mut e = make_event(session_id, EventType::SessionStarted);
                        log.append(&mut e).unwrap();
                        e.sequence_no
                    })
                })
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });
        seqs.sort_unstable();
        seqs
    }

    #[test]
    fn append_and_read_back() {
        let dir = TestDir::new("test");
        let log = EventLog::open(dir.path()).unwrap();

        let mut e1 = make_event("ssn_001", EventType::SessionStarted);
        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
            parent_agent_instance_id: None,
        });

        log.append(&mut e1).unwrap();
        log.append(&mut e2).unwrap();

        assert_eq!(log.event_count(), 2);
        assert_eq!(e1.sequence_no, 0);
        assert_eq!(e2.sequence_no, 1);

        let events = log.read_all().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].sequence_no, 0);
        assert_eq!(events[1].sequence_no, 1);
    }

    #[test]
    fn reopen_preserves_sequence() {
        let dir = TestDir::new("reopen");

        {
            let log = EventLog::open(dir.path()).unwrap();
            let mut e = make_event("ssn_001", EventType::SessionStarted);
            log.append(&mut e).unwrap();
        }

        let log = EventLog::open(dir.path()).unwrap();
        assert_eq!(log.event_count(), 1);

        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
            parent_agent_instance_id: None,
        });
        log.append(&mut e2).unwrap();
        assert_eq!(e2.sequence_no, 1);
    }

    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn concurrent_appends_have_unique_sequence_numbers() {
        const WRITERS: usize = 16;

        let dir = TestDir::new("race");
        std::fs::create_dir_all(dir.path()).unwrap();

        let seqs = append_concurrently(dir.path(), "ssn_race", WRITERS);
        let expected: Vec<u64> = (0..WRITERS as u64).collect();
        assert_eq!(seqs, expected, "sequence_no collisions under contention");

        // The numbers persisted in the file must match what writers were told.
        let log = EventLog::open(dir.path()).unwrap();
        let read = log.read_all().unwrap();
        assert_eq!(read.len(), WRITERS);
        let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
        on_disk.sort_unstable();
        assert_eq!(on_disk, expected);
    }

    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn lock_file_has_owner_only_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = TestDir::new("perms");
        let log = EventLog::open(dir.path()).unwrap();

        let mut e = make_event("ssn_perms", EventType::SessionStarted);
        log.append(&mut e).unwrap();

        let lock_path = log.path().with_extension("jsonl.lock");
        let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
        let mode = meta.permissions().mode() & 0o777;
        assert_eq!(
            mode, 0o600,
            "lock file mode is {:o}, expected 0o600 (owner-only)",
            mode
        );
    }

    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn existing_lock_file_is_re_tightened() {
        use std::os::unix::fs::PermissionsExt;

        let dir = TestDir::new("retighten");
        std::fs::create_dir_all(dir.path()).unwrap();

        // Simulate a lock file left behind with looser permissions.
        let lock_path = dir.path().join("events.jsonl.lock");
        std::fs::write(&lock_path, b"").unwrap();
        std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
        let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
        assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");

        let log = EventLog::open(dir.path()).unwrap();
        let mut e = make_event("ssn_retighten", EventType::SessionStarted);
        log.append(&mut e).unwrap();

        let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
        assert_eq!(
            post_mode, 0o600,
            "lock file should be re-tightened to 0o600 after open; got {:o}",
            post_mode
        );
    }

    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn counter_sidecar_written_after_append() {
        let dir = TestDir::new("counter");
        let log = EventLog::open(dir.path()).unwrap();

        let mut e = make_event("ssn_counter", EventType::SessionStarted);
        log.append(&mut e).unwrap();

        let counter = log.path().with_extension("jsonl.count");
        let bytes = std::fs::read(&counter).expect("counter sidecar must exist after append");
        assert_eq!(bytes.len(), 16, "counter sidecar must be 16 bytes");

        let count = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
        let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
        let actual_size = std::fs::metadata(log.path()).unwrap().len();
        assert_eq!(count, 1, "counter must reflect the one appended event");
        assert_eq!(
            recorded_size, actual_size,
            "counter byte_size ({}) must match events.jsonl size ({})",
            recorded_size, actual_size
        );
    }

    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn counter_sidecar_recovers_when_missing() {
        let dir = TestDir::new("missing-counter");

        {
            let log = EventLog::open(dir.path()).unwrap();
            let mut e1 = make_event("ssn_x", EventType::SessionStarted);
            let mut e2 = make_event("ssn_x", EventType::AgentStarted {
                parent_agent_instance_id: None,
            });
            log.append(&mut e1).unwrap();
            log.append(&mut e2).unwrap();
        }
        let counter = dir.path().join("events.jsonl.count");
        std::fs::remove_file(&counter).expect("counter must exist before deletion");

        let log = EventLog::open(dir.path()).unwrap();
        assert_eq!(log.event_count(), 2, "open() must recount when counter is missing");

        let mut e3 = make_event("ssn_x", EventType::SessionClosed {
            summary: None,
            duration_ms: None,
        });
        log.append(&mut e3).unwrap();
        assert_eq!(e3.sequence_no, 2);
        assert!(counter.exists(), "counter must be rewritten after recount");
    }

    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn counter_sidecar_recovers_when_corrupt() {
        let dir = TestDir::new("corrupt-counter");

        {
            let log = EventLog::open(dir.path()).unwrap();
            let mut e = make_event("ssn_corrupt", EventType::SessionStarted);
            log.append(&mut e).unwrap();
        }
        // Four bytes instead of sixteen: must be rejected as corrupt.
        let counter = dir.path().join("events.jsonl.count");
        std::fs::write(&counter, b"junk").unwrap();

        let log = EventLog::open(dir.path()).unwrap();
        assert_eq!(log.event_count(), 1, "short-read counter must be ignored, recount kicks in");
    }

    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn counter_sidecar_recovers_when_size_disagrees() {
        let dir = TestDir::new("stale-counter");

        {
            let log = EventLog::open(dir.path()).unwrap();
            let mut e = make_event("ssn_stale", EventType::SessionStarted);
            log.append(&mut e).unwrap();
        }

        // Append a line behind the log's back so the counter's recorded size
        // no longer matches the file.
        let events_path = dir.path().join("events.jsonl");
        let mut extra = make_event("ssn_stale", EventType::AgentStarted {
            parent_agent_instance_id: None,
        });
        extra.sequence_no = 999;
        let mut line = serde_json::to_vec(&extra).unwrap();
        line.push(b'\n');
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(&events_path)
            .unwrap();
        std::io::Write::write_all(&mut f, &line).unwrap();
        std::io::Write::flush(&mut f).unwrap();

        let log = EventLog::open(dir.path()).unwrap();
        assert_eq!(
            log.event_count(),
            2,
            "size mismatch must force recount, ignoring stale counter"
        );
    }

    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn counter_sidecar_preserves_concurrent_uniqueness() {
        const WRITERS: usize = 16;

        let dir = TestDir::new("counter-race");
        std::fs::create_dir_all(dir.path()).unwrap();

        let seqs = append_concurrently(dir.path(), "ssn_counter_race", WRITERS);
        let expected: Vec<u64> = (0..WRITERS as u64).collect();
        assert_eq!(seqs, expected, "counter must not bypass the flock race protection");

        let log = EventLog::open(dir.path()).unwrap();
        assert_eq!(log.event_count(), WRITERS as u64);
    }

    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn counter_sidecar_has_owner_only_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = TestDir::new("counter-perms");
        let log = EventLog::open(dir.path()).unwrap();

        let mut e = make_event("ssn_counter_perms", EventType::SessionStarted);
        log.append(&mut e).unwrap();

        let counter = log.path().with_extension("jsonl.count");
        let mode = std::fs::metadata(&counter).unwrap().permissions().mode() & 0o777;
        assert_eq!(
            mode, 0o600,
            "counter sidecar mode is {:o}, expected 0o600 (owner-only)",
            mode
        );
    }
}