harmont_cli/orchestrator/
graph.rs1use std::collections::BTreeMap;
25
26use anyhow::Result;
27use hm_plugin_protocol::{CommandStep, Pipeline, Step};
28
29#[derive(Debug, Clone)]
30pub struct Node {
31 pub step: CommandStep,
32 pub env: BTreeMap<String, String>,
34 pub builds_in: Option<usize>,
36 pub depends_on: Vec<usize>,
38 pub builds_in_children: Vec<usize>,
40}
41
42#[derive(Debug, Clone)]
43pub struct Graph {
44 pub nodes: Vec<Node>,
45 pub default_image: Option<String>,
46}
47
48struct FlatStep {
52 step: CommandStep,
53 extra_deps: Vec<String>,
54}
55
56impl Graph {
57 pub fn build(pipeline: &Pipeline) -> Result<Self> {
66 let flat = flatten_steps(&pipeline.steps);
67 let key_to_idx: BTreeMap<String, usize> = flat
68 .iter()
69 .enumerate()
70 .map(|(i, f)| (f.step.key.clone(), i))
71 .collect();
72 let pipeline_env = pipeline.env.clone().unwrap_or_default();
73
74 let mut nodes: Vec<Node> = flat
75 .iter()
76 .map(|f| {
77 let mut env = pipeline_env.clone();
78 if let Some(e) = &f.step.env {
79 env.extend(e.clone());
80 }
81 Node {
82 step: f.step.clone(),
83 env,
84 builds_in: None,
85 depends_on: vec![],
86 builds_in_children: vec![],
87 }
88 })
89 .collect();
90
91 for (i, f) in flat.iter().enumerate() {
92 if let Some(parent_key) = &f.step.builds_in {
93 let p = *key_to_idx.get(parent_key).ok_or_else(|| {
94 anyhow::anyhow!(
95 "step '{}' builds_in references unknown step '{}'",
96 f.step.key,
97 parent_key
98 )
99 })?;
100 nodes[i].builds_in = Some(p);
101 nodes[p].builds_in_children.push(i);
102 if !nodes[i].depends_on.contains(&p) {
103 nodes[i].depends_on.push(p);
104 }
105 }
106 for dep_key in &f.extra_deps {
107 let p = *key_to_idx.get(dep_key).ok_or_else(|| {
108 anyhow::anyhow!(
109 "step '{}' has wait-barrier dep on unknown step '{}'",
110 f.step.key,
111 dep_key
112 )
113 })?;
114 if !nodes[i].depends_on.contains(&p) {
115 nodes[i].depends_on.push(p);
116 }
117 }
118 }
119
120 if let Some(default_img) = pipeline.default_image.as_deref() {
126 for node in &mut nodes {
127 if node.builds_in.is_none() && node.step.image.is_none() {
128 node.step.image = Some(default_img.to_string());
129 }
130 }
131 }
132
133 let g = Self {
134 nodes,
135 default_image: pipeline.default_image.clone(),
136 };
137 g.assert_acyclic()?;
138 Ok(g)
139 }
140
141 fn assert_acyclic(&self) -> Result<()> {
142 let mut color = vec![0u8; self.nodes.len()]; for start in 0..self.nodes.len() {
147 if color[start] == 0 {
148 let mut stack: Vec<(usize, Option<usize>, bool)> = vec![(start, None, false)];
149 while let Some((n, parent, exiting)) = stack.pop() {
150 if exiting {
151 color[n] = 2;
152 continue;
153 }
154 if color[n] == 1 {
155 let target = &self.nodes[n].step.key;
156 match parent {
157 Some(p) => anyhow::bail!(
158 "cycle: '{}' is reachable from itself via '{}'",
159 target,
160 self.nodes[p].step.key
161 ),
162 None => anyhow::bail!("cycle through step '{target}'"),
163 }
164 }
165 color[n] = 1;
166 stack.push((n, parent, true));
167 for &c in &self.nodes[n].depends_on {
168 if color[c] != 2 {
169 stack.push((c, Some(n), false));
170 }
171 }
172 }
173 }
174 }
175 Ok(())
176 }
177
178 #[must_use]
182 pub fn is_chain_step(&self, i: usize) -> bool {
183 self.nodes[i].builds_in.is_some_and(|p| {
186 self.nodes[p].builds_in_children.len() == 1 && self.nodes[i].depends_on.len() == 1
187 })
188 }
189
190 #[must_use]
198 pub fn chain_deps(&self, chains: &[Vec<usize>]) -> Vec<Vec<usize>> {
199 let mut chain_index = vec![usize::MAX; self.nodes.len()];
200 for (ci, ch) in chains.iter().enumerate() {
201 for &n in ch {
202 chain_index[n] = ci;
203 }
204 }
205 let mut out: Vec<Vec<usize>> = vec![Vec::new(); chains.len()];
206 for (ci, ch) in chains.iter().enumerate() {
207 let mut seen: std::collections::BTreeSet<usize> = std::collections::BTreeSet::new();
208 for &n in ch {
209 for &dep in &self.nodes[n].depends_on {
210 let dep_ci = chain_index[dep];
211 if dep_ci != ci {
212 seen.insert(dep_ci);
213 }
214 }
215 }
216 out[ci] = seen.into_iter().collect();
217 }
218 out
219 }
220
221 #[must_use]
227 pub fn chains(&self) -> Vec<Vec<usize>> {
228 let n = self.nodes.len();
229 let mut placed = vec![false; n];
230 let mut out: Vec<Vec<usize>> = Vec::new();
231 for root in 0..n {
232 if placed[root] || self.is_chain_step(root) {
233 continue;
234 }
235 let mut chain = vec![root];
236 placed[root] = true;
237 let mut cur = root;
239 while let Some(&next) = self.nodes[cur]
240 .builds_in_children
241 .iter()
242 .find(|&&c| self.is_chain_step(c))
243 {
244 chain.push(next);
245 placed[next] = true;
246 cur = next;
247 }
248 out.push(chain);
249 }
250 out
251 }
252}
253
254fn flatten_steps(steps: &[Step]) -> Vec<FlatStep> {
258 let mut out: Vec<FlatStep> = Vec::new();
259 let mut implicit_wait_targets: Vec<String> = Vec::new();
260 for s in steps {
261 match s {
262 Step::Command(c) => {
263 out.push(FlatStep {
264 step: (**c).clone(),
265 extra_deps: implicit_wait_targets.clone(),
266 });
267 }
268 Step::Wait(_) => {
269 implicit_wait_targets = out.iter().map(|f| f.step.key.clone()).collect();
270 }
271 }
272 }
273 out
274}
275
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Deserializes a JSON wire-format pipeline, panicking on malformed input.
    fn decode_wire(bytes: &[u8]) -> Pipeline {
        serde_json::from_slice::<Pipeline>(bytes).unwrap()
    }

    /// Decodes `bytes` and builds its graph, panicking if either step fails.
    fn graph_of(bytes: &[u8]) -> Graph {
        Graph::build(&decode_wire(bytes)).unwrap()
    }

    /// Index of the node whose step key is `key`.
    fn node_idx(g: &Graph, key: &str) -> usize {
        g.nodes.iter().position(|n| n.step.key == key).unwrap()
    }

    /// Index of the chain that contains the node whose step key is `key`.
    fn chain_idx(g: &Graph, chains: &[Vec<usize>], key: &str) -> usize {
        let node = node_idx(g, key);
        chains.iter().position(|ch| ch.contains(&node)).unwrap()
    }

    #[test]
    fn chain_detection() {
        // a <- b <- c: a linear builds_in sequence.
        let g = graph_of(
            br#"{
                "version":"0",
                "default_image":"ubuntu:24.04",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
                    {"type":"command","key":"c","cmd":"echo c","builds_in":"b"}
                ]
            }"#,
        );
        assert!(!g.is_chain_step(0));
        assert!(g.is_chain_step(1));
        assert!(g.is_chain_step(2));
    }

    #[test]
    fn fork_breaks_chain() {
        // b and c both build in a, so neither continues a's chain.
        let g = graph_of(
            br#"{
                "version":"0",
                "default_image":"ubuntu:24.04",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
                    {"type":"command","key":"c","cmd":"echo c","builds_in":"a"}
                ]
            }"#,
        );
        assert!(!g.is_chain_step(1));
        assert!(!g.is_chain_step(2));
    }

    #[test]
    fn wait_inserts_implicit_deps() {
        let g = graph_of(
            br#"{
                "version":"0",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b"},
                    {"type":"wait"},
                    {"type":"command","key":"c","cmd":"echo c"}
                ]
            }"#,
        );
        let c_deps = &g.nodes[node_idx(&g, "c")].depends_on;
        assert!(c_deps.contains(&node_idx(&g, "a")));
        assert!(c_deps.contains(&node_idx(&g, "b")));
    }

    #[test]
    fn rejects_unknown_builds_in() {
        let pipeline = decode_wire(
            br#"{
                "version":"0",
                "steps":[
                    {"type":"command","key":"b","cmd":"echo b","builds_in":"missing"}
                ]
            }"#,
        );
        let message = Graph::build(&pipeline).unwrap_err().to_string();
        assert!(message.contains("missing") || message.contains("unknown"));
    }

    #[test]
    fn chains_partition_includes_every_node_once() {
        // a forks into b and d; c continues b; e stands alone.
        let g = graph_of(
            br#"{
                "version":"0",
                "default_image":"ubuntu:24.04",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
                    {"type":"command","key":"c","cmd":"echo c","builds_in":"b"},
                    {"type":"command","key":"d","cmd":"echo d","builds_in":"a"},
                    {"type":"command","key":"e","cmd":"echo e"}
                ]
            }"#,
        );
        let idx = |k: &str| node_idx(&g, k);
        let chains = g.chains();

        // Every node appears in exactly one chain.
        let mut all_nodes: Vec<usize> = chains.iter().flatten().copied().collect();
        all_nodes.sort_unstable();
        assert_eq!(
            all_nodes,
            vec![idx("a"), idx("b"), idx("c"), idx("d"), idx("e")]
        );

        // b and c form the only multi-node chain.
        let bc_chain = chains
            .iter()
            .find(|ch| ch.contains(&idx("b")))
            .expect("b must be in some chain");
        assert_eq!(bc_chain, &vec![idx("b"), idx("c")]);

        for ch in chains.iter().filter(|ch| *ch != bc_chain) {
            assert_eq!(ch.len(), 1, "non-bc chain not singleton: {ch:?}");
        }
    }

    #[test]
    fn chain_deps_aggregates_cross_chain_edges() {
        let g = graph_of(
            br#"{
                "version":"0",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b","builds_in":"a"},
                    {"type":"command","key":"c","cmd":"echo c","builds_in":"b"},
                    {"type":"command","key":"d","cmd":"echo d","builds_in":"a"},
                    {"type":"command","key":"e","cmd":"echo e"}
                ]
            }"#,
        );
        let chains = g.chains();
        let deps = g.chain_deps(&chains);

        let a_ci = chain_idx(&g, &chains, "a");
        let bc_ci = chain_idx(&g, &chains, "b");
        let d_ci = chain_idx(&g, &chains, "d");
        let e_ci = chain_idx(&g, &chains, "e");

        assert!(
            deps[a_ci].is_empty(),
            "chain a has no deps: {:?}",
            deps[a_ci]
        );
        assert_eq!(deps[bc_ci], vec![a_ci]);
        assert_eq!(deps[d_ci], vec![a_ci]);
        assert!(deps[e_ci].is_empty());
    }

    #[test]
    fn chain_deps_subsumes_wait_barriers() {
        let g = graph_of(
            br#"{
                "version":"0",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b"},
                    {"type":"wait"},
                    {"type":"command","key":"c","cmd":"echo c"}
                ]
            }"#,
        );
        let chains = g.chains();
        let deps = g.chain_deps(&chains);

        let mut c_deps = deps[chain_idx(&g, &chains, "c")].clone();
        c_deps.sort_unstable();
        let mut want = vec![
            chain_idx(&g, &chains, "a"),
            chain_idx(&g, &chains, "b"),
        ];
        want.sort_unstable();
        assert_eq!(c_deps, want);
    }

    #[test]
    fn chains_root_is_never_a_chain_step() {
        let g = graph_of(
            br#"{
                "version":"0",
                "steps":[
                    {"type":"command","key":"a","cmd":"echo a"},
                    {"type":"command","key":"b","cmd":"echo b","builds_in":"a"}
                ]
            }"#,
        );
        for chain in g.chains() {
            let root = chain[0];
            assert!(!g.is_chain_step(root), "chain root {root} is a chain step");
            for &step in chain.iter().skip(1) {
                assert!(g.is_chain_step(step), "non-root {step} is not a chain step");
            }
        }
    }
}