1use std::collections::{HashMap, HashSet, VecDeque};
4
5use crate::types::*;
6
/// A single finding produced while verifying a workflow.
///
/// Issues are plain values, so they compare by content; this lets callers and
/// tests assert on exact findings or deduplicate them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowIssue {
    /// Severity label. The verifier in this module emits `"error"` and
    /// `"warning"`; only `"error"` makes a workflow invalid.
    pub severity: String,
    /// Id of the stage the issue is about, or `None` for workflow-level
    /// findings. Issues bubbled up from a sub-workflow are prefixed with the
    /// enclosing stage id (`"outer.inner"`).
    pub stage_id: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}
14
15#[derive(Debug)]
17pub struct WorkflowVerifyResult {
18 pub valid: bool,
19 pub issues: Vec<WorkflowIssue>,
20 pub reachable_stages: Vec<String>,
21 pub unreachable_stages: Vec<String>,
22 pub has_cycles: bool,
23}
24
25pub fn verify_workflow(workflow: &Workflow) -> WorkflowVerifyResult {
27 let mut issues = Vec::new();
28 let stage_ids: HashSet<&str> = workflow.stages.iter().map(|s| s.id.as_str()).collect();
29
30 if !stage_ids.contains(workflow.start.as_str()) {
32 issues.push(WorkflowIssue {
33 severity: "error".into(),
34 stage_id: None,
35 message: format!("start stage '{}' does not exist", workflow.start),
36 });
37 }
38
39 for edge in &workflow.edges {
41 if !stage_ids.contains(edge.from.as_str()) {
42 issues.push(WorkflowIssue {
43 severity: "error".into(),
44 stage_id: None,
45 message: format!("edge from '{}' references unknown stage", edge.from),
46 });
47 }
48 if !stage_ids.contains(edge.to.as_str()) {
49 issues.push(WorkflowIssue {
50 severity: "error".into(),
51 stage_id: None,
52 message: format!("edge to '{}' references unknown stage", edge.to),
53 });
54 }
55 }
56
57 for stage in &workflow.stages {
59 if let Some(CompensationHandler::StageRef { stage_id }) = &stage.compensation {
60 match workflow.stage(stage_id) {
61 None => issues.push(WorkflowIssue {
62 severity: "error".into(),
63 stage_id: Some(stage.id.clone()),
64 message: format!(
65 "compensation for stage '{}' references unknown stage '{}'",
66 stage.id, stage_id
67 ),
68 }),
69 Some(s) if matches!(s.step, StageStep::Approval(_)) => issues.push(WorkflowIssue {
70 severity: "error".into(),
71 stage_id: Some(stage.id.clone()),
72 message: format!(
73 "compensation for stage '{}' references approval gate '{}', which cannot be run as a compensation",
74 stage.id, stage_id
75 ),
76 }),
77 Some(_) => {}
78 }
79 }
80 }
81
82 for stage in &workflow.stages {
85 if let StageStep::Approval(ap) = &stage.step {
86 if ap.output_key.trim().is_empty() {
87 issues.push(WorkflowIssue {
88 severity: "error".into(),
89 stage_id: Some(stage.id.clone()),
90 message: format!("approval stage '{}' has an empty output_key", stage.id),
91 });
92 }
93 if ap.output_key == "goal" {
94 issues.push(WorkflowIssue {
95 severity: "error".into(),
96 stage_id: Some(stage.id.clone()),
97 message: format!(
98 "approval stage '{}' uses reserved output_key 'goal' (the drift anchor)",
99 stage.id
100 ),
101 });
102 }
103 }
104 }
105
106 let adj: HashMap<&str, Vec<&str>> = {
108 let mut m: HashMap<&str, Vec<&str>> = HashMap::new();
109 for edge in &workflow.edges {
110 m.entry(edge.from.as_str())
111 .or_default()
112 .push(edge.to.as_str());
113 }
114 m
115 };
116
117 let mut visited: HashSet<&str> = HashSet::new();
118 let mut queue: VecDeque<&str> = VecDeque::new();
119 if stage_ids.contains(workflow.start.as_str()) {
120 queue.push_back(workflow.start.as_str());
121 visited.insert(workflow.start.as_str());
122 }
123 while let Some(node) = queue.pop_front() {
124 if let Some(neighbors) = adj.get(node) {
125 for &next in neighbors {
126 if visited.insert(next) {
127 queue.push_back(next);
128 }
129 }
130 }
131 }
132
133 let reachable_stages: Vec<String> = visited.iter().map(|s| s.to_string()).collect();
134 let unreachable_stages: Vec<String> = stage_ids
135 .iter()
136 .filter(|s| !visited.contains(**s))
137 .map(|s| s.to_string())
138 .collect();
139
140 for id in &unreachable_stages {
141 issues.push(WorkflowIssue {
142 severity: "warning".into(),
143 stage_id: Some(id.clone()),
144 message: format!("stage '{}' is unreachable from start", id),
145 });
146 }
147
148 let has_cycles = detect_cycles(&adj, workflow.start.as_str());
150 if has_cycles {
151 issues.push(WorkflowIssue {
152 severity: "warning".into(),
153 stage_id: None,
154 message: "workflow contains cycles (ensure max_iterations is set)".into(),
155 });
156 }
157
158 for stage in &workflow.stages {
160 if let StageStep::SubWorkflow(ref sw) = stage.step {
161 let sub_result = verify_workflow(&sw.workflow);
162 for issue in sub_result.issues {
163 issues.push(WorkflowIssue {
164 severity: issue.severity,
165 stage_id: Some(format!(
166 "{}.{}",
167 stage.id,
168 issue.stage_id.unwrap_or_default()
169 )),
170 message: format!("[sub-workflow {}] {}", stage.id, issue.message),
171 });
172 }
173 }
174 }
175
176 for stage in &workflow.stages {
178 if let StageStep::Proposal(ref ps) = stage.step {
179 verify_proposal(&stage.id, "proposal", &ps.proposal, &mut issues);
180 }
181 }
182
183 for stage in &workflow.stages {
186 validate_dynamic_step(&stage.id, &stage.step, &mut issues);
187 }
188
189 for stage in &workflow.stages {
195 if exceeds_nesting(&stage.step, MAX_STEP_NESTING_DEPTH) {
196 issues.push(WorkflowIssue {
197 severity: "error".into(),
198 stage_id: Some(stage.id.clone()),
199 message: format!(
200 "stage '{}' nests loop/foreach/sub-workflow bodies deeper than the limit of {}",
201 stage.id, MAX_STEP_NESTING_DEPTH
202 ),
203 });
204 }
205 }
206
207 let valid = !issues.iter().any(|i| i.severity == "error");
208
209 WorkflowVerifyResult {
210 valid,
211 issues,
212 reachable_stages,
213 unreachable_stages,
214 has_cycles,
215 }
216}
217
218fn verify_proposal(
221 stage_id: &str,
222 label: &str,
223 proposal: &car_ir::ActionProposal,
224 issues: &mut Vec<WorkflowIssue>,
225) {
226 let vr = car_verify::verify(proposal, None, None, 100);
227 for issue in &vr.issues {
228 if issue.severity == "error" {
229 issues.push(WorkflowIssue {
230 severity: "error".into(),
231 stage_id: Some(stage_id.to_string()),
232 message: format!("[{label}] {}", issue.message),
233 });
234 }
235 }
236}
237
/// Nesting budget that `verify_workflow` hands to `exceeds_nesting`; a stage
/// whose nested loop / for-each / sub-workflow bodies use it up is reported as
/// an error.
pub(crate) const MAX_STEP_NESTING_DEPTH: usize = 32;
242
243pub(crate) fn exceeds_nesting(step: &StageStep, remaining: usize) -> bool {
246 if remaining == 0 {
247 return true;
248 }
249 match step {
250 StageStep::LoopUntil(ls) => exceeds_nesting(&ls.body, remaining - 1),
251 StageStep::ForEach(fe) => exceeds_nesting(&fe.body, remaining - 1),
252 StageStep::SubWorkflow(sw) => sw
253 .workflow
254 .stages
255 .iter()
256 .any(|s| exceeds_nesting(&s.step, remaining - 1)),
257 _ => false,
258 }
259}
260
261fn validate_dynamic_step(stage_id: &str, step: &StageStep, issues: &mut Vec<WorkflowIssue>) {
264 match step {
265 StageStep::LoopUntil(ls) => {
266 if ls.max_iterations < 1 {
267 issues.push(WorkflowIssue {
268 severity: "error".into(),
269 stage_id: Some(stage_id.to_string()),
270 message: format!("loop_until stage '{stage_id}' requires max_iterations >= 1"),
271 });
272 }
273 validate_body(stage_id, "loop_until", &ls.body, issues);
274 }
275 StageStep::ForEach(fe) => {
276 if fe.items_from.trim().is_empty() {
277 issues.push(WorkflowIssue {
278 severity: "error".into(),
279 stage_id: Some(stage_id.to_string()),
280 message: format!("for_each stage '{stage_id}' requires a non-empty items_from"),
281 });
282 }
283 validate_body(stage_id, "for_each", &fe.body, issues);
284 }
285 _ => {}
286 }
287}
288
289fn validate_body(
293 stage_id: &str,
294 parent_kind: &str,
295 body: &StageStep,
296 issues: &mut Vec<WorkflowIssue>,
297) {
298 match body {
299 StageStep::Approval(_) => issues.push(WorkflowIssue {
300 severity: "error".into(),
301 stage_id: Some(stage_id.to_string()),
302 message: format!(
303 "{parent_kind} stage '{stage_id}' body cannot be an approval gate (no pause/resume inside a loop or fan-out)"
304 ),
305 }),
306 StageStep::Proposal(ps) => {
307 verify_proposal(
308 stage_id,
309 &format!("{parent_kind} body proposal"),
310 &ps.proposal,
311 issues,
312 );
313 }
314 StageStep::SubWorkflow(sw) => {
315 let sub = verify_workflow(&sw.workflow);
316 for issue in sub.issues {
317 issues.push(WorkflowIssue {
318 severity: issue.severity,
319 stage_id: Some(format!("{stage_id}.{}", issue.stage_id.unwrap_or_default())),
320 message: format!("[{parent_kind} body sub-workflow] {}", issue.message),
321 });
322 }
323 }
324 StageStep::LoopUntil(_) | StageStep::ForEach(_) => {
326 validate_dynamic_step(stage_id, body, issues)
327 }
328 StageStep::Pattern(_) => {}
329 }
330}
331
332pub fn semantic_issues(workflow: &Workflow) -> Vec<String> {
342 let mut produced: HashSet<String> = HashSet::new();
343 produced.insert("user_input".into());
344 produced.insert("user_query".into());
345 for stage in &workflow.stages {
346 produced.insert(format!("stage.{}.succeeded", stage.id));
347 produced.insert(format!("stage.{}.answer", stage.id));
348 produced.insert(format!("stage.{}.error", stage.id));
349 match &stage.step {
350 StageStep::Approval(ap) => {
351 produced.insert(ap.output_key.clone());
352 for f in &ap.fields {
353 produced.insert(format!("{}.{}", ap.output_key, f.name));
354 }
355 }
356 StageStep::Proposal(ps) => {
357 for action in &ps.proposal.actions {
358 for k in action.expected_effects.keys() {
359 produced.insert(k.clone());
360 }
361 }
362 }
363 StageStep::LoopUntil(ls) => {
364 produced.insert(format!("stage.{}.iteration", stage.id));
367 if let StageStep::Proposal(ps) = ls.body.as_ref() {
368 for action in &ps.proposal.actions {
369 for k in action.expected_effects.keys() {
370 produced.insert(k.clone());
371 }
372 }
373 }
374 }
375 StageStep::ForEach(_) => {
376 produced.insert(format!("foreach.{}.count", stage.id));
381 }
382 StageStep::Pattern(_) | StageStep::SubWorkflow(_) => {}
383 }
384 }
385
386 let mut issues = Vec::new();
387 for edge in &workflow.edges {
388 for cond in &edge.conditions {
389 if !produced.contains(&cond.key) {
390 issues.push(format!(
391 "edge {}->{} branches on state key '{}', which no stage produces (the branch may never be taken)",
392 edge.from, edge.to, cond.key
393 ));
394 }
395 }
396 }
397 for stage in &workflow.stages {
398 if let StageStep::Proposal(ps) = &stage.step {
399 for action in &ps.proposal.actions {
400 for dep in &action.state_dependencies {
401 if !produced.contains(dep) {
402 issues.push(format!(
403 "stage '{}' depends on state key '{}', which no stage produces",
404 stage.id, dep
405 ));
406 }
407 }
408 }
409 }
410 }
411 issues
412}
413
/// Returns `true` if a cycle can be reached by following `adj` from `start`.
///
/// `adj` maps a node to its successors; nodes without an entry have none.
/// Cycles among nodes that cannot be reached from `start` are not reported.
///
/// The depth-first search keeps its own explicit stack instead of recursing,
/// so a very long chain of stages cannot overflow the call stack. Neighbors
/// are visited in the same order a recursive DFS would use.
fn detect_cycles(adj: &HashMap<&str, Vec<&str>>, start: &str) -> bool {
    let mut visited: HashSet<&str> = HashSet::new();
    // Nodes on the path from `start` to the node currently being expanded.
    let mut on_path: HashSet<&str> = HashSet::new();
    // One frame per node on the path: (node, index of its next neighbor).
    let mut frames: Vec<(&str, usize)> = vec![(start, 0)];

    visited.insert(start);
    on_path.insert(start);

    while let Some(top) = frames.last_mut() {
        let (node, cursor) = *top;
        top.1 += 1;

        match adj.get(node).and_then(|succ| succ.get(cursor)).copied() {
            Some(next) => {
                // An edge back into the current path closes a cycle.
                if on_path.contains(next) {
                    return true;
                }
                if visited.insert(next) {
                    on_path.insert(next);
                    frames.push((next, 0));
                }
            }
            None => {
                // All successors explored: leave the path and backtrack.
                on_path.remove(node);
                frames.pop();
            }
        }
    }

    false
}
445
#[cfg(test)]
mod tests {
    use super::*;
    use car_ir::ActionProposal;

    // ---- fixtures -------------------------------------------------------

    /// Wraps `step` in a stage with no compensation, timeout, or metadata.
    fn stage_with(id: &str, step: StageStep) -> Stage {
        Stage {
            id: id.into(),
            name: id.into(),
            step,
            compensation: None,
            timeout_ms: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// A stage running an empty proposal.
    fn make_stage(id: &str) -> Stage {
        stage_with(
            id,
            StageStep::Proposal(ProposalStep {
                proposal: ActionProposal {
                    id: format!("p-{}", id),
                    source: "test".into(),
                    actions: vec![],
                    timestamp: chrono::Utc::now(),
                    context: std::collections::HashMap::new(),
                },
            }),
        )
    }

    /// An approval gate without fields that writes to `output_key`.
    fn approval_stage(id: &str, output_key: &str) -> Stage {
        stage_with(
            id,
            StageStep::Approval(crate::types::ApprovalStep {
                prompt: "approve?".into(),
                fields: vec![],
                output_key: output_key.into(),
            }),
        )
    }

    /// An unconditional, unlabelled edge.
    fn edge(from: &str, to: &str) -> Edge {
        Edge {
            from: from.into(),
            to: to.into(),
            conditions: vec![],
            label: String::new(),
        }
    }

    /// An edge taken only when state key `key` equals `value`.
    fn edge_when(from: &str, to: &str, key: &str, value: serde_json::Value) -> Edge {
        Edge {
            conditions: vec![car_ir::Precondition {
                key: key.into(),
                operator: "eq".into(),
                value,
                description: String::new(),
            }],
            ..edge(from, to)
        }
    }

    /// A workflow with the fixture defaults shared by most tests.
    fn build_workflow(start: &str, stages: Vec<Stage>, edges: Vec<Edge>) -> Workflow {
        Workflow {
            id: "test".into(),
            name: "Test".into(),
            start: start.into(),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: std::collections::HashMap::new(),
        }
    }

    /// A workflow made of the single stage `s` running `step`.
    fn single_step_wf(step: StageStep) -> Workflow {
        build_workflow("s", vec![stage_with("s", step)], vec![])
    }

    /// Wraps `leaf` in `levels` single-iteration `loop_until` steps.
    fn nest_in_loops(leaf: StageStep, levels: usize) -> StageStep {
        (0..levels).fold(leaf, |body, _| {
            StageStep::LoopUntil(LoopUntilStep {
                body: Box::new(body),
                until: vec![],
                max_iterations: 1,
            })
        })
    }

    // ---- graph structure ------------------------------------------------

    #[test]
    fn valid_linear_workflow() {
        let wf = build_workflow(
            "a",
            vec![make_stage("a"), make_stage("b"), make_stage("c")],
            vec![edge("a", "b"), edge("b", "c")],
        );
        let result = verify_workflow(&wf);
        assert!(result.valid);
        assert!(!result.has_cycles);
        assert_eq!(result.reachable_stages.len(), 3);
        assert!(result.unreachable_stages.is_empty());
    }

    #[test]
    fn missing_start_stage() {
        let wf = build_workflow("nonexistent", vec![make_stage("a")], vec![]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("nonexistent")));
    }

    #[test]
    fn unreachable_stage() {
        let wf = build_workflow(
            "a",
            vec![make_stage("a"), make_stage("b"), make_stage("orphan")],
            vec![edge("a", "b")],
        );
        let result = verify_workflow(&wf);
        // Unreachable stages are only a warning.
        assert!(result.valid);
        assert_eq!(result.unreachable_stages.len(), 1);
        assert!(result.unreachable_stages.contains(&"orphan".to_string()));
    }

    #[test]
    fn cycle_detected() {
        let wf = build_workflow(
            "a",
            vec![make_stage("a"), make_stage("b")],
            vec![edge("a", "b"), edge("b", "a")],
        );
        let result = verify_workflow(&wf);
        // Cycles are only a warning.
        assert!(result.valid);
        assert!(result.has_cycles);
    }

    #[test]
    fn invalid_edge_reference() {
        let wf = build_workflow("a", vec![make_stage("a")], vec![edge("a", "ghost")]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result.issues.iter().any(|i| i.message.contains("ghost")));
    }

    // ---- semantic checks ------------------------------------------------

    #[test]
    fn semantic_issues_flag_unknown_edge_key_and_dependency() {
        let wf = Workflow {
            id: "t".into(),
            name: "T".into(),
            ..build_workflow(
                "gate",
                vec![approval_stage("gate", "approval"), make_stage("done")],
                vec![edge_when(
                    "gate",
                    "done",
                    "approval.decision",
                    serde_json::Value::String("approve".into()),
                )],
            )
        };
        let issues = semantic_issues(&wf);
        assert!(issues.iter().any(|i| i.contains("approval.decision")));

        let wf_ok = Workflow {
            edges: vec![edge_when(
                "gate",
                "done",
                "stage.gate.succeeded",
                serde_json::Value::Bool(true),
            )],
            ..wf
        };
        assert!(semantic_issues(&wf_ok).is_empty());
    }

    // ---- approvals and compensation -------------------------------------

    #[test]
    fn approval_empty_output_key_is_error() {
        let wf = build_workflow("gate", vec![approval_stage("gate", "")], vec![]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("empty output_key")));
    }

    #[test]
    fn approval_as_compensation_is_error() {
        let mut work = make_stage("work");
        work.compensation = Some(CompensationHandler::StageRef {
            stage_id: "gate".into(),
        });
        let wf = build_workflow(
            "work",
            vec![work, approval_stage("gate", "approval")],
            vec![],
        );
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("cannot be run as a compensation")));
    }

    #[test]
    fn invalid_compensation_ref() {
        let mut stage = make_stage("a");
        stage.compensation = Some(CompensationHandler::StageRef {
            stage_id: "nonexistent".into(),
        });
        let wf = build_workflow("a", vec![stage], vec![]);
        let result = verify_workflow(&wf);
        assert!(!result.valid);
    }

    // ---- loops, fan-out, and nesting ------------------------------------

    #[test]
    fn loop_until_zero_iterations_is_error() {
        let wf = single_step_wf(StageStep::LoopUntil(LoopUntilStep {
            body: Box::new(make_stage("b").step),
            until: vec![],
            max_iterations: 0,
        }));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("max_iterations >= 1")));
    }

    #[test]
    fn for_each_empty_items_from_is_error() {
        let wf = single_step_wf(StageStep::ForEach(ForEachStep {
            items_from: " ".into(),
            body: Box::new(make_stage("b").step),
            max_concurrent: 0,
        }));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("non-empty items_from")));
    }

    #[test]
    fn excessive_nesting_is_error() {
        let step = nest_in_loops(make_stage("leaf").step, MAX_STEP_NESTING_DEPTH + 2);
        let result = verify_workflow(&single_step_wf(step));
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("nests")));
    }

    #[test]
    fn nesting_within_limit_is_ok() {
        let step = nest_in_loops(make_stage("leaf").step, 4);
        let result = verify_workflow(&single_step_wf(step));
        assert!(result.valid, "issues: {:?}", result.issues);
    }

    #[test]
    fn approval_inside_loop_body_is_error() {
        let wf = single_step_wf(StageStep::LoopUntil(LoopUntilStep {
            body: Box::new(StageStep::Approval(crate::types::ApprovalStep {
                prompt: "p".into(),
                fields: vec![],
                output_key: "k".into(),
            })),
            until: vec![],
            max_iterations: 3,
        }));
        let result = verify_workflow(&wf);
        assert!(!result.valid);
        assert!(result
            .issues
            .iter()
            .any(|i| i.message.contains("cannot be an approval gate")));
    }
}