// omne_cli/run_state.rs

//! Derive per-run and per-node state from `events.jsonl`.
//!
//! Pure logic over [`Event`] sequences — no I/O. Callers read events
//! via [`crate::event_log::read_run`] and feed them here. Two entry
//! points:
//!
//! - [`derive`] — full per-node breakdown for `omne status <run_id>`.
//! - [`summarize`] — one-line summary for the global `omne status`
//!   listing (pipe state, last timestamp, and aggregate node counts;
//!   no per-node detail).

11#![allow(dead_code)]
12
13use crate::events::{ErrorKind, Event, NodeKind};
14
/// Per-pipe lifecycle state.
///
/// A run starts out as `Running` and is moved to one of the other
/// variants when the corresponding pipe-level event is folded in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipeState {
    /// No terminal pipe event has been seen.
    Running,
    /// Set by a `PipeCompleted` event.
    Completed,
    /// Set by a `PipeAborted` event; `reason` is copied from that event.
    Aborted { reason: String },
}

23/// Per-node lifecycle state derived from the event stream.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub enum NodeStatus {
26    Pending,
27    Running,
28    Completed,
29    Failed {
30        kind: ErrorKind,
31        message: Option<String>,
32    },
33}
34
35/// Per-node info derived from events.
36#[derive(Debug, Clone)]
37pub struct NodeInfo {
38    pub id: String,
39    pub kind: Option<NodeKind>,
40    pub status: NodeStatus,
41    pub last_ts: Option<String>,
42}
43
44/// Full derived state for one run.
45#[derive(Debug)]
46pub struct RunState {
47    pub run_id: String,
48    pub pipe: String,
49    pub state: PipeState,
50    pub nodes: Vec<NodeInfo>,
51    pub is_orphan: bool,
52    pub last_ts: String,
53}
54
55/// One-line summary for the global listing.
56#[derive(Debug)]
57pub struct RunSummary {
58    pub run_id: String,
59    pub pipe: String,
60    pub state: PipeState,
61    pub is_orphan: bool,
62    pub last_ts: String,
63    pub node_count: usize,
64    pub completed_count: usize,
65    pub failed_count: usize,
66}
67
68/// Derive full per-node state from a run's event stream.
69pub fn derive(run_id: &str, events: &[Event]) -> RunState {
70    let mut pipe_name = String::new();
71    let mut pipe_state = PipeState::Running;
72    let mut last_ts = String::new();
73    let mut nodes: Vec<NodeInfo> = Vec::new();
74    let mut node_index: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
75
76    for event in events {
77        match event {
78            Event::PipeStarted(e) => {
79                pipe_name = e.pipe.clone();
80                last_ts = e.ts.clone();
81            }
82            Event::PipeCompleted(e) => {
83                pipe_state = PipeState::Completed;
84                last_ts = e.ts.clone();
85            }
86            Event::PipeAborted(e) => {
87                pipe_state = PipeState::Aborted {
88                    reason: e.reason.clone(),
89                };
90                last_ts = e.ts.clone();
91            }
92            Event::NodeStarted(e) => {
93                last_ts = e.ts.clone();
94                if let Some(&idx) = node_index.get(&e.node_id) {
95                    nodes[idx].status = NodeStatus::Running;
96                    nodes[idx].kind = Some(e.kind);
97                    nodes[idx].last_ts = Some(e.ts.clone());
98                } else {
99                    let idx = nodes.len();
100                    node_index.insert(e.node_id.clone(), idx);
101                    nodes.push(NodeInfo {
102                        id: e.node_id.clone(),
103                        kind: Some(e.kind),
104                        status: NodeStatus::Running,
105                        last_ts: Some(e.ts.clone()),
106                    });
107                }
108            }
109            Event::NodeCompleted(e) => {
110                last_ts = e.ts.clone();
111                let idx = get_or_insert_node(&mut nodes, &mut node_index, &e.node_id);
112                nodes[idx].status = NodeStatus::Completed;
113                nodes[idx].last_ts = Some(e.ts.clone());
114            }
115            Event::NodeFailed(e) => {
116                last_ts = e.ts.clone();
117                let idx = get_or_insert_node(&mut nodes, &mut node_index, &e.node_id);
118                nodes[idx].status = NodeStatus::Failed {
119                    kind: e.error.kind,
120                    message: e.message.clone(),
121                };
122                nodes[idx].last_ts = Some(e.ts.clone());
123            }
124            Event::GatePassed(e) => {
125                last_ts = e.ts.clone();
126            }
127            Event::IterationStarted(e) => {
128                last_ts = e.ts.clone();
129            }
130        }
131    }
132
133    let is_orphan = detect_orphan(&nodes, &pipe_state);
134
135    RunState {
136        run_id: run_id.to_string(),
137        pipe: pipe_name,
138        state: pipe_state,
139        nodes,
140        is_orphan,
141        last_ts,
142    }
143}
144
145/// Derive a one-line summary. Wraps [`derive`] and extracts counts.
146pub fn summarize(run_id: &str, events: &[Event]) -> RunSummary {
147    let state = derive(run_id, events);
148    let node_count = state.nodes.len();
149    let completed_count = state
150        .nodes
151        .iter()
152        .filter(|n| n.status == NodeStatus::Completed)
153        .count();
154    let failed_count = state
155        .nodes
156        .iter()
157        .filter(|n| matches!(n.status, NodeStatus::Failed { .. }))
158        .count();
159    RunSummary {
160        run_id: state.run_id,
161        pipe: state.pipe,
162        state: state.state,
163        is_orphan: state.is_orphan,
164        last_ts: state.last_ts,
165        node_count,
166        completed_count,
167        failed_count,
168    }
169}
170
171/// Look up or create a node entry. Used by terminal events
172/// (`NodeCompleted`/`NodeFailed`) so a missing `node.started` (corrupt
173/// log, partial flush) still surfaces the node in output.
174fn get_or_insert_node(
175    nodes: &mut Vec<NodeInfo>,
176    node_index: &mut std::collections::HashMap<String, usize>,
177    node_id: &str,
178) -> usize {
179    if let Some(&idx) = node_index.get(node_id) {
180        return idx;
181    }
182    let idx = nodes.len();
183    node_index.insert(node_id.to_string(), idx);
184    nodes.push(NodeInfo {
185        id: node_id.to_string(),
186        kind: None,
187        status: NodeStatus::Pending,
188        last_ts: None,
189    });
190    idx
191}
192
193/// A run is orphaned if the pipe is still `Running` but has at least
194/// one node stuck in `Running` with no terminal pipe event. This
195/// indicates the runner process died mid-execution.
196fn detect_orphan(nodes: &[NodeInfo], pipe_state: &PipeState) -> bool {
197    if *pipe_state != PipeState::Running {
198        return false;
199    }
200    nodes.iter().any(|n| n.status == NodeStatus::Running)
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::*;

    // Event builders. Each uses a fixed id and timestamp; the timestamps
    // increase from pipe start through node events to the pipe's
    // terminal event.

    fn pipe_started(run_id: &str, pipe: &str) -> Event {
        Event::PipeStarted(PipeStarted {
            id: "ev-0001".to_string(),
            ts: "2026-04-16T00:00:00Z".to_string(),
            run_id: run_id.to_string(),
            pipe: pipe.to_string(),
            inputs: vec![],
            distro_version: String::new(),
        })
    }

    fn node_started(run_id: &str, node_id: &str, kind: NodeKind) -> Event {
        Event::NodeStarted(NodeStarted {
            id: "ev-0002".to_string(),
            ts: "2026-04-16T00:00:01Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            kind,
            name: None,
            model: None,
        })
    }

    fn node_completed(run_id: &str, node_id: &str) -> Event {
        Event::NodeCompleted(NodeCompleted {
            id: "ev-0003".to_string(),
            ts: "2026-04-16T00:00:02Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            output_path: format!(".omne/var/runs/{run_id}/nodes/{node_id}.out"),
        })
    }

    fn node_failed(run_id: &str, node_id: &str, kind: ErrorKind) -> Event {
        Event::NodeFailed(NodeFailed {
            id: "ev-0004".to_string(),
            ts: "2026-04-16T00:00:03Z".to_string(),
            run_id: run_id.to_string(),
            node_id: node_id.to_string(),
            error: NodeError { kind },
            message: None,
        })
    }

    fn pipe_completed(run_id: &str) -> Event {
        Event::PipeCompleted(PipeCompleted {
            id: "ev-0005".to_string(),
            ts: "2026-04-16T00:00:10Z".to_string(),
            run_id: run_id.to_string(),
        })
    }

    fn pipe_aborted(run_id: &str, reason: &str) -> Event {
        Event::PipeAborted(PipeAborted {
            id: "ev-0006".to_string(),
            ts: "2026-04-16T00:00:10Z".to_string(),
            run_id: run_id.to_string(),
            reason: reason.to_string(),
        })
    }

    #[test]
    fn completed_run_derives_correctly() {
        let events = vec![
            pipe_started("r1", "build"),
            node_started("r1", "n1", NodeKind::Bash),
            node_completed("r1", "n1"),
            pipe_completed("r1"),
        ];
        let state = derive("r1", &events);
        assert_eq!(state.pipe, "build");
        assert_eq!(state.state, PipeState::Completed);
        assert!(!state.is_orphan);
        assert_eq!(state.nodes.len(), 1);
        assert_eq!(state.nodes[0].status, NodeStatus::Completed);
    }

    #[test]
    fn aborted_run_derives_correctly() {
        let events = vec![
            pipe_started("r2", "deploy"),
            node_started("r2", "n1", NodeKind::Command),
            node_failed("r2", "n1", ErrorKind::Blocked),
            pipe_aborted("r2", "failed nodes: n1"),
        ];
        let state = derive("r2", &events);
        assert_eq!(
            state.state,
            PipeState::Aborted {
                reason: "failed nodes: n1".to_string()
            }
        );
        assert!(!state.is_orphan);
        assert!(matches!(
            state.nodes[0].status,
            NodeStatus::Failed {
                kind: ErrorKind::Blocked,
                ..
            }
        ));
    }

    #[test]
    fn orphan_detected_when_node_stuck_running() {
        let events = vec![
            pipe_started("r3", "test"),
            node_started("r3", "n1", NodeKind::Bash),
        ];
        let state = derive("r3", &events);
        assert_eq!(state.state, PipeState::Running);
        assert!(state.is_orphan);
    }

    #[test]
    fn no_orphan_when_pipe_completed() {
        let events = vec![
            pipe_started("r4", "test"),
            node_started("r4", "n1", NodeKind::Bash),
            node_completed("r4", "n1"),
            pipe_completed("r4"),
        ];
        assert!(!derive("r4", &events).is_orphan);
    }

    #[test]
    fn empty_events_yields_running_no_orphan() {
        let state = derive("r5", &[]);
        assert_eq!(state.state, PipeState::Running);
        assert!(!state.is_orphan);
        assert!(state.nodes.is_empty());
    }

    #[test]
    fn summary_counts_nodes() {
        let events = vec![
            pipe_started("r6", "ci"),
            node_started("r6", "lint", NodeKind::Bash),
            node_completed("r6", "lint"),
            node_started("r6", "test", NodeKind::Bash),
            node_failed("r6", "test", ErrorKind::Crash),
            pipe_aborted("r6", "failed nodes: test"),
        ];
        let summary = summarize("r6", &events);
        assert_eq!(summary.node_count, 2);
        assert_eq!(summary.completed_count, 1);
        assert_eq!(summary.failed_count, 1);
    }

    #[test]
    fn terminal_event_without_start_still_surfaces_node() {
        // Simulates a log that lost the `node.started` line.
        let events = vec![pipe_started("r7", "build"), node_completed("r7", "n1")];
        let state = derive("r7", &events);
        assert_eq!(state.nodes.len(), 1);
        assert_eq!(state.nodes[0].id, "n1");
        assert!(state.nodes[0].kind.is_none());
        assert_eq!(state.nodes[0].status, NodeStatus::Completed);
        assert_eq!(
            state.nodes[0].last_ts.as_deref(),
            Some("2026-04-16T00:00:02Z")
        );
        assert!(!state.is_orphan);
    }

    #[test]
    fn restarted_node_reuses_its_entry() {
        let events = vec![
            pipe_started("r8", "loop"),
            node_started("r8", "n1", NodeKind::Bash),
            node_completed("r8", "n1"),
            node_started("r8", "n1", NodeKind::Bash),
        ];
        let state = derive("r8", &events);
        assert_eq!(state.nodes.len(), 1);
        assert_eq!(state.nodes[0].status, NodeStatus::Running);
        assert!(state.is_orphan);
    }

    #[test]
    fn last_ts_tracks_final_event() {
        let events = vec![
            pipe_started("r9", "build"),
            node_started("r9", "n1", NodeKind::Bash),
            node_completed("r9", "n1"),
            pipe_completed("r9"),
        ];
        assert_eq!(derive("r9", &events).last_ts, "2026-04-16T00:00:10Z");

        let empty = summarize("r9", &[]);
        assert_eq!(empty.last_ts, "");
        assert_eq!(empty.pipe, "");
        assert_eq!(empty.node_count, 0);
        assert_eq!(empty.completed_count, 0);
        assert_eq!(empty.failed_count, 0);
    }
}