Skip to main content

openjd_model/job/
step_dependency_graph.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Step dependency graph for instantiated jobs.
6
7use std::collections::HashMap;
8
9use crate::error::ModelError;
10use crate::job;
11
12type NodeIndex = usize;
13
14/// A step-to-step dependency edge.
15#[derive(Debug)]
16pub struct StepDependencyEdge {
17    /// Index of the step that is depended upon.
18    pub origin: NodeIndex,
19    /// Index of the step that depends on origin.
20    pub dependent: NodeIndex,
21}
22
23/// A node in the dependency graph.
24#[derive(Debug)]
25pub struct StepDependencyNode {
26    /// The step this node represents (index into job.steps).
27    pub step_index: usize,
28    /// The step name.
29    pub name: String,
30    /// Edges where this step depends on another (this step is the dependent).
31    pub in_edges: Vec<usize>,
32    /// Edges where another step depends on this one (this step is the origin).
33    pub out_edges: Vec<usize>,
34}
35
36/// Dependency graph over the steps of an instantiated job.
37#[derive(Debug)]
38pub struct StepDependencyGraph {
39    nodes: Vec<StepDependencyNode>,
40    edges: Vec<StepDependencyEdge>,
41    name_to_index: HashMap<String, NodeIndex>,
42}
43
44impl StepDependencyGraph {
45    /// Build the dependency graph from a Job.
46    pub fn new(job: &job::Job) -> Result<Self, ModelError> {
47        let name_to_index: HashMap<String, NodeIndex> = job
48            .steps
49            .iter()
50            .enumerate()
51            .map(|(i, s)| (s.name.clone(), i))
52            .collect();
53
54        let mut nodes: Vec<StepDependencyNode> = job
55            .steps
56            .iter()
57            .enumerate()
58            .map(|(i, s)| StepDependencyNode {
59                step_index: i,
60                name: s.name.clone(),
61                in_edges: Vec::new(),
62                out_edges: Vec::new(),
63            })
64            .collect();
65
66        let mut edges = Vec::new();
67
68        for (dep_idx, step) in job.steps.iter().enumerate() {
69            if let Some(deps) = &step.dependencies {
70                for dep in deps {
71                    let origin_idx = *name_to_index.get(&dep.depends_on).ok_or_else(|| {
72                        ModelError::DecodeValidation(format!(
73                            "Step '{}' depends on unknown step '{}'",
74                            step.name, dep.depends_on
75                        ))
76                    })?;
77                    let edge_idx = edges.len();
78                    edges.push(StepDependencyEdge {
79                        origin: origin_idx,
80                        dependent: dep_idx,
81                    });
82                    nodes[dep_idx].in_edges.push(edge_idx);
83                    nodes[origin_idx].out_edges.push(edge_idx);
84                }
85            }
86        }
87
88        Ok(Self {
89            nodes,
90            edges,
91            name_to_index,
92        })
93    }
94
95    /// Get a node by step name.
96    pub fn step_node(&self, name: &str) -> Option<&StepDependencyNode> {
97        self.name_to_index.get(name).map(|&i| &self.nodes[i])
98    }
99
100    /// Number of nodes in the graph.
101    pub fn node_count(&self) -> usize {
102        self.nodes.len()
103    }
104
105    /// Get an edge by index.
106    pub fn edge(&self, index: usize) -> Option<&StepDependencyEdge> {
107        self.edges.get(index)
108    }
109
110    /// Get a node by index.
111    pub fn node(&self, index: usize) -> Option<&StepDependencyNode> {
112        self.nodes.get(index)
113    }
114
115    /// Maximum in-degree (max number of dependencies any step has).
116    pub fn max_indegree(&self) -> usize {
117        self.nodes
118            .iter()
119            .map(|n| n.in_edges.len())
120            .max()
121            .unwrap_or(0)
122    }
123
124    /// Maximum out-degree (max number of steps that depend on any single step).
125    pub fn max_outdegree(&self) -> usize {
126        self.nodes
127            .iter()
128            .map(|n| n.out_edges.len())
129            .max()
130            .unwrap_or(0)
131    }
132
133    /// Stable topological sort matching the Python implementation.
134    ///
135    /// DFS-based: processes nodes in template order. For each unvisited node,
136    /// pushes it onto a stack and explores dependencies (pushed in reverse
137    /// template order so the earliest dependency is processed first).
138    /// Returns step indices in dependency-respecting, template-stable order.
139    pub fn topo_sorted(&self) -> Result<Vec<usize>, ModelError> {
140        let n = self.nodes.len();
141        // 0 = unvisited, 1 = started, 2 = completed
142        let mut state = vec![0u8; n];
143        let mut result = Vec::with_capacity(n);
144
145        for i in 0..n {
146            if state[i] != 0 {
147                continue;
148            }
149            let mut stack: Vec<NodeIndex> = vec![i];
150            while let Some(&top) = stack.last() {
151                match state[top] {
152                    2 => {
153                        stack.pop();
154                    }
155                    1 => {
156                        // Second visit — mark completed
157                        state[top] = 2;
158                        result.push(top);
159                        stack.pop();
160                    }
161                    _ => {
162                        // First visit — mark started, push unfinished deps
163                        state[top] = 1;
164                        // Collect dependency indices, sort by reverse template index
165                        let mut dep_indices: Vec<NodeIndex> = self.nodes[top]
166                            .in_edges
167                            .iter()
168                            .map(|&e| self.edges[e].origin)
169                            .collect();
170                        dep_indices.sort_unstable_by(|a, b| b.cmp(a));
171                        for dep in dep_indices {
172                            match state[dep] {
173                                2 => {} // already done
174                                1 => {
175                                    // Build cycle path from the stack
176                                    let cycle: Vec<&str> = stack
177                                        .iter()
178                                        .filter(|&&idx| state[idx] == 1)
179                                        .map(|&idx| self.nodes[idx].name.as_str())
180                                        .collect();
181                                    let dep_name = &self.nodes[dep].name;
182                                    return Err(ModelError::DecodeValidation(format!(
183                                        "A circular dependency was found in the step dependency graph:\n{} -> {}",
184                                        cycle.join(" -> "), dep_name
185                                    )));
186                                }
187                                _ => stack.push(dep),
188                            }
189                        }
190                    }
191                }
192            }
193        }
194
195        Ok(result)
196    }
197
198    /// Convenience: topological sort returning step names.
199    pub fn topo_sorted_names(&self) -> Result<Vec<String>, ModelError> {
200        self.topo_sorted().map(|indices| {
201            indices
202                .into_iter()
203                .map(|i| self.nodes[i].name.clone())
204                .collect()
205        })
206    }
207}