1use std::collections::{HashMap, HashSet, VecDeque};
4
5use crate::types::*;
6
/// A single finding produced while verifying a workflow.
///
/// Issues compare by value, so callers and tests can use `==` / `assert_eq!` on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowIssue {
    /// Severity label. The verifier in this file emits `"error"` and `"warning"`;
    /// only `"error"` makes a workflow invalid.
    pub severity: String,
    /// Id of the stage the issue is attached to, or `None` when the issue is not
    /// attributed to a particular stage. Issues bubbled up from a sub-workflow are
    /// prefixed with the id of the parent stage.
    pub stage_id: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}
14
15#[derive(Debug)]
17pub struct WorkflowVerifyResult {
18 pub valid: bool,
19 pub issues: Vec<WorkflowIssue>,
20 pub reachable_stages: Vec<String>,
21 pub unreachable_stages: Vec<String>,
22 pub has_cycles: bool,
23}
24
25pub fn verify_workflow(workflow: &Workflow) -> WorkflowVerifyResult {
27 let mut issues = Vec::new();
28 let stage_ids: HashSet<&str> = workflow.stages.iter().map(|s| s.id.as_str()).collect();
29
30 if !stage_ids.contains(workflow.start.as_str()) {
32 issues.push(WorkflowIssue {
33 severity: "error".into(),
34 stage_id: None,
35 message: format!("start stage '{}' does not exist", workflow.start),
36 });
37 }
38
39 for edge in &workflow.edges {
41 if !stage_ids.contains(edge.from.as_str()) {
42 issues.push(WorkflowIssue {
43 severity: "error".into(),
44 stage_id: None,
45 message: format!("edge from '{}' references unknown stage", edge.from),
46 });
47 }
48 if !stage_ids.contains(edge.to.as_str()) {
49 issues.push(WorkflowIssue {
50 severity: "error".into(),
51 stage_id: None,
52 message: format!("edge to '{}' references unknown stage", edge.to),
53 });
54 }
55 }
56
57 for stage in &workflow.stages {
59 if let Some(CompensationHandler::StageRef { stage_id }) = &stage.compensation {
60 if !stage_ids.contains(stage_id.as_str()) {
61 issues.push(WorkflowIssue {
62 severity: "error".into(),
63 stage_id: Some(stage.id.clone()),
64 message: format!(
65 "compensation for stage '{}' references unknown stage '{}'",
66 stage.id, stage_id
67 ),
68 });
69 }
70 }
71 }
72
73 let adj: HashMap<&str, Vec<&str>> = {
75 let mut m: HashMap<&str, Vec<&str>> = HashMap::new();
76 for edge in &workflow.edges {
77 m.entry(edge.from.as_str())
78 .or_default()
79 .push(edge.to.as_str());
80 }
81 m
82 };
83
84 let mut visited: HashSet<&str> = HashSet::new();
85 let mut queue: VecDeque<&str> = VecDeque::new();
86 if stage_ids.contains(workflow.start.as_str()) {
87 queue.push_back(workflow.start.as_str());
88 visited.insert(workflow.start.as_str());
89 }
90 while let Some(node) = queue.pop_front() {
91 if let Some(neighbors) = adj.get(node) {
92 for &next in neighbors {
93 if visited.insert(next) {
94 queue.push_back(next);
95 }
96 }
97 }
98 }
99
100 let reachable_stages: Vec<String> = visited.iter().map(|s| s.to_string()).collect();
101 let unreachable_stages: Vec<String> = stage_ids
102 .iter()
103 .filter(|s| !visited.contains(**s))
104 .map(|s| s.to_string())
105 .collect();
106
107 for id in &unreachable_stages {
108 issues.push(WorkflowIssue {
109 severity: "warning".into(),
110 stage_id: Some(id.clone()),
111 message: format!("stage '{}' is unreachable from start", id),
112 });
113 }
114
115 let has_cycles = detect_cycles(&adj, workflow.start.as_str());
117 if has_cycles {
118 issues.push(WorkflowIssue {
119 severity: "warning".into(),
120 stage_id: None,
121 message: "workflow contains cycles (ensure max_iterations is set)".into(),
122 });
123 }
124
125 for stage in &workflow.stages {
127 if let StageStep::SubWorkflow(ref sw) = stage.step {
128 let sub_result = verify_workflow(&sw.workflow);
129 for issue in sub_result.issues {
130 issues.push(WorkflowIssue {
131 severity: issue.severity,
132 stage_id: Some(format!(
133 "{}.{}",
134 stage.id,
135 issue.stage_id.unwrap_or_default()
136 )),
137 message: format!("[sub-workflow {}] {}", stage.id, issue.message),
138 });
139 }
140 }
141 }
142
143 for stage in &workflow.stages {
145 if let StageStep::Proposal(ref ps) = stage.step {
146 let vr = car_verify::verify(&ps.proposal, None, None, 100);
147 for issue in &vr.issues {
148 if issue.severity == "error" {
149 issues.push(WorkflowIssue {
150 severity: "error".into(),
151 stage_id: Some(stage.id.clone()),
152 message: format!("[proposal] {}", issue.message),
153 });
154 }
155 }
156 }
157 }
158
159 let valid = !issues.iter().any(|i| i.severity == "error");
160
161 WorkflowVerifyResult {
162 valid,
163 issues,
164 reachable_stages,
165 unreachable_stages,
166 has_cycles,
167 }
168}
169
/// Reports whether a cycle can be reached from `start` by following `adj`.
///
/// `adj` maps a node to its outgoing neighbours; nodes without an entry have no
/// outgoing edges. Only the part of the graph reachable from `start` is examined.
///
/// The depth-first search keeps its own stack instead of recursing, so the depth
/// of the graph is limited by heap memory rather than by the thread's call stack
/// (a long linear chain of stages would otherwise overflow it).
fn detect_cycles(adj: &HashMap<&str, Vec<&str>>, start: &str) -> bool {
    // Nodes that have been entered at least once.
    let mut visited: HashSet<&str> = HashSet::new();
    // Nodes on the current DFS path; an edge back into this set closes a cycle.
    let mut on_path: HashSet<&str> = HashSet::new();
    // Explicit DFS stack: (node, index of the next neighbour to examine).
    let mut frames: Vec<(&str, usize)> = Vec::new();

    visited.insert(start);
    on_path.insert(start);
    frames.push((start, 0));

    while let Some(frame) = frames.last_mut() {
        let (node, cursor) = (frame.0, frame.1);
        let next = adj
            .get(node)
            .and_then(|targets| targets.get(cursor))
            .copied();

        match next {
            Some(next) => {
                // Advance this frame before possibly descending into `next`.
                frame.1 += 1;
                if on_path.contains(next) {
                    return true;
                }
                if visited.insert(next) {
                    on_path.insert(next);
                    frames.push((next, 0));
                }
            }
            None => {
                // All neighbours handled: leave the node and backtrack.
                on_path.remove(node);
                frames.pop();
            }
        }
    }

    false
}
201
#[cfg(test)]
mod tests {
    use super::*;
    use car_ir::ActionProposal;

    /// Builds a stage named after `id` whose step is an action-less proposal and
    /// which has no compensation handler or timeout.
    fn make_stage(id: &str) -> Stage {
        let proposal = ActionProposal {
            id: format!("p-{}", id),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: std::collections::HashMap::new(),
        };
        Stage {
            id: id.into(),
            name: id.into(),
            step: StageStep::Proposal(ProposalStep { proposal }),
            compensation: None,
            timeout_ms: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// Builds an unconditional, unlabeled edge.
    fn edge(from: &str, to: &str) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![],
            label: String::new(),
        }
    }

    /// Builds the workflow shell shared by every test; only the graph varies.
    fn workflow(start: &str, stages: Vec<Stage>, edges: Vec<Edge>) -> Workflow {
        Workflow {
            id: "test".into(),
            name: "Test".into(),
            start: start.into(),
            stages,
            edges,
            max_iterations: 100,
            metadata: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn valid_linear_workflow() {
        let stages = vec![make_stage("a"), make_stage("b"), make_stage("c")];
        let edges = vec![edge("a", "b"), edge("b", "c")];

        let result = verify_workflow(&workflow("a", stages, edges));

        assert!(result.valid);
        assert!(!result.has_cycles);
        assert_eq!(result.reachable_stages.len(), 3);
        assert!(result.unreachable_stages.is_empty());
    }

    #[test]
    fn missing_start_stage() {
        let result = verify_workflow(&workflow("nonexistent", vec![make_stage("a")], vec![]));

        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("nonexistent")));
    }

    #[test]
    fn unreachable_stage() {
        let stages = vec![make_stage("a"), make_stage("b"), make_stage("orphan")];

        let result = verify_workflow(&workflow("a", stages, vec![edge("a", "b")]));

        // An unreachable stage is only a warning, so the workflow stays valid.
        assert!(result.valid);
        assert_eq!(result.unreachable_stages.len(), 1);
        assert!(result.unreachable_stages.contains(&"orphan".to_string()));
    }

    #[test]
    fn cycle_detected() {
        let stages = vec![make_stage("a"), make_stage("b")];
        let edges = vec![edge("a", "b"), edge("b", "a")];

        let result = verify_workflow(&workflow("a", stages, edges));

        // A cycle is only a warning, so the workflow stays valid.
        assert!(result.valid);
        assert!(result.has_cycles);
    }

    #[test]
    fn invalid_edge_reference() {
        let result = verify_workflow(&workflow(
            "a",
            vec![make_stage("a")],
            vec![edge("a", "ghost")],
        ));

        assert!(!result.valid);
        assert!(result.issues.iter().any(|i| i.message.contains("ghost")));
    }

    #[test]
    fn invalid_compensation_ref() {
        let mut stage = make_stage("a");
        stage.compensation = Some(CompensationHandler::StageRef {
            stage_id: "nonexistent".into(),
        });

        let result = verify_workflow(&workflow("a", vec![stage], vec![]));

        assert!(!result.valid);
    }
}