runledger-core 0.1.0

Core contracts and types for the Runledger durable job and workflow system
Documentation
use std::collections::{BTreeMap, BTreeSet, VecDeque};

use super::super::identifiers::{JobType, StepKey, WorkflowType};
use super::super::status::JobStage;
use super::types::{WorkflowRunEnqueue, WorkflowStepExecutionKind};

#[derive(Debug, Clone, Copy)]
pub struct WorkflowDagDependencyValidationInput<'a> {
    pub prerequisite_step_key: StepKey<'a>,
}

#[derive(Debug, Clone)]
pub struct WorkflowDagStepValidationInput<'a> {
    pub step_key: StepKey<'a>,
    pub execution_kind: WorkflowStepExecutionKind,
    pub job_type: Option<JobType<'a>>,
    pub priority: Option<i32>,
    pub max_attempts: Option<i32>,
    pub timeout_seconds: Option<i32>,
    pub stage: Option<JobStage>,
    pub dependencies: Vec<WorkflowDagDependencyValidationInput<'a>>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowDagValidationError {
    EmptySteps,
    BlankWorkflowType,
    BlankStepKey {
        step_index: usize,
    },
    BlankStepJobType {
        step_key: String,
    },
    ExternalStepJobTypeNotAllowed {
        step_key: String,
    },
    ExternalStepQueueSettingsNotAllowed {
        step_key: String,
    },
    BlankDependencyStepKey {
        step_key: String,
    },
    DuplicateStepKey {
        step_key: String,
    },
    MissingDependency {
        step_key: String,
        prerequisite_step_key: String,
    },
    SelfDependency {
        step_key: String,
    },
    DuplicateDependency {
        step_key: String,
        prerequisite_step_key: String,
    },
    CycleDetected,
}

pub fn validate_workflow_dag(
    workflow_type: WorkflowType<'_>,
    steps: &[WorkflowDagStepValidationInput<'_>],
) -> Result<(), WorkflowDagValidationError> {
    if workflow_type.as_str().trim().is_empty() {
        return Err(WorkflowDagValidationError::BlankWorkflowType);
    }
    if steps.is_empty() {
        return Err(WorkflowDagValidationError::EmptySteps);
    }

    let mut step_key_to_index: BTreeMap<&str, usize> = BTreeMap::new();
    for (step_index, step) in steps.iter().enumerate() {
        if step.step_key.as_str().trim().is_empty() {
            return Err(WorkflowDagValidationError::BlankStepKey { step_index });
        }
        match step.execution_kind {
            WorkflowStepExecutionKind::Job => {
                let Some(job_type) = step.job_type else {
                    return Err(WorkflowDagValidationError::BlankStepJobType {
                        step_key: step.step_key.as_str().to_owned(),
                    });
                };
                if job_type.as_str().trim().is_empty() {
                    return Err(WorkflowDagValidationError::BlankStepJobType {
                        step_key: step.step_key.as_str().to_owned(),
                    });
                }
            }
            WorkflowStepExecutionKind::External => {
                if step.job_type.is_some() {
                    return Err(WorkflowDagValidationError::ExternalStepJobTypeNotAllowed {
                        step_key: step.step_key.as_str().to_owned(),
                    });
                }
                if step.priority.is_some()
                    || step.max_attempts.is_some()
                    || step.timeout_seconds.is_some()
                    || step.stage.is_some()
                {
                    return Err(
                        WorkflowDagValidationError::ExternalStepQueueSettingsNotAllowed {
                            step_key: step.step_key.as_str().to_owned(),
                        },
                    );
                }
            }
        }
        if step_key_to_index
            .insert(step.step_key.as_str(), step_index)
            .is_some()
        {
            return Err(WorkflowDagValidationError::DuplicateStepKey {
                step_key: step.step_key.as_str().to_owned(),
            });
        }
    }

    let mut indegree: Vec<usize> = vec![0; steps.len()];
    let mut adjacency: Vec<Vec<usize>> = vec![Vec::new(); steps.len()];

    for (dependent_index, step) in steps.iter().enumerate() {
        let mut seen_dependencies: BTreeSet<&str> = BTreeSet::new();

        for dependency in &step.dependencies {
            if dependency.prerequisite_step_key.as_str().trim().is_empty() {
                return Err(WorkflowDagValidationError::BlankDependencyStepKey {
                    step_key: step.step_key.as_str().to_owned(),
                });
            }
            if dependency.prerequisite_step_key == step.step_key {
                return Err(WorkflowDagValidationError::SelfDependency {
                    step_key: step.step_key.as_str().to_owned(),
                });
            }

            if !seen_dependencies.insert(dependency.prerequisite_step_key.as_str()) {
                return Err(WorkflowDagValidationError::DuplicateDependency {
                    step_key: step.step_key.as_str().to_owned(),
                    prerequisite_step_key: dependency.prerequisite_step_key.as_str().to_owned(),
                });
            }

            let Some(&prerequisite_index) =
                step_key_to_index.get(dependency.prerequisite_step_key.as_str())
            else {
                return Err(WorkflowDagValidationError::MissingDependency {
                    step_key: step.step_key.as_str().to_owned(),
                    prerequisite_step_key: dependency.prerequisite_step_key.as_str().to_owned(),
                });
            };

            indegree[dependent_index] += 1;
            adjacency[prerequisite_index].push(dependent_index);
        }
    }

    let mut ready: VecDeque<usize> = indegree
        .iter()
        .enumerate()
        .filter_map(|(index, &count)| (count == 0).then_some(index))
        .collect();
    let mut visited = 0usize;

    while let Some(index) = ready.pop_front() {
        visited += 1;

        for &next in &adjacency[index] {
            indegree[next] -= 1;
            if indegree[next] == 0 {
                ready.push_back(next);
            }
        }
    }

    if visited != steps.len() {
        return Err(WorkflowDagValidationError::CycleDetected);
    }

    Ok(())
}

pub fn validate_workflow_run_enqueue(
    payload: &WorkflowRunEnqueue<'_>,
) -> Result<(), WorkflowDagValidationError> {
    let steps = payload
        .steps()
        .iter()
        .map(|step| WorkflowDagStepValidationInput {
            step_key: step.step_key(),
            execution_kind: step.execution_kind(),
            job_type: step.job_type(),
            priority: step.priority(),
            max_attempts: step.max_attempts(),
            timeout_seconds: step.timeout_seconds(),
            stage: step.stage(),
            dependencies: step
                .dependencies()
                .iter()
                .map(|dependency| WorkflowDagDependencyValidationInput {
                    prerequisite_step_key: dependency.prerequisite_step_key,
                })
                .collect(),
        })
        .collect::<Vec<_>>();

    validate_workflow_dag(payload.workflow_type(), &steps)
}

pub fn validate_workflow_step_append(
    existing_step_keys: &BTreeSet<super::super::identifiers::StepKeyName>,
    new_steps: &[super::types::WorkflowStepEnqueue<'_>],
) -> Result<(), WorkflowDagValidationError> {
    if new_steps.is_empty() {
        return Err(WorkflowDagValidationError::EmptySteps);
    }

    let mut new_step_key_to_index: BTreeMap<&str, usize> = BTreeMap::new();
    for (build_step_index, step) in new_steps.iter().enumerate() {
        super::build_validation::validate_step_enqueue(step, Some(build_step_index)).map_err(
            |error| match error {
                super::errors::WorkflowBuildError::BlankWorkflowType => {
                    WorkflowDagValidationError::BlankWorkflowType
                }
                super::errors::WorkflowBuildError::EmptySteps => {
                    WorkflowDagValidationError::EmptySteps
                }
                super::errors::WorkflowBuildError::BlankStepKey { step_index } => {
                    WorkflowDagValidationError::BlankStepKey {
                        step_index: step_index.unwrap_or(build_step_index),
                    }
                }
                super::errors::WorkflowBuildError::BlankStepJobType { step_key } => {
                    WorkflowDagValidationError::BlankStepJobType { step_key }
                }
                super::errors::WorkflowBuildError::ExternalStepJobTypeNotAllowed { step_key } => {
                    WorkflowDagValidationError::ExternalStepJobTypeNotAllowed { step_key }
                }
                super::errors::WorkflowBuildError::ExternalStepQueueSettingsNotAllowed {
                    step_key,
                } => WorkflowDagValidationError::ExternalStepQueueSettingsNotAllowed { step_key },
                super::errors::WorkflowBuildError::BlankDependencyStepKey { step_key } => {
                    WorkflowDagValidationError::BlankDependencyStepKey { step_key }
                }
                super::errors::WorkflowBuildError::DuplicateStepKey { step_key } => {
                    WorkflowDagValidationError::DuplicateStepKey { step_key }
                }
                super::errors::WorkflowBuildError::MissingDependency {
                    step_key,
                    prerequisite_step_key,
                } => WorkflowDagValidationError::MissingDependency {
                    step_key,
                    prerequisite_step_key,
                },
                super::errors::WorkflowBuildError::DuplicateDependency {
                    step_key,
                    prerequisite_step_key,
                } => WorkflowDagValidationError::DuplicateDependency {
                    step_key,
                    prerequisite_step_key,
                },
                super::errors::WorkflowBuildError::SelfDependency { step_key } => {
                    WorkflowDagValidationError::SelfDependency { step_key }
                }
                super::errors::WorkflowBuildError::CycleDetected => {
                    WorkflowDagValidationError::CycleDetected
                }
            },
        )?;

        let step_key = step.step_key().as_str();
        if existing_step_keys.contains(step_key) {
            return Err(WorkflowDagValidationError::DuplicateStepKey {
                step_key: step_key.to_owned(),
            });
        }

        if new_step_key_to_index
            .insert(step_key, build_step_index)
            .is_some()
        {
            return Err(WorkflowDagValidationError::DuplicateStepKey {
                step_key: step_key.to_owned(),
            });
        }
    }

    let mut indegree = vec![0usize; new_steps.len()];
    let mut adjacency: Vec<Vec<usize>> = vec![Vec::new(); new_steps.len()];

    for (dependent_index, step) in new_steps.iter().enumerate() {
        for dependency in step.dependencies() {
            let prerequisite_step_key = dependency.prerequisite_step_key.as_str();
            if let Some(&prerequisite_index) = new_step_key_to_index.get(prerequisite_step_key) {
                indegree[dependent_index] += 1;
                adjacency[prerequisite_index].push(dependent_index);
                continue;
            }

            if existing_step_keys.contains(prerequisite_step_key) {
                continue;
            }

            return Err(WorkflowDagValidationError::MissingDependency {
                step_key: step.step_key().as_str().to_owned(),
                prerequisite_step_key: prerequisite_step_key.to_owned(),
            });
        }
    }

    let mut ready: VecDeque<usize> = indegree
        .iter()
        .enumerate()
        .filter_map(|(index, &count)| (count == 0).then_some(index))
        .collect();
    let mut visited = 0usize;

    while let Some(index) = ready.pop_front() {
        visited += 1;
        for &next in &adjacency[index] {
            indegree[next] -= 1;
            if indegree[next] == 0 {
                ready.push_back(next);
            }
        }
    }

    if visited != new_steps.len() {
        return Err(WorkflowDagValidationError::CycleDetected);
    }

    Ok(())
}