1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::collections::{HashMap, HashSet};
use wrkflw_parser::workflow::WorkflowDefinition;
pub fn resolve_dependencies(workflow: &WorkflowDefinition) -> Result<Vec<Vec<String>>, String> {
let jobs = &workflow.jobs;
// Build adjacency list with String keys
let mut dependencies: HashMap<String, HashSet<String>> = HashMap::new();
let mut dependents: HashMap<String, HashSet<String>> = HashMap::new();
// Initialize with empty dependencies
for job_name in jobs.keys() {
dependencies.insert(job_name.clone(), HashSet::new());
dependents.insert(job_name.clone(), HashSet::new());
}
// Populate dependencies
for (job_name, job) in jobs {
if let Some(needs) = &job.needs {
for needed_job in needs {
if !jobs.contains_key(needed_job) {
return Err(format!(
"Job '{}' depends on non-existent job '{}'",
job_name, needed_job
));
}
// Get mutable reference to the dependency set for this job, with error handling
if let Some(deps) = dependencies.get_mut(job_name) {
deps.insert(needed_job.clone());
} else {
return Err(format!(
"Internal error: Failed to update dependencies for job '{}'",
job_name
));
}
// Get mutable reference to the dependents set for the needed job, with error handling
if let Some(deps) = dependents.get_mut(needed_job) {
deps.insert(job_name.clone());
} else {
return Err(format!(
"Internal error: Failed to update dependents for job '{}'",
needed_job
));
}
}
}
}
// Implement topological sort for execution ordering
let mut result = Vec::new();
let mut no_dependencies: HashSet<String> = dependencies
.iter()
.filter(|(_, deps)| deps.is_empty())
.map(|(job, _)| job.clone())
.collect();
// Process levels of the dependency graph
while !no_dependencies.is_empty() {
// Current level becomes a batch of jobs that can run in parallel
let current_level: Vec<String> = no_dependencies.iter().cloned().collect();
result.push(current_level);
// For the next level
let mut next_no_dependencies = HashSet::new();
for job in &no_dependencies {
// For each dependent job of the current job
// Get the set of dependents with error handling
let dependent_jobs = match dependents.get(job) {
Some(deps) => deps.clone(),
None => {
return Err(format!(
"Internal error: Failed to find dependents for job '{}'",
job
));
}
};
for dependent in dependent_jobs {
// Remove the current job from its dependencies
if let Some(deps) = dependencies.get_mut(&dependent) {
deps.remove(job);
// Check if it's empty now to determine if it should be in the next level
if deps.is_empty() {
next_no_dependencies.insert(dependent);
}
} else {
return Err(format!(
"Internal error: Failed to find dependencies for job '{}'",
dependent
));
}
}
}
no_dependencies = next_no_dependencies;
}
// Check for circular dependencies
let processed_jobs: HashSet<String> = result
.iter()
.flat_map(|level| level.iter().cloned())
.collect();
if processed_jobs.len() < jobs.len() {
return Err("Circular dependency detected in workflow jobs".to_string());
}
Ok(result)
}