1use super::types::Playbook;
8use anyhow::{bail, Result};
9use std::collections::{HashMap, HashSet, VecDeque};
10
/// Dependency graph of a playbook's stages, together with a valid execution order.
///
/// Produced by [`build_dag`]. Both adjacency maps are keyed by stage name and
/// hold an entry for every stage, even when that stage has no neighbours.
#[derive(Clone, Debug)]
pub struct PlaybookDag {
    /// Stage names in topological order: each stage comes after all of its predecessors.
    pub topo_order: Vec<String>,

    /// For each stage, the stages that must finish before it may run.
    pub predecessors: HashMap<String, Vec<String>>,

    /// For each stage, the stages waiting on it (the inverse of `predecessors`).
    pub successors: HashMap<String, Vec<String>>,
}
23
24pub fn build_dag(playbook: &Playbook) -> Result<PlaybookDag> {
33 let stage_names: Vec<String> = playbook.stages.keys().cloned().collect();
34
35 let mut predecessors: HashMap<String, Vec<String>> = HashMap::new();
37 let mut successors: HashMap<String, Vec<String>> = HashMap::new();
38 for name in &stage_names {
39 predecessors.insert(name.clone(), Vec::new());
40 successors.insert(name.clone(), Vec::new());
41 }
42
43 let mut output_map: HashMap<&str, &str> = HashMap::new();
45 for (name, stage) in &playbook.stages {
46 for out in &stage.outs {
47 if let Some(existing) = output_map.insert(&out.path, name) {
48 bail!(
49 "output path '{}' is produced by both '{}' and '{}'",
50 out.path,
51 existing,
52 name
53 );
54 }
55 }
56 }
57
58 for (consumer_name, stage) in &playbook.stages {
60 for dep in &stage.deps {
61 if let Some(&producer_name) = output_map.get(dep.path.as_str()) {
62 if producer_name != consumer_name {
63 add_edge(&mut predecessors, &mut successors, producer_name, consumer_name);
64 }
65 }
66 }
68 }
69
70 for (name, stage) in &playbook.stages {
72 for after_name in &stage.after {
73 add_edge(&mut predecessors, &mut successors, after_name, name);
74 }
75 }
76
77 let topo_order = kahn_toposort(&stage_names, &predecessors, &successors)?;
79
80 Ok(PlaybookDag { topo_order, predecessors, successors })
81}
82
/// Record the edge `from -> to`, meaning `to` must run after `from`.
///
/// The edge is stored in both adjacency maps, creating entries on demand.
/// Re-inserting an existing edge is a no-op, so neighbour lists never hold
/// duplicates. The topological sort relies on this: it uses the length of a
/// stage's predecessor list as its in-degree.
fn add_edge(
    predecessors: &mut HashMap<String, Vec<String>>,
    successors: &mut HashMap<String, Vec<String>>,
    from: &str,
    to: &str,
) {
    // Compare as `&str` rather than allocating a temporary `String` per check.
    let preds = predecessors.entry(to.to_string()).or_default();
    if !preds.iter().any(|p| p == from) {
        preds.push(from.to_string());
    }
    let succs = successors.entry(from.to_string()).or_default();
    if !succs.iter().any(|s| s == to) {
        succs.push(to.to_string());
    }
}
98
/// Release the stages that become runnable once `node` has been scheduled.
///
/// For every not-yet-visited stage in `names` that lists `node` among its
/// predecessors, its remaining in-degree is decremented. Stages whose in-degree
/// drops to zero are returned, sorted by name so the caller's queue order is
/// deterministic.
///
/// # Panics
///
/// Panics if a stage that has `node` as a predecessor has no entry in
/// `in_degree`. Callers must seed `in_degree` with every name in `names`.
fn collect_ready_successors(
    node: &str,
    names: &[String],
    predecessors: &HashMap<String, Vec<String>>,
    visited: &HashSet<String>,
    in_degree: &mut HashMap<&str, usize>,
) -> Vec<String> {
    let mut ready = Vec::new();
    for name in names {
        if visited.contains(name) {
            continue;
        }
        if let Some(preds) = predecessors.get(name.as_str()) {
            // Compare as `&str`; the original built a fresh `String` per candidate.
            if preds.iter().any(|p| p == node) {
                let deg = in_degree
                    .get_mut(name.as_str())
                    .expect("in_degree must be seeded with every stage name");
                *deg -= 1;
                if *deg == 0 {
                    ready.push(name.clone());
                }
            }
        }
    }
    ready.sort();
    ready
}
125
126fn kahn_toposort(
128 names: &[String],
129 predecessors: &HashMap<String, Vec<String>>,
130 _successors: &HashMap<String, Vec<String>>,
131) -> Result<Vec<String>> {
132 let mut in_degree: HashMap<&str, usize> = HashMap::new();
133 for name in names {
134 in_degree.insert(name, predecessors.get(name.as_str()).map_or(0, |p| p.len()));
135 }
136
137 let mut queue: VecDeque<String> = {
138 let mut init: Vec<String> =
139 names.iter().filter(|n| in_degree.get(n.as_str()) == Some(&0)).cloned().collect();
140 init.sort();
141 init.into()
142 };
143
144 let mut result = Vec::new();
145 let mut visited: HashSet<String> = HashSet::new();
146
147 while let Some(node) = queue.pop_front() {
148 visited.insert(node.clone());
149 result.push(node.clone());
150 let next = collect_ready_successors(&node, names, predecessors, &visited, &mut in_degree);
151 queue.extend(next);
152 }
153
154 if result.len() != names.len() {
155 let cycle_stages: Vec<&str> =
156 names.iter().filter(|n| !visited.contains(n.as_str())).map(|n| n.as_str()).collect();
157 bail!("cycle detected in pipeline stages: {}", cycle_stages.join(" → "));
158 }
159
160 Ok(result)
161}
162
/// Unit tests for DAG construction: edge inference from deps/outs, explicit
/// `after` edges, deterministic ordering, and error reporting.
#[cfg(test)]
// Test names embed the upper-case requirement id `PB002`, so snake_case is relaxed here.
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use crate::playbook::parser::parse_playbook;

    // a -> b -> c linked purely through matching output/dep paths.
    #[test]
    fn test_PB002_linear_chain() {
        let yaml = r#"
version: "1.0"
name: chain
params: {}
targets: {}
stages:
  a:
    cmd: "echo a > /tmp/a.txt"
    deps: []
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "cat /tmp/a.txt > /tmp/b.txt"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "cat /tmp/b.txt > /tmp/c.txt"
    deps:
      - path: /tmp/b.txt
    outs:
      - path: /tmp/c.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["a", "b", "c"]);
    }

    // Independent stages: the exact order relies on the toposort's name-sorted tie-breaking.
    #[test]
    fn test_PB002_parallel_stages() {
        let yaml = r#"
version: "1.0"
name: parallel
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps: []
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "echo c"
    deps: []
    outs:
      - path: /tmp/c.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["a", "b", "c"]);
    }

    // source fans out to left/right, which both feed sink.
    #[test]
    fn test_PB002_diamond_dag() {
        let yaml = r#"
version: "1.0"
name: diamond
params: {}
targets: {}
stages:
  source:
    cmd: "echo src"
    deps: []
    outs:
      - path: /tmp/src.txt
  left:
    cmd: "echo left"
    deps:
      - path: /tmp/src.txt
    outs:
      - path: /tmp/left.txt
  right:
    cmd: "echo right"
    deps:
      - path: /tmp/src.txt
    outs:
      - path: /tmp/right.txt
  sink:
    cmd: "echo sink"
    deps:
      - path: /tmp/left.txt
      - path: /tmp/right.txt
    outs:
      - path: /tmp/sink.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order[0], "source");
        assert_eq!(dag.topo_order[3], "sink");
        // Only the membership of the middle pair is asserted, not its order.
        let middle: HashSet<&str> = dag.topo_order[1..3].iter().map(|s| s.as_str()).collect();
        assert!(middle.contains("left"));
        assert!(middle.contains("right"));
    }

    // a and b each consume the other's output, forming a two-stage cycle.
    #[test]
    fn test_PB002_cycle_detection() {
        let yaml = r#"
version: "1.0"
name: cycle
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps:
      - path: /tmp/b.txt
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let err = build_dag(&pb).unwrap_err();
        assert!(err.to_string().contains("cycle"));
    }

    // An explicit `after` entry creates an edge even without shared paths.
    #[test]
    fn test_PB002_after_edges() {
        let yaml = r#"
version: "1.0"
name: after
params: {}
targets: {}
stages:
  setup:
    cmd: "echo setup"
    deps: []
    outs:
      - path: /tmp/setup.txt
  work:
    cmd: "echo work"
    deps: []
    outs:
      - path: /tmp/work.txt
    after:
      - setup
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["setup", "work"]);
        assert_eq!(dag.predecessors["work"], vec!["setup"]);
        assert_eq!(dag.successors["setup"], vec!["work"]);
    }

    // Two stages declaring the same output path must be rejected.
    #[test]
    fn test_PB002_duplicate_output_path() {
        let yaml = r#"
version: "1.0"
name: dup
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/shared.txt
  b:
    cmd: "echo b"
    deps: []
    outs:
      - path: /tmp/shared.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let err = build_dag(&pb).unwrap_err();
        assert!(err.to_string().contains("produced by both"));
    }

    // Deps that no stage produces are external inputs and add no edges.
    #[test]
    fn test_PB002_external_deps_no_edge() {
        let yaml = r#"
version: "1.0"
name: external
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps:
      - path: /data/external.csv
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /data/another.csv
    outs:
      - path: /tmp/b.txt
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order.len(), 2);
        assert!(dag.predecessors["a"].is_empty());
        assert!(dag.predecessors["b"].is_empty());
    }

    // Implicit a -> b (shared path) combined with explicit b -> c (`after`).
    #[test]
    fn test_PB002_mixed_implicit_and_explicit() {
        let yaml = r#"
version: "1.0"
name: mixed
params: {}
targets: {}
stages:
  a:
    cmd: "echo a"
    deps: []
    outs:
      - path: /tmp/a.txt
  b:
    cmd: "echo b"
    deps:
      - path: /tmp/a.txt
    outs:
      - path: /tmp/b.txt
  c:
    cmd: "echo c"
    deps: []
    outs:
      - path: /tmp/c.txt
    after:
      - b
policy:
  failure: stop_on_first
  validation: checksum
  lock_file: true
"#;
        let pb = parse_playbook(yaml).expect("unexpected failure");
        let dag = build_dag(&pb).expect("unexpected failure");
        assert_eq!(dag.topo_order, vec!["a", "b", "c"]);
    }
}