Skip to main content

treeship_core/session/
event_log.rs

//! Append-only, file-backed event log for session events.
//!
//! Events are stored as newline-delimited JSON (JSONL) in
//! `.treeship/sessions/<session_id>/events.jsonl`.
//!
//! Concurrency model: `append()` is safe to call from multiple processes
//! concurrently. Each call attempts to acquire an exclusive advisory lock
//! (via `fs2::FileExt::try_lock_exclusive` -- backed by `flock(2)` on Unix
//! and `LockFileEx` on Windows) on a sidecar `events.jsonl.lock` file in a
//! ~500ms bounded retry loop. Under the lock, a counter sidecar
//! `events.jsonl.count` is the authoritative source for the next
//! `sequence_no`. The per-process AtomicU64 is only a cached hint (it backs
//! `event_count()`); its value is overwritten with the on-disk count after
//! every append.
//!
//! Counter sidecar format (16 bytes):
//!   - bytes 0..8:  count (u64 LE) -- number of events (lines) written to events.jsonl
//!   - bytes 8..16: byte_size (u64 LE) -- size of events.jsonl when count was recorded
//!
//! The byte_size field is the crash detector. If a peer wrote events.jsonl
//! but crashed before fsyncing the counter (or vice versa), the size on disk
//! and the size in the counter disagree. On any mismatch we fall back to an
//! O(N) line count and rewrite the counter -- one paid scan, then back to
//! O(1) on every subsequent append.
//!
//! This keeps steady-state append cost constant: read 16 bytes, write
//! one JSONL line, write 16 bytes. The previous implementation re-streamed
//! the entire events.jsonl on every append, which made hooks O(N) in
//! session length and dominated PostToolUse latency on long sessions.
//!
//! Fail-open semantics: if a writer cannot acquire the lock within the
//! retry window (typically because a peer crashed while holding it, or a
//! filesystem doesn't honor flock at all), the append still proceeds
//! without the lock and writes a stderr warning. In that degenerate case
//! the resulting `sequence_no` is best-effort rather than guaranteed
//! monotonic, but the event itself is preserved -- the alternative
//! (blocking the agent forever on a wedged peer) is strictly worse.
//!
//! Lock file permissions are 0o600 (owner-only) on Unix, applied at file
//! creation via `OpenOptionsExt::mode` and re-tightened on every open if
//! a previous run left a file owned by the current user with looser perms.
42
43use std::io::{BufRead, Write};
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicU64, Ordering};
46
47#[cfg(not(target_family = "wasm"))]
48use fs2::FileExt;
49
50use crate::session::event::SessionEvent;
51
52/// Error from event log operations.
53#[derive(Debug)]
54pub enum EventLogError {
55    Io(std::io::Error),
56    Json(serde_json::Error),
57}
58
59impl std::fmt::Display for EventLogError {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            Self::Io(e) => write!(f, "event log io: {e}"),
63            Self::Json(e) => write!(f, "event log json: {e}"),
64        }
65    }
66}
67
68impl std::error::Error for EventLogError {}
69impl From<std::io::Error> for EventLogError {
70    fn from(e: std::io::Error) -> Self { Self::Io(e) }
71}
72impl From<serde_json::Error> for EventLogError {
73    fn from(e: serde_json::Error) -> Self { Self::Json(e) }
74}
75
/// An append-only event log backed by a JSONL file.
///
/// `path` points at `events.jsonl` inside the session directory. `sequence`
/// caches the next `sequence_no` as last observed by this process. Whenever
/// other processes may be appending, the on-disk counter sidecar is
/// authoritative and this value is only a hint.
#[derive(Debug)]
pub struct EventLog {
    /// Path to the `events.jsonl` file.
    path: PathBuf,
    /// Next sequence number (== number of events) as last seen by this process.
    sequence: AtomicU64,
}
81
82impl EventLog {
83    /// Open or create an event log for the given session directory.
84    ///
85    /// The session directory is typically `.treeship/sessions/<session_id>/`.
86    /// If the directory does not exist, it will be created.
87    ///
88    /// Initialization reads the counter sidecar in O(1) when present and
89    /// consistent with events.jsonl's byte size; falls back to an O(N) line
90    /// count (and rewrites the sidecar) when the sidecar is missing,
91    /// short-read, or stale from a crashed previous appender.
92    pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
93        std::fs::create_dir_all(session_dir)?;
94        let path = session_dir.join("events.jsonl");
95        let count = read_counter_or_recount(&path)?;
96        Ok(Self { path, sequence: AtomicU64::new(count) })
97    }
98
99    /// Append a single event to the log.
100    ///
101    /// The event's `sequence_no` is set automatically. Under contention from
102    /// multiple writer processes, the sequence number is re-derived from the
103    /// on-disk line count under an exclusive flock so two parallel writers
104    /// never collide.
105    pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
106        self.append_locked(event)
107    }
108
109    /// Cross-process safe append: acquires an exclusive advisory lock on a
110    /// sidecar `.lock` file, re-counts events.jsonl lines, assigns sequence_no,
111    /// writes the new event, then releases the lock on drop.
112    ///
113    /// Lock acquisition is bounded: tries to acquire for up to ~500ms via
114    /// `try_lock_exclusive` poll, then falls back to an unlocked append
115    /// with a stderr warning. A wedged or crashed writer must NOT hang
116    /// hook-driven invocations forever (PostToolUse hooks running per
117    /// tool call would freeze the agent). Better to lose strict
118    /// sequence_no monotonicity in the rare wedge case than to deadlock.
119    ///
120    /// Lock file is created mode 0o600 (owner-only) so the sidecar can
121    /// never be opened by other users on a shared machine.
122    ///
123    /// Skipped on WASM (no fs, no concurrency).
124    #[cfg(not(target_family = "wasm"))]
125    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
126        use std::time::{Duration, Instant};
127
128        // Sidecar lock file: contention here doesn't block readers of events.jsonl.
129        let lock_path = self.path.with_extension("jsonl.lock");
130
131        // Open or create the lock file. On Unix we set 0o600 explicitly so
132        // the sidecar isn't group/world readable; the umask-derived default
133        // would otherwise be permissive on some setups.
134        let lock_file = open_lock_file(&lock_path)?;
135
136        // Bounded retry. With 16 parallel writers the worst case is a
137        // queue of N short-held locks; 500ms is plenty. If we fail to
138        // acquire in that window something is wedged -- fall through and
139        // append without ordering rather than freezing the caller.
140        let mut acquired = false;
141        let start = Instant::now();
142        let deadline = Duration::from_millis(500);
143        loop {
144            match lock_file.try_lock_exclusive() {
145                Ok(()) => {
146                    acquired = true;
147                    break;
148                }
149                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
150                    if start.elapsed() >= deadline {
151                        eprintln!(
152                            "[treeship] event_log: lock contention on {} \
153                             exceeded {}ms; appending without sequence ordering guarantee",
154                            lock_path.display(),
155                            deadline.as_millis()
156                        );
157                        break;
158                    }
159                    std::thread::sleep(Duration::from_millis(10));
160                }
161                Err(e) => return Err(e.into()),
162            }
163        }
164
165        // Under the lock (or unlocked fallback): read sequence_no from the
166        // counter sidecar in O(1) when consistent with events.jsonl size.
167        // Stale or missing counters force a one-time O(N) rescan that also
168        // rewrites the counter, so subsequent appends return to O(1). Only
169        // the on-disk state (counter + size check) is authoritative when
170        // multiple processes are appending; the per-process AtomicU64 is a
171        // stale hint.
172        let count = read_counter_or_recount(&self.path)?;
173        event.sequence_no = count;
174
175        let mut line = serde_json::to_vec(event)?;
176        line.push(b'\n');
177
178        let mut file = std::fs::OpenOptions::new()
179            .create(true)
180            .append(true)
181            .open(&self.path)?;
182        file.write_all(&line)?;
183        file.flush()?;
184
185        // Update the counter sidecar with the new count and the new
186        // events.jsonl size, so the next append can short-circuit the line
187        // scan. Failure to update the counter is non-fatal: the next reader
188        // will detect the size mismatch and recount.
189        let new_size = file.metadata().map(|m| m.len()).unwrap_or(0);
190        let _ = write_counter(&self.path, count + 1, new_size);
191
192        // Keep the in-process AtomicU64 in sync so non-contended callers
193        // see the right value via event_count() without re-reading.
194        self.sequence.store(count + 1, Ordering::SeqCst);
195
196        // Suppress the unused-variable warning on the unlock-fallback path.
197        let _ = acquired;
198        // lock_file drops here -> flock released (no-op if we never acquired).
199        Ok(())
200    }
201
202    /// WASM build: no filesystem locks available, no concurrent writers.
203    /// Falls back to the simple AtomicU64 path.
204    #[cfg(target_family = "wasm")]
205    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
206        event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
207
208        let mut line = serde_json::to_vec(event)?;
209        line.push(b'\n');
210
211        let mut file = std::fs::OpenOptions::new()
212            .create(true)
213            .append(true)
214            .open(&self.path)?;
215        file.write_all(&line)?;
216        file.flush()?;
217
218        Ok(())
219    }
220
221    /// Read all events from the log.
222    ///
223    /// Per-line tolerant: a single malformed line (unknown event type,
224    /// missing required field, truncated JSON from a crashed writer) is
225    /// logged to stderr and skipped, not propagated as an error. The
226    /// caller -- session close, in particular -- composes a receipt
227    /// from whatever events parse, instead of dropping every event when
228    /// any one is bad.
229    ///
230    /// Why this matters: events.jsonl is append-only and written by
231    /// hooks, daemons, SDKs, and bridges from multiple processes. A
232    /// single bad event from one buggy emitter would otherwise nuke
233    /// the entire receipt's side_effects / agent_graph / timeline.
234    /// Real-world repro: a hook that emitted events with an unknown
235    /// `type` field caused side_effects.files_written to come back
236    /// empty even though the rest of the events in the log were valid
237    /// agent.wrote_file events the aggregator would have happily
238    /// processed.
239    pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
240        // Drop the skipped count for callers that don't carry it through.
241        // Receipt composition uses read_all_with_stats to record the
242        // count in-band on the sealed receipt -- see Codex finding #8.
243        self.read_all_with_stats().map(|(events, _skipped)| events)
244    }
245
246    /// Same as `read_all` but returns the count of malformed lines that
247    /// were skipped during parsing alongside the valid events.
248    ///
249    /// Codex adversarial review finding #8: skipping malformed events
250    /// on stderr only is silent data loss from the verifier's
251    /// perspective. The receipt gets sealed under a merkle root that
252    /// represents only the events that successfully parsed -- a
253    /// downstream consumer cannot tell whether the receipt is complete
254    /// or whether N events were silently dropped.
255    ///
256    /// session::close calls this and stores the count on
257    /// `receipt.proofs.event_log_skipped`. `treeship package verify`
258    /// surfaces it as a WARN when nonzero so the receipt's
259    /// completeness signal is visible without breaking byte-identical
260    /// re-verification of pre-existing receipts.
261    pub fn read_all_with_stats(&self) -> Result<(Vec<SessionEvent>, usize), EventLogError> {
262        if !self.path.exists() {
263            return Ok((Vec::new(), 0));
264        }
265        let file = std::fs::File::open(&self.path)?;
266        let reader = std::io::BufReader::new(file);
267        let mut events = Vec::new();
268        let mut skipped = 0usize;
269        for (idx, line) in reader.lines().enumerate() {
270            let line = line?;
271            if line.trim().is_empty() {
272                continue;
273            }
274            match serde_json::from_str::<SessionEvent>(&line) {
275                Ok(event) => events.push(event),
276                Err(e) => {
277                    skipped += 1;
278                    eprintln!(
279                        "[treeship] event_log: skipping malformed line {} in {}: {}",
280                        idx + 1,
281                        self.path.display(),
282                        e,
283                    );
284                }
285            }
286        }
287        if skipped > 0 {
288            eprintln!(
289                "[treeship] event_log: {} malformed line(s) skipped while reading {} (kept {} valid event(s))",
290                skipped,
291                self.path.display(),
292                events.len(),
293            );
294        }
295        Ok((events, skipped))
296    }
297
298    /// Return the current event count.
299    pub fn event_count(&self) -> u64 {
300        self.sequence.load(Ordering::SeqCst)
301    }
302
303    /// Return the path to the JSONL file.
304    pub fn path(&self) -> &Path {
305        &self.path
306    }
307}
308
309/// Open the sidecar lock file with owner-only permissions (0o600 on Unix).
310///
311/// On Unix the mode is set atomically via `OpenOptionsExt::mode` for newly
312/// created files. For files that already exist (e.g. left over from a
313/// prior crash or an upgrade from a pre-0.9.3 CLI that didn't tighten
314/// perms), we additionally re-chmod to 0o600 after open IF the file is
315/// owned by the current user. This is best-effort: if the chmod fails
316/// (file owned by another user, read-only filesystem, etc.) we proceed
317/// silently rather than refuse to open the lock -- the lock semantics
318/// don't depend on the perms being tight, only the privacy of the
319/// sidecar's existence does.
320///
321/// On Windows the mode concept doesn't apply; ACLs default to inheriting
322/// the parent dir's permissions, which for `.treeship/sessions/<id>/`
323/// should already be scoped to the owning user.
324#[cfg(all(not(target_family = "wasm"), unix))]
325fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
326    use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
327    use std::os::unix::io::AsRawFd;
328
329    let file = std::fs::OpenOptions::new()
330        .create(true)
331        .read(true)
332        .write(true)
333        .mode(0o600)
334        .open(path)?;
335
336    // Re-tighten if a pre-existing file has loose perms. Use `fchmod` on the
337    // open file descriptor rather than `set_permissions(path, ...)` to
338    // eliminate the TOCTOU window -- between metadata() and a path-based
339    // chmod, an attacker could swap the file. `fchmod` operates on the
340    // already-opened inode, so the target is pinned.
341    //
342    // Only act when the file is owned by us (uid match via geteuid). If
343    // fchmod fails (NFS mount with restricted metadata writes, or some
344    // filesystems without full POSIX perm support), emit a one-line
345    // stderr warning so an operator has visibility. The lock still works;
346    // only the privacy of the sidecar's existence is affected.
347    if let Ok(meta) = file.metadata() {
348        let mode = meta.permissions().mode() & 0o777;
349        let owned_by_us = meta.uid() == nix_uid();
350        if owned_by_us && mode != 0o600 {
351            let fd = file.as_raw_fd();
352            // SAFETY: fd is valid (we just opened it), 0o600 is a
353            // well-formed mode. fchmod is async-signal-safe per POSIX.
354            let rc = unsafe { libc_fchmod(fd, 0o600) };
355            if rc != 0 {
356                let err = std::io::Error::last_os_error();
357                eprintln!(
358                    "[treeship] warning: could not tighten lock file perms on {} \
359                     to 0o600 (current: 0o{:o}). Error: {}. Lock still functions; \
360                     only the privacy of the sidecar is affected. Common cause: \
361                     NFS mount or filesystem without full POSIX perm support.",
362                    path.display(), mode, err
363                );
364            }
365        }
366    }
367
368    Ok(file)
369}
370
/// Thin FFI wrapper around libc::fchmod. Declared here so event_log.rs
/// doesn't need a direct libc crate dep -- the symbol is available in
/// every Unix libc binary.
///
/// Returns fchmod's raw result: `0` on success, non-zero on failure with
/// the cause left in `errno`. The caller reads it via
/// `std::io::Error::last_os_error()`.
///
/// NOTE(review): `mode` is declared as `u32`, but `mode_t` is 16 bits on
/// some Unix targets (e.g. macOS). Confirm this signature matches each
/// platform's ABI, or consider `std::fs::File::set_permissions`, which
/// performs `fchmod` on the open descriptor without hand-written FFI.
#[cfg(all(not(target_family = "wasm"), unix))]
fn libc_fchmod(fd: i32, mode: u32) -> i32 {
    // SAFETY: posix-standard FFI signature. fchmod only reads its integer
    // arguments; an invalid `fd` makes it fail (EBADF) rather than touch
    // memory, which is why this can be exposed as a safe fn.
    unsafe extern "C" {
        fn fchmod(fd: i32, mode: u32) -> i32;
    }
    unsafe { fchmod(fd, mode) }
}
383
/// Return the effective user ID of the current process.
///
/// Lightweight wrapper around `geteuid`, so we can compare against file
/// ownership without pulling in the `nix` crate. The symbol is declared
/// with a local `extern "C"` block below rather than taken from the `libc`
/// crate.
///
/// NOTE(review): assumes `uid_t` is a 32-bit unsigned integer on every
/// supported Unix target -- confirm before adding a new platform.
#[cfg(all(not(target_family = "wasm"), unix))]
fn nix_uid() -> u32 {
    // SAFETY: geteuid is async-signal-safe and never fails per POSIX.
    unsafe extern "C" {
        fn geteuid() -> u32;
    }
    unsafe { geteuid() }
}
395
/// Open (creating if necessary) the sidecar lock file for read/write.
///
/// Non-Unix variant: no POSIX mode bits are applied here.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    options.open(path)
}
404
/// Location of the counter sidecar that sits next to `events.jsonl`.
///
/// Same directory as the events file, with the extension replaced by
/// `jsonl.count` (so `events.jsonl` becomes `events.jsonl.count`).
fn counter_path(events_path: &Path) -> PathBuf {
    let mut sidecar = events_path.to_path_buf();
    sidecar.set_extension("jsonl.count");
    sidecar
}
409
410/// Read the counter sidecar if it exists and is consistent with events.jsonl.
411///
412/// Returns `Some(count)` when the sidecar's recorded byte_size matches the
413/// current events.jsonl size, and `None` otherwise (missing sidecar, short
414/// read, parse failure, or size mismatch from a crashed previous appender).
415#[cfg(not(target_family = "wasm"))]
416fn read_counter_consistent(events_path: &Path) -> Option<u64> {
417    let counter = counter_path(events_path);
418    let bytes = std::fs::read(&counter).ok()?;
419    if bytes.len() != 16 {
420        return None;
421    }
422    let count = u64::from_le_bytes(bytes[0..8].try_into().ok()?);
423    let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().ok()?);
424
425    // events.jsonl may not exist yet -- counter records (0, 0) for that case.
426    let actual_size = match std::fs::metadata(events_path) {
427        Ok(m) => m.len(),
428        Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
429        Err(_) => return None,
430    };
431    if actual_size != recorded_size {
432        return None;
433    }
434    Some(count)
435}
436
437/// Read the counter via the sidecar (O(1)) or fall back to an O(N) line
438/// scan, rewriting the sidecar on the way out. This is the recovery path
439/// after a crash that left the counter and events.jsonl out of sync.
440#[cfg(not(target_family = "wasm"))]
441fn read_counter_or_recount(events_path: &Path) -> Result<u64, EventLogError> {
442    if let Some(count) = read_counter_consistent(events_path) {
443        return Ok(count);
444    }
445    let count = if events_path.exists() {
446        let f = std::fs::File::open(events_path)?;
447        let r = std::io::BufReader::new(f);
448        r.lines().filter(|l| l.is_ok()).count() as u64
449    } else {
450        0
451    };
452    let size = std::fs::metadata(events_path).map(|m| m.len()).unwrap_or(0);
453    let _ = write_counter(events_path, count, size);
454    Ok(count)
455}
456
/// WASM build: there is no cross-process locking and no counter sidecar, so
/// the starting sequence number is a plain line count of events.jsonl. The
/// count is 0 when the file does not exist yet.
///
/// Returning a hard-coded 0 here would break WASM targets that do have a
/// filesystem (e.g. WASI). A reopened log would hand out `sequence_no`s that
/// collide with events already on disk, because the WASM `append_locked`
/// path assigns numbers purely from the in-memory counter seeded by this
/// value.
#[cfg(target_family = "wasm")]
fn read_counter_or_recount(events_path: &Path) -> Result<u64, EventLogError> {
    let file = match std::fs::File::open(events_path) {
        Ok(file) => file,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
        Err(e) => return Err(e.into()),
    };
    let mut reader = std::io::BufReader::new(file);
    let mut buf = Vec::new();
    let mut lines = 0u64;
    while reader.read_until(b'\n', &mut buf)? != 0 {
        lines += 1;
        buf.clear();
    }
    Ok(lines)
}
463
464/// Atomically replace the counter sidecar with the new (count, byte_size).
465///
466/// Writes to a temp file in the same directory and renames into place so a
467/// reader either sees the old 16 bytes or the new 16 bytes, never a partial
468/// write. The 0o600 perm matches the lock file -- the counter doesn't leak
469/// secrets but its existence is a session signal worth scoping to the owner.
470#[cfg(not(target_family = "wasm"))]
471fn write_counter(events_path: &Path, count: u64, byte_size: u64) -> Result<(), std::io::Error> {
472    use std::io::Write as _;
473    let counter = counter_path(events_path);
474    let dir = counter.parent().ok_or_else(|| {
475        std::io::Error::new(std::io::ErrorKind::InvalidInput, "counter path has no parent")
476    })?;
477    std::fs::create_dir_all(dir)?;
478
479    let mut buf = [0u8; 16];
480    buf[0..8].copy_from_slice(&count.to_le_bytes());
481    buf[8..16].copy_from_slice(&byte_size.to_le_bytes());
482
483    let tmp = counter.with_extension("count.tmp");
484    {
485        let mut f = open_counter_tmp(&tmp)?;
486        f.write_all(&buf)?;
487        f.sync_all()?;
488    }
489    std::fs::rename(&tmp, &counter)?;
490    Ok(())
491}
492
/// Create (or truncate) the counter's staging file for writing.
///
/// A newly created file gets mode 0o600 (owner-only).
#[cfg(all(not(target_family = "wasm"), unix))]
fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut opts = std::fs::OpenOptions::new();
    opts.write(true).truncate(true).create(true).mode(0o600);
    opts.open(path)
}
503
/// Create (or truncate) the counter's staging file for writing.
///
/// Non-Unix variant: no POSIX mode bits are applied here.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_counter_tmp(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut opts = std::fs::OpenOptions::new();
    opts.write(true).truncate(true).create(true);
    opts.open(path)
}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516    use crate::session::event::*;
517
518    fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
519        SessionEvent {
520            session_id: session_id.into(),
521            event_id: generate_event_id(),
522            timestamp: "2026-04-05T08:00:00Z".into(),
523            sequence_no: 0,
524            trace_id: generate_trace_id(),
525            span_id: generate_span_id(),
526            parent_span_id: None,
527            agent_id: "agent://test".into(),
528            agent_instance_id: "ai_test_1".into(),
529            agent_name: "test-agent".into(),
530            agent_role: None,
531            host_id: "host_test".into(),
532            tool_runtime_id: None,
533            event_type,
534            artifact_ref: None,
535            meta: None,
536        }
537    }
538
539    #[test]
540    fn append_and_read_back() {
541        let dir = std::env::temp_dir().join(format!("treeship-evtlog-test-{}", rand::random::<u32>()));
542        let log = EventLog::open(&dir).unwrap();
543
544        let mut e1 = make_event("ssn_001", EventType::SessionStarted);
545        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
546            parent_agent_instance_id: None,
547        });
548
549        log.append(&mut e1).unwrap();
550        log.append(&mut e2).unwrap();
551
552        assert_eq!(log.event_count(), 2);
553        assert_eq!(e1.sequence_no, 0);
554        assert_eq!(e2.sequence_no, 1);
555
556        let events = log.read_all().unwrap();
557        assert_eq!(events.len(), 2);
558        assert_eq!(events[0].sequence_no, 0);
559        assert_eq!(events[1].sequence_no, 1);
560
561        let _ = std::fs::remove_dir_all(&dir);
562    }
563
564    #[test]
565    fn read_all_skips_malformed_lines() {
566        // Regression: a single malformed line in events.jsonl used to
567        // make read_all() return Err, and the caller's
568        // .unwrap_or_default() would drop EVERY event in the log. Real
569        // bug: hooks emitting events with an unknown `type` field made
570        // side_effects.files_written come back empty even though every
571        // other event in the log was a perfectly valid agent.wrote_file
572        // event. Now we skip-and-log the bad line and keep the rest.
573        let dir = std::env::temp_dir().join(format!("treeship-evtlog-malformed-{}", rand::random::<u32>()));
574        let log = EventLog::open(&dir).unwrap();
575
576        let mut good1 = make_event(
577            "ssn_001",
578            EventType::AgentWroteFile {
579                file_path: "src/before.rs".into(),
580                digest: None,
581                operation: None,
582                additions: None,
583                deletions: None,
584            },
585        );
586        let mut good2 = make_event(
587            "ssn_001",
588            EventType::AgentWroteFile {
589                file_path: "src/after.rs".into(),
590                digest: None,
591                operation: None,
592                additions: None,
593                deletions: None,
594            },
595        );
596        log.append(&mut good1).unwrap();
597        log.append(&mut good2).unwrap();
598
599        // Manually inject a malformed line between the two good ones by
600        // truncating the file and rewriting. The malformed line has an
601        // unknown event type ("custom.weird") which the closed EventType
602        // enum can't deserialize.
603        let path = log.path().to_path_buf();
604        let original = std::fs::read_to_string(&path).unwrap();
605        let mut lines: Vec<&str> = original.lines().collect();
606        lines.insert(1, r#"{"session_id":"ssn_001","event_id":"evt_bad","timestamp":"2026-04-26T00:00:00Z","sequence_no":1,"trace_id":"x","span_id":"y","agent_id":"a","agent_instance_id":"i","agent_name":"n","host_id":"h","type":"custom.weird","payload":42}"#);
607        std::fs::write(&path, lines.join("\n") + "\n").unwrap();
608
609        let events = log.read_all().unwrap();
610        assert_eq!(events.len(), 2, "expected the two valid events to come through; got {}", events.len());
611        // Confirm the valid events are the file-write events and not
612        // some default fallback.
613        let written_paths: Vec<&str> = events
614            .iter()
615            .filter_map(|e| match &e.event_type {
616                EventType::AgentWroteFile { file_path, .. } => Some(file_path.as_str()),
617                _ => None,
618            })
619            .collect();
620        assert_eq!(written_paths, vec!["src/before.rs", "src/after.rs"]);
621
622        // Codex finding #8: the count must be exposed in-band so a
623        // sealed receipt can carry the incompleteness signal. Verify
624        // read_all_with_stats reports it.
625        let (events2, skipped) = log.read_all_with_stats().unwrap();
626        assert_eq!(events2.len(), 2);
627        assert_eq!(skipped, 1, "exactly one malformed line was injected; expected skipped == 1");
628
629        let _ = std::fs::remove_dir_all(&dir);
630    }
631
632    #[test]
633    fn read_all_with_stats_reports_zero_when_clean() {
634        // No malformed lines -> skipped == 0 -> the receipt's
635        // event_log_skipped field stays default (0) and gets omitted
636        // from canonical JSON. This preserves byte-identical receipts
637        // for the common case where the event log is clean.
638        let dir = std::env::temp_dir().join(format!("treeship-evtlog-clean-{}", rand::random::<u32>()));
639        let log = EventLog::open(&dir).unwrap();
640
641        let mut e = make_event(
642            "ssn_001",
643            EventType::AgentWroteFile {
644                file_path: "x.rs".into(),
645                digest: None, operation: None, additions: None, deletions: None,
646            },
647        );
648        log.append(&mut e).unwrap();
649
650        let (events, skipped) = log.read_all_with_stats().unwrap();
651        assert_eq!(events.len(), 1);
652        assert_eq!(skipped, 0);
653
654        let _ = std::fs::remove_dir_all(&dir);
655    }
656
657    #[test]
658    fn reopen_preserves_sequence() {
659        let dir = std::env::temp_dir().join(format!("treeship-evtlog-reopen-{}", rand::random::<u32>()));
660
661        {
662            let log = EventLog::open(&dir).unwrap();
663            let mut e = make_event("ssn_001", EventType::SessionStarted);
664            log.append(&mut e).unwrap();
665        }
666
667        // Reopen
668        let log = EventLog::open(&dir).unwrap();
669        assert_eq!(log.event_count(), 1);
670
671        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
672            parent_agent_instance_id: None,
673        });
674        log.append(&mut e2).unwrap();
675        assert_eq!(e2.sequence_no, 1);
676
677        let _ = std::fs::remove_dir_all(&dir);
678    }
679
680    /// Regression test for #1 in the v0.9.3 Codex adversarial review.
681    ///
682    /// Multiple `EventLog` instances opened against the same directory must
683    /// not collide on `sequence_no`. This simulates what happens when each
684    /// `treeship session event` invocation (one per PostToolUse hook firing)
685    /// creates a fresh `EventLog` on a shared events.jsonl. Without the
686    /// flock-based re-derivation in `append_locked`, every instance sees
687    /// the same on-disk count at open time and assigns duplicate sequence
688    /// numbers.
689    #[cfg(not(target_family = "wasm"))]
690    #[test]
691    fn concurrent_appends_have_unique_sequence_numbers() {
692        use std::sync::Arc;
693        use std::thread;
694
695        let dir = std::env::temp_dir()
696            .join(format!("treeship-evtlog-race-{}", rand::random::<u32>()));
697        std::fs::create_dir_all(&dir).unwrap();
698
699        const WRITERS: usize = 16;
700        let dir = Arc::new(dir);
701        let mut handles = Vec::with_capacity(WRITERS);
702
703        for _ in 0..WRITERS {
704            let dir = Arc::clone(&dir);
705            handles.push(thread::spawn(move || {
706                // Each thread opens its OWN EventLog -- mimics a separate
707                // process invocation. Without flock, all threads would see
708                // the same line count at open() time.
709                let log = EventLog::open(&dir).unwrap();
710                let mut e = make_event("ssn_race", EventType::SessionStarted);
711                log.append(&mut e).unwrap();
712                e.sequence_no
713            }));
714        }
715
716        let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
717        seqs.sort();
718
719        // All sequence numbers must be unique and contiguous 0..WRITERS.
720        let expected: Vec<u64> = (0..WRITERS as u64).collect();
721        assert_eq!(seqs, expected, "sequence_no collisions under contention");
722
723        // Same invariant from the on-disk file's perspective.
724        let log = EventLog::open(&dir).unwrap();
725        let read = log.read_all().unwrap();
726        assert_eq!(read.len(), WRITERS);
727        let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
728        on_disk.sort();
729        assert_eq!(on_disk, expected);
730
731        let _ = std::fs::remove_dir_all(&*dir);
732    }
733
734    /// Sidecar lock file must be created mode 0o600 (owner-only) on Unix.
735    /// Regression test for #5 in the second Codex adversarial review.
736    #[cfg(all(not(target_family = "wasm"), unix))]
737    #[test]
738    fn lock_file_has_owner_only_permissions() {
739        use std::os::unix::fs::PermissionsExt;
740
741        let dir = std::env::temp_dir()
742            .join(format!("treeship-evtlog-perms-{}", rand::random::<u32>()));
743        let log = EventLog::open(&dir).unwrap();
744
745        let mut e = make_event("ssn_perms", EventType::SessionStarted);
746        log.append(&mut e).unwrap();
747
748        let lock_path = log.path().with_extension("jsonl.lock");
749        let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
750        let mode = meta.permissions().mode() & 0o777;
751        assert_eq!(
752            mode, 0o600,
753            "lock file mode is {:o}, expected 0o600 (owner-only)",
754            mode
755        );
756
757        let _ = std::fs::remove_dir_all(&dir);
758    }
759
760    /// A pre-existing lock file (e.g. from a v0.9.2 era crash) with looser
761    /// permissions must be tightened to 0o600 on next `EventLog::open`.
762    /// Regression test for the third Codex adversarial review.
763    #[cfg(all(not(target_family = "wasm"), unix))]
764    #[test]
765    fn existing_lock_file_is_re_tightened() {
766        use std::os::unix::fs::PermissionsExt;
767
768        let dir = std::env::temp_dir()
769            .join(format!("treeship-evtlog-retighten-{}", rand::random::<u32>()));
770        std::fs::create_dir_all(&dir).unwrap();
771
772        // Pre-create a lock file with deliberately loose perms, simulating
773        // an upgrade from a CLI version that didn't set 0o600.
774        let lock_path = dir.join("events.jsonl.lock");
775        std::fs::write(&lock_path, b"").unwrap();
776        std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
777        let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
778        assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");
779
780        // First append after upgrade -- should re-tighten.
781        let log = EventLog::open(&dir).unwrap();
782        let mut e = make_event("ssn_retighten", EventType::SessionStarted);
783        log.append(&mut e).unwrap();
784
785        let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
786        assert_eq!(
787            post_mode, 0o600,
788            "lock file should be re-tightened to 0o600 after open; got {:o}",
789            post_mode
790        );
791
792        let _ = std::fs::remove_dir_all(&dir);
793    }
794
795    /// Counter sidecar must exist after the first append and contain
796    /// (count=1, byte_size=size of events.jsonl). This is the happy path
797    /// that lets every subsequent append skip the O(N) rescan.
798    #[cfg(not(target_family = "wasm"))]
799    #[test]
800    fn counter_sidecar_written_after_append() {
801        let dir = std::env::temp_dir()
802            .join(format!("treeship-evtlog-counter-{}", rand::random::<u32>()));
803        let log = EventLog::open(&dir).unwrap();
804
805        let mut e = make_event("ssn_counter", EventType::SessionStarted);
806        log.append(&mut e).unwrap();
807
808        let counter = log.path().with_extension("jsonl.count");
809        let bytes = std::fs::read(&counter).expect("counter sidecar must exist after append");
810        assert_eq!(bytes.len(), 16, "counter sidecar must be 16 bytes");
811
812        let count = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
813        let recorded_size = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
814        let actual_size = std::fs::metadata(log.path()).unwrap().len();
815        assert_eq!(count, 1, "counter must reflect the one appended event");
816        assert_eq!(
817            recorded_size, actual_size,
818            "counter byte_size ({}) must match events.jsonl size ({})",
819            recorded_size, actual_size
820        );
821
822        let _ = std::fs::remove_dir_all(&dir);
823    }
824
825    /// A missing counter sidecar (fresh install, deleted by user, etc.)
826    /// must not break sequence_no assignment. The next append falls back
827    /// to an O(N) recount and rewrites the counter.
828    #[cfg(not(target_family = "wasm"))]
829    #[test]
830    fn counter_sidecar_recovers_when_missing() {
831        let dir = std::env::temp_dir()
832            .join(format!("treeship-evtlog-missing-counter-{}", rand::random::<u32>()));
833
834        // Append two events, then nuke the counter sidecar.
835        {
836            let log = EventLog::open(&dir).unwrap();
837            let mut e1 = make_event("ssn_x", EventType::SessionStarted);
838            let mut e2 = make_event("ssn_x", EventType::AgentStarted {
839                parent_agent_instance_id: None,
840            });
841            log.append(&mut e1).unwrap();
842            log.append(&mut e2).unwrap();
843        }
844        let counter = dir.join("events.jsonl.count");
845        std::fs::remove_file(&counter).expect("counter must exist before deletion");
846
847        // Reopen + append. The third event must get sequence_no=2 even
848        // though the counter sidecar is gone.
849        let log = EventLog::open(&dir).unwrap();
850        assert_eq!(log.event_count(), 2, "open() must recount when counter is missing");
851
852        let mut e3 = make_event("ssn_x", EventType::SessionClosed {
853            summary: None,
854            duration_ms: None,
855        });
856        log.append(&mut e3).unwrap();
857        assert_eq!(e3.sequence_no, 2);
858        assert!(counter.exists(), "counter must be rewritten after recount");
859
860        let _ = std::fs::remove_dir_all(&dir);
861    }
862
863    /// A short-read or garbage counter sidecar (corrupted, partial write,
864    /// truncated by external tool) must not be trusted. The size mismatch
865    /// path covers the "wrong content" case for a 16-byte file too.
866    #[cfg(not(target_family = "wasm"))]
867    #[test]
868    fn counter_sidecar_recovers_when_corrupt() {
869        let dir = std::env::temp_dir()
870            .join(format!("treeship-evtlog-corrupt-counter-{}", rand::random::<u32>()));
871
872        {
873            let log = EventLog::open(&dir).unwrap();
874            let mut e = make_event("ssn_corrupt", EventType::SessionStarted);
875            log.append(&mut e).unwrap();
876        }
877        // Truncate the counter to a non-16 length.
878        let counter = dir.join("events.jsonl.count");
879        std::fs::write(&counter, b"junk").unwrap();
880
881        let log = EventLog::open(&dir).unwrap();
882        assert_eq!(log.event_count(), 1, "short-read counter must be ignored, recount kicks in");
883
884        let _ = std::fs::remove_dir_all(&dir);
885    }
886
887    /// A counter that recorded the wrong byte_size (someone or something
888    /// appended to events.jsonl behind our back) must not be trusted.
889    /// This is the crash-recovery path: peer wrote events.jsonl but
890    /// crashed before fsyncing the counter, so the recorded size is stale.
891    #[cfg(not(target_family = "wasm"))]
892    #[test]
893    fn counter_sidecar_recovers_when_size_disagrees() {
894        let dir = std::env::temp_dir()
895            .join(format!("treeship-evtlog-stale-counter-{}", rand::random::<u32>()));
896
897        {
898            let log = EventLog::open(&dir).unwrap();
899            let mut e = make_event("ssn_stale", EventType::SessionStarted);
900            log.append(&mut e).unwrap();
901        }
902
903        // Simulate a crash mid-append: append one extra raw line to
904        // events.jsonl WITHOUT updating the counter. Now the counter
905        // says (1, S) but events.jsonl is (S + |line|) bytes.
906        let events_path = dir.join("events.jsonl");
907        let mut extra = make_event("ssn_stale", EventType::AgentStarted {
908            parent_agent_instance_id: None,
909        });
910        extra.sequence_no = 999; // intentionally wrong; will be overwritten on read
911        let mut line = serde_json::to_vec(&extra).unwrap();
912        line.push(b'\n');
913        let mut f = std::fs::OpenOptions::new()
914            .append(true)
915            .open(&events_path)
916            .unwrap();
917        std::io::Write::write_all(&mut f, &line).unwrap();
918        std::io::Write::flush(&mut f).unwrap();
919
920        // Re-open. The size mismatch must trigger a recount; we should see 2.
921        let log = EventLog::open(&dir).unwrap();
922        assert_eq!(
923            log.event_count(),
924            2,
925            "size mismatch must force recount, ignoring stale counter"
926        );
927
928        let _ = std::fs::remove_dir_all(&dir);
929    }
930
931    /// The counter sidecar fix must not break the cross-process race
932    /// safety established by the flock layer. This is the same shape as
933    /// `concurrent_appends_have_unique_sequence_numbers` but exists to
934    /// guard against a regression where the counter is read OUTSIDE the
935    /// lock, which would let two writers both see count=N and assign N
936    /// to two different events.
937    #[cfg(not(target_family = "wasm"))]
938    #[test]
939    fn counter_sidecar_preserves_concurrent_uniqueness() {
940        use std::sync::Arc;
941        use std::thread;
942
943        let dir = std::env::temp_dir()
944            .join(format!("treeship-evtlog-counter-race-{}", rand::random::<u32>()));
945        std::fs::create_dir_all(&dir).unwrap();
946
947        const WRITERS: usize = 16;
948        let dir = Arc::new(dir);
949        let mut handles = Vec::with_capacity(WRITERS);
950
951        for _ in 0..WRITERS {
952            let dir = Arc::clone(&dir);
953            handles.push(thread::spawn(move || {
954                let log = EventLog::open(&dir).unwrap();
955                let mut e = make_event("ssn_counter_race", EventType::SessionStarted);
956                log.append(&mut e).unwrap();
957                e.sequence_no
958            }));
959        }
960
961        let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
962        seqs.sort();
963        let expected: Vec<u64> = (0..WRITERS as u64).collect();
964        assert_eq!(seqs, expected, "counter must not bypass the flock race protection");
965
966        // Counter should reflect the final state.
967        let log = EventLog::open(&dir).unwrap();
968        assert_eq!(log.event_count(), WRITERS as u64);
969
970        let _ = std::fs::remove_dir_all(&*dir);
971    }
972
973    /// Counter sidecar must be created mode 0o600 (owner-only) on Unix --
974    /// same scoping as the lock file; the existence of a counter is a
975    /// session signal that doesn't need to leak to other users.
976    #[cfg(all(not(target_family = "wasm"), unix))]
977    #[test]
978    fn counter_sidecar_has_owner_only_permissions() {
979        use std::os::unix::fs::PermissionsExt;
980
981        let dir = std::env::temp_dir()
982            .join(format!("treeship-evtlog-counter-perms-{}", rand::random::<u32>()));
983        let log = EventLog::open(&dir).unwrap();
984
985        let mut e = make_event("ssn_counter_perms", EventType::SessionStarted);
986        log.append(&mut e).unwrap();
987
988        let counter = log.path().with_extension("jsonl.count");
989        let mode = std::fs::metadata(&counter).unwrap().permissions().mode() & 0o777;
990        assert_eq!(
991            mode, 0o600,
992            "counter sidecar mode is {:o}, expected 0o600 (owner-only)",
993            mode
994        );
995
996        let _ = std::fs::remove_dir_all(&dir);
997    }
998}