Skip to main content

car_workflow/
verify.rs

1//! Static workflow verification — validate structure before execution.
2
3use std::collections::{HashMap, HashSet, VecDeque};
4
5use crate::types::*;
6
/// One finding produced while statically checking a workflow.
#[derive(Clone, Debug)]
pub struct WorkflowIssue {
    /// Severity label; the checks in this module emit `"error"` and `"warning"`.
    pub severity: String,
    /// ID of the stage the finding is attached to, or `None` when it is not tied to one stage.
    pub stage_id: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}
14
15/// Result of static workflow verification.
16#[derive(Debug)]
17pub struct WorkflowVerifyResult {
18    pub valid: bool,
19    pub issues: Vec<WorkflowIssue>,
20    pub reachable_stages: Vec<String>,
21    pub unreachable_stages: Vec<String>,
22    pub has_cycles: bool,
23}
24
25/// Statically verify a workflow definition for structural correctness.
26pub fn verify_workflow(workflow: &Workflow) -> WorkflowVerifyResult {
27    let mut issues = Vec::new();
28    let stage_ids: HashSet<&str> = workflow.stages.iter().map(|s| s.id.as_str()).collect();
29
30    // 1. Start stage exists
31    if !stage_ids.contains(workflow.start.as_str()) {
32        issues.push(WorkflowIssue {
33            severity: "error".into(),
34            stage_id: None,
35            message: format!("start stage '{}' does not exist", workflow.start),
36        });
37    }
38
39    // 2. Edge references valid stage IDs
40    for edge in &workflow.edges {
41        if !stage_ids.contains(edge.from.as_str()) {
42            issues.push(WorkflowIssue {
43                severity: "error".into(),
44                stage_id: None,
45                message: format!("edge from '{}' references unknown stage", edge.from),
46            });
47        }
48        if !stage_ids.contains(edge.to.as_str()) {
49            issues.push(WorkflowIssue {
50                severity: "error".into(),
51                stage_id: None,
52                message: format!("edge to '{}' references unknown stage", edge.to),
53            });
54        }
55    }
56
57    // 3. Compensation StageRef references valid stage IDs
58    for stage in &workflow.stages {
59        if let Some(CompensationHandler::StageRef { stage_id }) = &stage.compensation {
60            if !stage_ids.contains(stage_id.as_str()) {
61                issues.push(WorkflowIssue {
62                    severity: "error".into(),
63                    stage_id: Some(stage.id.clone()),
64                    message: format!(
65                        "compensation for stage '{}' references unknown stage '{}'",
66                        stage.id, stage_id
67                    ),
68                });
69            }
70        }
71    }
72
73    // 4. Reachability via BFS from start
74    let adj: HashMap<&str, Vec<&str>> = {
75        let mut m: HashMap<&str, Vec<&str>> = HashMap::new();
76        for edge in &workflow.edges {
77            m.entry(edge.from.as_str())
78                .or_default()
79                .push(edge.to.as_str());
80        }
81        m
82    };
83
84    let mut visited: HashSet<&str> = HashSet::new();
85    let mut queue: VecDeque<&str> = VecDeque::new();
86    if stage_ids.contains(workflow.start.as_str()) {
87        queue.push_back(workflow.start.as_str());
88        visited.insert(workflow.start.as_str());
89    }
90    while let Some(node) = queue.pop_front() {
91        if let Some(neighbors) = adj.get(node) {
92            for &next in neighbors {
93                if visited.insert(next) {
94                    queue.push_back(next);
95                }
96            }
97        }
98    }
99
100    let reachable_stages: Vec<String> = visited.iter().map(|s| s.to_string()).collect();
101    let unreachable_stages: Vec<String> = stage_ids
102        .iter()
103        .filter(|s| !visited.contains(**s))
104        .map(|s| s.to_string())
105        .collect();
106
107    for id in &unreachable_stages {
108        issues.push(WorkflowIssue {
109            severity: "warning".into(),
110            stage_id: Some(id.clone()),
111            message: format!("stage '{}' is unreachable from start", id),
112        });
113    }
114
115    // 5. Cycle detection via DFS
116    let has_cycles = detect_cycles(&adj, workflow.start.as_str());
117    if has_cycles {
118        issues.push(WorkflowIssue {
119            severity: "warning".into(),
120            stage_id: None,
121            message: "workflow contains cycles (ensure max_iterations is set)".into(),
122        });
123    }
124
125    // 6. Recurse into sub-workflows
126    for stage in &workflow.stages {
127        if let StageStep::SubWorkflow(ref sw) = stage.step {
128            let sub_result = verify_workflow(&sw.workflow);
129            for issue in sub_result.issues {
130                issues.push(WorkflowIssue {
131                    severity: issue.severity,
132                    stage_id: Some(format!(
133                        "{}.{}",
134                        stage.id,
135                        issue.stage_id.unwrap_or_default()
136                    )),
137                    message: format!("[sub-workflow {}] {}", stage.id, issue.message),
138                });
139            }
140        }
141    }
142
143    // 7. Proposal verification via car-verify
144    for stage in &workflow.stages {
145        if let StageStep::Proposal(ref ps) = stage.step {
146            let vr = car_verify::verify(&ps.proposal, None, None, 100);
147            for issue in &vr.issues {
148                if issue.severity == "error" {
149                    issues.push(WorkflowIssue {
150                        severity: "error".into(),
151                        stage_id: Some(stage.id.clone()),
152                        message: format!("[proposal] {}", issue.message),
153                    });
154                }
155            }
156        }
157    }
158
159    let valid = !issues.iter().any(|i| i.severity == "error");
160
161    WorkflowVerifyResult {
162        valid,
163        issues,
164        reachable_stages,
165        unreachable_stages,
166        has_cycles,
167    }
168}
169
/// DFS-based cycle detection.
///
/// Walks the directed graph `adj` (node -> successors) depth-first from `start`
/// and returns `true` as soon as an edge leads back to a node that is still on
/// the current DFS path (a back edge). Only cycles reachable from `start` are
/// reported; a `start` with no entry in `adj` yields `false`.
///
/// The traversal keeps its own stack of frames instead of recursing, so the
/// depth of the graph is bounded by heap memory rather than by the call stack.
fn detect_cycles(adj: &HashMap<&str, Vec<&str>>, start: &str) -> bool {
    /// Iterator over the successors of `node`; empty when `node` has no outgoing edges.
    fn successors<'m, 's>(
        adj: &'m HashMap<&'s str, Vec<&'s str>>,
        node: &str,
    ) -> std::slice::Iter<'m, &'s str> {
        adj.get(node).map(Vec::as_slice).unwrap_or(&[]).iter()
    }

    // Nodes that have been entered at least once.
    let mut visited: HashSet<&str> = HashSet::new();
    // Nodes on the path from `start` to the node currently being expanded.
    let mut on_path: HashSet<&str> = HashSet::new();
    // One frame per node on the current path: the node and its unexamined successors.
    let mut frames = Vec::new();

    visited.insert(start);
    on_path.insert(start);
    frames.push((start, successors(adj, start)));

    while let Some((node, pending)) = frames.last_mut() {
        match pending.next() {
            Some(&next) => {
                if on_path.contains(next) {
                    return true; // back edge = cycle
                }
                if visited.insert(next) {
                    on_path.insert(next);
                    frames.push((next, successors(adj, next)));
                }
            }
            None => {
                // All successors examined: the node leaves the current path.
                on_path.remove(*node);
                frames.pop();
            }
        }
    }

    false
}
201
#[cfg(test)]
mod tests {
    use super::*;
    use car_ir::ActionProposal;

    /// Build a proposal stage named after `id`, with an action-less proposal
    /// `p-<id>`, no compensation handler and no timeout.
    fn make_stage(id: &str) -> Stage {
        let proposal = ActionProposal {
            id: format!("p-{}", id),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: std::collections::HashMap::new(),
        };
        Stage {
            id: id.into(),
            name: id.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Build an unconditional, unlabelled edge `from -> to`.
    fn make_edge(from: &str, to: &str) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![],
            label: String::new(),
        }
    }

    /// Build the workflow shell shared by every test: id `test`, name `Test`,
    /// `max_iterations` 100 and empty metadata.
    fn make_workflow(start: &str, stages: Vec<Stage>, edges: Vec<Edge>) -> Workflow {
        Workflow {
            id: "test".into(),
            name: "Test".into(),
            start: start.into(),
            stages,
            edges,
            max_iterations: 100,
            metadata: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn valid_linear_workflow() {
        let wf = make_workflow(
            "a",
            vec![make_stage("a"), make_stage("b"), make_stage("c")],
            vec![make_edge("a", "b"), make_edge("b", "c")],
        );

        let result = verify_workflow(&wf);

        assert!(result.valid);
        assert!(!result.has_cycles);
        assert_eq!(result.reachable_stages.len(), 3);
        assert!(result.unreachable_stages.is_empty());
    }

    #[test]
    fn missing_start_stage() {
        let wf = make_workflow("nonexistent", vec![make_stage("a")], vec![]);

        let result = verify_workflow(&wf);

        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("nonexistent")));
    }

    #[test]
    fn unreachable_stage() {
        let wf = make_workflow(
            "a",
            vec![make_stage("a"), make_stage("b"), make_stage("orphan")],
            vec![make_edge("a", "b")],
        );

        let result = verify_workflow(&wf);

        assert!(result.valid); // unreachable is a warning, not error
        assert_eq!(result.unreachable_stages.len(), 1);
        assert!(result.unreachable_stages.contains(&"orphan".to_string()));
    }

    #[test]
    fn cycle_detected() {
        let wf = make_workflow(
            "a",
            vec![make_stage("a"), make_stage("b")],
            vec![make_edge("a", "b"), make_edge("b", "a")],
        );

        let result = verify_workflow(&wf);

        assert!(result.valid); // cycles are warnings
        assert!(result.has_cycles);
    }

    #[test]
    fn invalid_edge_reference() {
        let wf = make_workflow("a", vec![make_stage("a")], vec![make_edge("a", "ghost")]);

        let result = verify_workflow(&wf);

        assert!(!result.valid);
        assert!(result.issues.iter().any(|i| i.message.contains("ghost")));
    }

    #[test]
    fn invalid_compensation_ref() {
        let mut stage = make_stage("a");
        stage.compensation = Some(CompensationHandler::StageRef {
            stage_id: "nonexistent".into(),
        });
        let wf = make_workflow("a", vec![stage], vec![]);

        let result = verify_workflow(&wf);

        assert!(!result.valid);
    }
}
366}