omne-cli 0.2.1

CLI for managing omne volumes: init, upgrade, and validate kernel and distro releases
Documentation
//! Derive per-run and per-node state from `events.jsonl`.
//!
//! Pure logic over [`Event`] sequences — no I/O. Callers read events
//! via [`crate::event_log::read_run`] and feed them here. Two entry
//! points:
//!
//! - [`derive`] — full per-node breakdown for `omne status <run_id>`.
//! - [`summarize`] — one-line summary for the global `omne status`
//!   listing (pipe state + last timestamp, no per-node detail).

#![allow(dead_code)]

use crate::events::{ErrorKind, Event, NodeKind};

/// Per-pipe lifecycle state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipeState {
    Running,
    Completed,
    Aborted { reason: String },
}

/// Per-node lifecycle state derived from the event stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeStatus {
    Pending,
    Running,
    Completed,
    Failed {
        kind: ErrorKind,
        message: Option<String>,
    },
}

/// Per-node info derived from events.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    pub id: String,
    pub kind: Option<NodeKind>,
    pub status: NodeStatus,
    pub last_ts: Option<String>,
}

/// Full derived state for one run.
#[derive(Debug)]
pub struct RunState {
    pub run_id: String,
    pub pipe: String,
    pub state: PipeState,
    pub nodes: Vec<NodeInfo>,
    pub is_orphan: bool,
    pub last_ts: String,
}

/// One-line summary for the global listing.
#[derive(Debug)]
pub struct RunSummary {
    pub run_id: String,
    pub pipe: String,
    pub state: PipeState,
    pub is_orphan: bool,
    pub last_ts: String,
    pub node_count: usize,
    pub completed_count: usize,
    pub failed_count: usize,
}

/// Derive full per-node state from a run's event stream.
pub fn derive(run_id: &str, events: &[Event]) -> RunState {
    let mut pipe_name = String::new();
    let mut pipe_state = PipeState::Running;
    let mut last_ts = String::new();
    let mut nodes: Vec<NodeInfo> = Vec::new();
    let mut node_index: std::collections::HashMap<String, usize> = std::collections::HashMap::new();

    for event in events {
        match event {
            Event::PipeStarted(e) => {
                pipe_name = e.pipe.clone();
                last_ts = e.ts.clone();
            }
            Event::PipeCompleted(e) => {
                pipe_state = PipeState::Completed;
                last_ts = e.ts.clone();
            }
            Event::PipeAborted(e) => {
                pipe_state = PipeState::Aborted {
                    reason: e.reason.clone(),
                };
                last_ts = e.ts.clone();
            }
            Event::NodeStarted(e) => {
                last_ts = e.ts.clone();
                if let Some(&idx) = node_index.get(&e.node_id) {
                    nodes[idx].status = NodeStatus::Running;
                    nodes[idx].kind = Some(e.kind);
                    nodes[idx].last_ts = Some(e.ts.clone());
                } else {
                    let idx = nodes.len();
                    node_index.insert(e.node_id.clone(), idx);
                    nodes.push(NodeInfo {
                        id: e.node_id.clone(),
                        kind: Some(e.kind),
                        status: NodeStatus::Running,
                        last_ts: Some(e.ts.clone()),
                    });
                }
            }
            Event::NodeCompleted(e) => {
                last_ts = e.ts.clone();
                let idx = get_or_insert_node(&mut nodes, &mut node_index, &e.node_id);
                nodes[idx].status = NodeStatus::Completed;
                nodes[idx].last_ts = Some(e.ts.clone());
            }
            Event::NodeFailed(e) => {
                last_ts = e.ts.clone();
                let idx = get_or_insert_node(&mut nodes, &mut node_index, &e.node_id);
                nodes[idx].status = NodeStatus::Failed {
                    kind: e.error.kind,
                    message: e.message.clone(),
                };
                nodes[idx].last_ts = Some(e.ts.clone());
            }
            Event::GatePassed(e) => {
                last_ts = e.ts.clone();
            }
            Event::IterationStarted(e) => {
                last_ts = e.ts.clone();
            }
        }
    }

    let is_orphan = detect_orphan(&nodes, &pipe_state);

    RunState {
        run_id: run_id.to_string(),
        pipe: pipe_name,
        state: pipe_state,
        nodes,
        is_orphan,
        last_ts,
    }
}

/// Derive a one-line summary. Wraps [`derive`] and extracts counts.
pub fn summarize(run_id: &str, events: &[Event]) -> RunSummary {
    let state = derive(run_id, events);
    let node_count = state.nodes.len();
    let completed_count = state
        .nodes
        .iter()
        .filter(|n| n.status == NodeStatus::Completed)
        .count();
    let failed_count = state
        .nodes
        .iter()
        .filter(|n| matches!(n.status, NodeStatus::Failed { .. }))
        .count();
    RunSummary {
        run_id: state.run_id,
        pipe: state.pipe,
        state: state.state,
        is_orphan: state.is_orphan,
        last_ts: state.last_ts,
        node_count,
        completed_count,
        failed_count,
    }
}

/// Look up or create a node entry. Used by terminal events
/// (`NodeCompleted`/`NodeFailed`) so a missing `node.started` (corrupt
/// log, partial flush) still surfaces the node in output.
fn get_or_insert_node(
    nodes: &mut Vec<NodeInfo>,
    node_index: &mut std::collections::HashMap<String, usize>,
    node_id: &str,
) -> usize {
    if let Some(&idx) = node_index.get(node_id) {
        return idx;
    }
    let idx = nodes.len();
    node_index.insert(node_id.to_string(), idx);
    nodes.push(NodeInfo {
        id: node_id.to_string(),
        kind: None,
        status: NodeStatus::Pending,
        last_ts: None,
    });
    idx
}

/// A run is orphaned if the pipe is still `Running` but has at least
/// one node stuck in `Running` with no terminal pipe event. This
/// indicates the runner process died mid-execution.
fn detect_orphan(nodes: &[NodeInfo], pipe_state: &PipeState) -> bool {
    if *pipe_state != PipeState::Running {
        return false;
    }
    nodes.iter().any(|n| n.status == NodeStatus::Running)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::*;

    fn pipe_started(run_id: &str, pipe: &str) -> Event {
        Event::PipeStarted(PipeStarted {
            id: "ev-0001".to_string(),
            ts: "2026-04-16T00:00:00Z".to_string(),
            run_id: run_id.to_string(),
            pipe: pipe.to_string(),
            inputs: vec![],
            distro_version: String::new(),
        })
    }

    fn node_started(run_id: &str, node_id: &str, kind: NodeKind) -> Event {
        Event::NodeStarted(NodeStarted {
            id: "ev-0002".to_string(),
            ts: "2026-04-16T00:00:01Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            kind,
            name: None,
            model: None,
        })
    }

    fn node_completed(run_id: &str, node_id: &str) -> Event {
        Event::NodeCompleted(NodeCompleted {
            id: "ev-0003".to_string(),
            ts: "2026-04-16T00:00:02Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            output_path: format!(".omne/var/runs/{run_id}/nodes/{node_id}.out"),
        })
    }

    fn node_failed(run_id: &str, node_id: &str, kind: ErrorKind) -> Event {
        Event::NodeFailed(NodeFailed {
            id: "ev-0004".to_string(),
            ts: "2026-04-16T00:00:03Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            error: NodeError { kind },
            message: None,
        })
    }

    fn pipe_completed(run_id: &str) -> Event {
        Event::PipeCompleted(PipeCompleted {
            id: "ev-0005".to_string(),
            ts: "2026-04-16T00:00:10Z".to_string(),
            run_id: run_id.to_string(),
        })
    }

    fn pipe_aborted(run_id: &str, reason: &str) -> Event {
        Event::PipeAborted(PipeAborted {
            id: "ev-0006".to_string(),
            ts: "2026-04-16T00:00:10Z".to_string(),
            run_id: run_id.to_string(),
            reason: reason.to_string(),
        })
    }

    #[test]
    fn completed_run_derives_correctly() {
        let events = vec![
            pipe_started("r1", "build"),
            node_started("r1", "n1", NodeKind::Bash),
            node_completed("r1", "n1"),
            pipe_completed("r1"),
        ];
        let state = derive("r1", &events);
        assert_eq!(state.pipe, "build");
        assert_eq!(state.state, PipeState::Completed);
        assert!(!state.is_orphan);
        assert_eq!(state.nodes.len(), 1);
        assert_eq!(state.nodes[0].status, NodeStatus::Completed);
    }

    #[test]
    fn aborted_run_derives_correctly() {
        let events = vec![
            pipe_started("r2", "deploy"),
            node_started("r2", "n1", NodeKind::Command),
            node_failed("r2", "n1", ErrorKind::Blocked),
            pipe_aborted("r2", "failed nodes: n1"),
        ];
        let state = derive("r2", &events);
        assert_eq!(
            state.state,
            PipeState::Aborted {
                reason: "failed nodes: n1".to_string()
            }
        );
        assert!(!state.is_orphan);
        assert!(matches!(
            state.nodes[0].status,
            NodeStatus::Failed {
                kind: ErrorKind::Blocked,
                ..
            }
        ));
    }

    #[test]
    fn orphan_detected_when_node_stuck_running() {
        let events = vec![
            pipe_started("r3", "test"),
            node_started("r3", "n1", NodeKind::Bash),
        ];
        let state = derive("r3", &events);
        assert_eq!(state.state, PipeState::Running);
        assert!(state.is_orphan);
    }

    #[test]
    fn no_orphan_when_pipe_completed() {
        let events = vec![
            pipe_started("r4", "test"),
            node_started("r4", "n1", NodeKind::Bash),
            node_completed("r4", "n1"),
            pipe_completed("r4"),
        ];
        assert!(!derive("r4", &events).is_orphan);
    }

    #[test]
    fn empty_events_yields_running_no_orphan() {
        let state = derive("r5", &[]);
        assert_eq!(state.state, PipeState::Running);
        assert!(!state.is_orphan);
        assert!(state.nodes.is_empty());
    }

    #[test]
    fn summary_counts_nodes() {
        let events = vec![
            pipe_started("r6", "ci"),
            node_started("r6", "lint", NodeKind::Bash),
            node_completed("r6", "lint"),
            node_started("r6", "test", NodeKind::Bash),
            node_failed("r6", "test", ErrorKind::Crash),
            pipe_aborted("r6", "failed nodes: test"),
        ];
        let summary = summarize("r6", &events);
        assert_eq!(summary.node_count, 2);
        assert_eq!(summary.completed_count, 1);
        assert_eq!(summary.failed_count, 1);
    }
}