Skip to main content

omne_cli/
dag.rs

//! DAG scheduler.
//!
//! Pure-logic state machine over a parsed `pipe::Pipe`. Tracks node
//! lifecycle states, decides which nodes are ready to dispatch given
//! the current outcomes of their predecessors, and respects each
//! node's `trigger_rule` (`all_success` by default, `one_success` for
//! synthesis nodes that should fire as soon as any single branch
//! succeeds).
//!
//! No I/O. The runner (Unit 11+) plugs the scheduler into subprocess
//! lifecycle: call `ready()` to find dispatchable nodes, spawn each,
//! then call `mark(id, outcome)` when subprocess termination is
//! observed. `mark` returns only the downstream nodes whose status that
//! outcome just resolved (now dispatchable, or now `Blocked`), so
//! callers don't re-scan the whole DAG every tick.
//!
//! Failure semantics:
//! - `all_success` predecessors: any failed (or blocked) predecessor
//!   permanently blocks the dependent.
//! - `one_success` predecessors: the dependent becomes ready as soon
//!   as one predecessor completes successfully; remaining
//!   predecessors are no-ops once it fires. If every predecessor
//!   ends up failed or blocked, the dependent is permanently blocked.
23
24#![allow(dead_code)]
25
26use std::collections::{BTreeMap, BTreeSet};
27
28use thiserror::Error;
29
30use crate::pipe::{Pipe, TriggerRule};
31
/// Where a node sits in its lifecycle during a single run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeState {
    /// Waiting: predecessors are not yet satisfied, or the node has not
    /// yet been handed out by `ready()`.
    Pending,
    /// Handed out by `ready()`; no outcome has been recorded via `mark()`
    /// yet.
    Running,
    /// The subprocess succeeded (or a loop stopped on its `until`
    /// sentinel).
    Completed,
    /// The subprocess exited non-zero, timed out, exhausted
    /// `max_iterations`, or tripped a gate.
    Failed,
    /// Can never be dispatched because an upstream node did not succeed.
    /// Kept separate from `Failed` so the runner can attribute the
    /// cascade when explaining `pipe.aborted`.
    Blocked,
}
50
/// The result reported to `Scheduler::mark` for a node that finished.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeOutcome {
    /// Finished successfully; the node moves to `NodeState::Completed`.
    Completed,
    /// Finished unsuccessfully; the node moves to `NodeState::Failed`.
    Failed,
}
57
58#[derive(Debug, Error)]
59pub enum SchedulerError {
60    #[error("scheduler does not know node `{id}`")]
61    UnknownNode { id: String },
62
63    #[error("node `{id}` is not Running (state: {state:?}); cannot mark outcome")]
64    NotRunning { id: String, state: NodeState },
65}
66
67/// State machine over a parsed pipe. Construct once per run; thread
68/// `ready()` / `mark()` calls through the dispatcher loop.
69pub struct Scheduler {
70    /// Stable iteration order — `ready()` returns ids sorted so
71    /// dispatch ordering is deterministic across runs.
72    order: Vec<String>,
73    states: BTreeMap<String, NodeState>,
74    /// `predecessors[id]` = ids that must complete before `id` can run.
75    predecessors: BTreeMap<String, Vec<String>>,
76    /// `trigger_rule[id]` = how `id`'s predecessors are combined.
77    trigger_rule: BTreeMap<String, TriggerRule>,
78}
79
80impl Scheduler {
81    /// Build a scheduler from a parsed pipe. Every node starts
82    /// `Pending`.
83    pub fn new(pipe: &Pipe) -> Self {
84        let order: Vec<String> = pipe.nodes.iter().map(|n| n.id.clone()).collect();
85        let states = order
86            .iter()
87            .map(|id| (id.clone(), NodeState::Pending))
88            .collect();
89        let predecessors = pipe
90            .nodes
91            .iter()
92            .map(|n| (n.id.clone(), n.depends_on.clone()))
93            .collect();
94        let trigger_rule = pipe
95            .nodes
96            .iter()
97            .map(|n| (n.id.clone(), n.trigger_rule))
98            .collect();
99        Self {
100            order,
101            states,
102            predecessors,
103            trigger_rule,
104        }
105    }
106
107    /// Current state of `id`, or `None` if unknown.
108    pub fn state(&self, id: &str) -> Option<NodeState> {
109        self.states.get(id).copied()
110    }
111
112    /// All node ids in the DAG (insertion order from the pipe).
113    pub fn all_ids(&self) -> &[String] {
114        &self.order
115    }
116
117    /// Ids currently dispatchable: `Pending` nodes whose
118    /// predecessors satisfy their `trigger_rule`. Idempotent —
119    /// promotes to `Running` and returns the freshly-promoted set.
120    /// Sorted for determinism.
121    pub fn ready(&mut self) -> Vec<String> {
122        let mut ready_ids = Vec::new();
123        for id in &self.order {
124            if self.states.get(id) != Some(&NodeState::Pending) {
125                continue;
126            }
127            if !self.predecessors_satisfied(id) {
128                continue;
129            }
130            ready_ids.push(id.clone());
131        }
132        for id in &ready_ids {
133            self.states.insert(id.clone(), NodeState::Running);
134        }
135        ready_ids
136    }
137
138    /// Record the outcome of a `Running` node and return the ids
139    /// that just became unblocked (whether ready-to-dispatch or
140    /// permanently blocked is a follow-up `ready()` / `state()`
141    /// inspection).
142    ///
143    /// Returns `Err` if `id` is unknown or not currently `Running`.
144    pub fn mark(&mut self, id: &str, outcome: NodeOutcome) -> Result<Vec<String>, SchedulerError> {
145        let current = self
146            .states
147            .get(id)
148            .copied()
149            .ok_or_else(|| SchedulerError::UnknownNode { id: id.to_string() })?;
150        if current != NodeState::Running {
151            return Err(SchedulerError::NotRunning {
152                id: id.to_string(),
153                state: current,
154            });
155        }
156        let new_state = match outcome {
157            NodeOutcome::Completed => NodeState::Completed,
158            NodeOutcome::Failed => NodeState::Failed,
159        };
160        self.states.insert(id.to_string(), new_state);
161
162        // Cascade: any Pending node that depends on `id` may now be
163        // ready (predecessors satisfied) or blocked (an `all_success`
164        // predecessor failed, or every `one_success` predecessor
165        // exhausted itself failing).
166        let mut newly_unblocked = BTreeSet::new();
167        for (dep_id, deps) in &self.predecessors {
168            if !deps.iter().any(|d| d == id) {
169                continue;
170            }
171            if self.states.get(dep_id) != Some(&NodeState::Pending) {
172                continue;
173            }
174            if self.permanently_blocked(dep_id) {
175                self.states.insert(dep_id.clone(), NodeState::Blocked);
176                newly_unblocked.insert(dep_id.clone());
177            } else if self.predecessors_satisfied(dep_id) {
178                newly_unblocked.insert(dep_id.clone());
179            }
180        }
181        Ok(newly_unblocked.into_iter().collect())
182    }
183
184    fn predecessors_satisfied(&self, id: &str) -> bool {
185        let Some(deps) = self.predecessors.get(id) else {
186            return false;
187        };
188        if deps.is_empty() {
189            return true;
190        }
191        let rule = self
192            .trigger_rule
193            .get(id)
194            .copied()
195            .unwrap_or(TriggerRule::AllSuccess);
196        match rule {
197            TriggerRule::AllSuccess => deps
198                .iter()
199                .all(|d| self.states.get(d) == Some(&NodeState::Completed)),
200            TriggerRule::OneSuccess => deps
201                .iter()
202                .any(|d| self.states.get(d) == Some(&NodeState::Completed)),
203        }
204    }
205
206    /// True iff no future outcome can satisfy `id`'s `trigger_rule`.
207    /// `all_success`: any predecessor `Failed` or `Blocked` poisons
208    /// the dependent. `one_success`: every predecessor must end in a
209    /// non-success state for the dependent to be permanently blocked.
210    fn permanently_blocked(&self, id: &str) -> bool {
211        let Some(deps) = self.predecessors.get(id) else {
212            return false;
213        };
214        if deps.is_empty() {
215            return false;
216        }
217        let rule = self
218            .trigger_rule
219            .get(id)
220            .copied()
221            .unwrap_or(TriggerRule::AllSuccess);
222        match rule {
223            TriggerRule::AllSuccess => deps.iter().any(|d| {
224                matches!(
225                    self.states.get(d),
226                    Some(NodeState::Failed) | Some(NodeState::Blocked)
227                )
228            }),
229            TriggerRule::OneSuccess => deps.iter().all(|d| {
230                matches!(
231                    self.states.get(d),
232                    Some(NodeState::Failed) | Some(NodeState::Blocked)
233                )
234            }),
235        }
236    }
237
238    /// True when no more progress is possible: every node is
239    /// `Completed`, `Failed`, or `Blocked`.
240    pub fn is_terminal(&self) -> bool {
241        self.states.values().all(|s| {
242            matches!(
243                s,
244                NodeState::Completed | NodeState::Failed | NodeState::Blocked
245            )
246        })
247    }
248}