use std::collections::{HashMap, HashSet, VecDeque};
use crate::types::*;
/// A single finding reported while verifying a workflow.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowIssue {
    /// Severity label; this module emits `"error"` (makes the workflow invalid)
    /// and `"warning"`.
    pub severity: String,
    /// Stage the issue is attached to, if any. Issues lifted from a nested
    /// sub-workflow are prefixed with the parent stage id.
    pub stage_id: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}
/// Outcome of statically verifying a workflow.
#[derive(Debug, Clone)]
pub struct WorkflowVerifyResult {
    /// `true` when no entry in `issues` has severity `"error"`.
    pub valid: bool,
    /// Every error and warning found, including those lifted from sub-workflows.
    pub issues: Vec<WorkflowIssue>,
    /// Stage ids reached from the workflow's `start` stage by following edges.
    pub reachable_stages: Vec<String>,
    /// Declared stage ids that are not reached from `start`.
    pub unreachable_stages: Vec<String>,
    /// Whether a cycle is reachable from `start`.
    pub has_cycles: bool,
}
pub fn verify_workflow(workflow: &Workflow) -> WorkflowVerifyResult {
let mut issues = Vec::new();
let stage_ids: HashSet<&str> = workflow.stages.iter().map(|s| s.id.as_str()).collect();
if !stage_ids.contains(workflow.start.as_str()) {
issues.push(WorkflowIssue {
severity: "error".into(),
stage_id: None,
message: format!("start stage '{}' does not exist", workflow.start),
});
}
for edge in &workflow.edges {
if !stage_ids.contains(edge.from.as_str()) {
issues.push(WorkflowIssue {
severity: "error".into(),
stage_id: None,
message: format!("edge from '{}' references unknown stage", edge.from),
});
}
if !stage_ids.contains(edge.to.as_str()) {
issues.push(WorkflowIssue {
severity: "error".into(),
stage_id: None,
message: format!("edge to '{}' references unknown stage", edge.to),
});
}
}
for stage in &workflow.stages {
if let Some(CompensationHandler::StageRef { stage_id }) = &stage.compensation {
if !stage_ids.contains(stage_id.as_str()) {
issues.push(WorkflowIssue {
severity: "error".into(),
stage_id: Some(stage.id.clone()),
message: format!(
"compensation for stage '{}' references unknown stage '{}'",
stage.id, stage_id
),
});
}
}
}
let adj: HashMap<&str, Vec<&str>> = {
let mut m: HashMap<&str, Vec<&str>> = HashMap::new();
for edge in &workflow.edges {
m.entry(edge.from.as_str())
.or_default()
.push(edge.to.as_str());
}
m
};
let mut visited: HashSet<&str> = HashSet::new();
let mut queue: VecDeque<&str> = VecDeque::new();
if stage_ids.contains(workflow.start.as_str()) {
queue.push_back(workflow.start.as_str());
visited.insert(workflow.start.as_str());
}
while let Some(node) = queue.pop_front() {
if let Some(neighbors) = adj.get(node) {
for &next in neighbors {
if visited.insert(next) {
queue.push_back(next);
}
}
}
}
let reachable_stages: Vec<String> = visited.iter().map(|s| s.to_string()).collect();
let unreachable_stages: Vec<String> = stage_ids
.iter()
.filter(|s| !visited.contains(**s))
.map(|s| s.to_string())
.collect();
for id in &unreachable_stages {
issues.push(WorkflowIssue {
severity: "warning".into(),
stage_id: Some(id.clone()),
message: format!("stage '{}' is unreachable from start", id),
});
}
let has_cycles = detect_cycles(&adj, workflow.start.as_str());
if has_cycles {
issues.push(WorkflowIssue {
severity: "warning".into(),
stage_id: None,
message: "workflow contains cycles (ensure max_iterations is set)".into(),
});
}
for stage in &workflow.stages {
if let StageStep::SubWorkflow(ref sw) = stage.step {
let sub_result = verify_workflow(&sw.workflow);
for issue in sub_result.issues {
issues.push(WorkflowIssue {
severity: issue.severity,
stage_id: Some(format!(
"{}.{}",
stage.id,
issue.stage_id.unwrap_or_default()
)),
message: format!("[sub-workflow {}] {}", stage.id, issue.message),
});
}
}
}
for stage in &workflow.stages {
if let StageStep::Proposal(ref ps) = stage.step {
let vr = car_verify::verify(&ps.proposal, None, None, 100);
for issue in &vr.issues {
if issue.severity == "error" {
issues.push(WorkflowIssue {
severity: "error".into(),
stage_id: Some(stage.id.clone()),
message: format!("[proposal] {}", issue.message),
});
}
}
}
}
let valid = !issues.iter().any(|i| i.severity == "error");
WorkflowVerifyResult {
valid,
issues,
reachable_stages,
unreachable_stages,
has_cycles,
}
}
/// Returns `true` if a directed cycle (including a self-loop) is reachable from
/// `start` in the adjacency map `adj`.
///
/// Only nodes reachable from `start` are explored, so a cycle among unreachable
/// nodes is not reported. A node with no entry in `adj` has no outgoing edges.
///
/// The depth-first search keeps an explicit stack of frames instead of recursing,
/// so a very long chain of stages cannot overflow the thread's call stack.
fn detect_cycles(adj: &HashMap<&str, Vec<&str>>, start: &str) -> bool {
    // Nodes ever entered by the search; each is expanded at most once.
    let mut visited: HashSet<&str> = HashSet::new();
    // Nodes on the current DFS path; reaching one of these again closes a cycle.
    let mut on_path: HashSet<&str> = HashSet::new();
    // Explicit DFS stack: (node, index of the next neighbor to examine).
    let mut frames: Vec<(&str, usize)> = vec![(start, 0)];
    visited.insert(start);
    on_path.insert(start);

    while let Some(frame) = frames.last_mut() {
        let (node, next_idx) = *frame;
        let neighbors: &[&str] = adj.get(node).map(Vec::as_slice).unwrap_or_default();
        match neighbors.get(next_idx) {
            Some(&next) => {
                frame.1 += 1;
                if on_path.contains(next) {
                    return true;
                }
                if visited.insert(next) {
                    on_path.insert(next);
                    frames.push((next, 0));
                }
            }
            None => {
                // Every neighbor explored: this node leaves the current path.
                on_path.remove(node);
                frames.pop();
            }
        }
    }
    false
}
#[cfg(test)]
mod tests {
    use super::*;
    use car_ir::ActionProposal;

    /// A stage named after its id whose step is an empty proposal `p-<id>`,
    /// with no compensation, timeout, or metadata.
    fn make_stage(id: &str) -> Stage {
        let proposal = ActionProposal {
            id: format!("p-{}", id),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: std::collections::HashMap::new(),
        };
        Stage {
            id: id.into(),
            name: id.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// One `make_stage` per id, in the given order.
    fn make_stages(ids: &[&str]) -> Vec<Stage> {
        ids.iter().copied().map(make_stage).collect()
    }

    /// An unconditional, unlabeled edge `from -> to`.
    fn edge(from: &str, to: &str) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![],
            label: String::new(),
        }
    }

    /// The fixture workflow every test uses: id "test", name "Test",
    /// `max_iterations` of 100 and no metadata.
    fn build_workflow(start: &str, stages: Vec<Stage>, edges: Vec<Edge>) -> Workflow {
        Workflow {
            id: "test".into(),
            name: "Test".into(),
            start: start.into(),
            stages,
            edges,
            max_iterations: 100,
            metadata: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn valid_linear_workflow() {
        let wf = build_workflow(
            "a",
            make_stages(&["a", "b", "c"]),
            vec![edge("a", "b"), edge("b", "c")],
        );
        let result = verify_workflow(&wf);
        assert!(result.valid);
        assert!(!result.has_cycles);
        assert_eq!(result.reachable_stages.len(), 3);
        assert!(result.unreachable_stages.is_empty());
    }

    #[test]
    fn missing_start_stage() {
        let wf = build_workflow("nonexistent", make_stages(&["a"]), Vec::new());
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        let mentions_start = result
            .issues
            .iter()
            .any(|issue| issue.message.contains("nonexistent"));
        assert!(mentions_start);
    }

    #[test]
    fn unreachable_stage() {
        let wf = build_workflow(
            "a",
            make_stages(&["a", "b", "orphan"]),
            vec![edge("a", "b")],
        );
        let result = verify_workflow(&wf);
        assert!(result.valid);
        assert_eq!(result.unreachable_stages.len(), 1);
        assert!(result.unreachable_stages.iter().any(|id| id == "orphan"));
    }

    #[test]
    fn cycle_detected() {
        let wf = build_workflow(
            "a",
            make_stages(&["a", "b"]),
            vec![edge("a", "b"), edge("b", "a")],
        );
        let result = verify_workflow(&wf);
        assert!(result.valid);
        assert!(result.has_cycles);
    }

    #[test]
    fn invalid_edge_reference() {
        let wf = build_workflow("a", make_stages(&["a"]), vec![edge("a", "ghost")]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result.issues.iter().any(|issue| issue.message.contains("ghost")));
    }

    #[test]
    fn invalid_compensation_ref() {
        let mut stage = make_stage("a");
        stage.compensation = Some(CompensationHandler::StageRef {
            stage_id: "nonexistent".into(),
        });
        let wf = build_workflow("a", vec![stage], Vec::new());
        assert!(!verify_workflow(&wf).valid);
    }
}