#![allow(dead_code)]
use crate::events::{ErrorKind, Event, NodeKind};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipeState {
Running,
Completed,
Aborted { reason: String },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeStatus {
Pending,
Running,
Completed,
Failed {
kind: ErrorKind,
message: Option<String>,
},
}
#[derive(Debug, Clone)]
pub struct NodeInfo {
pub id: String,
pub kind: Option<NodeKind>,
pub status: NodeStatus,
pub last_ts: Option<String>,
}
#[derive(Debug)]
pub struct RunState {
pub run_id: String,
pub pipe: String,
pub state: PipeState,
pub nodes: Vec<NodeInfo>,
pub is_orphan: bool,
pub last_ts: String,
}
#[derive(Debug)]
pub struct RunSummary {
pub run_id: String,
pub pipe: String,
pub state: PipeState,
pub is_orphan: bool,
pub last_ts: String,
pub node_count: usize,
pub completed_count: usize,
pub failed_count: usize,
}
pub fn derive(run_id: &str, events: &[Event]) -> RunState {
let mut pipe_name = String::new();
let mut pipe_state = PipeState::Running;
let mut last_ts = String::new();
let mut nodes: Vec<NodeInfo> = Vec::new();
let mut node_index: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
for event in events {
match event {
Event::PipeStarted(e) => {
pipe_name = e.pipe.clone();
last_ts = e.ts.clone();
}
Event::PipeCompleted(e) => {
pipe_state = PipeState::Completed;
last_ts = e.ts.clone();
}
Event::PipeAborted(e) => {
pipe_state = PipeState::Aborted {
reason: e.reason.clone(),
};
last_ts = e.ts.clone();
}
Event::NodeStarted(e) => {
last_ts = e.ts.clone();
if let Some(&idx) = node_index.get(&e.node_id) {
nodes[idx].status = NodeStatus::Running;
nodes[idx].kind = Some(e.kind);
nodes[idx].last_ts = Some(e.ts.clone());
} else {
let idx = nodes.len();
node_index.insert(e.node_id.clone(), idx);
nodes.push(NodeInfo {
id: e.node_id.clone(),
kind: Some(e.kind),
status: NodeStatus::Running,
last_ts: Some(e.ts.clone()),
});
}
}
Event::NodeCompleted(e) => {
last_ts = e.ts.clone();
let idx = get_or_insert_node(&mut nodes, &mut node_index, &e.node_id);
nodes[idx].status = NodeStatus::Completed;
nodes[idx].last_ts = Some(e.ts.clone());
}
Event::NodeFailed(e) => {
last_ts = e.ts.clone();
let idx = get_or_insert_node(&mut nodes, &mut node_index, &e.node_id);
nodes[idx].status = NodeStatus::Failed {
kind: e.error.kind,
message: e.message.clone(),
};
nodes[idx].last_ts = Some(e.ts.clone());
}
Event::GatePassed(e) => {
last_ts = e.ts.clone();
}
Event::IterationStarted(e) => {
last_ts = e.ts.clone();
}
}
}
let is_orphan = detect_orphan(&nodes, &pipe_state);
RunState {
run_id: run_id.to_string(),
pipe: pipe_name,
state: pipe_state,
nodes,
is_orphan,
last_ts,
}
}
pub fn summarize(run_id: &str, events: &[Event]) -> RunSummary {
let state = derive(run_id, events);
let node_count = state.nodes.len();
let completed_count = state
.nodes
.iter()
.filter(|n| n.status == NodeStatus::Completed)
.count();
let failed_count = state
.nodes
.iter()
.filter(|n| matches!(n.status, NodeStatus::Failed { .. }))
.count();
RunSummary {
run_id: state.run_id,
pipe: state.pipe,
state: state.state,
is_orphan: state.is_orphan,
last_ts: state.last_ts,
node_count,
completed_count,
failed_count,
}
}
fn get_or_insert_node(
nodes: &mut Vec<NodeInfo>,
node_index: &mut std::collections::HashMap<String, usize>,
node_id: &str,
) -> usize {
if let Some(&idx) = node_index.get(node_id) {
return idx;
}
let idx = nodes.len();
node_index.insert(node_id.to_string(), idx);
nodes.push(NodeInfo {
id: node_id.to_string(),
kind: None,
status: NodeStatus::Pending,
last_ts: None,
});
idx
}
fn detect_orphan(nodes: &[NodeInfo], pipe_state: &PipeState) -> bool {
if *pipe_state != PipeState::Running {
return false;
}
nodes.iter().any(|n| n.status == NodeStatus::Running)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::events::*;
fn pipe_started(run_id: &str, pipe: &str) -> Event {
Event::PipeStarted(PipeStarted {
id: "ev-0001".to_string(),
ts: "2026-04-16T00:00:00Z".to_string(),
run_id: run_id.to_string(),
pipe: pipe.to_string(),
inputs: vec![],
distro_version: String::new(),
})
}
fn node_started(run_id: &str, node_id: &str, kind: NodeKind) -> Event {
Event::NodeStarted(NodeStarted {
id: "ev-0002".to_string(),
ts: "2026-04-16T00:00:01Z".to_string(),
run_id: run_id.to_string(),
node_id: node_id.to_string(),
kind,
name: None,
model: None,
})
}
fn node_completed(run_id: &str, node_id: &str) -> Event {
Event::NodeCompleted(NodeCompleted {
id: "ev-0003".to_string(),
ts: "2026-04-16T00:00:02Z".to_string(),
run_id: run_id.to_string(),
node_id: node_id.to_string(),
output_path: format!(".omne/var/runs/{run_id}/nodes/{node_id}.out"),
})
}
fn node_failed(run_id: &str, node_id: &str, kind: ErrorKind) -> Event {
Event::NodeFailed(NodeFailed {
id: "ev-0004".to_string(),
ts: "2026-04-16T00:00:03Z".to_string(),
run_id: run_id.to_string(),
node_id: node_id.to_string(),
error: NodeError { kind },
message: None,
})
}
fn pipe_completed(run_id: &str) -> Event {
Event::PipeCompleted(PipeCompleted {
id: "ev-0005".to_string(),
ts: "2026-04-16T00:00:10Z".to_string(),
run_id: run_id.to_string(),
})
}
fn pipe_aborted(run_id: &str, reason: &str) -> Event {
Event::PipeAborted(PipeAborted {
id: "ev-0006".to_string(),
ts: "2026-04-16T00:00:10Z".to_string(),
run_id: run_id.to_string(),
reason: reason.to_string(),
})
}
#[test]
fn completed_run_derives_correctly() {
let events = vec![
pipe_started("r1", "build"),
node_started("r1", "n1", NodeKind::Bash),
node_completed("r1", "n1"),
pipe_completed("r1"),
];
let state = derive("r1", &events);
assert_eq!(state.pipe, "build");
assert_eq!(state.state, PipeState::Completed);
assert!(!state.is_orphan);
assert_eq!(state.nodes.len(), 1);
assert_eq!(state.nodes[0].status, NodeStatus::Completed);
}
#[test]
fn aborted_run_derives_correctly() {
let events = vec![
pipe_started("r2", "deploy"),
node_started("r2", "n1", NodeKind::Command),
node_failed("r2", "n1", ErrorKind::Blocked),
pipe_aborted("r2", "failed nodes: n1"),
];
let state = derive("r2", &events);
assert_eq!(
state.state,
PipeState::Aborted {
reason: "failed nodes: n1".to_string()
}
);
assert!(!state.is_orphan);
assert!(matches!(
state.nodes[0].status,
NodeStatus::Failed {
kind: ErrorKind::Blocked,
..
}
));
}
#[test]
fn orphan_detected_when_node_stuck_running() {
let events = vec![
pipe_started("r3", "test"),
node_started("r3", "n1", NodeKind::Bash),
];
let state = derive("r3", &events);
assert_eq!(state.state, PipeState::Running);
assert!(state.is_orphan);
}
#[test]
fn no_orphan_when_pipe_completed() {
let events = vec![
pipe_started("r4", "test"),
node_started("r4", "n1", NodeKind::Bash),
node_completed("r4", "n1"),
pipe_completed("r4"),
];
assert!(!derive("r4", &events).is_orphan);
}
#[test]
fn empty_events_yields_running_no_orphan() {
let state = derive("r5", &[]);
assert_eq!(state.state, PipeState::Running);
assert!(!state.is_orphan);
assert!(state.nodes.is_empty());
}
#[test]
fn summary_counts_nodes() {
let events = vec![
pipe_started("r6", "ci"),
node_started("r6", "lint", NodeKind::Bash),
node_completed("r6", "lint"),
node_started("r6", "test", NodeKind::Bash),
node_failed("r6", "test", ErrorKind::Crash),
pipe_aborted("r6", "failed nodes: test"),
];
let summary = summarize("r6", &events);
assert_eq!(summary.node_count, 2);
assert_eq!(summary.completed_count, 1);
assert_eq!(summary.failed_count, 1);
}
}