wrkflw-executor 0.7.3

Workflow execution engine for wrkflw
Documentation
use std::collections::{HashMap, HashSet};
use wrkflw_parser::workflow::WorkflowDefinition;

pub fn resolve_dependencies(workflow: &WorkflowDefinition) -> Result<Vec<Vec<String>>, String> {
    let jobs = &workflow.jobs;

    // Build adjacency list with String keys
    let mut dependencies: HashMap<String, HashSet<String>> = HashMap::new();
    let mut dependents: HashMap<String, HashSet<String>> = HashMap::new();

    // Initialize with empty dependencies
    for job_name in jobs.keys() {
        dependencies.insert(job_name.clone(), HashSet::new());
        dependents.insert(job_name.clone(), HashSet::new());
    }

    // Populate dependencies
    for (job_name, job) in jobs {
        if let Some(needs) = &job.needs {
            for needed_job in needs {
                if !jobs.contains_key(needed_job) {
                    return Err(format!(
                        "Job '{}' depends on non-existent job '{}'",
                        job_name, needed_job
                    ));
                }
                // Get mutable reference to the dependency set for this job, with error handling
                if let Some(deps) = dependencies.get_mut(job_name) {
                    deps.insert(needed_job.clone());
                } else {
                    return Err(format!(
                        "Internal error: Failed to update dependencies for job '{}'",
                        job_name
                    ));
                }

                // Get mutable reference to the dependents set for the needed job, with error handling
                if let Some(deps) = dependents.get_mut(needed_job) {
                    deps.insert(job_name.clone());
                } else {
                    return Err(format!(
                        "Internal error: Failed to update dependents for job '{}'",
                        needed_job
                    ));
                }
            }
        }
    }

    // Implement topological sort for execution ordering
    let mut result = Vec::new();
    let mut no_dependencies: HashSet<String> = dependencies
        .iter()
        .filter(|(_, deps)| deps.is_empty())
        .map(|(job, _)| job.clone())
        .collect();

    // Process levels of the dependency graph
    while !no_dependencies.is_empty() {
        // Current level becomes a batch of jobs that can run in parallel
        let current_level: Vec<String> = no_dependencies.iter().cloned().collect();
        result.push(current_level);

        // For the next level
        let mut next_no_dependencies = HashSet::new();

        for job in &no_dependencies {
            // For each dependent job of the current job
            // Get the set of dependents with error handling
            let dependent_jobs = match dependents.get(job) {
                Some(deps) => deps.clone(),
                None => {
                    return Err(format!(
                        "Internal error: Failed to find dependents for job '{}'",
                        job
                    ));
                }
            };

            for dependent in dependent_jobs {
                // Remove the current job from its dependencies
                if let Some(deps) = dependencies.get_mut(&dependent) {
                    deps.remove(job);

                    // Check if it's empty now to determine if it should be in the next level
                    if deps.is_empty() {
                        next_no_dependencies.insert(dependent);
                    }
                } else {
                    return Err(format!(
                        "Internal error: Failed to find dependencies for job '{}'",
                        dependent
                    ));
                }
            }
        }

        no_dependencies = next_no_dependencies;
    }

    // Check for circular dependencies
    let processed_jobs: HashSet<String> = result
        .iter()
        .flat_map(|level| level.iter().cloned())
        .collect();

    if processed_jobs.len() < jobs.len() {
        return Err("Circular dependency detected in workflow jobs".to_string());
    }

    Ok(result)
}