treeship_core/session/event_log.rs
1//! Append-only, file-backed event log for session events.
2//!
3//! Events are stored as newline-delimited JSON (JSONL) in
4//! `.treeship/sessions/<session_id>/events.jsonl`.
5//!
//! Concurrency model: `append()` is safe to call from multiple processes
//! concurrently. Each call acquires an exclusive advisory lock (via fs2's
//! blocking `FileExt::lock_exclusive` -- backed by `flock(2)` on Unix and
//! `LockFileEx` on Windows) on a sidecar `events.jsonl.lock` file; see
//! "Fail-closed semantics" below for why the lock is blocking. Under the
//! lock, a counter sidecar `events.jsonl.count` is the authoritative source
//! for the next `sequence_no`. The per-process AtomicU64 is only a cached
//! hint (it backs `event_count()`) and is overwritten with the on-disk
//! count after every locked append.
15//!
16//! Counter sidecar format (16 bytes):
17//! - bytes 0..8: count (u64 LE) -- number of events written to events.jsonl
18//! - bytes 8..16: byte_size (u64 LE) -- size of events.jsonl when count was recorded
19//!
20//! The byte_size field is the crash detector. If a peer wrote events.jsonl
21//! but crashed before fsyncing the counter (or vice versa), the size on disk
22//! and the size in the counter disagree. On any mismatch we fall back to an
23//! O(N) line count and rewrite the counter -- one paid scan, then back to
24//! O(1) on every subsequent append.
25//!
26//! This bounds steady-state append cost at constant: read 16 bytes, write
27//! one JSONL line, write 16 bytes. The previous implementation re-streamed
28//! the entire events.jsonl on every append, which made hooks O(N) in
29//! session length and dominated PostToolUse latency on long sessions.
30//!
31//! Fail-closed semantics: the writer ALWAYS acquires the exclusive flock
32//! before reading the counter and writing the event. We use fs2's blocking
33//! `lock_exclusive()` (flock(2) without LOCK_NB on Unix, LockFileEx without
34//! LOCKFILE_FAIL_IMMEDIATELY on Windows) so contended writers queue rather
35//! than race. An earlier "best-effort, fall through on contention" path
36//! existed here -- it could produce duplicate `sequence_no` under hook
37//! contention (P0, audit lane F), so it was removed. Hook callers already
38//! sit under Claude Code's 60s hook timeout, which bounds wall-clock
39//! exposure to a wedged peer. Trading a bounded wait for guaranteed
40//! injective sequence numbers is the right call when receipts are the
41//! trust artifact.
42//!
43//! Lock file permissions are 0o600 (owner-only) on Unix, applied at file
44//! creation via `OpenOptionsExt::mode` and re-tightened on every open if
45//! a previous run left the file with looser perms.
46
47use std::io::{BufRead, Write};
48use std::path::{Path, PathBuf};
49use std::sync::atomic::{AtomicU64, Ordering};
50
51#[cfg(not(target_family = "wasm"))]
52use fs2::FileExt;
53
54use crate::session::event::SessionEvent;
55
56/// Error from event log operations.
57#[derive(Debug)]
58pub enum EventLogError {
59 Io(std::io::Error),
60 Json(serde_json::Error),
61}
62
63impl std::fmt::Display for EventLogError {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 Self::Io(e) => write!(f, "event log io: {e}"),
67 Self::Json(e) => write!(f, "event log json: {e}"),
68 }
69 }
70}
71
72impl std::error::Error for EventLogError {}
73impl From<std::io::Error> for EventLogError {
74 fn from(e: std::io::Error) -> Self { Self::Io(e) }
75}
76impl From<serde_json::Error> for EventLogError {
77 fn from(e: serde_json::Error) -> Self { Self::Json(e) }
78}
79
80/// An append-only event log backed by a JSONL file.
81pub struct EventLog {
82 path: PathBuf,
83 sequence: AtomicU64,
84}
85
86impl EventLog {
87 /// Open or create an event log for the given session directory.
88 ///
89 /// The session directory is typically `.treeship/sessions/<session_id>/`.
90 /// If the directory does not exist, it will be created.
91 ///
92 /// Initialization reads the counter sidecar in O(1) when present and
93 /// consistent with events.jsonl's byte size; falls back to an O(N) line
94 /// count (and rewrites the sidecar) when the sidecar is missing,
95 /// short-read, or stale from a crashed previous appender.
96 pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
97 std::fs::create_dir_all(session_dir)?;
98 let path = session_dir.join("events.jsonl");
99 let count = read_counter_or_recount(&path)?;
100 Ok(Self { path, sequence: AtomicU64::new(count) })
101 }
102
103 /// Append a single event to the log.
104 ///
105 /// The event's `sequence_no` is set automatically. Under contention from
106 /// multiple writer processes, the sequence number is re-derived from the
107 /// on-disk line count under an exclusive flock so two parallel writers
108 /// never collide.
109 pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
110 self.append_locked(event)
111 }
112
113 /// Cross-process safe append: acquires an exclusive advisory lock on a
114 /// sidecar `.lock` file, re-counts events.jsonl lines, assigns sequence_no,
115 /// writes the new event, then releases the lock on drop.
116 ///
117 /// Locking is BLOCKING (`fs2::FileExt::lock_exclusive`, i.e. flock(2)
118 /// without LOCK_NB). A previous implementation polled `try_lock_exclusive`
119 /// for ~500ms and then fell through to an UNLOCKED write -- which under
120 /// hook contention (multiple PostToolUse invocations racing) could
121 /// assign duplicate `sequence_no` values to different events. That
122 /// broke the injective-sequence invariant receipts depend on, even
123 /// though local merkle verification still passed. Audit lane F (P0).
124 ///
125 /// The trade-off: a wedged peer holding the lock will now stall this
126 /// caller until the peer releases (e.g. by crashing -- flock is
127 /// released by the kernel when the holder's FD closes). Hook
128 /// invocations are bounded by Claude Code's hook timeout (60s by
129 /// default), so the worst-case wedge surfaces as a hook failure
130 /// rather than a duplicate sequence_no. That mirrors how
131 /// `journal/mod.rs` treats the journal append lock as a hard
132 /// correctness barrier rather than a soft hint.
133 ///
134 /// The locked region covers BOTH the counter read AND the file write,
135 /// so two concurrent writers cannot read the same count and append
136 /// twice with that count.
137 ///
138 /// Lock file is created mode 0o600 (owner-only) so the sidecar can
139 /// never be opened by other users on a shared machine.
140 ///
141 /// Skipped on WASM (no fs, no concurrency).
142 #[cfg(not(target_family = "wasm"))]
143 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
144 // Sidecar lock file: contention here doesn't block readers of events.jsonl.
145 let lock_path = self.path.with_extension("jsonl.lock");
146
147 // Open or create the lock file. On Unix we set 0o600 explicitly so
148 // the sidecar isn't group/world readable; the umask-derived default
149 // would otherwise be permissive on some setups.
150 let lock_file = open_lock_file(&lock_path)?;
151
152 // Blocking flock. Returns when the lock is held exclusively, or
153 // propagates a real I/O error (filesystem that doesn't support
154 // flock, FD revoked, etc.). EINTR retry isn't needed -- fs2 wraps
155 // the syscall and retries internally on POSIX.
156 FileExt::lock_exclusive(&lock_file)?;
157
158 // From here until `lock_file` is dropped (end of function), we
159 // hold the exclusive flock. The counter read + event write + counter
160 // update MUST stay inside this block; any early return must still
161 // drop `lock_file`, which Rust guarantees by RAII.
162 let result = (|| -> Result<(), EventLogError> {
163 // Read sequence_no from the counter sidecar in O(1) when
164 // consistent with events.jsonl size. Stale or missing counters
165 // force a one-time O(N) rescan that also rewrites the counter,
166 // so subsequent appends return to O(1). Only the on-disk state
167 // (counter + size check) is authoritative when multiple
168 // processes are appending; the per-process AtomicU64 is a
169 // stale hint.
170 let count = read_counter_or_recount(&self.path)?;
171 event.sequence_no = count;
172
173 let mut line = serde_json::to_vec(event)?;
174 line.push(b'\n');
175
176 let mut file = std::fs::OpenOptions::new()
177 .create(true)
178 .append(true)
179 .open(&self.path)?;
180 file.write_all(&line)?;
181 file.flush()?;
182
183 // Update the counter sidecar with the new count and the new
184 // events.jsonl size, so the next append can short-circuit the
185 // line scan. Failure to update the counter is non-fatal: the
186 // next reader will detect the size mismatch and recount.
187 let new_size = file.metadata().map(|m| m.len()).unwrap_or(0);
188 let _ = write_counter(&self.path, count + 1, new_size);
189
190 // Keep the in-process AtomicU64 in sync so non-contended callers
191 // see the right value via event_count() without re-reading.
192 self.sequence.store(count + 1, Ordering::SeqCst);
193 Ok(())
194 })();
195
196 // Explicit unlock matches the journal precedent (journal/mod.rs
197 // also calls `unlock` before dropping). Drop alone would release
198 // the flock via close(2), but being explicit makes the lock
199 // window obvious to readers of this function.
200 let _ = FileExt::unlock(&lock_file);
201 result
202 }
203
204 /// WASM build: no filesystem locks available, no concurrent writers.
205 /// Falls back to the simple AtomicU64 path.
206 #[cfg(target_family = "wasm")]
207 fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
208 event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
209
210 let mut line = serde_json::to_vec(event)?;
211 line.push(b'\n');
212
213 let mut file = std::fs::OpenOptions::new()
214 .create(true)
215 .append(true)
216 .open(&self.path)?;
217 file.write_all(&line)?;
218 file.flush()?;
219
220 Ok(())
221 }
222
223 /// Read all events from the log.
224 ///
225 /// Per-line tolerant: a single malformed line (unknown event type,
226 /// missing required field, truncated JSON from a crashed writer) is
227 /// logged to stderr and skipped, not propagated as an error. The
228 /// caller -- session close, in particular -- composes a receipt
229 /// from whatever events parse, instead of dropping every event when
230 /// any one is bad.
231 ///
232 /// Why this matters: events.jsonl is append-only and written by
233 /// hooks, daemons, SDKs, and bridges from multiple processes. A
234 /// single bad event from one buggy emitter would otherwise nuke
235 /// the entire receipt's side_effects / agent_graph / timeline.
236 /// Real-world repro: a hook that emitted events with an unknown
237 /// `type` field caused side_effects.files_written to come back
238 /// empty even though the rest of the events in the log were valid
239 /// agent.wrote_file events the aggregator would have happily
240 /// processed.
241 pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
242 // Drop the skipped count for callers that don't carry it through.
243 // Receipt composition uses read_all_with_stats to record the
244 // count in-band on the sealed receipt -- see Codex finding #8.
245 self.read_all_with_stats().map(|(events, _skipped)| events)
246 }
247
248 /// Same as `read_all` but returns the count of malformed lines that
249 /// were skipped during parsing alongside the valid events.
250 ///
251 /// Codex adversarial review finding #8: skipping malformed events
252 /// on stderr only is silent data loss from the verifier's
253 /// perspective. The receipt gets sealed under a merkle root that
254 /// represents only the events that successfully parsed -- a
255 /// downstream consumer cannot tell whether the receipt is complete
256 /// or whether N events were silently dropped.
257 ///
258 /// session::close calls this and stores the count on
259 /// `receipt.proofs.event_log_skipped`. `treeship package verify`
260 /// surfaces it as a WARN when nonzero so the receipt's
261 /// completeness signal is visible without breaking byte-identical
262 /// re-verification of pre-existing receipts.
263 pub fn read_all_with_stats(&self) -> Result<(Vec<SessionEvent>, usize), EventLogError> {
264 if !self.path.exists() {
265 return Ok((Vec::new(), 0));
266 }
267 let file = std::fs::File::open(&self.path)?;
268 let reader = std::io::BufReader::new(file);
269 let mut events = Vec::new();
270 let mut skipped = 0usize;
271 for (idx, line) in reader.lines().enumerate() {
272 let line = line?;
273 if line.trim().is_empty() {
274 continue;
275 }
276 match serde_json::from_str::<SessionEvent>(&line) {
277 Ok(event) => events.push(event),
278 Err(e) => {
279 skipped += 1;
280 eprintln!(
281 "[treeship] event_log: skipping malformed line {} in {}: {}",
282 idx + 1,
283 self.path.display(),
284 e,
285 );
286 }
287 }
288 }
289 if skipped > 0 {
290 eprintln!(
291 "[treeship] event_log: {} malformed line(s) skipped while reading {} (kept {} valid event(s))",
292 skipped,
293 self.path.display(),
294 events.len(),
295 );
296 }
297 Ok((events, skipped))
298 }
299
300 /// Return the current event count.
301 pub fn event_count(&self) -> u64 {
302 self.sequence.load(Ordering::SeqCst)
303 }
304
305 /// Return the path to the JSONL file.
306 pub fn path(&self) -> &Path {
307 &self.path
308 }
309}
310
311/// Open the sidecar lock file with owner-only permissions (0o600 on Unix).
312///
313/// On Unix the mode is set atomically via `OpenOptionsExt::mode` for newly
314/// created files. For files that already exist (e.g. left over from a
315/// prior crash or an upgrade from a pre-0.9.3 CLI that didn't tighten
316/// perms), we additionally re-chmod to 0o600 after open IF the file is
317/// owned by the current user. This is best-effort: if the chmod fails
318/// (file owned by another user, read-only filesystem, etc.) we proceed
319/// silently rather than refuse to open the lock -- the lock semantics
320/// don't depend on the perms being tight, only the privacy of the
321/// sidecar's existence does.
322///
323/// On Windows the mode concept doesn't apply; ACLs default to inheriting
324/// the parent dir's permissions, which for `.treeship/sessions/<id>/`
325/// should already be scoped to the owning user.
326#[cfg(all(not(target_family = "wasm"), unix))]
327fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
328 use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
329 use std::os::unix::io::AsRawFd;
330
331 let file = std::fs::OpenOptions::new()
332 .create(true)
333 .read(true)
334 .write(true)
335 .mode(0o600)
336 .open(path)?;
337
338 // Re-tighten if a pre-existing file has loose perms. Use `fchmod` on the
339 // open file descriptor rather than `set_permissions(path, ...)` to
340 // eliminate the TOCTOU window -- between metadata() and a path-based
341 // chmod, an attacker could swap the file. `fchmod` operates on the
342 // already-opened inode, so the target is pinned.
343 //
344 // Only act when the file is owned by us (uid match via geteuid). If
345 // fchmod fails (NFS mount with restricted metadata writes, or some
346 // filesystems without full POSIX perm support), emit a one-line
347 // stderr warning so an operator has visibility. The lock still works;
348 // only the privacy of the sidecar's existence is affected.
349 if let Ok(meta) = file.metadata() {
350 let mode = meta.permissions().mode() & 0o777;
351 let owned_by_us = meta.uid() == nix_uid();
352 if owned_by_us && mode != 0o600 {
353 let fd = file.as_raw_fd();
354 // SAFETY: fd is valid (we just opened it), 0o600 is a
355 // well-formed mode. fchmod is async-signal-safe per POSIX.
356 let rc = unsafe { libc_fchmod(fd, 0o600) };
357 if rc != 0 {
358 let err = std::io::Error::last_os_error();
359 eprintln!(
360 "[treeship] warning: could not tighten lock file perms on {} \
361 to 0o600 (current: 0o{:o}). Error: {}. Lock still functions; \
362 only the privacy of the sidecar is affected. Common cause: \
363 NFS mount or filesystem without full POSIX perm support.",
364 path.display(), mode, err
365 );
366 }
367 }
368 }
369
370 Ok(file)
371}
372
373/// Thin FFI wrapper around libc::fchmod. Declared here so event_log.rs
374/// doesn't need a direct libc crate dep -- the symbol is available in
375/// every Unix libc binary.
376#[cfg(all(not(target_family = "wasm"), unix))]
377fn libc_fchmod(fd: i32, mode: u32) -> i32 {
378 // SAFETY: posix-standard FFI signature; `fd` validity and `mode`
379 // bounds are enforced by the caller.
380 unsafe extern "C" {
381 fn fchmod(fd: i32, mode: u32) -> i32;
382 }
383 unsafe { fchmod(fd, mode) }
384}
385
386/// Lightweight wrapper around `geteuid` so we can compare to file ownership
387/// without pulling in the `nix` crate. Uses `libc` directly (already a
388/// transitive dep via several upstream crates).
389#[cfg(all(not(target_family = "wasm"), unix))]
390fn nix_uid() -> u32 {
391 // SAFETY: geteuid is async-signal-safe and never fails per POSIX.
392 unsafe extern "C" {
393 fn geteuid() -> u32;
394 }
395 unsafe { geteuid() }
396}
397
398#[cfg(all(not(target_family = "wasm"), not(unix)))]
399fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
400 std::fs::OpenOptions::new()
401 .create(true)
402 .read(true)
403 .write(true)
404 .open(path)
405}
406
407/// Path of the counter sidecar for a given events.jsonl path.
408fn counter_path(events_path: &Path) -> PathBuf {
409 events_path.with_extension("jsonl.count")
410}
411
412/// Read the counter sidecar if it exists and is consistent with events.jsonl.
413///
414/// Returns `Some(count)` when the sidecar's recorded byte_size matches the
415/// current events.jsonl size, and `None` otherwise (missing sidecar, short
416/// read, parse failure, or size mismatch from a crashed previous appender).
417#[cfg(not(target_family = "wasm"))]
418fn read_counter_consistent(events_path: &Path) -> Option<u64> {
419 let counter = counter_path(events_path);
420 let bytes = std::fs::read(&counter).ok()?;
421 if bytes.len() != 16 {
422 return None;
423 }
424 let count = u64::from_le_bytes(bytes[0..8].try_into().ok()?);
425 let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().ok()?);
426
427 // events.jsonl may not exist yet -- counter records (0, 0) for that case.
428 let actual_size = match std::fs::metadata(events_path) {
429 Ok(m) => m.len(),
430 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
431 Err(_) => return None,
432 };
433 if actual_size != recorded_size {
434 return None;
435 }
436 Some(count)
437}
438
439/// Read the counter via the sidecar (O(1)) or fall back to an O(N) line
440/// scan, rewriting the sidecar on the way out. This is the recovery path
441/// after a crash that left the counter and events.jsonl out of sync.
442#[cfg(not(target_family = "wasm"))]
443fn read_counter_or_recount(events_path: &Path) -> Result<u64, EventLogError> {
444 if let Some(count) = read_counter_consistent(events_path) {
445 return Ok(count);
446 }
447 let count = if events_path.exists() {
448 let f = std::fs::File::open(events_path)?;
449 let r = std::io::BufReader::new(f);
450 r.lines().filter(|l| l.is_ok()).count() as u64
451 } else {
452 0
453 };
454 let size = std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0);
455 let _ = write_counter(events_path, count, size);
456 Ok(count)
457}
458
459/// WASM has no fs and no concurrent writers; the in-memory AtomicU64 in the
460/// EventLog is sufficient. Initialize to zero on open.
461#[cfg(target_family = "wasm")]
462fn read_counter_or_recount(_events_path: &Path) -> Result<u64, EventLogError> {
463 Ok(0)
464}
465
466/// Atomically replace the counter sidecar with the new (count, byte_size).
467///
468/// Writes to a temp file in the same directory and renames into place so a
469/// reader either sees the old 16 bytes or the new 16 bytes, never a partial
470/// write. The 0o600 perm matches the lock file -- the counter doesn't leak
471/// secrets but its existence is a session signal worth scoping to the owner.
472#[cfg(not(target_family = "wasm"))]
473fn write_counter(events_path: &Path, count: u64, byte_size: u64) -> Result<(), std::io::Error> {
474 use std::io::Write as _;
475 let counter = counter_path(events_path);
476 let dir = counter.parent().ok_or_else(|| {
477 std::io::Error::new(std::io::ErrorKind::InvalidInput, "counter path has no parent")
478 })?;
479 std::fs::create_dir_all(dir)?;
480
481 let mut buf = [0u8; 16];
482 buf[0..8].copy_from_slice(&count.to_le_bytes());
483 buf[8..16].copy_from_slice(&byte_size.to_le_bytes());
484
485 let tmp = counter.with_extension("count.tmp");
486 {
487 let mut f = open_counter_tmp(&tmp)?;
488 f.write_all(&buf)?;
489 f.sync_all()?;
490 }
491 std::fs::rename(&tmp, &counter)?;
492 Ok(())
493}
494
495#[cfg(all(not(target_family = "wasm"), unix))]
496fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
497 use std::os::unix::fs::OpenOptionsExt;
498 std::fs::OpenOptions::new()
499 .create(true)
500 .write(true)
501 .truncate(true)
502 .mode(0o600)
503 .open(path)
504}
505
506#[cfg(all(not(target_family = "wasm"), not(unix)))]
507fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
508 std::fs::OpenOptions::new()
509 .create(true)
510 .write(true)
511 .truncate(true)
512 .open(path)
513}
514
515#[cfg(test)]
516mod tests {
517 use super::*;
518 use crate::session::event::*;
519
520 fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
521 SessionEvent {
522 session_id: session_id.into(),
523 event_id: generate_event_id(),
524 timestamp: "2026-04-05T08:00:00Z".into(),
525 sequence_no: 0,
526 trace_id: generate_trace_id(),
527 span_id: generate_span_id(),
528 parent_span_id: None,
529 agent_id: "agent://test".into(),
530 agent_instance_id: "ai_test_1".into(),
531 agent_name: "test-agent".into(),
532 agent_role: None,
533 host_id: "host_test".into(),
534 tool_runtime_id: None,
535 event_type,
536 artifact_ref: None,
537 meta: None,
538 }
539 }
540
541 #[test]
542 fn append_and_read_back() {
543 let dir = std::env::temp_dir().join(format!("treeship-evtlog-test-{}", rand::random::<u32>()));
544 let log = EventLog::open(&dir).unwrap();
545
546 let mut e1 = make_event("ssn_001", EventType::SessionStarted);
547 let mut e2 = make_event("ssn_001", EventType::AgentStarted {
548 parent_agent_instance_id: None,
549 });
550
551 log.append(&mut e1).unwrap();
552 log.append(&mut e2).unwrap();
553
554 assert_eq!(log.event_count(), 2);
555 assert_eq!(e1.sequence_no, 0);
556 assert_eq!(e2.sequence_no, 1);
557
558 let events = log.read_all().unwrap();
559 assert_eq!(events.len(), 2);
560 assert_eq!(events[0].sequence_no, 0);
561 assert_eq!(events[1].sequence_no, 1);
562
563 let _ = std::fs::remove_dir_all(&dir);
564 }
565
566 #[test]
567 fn read_all_skips_malformed_lines() {
568 // Regression: a single malformed line in events.jsonl used to
569 // make read_all() return Err, and the caller's
570 // .unwrap_or_default() would drop EVERY event in the log. Real
571 // bug: hooks emitting events with an unknown `type` field made
572 // side_effects.files_written come back empty even though every
573 // other event in the log was a perfectly valid agent.wrote_file
574 // event. Now we skip-and-log the bad line and keep the rest.
575 let dir = std::env::temp_dir().join(format!("treeship-evtlog-malformed-{}", rand::random::<u32>()));
576 let log = EventLog::open(&dir).unwrap();
577
578 let mut good1 = make_event(
579 "ssn_001",
580 EventType::AgentWroteFile {
581 file_path: "src/before.rs".into(),
582 digest: None,
583 operation: None,
584 additions: None,
585 deletions: None,
586 },
587 );
588 let mut good2 = make_event(
589 "ssn_001",
590 EventType::AgentWroteFile {
591 file_path: "src/after.rs".into(),
592 digest: None,
593 operation: None,
594 additions: None,
595 deletions: None,
596 },
597 );
598 log.append(&mut good1).unwrap();
599 log.append(&mut good2).unwrap();
600
601 // Manually inject a malformed line between the two good ones by
602 // truncating the file and rewriting. The malformed line has an
603 // unknown event type ("custom.weird") which the closed EventType
604 // enum can't deserialize.
605 let path = log.path().to_path_buf();
606 let original = std::fs::read_to_string(&path).unwrap();
607 let mut lines: Vec<&str> = original.lines().collect();
608 lines.insert(1, r#"{"session_id":"ssn_001","event_id":"evt_bad","timestamp":"2026-04-26T00:00:00Z","sequence_no":1,"trace_id":"x","span_id":"y","agent_id":"a","agent_instance_id":"i","agent_name":"n","host_id":"h","type":"custom.weird","payload":42}"#);
609 std::fs::write(&path, lines.join("\n") + "\n").unwrap();
610
611 let events = log.read_all().unwrap();
612 assert_eq!(events.len(), 2, "expected the two valid events to come through; got {}", events.len());
613 // Confirm the valid events are the file-write events and not
614 // some default fallback.
615 let written_paths: Vec<&str> = events
616 .iter()
617 .filter_map(|e| match &e.event_type {
618 EventType::AgentWroteFile { file_path, .. } => Some(file_path.as_str()),
619 _ => None,
620 })
621 .collect();
622 assert_eq!(written_paths, vec!["src/before.rs", "src/after.rs"]);
623
624 // Codex finding #8: the count must be exposed in-band so a
625 // sealed receipt can carry the incompleteness signal. Verify
626 // read_all_with_stats reports it.
627 let (events2, skipped) = log.read_all_with_stats().unwrap();
628 assert_eq!(events2.len(), 2);
629 assert_eq!(skipped, 1, "exactly one malformed line was injected; expected skipped == 1");
630
631 let _ = std::fs::remove_dir_all(&dir);
632 }
633
634 #[test]
635 fn read_all_with_stats_reports_zero_when_clean() {
636 // No malformed lines -> skipped == 0 -> the receipt's
637 // event_log_skipped field stays default (0) and gets omitted
638 // from canonical JSON. This preserves byte-identical receipts
639 // for the common case where the event log is clean.
640 let dir = std::env::temp_dir().join(format!("treeship-evtlog-clean-{}", rand::random::<u32>()));
641 let log = EventLog::open(&dir).unwrap();
642
643 let mut e = make_event(
644 "ssn_001",
645 EventType::AgentWroteFile {
646 file_path: "x.rs".into(),
647 digest: None, operation: None, additions: None, deletions: None,
648 },
649 );
650 log.append(&mut e).unwrap();
651
652 let (events, skipped) = log.read_all_with_stats().unwrap();
653 assert_eq!(events.len(), 1);
654 assert_eq!(skipped, 0);
655
656 let _ = std::fs::remove_dir_all(&dir);
657 }
658
659 #[test]
660 fn reopen_preserves_sequence() {
661 let dir = std::env::temp_dir().join(format!("treeship-evtlog-reopen-{}", rand::random::<u32>()));
662
663 {
664 let log = EventLog::open(&dir).unwrap();
665 let mut e = make_event("ssn_001", EventType::SessionStarted);
666 log.append(&mut e).unwrap();
667 }
668
669 // Reopen
670 let log = EventLog::open(&dir).unwrap();
671 assert_eq!(log.event_count(), 1);
672
673 let mut e2 = make_event("ssn_001", EventType::AgentStarted {
674 parent_agent_instance_id: None,
675 });
676 log.append(&mut e2).unwrap();
677 assert_eq!(e2.sequence_no, 1);
678
679 let _ = std::fs::remove_dir_all(&dir);
680 }
681
682 /// Regression test for #1 in the v0.9.3 Codex adversarial review.
683 ///
684 /// Multiple `EventLog` instances opened against the same directory must
685 /// not collide on `sequence_no`. This simulates what happens when each
686 /// `treeship session event` invocation (one per PostToolUse hook firing)
687 /// creates a fresh `EventLog` on a shared events.jsonl. Without the
688 /// flock-based re-derivation in `append_locked`, every instance sees
689 /// the same on-disk count at open time and assigns duplicate sequence
690 /// numbers.
691 #[cfg(not(target_family = "wasm"))]
692 #[test]
693 fn concurrent_appends_have_unique_sequence_numbers() {
694 use std::sync::Arc;
695 use std::thread;
696
697 let dir = std::env::temp_dir()
698 .join(format!("treeship-evtlog-race-{}", rand::random::<u32>()));
699 std::fs::create_dir_all(&dir).unwrap();
700
701 const WRITERS: usize = 16;
702 let dir = Arc::new(dir);
703 let mut handles = Vec::with_capacity(WRITERS);
704
705 for _ in 0..WRITERS {
706 let dir = Arc::clone(&dir);
707 handles.push(thread::spawn(move || {
708 // Each thread opens its OWN EventLog -- mimics a separate
709 // process invocation. Without flock, all threads would see
710 // the same line count at open() time.
711 let log = EventLog::open(&dir).unwrap();
712 let mut e = make_event("ssn_race", EventType::SessionStarted);
713 log.append(&mut e).unwrap();
714 e.sequence_no
715 }));
716 }
717
718 let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
719 seqs.sort();
720
721 // All sequence numbers must be unique and contiguous 0..WRITERS.
722 let expected: Vec<u64> = (0..WRITERS as u64).collect();
723 assert_eq!(seqs, expected, "sequence_no collisions under contention");
724
725 // Same invariant from the on-disk file's perspective.
726 let log = EventLog::open(&dir).unwrap();
727 let read = log.read_all().unwrap();
728 assert_eq!(read.len(), WRITERS);
729 let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
730 on_disk.sort();
731 assert_eq!(on_disk, expected);
732
733 let _ = std::fs::remove_dir_all(&*dir);
734 }
735
736 /// Sidecar lock file must be created mode 0o600 (owner-only) on Unix.
737 /// Regression test for #5 in the second Codex adversarial review.
738 #[cfg(all(not(target_family = "wasm"), unix))]
739 #[test]
740 fn lock_file_has_owner_only_permissions() {
741 use std::os::unix::fs::PermissionsExt;
742
743 let dir = std::env::temp_dir()
744 .join(format!("treeship-evtlog-perms-{}", rand::random::<u32>()));
745 let log = EventLog::open(&dir).unwrap();
746
747 let mut e = make_event("ssn_perms", EventType::SessionStarted);
748 log.append(&mut e).unwrap();
749
750 let lock_path = log.path().with_extension("jsonl.lock");
751 let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
752 let mode = meta.permissions().mode() & 0o777;
753 assert_eq!(
754 mode, 0o600,
755 "lock file mode is {:o}, expected 0o600 (owner-only)",
756 mode
757 );
758
759 let _ = std::fs::remove_dir_all(&dir);
760 }
761
762 /// A pre-existing lock file (e.g. from a v0.9.2 era crash) with looser
763 /// permissions must be tightened to 0o600 on next `EventLog::open`.
764 /// Regression test for the third Codex adversarial review.
765 #[cfg(all(not(target_family = "wasm"), unix))]
766 #[test]
767 fn existing_lock_file_is_re_tightened() {
768 use std::os::unix::fs::PermissionsExt;
769
770 let dir = std::env::temp_dir()
771 .join(format!("treeship-evtlog-retighten-{}", rand::random::<u32>()));
772 std::fs::create_dir_all(&dir).unwrap();
773
774 // Pre-create a lock file with deliberately loose perms, simulating
775 // an upgrade from a CLI version that didn't set 0o600.
776 let lock_path = dir.join("events.jsonl.lock");
777 std::fs::write(&lock_path, b"").unwrap();
778 std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
779 let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
780 assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");
781
782 // First append after upgrade -- should re-tighten.
783 let log = EventLog::open(&dir).unwrap();
784 let mut e = make_event("ssn_retighten", EventType::SessionStarted);
785 log.append(&mut e).unwrap();
786
787 let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
788 assert_eq!(
789 post_mode, 0o600,
790 "lock file should be re-tightened to 0o600 after open; got {:o}",
791 post_mode
792 );
793
794 let _ = std::fs::remove_dir_all(&dir);
795 }
796
797 /// Counter sidecar must exist after the first append and contain
798 /// (count=1, byte_size=size of events.jsonl). This is the happy path
799 /// that lets every subsequent append skip the O(N) rescan.
800 #[cfg(not(target_family = "wasm"))]
801 #[test]
802 fn counter_sidecar_written_after_append() {
803 let dir = std::env::temp_dir()
804 .join(format!("treeship-evtlog-counter-{}", rand::random::<u32>()));
805 let log = EventLog::open(&dir).unwrap();
806
807 let mut e = make_event("ssn_counter", EventType::SessionStarted);
808 log.append(&mut e).unwrap();
809
810 let counter = log.path().with_extension("jsonl.count");
811 let bytes = std::fs::read(&counter).expect("counter sidecar must exist after append");
812 assert_eq!(bytes.len(), 16, "counter sidecar must be 16 bytes");
813
814 let count = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
815 let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
816 let actual_size = std::fs::metadata(log.path()).unwrap().len();
817 assert_eq!(count, 1, "counter must reflect the one appended event");
818 assert_eq!(
819 recorded_size, actual_size,
820 "counter byte_size ({}) must match events.jsonl size ({})",
821 recorded_size, actual_size
822 );
823
824 let _ = std::fs::remove_dir_all(&dir);
825 }
826
827 /// A missing counter sidecar (fresh install, deleted by user, etc.)
828 /// must not break sequence_no assignment. The next append falls back
829 /// to an O(N) recount and rewrites the counter.
830 #[cfg(not(target_family = "wasm"))]
831 #[test]
832 fn counter_sidecar_recovers_when_missing() {
833 let dir = std::env::temp_dir()
834 .join(format!("treeship-evtlog-missing-counter-{}", rand::random::<u32>()));
835
836 // Append two events, then nuke the counter sidecar.
837 {
838 let log = EventLog::open(&dir).unwrap();
839 let mut e1 = make_event("ssn_x", EventType::SessionStarted);
840 let mut e2 = make_event("ssn_x", EventType::AgentStarted {
841 parent_agent_instance_id: None,
842 });
843 log.append(&mut e1).unwrap();
844 log.append(&mut e2).unwrap();
845 }
846 let counter = dir.join("events.jsonl.count");
847 std::fs::remove_file(&counter).expect("counter must exist before deletion");
848
849 // Reopen + append. The third event must get sequence_no=2 even
850 // though the counter sidecar is gone.
851 let log = EventLog::open(&dir).unwrap();
852 assert_eq!(log.event_count(), 2, "open() must recount when counter is missing");
853
854 let mut e3 = make_event("ssn_x", EventType::SessionClosed {
855 summary: None,
856 duration_ms: None,
857 });
858 log.append(&mut e3).unwrap();
859 assert_eq!(e3.sequence_no, 2);
860 assert!(counter.exists(), "counter must be rewritten after recount");
861
862 let _ = std::fs::remove_dir_all(&dir);
863 }
864
865 /// A short-read or garbage counter sidecar (corrupted, partial write,
866 /// truncated by external tool) must not be trusted. The size mismatch
867 /// path covers the "wrong content" case for a 16-byte file too.
868 #[cfg(not(target_family = "wasm"))]
869 #[test]
870 fn counter_sidecar_recovers_when_corrupt() {
871 let dir = std::env::temp_dir()
872 .join(format!("treeship-evtlog-corrupt-counter-{}", rand::random::<u32>()));
873
874 {
875 let log = EventLog::open(&dir).unwrap();
876 let mut e = make_event("ssn_corrupt", EventType::SessionStarted);
877 log.append(&mut e).unwrap();
878 }
879 // Truncate the counter to a non-16 length.
880 let counter = dir.join("events.jsonl.count");
881 std::fs::write(&counter, b"junk").unwrap();
882
883 let log = EventLog::open(&dir).unwrap();
884 assert_eq!(log.event_count(), 1, "short-read counter must be ignored, recount kicks in");
885
886 let _ = std::fs::remove_dir_all(&dir);
887 }
888
889 /// A counter that recorded the wrong byte_size (someone or something
890 /// appended to events.jsonl behind our back) must not be trusted.
891 /// This is the crash-recovery path: peer wrote events.jsonl but
892 /// crashed before fsyncing the counter, so the recorded size is stale.
893 #[cfg(not(target_family = "wasm"))]
894 #[test]
895 fn counter_sidecar_recovers_when_size_disagrees() {
896 let dir = std::env::temp_dir()
897 .join(format!("treeship-evtlog-stale-counter-{}", rand::random::<u32>()));
898
899 {
900 let log = EventLog::open(&dir).unwrap();
901 let mut e = make_event("ssn_stale", EventType::SessionStarted);
902 log.append(&mut e).unwrap();
903 }
904
905 // Simulate a crash mid-append: append one extra raw line to
906 // events.jsonl WITHOUT updating the counter. Now the counter
907 // says (1, S) but events.jsonl is (S + |line|) bytes.
908 let events_path = dir.join("events.jsonl");
909 let mut extra = make_event("ssn_stale", EventType::AgentStarted {
910 parent_agent_instance_id: None,
911 });
912 extra.sequence_no = 999; // intentionally wrong; will be overwritten on read
913 let mut line = serde_json::to_vec(&extra).unwrap();
914 line.push(b'\n');
915 let mut f = std::fs::OpenOptions::new()
916 .append(true)
917 .open(&events_path)
918 .unwrap();
919 std::io::Write::write_all(&mut f, &line).unwrap();
920 std::io::Write::flush(&mut f).unwrap();
921
922 // Re-open. The size mismatch must trigger a recount; we should see 2.
923 let log = EventLog::open(&dir).unwrap();
924 assert_eq!(
925 log.event_count(),
926 2,
927 "size mismatch must force recount, ignoring stale counter"
928 );
929
930 let _ = std::fs::remove_dir_all(&dir);
931 }
932
933 /// The counter sidecar fix must not break the cross-process race
934 /// safety established by the flock layer. This is the same shape as
935 /// `concurrent_appends_have_unique_sequence_numbers` but exists to
936 /// guard against a regression where the counter is read OUTSIDE the
937 /// lock, which would let two writers both see count=N and assign N
938 /// to two different events.
939 #[cfg(not(target_family = "wasm"))]
940 #[test]
941 fn counter_sidecar_preserves_concurrent_uniqueness() {
942 use std::sync::Arc;
943 use std::thread;
944
945 let dir = std::env::temp_dir()
946 .join(format!("treeship-evtlog-counter-race-{}", rand::random::<u32>()));
947 std::fs::create_dir_all(&dir).unwrap();
948
949 const WRITERS: usize = 16;
950 let dir = Arc::new(dir);
951 let mut handles = Vec::with_capacity(WRITERS);
952
953 for _ in 0..WRITERS {
954 let dir = Arc::clone(&dir);
955 handles.push(thread::spawn(move || {
956 let log = EventLog::open(&dir).unwrap();
957 let mut e = make_event("ssn_counter_race", EventType::SessionStarted);
958 log.append(&mut e).unwrap();
959 e.sequence_no
960 }));
961 }
962
963 let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
964 seqs.sort();
965 let expected: Vec<u64> = (0..WRITERS as u64).collect();
966 assert_eq!(seqs, expected, "counter must not bypass the flock race protection");
967
968 // Counter should reflect the final state.
969 let log = EventLog::open(&dir).unwrap();
970 assert_eq!(log.event_count(), WRITERS as u64);
971
972 let _ = std::fs::remove_dir_all(&*dir);
973 }
974
975 /// Counter sidecar must be created mode 0o600 (owner-only) on Unix --
976 /// same scoping as the lock file; the existence of a counter is a
977 /// session signal that doesn't need to leak to other users.
978 #[cfg(all(not(target_family = "wasm"), unix))]
979 #[test]
980 fn counter_sidecar_has_owner_only_permissions() {
981 use std::os::unix::fs::PermissionsExt;
982
983 let dir = std::env::temp_dir()
984 .join(format!("treeship-evtlog-counter-perms-{}", rand::random::<u32>()));
985 let log = EventLog::open(&dir).unwrap();
986
987 let mut e = make_event("ssn_counter_perms", EventType::SessionStarted);
988 log.append(&mut e).unwrap();
989
990 let counter = log.path().with_extension("jsonl.count");
991 let mode = std::fs::metadata(&counter).unwrap().permissions().mode() & 0o777;
992 assert_eq!(
993 mode, 0o600,
994 "counter sidecar mode is {:o}, expected 0o600 (owner-only)",
995 mode
996 );
997
998 let _ = std::fs::remove_dir_all(&dir);
999 }
1000
1001 /// P0 regression (audit lane F): under heavy hook contention, multiple
1002 /// writers used to fall through and append without the flock when the
1003 /// 500ms poll exhausted, producing duplicate `sequence_no` values. The
1004 /// blocking `lock_exclusive` fix means every writer must hold the
1005 /// flock across both the counter read and the event write.
1006 ///
1007 /// This test spawns 8 threads, each calling `append` 25 times on its
1008 /// own `EventLog` (mimicking 8 separate hook processes each appending
1009 /// a burst of events). After the join, the on-disk log must contain
1010 /// exactly 8*25=200 events with `sequence_no` exactly the contiguous
1011 /// range 0..200, no duplicates and no gaps.
1012 #[cfg(not(target_family = "wasm"))]
1013 #[test]
1014 fn p0_no_duplicate_sequence_under_burst_contention() {
1015 use std::sync::Arc;
1016 use std::thread;
1017
1018 const THREADS: usize = 8;
1019 const PER_THREAD: usize = 25;
1020 const EXPECTED: usize = THREADS * PER_THREAD;
1021
1022 let dir = std::env::temp_dir().join(format!(
1023 "treeship-evtlog-p0-burst-{}",
1024 rand::random::<u32>()
1025 ));
1026 std::fs::create_dir_all(&dir).unwrap();
1027 let dir = Arc::new(dir);
1028
1029 let mut handles = Vec::with_capacity(THREADS);
1030 for t in 0..THREADS {
1031 let dir = Arc::clone(&dir);
1032 handles.push(thread::spawn(move || -> Vec<u64> {
1033 // Each thread opens its own EventLog -- this is the
1034 // per-process model the audit flagged: every PostToolUse
1035 // invocation is a fresh handle on the shared log.
1036 let log = EventLog::open(&dir).unwrap();
1037 let mut seen = Vec::with_capacity(PER_THREAD);
1038 for i in 0..PER_THREAD {
1039 let mut e =
1040 make_event(&format!("ssn_burst_{}_{}", t, i), EventType::SessionStarted);
1041 log.append(&mut e).unwrap();
1042 seen.push(e.sequence_no);
1043 }
1044 seen
1045 }));
1046 }
1047
1048 // Collect what each thread saw locally.
1049 let mut all_returned: Vec<u64> = handles
1050 .into_iter()
1051 .flat_map(|h| h.join().unwrap())
1052 .collect();
1053 all_returned.sort();
1054
1055 let expected: Vec<u64> = (0..EXPECTED as u64).collect();
1056 assert_eq!(
1057 all_returned, expected,
1058 "returned sequence_no values must be a contiguous range 0..{} \
1059 with no duplicates and no gaps",
1060 EXPECTED
1061 );
1062
1063 // Truth source: the on-disk log itself. Verify (a) count and
1064 // (b) sequence_no is exactly 0..EXPECTED on the persisted events.
1065 let log = EventLog::open(&dir).unwrap();
1066 let events = log.read_all().unwrap();
1067 assert_eq!(
1068 events.len(),
1069 EXPECTED,
1070 "on-disk event count must be exactly {} (got {})",
1071 EXPECTED,
1072 events.len()
1073 );
1074 let mut on_disk: Vec<u64> = events.iter().map(|e| e.sequence_no).collect();
1075 on_disk.sort();
1076 assert_eq!(
1077 on_disk, expected,
1078 "on-disk sequence_no must be a contiguous range with no duplicates and no gaps"
1079 );
1080
1081 // Counter sidecar must agree with both.
1082 assert_eq!(log.event_count(), EXPECTED as u64);
1083
1084 let _ = std::fs::remove_dir_all(&*dir);
1085 }
1086
1087 /// Companion stress test for the lock-file lifecycle. Each append
1088 /// opens the lock file, locks it, writes, unlocks, and drops the FD.
1089 /// Repeatedly creating + dropping `EventLog`s in a tight loop must
1090 /// not panic, must not leak FDs we can detect (no `EMFILE` after
1091 /// hundreds of iterations on a default ulimit), and must produce a
1092 /// log with contiguous sequence numbers.
1093 #[cfg(not(target_family = "wasm"))]
1094 #[test]
1095 fn lock_file_handles_drop_cleanly_under_churn() {
1096 let dir = std::env::temp_dir().join(format!(
1097 "treeship-evtlog-fd-churn-{}",
1098 rand::random::<u32>()
1099 ));
1100 std::fs::create_dir_all(&dir).unwrap();
1101
1102 // 500 sequential open + append + drop cycles. Far below the
1103 // default macOS/Linux soft limit (256/1024) for a sustained
1104 // leak, but plenty to catch one-per-iteration FD leaks.
1105 const ITERS: usize = 500;
1106 for i in 0..ITERS {
1107 let log = EventLog::open(&dir).unwrap();
1108 let mut e = make_event(&format!("ssn_churn_{}", i), EventType::SessionStarted);
1109 log.append(&mut e).unwrap();
1110 // log drops here -> lock_file FD already closed inside append.
1111 }
1112
1113 let log = EventLog::open(&dir).unwrap();
1114 let events = log.read_all().unwrap();
1115 assert_eq!(events.len(), ITERS);
1116 let mut seqs: Vec<u64> = events.iter().map(|e| e.sequence_no).collect();
1117 seqs.sort();
1118 let expected: Vec<u64> = (0..ITERS as u64).collect();
1119 assert_eq!(
1120 seqs, expected,
1121 "no FD leak should still produce contiguous seqs"
1122 );
1123
1124 let _ = std::fs::remove_dir_all(&dir);
1125 }
1126}