Skip to main content

omne_cli/
event_log.rs

1//! Per-run append-only event log.
2//!
3//! Every `omne run` writes its lifecycle events to
4//! `.omne/var/runs/<run_id>/events.jsonl` — one [`Event`] per line as
5//! canonical JSON. There is deliberately no shared global log: each
6//! run is its own independent stream (plan R3, Mutability notes). This
7//! keeps parallel runs' logs disjoint and makes Unit 13 `omne status`
8//! a filesystem walk, not a parse-and-split.
9//!
10//! Atomicity layers, in order:
11//!
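//! 1. The file is opened `append + create`, so each `write` call is
//!    positioned at the current end-of-file by the kernel, not by the
//!    writer's cached cursor. POSIX `O_APPEND` makes that seek-to-end and
//!    the write one step with no intervening file modification, so
//!    concurrent appenders never overwrite each other's bytes. (The
//!    `PIPE_BUF` atomicity bound applies to pipes and FIFOs, not regular
//!    files, so this layer alone does not guarantee that lines from
//!    concurrent writers never interleave — layers 2 and 3 provide that
//!    for cooperating writers.) Windows' `FILE_APPEND_DATA` gives the same
//!    end-of-file positioning.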
19//! 2. A per-instance `Mutex<File>` serializes appends inside this
20//!    process so we don't rely on OS-level atomicity for multi-line
21//!    bursts or flush ordering.
22//! 3. An `fs2` advisory exclusive lock (same helper shape as
23//!    `crate::ulid`) serializes appends across processes — the MCP
24//!    server planned for post-v1 and any other tool that might open
25//!    the same file.
26//!
27//! Lock acquisition uses a 5-second default budget, same as the ULID
28//! allocator. Exceeding the budget returns [`Error::LockTimeout`].
29
30#![allow(dead_code)]
31
32use std::fs::{File, OpenOptions};
33use std::io::{BufRead, BufReader, Write};
34use std::path::{Path, PathBuf};
35use std::sync::Mutex;
36use std::thread;
37use std::time::{Duration, Instant};
38
39use fs2::FileExt;
40use thiserror::Error;
41
42use crate::events::Event;
43use crate::volume::{events_log_path, run_dir, runs_dir};
44
/// How long an append waits for the per-file advisory lock before it
/// gives up with [`Error::LockTimeout`]. Deliberately equal to the ULID
/// allocator's budget so both subsystems share one deadline model.
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_millis(5_000);

/// Pause between non-blocking lock attempts while the lock is contended.
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(25);
51
52#[derive(Debug, Error)]
53pub enum Error {
54    /// File-open, write, flush, mkdir, read-dir, or read-file error.
55    /// Carries the path for actionable messages.
56    #[error("I/O error on {path}: {source}")]
57    Io {
58        path: PathBuf,
59        #[source]
60        source: std::io::Error,
61    },
62
63    /// `serde_json::to_string` failed on an [`Event`]. Not expected in
64    /// practice (every `Event` variant is serde-derived and round-trips
65    /// against its own tests) but surfaced as a typed error rather than
66    /// an `unwrap` so production code cannot panic on an upstream event
67    /// schema regression.
68    #[error("failed to serialize event: {source}")]
69    Serialize {
70        #[source]
71        source: serde_json::Error,
72    },
73
74    /// A line in `events.jsonl` could not be parsed as a valid
75    /// [`Event`]. Includes the file path and 1-based line number so the
76    /// operator can jump to the offending record.
77    #[error("malformed event at {path}:{line_number}: {source}")]
78    Deserialize {
79        path: PathBuf,
80        line_number: usize,
81        #[source]
82        source: serde_json::Error,
83    },
84
85    /// Advisory lock on the log file was not released within the
86    /// acquire budget. Surfaced by callers as `CliError::EventLogLockTimeout`.
87    #[error("event log lock {path} was not released within the acquire budget")]
88    LockTimeout { path: PathBuf },
89}
90
/// Long-lived append handle for a single run's `events.jsonl`.
///
/// The `File` stays open for as long as the `EventLog` lives, so each
/// append skips re-opening the log. This handle is only ever appended to;
/// reading goes through the free function [`read_run`], which opens its
/// own fresh handle.
pub struct EventLog {
    /// Location of `events.jsonl`; also reported in error values.
    path: PathBuf,
    /// The open log handle. The mutex serializes appends from threads that
    /// share this instance.
    file: Mutex<File>,
    /// Budget for acquiring the cross-process advisory lock on each append.
    lock_timeout: Duration,
}
102
103impl EventLog {
104    /// Open (or create) the per-run log at
105    /// `.omne/var/runs/<run_id>/events.jsonl`. Parent directories are
106    /// created on demand.
107    pub fn for_run(volume_root: &Path, run_id: &str) -> Result<Self, Error> {
108        let path = events_log_path(volume_root, run_id);
109        if let Some(parent) = path.parent() {
110            std::fs::create_dir_all(parent).map_err(|source| Error::Io {
111                path: parent.to_path_buf(),
112                source,
113            })?;
114        }
115        let file = OpenOptions::new()
116            .create(true)
117            .append(true)
118            .read(true)
119            .open(&path)
120            .map_err(|source| Error::Io {
121                path: path.clone(),
122                source,
123            })?;
124        Ok(Self {
125            path,
126            file: Mutex::new(file),
127            lock_timeout: DEFAULT_LOCK_TIMEOUT,
128        })
129    }
130
131    /// Path to the underlying `events.jsonl`. Exposed for tests and
132    /// for callers that need to emit the path in a log line.
133    pub fn path(&self) -> &Path {
134        &self.path
135    }
136
137    /// Append one event as a JSON line.
138    ///
139    /// Acquires the per-instance mutex, then the cross-process `fs2`
140    /// advisory lock, writes exactly one line terminated by `\n`,
141    /// flushes, and releases the advisory lock. The mutex is released
142    /// when the guard drops.
143    pub fn append(&self, event: &Event) -> Result<(), Error> {
144        let line = serde_json::to_string(event).map_err(|source| Error::Serialize { source })?;
145
146        let mut guard = self
147            .file
148            .lock()
149            .expect("event-log mutex poisoned by a prior panic in append");
150
151        acquire_lock(&guard, &self.path, self.lock_timeout)?;
152
153        let write_result = (|| -> Result<(), Error> {
154            guard
155                .write_all(line.as_bytes())
156                .map_err(|source| Error::Io {
157                    path: self.path.clone(),
158                    source,
159                })?;
160            guard.write_all(b"\n").map_err(|source| Error::Io {
161                path: self.path.clone(),
162                source,
163            })?;
164            guard.flush().map_err(|source| Error::Io {
165                path: self.path.clone(),
166                source,
167            })
168        })();
169
170        // Release advisory lock regardless of outcome. Dropping the
171        // `File` would release it too, but we keep the handle for the
172        // next append — so unlock explicitly.
173        let _ = FileExt::unlock(&*guard);
174
175        write_result
176    }
177}
178
179/// Read every event in one run's `events.jsonl`, in order.
180///
181/// Opens the file fresh (not via a live [`EventLog`]), so it is safe
182/// to call from `omne status` while a writer is still running — the
183/// reader sees a snapshot of events already flushed to disk. A
184/// malformed line returns [`Error::Deserialize`] with the 1-based line
185/// number for the first bad record.
186pub fn read_run(volume_root: &Path, run_id: &str) -> Result<Vec<Event>, Error> {
187    let path = events_log_path(volume_root, run_id);
188    let file = File::open(&path).map_err(|source| Error::Io {
189        path: path.clone(),
190        source,
191    })?;
192    let mut events = Vec::new();
193    for (i, line) in BufReader::new(file).lines().enumerate() {
194        let line = line.map_err(|source| Error::Io {
195            path: path.clone(),
196            source,
197        })?;
198        if line.trim().is_empty() {
199            continue;
200        }
201        let event = serde_json::from_str(&line).map_err(|source| Error::Deserialize {
202            path: path.clone(),
203            line_number: i + 1,
204            source,
205        })?;
206        events.push(event);
207    }
208    Ok(events)
209}
210
211/// Enumerate run IDs by walking `.omne/var/runs/`.
212///
213/// A run ID is reported if `.omne/var/runs/<id>/` exists as a
214/// directory — `events.jsonl` is not required to be present yet (the
215/// writer creates it on first `append`, and `omne run` pre-creates
216/// the run dir during preflight). Missing `runs/` returns `Ok(vec![])`.
217pub fn enumerate_runs(volume_root: &Path) -> Result<Vec<String>, Error> {
218    let dir = runs_dir(volume_root);
219    if !dir.is_dir() {
220        return Ok(Vec::new());
221    }
222    let entries = std::fs::read_dir(&dir).map_err(|source| Error::Io {
223        path: dir.clone(),
224        source,
225    })?;
226    let mut ids = Vec::new();
227    for entry in entries {
228        let entry = entry.map_err(|source| Error::Io {
229            path: dir.clone(),
230            source,
231        })?;
232        let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
233        if !is_dir {
234            continue;
235        }
236        if let Some(name) = entry.file_name().to_str() {
237            ids.push(name.to_string());
238        }
239    }
240    ids.sort();
241    Ok(ids)
242}
243
244/// Check whether a run's directory exists. Convenience for preflight
245/// paths (`omne run` collision detection, `omne status` lookup) that
246/// want a typed boolean rather than a `read_dir` probe.
247pub fn run_exists(volume_root: &Path, run_id: &str) -> bool {
248    run_dir(volume_root, run_id).is_dir()
249}
250
251fn acquire_lock(file: &File, path: &Path, timeout: Duration) -> Result<(), Error> {
252    // Same portable sentinel trick as `crate::ulid`: on Unix `flock`
253    // returns `WouldBlock`, while Windows returns `ERROR_LOCK_VIOLATION`
254    // which Rust classifies as `Uncategorized`. Match either.
255    let contended_os = fs2::lock_contended_error().raw_os_error();
256    let deadline = Instant::now() + timeout;
257    loop {
258        if Instant::now() >= deadline {
259            return Err(Error::LockTimeout {
260                path: path.to_path_buf(),
261            });
262        }
263        match FileExt::try_lock_exclusive(file) {
264            Ok(()) => return Ok(()),
265            Err(e)
266                if e.kind() == std::io::ErrorKind::WouldBlock
267                    || e.raw_os_error() == contended_os =>
268            {
269                let remaining = deadline.saturating_duration_since(Instant::now());
270                thread::sleep(LOCK_POLL_INTERVAL.min(remaining));
271            }
272            Err(source) => {
273                return Err(Error::Io {
274                    path: path.to_path_buf(),
275                    source,
276                });
277            }
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{NodeKind, NodeStarted};
    use tempfile::TempDir;

    /// Build a minimal `node.started` event; `seq` only feeds the event id.
    fn node_started(run_id: &str, node_id: &str, seq: usize) -> Event {
        Event::NodeStarted(NodeStarted {
            id: format!("ev-{seq:04}"),
            ts: "2026-04-15T00:00:00Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            kind: NodeKind::Bash,
            name: None,
            model: None,
        })
    }

    /// Append raw text lines to a log file, bypassing `EventLog`, to
    /// simulate corruption or hand-edited records.
    fn append_raw_lines(path: &Path, lines: &[&str]) {
        let mut raw = std::fs::OpenOptions::new()
            .append(true)
            .open(path)
            .unwrap();
        for line in lines {
            writeln!(raw, "{line}").unwrap();
        }
    }

    #[test]
    fn append_then_read_roundtrip_one_event() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-01").expect("open log");
        log.append(&node_started("pipe-01", "n1", 1))
            .expect("append");
        drop(log);

        let events = read_run(tmp.path(), "pipe-01").expect("read");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].run_id(), "pipe-01");
        assert_eq!(events[0].event_type(), "node.started");
    }

    #[test]
    fn hundred_sequential_appends_produce_hundred_parseable_lines() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-seq").unwrap();
        for i in 0..100 {
            log.append(&node_started("pipe-seq", &format!("n{i}"), i))
                .unwrap();
        }
        drop(log);

        let events = read_run(tmp.path(), "pipe-seq").unwrap();
        assert_eq!(events.len(), 100);
        // Preserves append order.
        for (i, ev) in events.iter().enumerate() {
            if let Event::NodeStarted(ns) = ev {
                assert_eq!(ns.node_id, format!("n{i}"));
            } else {
                panic!("unexpected event variant at index {i}");
            }
        }
    }

    #[test]
    fn threads_sharing_one_log_append_whole_lines_in_per_thread_order() {
        const WORKERS: usize = 4;
        const PER_WORKER: usize = 25;
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-par").unwrap();
        std::thread::scope(|scope| {
            for worker in 0..WORKERS {
                let log = &log;
                scope.spawn(move || {
                    for i in 0..PER_WORKER {
                        log.append(&node_started("pipe-par", &format!("w{worker}-n{i}"), i))
                            .unwrap();
                    }
                });
            }
        });
        drop(log);

        // Every line parses, so no two records were interleaved.
        let events = read_run(tmp.path(), "pipe-par").unwrap();
        assert_eq!(events.len(), WORKERS * PER_WORKER);
        // Each worker's own records keep their append order.
        for worker in 0..WORKERS {
            let prefix = format!("w{worker}-n");
            let seen: Vec<usize> = events
                .iter()
                .filter_map(|ev| match ev {
                    Event::NodeStarted(ns) => ns.node_id.strip_prefix(&prefix)?.parse().ok(),
                    _ => None,
                })
                .collect();
            assert_eq!(seen, (0..PER_WORKER).collect::<Vec<_>>());
        }
    }

    #[test]
    fn for_run_creates_parent_directories() {
        let tmp = TempDir::new().unwrap();
        // Nothing under tmp.path() yet — not even `.omne/`.
        let log = EventLog::for_run(tmp.path(), "pipe-fresh").expect("open");
        assert!(log.path().parent().unwrap().is_dir());
        log.append(&node_started("pipe-fresh", "n1", 1)).unwrap();
    }

    #[test]
    fn read_run_on_malformed_line_reports_line_number() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-bad").unwrap();
        log.append(&node_started("pipe-bad", "n1", 1)).unwrap();
        // Corrupt line 2.
        append_raw_lines(log.path(), &["not-json"]);
        drop(log);

        let err = read_run(tmp.path(), "pipe-bad").unwrap_err();
        match err {
            Error::Deserialize { line_number, .. } => assert_eq!(line_number, 2),
            other => panic!("expected Deserialize at line 2, got {other:?}"),
        }
    }

    #[test]
    fn read_run_skips_blank_lines_but_counts_them_for_line_numbers() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-gap").unwrap();
        log.append(&node_started("pipe-gap", "n1", 1)).unwrap();
        append_raw_lines(log.path(), &["", "   "]);
        log.append(&node_started("pipe-gap", "n2", 2)).unwrap();

        // Empty and whitespace-only lines are ignored...
        let events = read_run(tmp.path(), "pipe-gap").unwrap();
        assert_eq!(events.len(), 2);

        // ...but still count toward the reported physical line number.
        append_raw_lines(log.path(), &["not-json"]);
        drop(log);
        let err = read_run(tmp.path(), "pipe-gap").unwrap_err();
        match err {
            Error::Deserialize { line_number, .. } => assert_eq!(line_number, 5),
            other => panic!("expected Deserialize at line 5, got {other:?}"),
        }
    }

    #[test]
    fn read_run_on_missing_log_reports_io_error_with_path() {
        let tmp = TempDir::new().unwrap();
        let err = read_run(tmp.path(), "pipe-ghost").unwrap_err();
        match err {
            Error::Io { path, .. } => {
                assert_eq!(path, events_log_path(tmp.path(), "pipe-ghost"))
            }
            other => panic!("expected Io error, got {other:?}"),
        }
    }

    #[test]
    fn enumerate_runs_lists_sorted_directory_names() {
        let tmp = TempDir::new().unwrap();
        EventLog::for_run(tmp.path(), "pipe-b").unwrap();
        EventLog::for_run(tmp.path(), "pipe-a").unwrap();
        EventLog::for_run(tmp.path(), "pipe-c").unwrap();
        // `for_run` always creates events.jsonl, so create an
        // events.jsonl-less run dir by hand to exercise the
        // "events.jsonl not required to be present" clause.
        std::fs::create_dir_all(runs_dir(tmp.path()).join("pipe-pending")).unwrap();

        let ids = enumerate_runs(tmp.path()).unwrap();
        assert_eq!(ids, vec!["pipe-a", "pipe-b", "pipe-c", "pipe-pending"]);
    }

    #[test]
    fn enumerate_runs_on_missing_var_runs_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let ids = enumerate_runs(tmp.path()).expect("empty volume");
        assert!(ids.is_empty());
    }

    #[test]
    fn run_exists_discriminates_created_from_absent() {
        let tmp = TempDir::new().unwrap();
        assert!(!run_exists(tmp.path(), "pipe-no"));
        EventLog::for_run(tmp.path(), "pipe-yes").unwrap();
        assert!(run_exists(tmp.path(), "pipe-yes"));
    }
}