omne_cli/dag.rs
1//! DAG scheduler.
2//!
3//! Pure-logic state machine over a parsed `pipe::Pipe`. Tracks node
4//! lifecycle states, decides which nodes are ready to dispatch given
5//! the current outcomes of their predecessors, and respects each
6//! node's `trigger_rule` (`all_success` default,
7//! `one_success` for synthesis nodes that should fire as soon as any
8//! single branch succeeds).
9//!
10//! No I/O. The runner (Unit 11+) plugs the scheduler into subprocess
11//! lifecycle: call `ready()` to find dispatchable nodes, spawn each,
12//! then call `mark(id, outcome)` when subprocess termination is
13//! observed. The returned set is the *newly* unblocked successors so
14//! callers don't re-scan the whole DAG every tick.
15//!
16//! Failure semantics:
17//! - `all_success` predecessors: any failed predecessor permanently
18//! blocks the dependent.
19//! - `one_success` predecessors: the dependent becomes ready as soon
20//! as one predecessor completes successfully; remaining
21//! predecessors are no-ops once it fires. If every predecessor
22//! ends up failed, the dependent is permanently blocked.
23
24#![allow(dead_code)]
25
26use std::collections::{BTreeMap, BTreeSet};
27
28use thiserror::Error;
29
30use crate::pipe::{Pipe, TriggerRule};
31
/// Where a single node stands within one run of the pipe.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Not dispatched yet: its predecessors have not satisfied the
    /// node's trigger rule, or `ready()` has not picked it up so far.
    Pending,
    /// Handed out by `ready()`; no outcome has been reported through
    /// `mark` yet.
    Running,
    /// Finished successfully — the subprocess returned cleanly, or a
    /// loop stopped on its `until` sentinel.
    Completed,
    /// Finished unsuccessfully — non-zero exit, timeout,
    /// `max_iterations` reached, or a tripped gate.
    Failed,
    /// Will never be dispatched because a predecessor failed. Kept
    /// separate from `Failed` so the runner can attribute the cascade
    /// when reasoning about `pipe.aborted`.
    Blocked,
}
50
/// The result a caller reports to `mark` for a node that was running.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeOutcome {
    /// The node finished successfully.
    Completed,
    /// The node finished unsuccessfully.
    Failed,
}
57
/// Reasons `Scheduler::mark` can reject an outcome report.
#[derive(Debug, Error)]
pub enum SchedulerError {
    /// The given id is not present in the scheduler's state table.
    #[error("scheduler does not know node `{id}`")]
    UnknownNode { id: String },

    /// The node exists but is not in `NodeState::Running`, so there is
    /// no in-flight dispatch to record an outcome for. `state` carries
    /// the state the node was actually found in.
    #[error("node `{id}` is not Running (state: {state:?}); cannot mark outcome")]
    NotRunning { id: String, state: NodeState },
}
66
/// State machine over a parsed pipe. Construct once per run; thread
/// `ready()` / `mark()` calls through the dispatcher loop.
pub struct Scheduler {
    /// Node ids in the order the pipe lists them. `ready()` walks this
    /// vector, so dispatch ordering is deterministic across runs (it
    /// follows pipe order; the ids are not re-sorted).
    order: Vec<String>,
    /// Current lifecycle state of every node, keyed by id.
    states: BTreeMap<String, NodeState>,
    /// `predecessors[id]` = ids that must complete before `id` can run.
    predecessors: BTreeMap<String, Vec<String>>,
    /// `trigger_rule[id]` = how `id`'s predecessors are combined.
    trigger_rule: BTreeMap<String, TriggerRule>,
}
79
80impl Scheduler {
81 /// Build a scheduler from a parsed pipe. Every node starts
82 /// `Pending`.
83 pub fn new(pipe: &Pipe) -> Self {
84 let order: Vec<String> = pipe.nodes.iter().map(|n| n.id.clone()).collect();
85 let states = order
86 .iter()
87 .map(|id| (id.clone(), NodeState::Pending))
88 .collect();
89 let predecessors = pipe
90 .nodes
91 .iter()
92 .map(|n| (n.id.clone(), n.depends_on.clone()))
93 .collect();
94 let trigger_rule = pipe
95 .nodes
96 .iter()
97 .map(|n| (n.id.clone(), n.trigger_rule))
98 .collect();
99 Self {
100 order,
101 states,
102 predecessors,
103 trigger_rule,
104 }
105 }
106
107 /// Current state of `id`, or `None` if unknown.
108 pub fn state(&self, id: &str) -> Option<NodeState> {
109 self.states.get(id).copied()
110 }
111
112 /// All node ids in the DAG (insertion order from the pipe).
113 pub fn all_ids(&self) -> &[String] {
114 &self.order
115 }
116
117 /// Ids currently dispatchable: `Pending` nodes whose
118 /// predecessors satisfy their `trigger_rule`. Idempotent —
119 /// promotes to `Running` and returns the freshly-promoted set.
120 /// Sorted for determinism.
121 pub fn ready(&mut self) -> Vec<String> {
122 let mut ready_ids = Vec::new();
123 for id in &self.order {
124 if self.states.get(id) != Some(&NodeState::Pending) {
125 continue;
126 }
127 if !self.predecessors_satisfied(id) {
128 continue;
129 }
130 ready_ids.push(id.clone());
131 }
132 for id in &ready_ids {
133 self.states.insert(id.clone(), NodeState::Running);
134 }
135 ready_ids
136 }
137
138 /// Record the outcome of a `Running` node and return the ids
139 /// that just became unblocked (whether ready-to-dispatch or
140 /// permanently blocked is a follow-up `ready()` / `state()`
141 /// inspection).
142 ///
143 /// Returns `Err` if `id` is unknown or not currently `Running`.
144 pub fn mark(&mut self, id: &str, outcome: NodeOutcome) -> Result<Vec<String>, SchedulerError> {
145 let current = self
146 .states
147 .get(id)
148 .copied()
149 .ok_or_else(|| SchedulerError::UnknownNode { id: id.to_string() })?;
150 if current != NodeState::Running {
151 return Err(SchedulerError::NotRunning {
152 id: id.to_string(),
153 state: current,
154 });
155 }
156 let new_state = match outcome {
157 NodeOutcome::Completed => NodeState::Completed,
158 NodeOutcome::Failed => NodeState::Failed,
159 };
160 self.states.insert(id.to_string(), new_state);
161
162 // Cascade: any Pending node that depends on `id` may now be
163 // ready (predecessors satisfied) or blocked (an `all_success`
164 // predecessor failed, or every `one_success` predecessor
165 // exhausted itself failing).
166 let mut newly_unblocked = BTreeSet::new();
167 for (dep_id, deps) in &self.predecessors {
168 if !deps.iter().any(|d| d == id) {
169 continue;
170 }
171 if self.states.get(dep_id) != Some(&NodeState::Pending) {
172 continue;
173 }
174 if self.permanently_blocked(dep_id) {
175 self.states.insert(dep_id.clone(), NodeState::Blocked);
176 newly_unblocked.insert(dep_id.clone());
177 } else if self.predecessors_satisfied(dep_id) {
178 newly_unblocked.insert(dep_id.clone());
179 }
180 }
181 Ok(newly_unblocked.into_iter().collect())
182 }
183
184 fn predecessors_satisfied(&self, id: &str) -> bool {
185 let Some(deps) = self.predecessors.get(id) else {
186 return false;
187 };
188 if deps.is_empty() {
189 return true;
190 }
191 let rule = self
192 .trigger_rule
193 .get(id)
194 .copied()
195 .unwrap_or(TriggerRule::AllSuccess);
196 match rule {
197 TriggerRule::AllSuccess => deps
198 .iter()
199 .all(|d| self.states.get(d) == Some(&NodeState::Completed)),
200 TriggerRule::OneSuccess => deps
201 .iter()
202 .any(|d| self.states.get(d) == Some(&NodeState::Completed)),
203 }
204 }
205
206 /// True iff no future outcome can satisfy `id`'s `trigger_rule`.
207 /// `all_success`: any predecessor `Failed` or `Blocked` poisons
208 /// the dependent. `one_success`: every predecessor must end in a
209 /// non-success state for the dependent to be permanently blocked.
210 fn permanently_blocked(&self, id: &str) -> bool {
211 let Some(deps) = self.predecessors.get(id) else {
212 return false;
213 };
214 if deps.is_empty() {
215 return false;
216 }
217 let rule = self
218 .trigger_rule
219 .get(id)
220 .copied()
221 .unwrap_or(TriggerRule::AllSuccess);
222 match rule {
223 TriggerRule::AllSuccess => deps.iter().any(|d| {
224 matches!(
225 self.states.get(d),
226 Some(NodeState::Failed) | Some(NodeState::Blocked)
227 )
228 }),
229 TriggerRule::OneSuccess => deps.iter().all(|d| {
230 matches!(
231 self.states.get(d),
232 Some(NodeState::Failed) | Some(NodeState::Blocked)
233 )
234 }),
235 }
236 }
237
238 /// True when no more progress is possible: every node is
239 /// `Completed`, `Failed`, or `Blocked`.
240 pub fn is_terminal(&self) -> bool {
241 self.states.values().all(|s| {
242 matches!(
243 s,
244 NodeState::Completed | NodeState::Failed | NodeState::Blocked
245 )
246 })
247 }
248}