// car-workflow 0.12.0
//
// Declarative multi-stage workflow orchestration for Common Agent Runtime.
//! Static workflow verification — validate structure before execution.

use std::collections::{HashMap, HashSet, VecDeque};

use crate::types::*;

/// One problem found while statically checking a workflow.
#[derive(Debug, Clone)]
pub struct WorkflowIssue {
    /// Either `"error"` (makes the workflow invalid) or `"warning"`.
    pub severity: String,
    /// Stage the finding is attached to, if any. Findings that come from a
    /// nested sub-workflow are prefixed with the parent stage's ID.
    pub stage_id: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}

/// Outcome of [`verify_workflow`]: the overall verdict plus every finding.
#[derive(Debug)]
pub struct WorkflowVerifyResult {
    /// `true` when none of `issues` has severity `"error"`.
    pub valid: bool,
    /// All errors and warnings, including those from nested sub-workflows.
    pub issues: Vec<WorkflowIssue>,
    /// IDs of stages reachable from the start stage.
    pub reachable_stages: Vec<String>,
    /// IDs of defined stages that cannot be reached from the start stage.
    pub unreachable_stages: Vec<String>,
    /// Whether a cycle is reachable from the start stage.
    pub has_cycles: bool,
}

/// Statically verify a workflow definition for structural correctness.
///
/// Every finding is collected; the check does not stop at the first one.
/// The checks run in this order:
///
/// 1. Stage IDs are unique. Duplicates are errors.
/// 2. `workflow.start` names a defined stage. If not, that is an error.
/// 3. Every edge's `from`/`to` names a defined stage. If not, that is an error.
/// 4. Every `CompensationHandler::StageRef` names a defined stage. If not,
///    that is an error.
/// 5. Every defined stage is reachable from the start stage. If not, a
///    warning is issued.
/// 6. The graph reachable from the start stage is acyclic. If not, a warning
///    is issued.
/// 7. Nested sub-workflows are verified recursively. Their issues are
///    re-emitted with the parent stage ID prefixed to `stage_id`.
/// 8. Proposal stages are checked with `car_verify::verify`. Only its
///    issues with severity `"error"` are kept.
///
/// `valid` is `true` iff no collected issue has severity `"error"`.
/// `reachable_stages` / `unreachable_stages` contain only stages that are
/// actually defined in `workflow.stages`. Each stage is listed once, in
/// declaration order.
pub fn verify_workflow(workflow: &Workflow) -> WorkflowVerifyResult {
    let mut issues = Vec::new();

    // 1. Unique stage IDs. `ordered_ids` keeps the first occurrence of each
    //    ID in declaration order. Iterating a `HashSet` is nondeterministic,
    //    so the result lists below are built from this instead.
    let mut stage_ids: HashSet<&str> = HashSet::with_capacity(workflow.stages.len());
    let mut ordered_ids: Vec<&str> = Vec::with_capacity(workflow.stages.len());
    let mut duplicate_ids: HashSet<&str> = HashSet::new();
    for stage in &workflow.stages {
        let id = stage.id.as_str();
        if stage_ids.insert(id) {
            ordered_ids.push(id);
        } else if duplicate_ids.insert(id) {
            // Report each duplicated ID once, however many copies exist.
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: Some(stage.id.clone()),
                message: format!("duplicate stage id '{}'", stage.id),
            });
        }
    }

    // 2. Start stage exists
    let start = workflow.start.as_str();
    let start_exists = stage_ids.contains(start);
    if !start_exists {
        issues.push(WorkflowIssue {
            severity: "error".into(),
            stage_id: None,
            message: format!("start stage '{}' does not exist", workflow.start),
        });
    }

    // 3. Edge references valid stage IDs
    for edge in &workflow.edges {
        if !stage_ids.contains(edge.from.as_str()) {
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: None,
                message: format!("edge from '{}' references unknown stage", edge.from),
            });
        }
        if !stage_ids.contains(edge.to.as_str()) {
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: None,
                message: format!("edge to '{}' references unknown stage", edge.to),
            });
        }
    }

    // 4. Compensation StageRef references valid stage IDs
    for stage in &workflow.stages {
        if let Some(CompensationHandler::StageRef { stage_id }) = &stage.compensation {
            if !stage_ids.contains(stage_id.as_str()) {
                issues.push(WorkflowIssue {
                    severity: "error".into(),
                    stage_id: Some(stage.id.clone()),
                    message: format!(
                        "compensation for stage '{}' references unknown stage '{}'",
                        stage.id, stage_id
                    ),
                });
            }
        }
    }

    // 5. Reachability via BFS from start
    let adj: HashMap<&str, Vec<&str>> = {
        let mut m: HashMap<&str, Vec<&str>> = HashMap::new();
        for edge in &workflow.edges {
            m.entry(edge.from.as_str())
                .or_default()
                .push(edge.to.as_str());
        }
        m
    };

    let mut visited: HashSet<&str> = HashSet::new();
    let mut queue: VecDeque<&str> = VecDeque::new();
    if start_exists {
        queue.push_back(start);
        visited.insert(start);
    }
    while let Some(node) = queue.pop_front() {
        if let Some(neighbors) = adj.get(node) {
            for &next in neighbors {
                if visited.insert(next) {
                    queue.push_back(next);
                }
            }
        }
    }

    // `visited` may also contain undefined edge targets, which step 3 already
    // reported as errors. Only defined stages go into the result lists.
    let mut reachable_stages: Vec<String> = Vec::new();
    let mut unreachable_stages: Vec<String> = Vec::new();
    for id in ordered_ids {
        if visited.contains(id) {
            reachable_stages.push(id.to_string());
        } else {
            unreachable_stages.push(id.to_string());
        }
    }

    for id in &unreachable_stages {
        issues.push(WorkflowIssue {
            severity: "warning".into(),
            stage_id: Some(id.clone()),
            message: format!("stage '{}' is unreachable from start", id),
        });
    }

    // 6. Cycle detection via DFS. Like the BFS above, it runs only when
    //    there is a real start stage to search from.
    let has_cycles = start_exists && detect_cycles(&adj, start);
    if has_cycles {
        issues.push(WorkflowIssue {
            severity: "warning".into(),
            stage_id: None,
            message: "workflow contains cycles (ensure max_iterations is set)".into(),
        });
    }

    // 7. Recurse into sub-workflows
    for stage in &workflow.stages {
        if let StageStep::SubWorkflow(ref sw) = stage.step {
            let sub_result = verify_workflow(&sw.workflow);
            for issue in sub_result.issues {
                // Nested stage IDs become `parent.child`. Workflow-level nested
                // issues are attributed to the parent stage itself, rather than
                // to a dangling `parent.` path.
                let stage_id = match issue.stage_id {
                    Some(inner) => format!("{}.{}", stage.id, inner),
                    None => stage.id.clone(),
                };
                issues.push(WorkflowIssue {
                    severity: issue.severity,
                    stage_id: Some(stage_id),
                    message: format!("[sub-workflow {}] {}", stage.id, issue.message),
                });
            }
        }
    }

    // 8. Proposal verification via car-verify
    for stage in &workflow.stages {
        if let StageStep::Proposal(ref ps) = stage.step {
            let vr = car_verify::verify(&ps.proposal, None, None, 100);
            for issue in &vr.issues {
                if issue.severity == "error" {
                    issues.push(WorkflowIssue {
                        severity: "error".into(),
                        stage_id: Some(stage.id.clone()),
                        message: format!("[proposal] {}", issue.message),
                    });
                }
            }
        }
    }

    let valid = !issues.iter().any(|i| i.severity == "error");

    WorkflowVerifyResult {
        valid,
        issues,
        reachable_stages,
        unreachable_stages,
        has_cycles,
    }
}

/// Report whether a cycle is reachable from `start` in the directed graph `adj`.
///
/// `adj` maps each node to its successors. Nodes absent from the map have no
/// outgoing edges. A cycle is detected when the search follows an edge back
/// to a node that is still on the current DFS path; this includes self-loops.
/// Cycles that cannot be reached from `start` are not reported.
///
/// The search is an iterative depth-first search with an explicit stack.
/// A very long chain of stages therefore cannot overflow the call stack, as
/// a recursive DFS could.
fn detect_cycles(adj: &HashMap<&str, Vec<&str>>, start: &str) -> bool {
    // Nodes whose exploration has started (finished or still in progress).
    let mut visited: HashSet<&str> = HashSet::new();
    // Nodes on the current root-to-tip DFS path.
    let mut on_path: HashSet<&str> = HashSet::new();
    // Explicit DFS stack: (node, index of the next successor to examine).
    let mut stack: Vec<(&str, usize)> = vec![(start, 0)];
    visited.insert(start);
    on_path.insert(start);

    while let Some(frame) = stack.last_mut() {
        let (node, next_idx) = *frame;
        match adj.get(node).and_then(|succ| succ.get(next_idx)).copied() {
            Some(next) => {
                frame.1 += 1;
                if on_path.contains(next) {
                    return true; // back edge = cycle
                }
                if visited.insert(next) {
                    on_path.insert(next);
                    stack.push((next, 0));
                }
            }
            None => {
                // Every successor has been explored: leave the current path.
                on_path.remove(node);
                stack.pop();
            }
        }
    }

    false
}

#[cfg(test)]
mod tests {
    use super::*;
    use car_ir::ActionProposal;

    /// Build a proposal stage with an empty action list and no compensation.
    fn make_stage(id: &str) -> Stage {
        let proposal = ActionProposal {
            id: format!("p-{}", id),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: std::collections::HashMap::new(),
        };
        Stage {
            id: id.into(),
            name: id.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Unconditional, unlabeled edge `from -> to`.
    fn edge(from: &str, to: &str) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![],
            label: String::new(),
        }
    }

    /// Workflow with id "test", name "Test" and `max_iterations` of 100.
    fn workflow(start: &str, stages: Vec<Stage>, edges: Vec<Edge>) -> Workflow {
        Workflow {
            id: "test".into(),
            name: "Test".into(),
            start: start.into(),
            stages,
            edges,
            max_iterations: 100,
            metadata: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn valid_linear_workflow() {
        let stages = vec![make_stage("a"), make_stage("b"), make_stage("c")];
        let wf = workflow("a", stages, vec![edge("a", "b"), edge("b", "c")]);

        let result = verify_workflow(&wf);

        assert!(result.valid);
        assert!(!result.has_cycles);
        assert_eq!(result.reachable_stages.len(), 3);
        assert!(result.unreachable_stages.is_empty());
    }

    #[test]
    fn missing_start_stage() {
        let wf = workflow("nonexistent", vec![make_stage("a")], Vec::new());

        let result = verify_workflow(&wf);

        assert!(!result.valid);
        let mentions_start = result
            .issues
            .iter()
            .any(|issue| issue.message.contains("nonexistent"));
        assert!(mentions_start);
    }

    #[test]
    fn unreachable_stage() {
        let stages = vec![make_stage("a"), make_stage("b"), make_stage("orphan")];
        let wf = workflow("a", stages, vec![edge("a", "b")]);

        let result = verify_workflow(&wf);

        // Unreachable stages produce warnings, not errors.
        assert!(result.valid);
        assert_eq!(result.unreachable_stages.len(), 1);
        assert!(result.unreachable_stages.iter().any(|s| s == "orphan"));
    }

    #[test]
    fn cycle_detected() {
        let stages = vec![make_stage("a"), make_stage("b")];
        let wf = workflow("a", stages, vec![edge("a", "b"), edge("b", "a")]);

        let result = verify_workflow(&wf);

        // Cycles produce warnings, not errors.
        assert!(result.valid);
        assert!(result.has_cycles);
    }

    #[test]
    fn invalid_edge_reference() {
        let wf = workflow("a", vec![make_stage("a")], vec![edge("a", "ghost")]);

        let result = verify_workflow(&wf);

        assert!(!result.valid);
        let mentions_ghost = result
            .issues
            .iter()
            .any(|issue| issue.message.contains("ghost"));
        assert!(mentions_ghost);
    }

    #[test]
    fn invalid_compensation_ref() {
        let mut stage = make_stage("a");
        stage.compensation = Some(CompensationHandler::StageRef {
            stage_id: "nonexistent".into(),
        });
        let wf = workflow("a", vec![stage], Vec::new());

        let result = verify_workflow(&wf);

        assert!(!result.valid);
    }
}