use std::collections::{HashMap, HashSet, VecDeque};
use crate::types::*;
/// A single finding reported while verifying a workflow.
#[derive(Debug, Clone)]
pub struct WorkflowIssue {
    /// Severity label. The checks in this file emit `"error"` or `"warning"`.
    pub severity: String,
    /// Id of the stage the finding refers to, or `None` for workflow-level findings.
    /// Findings forwarded from a sub-workflow carry a dotted path that starts with the
    /// id of the enclosing stage.
    pub stage_id: Option<String>,
    /// Human-readable description of the finding.
    pub message: String,
}
/// Outcome of [`verify_workflow`].
#[derive(Debug)]
pub struct WorkflowVerifyResult {
    /// `true` when none of `issues` has severity `"error"`.
    pub valid: bool,
    /// Every finding collected during verification, errors and warnings alike.
    pub issues: Vec<WorkflowIssue>,
    /// Stage ids reached by following edges outward from the start stage.
    pub reachable_stages: Vec<String>,
    /// Declared stage ids that were not reached from the start stage.
    pub unreachable_stages: Vec<String>,
    /// `true` when a cycle was found while walking the edges from the start stage.
    pub has_cycles: bool,
}
/// Statically checks a workflow's graph structure and the configuration of its stages.
///
/// Every finding is appended to the returned issue list. The checks are:
/// - the start stage and both endpoints of every edge must name a declared stage (errors);
/// - a `StageRef` compensation must name a declared stage that is not an approval gate
///   (errors);
/// - approval stages need a non-blank `output_key` other than the reserved `goal` (errors);
/// - declared stages that cannot be reached from the start stage are reported (warnings);
/// - a cycle reachable from the start stage is reported (warning);
/// - sub-workflows are verified recursively, top-level proposals are run through
///   `verify_proposal`, and loop / for-each stages are validated together with their bodies;
/// - stages nested deeper than `MAX_STEP_NESTING_DEPTH` are rejected (errors).
///
/// `reachable_stages` and `unreachable_stages` list declared stage ids only, each id once,
/// in declaration order, so the result is deterministic across runs.
///
/// A stage that exceeds the nesting limit gets the nesting error only: the recursive
/// validation of its contents is skipped so that recursion depth stays bounded by the limit.
///
/// The result is `valid` when no issue has severity `"error"`.
pub fn verify_workflow(workflow: &Workflow) -> WorkflowVerifyResult {
    let mut issues = Vec::new();
    let stage_ids: HashSet<&str> = workflow.stages.iter().map(|s| s.id.as_str()).collect();

    // The start stage must be declared.
    if !stage_ids.contains(workflow.start.as_str()) {
        issues.push(WorkflowIssue {
            severity: "error".into(),
            stage_id: None,
            message: format!("start stage '{}' does not exist", workflow.start),
        });
    }

    // Both endpoints of every edge must be declared.
    for edge in &workflow.edges {
        if !stage_ids.contains(edge.from.as_str()) {
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: None,
                message: format!("edge from '{}' references unknown stage", edge.from),
            });
        }
        if !stage_ids.contains(edge.to.as_str()) {
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: None,
                message: format!("edge to '{}' references unknown stage", edge.to),
            });
        }
    }

    // Compensation references must resolve to a declared, non-approval stage.
    for stage in &workflow.stages {
        if let Some(CompensationHandler::StageRef { stage_id }) = &stage.compensation {
            match workflow.stage(stage_id) {
                None => issues.push(WorkflowIssue {
                    severity: "error".into(),
                    stage_id: Some(stage.id.clone()),
                    message: format!(
                        "compensation for stage '{}' references unknown stage '{}'",
                        stage.id, stage_id
                    ),
                }),
                Some(s) if matches!(s.step, StageStep::Approval(_)) => issues.push(WorkflowIssue {
                    severity: "error".into(),
                    stage_id: Some(stage.id.clone()),
                    message: format!(
                        "compensation for stage '{}' references approval gate '{}', which cannot be run as a compensation",
                        stage.id, stage_id
                    ),
                }),
                Some(_) => {}
            }
        }
    }

    // Approval gates need a usable output key.
    for stage in &workflow.stages {
        if let StageStep::Approval(ap) = &stage.step {
            if ap.output_key.trim().is_empty() {
                issues.push(WorkflowIssue {
                    severity: "error".into(),
                    stage_id: Some(stage.id.clone()),
                    message: format!("approval stage '{}' has an empty output_key", stage.id),
                });
            }
            if ap.output_key == "goal" {
                issues.push(WorkflowIssue {
                    severity: "error".into(),
                    stage_id: Some(stage.id.clone()),
                    message: format!(
                        "approval stage '{}' uses reserved output_key 'goal' (the drift anchor)",
                        stage.id
                    ),
                });
            }
        }
    }

    // Adjacency list built from the edges exactly as written, including dangling endpoints.
    let adj: HashMap<&str, Vec<&str>> = {
        let mut m: HashMap<&str, Vec<&str>> = HashMap::new();
        for edge in &workflow.edges {
            m.entry(edge.from.as_str())
                .or_default()
                .push(edge.to.as_str());
        }
        m
    };

    // Breadth-first walk from the start stage; skipped entirely when the start is undeclared.
    let mut visited: HashSet<&str> = HashSet::new();
    let mut queue: VecDeque<&str> = VecDeque::new();
    if stage_ids.contains(workflow.start.as_str()) {
        queue.push_back(workflow.start.as_str());
        visited.insert(workflow.start.as_str());
    }
    while let Some(node) = queue.pop_front() {
        if let Some(neighbors) = adj.get(node) {
            for &next in neighbors {
                if visited.insert(next) {
                    queue.push_back(next);
                }
            }
        }
    }

    // Partition the declared stages in declaration order. Walking `workflow.stages` rather
    // than the hash sets keeps the output stable and keeps undeclared edge targets, which
    // may be present in `visited`, out of `reachable_stages`.
    let mut listed: HashSet<&str> = HashSet::with_capacity(stage_ids.len());
    let mut reachable_stages: Vec<String> = Vec::new();
    let mut unreachable_stages: Vec<String> = Vec::new();
    for stage in &workflow.stages {
        let id = stage.id.as_str();
        if !listed.insert(id) {
            // A repeated id has already been classified.
            continue;
        }
        if visited.contains(id) {
            reachable_stages.push(id.to_string());
        } else {
            unreachable_stages.push(id.to_string());
        }
    }
    for id in &unreachable_stages {
        issues.push(WorkflowIssue {
            severity: "warning".into(),
            stage_id: Some(id.clone()),
            message: format!("stage '{}' is unreachable from start", id),
        });
    }

    let has_cycles = detect_cycles(&adj, workflow.start.as_str());
    if has_cycles {
        issues.push(WorkflowIssue {
            severity: "warning".into(),
            stage_id: None,
            message: "workflow contains cycles (ensure max_iterations is set)".into(),
        });
    }

    // Evaluate the depth guard before any recursive validation. `exceeds_nesting` is itself
    // bounded by its budget, whereas the validations below recurse as deep as the input.
    let too_deep: Vec<bool> = workflow
        .stages
        .iter()
        .map(|stage| exceeds_nesting(&stage.step, MAX_STEP_NESTING_DEPTH))
        .collect();

    // Sub-workflows: verify recursively and forward the findings under the parent stage id.
    // When the inner finding has no stage id the forwarded id is the parent id followed by
    // a dot.
    for (stage, &skip) in workflow.stages.iter().zip(&too_deep) {
        if skip {
            continue;
        }
        if let StageStep::SubWorkflow(sw) = &stage.step {
            let sub_result = verify_workflow(&sw.workflow);
            for issue in sub_result.issues {
                issues.push(WorkflowIssue {
                    severity: issue.severity,
                    stage_id: Some(format!(
                        "{}.{}",
                        stage.id,
                        issue.stage_id.unwrap_or_default()
                    )),
                    message: format!("[sub-workflow {}] {}", stage.id, issue.message),
                });
            }
        }
    }

    // Top-level proposals. A bare proposal never trips the depth guard, so nothing is skipped.
    for stage in &workflow.stages {
        if let StageStep::Proposal(ps) = &stage.step {
            verify_proposal(&stage.id, "proposal", &ps.proposal, &mut issues);
        }
    }

    // Loop / for-each settings and bodies.
    for (stage, &skip) in workflow.stages.iter().zip(&too_deep) {
        if !skip {
            validate_dynamic_step(&stage.id, &stage.step, &mut issues);
        }
    }

    // Report the over-nested stages last.
    for (stage, &skip) in workflow.stages.iter().zip(&too_deep) {
        if skip {
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: Some(stage.id.clone()),
                message: format!(
                    "stage '{}' nests loop/foreach/sub-workflow bodies deeper than the limit of {}",
                    stage.id, MAX_STEP_NESTING_DEPTH
                ),
            });
        }
    }

    let valid = !issues.iter().any(|i| i.severity == "error");
    WorkflowVerifyResult {
        valid,
        issues,
        reachable_stages,
        unreachable_stages,
        has_cycles,
    }
}
/// Runs `car_verify::verify` on `proposal` and records its error-severity findings.
///
/// Findings with any other severity are dropped. Each recorded issue is attributed to
/// `stage_id`, and its message is prefixed with `[label]`.
fn verify_proposal(
    stage_id: &str,
    label: &str,
    proposal: &car_ir::ActionProposal,
    issues: &mut Vec<WorkflowIssue>,
) {
    // NOTE(review): the meaning of the `None, None, 100` arguments is defined by
    // `car_verify::verify` and is not visible from this file.
    let report = car_verify::verify(proposal, None, None, 100);
    let errors = report
        .issues
        .iter()
        .filter(|found| found.severity == "error")
        .map(|found| WorkflowIssue {
            severity: "error".into(),
            stage_id: Some(stage_id.to_string()),
            message: format!("[{label}] {}", found.message),
        });
    issues.extend(errors);
}
/// Nesting budget for loop / for-each / sub-workflow bodies. `verify_workflow` hands it to
/// `exceeds_nesting` as the starting budget and reports an error for stages that exhaust it.
pub(crate) const MAX_STEP_NESTING_DEPTH: usize = 32;
/// Returns `true` when `step` uses up the nesting budget `remaining`.
///
/// Every step costs one unit of budget, so a budget of zero is exceeded by any step.
/// Loop and for-each steps pass the reduced budget to their body; a sub-workflow passes it
/// to each of its stages and exceeds the budget as soon as one of them does. All other
/// step kinds end the descent.
pub(crate) fn exceeds_nesting(step: &StageStep, remaining: usize) -> bool {
    match remaining.checked_sub(1) {
        // Nothing left to spend on this step.
        None => true,
        Some(budget) => match step {
            StageStep::LoopUntil(ls) => exceeds_nesting(&ls.body, budget),
            StageStep::ForEach(fe) => exceeds_nesting(&fe.body, budget),
            StageStep::SubWorkflow(sw) => {
                for inner in &sw.workflow.stages {
                    if exceeds_nesting(&inner.step, budget) {
                        return true;
                    }
                }
                false
            }
            _ => false,
        },
    }
}
/// Validates the settings of a loop or for-each step and then its body.
///
/// A `loop_until` step must have `max_iterations >= 1`; a `for_each` step must have a
/// non-blank `items_from`. A violation is recorded as an error against `stage_id`, and the
/// body is validated either way. Other step kinds are ignored.
fn validate_dynamic_step(stage_id: &str, step: &StageStep, issues: &mut Vec<WorkflowIssue>) {
    // Work out the kind-specific pieces first so the reporting code is shared.
    let (problem, kind, body) = match step {
        StageStep::LoopUntil(ls) => (
            (ls.max_iterations < 1)
                .then(|| format!("loop_until stage '{stage_id}' requires max_iterations >= 1")),
            "loop_until",
            &ls.body,
        ),
        StageStep::ForEach(fe) => (
            fe.items_from
                .trim()
                .is_empty()
                .then(|| format!("for_each stage '{stage_id}' requires a non-empty items_from")),
            "for_each",
            &fe.body,
        ),
        _ => return,
    };

    if let Some(message) = problem {
        issues.push(WorkflowIssue {
            severity: "error".into(),
            stage_id: Some(stage_id.to_string()),
            message,
        });
    }
    validate_body(stage_id, kind, body, issues);
}
/// Validates the body of a loop or for-each stage.
///
/// `parent_kind` names the enclosing construct and is used in messages. Per body kind:
/// - approval gate: rejected with an error;
/// - proposal: checked with `verify_proposal`;
/// - sub-workflow: verified with `verify_workflow`, each finding forwarded with its stage
///   id prefixed by `stage_id` and a dot (only the prefix and dot remain when the inner
///   finding has no stage id);
/// - nested loop / for-each: validated with `validate_dynamic_step`;
/// - pattern: accepted without further checks.
fn validate_body(
    stage_id: &str,
    parent_kind: &str,
    body: &StageStep,
    issues: &mut Vec<WorkflowIssue>,
) {
    match body {
        StageStep::Pattern(_) => {}
        StageStep::LoopUntil(_) | StageStep::ForEach(_) => {
            validate_dynamic_step(stage_id, body, issues);
        }
        StageStep::Proposal(ps) => {
            let label = format!("{parent_kind} body proposal");
            verify_proposal(stage_id, &label, &ps.proposal, issues);
        }
        StageStep::SubWorkflow(sw) => {
            let nested = verify_workflow(&sw.workflow);
            issues.extend(nested.issues.into_iter().map(|inner| WorkflowIssue {
                severity: inner.severity,
                stage_id: Some(format!("{stage_id}.{}", inner.stage_id.unwrap_or_default())),
                message: format!("[{parent_kind} body sub-workflow] {}", inner.message),
            }));
        }
        StageStep::Approval(_) => {
            let message = format!(
                "{parent_kind} stage '{stage_id}' body cannot be an approval gate (no pause/resume inside a loop or fan-out)"
            );
            issues.push(WorkflowIssue {
                severity: "error".into(),
                stage_id: Some(stage_id.to_string()),
                message,
            });
        }
    }
}
/// Lists state keys the workflow consumes but that no stage is known to produce.
///
/// The set of produced keys starts with `user_input` and `user_query`. For every stage it
/// gains `stage.<id>.succeeded`, `stage.<id>.answer` and `stage.<id>.error`, plus:
/// - approval: the `output_key` and `<output_key>.<field name>` for each declared field;
/// - proposal: every key in its actions' `expected_effects`;
/// - loop_until: `stage.<id>.iteration`, and the `expected_effects` keys of a proposal body;
/// - for_each: `foreach.<id>.count`.
///
/// One message is returned for each edge condition whose key is not in that set, followed
/// by one for each state dependency of a top-level proposal action that is not in it.
pub fn semantic_issues(workflow: &Workflow) -> Vec<String> {
    let mut produced: HashSet<String> = HashSet::new();
    produced.insert(String::from("user_input"));
    produced.insert(String::from("user_query"));

    for stage in &workflow.stages {
        let id = &stage.id;
        produced.insert(format!("stage.{}.succeeded", id));
        produced.insert(format!("stage.{}.answer", id));
        produced.insert(format!("stage.{}.error", id));

        match &stage.step {
            StageStep::Pattern(_) | StageStep::SubWorkflow(_) => {}
            StageStep::ForEach(_) => {
                produced.insert(format!("foreach.{}.count", id));
            }
            StageStep::Approval(ap) => {
                produced.insert(ap.output_key.clone());
                produced.extend(
                    ap.fields
                        .iter()
                        .map(|field| format!("{}.{}", ap.output_key, field.name)),
                );
            }
            StageStep::Proposal(ps) => {
                produced.extend(
                    ps.proposal
                        .actions
                        .iter()
                        .flat_map(|action| action.expected_effects.keys().cloned()),
                );
            }
            StageStep::LoopUntil(ls) => {
                produced.insert(format!("stage.{}.iteration", id));
                // Only a body that is directly a proposal contributes effect keys.
                if let StageStep::Proposal(ps) = ls.body.as_ref() {
                    produced.extend(
                        ps.proposal
                            .actions
                            .iter()
                            .flat_map(|action| action.expected_effects.keys().cloned()),
                    );
                }
            }
        }
    }

    let mut findings = Vec::new();

    // Edge conditions that branch on a key nobody writes.
    for edge in &workflow.edges {
        for cond in &edge.conditions {
            if produced.contains(&cond.key) {
                continue;
            }
            findings.push(format!(
                "edge {}->{} branches on state key '{}', which no stage produces (the branch may never be taken)",
                edge.from, edge.to, cond.key
            ));
        }
    }

    // State dependencies of top-level proposal actions that nobody writes.
    for stage in &workflow.stages {
        let ps = match &stage.step {
            StageStep::Proposal(ps) => ps,
            _ => continue,
        };
        for action in &ps.proposal.actions {
            for dep in &action.state_dependencies {
                if produced.contains(dep) {
                    continue;
                }
                findings.push(format!(
                    "stage '{}' depends on state key '{}', which no stage produces",
                    stage.id, dep
                ));
            }
        }
    }

    findings
}
/// Reports whether a cycle can be reached from `start` by following `adj`.
///
/// `adj` maps a node to its outgoing neighbours; a node without an entry has no outgoing
/// edges. Cycles in parts of the graph that are not reachable from `start` are not detected.
///
/// The depth-first search keeps its own stack instead of recursing, so the depth it can
/// handle is limited by heap memory rather than by the thread's call stack.
fn detect_cycles(adj: &HashMap<&str, Vec<&str>>, start: &str) -> bool {
    // Nodes whose exploration has begun, whether finished or still in progress.
    let mut visited: HashSet<&str> = HashSet::new();
    // Nodes on the current search path; an edge into this set closes a cycle.
    let mut on_path: HashSet<&str> = HashSet::new();
    // One frame per node on the path: the node and the index of its next unexamined neighbour.
    let mut frames: Vec<(&str, usize)> = Vec::new();

    visited.insert(start);
    on_path.insert(start);
    frames.push((start, 0));

    while let Some(&(node, cursor)) = frames.last() {
        let next = adj
            .get(node)
            .and_then(|neighbors| neighbors.get(cursor))
            .copied();
        match next {
            Some(next) => {
                // Advance this frame before possibly descending so it resumes at the
                // following neighbour.
                if let Some(top) = frames.last_mut() {
                    top.1 = cursor + 1;
                }
                if on_path.contains(next) {
                    return true;
                }
                if visited.insert(next) {
                    on_path.insert(next);
                    frames.push((next, 0));
                }
            }
            None => {
                // All neighbours examined: the node leaves the current path.
                on_path.remove(node);
                frames.pop();
            }
        }
    }
    false
}
#[cfg(test)]
mod tests {
    use super::*;
    use car_ir::ActionProposal;

    // ---- fixtures -------------------------------------------------------------------

    /// Wraps `step` in a stage named after its id, with no compensation, timeout or metadata.
    fn stage_with(id: &str, step: StageStep) -> Stage {
        Stage {
            id: id.into(),
            name: id.into(),
            step,
            compensation: None,
            timeout_ms: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// A stage holding an empty proposal.
    fn make_stage(id: &str) -> Stage {
        stage_with(
            id,
            StageStep::Proposal(ProposalStep {
                proposal: ActionProposal {
                    id: format!("p-{}", id),
                    source: "test".into(),
                    actions: vec![],
                    timestamp: chrono::Utc::now(),
                    context: std::collections::HashMap::new(),
                },
            }),
        )
    }

    /// An approval step without fields.
    fn approval_step(prompt: &str, output_key: &str) -> StageStep {
        StageStep::Approval(crate::types::ApprovalStep {
            prompt: prompt.into(),
            fields: vec![],
            output_key: output_key.into(),
        })
    }

    /// An approval-gate stage writing to `output_key`.
    fn approval_stage(id: &str, output_key: &str) -> Stage {
        stage_with(id, approval_step("approve?", output_key))
    }

    /// An unconditional, unlabeled edge.
    fn edge(from: &str, to: &str) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![],
            label: String::new(),
        }
    }

    /// An unlabeled edge guarded by a single `key eq value` condition.
    fn edge_when(from: &str, to: &str, key: &str, value: serde_json::Value) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![car_ir::Precondition {
                key: key.into(),
                operator: "eq".into(),
                value,
                description: String::new(),
            }],
            label: String::new(),
        }
    }

    /// A workflow with the default test id, name and iteration cap.
    fn workflow(start: &str, stages: Vec<Stage>, edges: Vec<Edge>) -> Workflow {
        Workflow {
            id: "test".into(),
            name: "Test".into(),
            start: start.into(),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// A workflow whose only stage, `s`, runs `step`.
    fn single_step_wf(step: StageStep) -> Workflow {
        workflow("s", vec![stage_with("s", step)], vec![])
    }

    /// An empty proposal step wrapped in `depth` single-iteration loops.
    fn nested_loops(depth: usize) -> StageStep {
        (0..depth).fold(make_stage("leaf").step, |inner, _| {
            StageStep::LoopUntil(LoopUntilStep {
                body: Box::new(inner),
                until: vec![],
                max_iterations: 1,
            })
        })
    }

    // ---- graph structure ------------------------------------------------------------

    #[test]
    fn valid_linear_workflow() {
        let wf = workflow(
            "a",
            vec![make_stage("a"), make_stage("b"), make_stage("c")],
            vec![edge("a", "b"), edge("b", "c")],
        );
        let result = verify_workflow(&wf);
        assert!(result.valid);
        assert!(!result.has_cycles);
        assert_eq!(result.reachable_stages.len(), 3);
        assert!(result.unreachable_stages.is_empty());
    }

    #[test]
    fn missing_start_stage() {
        let wf = workflow("nonexistent", vec![make_stage("a")], vec![]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("nonexistent")));
    }

    #[test]
    fn unreachable_stage() {
        let wf = workflow(
            "a",
            vec![make_stage("a"), make_stage("b"), make_stage("orphan")],
            vec![edge("a", "b")],
        );
        let result = verify_workflow(&wf);
        // Unreachable stages are reported as warnings, so the workflow stays valid.
        assert!(result.valid);
        assert_eq!(result.unreachable_stages.len(), 1);
        assert!(result.unreachable_stages.contains(&"orphan".to_string()));
    }

    #[test]
    fn cycle_detected() {
        let wf = workflow(
            "a",
            vec![make_stage("a"), make_stage("b")],
            vec![edge("a", "b"), edge("b", "a")],
        );
        let result = verify_workflow(&wf);
        // A cycle is reported as a warning, so the workflow stays valid.
        assert!(result.valid);
        assert!(result.has_cycles);
    }

    #[test]
    fn invalid_edge_reference() {
        let wf = workflow("a", vec![make_stage("a")], vec![edge("a", "ghost")]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result.issues.iter().any(|i| i.message.contains("ghost")));
    }

    // ---- semantic checks ------------------------------------------------------------

    #[test]
    fn semantic_issues_flag_unknown_edge_key_and_dependency() {
        let wf = Workflow {
            id: "t".into(),
            name: "T".into(),
            ..workflow(
                "gate",
                vec![approval_stage("gate", "approval"), make_stage("done")],
                vec![edge_when(
                    "gate",
                    "done",
                    "approval.decision",
                    serde_json::Value::String("approve".into()),
                )],
            )
        };
        let issues = semantic_issues(&wf);
        assert!(issues.iter().any(|i| i.contains("approval.decision")));

        let wf_ok = Workflow {
            edges: vec![edge_when(
                "gate",
                "done",
                "stage.gate.succeeded",
                serde_json::Value::Bool(true),
            )],
            ..wf
        };
        assert!(semantic_issues(&wf_ok).is_empty());
    }

    // ---- approval and compensation ---------------------------------------------------

    #[test]
    fn approval_empty_output_key_is_error() {
        let wf = workflow("gate", vec![approval_stage("gate", "")], vec![]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("empty output_key")));
    }

    #[test]
    fn approval_as_compensation_is_error() {
        let mut work = make_stage("work");
        work.compensation = Some(CompensationHandler::StageRef {
            stage_id: "gate".into(),
        });
        let wf = workflow(
            "work",
            vec![work, approval_stage("gate", "approval")],
            vec![],
        );
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("cannot be run as a compensation")));
    }

    #[test]
    fn invalid_compensation_ref() {
        let mut stage = make_stage("a");
        stage.compensation = Some(CompensationHandler::StageRef {
            stage_id: "nonexistent".into(),
        });
        let wf = workflow("a", vec![stage], vec![]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
    }

    // ---- loops, fan-out and nesting ---------------------------------------------------

    #[test]
    fn loop_until_zero_iterations_is_error() {
        let wf = single_step_wf(StageStep::LoopUntil(LoopUntilStep {
            body: Box::new(make_stage("b").step),
            until: vec![],
            max_iterations: 0,
        }));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("max_iterations >= 1")));
    }

    #[test]
    fn for_each_empty_items_from_is_error() {
        let wf = single_step_wf(StageStep::ForEach(ForEachStep {
            items_from: " ".into(),
            body: Box::new(make_stage("b").step),
            max_concurrent: 0,
        }));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("non-empty items_from")));
    }

    #[test]
    fn excessive_nesting_is_error() {
        let wf = single_step_wf(nested_loops(MAX_STEP_NESTING_DEPTH + 2));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("nests")));
    }

    #[test]
    fn nesting_within_limit_is_ok() {
        let wf = single_step_wf(nested_loops(4));
        let result = verify_workflow(&wf);
        assert!(result.valid, "issues: {:?}", result.issues);
    }

    #[test]
    fn approval_inside_loop_body_is_error() {
        let wf = single_step_wf(StageStep::LoopUntil(LoopUntilStep {
            body: Box::new(approval_step("p", "k")),
            until: vec![],
            max_iterations: 3,
        }));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("cannot be an approval gate")));
    }
}