Skip to main content

treeship_core/session/
event_log.rs

1//! Append-only, file-backed event log for session events.
2//!
3//! Events are stored as newline-delimited JSON (JSONL) in
4//! `.treeship/sessions/<session_id>/events.jsonl`.
5//!
//! Concurrency model: `append()` is safe to call from multiple processes
//! concurrently. Each call blocks until it holds an exclusive advisory lock
//! (via `fs2::FileExt::lock_exclusive` -- backed by `flock(2)` on Unix and
//! `LockFileEx` on Windows) on a sidecar `events.jsonl.lock` file; there is
//! no timeout and no unlocked fallback (see "Fail-closed semantics" below).
//! Under the lock, a counter sidecar `events.jsonl.count` is the
//! authoritative source for the next `sequence_no`. On non-WASM targets the
//! per-process AtomicU64 never assigns a `sequence_no`; it only caches the
//! count for `event_count()` and is overwritten with the on-disk counter
//! after every locked append.
15//!
16//! Counter sidecar format (16 bytes):
17//!   - bytes 0..8:  count (u64 LE) -- number of events written to events.jsonl
18//!   - bytes 8..16: byte_size (u64 LE) -- size of events.jsonl when count was recorded
19//!
20//! The byte_size field is the crash detector. If a peer wrote events.jsonl
21//! but crashed before fsyncing the counter (or vice versa), the size on disk
22//! and the size in the counter disagree. On any mismatch we fall back to an
23//! O(N) line count and rewrite the counter -- one paid scan, then back to
24//! O(1) on every subsequent append.
25//!
//! This keeps the steady-state append cost constant: read 16 bytes, write
//! one JSONL line, write 16 bytes. The previous implementation re-streamed
28//! the entire events.jsonl on every append, which made hooks O(N) in
29//! session length and dominated PostToolUse latency on long sessions.
30//!
31//! Fail-closed semantics: the writer ALWAYS acquires the exclusive flock
32//! before reading the counter and writing the event. We use fs2's blocking
33//! `lock_exclusive()` (flock(2) without LOCK_NB on Unix, LockFileEx without
34//! LOCKFILE_FAIL_IMMEDIATELY on Windows) so contended writers queue rather
35//! than race. An earlier "best-effort, fall through on contention" path
36//! existed here -- it could produce duplicate `sequence_no` under hook
37//! contention (P0, audit lane F), so it was removed. Hook callers already
38//! sit under Claude Code's 60s hook timeout, which bounds wall-clock
39//! exposure to a wedged peer. Trading a bounded wait for guaranteed
40//! injective sequence numbers is the right call when receipts are the
41//! trust artifact.
42//!
//! Lock file permissions are 0o600 (owner-only) on Unix, applied at file
//! creation via `OpenOptionsExt::mode` and re-tightened with `fchmod` on
//! every open when a previous run left the file with looser perms and the
//! file is owned by the current user.
46
47use std::io::{BufRead, Write};
48use std::path::{Path, PathBuf};
49use std::sync::atomic::{AtomicU64, Ordering};
50
51#[cfg(not(target_family = "wasm"))]
52use fs2::FileExt;
53
54use crate::session::event::SessionEvent;
55
56/// Error from event log operations.
57#[derive(Debug)]
58pub enum EventLogError {
59    Io(std::io::Error),
60    Json(serde_json::Error),
61}
62
63impl std::fmt::Display for EventLogError {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            Self::Io(e) => write!(f, "event log io: {e}"),
67            Self::Json(e) => write!(f, "event log json: {e}"),
68        }
69    }
70}
71
72impl std::error::Error for EventLogError {}
73impl From<std::io::Error> for EventLogError {
74    fn from(e: std::io::Error) -> Self { Self::Io(e) }
75}
76impl From<serde_json::Error> for EventLogError {
77    fn from(e: serde_json::Error) -> Self { Self::Json(e) }
78}
79
/// An append-only event log backed by a JSONL file.
pub struct EventLog {
    /// Path to `events.jsonl` inside the session directory (built by `open`).
    path: PathBuf,
    /// Process-local event count, seeded by `open`. On non-WASM targets it
    /// is only a cache that each locked `append` overwrites with the
    /// on-disk value (it backs `event_count()`); on WASM it is the sole
    /// source of `sequence_no`.
    sequence: AtomicU64,
}
85
86impl EventLog {
87    /// Open or create an event log for the given session directory.
88    ///
89    /// The session directory is typically `.treeship/sessions/<session_id>/`.
90    /// If the directory does not exist, it will be created.
91    ///
92    /// Initialization reads the counter sidecar in O(1) when present and
93    /// consistent with events.jsonl's byte size; falls back to an O(N) line
94    /// count (and rewrites the sidecar) when the sidecar is missing,
95    /// short-read, or stale from a crashed previous appender.
96    pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
97        std::fs::create_dir_all(session_dir)?;
98        let path = session_dir.join("events.jsonl");
99        let count = read_counter_or_recount(&path)?;
100        Ok(Self { path, sequence: AtomicU64::new(count) })
101    }
102
103    /// Append a single event to the log.
104    ///
105    /// The event's `sequence_no` is set automatically. Under contention from
106    /// multiple writer processes, the sequence number is re-derived from the
107    /// on-disk line count under an exclusive flock so two parallel writers
108    /// never collide.
109    pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
110        self.append_locked(event)
111    }
112
113    /// Cross-process safe append: acquires an exclusive advisory lock on a
114    /// sidecar `.lock` file, re-counts events.jsonl lines, assigns sequence_no,
115    /// writes the new event, then releases the lock on drop.
116    ///
117    /// Locking is BLOCKING (`fs2::FileExt::lock_exclusive`, i.e. flock(2)
118    /// without LOCK_NB). A previous implementation polled `try_lock_exclusive`
119    /// for ~500ms and then fell through to an UNLOCKED write -- which under
120    /// hook contention (multiple PostToolUse invocations racing) could
121    /// assign duplicate `sequence_no` values to different events. That
122    /// broke the injective-sequence invariant receipts depend on, even
123    /// though local merkle verification still passed. Audit lane F (P0).
124    ///
125    /// The trade-off: a wedged peer holding the lock will now stall this
126    /// caller until the peer releases (e.g. by crashing -- flock is
127    /// released by the kernel when the holder's FD closes). Hook
128    /// invocations are bounded by Claude Code's hook timeout (60s by
129    /// default), so the worst-case wedge surfaces as a hook failure
130    /// rather than a duplicate sequence_no. That mirrors how
131    /// `journal/mod.rs` treats the journal append lock as a hard
132    /// correctness barrier rather than a soft hint.
133    ///
134    /// The locked region covers BOTH the counter read AND the file write,
135    /// so two concurrent writers cannot read the same count and append
136    /// twice with that count.
137    ///
138    /// Lock file is created mode 0o600 (owner-only) so the sidecar can
139    /// never be opened by other users on a shared machine.
140    ///
141    /// Skipped on WASM (no fs, no concurrency).
142    #[cfg(not(target_family = "wasm"))]
143    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
144        // Sidecar lock file: contention here doesn't block readers of events.jsonl.
145        let lock_path = self.path.with_extension("jsonl.lock");
146
147        // Open or create the lock file. On Unix we set 0o600 explicitly so
148        // the sidecar isn't group/world readable; the umask-derived default
149        // would otherwise be permissive on some setups.
150        let lock_file = open_lock_file(&lock_path)?;
151
152        // Blocking flock. Returns when the lock is held exclusively, or
153        // propagates a real I/O error (filesystem that doesn't support
154        // flock, FD revoked, etc.). EINTR retry isn't needed -- fs2 wraps
155        // the syscall and retries internally on POSIX.
156        FileExt::lock_exclusive(&lock_file)?;
157
158        // From here until `lock_file` is dropped (end of function), we
159        // hold the exclusive flock. The counter read + event write + counter
160        // update MUST stay inside this block; any early return must still
161        // drop `lock_file`, which Rust guarantees by RAII.
162        let result = (|| -> Result<(), EventLogError> {
163            // Read sequence_no from the counter sidecar in O(1) when
164            // consistent with events.jsonl size. Stale or missing counters
165            // force a one-time O(N) rescan that also rewrites the counter,
166            // so subsequent appends return to O(1). Only the on-disk state
167            // (counter + size check) is authoritative when multiple
168            // processes are appending; the per-process AtomicU64 is a
169            // stale hint.
170            let count = read_counter_or_recount(&self.path)?;
171            event.sequence_no = count;
172
173            let mut line = serde_json::to_vec(event)?;
174            line.push(b'\n');
175
176            let mut file = std::fs::OpenOptions::new()
177                .create(true)
178                .append(true)
179                .open(&self.path)?;
180            file.write_all(&line)?;
181            file.flush()?;
182
183            // Update the counter sidecar with the new count and the new
184            // events.jsonl size, so the next append can short-circuit the
185            // line scan. Failure to update the counter is non-fatal: the
186            // next reader will detect the size mismatch and recount.
187            let new_size = file.metadata().map(|m| m.len()).unwrap_or(0);
188            let _ = write_counter(&self.path, count + 1, new_size);
189
190            // Keep the in-process AtomicU64 in sync so non-contended callers
191            // see the right value via event_count() without re-reading.
192            self.sequence.store(count + 1, Ordering::SeqCst);
193            Ok(())
194        })();
195
196        // Explicit unlock matches the journal precedent (journal/mod.rs
197        // also calls `unlock` before dropping). Drop alone would release
198        // the flock via close(2), but being explicit makes the lock
199        // window obvious to readers of this function.
200        let _ = FileExt::unlock(&lock_file);
201        result
202    }
203
204    /// WASM build: no filesystem locks available, no concurrent writers.
205    /// Falls back to the simple AtomicU64 path.
206    #[cfg(target_family = "wasm")]
207    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
208        event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
209
210        let mut line = serde_json::to_vec(event)?;
211        line.push(b'\n');
212
213        let mut file = std::fs::OpenOptions::new()
214            .create(true)
215            .append(true)
216            .open(&self.path)?;
217        file.write_all(&line)?;
218        file.flush()?;
219
220        Ok(())
221    }
222
223    /// Read all events from the log.
224    ///
225    /// Per-line tolerant: a single malformed line (unknown event type,
226    /// missing required field, truncated JSON from a crashed writer) is
227    /// logged to stderr and skipped, not propagated as an error. The
228    /// caller -- session close, in particular -- composes a receipt
229    /// from whatever events parse, instead of dropping every event when
230    /// any one is bad.
231    ///
232    /// Why this matters: events.jsonl is append-only and written by
233    /// hooks, daemons, SDKs, and bridges from multiple processes. A
234    /// single bad event from one buggy emitter would otherwise nuke
235    /// the entire receipt's side_effects / agent_graph / timeline.
236    /// Real-world repro: a hook that emitted events with an unknown
237    /// `type` field caused side_effects.files_written to come back
238    /// empty even though the rest of the events in the log were valid
239    /// agent.wrote_file events the aggregator would have happily
240    /// processed.
241    pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
242        // Drop the skipped count for callers that don't carry it through.
243        // Receipt composition uses read_all_with_stats to record the
244        // count in-band on the sealed receipt -- see Codex finding #8.
245        self.read_all_with_stats().map(|(events, _skipped)| events)
246    }
247
248    /// Same as `read_all` but returns the count of malformed lines that
249    /// were skipped during parsing alongside the valid events.
250    ///
251    /// Codex adversarial review finding #8: skipping malformed events
252    /// on stderr only is silent data loss from the verifier's
253    /// perspective. The receipt gets sealed under a merkle root that
254    /// represents only the events that successfully parsed -- a
255    /// downstream consumer cannot tell whether the receipt is complete
256    /// or whether N events were silently dropped.
257    ///
258    /// session::close calls this and stores the count on
259    /// `receipt.proofs.event_log_skipped`. `treeship package verify`
260    /// surfaces it as a WARN when nonzero so the receipt's
261    /// completeness signal is visible without breaking byte-identical
262    /// re-verification of pre-existing receipts.
263    pub fn read_all_with_stats(&self) -> Result<(Vec<SessionEvent>, usize), EventLogError> {
264        if !self.path.exists() {
265            return Ok((Vec::new(), 0));
266        }
267        let file = std::fs::File::open(&self.path)?;
268        let reader = std::io::BufReader::new(file);
269        let mut events = Vec::new();
270        let mut skipped = 0usize;
271        for (idx, line) in reader.lines().enumerate() {
272            let line = line?;
273            if line.trim().is_empty() {
274                continue;
275            }
276            match serde_json::from_str::<SessionEvent>(&line) {
277                Ok(event) => events.push(event),
278                Err(e) => {
279                    skipped += 1;
280                    eprintln!(
281                        "[treeship] event_log: skipping malformed line {} in {}: {}",
282                        idx + 1,
283                        self.path.display(),
284                        e,
285                    );
286                }
287            }
288        }
289        if skipped > 0 {
290            eprintln!(
291                "[treeship] event_log: {} malformed line(s) skipped while reading {} (kept {} valid event(s))",
292                skipped,
293                self.path.display(),
294                events.len(),
295            );
296        }
297        Ok((events, skipped))
298    }
299
300    /// Return the current event count.
301    pub fn event_count(&self) -> u64 {
302        self.sequence.load(Ordering::SeqCst)
303    }
304
305    /// Return the path to the JSONL file.
306    pub fn path(&self) -> &Path {
307        &self.path
308    }
309}
310
311/// Open the sidecar lock file with owner-only permissions (0o600 on Unix).
312///
313/// On Unix the mode is set atomically via `OpenOptionsExt::mode` for newly
314/// created files. For files that already exist (e.g. left over from a
315/// prior crash or an upgrade from a pre-0.9.3 CLI that didn't tighten
316/// perms), we additionally re-chmod to 0o600 after open IF the file is
317/// owned by the current user. This is best-effort: if the chmod fails
318/// (file owned by another user, read-only filesystem, etc.) we proceed
319/// silently rather than refuse to open the lock -- the lock semantics
320/// don't depend on the perms being tight, only the privacy of the
321/// sidecar's existence does.
322///
323/// On Windows the mode concept doesn't apply; ACLs default to inheriting
324/// the parent dir's permissions, which for `.treeship/sessions/<id>/`
325/// should already be scoped to the owning user.
326#[cfg(all(not(target_family = "wasm"), unix))]
327fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
328    use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
329    use std::os::unix::io::AsRawFd;
330
331    let file = std::fs::OpenOptions::new()
332        .create(true)
333        .read(true)
334        .write(true)
335        .mode(0o600)
336        .open(path)?;
337
338    // Re-tighten if a pre-existing file has loose perms. Use `fchmod` on the
339    // open file descriptor rather than `set_permissions(path, ...)` to
340    // eliminate the TOCTOU window -- between metadata() and a path-based
341    // chmod, an attacker could swap the file. `fchmod` operates on the
342    // already-opened inode, so the target is pinned.
343    //
344    // Only act when the file is owned by us (uid match via geteuid). If
345    // fchmod fails (NFS mount with restricted metadata writes, or some
346    // filesystems without full POSIX perm support), emit a one-line
347    // stderr warning so an operator has visibility. The lock still works;
348    // only the privacy of the sidecar's existence is affected.
349    if let Ok(meta) = file.metadata() {
350        let mode = meta.permissions().mode() & 0o777;
351        let owned_by_us = meta.uid() == nix_uid();
352        if owned_by_us && mode != 0o600 {
353            let fd = file.as_raw_fd();
354            // SAFETY: fd is valid (we just opened it), 0o600 is a
355            // well-formed mode. fchmod is async-signal-safe per POSIX.
356            let rc = unsafe { libc_fchmod(fd, 0o600) };
357            if rc != 0 {
358                let err = std::io::Error::last_os_error();
359                eprintln!(
360                    "[treeship] warning: could not tighten lock file perms on {} \
361                     to 0o600 (current: 0o{:o}). Error: {}. Lock still functions; \
362                     only the privacy of the sidecar is affected. Common cause: \
363                     NFS mount or filesystem without full POSIX perm support.",
364                    path.display(), mode, err
365                );
366            }
367        }
368    }
369
370    Ok(file)
371}
372
/// Call POSIX `fchmod(2)` on `fd` and return its raw result: 0 on success,
/// -1 on failure (the cause is then available via `errno` /
/// `std::io::Error::last_os_error()`).
///
/// The symbol is bound through a local `extern` block so event_log.rs needs
/// no direct `libc` crate dependency; every Unix libc exports it.
///
/// NOTE(review): `mode` is declared as `u32`. `mode_t` is u32 on Linux but
/// u16 on macOS; small permission values such as 0o600 are passed through
/// unchanged in practice -- confirm if new targets are added.
#[cfg(all(not(target_family = "wasm"), unix))]
fn libc_fchmod(fd: i32, mode: u32) -> i32 {
    unsafe extern "C" {
        #[link_name = "fchmod"]
        fn sys_fchmod(fd: i32, mode: u32) -> i32;
    }
    // SAFETY: fchmod takes no pointers and has no memory-safety
    // preconditions; an invalid `fd` merely produces an EBADF error return.
    unsafe { sys_fchmod(fd, mode) }
}
385
/// Return the calling process's effective user id via POSIX `geteuid(2)`.
///
/// Used by `open_lock_file` to check that a pre-existing lock file is owned
/// by us before re-chmodding it. `geteuid` is declared in a local `extern`
/// block, so neither the `nix` nor the `libc` crate is required.
#[cfg(all(not(target_family = "wasm"), unix))]
fn nix_uid() -> u32 {
    unsafe extern "C" {
        fn geteuid() -> u32;
    }
    // SAFETY: geteuid takes no arguments, has no preconditions and never
    // fails per POSIX. uid_t is a 32-bit unsigned integer on Linux and
    // macOS (NOTE(review): confirm for any other Unix target).
    unsafe { geteuid() }
}
397
/// Non-Unix fallback: open (creating if needed) the sidecar lock file with
/// read/write access. No permissions are set explicitly here.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    options.open(path)
}
406
/// Path of the counter sidecar that belongs to `events_path`, e.g.
/// `<dir>/events.jsonl` -> `<dir>/events.jsonl.count`.
fn counter_path(events_path: &Path) -> PathBuf {
    let mut sidecar = events_path.to_path_buf();
    sidecar.set_extension("jsonl.count");
    sidecar
}
411
412/// Read the counter sidecar if it exists and is consistent with events.jsonl.
413///
414/// Returns `Some(count)` when the sidecar's recorded byte_size matches the
415/// current events.jsonl size, and `None` otherwise (missing sidecar, short
416/// read, parse failure, or size mismatch from a crashed previous appender).
417#[cfg(not(target_family = "wasm"))]
418fn read_counter_consistent(events_path: &Path) -> Option<u64> {
419    let counter = counter_path(events_path);
420    let bytes = std::fs::read(&counter).ok()?;
421    if bytes.len() != 16 {
422        return None;
423    }
424    let count = u64::from_le_bytes(bytes[0..8].try_into().ok()?);
425    let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().ok()?);
426
427    // events.jsonl may not exist yet -- counter records (0, 0) for that case.
428    let actual_size = match std::fs::metadata(events_path) {
429        Ok(m) => m.len(),
430        Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
431        Err(_) => return None,
432    };
433    if actual_size != recorded_size {
434        return None;
435    }
436    Some(count)
437}
438
439/// Read the counter via the sidecar (O(1)) or fall back to an O(N) line
440/// scan, rewriting the sidecar on the way out. This is the recovery path
441/// after a crash that left the counter and events.jsonl out of sync.
442#[cfg(not(target_family = "wasm"))]
443fn read_counter_or_recount(events_path: &Path) -> Result<u64, EventLogError> {
444    if let Some(count) = read_counter_consistent(events_path) {
445        return Ok(count);
446    }
447    let count = if events_path.exists() {
448        let f = std::fs::File::open(events_path)?;
449        let r = std::io::BufReader::new(f);
450        r.lines().filter(|l| l.is_ok()).count() as u64
451    } else {
452        0
453    };
454    let size = std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0);
455    let _ = write_counter(events_path, count, size);
456    Ok(count)
457}
458
/// WASM build: assumed to have no concurrent writers, so the in-memory
/// AtomicU64 in the EventLog is sufficient and the count starts at zero on
/// every open.
///
/// NOTE(review): the WASM `append_locked` still writes events.jsonl through
/// `std::fs`. If a log file already exists when `open` is called, its events
/// are not counted here and numbering restarts at 0. Confirm this is
/// acceptable for the WASM targets in use.
#[cfg(target_family = "wasm")]
fn read_counter_or_recount(_events_path: &Path) -> Result<u64, EventLogError> {
    Ok(0)
}
465
466/// Atomically replace the counter sidecar with the new (count, byte_size).
467///
468/// Writes to a temp file in the same directory and renames into place so a
469/// reader either sees the old 16 bytes or the new 16 bytes, never a partial
470/// write. The 0o600 perm matches the lock file -- the counter doesn't leak
471/// secrets but its existence is a session signal worth scoping to the owner.
472#[cfg(not(target_family = "wasm"))]
473fn write_counter(events_path: &Path, count: u64, byte_size: u64) -> Result<(), std::io::Error> {
474    use std::io::Write as _;
475    let counter = counter_path(events_path);
476    let dir = counter.parent().ok_or_else(|| {
477        std::io::Error::new(std::io::ErrorKind::InvalidInput, "counter path has no parent")
478    })?;
479    std::fs::create_dir_all(dir)?;
480
481    let mut buf = [0u8; 16];
482    buf[0..8].copy_from_slice(&count.to_le_bytes());
483    buf[8..16].copy_from_slice(&byte_size.to_le_bytes());
484
485    let tmp = counter.with_extension("count.tmp");
486    {
487        let mut f = open_counter_tmp(&tmp)?;
488        f.write_all(&buf)?;
489        f.sync_all()?;
490    }
491    std::fs::rename(&tmp, &counter)?;
492    Ok(())
493}
494
/// Create (or truncate) a counter temp file for writing. On creation the
/// file gets mode 0o600 (owner read/write only).
#[cfg(all(not(target_family = "wasm"), unix))]
fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut opts = std::fs::OpenOptions::new();
    opts.write(true).create(true).truncate(true);
    opts.mode(0o600);
    opts.open(path)
}
505
/// Non-Unix fallback: create (or truncate) a counter temp file for writing.
/// No permissions are set explicitly.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut opts = std::fs::OpenOptions::new();
    opts.write(true).create(true).truncate(true);
    opts.open(path)
}
514
515#[cfg(test)]
516mod tests {
517    use super::*;
518    use crate::session::event::*;
519
520    fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
521        SessionEvent {
522            session_id: session_id.into(),
523            event_id: generate_event_id(),
524            timestamp: "2026-04-05T08:00:00Z".into(),
525            sequence_no: 0,
526            trace_id: generate_trace_id(),
527            span_id: generate_span_id(),
528            parent_span_id: None,
529            agent_id: "agent://test".into(),
530            agent_instance_id: "ai_test_1".into(),
531            agent_name: "test-agent".into(),
532            agent_role: None,
533            host_id: "host_test".into(),
534            tool_runtime_id: None,
535            event_type,
536            artifact_ref: None,
537            meta: None,
538        }
539    }
540
541    #[test]
542    fn append_and_read_back() {
543        let dir = std::env::temp_dir().join(format!("treeship-evtlog-test-{}", rand::random::<u32>()));
544        let log = EventLog::open(&dir).unwrap();
545
546        let mut e1 = make_event("ssn_001", EventType::SessionStarted);
547        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
548            parent_agent_instance_id: None,
549        });
550
551        log.append(&mut e1).unwrap();
552        log.append(&mut e2).unwrap();
553
554        assert_eq!(log.event_count(), 2);
555        assert_eq!(e1.sequence_no, 0);
556        assert_eq!(e2.sequence_no, 1);
557
558        let events = log.read_all().unwrap();
559        assert_eq!(events.len(), 2);
560        assert_eq!(events[0].sequence_no, 0);
561        assert_eq!(events[1].sequence_no, 1);
562
563        let _ = std::fs::remove_dir_all(&dir);
564    }
565
566    #[test]
567    fn read_all_skips_malformed_lines() {
568        // Regression: a single malformed line in events.jsonl used to
569        // make read_all() return Err, and the caller's
570        // .unwrap_or_default() would drop EVERY event in the log. Real
571        // bug: hooks emitting events with an unknown `type` field made
572        // side_effects.files_written come back empty even though every
573        // other event in the log was a perfectly valid agent.wrote_file
574        // event. Now we skip-and-log the bad line and keep the rest.
575        let dir = std::env::temp_dir().join(format!("treeship-evtlog-malformed-{}", rand::random::<u32>()));
576        let log = EventLog::open(&dir).unwrap();
577
578        let mut good1 = make_event(
579            "ssn_001",
580            EventType::AgentWroteFile {
581                file_path: "src/before.rs".into(),
582                digest: None,
583                operation: None,
584                additions: None,
585                deletions: None,
586            },
587        );
588        let mut good2 = make_event(
589            "ssn_001",
590            EventType::AgentWroteFile {
591                file_path: "src/after.rs".into(),
592                digest: None,
593                operation: None,
594                additions: None,
595                deletions: None,
596            },
597        );
598        log.append(&mut good1).unwrap();
599        log.append(&mut good2).unwrap();
600
601        // Manually inject a malformed line between the two good ones by
602        // truncating the file and rewriting. The malformed line has an
603        // unknown event type ("custom.weird") which the closed EventType
604        // enum can't deserialize.
605        let path = log.path().to_path_buf();
606        let original = std::fs::read_to_string(&path).unwrap();
607        let mut lines: Vec<&str> = original.lines().collect();
608        lines.insert(1, r#"{"session_id":"ssn_001","event_id":"evt_bad","timestamp":"2026-04-26T00:00:00Z","sequence_no":1,"trace_id":"x","span_id":"y","agent_id":"a","agent_instance_id":"i","agent_name":"n","host_id":"h","type":"custom.weird","payload":42}"#);
609        std::fs::write(&path, lines.join("\n") + "\n").unwrap();
610
611        let events = log.read_all().unwrap();
612        assert_eq!(events.len(), 2, "expected the two valid events to come through; got {}", events.len());
613        // Confirm the valid events are the file-write events and not
614        // some default fallback.
615        let written_paths: Vec<&str> = events
616            .iter()
617            .filter_map(|e| match &e.event_type {
618                EventType::AgentWroteFile { file_path, .. } => Some(file_path.as_str()),
619                _ => None,
620            })
621            .collect();
622        assert_eq!(written_paths, vec!["src/before.rs", "src/after.rs"]);
623
624        // Codex finding #8: the count must be exposed in-band so a
625        // sealed receipt can carry the incompleteness signal. Verify
626        // read_all_with_stats reports it.
627        let (events2, skipped) = log.read_all_with_stats().unwrap();
628        assert_eq!(events2.len(), 2);
629        assert_eq!(skipped, 1, "exactly one malformed line was injected; expected skipped == 1");
630
631        let _ = std::fs::remove_dir_all(&dir);
632    }
633
634    #[test]
635    fn read_all_with_stats_reports_zero_when_clean() {
636        // No malformed lines -> skipped == 0 -> the receipt's
637        // event_log_skipped field stays default (0) and gets omitted
638        // from canonical JSON. This preserves byte-identical receipts
639        // for the common case where the event log is clean.
640        let dir = std::env::temp_dir().join(format!("treeship-evtlog-clean-{}", rand::random::<u32>()));
641        let log = EventLog::open(&dir).unwrap();
642
643        let mut e = make_event(
644            "ssn_001",
645            EventType::AgentWroteFile {
646                file_path: "x.rs".into(),
647                digest: None, operation: None, additions: None, deletions: None,
648            },
649        );
650        log.append(&mut e).unwrap();
651
652        let (events, skipped) = log.read_all_with_stats().unwrap();
653        assert_eq!(events.len(), 1);
654        assert_eq!(skipped, 0);
655
656        let _ = std::fs::remove_dir_all(&dir);
657    }
658
659    #[test]
660    fn reopen_preserves_sequence() {
661        let dir = std::env::temp_dir().join(format!("treeship-evtlog-reopen-{}", rand::random::<u32>()));
662
663        {
664            let log = EventLog::open(&dir).unwrap();
665            let mut e = make_event("ssn_001", EventType::SessionStarted);
666            log.append(&mut e).unwrap();
667        }
668
669        // Reopen
670        let log = EventLog::open(&dir).unwrap();
671        assert_eq!(log.event_count(), 1);
672
673        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
674            parent_agent_instance_id: None,
675        });
676        log.append(&mut e2).unwrap();
677        assert_eq!(e2.sequence_no, 1);
678
679        let _ = std::fs::remove_dir_all(&dir);
680    }
681
682    /// Regression test for #1 in the v0.9.3 Codex adversarial review.
683    ///
684    /// Multiple `EventLog` instances opened against the same directory must
685    /// not collide on `sequence_no`. This simulates what happens when each
686    /// `treeship session event` invocation (one per PostToolUse hook firing)
687    /// creates a fresh `EventLog` on a shared events.jsonl. Without the
688    /// flock-based re-derivation in `append_locked`, every instance sees
689    /// the same on-disk count at open time and assigns duplicate sequence
690    /// numbers.
691    #[cfg(not(target_family = "wasm"))]
692    #[test]
693    fn concurrent_appends_have_unique_sequence_numbers() {
694        use std::sync::Arc;
695        use std::thread;
696
697        let dir = std::env::temp_dir()
698            .join(format!("treeship-evtlog-race-{}", rand::random::<u32>()));
699        std::fs::create_dir_all(&dir).unwrap();
700
701        const WRITERS: usize = 16;
702        let dir = Arc::new(dir);
703        let mut handles = Vec::with_capacity(WRITERS);
704
705        for _ in 0..WRITERS {
706            let dir = Arc::clone(&dir);
707            handles.push(thread::spawn(move || {
708                // Each thread opens its OWN EventLog -- mimics a separate
709                // process invocation. Without flock, all threads would see
710                // the same line count at open() time.
711                let log = EventLog::open(&dir).unwrap();
712                let mut e = make_event("ssn_race", EventType::SessionStarted);
713                log.append(&mut e).unwrap();
714                e.sequence_no
715            }));
716        }
717
718        let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
719        seqs.sort();
720
721        // All sequence numbers must be unique and contiguous 0..WRITERS.
722        let expected: Vec<u64> = (0..WRITERS as u64).collect();
723        assert_eq!(seqs, expected, "sequence_no collisions under contention");
724
725        // Same invariant from the on-disk file's perspective.
726        let log = EventLog::open(&dir).unwrap();
727        let read = log.read_all().unwrap();
728        assert_eq!(read.len(), WRITERS);
729        let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
730        on_disk.sort();
731        assert_eq!(on_disk, expected);
732
733        let _ = std::fs::remove_dir_all(&*dir);
734    }
735
736    /// Sidecar lock file must be created mode 0o600 (owner-only) on Unix.
737    /// Regression test for #5 in the second Codex adversarial review.
738    #[cfg(all(not(target_family = "wasm"), unix))]
739    #[test]
740    fn lock_file_has_owner_only_permissions() {
741        use std::os::unix::fs::PermissionsExt;
742
743        let dir = std::env::temp_dir()
744            .join(format!("treeship-evtlog-perms-{}", rand::random::<u32>()));
745        let log = EventLog::open(&dir).unwrap();
746
747        let mut e = make_event("ssn_perms", EventType::SessionStarted);
748        log.append(&mut e).unwrap();
749
750        let lock_path = log.path().with_extension("jsonl.lock");
751        let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
752        let mode = meta.permissions().mode() & 0o777;
753        assert_eq!(
754            mode, 0o600,
755            "lock file mode is {:o}, expected 0o600 (owner-only)",
756            mode
757        );
758
759        let _ = std::fs::remove_dir_all(&dir);
760    }
761
762    /// A pre-existing lock file (e.g. from a v0.9.2 era crash) with looser
763    /// permissions must be tightened to 0o600 on next `EventLog::open`.
764    /// Regression test for the third Codex adversarial review.
765    #[cfg(all(not(target_family = "wasm"), unix))]
766    #[test]
767    fn existing_lock_file_is_re_tightened() {
768        use std::os::unix::fs::PermissionsExt;
769
770        let dir = std::env::temp_dir()
771            .join(format!("treeship-evtlog-retighten-{}", rand::random::<u32>()));
772        std::fs::create_dir_all(&dir).unwrap();
773
774        // Pre-create a lock file with deliberately loose perms, simulating
775        // an upgrade from a CLI version that didn't set 0o600.
776        let lock_path = dir.join("events.jsonl.lock");
777        std::fs::write(&lock_path, b"").unwrap();
778        std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
779        let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
780        assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");
781
782        // First append after upgrade -- should re-tighten.
783        let log = EventLog::open(&dir).unwrap();
784        let mut e = make_event("ssn_retighten", EventType::SessionStarted);
785        log.append(&mut e).unwrap();
786
787        let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
788        assert_eq!(
789            post_mode, 0o600,
790            "lock file should be re-tightened to 0o600 after open; got {:o}",
791            post_mode
792        );
793
794        let _ = std::fs::remove_dir_all(&dir);
795    }
796
797    /// Counter sidecar must exist after the first append and contain
798    /// (count=1, byte_size=size of events.jsonl). This is the happy path
799    /// that lets every subsequent append skip the O(N) rescan.
800    #[cfg(not(target_family = "wasm"))]
801    #[test]
802    fn counter_sidecar_written_after_append() {
803        let dir = std::env::temp_dir()
804            .join(format!("treeship-evtlog-counter-{}", rand::random::<u32>()));
805        let log = EventLog::open(&dir).unwrap();
806
807        let mut e = make_event("ssn_counter", EventType::SessionStarted);
808        log.append(&mut e).unwrap();
809
810        let counter = log.path().with_extension("jsonl.count");
811        let bytes = std::fs::read(&counter).expect("counter sidecar must exist after append");
812        assert_eq!(bytes.len(), 16, "counter sidecar must be 16 bytes");
813
814        let count = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
815        let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
816        let actual_size = std::fs::metadata(log.path()).unwrap().len();
817        assert_eq!(count, 1, "counter must reflect the one appended event");
818        assert_eq!(
819            recorded_size, actual_size,
820            "counter byte_size ({}) must match events.jsonl size ({})",
821            recorded_size, actual_size
822        );
823
824        let _ = std::fs::remove_dir_all(&dir);
825    }
826
827    /// A missing counter sidecar (fresh install, deleted by user, etc.)
828    /// must not break sequence_no assignment. The next append falls back
829    /// to an O(N) recount and rewrites the counter.
830    #[cfg(not(target_family = "wasm"))]
831    #[test]
832    fn counter_sidecar_recovers_when_missing() {
833        let dir = std::env::temp_dir()
834            .join(format!("treeship-evtlog-missing-counter-{}", rand::random::<u32>()));
835
836        // Append two events, then nuke the counter sidecar.
837        {
838            let log = EventLog::open(&dir).unwrap();
839            let mut e1 = make_event("ssn_x", EventType::SessionStarted);
840            let mut e2 = make_event("ssn_x", EventType::AgentStarted {
841                parent_agent_instance_id: None,
842            });
843            log.append(&mut e1).unwrap();
844            log.append(&mut e2).unwrap();
845        }
846        let counter = dir.join("events.jsonl.count");
847        std::fs::remove_file(&counter).expect("counter must exist before deletion");
848
849        // Reopen + append. The third event must get sequence_no=2 even
850        // though the counter sidecar is gone.
851        let log = EventLog::open(&dir).unwrap();
852        assert_eq!(log.event_count(), 2, "open() must recount when counter is missing");
853
854        let mut e3 = make_event("ssn_x", EventType::SessionClosed {
855            summary: None,
856            duration_ms: None,
857        });
858        log.append(&mut e3).unwrap();
859        assert_eq!(e3.sequence_no, 2);
860        assert!(counter.exists(), "counter must be rewritten after recount");
861
862        let _ = std::fs::remove_dir_all(&dir);
863    }
864
865    /// A short-read or garbage counter sidecar (corrupted, partial write,
866    /// truncated by external tool) must not be trusted. The size mismatch
867    /// path covers the "wrong content" case for a 16-byte file too.
868    #[cfg(not(target_family = "wasm"))]
869    #[test]
870    fn counter_sidecar_recovers_when_corrupt() {
871        let dir = std::env::temp_dir()
872            .join(format!("treeship-evtlog-corrupt-counter-{}", rand::random::<u32>()));
873
874        {
875            let log = EventLog::open(&dir).unwrap();
876            let mut e = make_event("ssn_corrupt", EventType::SessionStarted);
877            log.append(&mut e).unwrap();
878        }
879        // Truncate the counter to a non-16 length.
880        let counter = dir.join("events.jsonl.count");
881        std::fs::write(&counter, b"junk").unwrap();
882
883        let log = EventLog::open(&dir).unwrap();
884        assert_eq!(log.event_count(), 1, "short-read counter must be ignored, recount kicks in");
885
886        let _ = std::fs::remove_dir_all(&dir);
887    }
888
889    /// A counter that recorded the wrong byte_size (someone or something
890    /// appended to events.jsonl behind our back) must not be trusted.
891    /// This is the crash-recovery path: peer wrote events.jsonl but
892    /// crashed before fsyncing the counter, so the recorded size is stale.
893    #[cfg(not(target_family = "wasm"))]
894    #[test]
895    fn counter_sidecar_recovers_when_size_disagrees() {
896        let dir = std::env::temp_dir()
897            .join(format!("treeship-evtlog-stale-counter-{}", rand::random::<u32>()));
898
899        {
900            let log = EventLog::open(&dir).unwrap();
901            let mut e = make_event("ssn_stale", EventType::SessionStarted);
902            log.append(&mut e).unwrap();
903        }
904
905        // Simulate a crash mid-append: append one extra raw line to
906        // events.jsonl WITHOUT updating the counter. Now the counter
907        // says (1, S) but events.jsonl is (S + |line|) bytes.
908        let events_path = dir.join("events.jsonl");
909        let mut extra = make_event("ssn_stale", EventType::AgentStarted {
910            parent_agent_instance_id: None,
911        });
912        extra.sequence_no = 999; // intentionally wrong; will be overwritten on read
913        let mut line = serde_json::to_vec(&extra).unwrap();
914        line.push(b'\n');
915        let mut f = std::fs::OpenOptions::new()
916            .append(true)
917            .open(&events_path)
918            .unwrap();
919        std::io::Write::write_all(&mut f, &line).unwrap();
920        std::io::Write::flush(&mut f).unwrap();
921
922        // Re-open. The size mismatch must trigger a recount; we should see 2.
923        let log = EventLog::open(&dir).unwrap();
924        assert_eq!(
925            log.event_count(),
926            2,
927            "size mismatch must force recount, ignoring stale counter"
928        );
929
930        let _ = std::fs::remove_dir_all(&dir);
931    }
932
933    /// The counter sidecar fix must not break the cross-process race
934    /// safety established by the flock layer. This is the same shape as
935    /// `concurrent_appends_have_unique_sequence_numbers` but exists to
936    /// guard against a regression where the counter is read OUTSIDE the
937    /// lock, which would let two writers both see count=N and assign N
938    /// to two different events.
939    #[cfg(not(target_family = "wasm"))]
940    #[test]
941    fn counter_sidecar_preserves_concurrent_uniqueness() {
942        use std::sync::Arc;
943        use std::thread;
944
945        let dir = std::env::temp_dir()
946            .join(format!("treeship-evtlog-counter-race-{}", rand::random::<u32>()));
947        std::fs::create_dir_all(&dir).unwrap();
948
949        const WRITERS: usize = 16;
950        let dir = Arc::new(dir);
951        let mut handles = Vec::with_capacity(WRITERS);
952
953        for _ in 0..WRITERS {
954            let dir = Arc::clone(&dir);
955            handles.push(thread::spawn(move || {
956                let log = EventLog::open(&dir).unwrap();
957                let mut e = make_event("ssn_counter_race", EventType::SessionStarted);
958                log.append(&mut e).unwrap();
959                e.sequence_no
960            }));
961        }
962
963        let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
964        seqs.sort();
965        let expected: Vec<u64> = (0..WRITERS as u64).collect();
966        assert_eq!(seqs, expected, "counter must not bypass the flock race protection");
967
968        // Counter should reflect the final state.
969        let log = EventLog::open(&dir).unwrap();
970        assert_eq!(log.event_count(), WRITERS as u64);
971
972        let _ = std::fs::remove_dir_all(&*dir);
973    }
974
975    /// Counter sidecar must be created mode 0o600 (owner-only) on Unix --
976    /// same scoping as the lock file; the existence of a counter is a
977    /// session signal that doesn't need to leak to other users.
978    #[cfg(all(not(target_family = "wasm"), unix))]
979    #[test]
980    fn counter_sidecar_has_owner_only_permissions() {
981        use std::os::unix::fs::PermissionsExt;
982
983        let dir = std::env::temp_dir()
984            .join(format!("treeship-evtlog-counter-perms-{}", rand::random::<u32>()));
985        let log = EventLog::open(&dir).unwrap();
986
987        let mut e = make_event("ssn_counter_perms", EventType::SessionStarted);
988        log.append(&mut e).unwrap();
989
990        let counter = log.path().with_extension("jsonl.count");
991        let mode = std::fs::metadata(&counter).unwrap().permissions().mode() & 0o777;
992        assert_eq!(
993            mode, 0o600,
994            "counter sidecar mode is {:o}, expected 0o600 (owner-only)",
995            mode
996        );
997
998        let _ = std::fs::remove_dir_all(&dir);
999    }
1000
1001    /// P0 regression (audit lane F): under heavy hook contention, multiple
1002    /// writers used to fall through and append without the flock when the
1003    /// 500ms poll exhausted, producing duplicate `sequence_no` values. The
1004    /// blocking `lock_exclusive` fix means every writer must hold the
1005    /// flock across both the counter read and the event write.
1006    ///
1007    /// This test spawns 8 threads, each calling `append` 25 times on its
1008    /// own `EventLog` (mimicking 8 separate hook processes each appending
1009    /// a burst of events). After the join, the on-disk log must contain
1010    /// exactly 8*25=200 events with `sequence_no` exactly the contiguous
1011    /// range 0..200, no duplicates and no gaps.
1012    #[cfg(not(target_family = "wasm"))]
1013    #[test]
1014    fn p0_no_duplicate_sequence_under_burst_contention() {
1015        use std::sync::Arc;
1016        use std::thread;
1017
1018        const THREADS: usize = 8;
1019        const PER_THREAD: usize = 25;
1020        const EXPECTED: usize = THREADS * PER_THREAD;
1021
1022        let dir = std::env::temp_dir().join(format!(
1023            "treeship-evtlog-p0-burst-{}",
1024            rand::random::<u32>()
1025        ));
1026        std::fs::create_dir_all(&dir).unwrap();
1027        let dir = Arc::new(dir);
1028
1029        let mut handles = Vec::with_capacity(THREADS);
1030        for t in 0..THREADS {
1031            let dir = Arc::clone(&dir);
1032            handles.push(thread::spawn(move || -> Vec<u64> {
1033                // Each thread opens its own EventLog -- this is the
1034                // per-process model the audit flagged: every PostToolUse
1035                // invocation is a fresh handle on the shared log.
1036                let log = EventLog::open(&dir).unwrap();
1037                let mut seen = Vec::with_capacity(PER_THREAD);
1038                for i in 0..PER_THREAD {
1039                    let mut e =
1040                        make_event(&format!("ssn_burst_{}_{}", t, i), EventType::SessionStarted);
1041                    log.append(&mut e).unwrap();
1042                    seen.push(e.sequence_no);
1043                }
1044                seen
1045            }));
1046        }
1047
1048        // Collect what each thread saw locally.
1049        let mut all_returned: Vec<u64> = handles
1050            .into_iter()
1051            .flat_map(|h| h.join().unwrap())
1052            .collect();
1053        all_returned.sort();
1054
1055        let expected: Vec<u64> = (0..EXPECTED as u64).collect();
1056        assert_eq!(
1057            all_returned, expected,
1058            "returned sequence_no values must be a contiguous range 0..{} \
1059             with no duplicates and no gaps",
1060            EXPECTED
1061        );
1062
1063        // Truth source: the on-disk log itself. Verify (a) count and
1064        // (b) sequence_no is exactly 0..EXPECTED on the persisted events.
1065        let log = EventLog::open(&dir).unwrap();
1066        let events = log.read_all().unwrap();
1067        assert_eq!(
1068            events.len(),
1069            EXPECTED,
1070            "on-disk event count must be exactly {} (got {})",
1071            EXPECTED,
1072            events.len()
1073        );
1074        let mut on_disk: Vec<u64> = events.iter().map(|e| e.sequence_no).collect();
1075        on_disk.sort();
1076        assert_eq!(
1077            on_disk, expected,
1078            "on-disk sequence_no must be a contiguous range with no duplicates and no gaps"
1079        );
1080
1081        // Counter sidecar must agree with both.
1082        assert_eq!(log.event_count(), EXPECTED as u64);
1083
1084        let _ = std::fs::remove_dir_all(&*dir);
1085    }
1086
1087    /// Companion stress test for the lock-file lifecycle. Each append
1088    /// opens the lock file, locks it, writes, unlocks, and drops the FD.
1089    /// Repeatedly creating + dropping `EventLog`s in a tight loop must
1090    /// not panic, must not leak FDs we can detect (no `EMFILE` after
1091    /// hundreds of iterations on a default ulimit), and must produce a
1092    /// log with contiguous sequence numbers.
1093    #[cfg(not(target_family = "wasm"))]
1094    #[test]
1095    fn lock_file_handles_drop_cleanly_under_churn() {
1096        let dir = std::env::temp_dir().join(format!(
1097            "treeship-evtlog-fd-churn-{}",
1098            rand::random::<u32>()
1099        ));
1100        std::fs::create_dir_all(&dir).unwrap();
1101
1102        // 500 sequential open + append + drop cycles. Far below the
1103        // default macOS/Linux soft limit (256/1024) for a sustained
1104        // leak, but plenty to catch one-per-iteration FD leaks.
1105        const ITERS: usize = 500;
1106        for i in 0..ITERS {
1107            let log = EventLog::open(&dir).unwrap();
1108            let mut e = make_event(&format!("ssn_churn_{}", i), EventType::SessionStarted);
1109            log.append(&mut e).unwrap();
1110            // log drops here -> lock_file FD already closed inside append.
1111        }
1112
1113        let log = EventLog::open(&dir).unwrap();
1114        let events = log.read_all().unwrap();
1115        assert_eq!(events.len(), ITERS);
1116        let mut seqs: Vec<u64> = events.iter().map(|e| e.sequence_no).collect();
1117        seqs.sort();
1118        let expected: Vec<u64> = (0..ITERS as u64).collect();
1119        assert_eq!(
1120            seqs, expected,
1121            "no FD leak should still produce contiguous seqs"
1122        );
1123
1124        let _ = std::fs::remove_dir_all(&dir);
1125    }
1126}