// omne-cli 0.2.0
//
// CLI for managing omne volumes: init, upgrade, and validate kernel and distro releases
// Documentation
//! Per-run append-only event log.
//!
//! Every `omne run` writes its lifecycle events to
//! `.omne/var/runs/<run_id>/events.jsonl` — one [`Event`] per line as
//! canonical JSON. There is deliberately no shared global log: each
//! run is its own independent stream (plan R3, Mutability notes). This
//! keeps parallel runs' logs disjoint and makes Unit 13 `omne status`
//! a filesystem walk, not a parse-and-split.
//!
//! Atomicity layers, in order:
//!
//! 1. The file is opened `append + create`, so each `write` call is
//!    positioned at the current end-of-file by the kernel, not by the
//!    writer's cached cursor, and therefore cannot overwrite another
//!    appender's data. Note that POSIX's `PIPE_BUF` atomicity
//!    guarantee is specified for pipes and FIFOs, not regular files:
//!    append mode alone does not promise that records from different
//!    writers never interleave, even at the event-line sizes we emit
//!    (hundreds of bytes) — that is what layers 2 and 3 are for.
//!    Windows' `FILE_APPEND_DATA` gives the same end-of-file
//!    positioning.
//! 2. A per-instance `Mutex<File>` serializes appends inside this
//!    process so we don't rely on OS-level atomicity for multi-line
//!    bursts or flush ordering.
//! 3. An `fs2` advisory exclusive lock (same helper shape as
//!    `crate::ulid`) serializes appends across processes — the MCP
//!    server planned for post-v1 and any other tool that might open
//!    the same file.
//!
//! Lock acquisition uses a 5-second default budget, same as the ULID
//! allocator. Exceeding the budget returns [`Error::LockTimeout`].

#![allow(dead_code)]

use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

use fs2::FileExt;
use thiserror::Error;

use crate::events::Event;
use crate::volume::{events_log_path, run_dir, runs_dir};

/// Default budget for acquiring the per-file advisory lock. Matches
/// the ULID allocator so callers can reason about one deadline model
/// across both subsystems.
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5);

/// Pause between attempts while the advisory lock is contended.
/// `acquire_lock` clamps each sleep to the time left before its
/// deadline, so the poll interval never extends the overall budget.
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(25);

/// Failures reported by the event-log writer ([`EventLog`]) and by the
/// free reader/enumeration functions in this module.
///
/// Every variant that concerns a specific file or directory carries
/// its path so the rendered message is actionable on its own.
#[derive(Debug, Error)]
pub enum Error {
    /// File-open, write, flush, mkdir, read-dir, or read-file error.
    /// Carries the path for actionable messages.
    #[error("I/O error on {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },

    /// `serde_json::to_string` failed on an [`Event`]. Not expected in
    /// practice (every `Event` variant is serde-derived and round-trips
    /// against its own tests) but surfaced as a typed error rather than
    /// an `unwrap` so production code cannot panic on an upstream event
    /// schema regression.
    #[error("failed to serialize event: {source}")]
    Serialize {
        #[source]
        source: serde_json::Error,
    },

    /// A line in `events.jsonl` could not be parsed as a valid
    /// [`Event`]. Includes the file path and 1-based line number so the
    /// operator can jump to the offending record.
    #[error("malformed event at {path}:{line_number}: {source}")]
    Deserialize {
        path: PathBuf,
        line_number: usize,
        #[source]
        source: serde_json::Error,
    },

    /// Advisory lock on the log file was not released within the
    /// acquire budget. Surfaced by callers as `CliError::EventLogLockTimeout`.
    #[error("event log lock {path} was not released within the acquire budget")]
    LockTimeout { path: PathBuf },
}

/// A long-lived writer for one run's `events.jsonl`.
///
/// The file handle is kept open for the lifetime of the `EventLog` so
/// appends avoid the per-call `open` + `fsync` parent-dir overhead.
/// Only appends are performed; readers go through the free
/// [`read_run`] function and open the file fresh.
pub struct EventLog {
    // Location of the log file; reported in errors and via `path()`.
    path: PathBuf,
    // The open handle. The mutex serializes appends issued through
    // this instance; the advisory file lock is taken on this handle.
    file: Mutex<File>,
    // Budget handed to `acquire_lock` on every append.
    lock_timeout: Duration,
}

impl EventLog {
    /// Open (or create) the per-run log at
    /// `.omne/var/runs/<run_id>/events.jsonl`. Parent directories are
    /// created on demand.
    pub fn for_run(volume_root: &Path, run_id: &str) -> Result<Self, Error> {
        let path = events_log_path(volume_root, run_id);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).map_err(|source| Error::Io {
                path: parent.to_path_buf(),
                source,
            })?;
        }
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .read(true)
            .open(&path)
            .map_err(|source| Error::Io {
                path: path.clone(),
                source,
            })?;
        Ok(Self {
            path,
            file: Mutex::new(file),
            lock_timeout: DEFAULT_LOCK_TIMEOUT,
        })
    }

    /// Path to the underlying `events.jsonl`. Exposed for tests and
    /// for callers that need to emit the path in a log line.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Append one event as a JSON line.
    ///
    /// Acquires the per-instance mutex, then the cross-process `fs2`
    /// advisory lock, writes exactly one line terminated by `\n`,
    /// flushes, and releases the advisory lock. The mutex is released
    /// when the guard drops.
    pub fn append(&self, event: &Event) -> Result<(), Error> {
        let line = serde_json::to_string(event).map_err(|source| Error::Serialize { source })?;

        let mut guard = self
            .file
            .lock()
            .expect("event-log mutex poisoned by a prior panic in append");

        acquire_lock(&guard, &self.path, self.lock_timeout)?;

        let write_result = (|| -> Result<(), Error> {
            guard
                .write_all(line.as_bytes())
                .map_err(|source| Error::Io {
                    path: self.path.clone(),
                    source,
                })?;
            guard.write_all(b"\n").map_err(|source| Error::Io {
                path: self.path.clone(),
                source,
            })?;
            guard.flush().map_err(|source| Error::Io {
                path: self.path.clone(),
                source,
            })
        })();

        // Release advisory lock regardless of outcome. Dropping the
        // `File` would release it too, but we keep the handle for the
        // next append — so unlock explicitly.
        let _ = FileExt::unlock(&*guard);

        write_result
    }
}

/// Read every event in one run's `events.jsonl`, in order.
///
/// Opens the file fresh (not via a live [`EventLog`]), so it is safe
/// to call from `omne status` while a writer is still running — the
/// reader sees a snapshot of events already flushed to disk. A
/// malformed line returns [`Error::Deserialize`] with the 1-based line
/// number for the first bad record.
pub fn read_run(volume_root: &Path, run_id: &str) -> Result<Vec<Event>, Error> {
    let path = events_log_path(volume_root, run_id);
    let file = File::open(&path).map_err(|source| Error::Io {
        path: path.clone(),
        source,
    })?;
    let mut events = Vec::new();
    for (i, line) in BufReader::new(file).lines().enumerate() {
        let line = line.map_err(|source| Error::Io {
            path: path.clone(),
            source,
        })?;
        if line.trim().is_empty() {
            continue;
        }
        let event = serde_json::from_str(&line).map_err(|source| Error::Deserialize {
            path: path.clone(),
            line_number: i + 1,
            source,
        })?;
        events.push(event);
    }
    Ok(events)
}

/// Enumerate run IDs by walking `.omne/var/runs/`.
///
/// A run ID is reported if `.omne/var/runs/<id>/` exists as a
/// directory — `events.jsonl` is not required to be present yet (the
/// writer creates it on first `append`, and `omne run` pre-creates
/// the run dir during preflight). Missing `runs/` returns `Ok(vec![])`.
pub fn enumerate_runs(volume_root: &Path) -> Result<Vec<String>, Error> {
    let dir = runs_dir(volume_root);
    if !dir.is_dir() {
        return Ok(Vec::new());
    }
    let entries = std::fs::read_dir(&dir).map_err(|source| Error::Io {
        path: dir.clone(),
        source,
    })?;
    let mut ids = Vec::new();
    for entry in entries {
        let entry = entry.map_err(|source| Error::Io {
            path: dir.clone(),
            source,
        })?;
        let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
        if !is_dir {
            continue;
        }
        if let Some(name) = entry.file_name().to_str() {
            ids.push(name.to_string());
        }
    }
    ids.sort();
    Ok(ids)
}

/// Report whether the directory for `run_id` is present in the volume.
///
/// Gives preflight code (`omne run` collision detection, `omne status`
/// lookup) a plain boolean instead of a `read_dir` probe. Returns
/// `false` when the path is missing or is not a directory.
pub fn run_exists(volume_root: &Path, run_id: &str) -> bool {
    let dir = run_dir(volume_root, run_id);
    dir.is_dir()
}

fn acquire_lock(file: &File, path: &Path, timeout: Duration) -> Result<(), Error> {
    // Same portable sentinel trick as `crate::ulid`: on Unix `flock`
    // returns `WouldBlock`, while Windows returns `ERROR_LOCK_VIOLATION`
    // which Rust classifies as `Uncategorized`. Match either.
    let contended_os = fs2::lock_contended_error().raw_os_error();
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            return Err(Error::LockTimeout {
                path: path.to_path_buf(),
            });
        }
        match FileExt::try_lock_exclusive(file) {
            Ok(()) => return Ok(()),
            Err(e)
                if e.kind() == std::io::ErrorKind::WouldBlock
                    || e.raw_os_error() == contended_os =>
            {
                let remaining = deadline.saturating_duration_since(Instant::now());
                thread::sleep(LOCK_POLL_INTERVAL.min(remaining));
            }
            Err(source) => {
                return Err(Error::Io {
                    path: path.to_path_buf(),
                    source,
                });
            }
        }
    }
}

// Unit tests for the writer (`EventLog`), the reader (`read_run`), and
// the directory helpers (`enumerate_runs`, `run_exists`). Each test
// works inside its own temporary directory used as the volume root.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{NodeKind, NodeStarted};
    use tempfile::TempDir;

    /// Build a `NodeStarted` event fixture. `seq` only feeds the
    /// zero-padded event id; the timestamp is a fixed literal.
    fn node_started(run_id: &str, node_id: &str, seq: usize) -> Event {
        Event::NodeStarted(NodeStarted {
            id: format!("ev-{seq:04}"),
            ts: "2026-04-15T00:00:00Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            kind: NodeKind::Bash,
            name: None,
            model: None,
        })
    }

    /// One appended event is read back as exactly one event with the
    /// same run id and event type.
    #[test]
    fn append_then_read_roundtrip_one_event() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-01").expect("open log");
        log.append(&node_started("pipe-01", "n1", 1))
            .expect("append");
        // Close the writer's handle before reading the file back.
        drop(log);

        let events = read_run(tmp.path(), "pipe-01").expect("read");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].run_id(), "pipe-01");
        assert_eq!(events[0].event_type(), "node.started");
    }

    /// A burst of sequential appends yields one parseable line per
    /// event, in append order.
    #[test]
    fn hundred_sequential_appends_produce_hundred_parseable_lines() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-seq").unwrap();
        for i in 0..100 {
            log.append(&node_started("pipe-seq", &format!("n{i}"), i))
                .unwrap();
        }
        drop(log);

        let events = read_run(tmp.path(), "pipe-seq").unwrap();
        assert_eq!(events.len(), 100);
        // Preserves append order.
        for (i, ev) in events.iter().enumerate() {
            if let Event::NodeStarted(ns) = ev {
                assert_eq!(ns.node_id, format!("n{i}"));
            } else {
                panic!("unexpected event variant at index {i}");
            }
        }
    }

    /// `for_run` creates the whole directory chain on an empty volume
    /// and the resulting log accepts an append.
    #[test]
    fn for_run_creates_parent_directories() {
        let tmp = TempDir::new().unwrap();
        // Nothing under tmp.path() yet — not even `.omne/`.
        let log = EventLog::for_run(tmp.path(), "pipe-fresh").expect("open");
        assert!(log.path().parent().unwrap().is_dir());
        log.append(&node_started("pipe-fresh", "n1", 1)).unwrap();
    }

    /// A non-JSON line is reported as `Error::Deserialize` with the
    /// 1-based number of the offending line.
    #[test]
    fn read_run_on_malformed_line_reports_line_number() {
        let tmp = TempDir::new().unwrap();
        let log = EventLog::for_run(tmp.path(), "pipe-bad").unwrap();
        log.append(&node_started("pipe-bad", "n1", 1)).unwrap();
        // Corrupt line 2 through a second, independent append handle
        // that bypasses `EventLog` entirely.
        let mut raw = std::fs::OpenOptions::new()
            .append(true)
            .open(log.path())
            .unwrap();
        writeln!(raw, "not-json").unwrap();
        drop(raw);
        drop(log);

        let err = read_run(tmp.path(), "pipe-bad").unwrap_err();
        match err {
            Error::Deserialize { line_number, .. } => assert_eq!(line_number, 2),
            other => panic!("expected Deserialize at line 2, got {other:?}"),
        }
    }

    /// Run ids come back sorted, and a run directory without an
    /// `events.jsonl` is still listed.
    #[test]
    fn enumerate_runs_lists_sorted_directory_names() {
        let tmp = TempDir::new().unwrap();
        EventLog::for_run(tmp.path(), "pipe-b").unwrap();
        EventLog::for_run(tmp.path(), "pipe-a").unwrap();
        EventLog::for_run(tmp.path(), "pipe-c").unwrap();
        // `for_run` always creates `events.jsonl`, so make a run dir
        // without one by hand to exercise the "events.jsonl is not
        // required to be present" clause.
        std::fs::create_dir_all(runs_dir(tmp.path()).join("pipe-pending")).unwrap();

        let ids = enumerate_runs(tmp.path()).unwrap();
        assert_eq!(ids, vec!["pipe-a", "pipe-b", "pipe-c", "pipe-pending"]);
    }

    /// A volume with no runs directory enumerates to an empty list
    /// rather than an error.
    #[test]
    fn enumerate_runs_on_missing_var_runs_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let ids = enumerate_runs(tmp.path()).expect("empty volume");
        assert!(ids.is_empty());
    }

    /// `run_exists` is false for an unknown run and true once
    /// `for_run` has created the run directory.
    #[test]
    fn run_exists_discriminates_created_from_absent() {
        let tmp = TempDir::new().unwrap();
        assert!(!run_exists(tmp.path(), "pipe-no"));
        EventLog::for_run(tmp.path(), "pipe-yes").unwrap();
        assert!(run_exists(tmp.path(), "pipe-yes"));
    }
}