Skip to main content

treeship_core/session/
event_log.rs

1//! Append-only, file-backed event log for session events.
2//!
3//! Events are stored as newline-delimited JSON (JSONL) in
4//! `.treeship/sessions/<session_id>/events.jsonl`.
5//!
6//! Concurrency model: `append()` is safe to call from multiple processes
7//! concurrently. Each call attempts to acquire an exclusive advisory lock
8//! (via `fs2::FileExt::try_lock_exclusive` -- backed by `flock(2)` on Unix
9//! and `LockFileEx` on Windows) on a sidecar `events.jsonl.lock` file in a
10//! ~500ms bounded retry loop. Under the lock, the JSONL line count is the
11//! authoritative source for the next `sequence_no`. The per-process
12//! AtomicU64 is retained as a hot-path optimization for non-contended use,
13//! but its value is overwritten by the on-disk count after every locked
14//! append.
15//!
16//! Fail-open semantics: if a writer cannot acquire the lock within the
17//! retry window (typically because a peer crashed while holding it, or a
18//! filesystem doesn't honor flock at all), the append still proceeds
19//! without the lock and writes a stderr warning. In that degenerate case
20//! the resulting `sequence_no` is best-effort rather than guaranteed
21//! monotonic, but the event itself is preserved -- the alternative
22//! (blocking the agent forever on a wedged peer) is strictly worse.
23//!
24//! Lock file permissions are 0o600 (owner-only) on Unix, applied at file
25//! creation via `OpenOptionsExt::mode` and re-tightened on every open if
26//! a previous run left the file with looser perms.
27
28use std::io::{BufRead, Write};
29use std::path::{Path, PathBuf};
30use std::sync::atomic::{AtomicU64, Ordering};
31
32#[cfg(not(target_family = "wasm"))]
33use fs2::FileExt;
34
35use crate::session::event::SessionEvent;
36
37/// Error from event log operations.
38#[derive(Debug)]
39pub enum EventLogError {
40    Io(std::io::Error),
41    Json(serde_json::Error),
42}
43
44impl std::fmt::Display for EventLogError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            Self::Io(e) => write!(f, "event log io: {e}"),
48            Self::Json(e) => write!(f, "event log json: {e}"),
49        }
50    }
51}
52
53impl std::error::Error for EventLogError {}
54impl From<std::io::Error> for EventLogError {
55    fn from(e: std::io::Error) -> Self { Self::Io(e) }
56}
57impl From<serde_json::Error> for EventLogError {
58    fn from(e: serde_json::Error) -> Self { Self::Json(e) }
59}
60
/// An append-only event log backed by a JSONL file.
///
/// Each appended event occupies one line of the file.
pub struct EventLog {
    /// Location of the `events.jsonl` file this log reads from and appends to.
    path: PathBuf,
    /// In-process event count, i.e. the next `sequence_no` as last observed
    /// by this instance.
    sequence: AtomicU64,
}
66
67impl EventLog {
68    /// Open or create an event log for the given session directory.
69    ///
70    /// The session directory is typically `.treeship/sessions/<session_id>/`.
71    /// If the directory does not exist, it will be created.
72    pub fn open(session_dir: &Path) -> Result<Self, EventLogError> {
73        std::fs::create_dir_all(session_dir)?;
74        let path = session_dir.join("events.jsonl");
75
76        // Count existing events to initialize the sequence counter.
77        let sequence = if path.exists() {
78            let file = std::fs::File::open(&path)?;
79            let reader = std::io::BufReader::new(file);
80            let count = reader.lines().filter(|l| l.is_ok()).count() as u64;
81            AtomicU64::new(count)
82        } else {
83            AtomicU64::new(0)
84        };
85
86        Ok(Self { path, sequence })
87    }
88
89    /// Append a single event to the log.
90    ///
91    /// The event's `sequence_no` is set automatically. Under contention from
92    /// multiple writer processes, the sequence number is re-derived from the
93    /// on-disk line count under an exclusive flock so two parallel writers
94    /// never collide.
95    pub fn append(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
96        self.append_locked(event)
97    }
98
99    /// Cross-process safe append: acquires an exclusive advisory lock on a
100    /// sidecar `.lock` file, re-counts events.jsonl lines, assigns sequence_no,
101    /// writes the new event, then releases the lock on drop.
102    ///
103    /// Lock acquisition is bounded: tries to acquire for up to ~500ms via
104    /// `try_lock_exclusive` poll, then falls back to an unlocked append
105    /// with a stderr warning. A wedged or crashed writer must NOT hang
106    /// hook-driven invocations forever (PostToolUse hooks running per
107    /// tool call would freeze the agent). Better to lose strict
108    /// sequence_no monotonicity in the rare wedge case than to deadlock.
109    ///
110    /// Lock file is created mode 0o600 (owner-only) so the sidecar can
111    /// never be opened by other users on a shared machine.
112    ///
113    /// Skipped on WASM (no fs, no concurrency).
114    #[cfg(not(target_family = "wasm"))]
115    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
116        use std::time::{Duration, Instant};
117
118        // Sidecar lock file: contention here doesn't block readers of events.jsonl.
119        let lock_path = self.path.with_extension("jsonl.lock");
120
121        // Open or create the lock file. On Unix we set 0o600 explicitly so
122        // the sidecar isn't group/world readable; the umask-derived default
123        // would otherwise be permissive on some setups.
124        let lock_file = open_lock_file(&lock_path)?;
125
126        // Bounded retry. With 16 parallel writers the worst case is a
127        // queue of N short-held locks; 500ms is plenty. If we fail to
128        // acquire in that window something is wedged -- fall through and
129        // append without ordering rather than freezing the caller.
130        let mut acquired = false;
131        let start = Instant::now();
132        let deadline = Duration::from_millis(500);
133        loop {
134            match lock_file.try_lock_exclusive() {
135                Ok(()) => {
136                    acquired = true;
137                    break;
138                }
139                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
140                    if start.elapsed() >= deadline {
141                        eprintln!(
142                            "[treeship] event_log: lock contention on {} \
143                             exceeded {}ms; appending without sequence ordering guarantee",
144                            lock_path.display(),
145                            deadline.as_millis()
146                        );
147                        break;
148                    }
149                    std::thread::sleep(Duration::from_millis(10));
150                }
151                Err(e) => return Err(e.into()),
152            }
153        }
154
155        // Under the lock (or unlocked fallback): re-derive sequence_no
156        // from the actual on-disk line count. The per-process AtomicU64
157        // is a stale hint -- only the on-disk count is authoritative
158        // when multiple processes are appending.
159        let count = if self.path.exists() {
160            let f = std::fs::File::open(&self.path)?;
161            let r = std::io::BufReader::new(f);
162            r.lines().filter(|l| l.is_ok()).count() as u64
163        } else {
164            0
165        };
166        event.sequence_no = count;
167
168        let mut line = serde_json::to_vec(event)?;
169        line.push(b'\n');
170
171        let mut file = std::fs::OpenOptions::new()
172            .create(true)
173            .append(true)
174            .open(&self.path)?;
175        file.write_all(&line)?;
176        file.flush()?;
177
178        // Keep the in-process AtomicU64 in sync so non-contended callers
179        // see the right value via event_count() without re-reading.
180        self.sequence.store(count + 1, Ordering::SeqCst);
181
182        // Suppress the unused-variable warning on the unlock-fallback path.
183        let _ = acquired;
184        // lock_file drops here -> flock released (no-op if we never acquired).
185        Ok(())
186    }
187
188    /// WASM build: no filesystem locks available, no concurrent writers.
189    /// Falls back to the simple AtomicU64 path.
190    #[cfg(target_family = "wasm")]
191    fn append_locked(&self, event: &mut SessionEvent) -> Result<(), EventLogError> {
192        event.sequence_no = self.sequence.fetch_add(1, Ordering::SeqCst);
193
194        let mut line = serde_json::to_vec(event)?;
195        line.push(b'\n');
196
197        let mut file = std::fs::OpenOptions::new()
198            .create(true)
199            .append(true)
200            .open(&self.path)?;
201        file.write_all(&line)?;
202        file.flush()?;
203
204        Ok(())
205    }
206
207    /// Read all events from the log.
208    pub fn read_all(&self) -> Result<Vec<SessionEvent>, EventLogError> {
209        if !self.path.exists() {
210            return Ok(Vec::new());
211        }
212        let file = std::fs::File::open(&self.path)?;
213        let reader = std::io::BufReader::new(file);
214        let mut events = Vec::new();
215        for line in reader.lines() {
216            let line = line?;
217            if line.trim().is_empty() {
218                continue;
219            }
220            let event: SessionEvent = serde_json::from_str(&line)?;
221            events.push(event);
222        }
223        Ok(events)
224    }
225
226    /// Return the current event count.
227    pub fn event_count(&self) -> u64 {
228        self.sequence.load(Ordering::SeqCst)
229    }
230
231    /// Return the path to the JSONL file.
232    pub fn path(&self) -> &Path {
233        &self.path
234    }
235}
236
/// Open the sidecar lock file with owner-only permissions (0o600 on Unix).
///
/// On Unix the mode is set atomically via `OpenOptionsExt::mode` for newly
/// created files. For files that already exist (e.g. left over from a
/// prior crash or an upgrade from a pre-0.9.3 CLI that didn't tighten
/// perms), we additionally re-chmod to 0o600 after open IF the file is
/// owned by the current user. Both the ownership check and the chmod go
/// through the already-open handle (fstat / fchmod), so they act on the
/// exact inode that will be locked. This is best-effort: if the chmod fails
/// (file owned by another user, read-only filesystem, etc.) we proceed
/// silently rather than refuse to open the lock -- the lock semantics
/// don't depend on the perms being tight, only the privacy of the
/// sidecar's existence does.
///
/// On Windows the mode concept doesn't apply; ACLs default to inheriting
/// the parent dir's permissions, which for `.treeship/sessions/<id>/`
/// should already be scoped to the owning user.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or created.
#[cfg(all(not(target_family = "wasm"), unix))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};

    let file = std::fs::OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .mode(0o600)
        .open(path)?;

    // Re-tighten if a pre-existing file has loose perms. Only act when the
    // file is owned by us (uid match) -- chmod'ing files we don't own is
    // either pointless (will fail with EPERM) or a security smell.
    //
    // Use the file handle, not the path, for the chmod: a path-based chmod
    // could follow a symlink or a file swapped in after `open`, and change
    // permissions on something other than the inode whose owner we checked.
    if let Ok(meta) = file.metadata() {
        let mode = meta.permissions().mode() & 0o777;
        let owned_by_us = meta.uid() == nix_uid();
        if owned_by_us && mode != 0o600 {
            // Best-effort by design; see the doc comment above.
            let _ = file.set_permissions(std::fs::Permissions::from_mode(0o600));
        }
    }

    Ok(file)
}

/// Effective uid of the current process, for comparing against file
/// ownership without pulling in the `nix` or `libc` crates: `geteuid` is
/// declared directly via FFI and resolved from the platform C library.
#[cfg(all(not(target_family = "wasm"), unix))]
fn nix_uid() -> u32 {
    // NOTE(review): the return type is declared as `u32`, which assumes
    // `uid_t` is a 32-bit unsigned integer on every supported Unix target.
    unsafe extern "C" {
        fn geteuid() -> u32;
    }
    // SAFETY: geteuid takes no arguments, is async-signal-safe, and never
    // fails per POSIX.
    unsafe { geteuid() }
}
288
/// Open (creating if necessary) the sidecar lock file on non-Unix,
/// non-WASM targets.
///
/// No permission mode is applied here; the file gets whatever access the
/// platform grants a newly created file in the session directory.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or created.
#[cfg(all(not(target_family = "wasm"), not(unix)))]
fn open_lock_file(path: &Path) -> Result<std::fs::File, std::io::Error> {
    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true).create(true);
    options.open(path)
}
297
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::event::*;

    /// Build a minimal `SessionEvent` for `session_id` with the given type.
    /// `sequence_no` starts at 0; `EventLog::append` overwrites it.
    fn make_event(session_id: &str, event_type: EventType) -> SessionEvent {
        SessionEvent {
            session_id: session_id.into(),
            event_id: generate_event_id(),
            timestamp: "2026-04-05T08:00:00Z".into(),
            sequence_no: 0,
            trace_id: generate_trace_id(),
            span_id: generate_span_id(),
            parent_span_id: None,
            agent_id: "agent://test".into(),
            agent_instance_id: "ai_test_1".into(),
            agent_name: "test-agent".into(),
            agent_role: None,
            host_id: "host_test".into(),
            tool_runtime_id: None,
            event_type,
            artifact_ref: None,
            meta: None,
        }
    }

    /// A randomly-suffixed path under the system temp dir (`<prefix>-<u32>`).
    /// The directory itself is not created here.
    fn scratch_dir(prefix: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("{prefix}-{}", rand::random::<u32>()))
    }

    /// Permission bits (`mode & 0o777`) of the file at `path`.
    #[cfg(all(not(target_family = "wasm"), unix))]
    fn mode_bits(path: &std::path::Path) -> u32 {
        use std::os::unix::fs::PermissionsExt;
        std::fs::metadata(path).unwrap().permissions().mode() & 0o777
    }

    #[test]
    fn append_and_read_back() {
        let dir = scratch_dir("treeship-evtlog-test");
        let log = EventLog::open(&dir).unwrap();

        let mut first = make_event("ssn_001", EventType::SessionStarted);
        let mut second = make_event(
            "ssn_001",
            EventType::AgentStarted {
                parent_agent_instance_id: None,
            },
        );
        log.append(&mut first).unwrap();
        log.append(&mut second).unwrap();

        assert_eq!(log.event_count(), 2);
        assert_eq!(first.sequence_no, 0);
        assert_eq!(second.sequence_no, 1);

        let stored: Vec<u64> = log
            .read_all()
            .unwrap()
            .iter()
            .map(|e| e.sequence_no)
            .collect();
        assert_eq!(stored, vec![0, 1]);

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn reopen_preserves_sequence() {
        let dir = scratch_dir("treeship-evtlog-reopen");

        // First session: a single append, then the log is dropped.
        {
            let log = EventLog::open(&dir).unwrap();
            let mut started = make_event("ssn_001", EventType::SessionStarted);
            log.append(&mut started).unwrap();
        }

        // Reopen: the counter must pick up where the file left off.
        let reopened = EventLog::open(&dir).unwrap();
        assert_eq!(reopened.event_count(), 1);

        let mut next = make_event(
            "ssn_001",
            EventType::AgentStarted {
                parent_agent_instance_id: None,
            },
        );
        reopened.append(&mut next).unwrap();
        assert_eq!(next.sequence_no, 1);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Regression test for #1 in the v0.9.3 Codex adversarial review.
    ///
    /// Multiple `EventLog` instances opened against the same directory must
    /// not collide on `sequence_no`. This simulates what happens when each
    /// `treeship session event` invocation (one per PostToolUse hook firing)
    /// creates a fresh `EventLog` on a shared events.jsonl. Without the
    /// flock-based re-derivation in `append_locked`, every instance sees
    /// the same on-disk count at open time and assigns duplicate sequence
    /// numbers.
    #[cfg(not(target_family = "wasm"))]
    #[test]
    fn concurrent_appends_have_unique_sequence_numbers() {
        const WRITERS: usize = 16;

        let dir = scratch_dir("treeship-evtlog-race");
        std::fs::create_dir_all(&dir).unwrap();
        let shared_dir: &std::path::Path = &dir;

        // Spawn every writer first, then join them all.
        let mut seqs: Vec<u64> = std::thread::scope(|scope| {
            let workers: Vec<_> = (0..WRITERS)
                .map(|_| {
                    scope.spawn(move || {
                        // Each thread opens its OWN EventLog -- mimics a
                        // separate process invocation. Without flock, all
                        // threads would see the same line count at open().
                        let log = EventLog::open(shared_dir).unwrap();
                        let mut ev = make_event("ssn_race", EventType::SessionStarted);
                        log.append(&mut ev).unwrap();
                        ev.sequence_no
                    })
                })
                .collect();
            workers.into_iter().map(|w| w.join().unwrap()).collect()
        });
        seqs.sort_unstable();

        // All sequence numbers must be unique and contiguous 0..WRITERS.
        let expected: Vec<u64> = (0..WRITERS as u64).collect();
        assert_eq!(seqs, expected, "sequence_no collisions under contention");

        // Same invariant from the on-disk file's perspective.
        let persisted = EventLog::open(&dir).unwrap().read_all().unwrap();
        assert_eq!(persisted.len(), WRITERS);
        let mut on_disk: Vec<u64> = persisted.iter().map(|e| e.sequence_no).collect();
        on_disk.sort_unstable();
        assert_eq!(on_disk, expected);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Sidecar lock file must be created mode 0o600 (owner-only) on Unix.
    /// Regression test for #5 in the second Codex adversarial review.
    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn lock_file_has_owner_only_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = scratch_dir("treeship-evtlog-perms");
        let log = EventLog::open(&dir).unwrap();

        let mut ev = make_event("ssn_perms", EventType::SessionStarted);
        log.append(&mut ev).unwrap();

        let lock_path = log.path().with_extension("jsonl.lock");
        let mode = std::fs::metadata(&lock_path)
            .expect("lock file must exist after first append")
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(
            mode, 0o600,
            "lock file mode is {:o}, expected 0o600 (owner-only)",
            mode
        );

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A pre-existing lock file (e.g. from a v0.9.2 era crash) with looser
    /// permissions must be tightened to 0o600 on next `EventLog::open`.
    /// Regression test for the third Codex adversarial review.
    #[cfg(all(not(target_family = "wasm"), unix))]
    #[test]
    fn existing_lock_file_is_re_tightened() {
        use std::os::unix::fs::PermissionsExt;

        let dir = scratch_dir("treeship-evtlog-retighten");
        std::fs::create_dir_all(&dir).unwrap();

        // Pre-create a lock file with deliberately loose perms, simulating
        // an upgrade from a CLI version that didn't set 0o600.
        let lock_path = dir.join("events.jsonl.lock");
        std::fs::write(&lock_path, b"").unwrap();
        std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o644)).unwrap();
        let pre_mode = mode_bits(&lock_path);
        assert_eq!(pre_mode, 0o644, "test setup: pre-existing perms should be 0o644");

        // First append after upgrade -- should re-tighten.
        let log = EventLog::open(&dir).unwrap();
        let mut ev = make_event("ssn_retighten", EventType::SessionStarted);
        log.append(&mut ev).unwrap();

        let post_mode = mode_bits(&lock_path);
        assert_eq!(
            post_mode, 0o600,
            "lock file should be re-tightened to 0o600 after open; got {:o}",
            post_mode
        );

        let _ = std::fs::remove_dir_all(&dir);
    }
}