Skip to main content

wrkflw_executor/
dependency.rs

1use std::collections::{HashMap, HashSet};
2use wrkflw_parser::workflow::WorkflowDefinition;
3
4pub fn resolve_dependencies(workflow: &WorkflowDefinition) -> Result<Vec<Vec<String>>, String> {
5    let jobs = &workflow.jobs;
6
7    // Build adjacency list with String keys
8    let mut dependencies: HashMap<String, HashSet<String>> = HashMap::new();
9    let mut dependents: HashMap<String, HashSet<String>> = HashMap::new();
10
11    // Initialize with empty dependencies
12    for job_name in jobs.keys() {
13        dependencies.insert(job_name.clone(), HashSet::new());
14        dependents.insert(job_name.clone(), HashSet::new());
15    }
16
17    // Populate dependencies
18    for (job_name, job) in jobs {
19        if let Some(needs) = &job.needs {
20            for needed_job in needs {
21                if !jobs.contains_key(needed_job) {
22                    return Err(format!(
23                        "Job '{}' depends on non-existent job '{}'",
24                        job_name, needed_job
25                    ));
26                }
27                // Get mutable reference to the dependency set for this job, with error handling
28                if let Some(deps) = dependencies.get_mut(job_name) {
29                    deps.insert(needed_job.clone());
30                } else {
31                    return Err(format!(
32                        "Internal error: Failed to update dependencies for job '{}'",
33                        job_name
34                    ));
35                }
36
37                // Get mutable reference to the dependents set for the needed job, with error handling
38                if let Some(deps) = dependents.get_mut(needed_job) {
39                    deps.insert(job_name.clone());
40                } else {
41                    return Err(format!(
42                        "Internal error: Failed to update dependents for job '{}'",
43                        needed_job
44                    ));
45                }
46            }
47        }
48    }
49
50    // Implement topological sort for execution ordering
51    let mut result = Vec::new();
52    let mut no_dependencies: HashSet<String> = dependencies
53        .iter()
54        .filter(|(_, deps)| deps.is_empty())
55        .map(|(job, _)| job.clone())
56        .collect();
57
58    // Process levels of the dependency graph
59    while !no_dependencies.is_empty() {
60        // Current level becomes a batch of jobs that can run in parallel
61        let current_level: Vec<String> = no_dependencies.iter().cloned().collect();
62        result.push(current_level);
63
64        // For the next level
65        let mut next_no_dependencies = HashSet::new();
66
67        for job in &no_dependencies {
68            // For each dependent job of the current job
69            // Get the set of dependents with error handling
70            let dependent_jobs = match dependents.get(job) {
71                Some(deps) => deps.clone(),
72                None => {
73                    return Err(format!(
74                        "Internal error: Failed to find dependents for job '{}'",
75                        job
76                    ));
77                }
78            };
79
80            for dependent in dependent_jobs {
81                // Remove the current job from its dependencies
82                if let Some(deps) = dependencies.get_mut(&dependent) {
83                    deps.remove(job);
84
85                    // Check if it's empty now to determine if it should be in the next level
86                    if deps.is_empty() {
87                        next_no_dependencies.insert(dependent);
88                    }
89                } else {
90                    return Err(format!(
91                        "Internal error: Failed to find dependencies for job '{}'",
92                        dependent
93                    ));
94                }
95            }
96        }
97
98        no_dependencies = next_no_dependencies;
99    }
100
101    // Check for circular dependencies
102    let processed_jobs: HashSet<String> = result
103        .iter()
104        .flat_map(|level| level.iter().cloned())
105        .collect();
106
107    if processed_jobs.len() < jobs.len() {
108        return Err("Circular dependency detected in workflow jobs".to_string());
109    }
110
111    Ok(result)
112}