Skip to main content

batuta/playbook/
dag.rs

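//! DAG construction from playbook deps/outs + after edges (PB-002)
//!
//! Builds a directed acyclic graph from implicit data dependencies (an output
//! path of one stage matching a dep path of another) and explicit `after`
//! edges. Stage ordering and cycle detection are implemented in this module
//! with Kahn's algorithm (see `kahn_toposort`); nothing from `stack/graph.rs`
//! is called here.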
6
7use super::types::Playbook;
8use anyhow::{bail, Result};
9use std::collections::{HashMap, HashSet, VecDeque};
10
/// Execution graph for a playbook: per-stage adjacency in both directions plus
/// a precomputed topological ordering of all stages.
#[derive(Clone, Debug)]
pub struct PlaybookDag {
    /// Stage names in run order; each stage appears after all of the stages
    /// listed for it in `predecessors`.
    pub topo_order: Vec<String>,

    /// For each stage, the stages that have to finish before it may start.
    pub predecessors: HashMap<String, Vec<String>>,

    /// For each stage, the stages that wait on it (the inverse of `predecessors`).
    pub successors: HashMap<String, Vec<String>>,
}
23
24/// Build the execution DAG from a playbook
25///
26/// Algorithm:
27/// 1. Build output_map: path → producing stage
28/// 2. For each stage's deps, find the producing stage via output_map → add edge
29/// 3. Add explicit `after` edges
30/// 4. Check for cycles
31/// 5. Topological sort
32pub fn build_dag(playbook: &Playbook) -> Result<PlaybookDag> {
33    let stage_names: Vec<String> = playbook.stages.keys().cloned().collect();
34
35    // Initialize adjacency
36    let mut predecessors: HashMap<String, Vec<String>> = HashMap::new();
37    let mut successors: HashMap<String, Vec<String>> = HashMap::new();
38    for name in &stage_names {
39        predecessors.insert(name.clone(), Vec::new());
40        successors.insert(name.clone(), Vec::new());
41    }
42
43    // Step 1: Build output_map (path → producing stage name)
44    let mut output_map: HashMap<&str, &str> = HashMap::new();
45    for (name, stage) in &playbook.stages {
46        for out in &stage.outs {
47            if let Some(existing) = output_map.insert(&out.path, name) {
48                bail!(
49                    "output path '{}' is produced by both '{}' and '{}'",
50                    out.path,
51                    existing,
52                    name
53                );
54            }
55        }
56    }
57
58    // Step 2: Implicit data dependency edges
59    for (consumer_name, stage) in &playbook.stages {
60        for dep in &stage.deps {
61            if let Some(&producer_name) = output_map.get(dep.path.as_str()) {
62                if producer_name != consumer_name {
63                    add_edge(&mut predecessors, &mut successors, producer_name, consumer_name);
64                }
65            }
66            // deps referencing external files (not produced by any stage) are fine
67        }
68    }
69
70    // Step 3: Explicit `after` edges
71    for (name, stage) in &playbook.stages {
72        for after_name in &stage.after {
73            add_edge(&mut predecessors, &mut successors, after_name, name);
74        }
75    }
76
77    // Step 4: Cycle detection + topological sort (Kahn's algorithm)
78    let topo_order = kahn_toposort(&stage_names, &predecessors, &successors)?;
79
80    Ok(PlaybookDag { topo_order, predecessors, successors })
81}
82
/// Record the edge `from → to` in both adjacency maps, ignoring duplicates.
///
/// `predecessors[to]` gains `from` and `successors[from]` gains `to`; map
/// entries that do not exist yet are created empty first. Membership is tested
/// by comparing against the borrowed `&str`, so no temporary `String` is
/// allocated just to check for an existing edge.
fn add_edge(
    predecessors: &mut HashMap<String, Vec<String>>,
    successors: &mut HashMap<String, Vec<String>>,
    from: &str,
    to: &str,
) {
    push_unique(predecessors.entry(to.to_owned()).or_default(), from);
    push_unique(successors.entry(from.to_owned()).or_default(), to);
}

/// Append `item` to `list` unless an equal entry is already present.
fn push_unique(list: &mut Vec<String>, item: &str) {
    if !list.iter().any(|existing| existing == item) {
        list.push(item.to_owned());
    }
}
98
99/// Find nodes that become ready (in-degree drops to 0) after processing `node`.
100fn collect_ready_successors(
101    node: &str,
102    names: &[String],
103    predecessors: &HashMap<String, Vec<String>>,
104    visited: &HashSet<String>,
105    in_degree: &mut HashMap<&str, usize>,
106) -> Vec<String> {
107    let mut ready = Vec::new();
108    for name in names {
109        if visited.contains(name) {
110            continue;
111        }
112        if let Some(preds) = predecessors.get(name.as_str()) {
113            if preds.contains(&node.to_string()) {
114                let deg = in_degree.get_mut(name.as_str()).expect("unexpected failure");
115                *deg -= 1;
116                if *deg == 0 {
117                    ready.push(name.clone());
118                }
119            }
120        }
121    }
122    ready.sort();
123    ready
124}
125
126/// Kahn's topological sort with cycle detection
127fn kahn_toposort(
128    names: &[String],
129    predecessors: &HashMap<String, Vec<String>>,
130    _successors: &HashMap<String, Vec<String>>,
131) -> Result<Vec<String>> {
132    let mut in_degree: HashMap<&str, usize> = HashMap::new();
133    for name in names {
134        in_degree.insert(name, predecessors.get(name.as_str()).map_or(0, |p| p.len()));
135    }
136
137    let mut queue: VecDeque<String> = {
138        let mut init: Vec<String> =
139            names.iter().filter(|n| in_degree.get(n.as_str()) == Some(&0)).cloned().collect();
140        init.sort();
141        init.into()
142    };
143
144    let mut result = Vec::new();
145    let mut visited: HashSet<String> = HashSet::new();
146
147    while let Some(node) = queue.pop_front() {
148        visited.insert(node.clone());
149        result.push(node.clone());
150        let next = collect_ready_successors(&node, names, predecessors, &visited, &mut in_degree);
151        queue.extend(next);
152    }
153
154    if result.len() != names.len() {
155        let cycle_stages: Vec<&str> =
156            names.iter().filter(|n| !visited.contains(n.as_str())).map(|n| n.as_str()).collect();
157        bail!("cycle detected in pipeline stages: {}", cycle_stages.join(" → "));
158    }
159
160    Ok(result)
161}
162
#[cfg(test)]
// Test names embed the PB002 requirement id verbatim, so they are not snake_case.
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use crate::playbook::parser::parse_playbook;

    #[test]
    fn test_PB002_linear_chain() {
        let yaml = r#"
version: "1.0"
name: chain
params: {}
targets: {}
stages:
  a:
    cmd: "echo a > /tmp/a.txt"
    deps: []
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "cat /tmp/a.txt > /tmp/b.txt"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "cat /tmp/b.txt > /tmp/c.txt"
    deps:
      - path: /tmp/b.txt
    outs:
      - path: /tmp/c.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_PB002_parallel_stages() {
        let yaml = r#"
version: "1.0"
name: parallel
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps: []
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "echo c"
    deps: []
    outs:
      - path: /tmp/c.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        // All independent, alphabetical sort
        assert_eq!(dag.topo_order, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_PB002_diamond_dag() {
        let yaml = r#"
version: "1.0"
name: diamond
params: {}
targets: {}
stages:
  source:
    cmd: "echo src"
    deps: []
    outs:
      - path: /tmp/src.txt
  left:
    cmd: "echo left"
    deps:
      - path: /tmp/src.txt
    outs:
      - path: /tmp/left.txt
  right:
    cmd: "echo right"
    deps:
      - path: /tmp/src.txt
    outs:
      - path: /tmp/right.txt
  sink:
    cmd: "echo sink"
    deps:
      - path: /tmp/left.txt
      - path: /tmp/right.txt
    outs:
      - path: /tmp/sink.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        // source must be first, sink must be last
        assert_eq!(dag.topo_order[0], "source");
        assert_eq!(dag.topo_order[3], "sink");
        // left and right can be in either order
        let middle: HashSet<&str> = dag.topo_order[1..3].iter().map(|s| s.as_str()).collect();
        assert!(middle.contains("left"));
        assert!(middle.contains("right"));
        // sink waits on both branches; the order within the list is unspecified
        let sink_preds: HashSet<&str> =
            dag.predecessors["sink"].iter().map(|s| s.as_str()).collect();
        assert_eq!(sink_preds.len(), 2);
        assert!(sink_preds.contains("left") && sink_preds.contains("right"));
        assert_eq!(dag.successors["source"].len(), 2);
    }

    #[test]
    fn test_PB002_cycle_detection() {
        let yaml = r#"
version: "1.0"
name: cycle
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps:
      - path: /tmp/b.txt
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let err = build_dag(&pb).unwrap_err();
        assert!(err.to_string().contains("cycle"));
    }

    #[test]
    fn test_PB002_cycle_blocks_downstream_stage() {
        // `c` is not itself in the a <-> b cycle, but depends on it.
        let yaml = r#"
version: "1.0"
name: downstream
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps:
      - path: /tmp/b.txt
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "echo c"
    deps:
      - path: /tmp/b.txt
    outs:
      - path: /tmp/c.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let err = build_dag(&pb).unwrap_err();
        assert!(err.to_string().contains("cycle"));
    }

    #[test]
    fn test_PB002_after_edges() {
        let yaml = r#"
version: "1.0"
name: after
params: {}
targets: {}
stages:
  setup:
    cmd: "echo setup"
    deps: []
    outs:
      - path: /tmp/setup.txt
  work:
    cmd: "echo work"
    deps: []
    outs:
      - path: /tmp/work.txt
    after:
      - setup
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["setup", "work"]);
        assert_eq!(dag.predecessors["work"], vec!["setup"]);
        assert_eq!(dag.successors["setup"], vec!["work"]);
    }

    #[test]
    fn test_PB002_after_unknown_stage_is_error() {
        let yaml = r#"
version: "1.0"
name: ghost
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/a.txt
    after:
      - ghost
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        // The parser may already reject a dangling `after` reference; if it
        // lets the playbook through, DAG construction must still refuse it.
        if let Ok(pb) = parse_playbook(yaml) {
            assert!(build_dag(&pb).is_err());
        }
    }

    #[test]
    fn test_PB002_duplicate_output_path() {
        let yaml = r#"
version: "1.0"
name: dup
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/shared.txt
  b:
    cmd: "echo b"
    deps: []
    outs:
      - path: /tmp/shared.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let err = build_dag(&pb).unwrap_err();
        assert!(err.to_string().contains("produced by both"));
    }

    #[test]
    fn test_PB002_external_deps_no_edge() {
        let yaml = r#"
version: "1.0"
name: external
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps:
      - path: /data/external.csv
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /data/another.csv
    outs:
      - path: /tmp/b.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        // Both are independent (deps are external)
        assert_eq!(dag.topo_order.len(), 2);
        assert!(dag.predecessors["a"].is_empty());
        assert!(dag.predecessors["b"].is_empty());
    }

    #[test]
    fn test_PB002_mixed_implicit_and_explicit() {
        let yaml = r#"
version: "1.0"
name: mixed
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "echo c"
    deps: []
    outs:
      - path: /tmp/c.txt
    after:
      - b
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["a", "b", "c"]);
        // b's edge is implicit (data dependency); c's edge is explicit (`after`)
        assert_eq!(dag.predecessors["b"], vec!["a"]);
        assert_eq!(dag.predecessors["c"], vec!["b"]);
    }
}