Skip to main content

treeship_core/session/
event_log.rs

//! Append-only, file-backed event log for session events.
//!
//! Events are stored as newline-delimited JSON (JSONL) in
//! `.treeship/sessions/<session_id>/events.jsonl`.
//!
//! Concurrency model: `append()` is safe to call from multiple processes
//! concurrently. Each call attempts to acquire an exclusive advisory lock
//! (via `fs2::FileExt::try_lock_exclusive` -- backed by `flock(2)` on Unix
//! and `LockFileEx` on Windows) on a sidecar `events.jsonl.lock` file in a
//! ~500ms bounded retry loop. Under the lock, the JSONL line count is the
//! authoritative source for the next `sequence_no`. The per-process
//! AtomicU64 is only a cached event count: it backs `event_count()` (and
//! assigns `sequence_no` on WASM builds, which have no locking), and its
//! value is overwritten by the on-disk count after every locked append.
//!
//! Fail-open semantics: if a writer cannot acquire the lock within the
//! retry window (typically because a peer crashed while holding it, or a
//! filesystem doesn't honor flock at all), the append still proceeds
//! without the lock and writes a stderr warning. In that degenerate case
//! the resulting `sequence_no` is best-effort rather than guaranteed
//! monotonic, but the event itself is preserved -- the alternative
//! (blocking the agent forever on a wedged peer) is strictly worse.
//!
//! Lock file permissions are 0o600 (owner-only) on Unix, applied at file
//! creation via `OpenOptionsExt::mode` and re-tightened on every open if
//! a previous run left the file with looser perms.
27
28use std::io::{BufRead, Write};
29use std::path::{Path, PathBuf};
30use std::sync::atomic::{AtomicU64, Ordering};
31
32#[cfg(not(target_family = "wasm"))]
33use fs2::FileExt;
34
35use crate::session::event::SessionEvent;
36
37/// Error from event log operations.
38#[derive(Debug)]
39pub enum EventLogError {
40    Io(std::io::Error),
41    Json(serde_json::Error),
42}
43
44impl std::fmt::Display for EventLogError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::Io(e) => write!(f, "event log io: {e}"),
48            Self::Json(e) => write!(f, "event log json: {e}"),
49        }
50    }
51}
52
53impl std::error::Error for EventLogError {}
54impl From<std::io::Error> for EventLogError {
55    fn from(e: std::io::Error) -> Self { Self::Io(e) }
56}
57impl From<serde_json::Error> for EventLogError {
58    fn from(e: serde_json::Error) -> Self { Self::Json(e) }
59}
60
/// Append-only session event log stored as one JSON document per line.
pub struct EventLog {
    /// Location of the `events.jsonl` file inside the session directory.
    path: PathBuf,
    /// In-process event count: seeded when the log is opened, refreshed on
    /// every append, and reported by `event_count()`.
    sequence: AtomicU64,
}
66
67impl EventLog {
68    /// Open or create an event log for the given session directory.
69    ///
70    /// The session directory is typically `.treeship/sessions/<session_id>/`.
71    /// If the directory does not exist, it will be created.
72    pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
73        std::fs::create_dir_all(session_dir)?;
74        let path = session_dir.join("events.jsonl");
75
76        // Count existing events to initialize the sequence counter.
77        let sequence = if path.exists() {
78            let file = std::fs::File::open(&path)?;
79            let reader = std::io::BufReader::new(file);
80            let count = reader.lines().filter(|l| l.is_ok()).count() as u64;
81            AtomicU64::new(count)
82        } else {
83            AtomicU64::new(0)
84        };
85
86        Ok(Self { path, sequence })
87    }
88
89    /// Append a single event to the log.
90    ///
91    /// The event's `sequence_no` is set automatically. Under contention from
92    /// multiple writer processes, the sequence number is re-derived from the
93    /// on-disk line count under an exclusive flock so two parallel writers
94    /// never collide.
95    pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
96        self.append_locked(event)
97    }
98
99    /// Cross-process safe append: acquires an exclusive advisory lock on a
100    /// sidecar `.lock` file, re-counts events.jsonl lines, assigns sequence_no,
101    /// writes the new event, then releases the lock on drop.
102    ///
103    /// Lock acquisition is bounded: tries to acquire for up to ~500ms via
104    /// `try_lock_exclusive` poll, then falls back to an unlocked append
105    /// with a stderr warning. A wedged or crashed writer must NOT hang
106    /// hook-driven invocations forever (PostToolUse hooks running per
107    /// tool call would freeze the agent). Better to lose strict
108    /// sequence_no monotonicity in the rare wedge case than to deadlock.
109    ///
110    /// Lock file is created mode 0o600 (owner-only) so the sidecar can
111    /// never be opened by other users on a shared machine.
112    ///
113    /// Skipped on WASM (no fs, no concurrency).
114    #[cfg(not(target_family = "wasm"))]
115    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
116        use std::time::{Duration, Instant};
117
118        // Sidecar lock file: contention here doesn't block readers of events.jsonl.
119        let lock_path = self.path.with_extension("jsonl.lock");
120
121        // Open or create the lock file. On Unix we set 0o600 explicitly so
122        // the sidecar isn't group/world readable; the umask-derived default
123        // would otherwise be permissive on some setups.
124        let lock_file = open_lock_file(&lock_path)?;
125
126        // Bounded retry. With 16 parallel writers the worst case is a
127        // queue of N short-held locks; 500ms is plenty. If we fail to
128        // acquire in that window something is wedged -- fall through and
129        // append without ordering rather than freezing the caller.
130        let mut acquired = false;
131        let start = Instant::now();
132        let deadline = Duration::from_millis(500);
133        loop {
134            match lock_file.try_lock_exclusive() {
135                Ok(()) => {
136                    acquired = true;
137                    break;
138                }
139                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
140                    if start.elapsed() >= deadline {
141                        eprintln!(
142                            "[treeship] event_log: lock contention on {} \
143                             exceeded {}ms; appending without sequence ordering guarantee",
144                            lock_path.display(),
145                            deadline.as_millis()
146                        );
147                        break;
148                    }
149                    std::thread::sleep(Duration::from_millis(10));
150                }
151                Err(e) => return Err(e.into()),
152            }
153        }
154
155        // Under the lock (or unlocked fallback): re-derive sequence_no
156        // from the actual on-disk line count. The per-process AtomicU64
157        // is a stale hint -- only the on-disk count is authoritative
158        // when multiple processes are appending.
159        let count = if self.path.exists() {
160            let f = std::fs::File::open(&self.path)?;
161            let r = std::io::BufReader::new(f);
162            r.lines().filter(|l| l.is_ok()).count() as u64
163        } else {
164            0
165        };
166        event.sequence_no = count;
167
168        let mut line = serde_json::to_vec(event)?;
169        line.push(b'\n');
170
171        let mut file = std::fs::OpenOptions::new()
172            .create(true)
173            .append(true)
174            .open(&self.path)?;
175        file.write_all(&line)?;
176        file.flush()?;
177
178        // Keep the in-process AtomicU64 in sync so non-contended callers
179        // see the right value via event_count() without re-reading.
180        self.sequence.store(count + 1, Ordering::SeqCst);
181
182        // Suppress the unused-variable warning on the unlock-fallback path.
183        let _ = acquired;
184        // lock_file drops here -> flock released (no-op if we never acquired).
185        Ok(())
186    }
187
188    /// WASM build: no filesystem locks available, no concurrent writers.
189    /// Falls back to the simple AtomicU64 path.
190    #[cfg(target_family = "wasm")]
191    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
192        event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
193
194        let mut line = serde_json::to_vec(event)?;
195        line.push(b'\n');
196
197        let mut file = std::fs::OpenOptions::new()
198            .create(true)
199            .append(true)
200            .open(&self.path)?;
201        file.write_all(&line)?;
202        file.flush()?;
203
204        Ok(())
205    }
206
207    /// Read all events from the log.
208    pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
209        if !self.path.exists() {
210            return Ok(Vec::new());
211        }
212        let file = std::fs::File::open(&self.path)?;
213        let reader = std::io::BufReader::new(file);
214        let mut events = Vec::new();
215        for line in reader.lines() {
216            let line = line?;
217            if line.trim().is_empty() {
218                continue;
219            }
220            let event: SessionEvent = serde_json::from_str(&line)?;
221            events.push(event);
222        }
223        Ok(events)
224    }
225
226    /// Return the current event count.
227    pub fn event_count(&self) -> u64 {
228        self.sequence.load(Ordering::SeqCst)
229    }
230
231    /// Return the path to the JSONL file.
232    pub fn path(&self) -> &Path {
233        &self.path
234    }
235}
236
/// Open the sidecar lock file with owner-only permissions (0o600 on Unix).
///
/// The file is opened read/write and created if missing. For a newly
/// created file the mode is requested at creation time via
/// `OpenOptionsExt::mode`. For a file whose mode is anything other than
/// 0o600 (e.g. left over from a prior crash or an upgrade from a pre-0.9.3
/// CLI that didn't tighten perms), the mode is re-set to 0o600 after open,
/// but only when the file is owned by the current effective user.
///
/// Re-tightening is best-effort: if reading the metadata fails, or the file
/// belongs to another user, nothing is done; if the chmod itself fails, a
/// one-line warning is written to stderr and the open file is still
/// returned. The lock semantics don't depend on the perms being tight, only
/// the privacy of the sidecar does.
///
/// # Errors
///
/// Returns the underlying `std::io::Error` if the file cannot be opened or
/// created.
#[cfg(all(not(target_family = "wasm"), unix))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};

    let file = std::fs::OpenOptions::new()
        .create(true)
        .truncate(false)
        .read(true)
        .write(true)
        .mode(0o600)
        .open(path)?;

    // Re-tighten through the already-open handle rather than by path:
    // `File::set_permissions` acts on the opened file, so the target cannot
    // be swapped between the metadata check and the chmod. Using the std API
    // also avoids a hand-written `fchmod` declaration, whose `mode_t` width
    // differs between Unix platforms.
    if let Ok(meta) = file.metadata() {
        let mode = meta.permissions().mode() & 0o777;
        if mode != 0o600 && meta.uid() == nix_uid() {
            if let Err(err) = file.set_permissions(std::fs::Permissions::from_mode(0o600)) {
                eprintln!(
                    "[treeship] warning: could not tighten lock file perms on {} \
                     to 0o600 (current: 0o{:o}). Error: {}. Lock still functions; \
                     only the privacy of the sidecar is affected. Common cause: \
                     NFS mount or filesystem without full POSIX perm support.",
                    path.display(), mode, err
                );
            }
        }
    }

    Ok(file)
}

/// Effective user id of the current process, for comparison with a file's
/// owner.
///
/// The C `geteuid` symbol is declared by hand here, so neither the `libc`
/// nor the `nix` crate is needed as a direct dependency.
#[cfg(all(not(target_family = "wasm"), unix))]
fn nix_uid() -> u32 {
    // NOTE(review): declares `uid_t` as `u32`, which matches mainstream Unix
    // targets -- confirm before porting to an exotic platform.
    unsafe extern "C" {
        fn geteuid() -> u32;
    }
    // SAFETY: `geteuid` takes no arguments, has no preconditions, and is
    // specified by POSIX to always succeed.
    unsafe { geteuid() }
}
323
/// Open the sidecar lock file for reading and writing on non-Unix, non-WASM
/// targets, creating it if it does not exist. No permission bits are set
/// here.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    options.open(path)
}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::event::*;

    /// Randomly named scratch location under the system temp dir that is
    /// removed (best-effort) when the guard is dropped -- including when a
    /// test assertion panics, so failed runs do not leave directories behind.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        /// Pick a fresh `treeship-evtlog-<label>-<random>` path. The
        /// directory itself is NOT created here.
        fn new(label: &str) -> Self {
            let name = format!("treeship-evtlog-{label}-{}", rand::random::<u32>());
            Self(std::env::temp_dir().join(name))
        }

        /// Path of the scratch directory.
        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Build a minimal event for `session_id` with fixed placeholder agent
    /// and host fields; `sequence_no` starts at 0 and is overwritten by
    /// `EventLog::append`.
    fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
        SessionEvent {
            session_id: session_id.into(),
            event_id: generate_event_id(),
            timestamp: "2026-04-05T08:00:00Z".into(),
            sequence_no: 0,
            trace_id: generate_trace_id(),
            span_id: generate_span_id(),
            parent_span_id: None,
            agent_id: "agent://test".into(),
            agent_instance_id: "ai_test_1".into(),
            agent_name: "test-agent".into(),
            agent_role: None,
            host_id: "host_test".into(),
            tool_runtime_id: None,
            event_type,
            artifact_ref: None,
            meta: None,
        }
    }

    /// Two appends get sequence numbers 0 and 1, and both events read back.
    #[test]
    fn append_and_read_back() {
        let scratch = ScratchDir::new("test");
        let log = EventLog::open(scratch.path()).unwrap();

        let mut e1 = make_event("ssn_001", EventType::SessionStarted);
        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
            parent_agent_instance_id: None,
        });

        log.append(&mut e1).unwrap();
        log.append(&mut e2).unwrap();

        assert_eq!(log.event_count(), 2);
        assert_eq!(e1.sequence_no, 0);
        assert_eq!(e2.sequence_no, 1);

        let events = log.read_all().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].sequence_no, 0);
        assert_eq!(events[1].sequence_no, 1);
    }

    /// Reopening an existing log continues numbering after the stored events.
    #[test]
    fn reopen_preserves_sequence() {
        let scratch = ScratchDir::new("reopen");

        {
            let log = EventLog::open(scratch.path()).unwrap();
            let mut e = make_event("ssn_001", EventType::SessionStarted);
            log.append(&mut e).unwrap();
        }

        // Reopen
        let log = EventLog::open(scratch.path()).unwrap();
        assert_eq!(log.event_count(), 1);

        let mut e2 = make_event("ssn_001", EventType::AgentStarted {
            parent_agent_instance_id: None,
        });
        log.append(&mut e2).unwrap();
        assert_eq!(e2.sequence_no, 1);
    }

    /// Regression test for #1 in the v0.9.3 Codex adversarial review.
    ///
    /// Multiple `EventLog` instances opened against the same directory must
    /// not collide on `sequence_no`. This simulates what happens when each
    /// `treeship session event` invocation (one per PostToolUse hook firing)
    /// creates a fresh `EventLog` on a shared events.jsonl. Without the
    /// flock-based re-derivation in `append_locked`, every instance sees
    /// the same on-disk count at open time and assigns duplicate sequence
    /// numbers.
    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn concurrent_appends_have_unique_sequence_numbers() {
        use std::sync::Arc;
        use std::thread;

        const WRITERS: usize = 16;

        let scratch = ScratchDir::new("race");
        std::fs::create_dir_all(scratch.path()).unwrap();

        let shared_dir = Arc::new(scratch.path().to_path_buf());
        let mut handles = Vec::with_capacity(WRITERS);

        for _ in 0..WRITERS {
            let dir = Arc::clone(&shared_dir);
            handles.push(thread::spawn(move || {
                // Each thread opens its OWN EventLog -- mimics a separate
                // process invocation. Without flock, all threads would see
                // the same line count at open() time.
                let log = EventLog::open(&dir).unwrap();
                let mut e = make_event("ssn_race", EventType::SessionStarted);
                log.append(&mut e).unwrap();
                e.sequence_no
            }));
        }

        let mut seqs: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        seqs.sort();

        // All sequence numbers must be unique and contiguous 0..WRITERS.
        let expected: Vec<u64> = (0..WRITERS as u64).collect();
        assert_eq!(seqs, expected, "sequence_no collisions under contention");

        // Same invariant from the on-disk file's perspective.
        let log = EventLog::open(scratch.path()).unwrap();
        let read = log.read_all().unwrap();
        assert_eq!(read.len(), WRITERS);
        let mut on_disk: Vec<u64> = read.iter().map(|e| e.sequence_no).collect();
        on_disk.sort();
        assert_eq!(on_disk, expected);
    }

    /// Sidecar lock file must be created mode 0o600 (owner-only) on Unix.
    /// Regression test for #5 in the second Codex adversarial review.
    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn lock_file_has_owner_only_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let scratch = ScratchDir::new("perms");
        let log = EventLog::open(scratch.path()).unwrap();

        let mut e = make_event("ssn_perms", EventType::SessionStarted);
        log.append(&mut e).unwrap();

        let lock_path = log.path().with_extension("jsonl.lock");
        let meta = std::fs::metadata(&lock_path).expect("lock file must exist after first append");
        let mode = meta.permissions().mode() & 0o777;
        assert_eq!(
            mode, 0o600,
            "lock file mode is {:o}, expected 0o600 (owner-only)",
            mode
        );
    }

    /// A pre-existing lock file (e.g. from a v0.9.2 era crash) with looser
    /// permissions must be tightened to 0o600 by the next append.
    /// Regression test for the third Codex adversarial review.
    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn existing_lock_file_is_re_tightened() {
        use std::os::unix::fs::PermissionsExt;

        let scratch = ScratchDir::new("retighten");
        std::fs::create_dir_all(scratch.path()).unwrap();

        // Pre-create a lock file with deliberately loose perms, simulating
        // an upgrade from a CLI version that didn't set 0o600.
        let lock_path = scratch.path().join("events.jsonl.lock");
        std::fs::write(&lock_path, b"").unwrap();
        std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
        let pre_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
        assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");

        // First append after upgrade -- should re-tighten.
        let log = EventLog::open(scratch.path()).unwrap();
        let mut e = make_event("ssn_retighten", EventType::SessionStarted);
        log.append(&mut e).unwrap();

        let post_mode = std::fs::metadata(&lock_path).unwrap().permissions().mode() & 0o777;
        assert_eq!(
            post_mode, 0o600,
            "lock file should be re-tightened to 0o600 after open; got {:o}",
            post_mode
        );
    }
}
521}