Skip to main content

harmont_cli/orchestrator/
graph.rs

1//! Chain-bounded DAG built from a wire-typed `Pipeline`.
2//!
3//! Each `Node` carries an owned wire `CommandStep`, so the scheduler
4//! reads `runner`/`runner_args` directly at dispatch time — no
5//! legacy schema round-trip.
6//!
7//! Two edge sets at the Node level:
8//!
9//! * `builds_in` — at most one parent per step. Lineage edge: child runs
10//!   inside the parent's container (locally) or boots from the parent's
11//!   snapshot (remotely). Implies a sync edge.
12//!
13//! * `depends_on` — synchronisation edges (no state inheritance).
14//!   Computed from the `builds_in` parent (always added) plus any
15//!   implicit barriers introduced by `wait` steps in the wire format.
16//!   v0 has no schema-level `depends_on` — chain DSL is the only
17//!   topology mechanism.
18//!
19//! After flattening wait barriers, the graph is a DAG over command
20//! steps. A "lineage chain" is a maximal path of `builds_in` edges
21//! with single-child branching at every internal node — these are
22//! schedulable as a single long-lived container.
23
24use std::collections::BTreeMap;
25
26use anyhow::Result;
27use hm_plugin_protocol::{CommandStep, Pipeline, Step};
28
29#[derive(Debug, Clone)]
30pub struct Node {
31    pub step: CommandStep,
32    /// Resolved final environment (pipeline.env merged with step.env).
33    pub env: BTreeMap<String, String>,
34    /// `builds_in` parent's index, if any.
35    pub builds_in: Option<usize>,
36    /// Synchronisation edges (computed from `builds_in` and wait barriers).
37    pub depends_on: Vec<usize>,
38    /// `builds_in` children's indices.
39    pub builds_in_children: Vec<usize>,
40}
41
42#[derive(Debug, Clone)]
43pub struct Graph {
44    pub nodes: Vec<Node>,
45    pub default_image: Option<String>,
46}
47
48/// A flattened command step plus the keys of any prior steps a `wait`
49/// barrier inserted between them and us. The `extra_deps` are step
50/// keys, not graph indices — they're resolved during graph build.
51struct FlatStep {
52    step: CommandStep,
53    extra_deps: Vec<String>,
54}
55
56impl Graph {
57    /// Build a synchronisation graph from a planner-emitted `Pipeline`.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if a step's `builds_in` or `depends_on`
62    /// references an unknown step key, or if the resulting graph is
63    /// cyclic. Each cycle error names both ends of the back edge so the
64    /// user can grep their pipeline for the offending pair.
65    pub fn build(pipeline: &Pipeline) -> Result<Self> {
66        let flat = flatten_steps(&pipeline.steps);
67        let key_to_idx: BTreeMap<String, usize> = flat
68            .iter()
69            .enumerate()
70            .map(|(i, f)| (f.step.key.clone(), i))
71            .collect();
72        let pipeline_env = pipeline.env.clone().unwrap_or_default();
73
74        let mut nodes: Vec<Node> = flat
75            .iter()
76            .map(|f| {
77                let mut env = pipeline_env.clone();
78                if let Some(e) = &f.step.env {
79                    env.extend(e.clone());
80                }
81                Node {
82                    step: f.step.clone(),
83                    env,
84                    builds_in: None,
85                    depends_on: vec![],
86                    builds_in_children: vec![],
87                }
88            })
89            .collect();
90
91        for (i, f) in flat.iter().enumerate() {
92            if let Some(parent_key) = &f.step.builds_in {
93                let p = *key_to_idx.get(parent_key).ok_or_else(|| {
94                    anyhow::anyhow!(
95                        "step '{}' builds_in references unknown step '{}'",
96                        f.step.key,
97                        parent_key
98                    )
99                })?;
100                nodes[i].builds_in = Some(p);
101                nodes[p].builds_in_children.push(i);
102                if !nodes[i].depends_on.contains(&p) {
103                    nodes[i].depends_on.push(p);
104                }
105            }
106            for dep_key in &f.extra_deps {
107                let p = *key_to_idx.get(dep_key).ok_or_else(|| {
108                    anyhow::anyhow!(
109                        "step '{}' has wait-barrier dep on unknown step '{}'",
110                        f.step.key,
111                        dep_key
112                    )
113                })?;
114                if !nodes[i].depends_on.contains(&p) {
115                    nodes[i].depends_on.push(p);
116                }
117            }
118        }
119
120        // Root steps (no `builds_in`) with no explicit `image` inherit
121        // the pipeline's `default_image`. Without this the docker
122        // plugin's `image_name::resolve_image` falls back to
123        // `alpine:latest`, which breaks every apt-based example. Patch
124        // at the host so the plugin stays pipeline-agnostic.
125        if let Some(default_img) = pipeline.default_image.as_deref() {
126            for node in &mut nodes {
127                if node.builds_in.is_none() && node.step.image.is_none() {
128                    node.step.image = Some(default_img.to_string());
129                }
130            }
131        }
132
133        let g = Self {
134            nodes,
135            default_image: pipeline.default_image.clone(),
136        };
137        g.assert_acyclic()?;
138        Ok(g)
139    }
140
141    fn assert_acyclic(&self) -> Result<()> {
142        // Iterative DFS with grey/black colouring. When we enter a grey
143        // node, the parent we came from is the back-edge source; the
144        // grey node itself is the back-edge target.
145        let mut color = vec![0u8; self.nodes.len()]; // 0 white, 1 grey, 2 black
146        for start in 0..self.nodes.len() {
147            if color[start] == 0 {
148                let mut stack: Vec<(usize, Option<usize>, bool)> = vec![(start, None, false)];
149                while let Some((n, parent, exiting)) = stack.pop() {
150                    if exiting {
151                        color[n] = 2;
152                        continue;
153                    }
154                    if color[n] == 1 {
155                        let target = &self.nodes[n].step.key;
156                        match parent {
157                            Some(p) => anyhow::bail!(
158                                "cycle: '{}' is reachable from itself via '{}'",
159                                target,
160                                self.nodes[p].step.key
161                            ),
162                            None => anyhow::bail!("cycle through step '{target}'"),
163                        }
164                    }
165                    color[n] = 1;
166                    stack.push((n, parent, true));
167                    for &c in &self.nodes[n].depends_on {
168                        if color[c] != 2 {
169                            stack.push((c, Some(n), false));
170                        }
171                    }
172                }
173            }
174        }
175        Ok(())
176    }
177
178    /// True iff `i` is the unique `builds_in` child of its parent and has
179    /// no other `depends_on` edges. A chain step can run via `docker exec`
180    /// in the parent's running container.
181    #[must_use]
182    pub fn is_chain_step(&self, i: usize) -> bool {
183        // depends_on length == 1 because the only edge is the implied
184        // builds_in dep.
185        self.nodes[i].builds_in.is_some_and(|p| {
186            self.nodes[p].builds_in_children.len() == 1 && self.nodes[i].depends_on.len() == 1
187        })
188    }
189
190    /// For each chain in `chains`, return the set of OTHER chain indices
191    /// it depends on. A chain depends on another chain iff any node in
192    /// it has a `depends_on` edge into a node belonging to that other
193    /// chain. The result is deterministic (ascending chain index).
194    ///
195    /// Caller must pass the same partition `chains()` returned —
196    /// `chain_index[node]` is derived from `chains`.
197    #[must_use]
198    pub fn chain_deps(&self, chains: &[Vec<usize>]) -> Vec<Vec<usize>> {
199        let mut chain_index = vec![usize::MAX; self.nodes.len()];
200        for (ci, ch) in chains.iter().enumerate() {
201            for &n in ch {
202                chain_index[n] = ci;
203            }
204        }
205        let mut out: Vec<Vec<usize>> = vec![Vec::new(); chains.len()];
206        for (ci, ch) in chains.iter().enumerate() {
207            let mut seen: std::collections::BTreeSet<usize> = std::collections::BTreeSet::new();
208            for &n in ch {
209                for &dep in &self.nodes[n].depends_on {
210                    let dep_ci = chain_index[dep];
211                    if dep_ci != ci {
212                        seen.insert(dep_ci);
213                    }
214                }
215            }
216            out[ci] = seen.into_iter().collect();
217        }
218        out
219    }
220
221    /// Partition `nodes` into chain units. Each returned vec is a chain
222    /// `[root, step_1, step_2, …]` where `root` is **not** a chain step
223    /// and every subsequent index is a chain step descending from the
224    /// previous one via `builds_in`. Every node appears in exactly one
225    /// chain. The order of chains is deterministic (root index, ascending).
226    #[must_use]
227    pub fn chains(&self) -> Vec<Vec<usize>> {
228        let n = self.nodes.len();
229        let mut placed = vec![false; n];
230        let mut out: Vec<Vec<usize>> = Vec::new();
231        for root in 0..n {
232            if placed[root] || self.is_chain_step(root) {
233                continue;
234            }
235            let mut chain = vec![root];
236            placed[root] = true;
237            // Follow the unique chain-step child, if any.
238            let mut cur = root;
239            while let Some(&next) = self.nodes[cur]
240                .builds_in_children
241                .iter()
242                .find(|&&c| self.is_chain_step(c))
243            {
244                chain.push(next);
245                placed[next] = true;
246                cur = next;
247            }
248            out.push(chain);
249        }
250        out
251    }
252}
253
254/// Flatten v0 steps. Drops Wait nodes; for each command step that
255/// follows a wait, records the keys of the steps that completed
256/// before that wait as `extra_deps`.
257fn flatten_steps(steps: &[Step]) -> Vec<FlatStep> {
258    let mut out: Vec<FlatStep> = Vec::new();
259    let mut implicit_wait_targets: Vec<String> = Vec::new();
260    for s in steps {
261        match s {
262            Step::Command(c) => {
263                out.push(FlatStep {
264                    step: (**c).clone(),
265                    extra_deps: implicit_wait_targets.clone(),
266                });
267            }
268            Step::Wait(_) => {
269                implicit_wait_targets = out.iter().map(|f| f.step.key.clone()).collect();
270            }
271        }
272    }
273    out
274}
275
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Decode a wire `Pipeline` directly from JSON bytes. Mirrors the
    /// legacy `local::schema::decode_plan` helper but uses the wire
    /// types, so the new module has no dependency on the legacy schema.
    fn decode_wire(bytes: &[u8]) -> Pipeline {
        serde_json::from_slice::<Pipeline>(bytes).unwrap()
    }

    /// Index of the node whose step key is `key`; panics if absent.
    fn idx_of(g: &Graph, key: &str) -> usize {
        g.nodes
            .iter()
            .position(|n| n.step.key == key)
            .unwrap_or_else(|| panic!("no node with key '{key}'"))
    }

    #[test]
    fn chain_detection() {
        let json = br#"{
          "version":"0",
          "default_image":"ubuntu:24.04",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
            {"type":"command","key":"c","cmd":"echo c","builds_in":"b"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        assert!(!g.is_chain_step(0)); // root, no parent
        assert!(g.is_chain_step(1));
        assert!(g.is_chain_step(2));
    }

    #[test]
    fn fork_breaks_chain() {
        let json = br#"{
          "version":"0",
          "default_image":"ubuntu:24.04",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
            {"type":"command","key":"c","cmd":"echo c","builds_in":"a"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        assert!(!g.is_chain_step(1)); // sibling exists, must commit a then run separately
        assert!(!g.is_chain_step(2));
    }

    #[test]
    fn wait_inserts_implicit_deps() {
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b"},
            {"type":"wait"},
            {"type":"command","key":"c","cmd":"echo c"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        // c (index 2, since the wait is dropped) should depend on a (0) and b (1).
        let c_idx = g.nodes.iter().position(|n| n.step.key == "c").unwrap();
        let a_idx = g.nodes.iter().position(|n| n.step.key == "a").unwrap();
        let b_idx = g.nodes.iter().position(|n| n.step.key == "b").unwrap();
        assert!(g.nodes[c_idx].depends_on.contains(&a_idx));
        assert!(g.nodes[c_idx].depends_on.contains(&b_idx));
    }

    #[test]
    fn successive_waits_accumulate_deps() {
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"wait"},
            {"type":"command","key":"b","cmd":"echo b"},
            {"type":"wait"},
            {"type":"command","key":"c","cmd":"echo c"}
          ]
        }"#;
        let g = Graph::build(&decode_wire(json)).unwrap();
        let (a, b, c) = (idx_of(&g, "a"), idx_of(&g, "b"), idx_of(&g, "c"));
        assert!(g.nodes[a].depends_on.is_empty());
        assert_eq!(g.nodes[b].depends_on, vec![a]);
        assert_eq!(g.nodes[c].depends_on, vec![a, b]);
    }

    #[test]
    fn builds_in_parent_behind_wait_is_not_duplicated() {
        // `b` depends on `a` both as its builds_in parent and via the
        // wait barrier; the edge must only be recorded once.
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"wait"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"}
          ]
        }"#;
        let g = Graph::build(&decode_wire(json)).unwrap();
        let (a, b) = (idx_of(&g, "a"), idx_of(&g, "b"));
        assert_eq!(g.nodes[b].depends_on, vec![a]);
        assert_eq!(g.nodes[b].builds_in, Some(a));
        assert_eq!(g.nodes[a].builds_in_children, vec![b]);
    }

    #[test]
    fn rejects_unknown_builds_in() {
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"b","cmd":"echo b","builds_in":"missing"}
          ]
        }"#;
        let p = decode_wire(json);
        let err = Graph::build(&p).unwrap_err().to_string();
        assert!(
            err.contains("unknown step 'missing'"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn rejects_builds_in_cycle() {
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a","builds_in":"b"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"}
          ]
        }"#;
        let err = Graph::build(&decode_wire(json)).unwrap_err().to_string();
        assert!(err.contains("cycle"), "unexpected error: {err}");
    }

    #[test]
    fn default_image_is_applied_to_roots_only() {
        let json = br#"{
          "version":"0",
          "default_image":"ubuntu:24.04",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"}
          ]
        }"#;
        let g = Graph::build(&decode_wire(json)).unwrap();
        assert_eq!(g.default_image.as_deref(), Some("ubuntu:24.04"));
        assert_eq!(
            g.nodes[idx_of(&g, "a")].step.image.as_deref(),
            Some("ubuntu:24.04")
        );
        assert_eq!(g.nodes[idx_of(&g, "b")].step.image, None);
    }

    #[test]
    fn chains_partition_includes_every_node_once() {
        // Pipeline:
        //   a (root) -> b (chain) -> c (chain)
        //            -> d (fork)
        //   e (root, independent)
        let json = br#"{
          "version":"0",
          "default_image":"ubuntu:24.04",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
            {"type":"command","key":"c","cmd":"echo c","builds_in":"b"},
            {"type":"command","key":"d","cmd":"echo d","builds_in":"a"},
            {"type":"command","key":"e","cmd":"echo e"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        let idx = |k: &str| g.nodes.iter().position(|n| n.step.key == k).unwrap();

        // 'a' has two builds_in children, so neither b nor d is a chain
        // step relative to a. But b -> c is a single-child chain.
        let chains = g.chains();
        let mut all_nodes: Vec<usize> = chains.iter().flatten().copied().collect();
        all_nodes.sort_unstable();
        assert_eq!(
            all_nodes,
            vec![idx("a"), idx("b"), idx("c"), idx("d"), idx("e")]
        );

        // The chain containing 'b' must also contain 'c', in that order.
        let bc_chain = chains
            .iter()
            .find(|ch| ch.contains(&idx("b")))
            .expect("b must be in some chain");
        assert_eq!(bc_chain, &vec![idx("b"), idx("c")]);

        // Every other chain is length-1.
        for ch in &chains {
            if ch != bc_chain {
                assert_eq!(ch.len(), 1, "non-bc chain not singleton: {ch:?}");
            }
        }
    }

    #[test]
    fn chain_deps_aggregates_cross_chain_edges() {
        // Pipeline:
        //   a -> b (chain) -> c (chain)
        //   a -> d (fork)
        //   e (independent)
        // Chains: [a], [b,c], [d], [e]
        // chain_deps: [a] none, [b,c] {[a]}, [d] {[a]}, [e] none
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
            {"type":"command","key":"c","cmd":"echo c","builds_in":"b"},
            {"type":"command","key":"d","cmd":"echo d","builds_in":"a"},
            {"type":"command","key":"e","cmd":"echo e"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        let chains = g.chains();
        let deps = g.chain_deps(&chains);

        let find_chain = |key: &str| -> usize {
            let idx = g.nodes.iter().position(|n| n.step.key == key).unwrap();
            chains.iter().position(|ch| ch.contains(&idx)).unwrap()
        };
        let a_ci = find_chain("a");
        let bc_ci = find_chain("b");
        let d_ci = find_chain("d");
        let e_ci = find_chain("e");

        assert!(
            deps[a_ci].is_empty(),
            "chain a has no deps: {:?}",
            deps[a_ci]
        );
        assert_eq!(deps[bc_ci], vec![a_ci]);
        assert_eq!(deps[d_ci], vec![a_ci]);
        assert!(deps[e_ci].is_empty());
    }

    #[test]
    fn chain_deps_subsumes_wait_barriers() {
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b"},
            {"type":"wait"},
            {"type":"command","key":"c","cmd":"echo c"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        let chains = g.chains();
        let deps = g.chain_deps(&chains);
        let find_chain = |key: &str| -> usize {
            let idx = g.nodes.iter().position(|n| n.step.key == key).unwrap();
            chains.iter().position(|ch| ch.contains(&idx)).unwrap()
        };
        let a_ci = find_chain("a");
        let b_ci = find_chain("b");
        let c_ci = find_chain("c");
        let mut c_deps = deps[c_ci].clone();
        c_deps.sort_unstable();
        let mut want = vec![a_ci, b_ci];
        want.sort_unstable();
        assert_eq!(c_deps, want);
    }

    #[test]
    fn chains_root_is_never_a_chain_step() {
        let json = br#"{
          "version":"0",
          "steps":[
            {"type":"command","key":"a","cmd":"echo a"},
            {"type":"command","key":"b","cmd":"echo b","builds_in":"a"}
          ]
        }"#;
        let p = decode_wire(json);
        let g = Graph::build(&p).unwrap();
        for chain in g.chains() {
            let root = chain[0];
            assert!(!g.is_chain_step(root), "chain root {root} is a chain step");
            for &step in &chain[1..] {
                assert!(g.is_chain_step(step), "non-root {step} is not a chain step");
            }
        }
    }
}