openjd_model/job/step_dependency_graph.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Step dependency graph for instantiated jobs.
6
7use std::collections::HashMap;
8
9use crate::error::ModelError;
10use crate::job;
11
12type NodeIndex = usize;
13
14/// A step-to-step dependency edge.
15#[derive(Debug)]
16pub struct StepDependencyEdge {
17 /// Index of the step that is depended upon.
18 pub origin: NodeIndex,
19 /// Index of the step that depends on origin.
20 pub dependent: NodeIndex,
21}
22
23/// A node in the dependency graph.
24#[derive(Debug)]
25pub struct StepDependencyNode {
26 /// The step this node represents (index into job.steps).
27 pub step_index: usize,
28 /// The step name.
29 pub name: String,
30 /// Edges where this step depends on another (this step is the dependent).
31 pub in_edges: Vec<usize>,
32 /// Edges where another step depends on this one (this step is the origin).
33 pub out_edges: Vec<usize>,
34}
35
36/// Dependency graph over the steps of an instantiated job.
37#[derive(Debug)]
38pub struct StepDependencyGraph {
39 nodes: Vec<StepDependencyNode>,
40 edges: Vec<StepDependencyEdge>,
41 name_to_index: HashMap<String, NodeIndex>,
42}
43
44impl StepDependencyGraph {
45 /// Build the dependency graph from a Job.
46 pub fn new(job: &job::Job) -> Result<Self, ModelError> {
47 let name_to_index: HashMap<String, NodeIndex> = job
48 .steps
49 .iter()
50 .enumerate()
51 .map(|(i, s)| (s.name.clone(), i))
52 .collect();
53
54 let mut nodes: Vec<StepDependencyNode> = job
55 .steps
56 .iter()
57 .enumerate()
58 .map(|(i, s)| StepDependencyNode {
59 step_index: i,
60 name: s.name.clone(),
61 in_edges: Vec::new(),
62 out_edges: Vec::new(),
63 })
64 .collect();
65
66 let mut edges = Vec::new();
67
68 for (dep_idx, step) in job.steps.iter().enumerate() {
69 if let Some(deps) = &step.dependencies {
70 for dep in deps {
71 let origin_idx = *name_to_index.get(&dep.depends_on).ok_or_else(|| {
72 ModelError::DecodeValidation(format!(
73 "Step '{}' depends on unknown step '{}'",
74 step.name, dep.depends_on
75 ))
76 })?;
77 let edge_idx = edges.len();
78 edges.push(StepDependencyEdge {
79 origin: origin_idx,
80 dependent: dep_idx,
81 });
82 nodes[dep_idx].in_edges.push(edge_idx);
83 nodes[origin_idx].out_edges.push(edge_idx);
84 }
85 }
86 }
87
88 Ok(Self {
89 nodes,
90 edges,
91 name_to_index,
92 })
93 }
94
95 /// Get a node by step name.
96 pub fn step_node(&self, name: &str) -> Option<&StepDependencyNode> {
97 self.name_to_index.get(name).map(|&i| &self.nodes[i])
98 }
99
100 /// Number of nodes in the graph.
101 pub fn node_count(&self) -> usize {
102 self.nodes.len()
103 }
104
105 /// Get an edge by index.
106 pub fn edge(&self, index: usize) -> Option<&StepDependencyEdge> {
107 self.edges.get(index)
108 }
109
110 /// Get a node by index.
111 pub fn node(&self, index: usize) -> Option<&StepDependencyNode> {
112 self.nodes.get(index)
113 }
114
115 /// Maximum in-degree (max number of dependencies any step has).
116 pub fn max_indegree(&self) -> usize {
117 self.nodes
118 .iter()
119 .map(|n| n.in_edges.len())
120 .max()
121 .unwrap_or(0)
122 }
123
124 /// Maximum out-degree (max number of steps that depend on any single step).
125 pub fn max_outdegree(&self) -> usize {
126 self.nodes
127 .iter()
128 .map(|n| n.out_edges.len())
129 .max()
130 .unwrap_or(0)
131 }
132
133 /// Stable topological sort matching the Python implementation.
134 ///
135 /// DFS-based: processes nodes in template order. For each unvisited node,
136 /// pushes it onto a stack and explores dependencies (pushed in reverse
137 /// template order so the earliest dependency is processed first).
138 /// Returns step indices in dependency-respecting, template-stable order.
139 pub fn topo_sorted(&self) -> Result<Vec<usize>, ModelError> {
140 let n = self.nodes.len();
141 // 0 = unvisited, 1 = started, 2 = completed
142 let mut state = vec![0u8; n];
143 let mut result = Vec::with_capacity(n);
144
145 for i in 0..n {
146 if state[i] != 0 {
147 continue;
148 }
149 let mut stack: Vec<NodeIndex> = vec![i];
150 while let Some(&top) = stack.last() {
151 match state[top] {
152 2 => {
153 stack.pop();
154 }
155 1 => {
156 // Second visit — mark completed
157 state[top] = 2;
158 result.push(top);
159 stack.pop();
160 }
161 _ => {
162 // First visit — mark started, push unfinished deps
163 state[top] = 1;
164 // Collect dependency indices, sort by reverse template index
165 let mut dep_indices: Vec<NodeIndex> = self.nodes[top]
166 .in_edges
167 .iter()
168 .map(|&e| self.edges[e].origin)
169 .collect();
170 dep_indices.sort_unstable_by(|a, b| b.cmp(a));
171 for dep in dep_indices {
172 match state[dep] {
173 2 => {} // already done
174 1 => {
175 // Build cycle path from the stack
176 let cycle: Vec<&str> = stack
177 .iter()
178 .filter(|&&idx| state[idx] == 1)
179 .map(|&idx| self.nodes[idx].name.as_str())
180 .collect();
181 let dep_name = &self.nodes[dep].name;
182 return Err(ModelError::DecodeValidation(format!(
183 "A circular dependency was found in the step dependency graph:\n{} -> {}",
184 cycle.join(" -> "), dep_name
185 )));
186 }
187 _ => stack.push(dep),
188 }
189 }
190 }
191 }
192 }
193 }
194
195 Ok(result)
196 }
197
198 /// Convenience: topological sort returning step names.
199 pub fn topo_sorted_names(&self) -> Result<Vec<String>, ModelError> {
200 self.topo_sorted().map(|indices| {
201 indices
202 .into_iter()
203 .map(|i| self.nodes[i].name.clone())
204 .collect()
205 })
206 }
207}