1use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
2
3use thiserror::Error;
4
5use crate::ir::{MergePolicy, NodeKind, WorkflowDefinition, WORKFLOW_IR_V0};
6
7pub fn validate_and_normalize(
9 input: &WorkflowDefinition,
10) -> Result<WorkflowDefinition, ValidationErrors> {
11 let normalized = input.normalized();
12 let diagnostics = validate(&normalized);
13
14 if diagnostics.is_empty() {
15 Ok(normalized)
16 } else {
17 Err(ValidationErrors { diagnostics })
18 }
19}
20
/// How serious a diagnostic is. Every check currently emits `Error`;
/// the enum leaves room for softer severities (e.g. warnings) later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Severity {
    /// A validation failure that makes the workflow invalid.
    Error,
}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub enum DiagnosticCode {
31 UnsupportedVersion,
33 EmptyWorkflowName,
35 EmptyWorkflow,
37 DuplicateNodeId,
39 EmptyNodeId,
41 UnknownTarget,
43 MissingStart,
45 MultipleStart,
47 MissingEnd,
49 UnreachableNode,
51 NoPathToEnd,
53 EmptyField,
55}
56
/// A single validation finding, tied to a node when one is relevant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Diagnostic {
    /// Severity of the finding (currently always [`Severity::Error`]).
    pub severity: Severity,
    /// Machine-readable category of the finding.
    pub code: DiagnosticCode,
    /// Human-readable description of the problem.
    pub message: String,
    /// Id of the offending node, when the finding is node-specific.
    pub node_id: Option<String>,
}
69
70impl Diagnostic {
71 fn error(code: DiagnosticCode, message: impl Into<String>, node_id: Option<String>) -> Self {
72 Self {
73 severity: Severity::Error,
74 code,
75 message: message.into(),
76 node_id,
77 }
78 }
79}
80
/// Error returned by [`validate_and_normalize`]: carries every diagnostic
/// found so callers can report all problems at once instead of the first.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[error("workflow validation failed")]
pub struct ValidationErrors {
    /// All findings collected during validation; non-empty when returned as `Err`.
    pub diagnostics: Vec<Diagnostic>,
}
88
/// Backwards-compatible alias for callers that use the singular spelling.
pub type ValidationError = ValidationErrors;
91
/// Runs every structural check against a (normalized) workflow and returns
/// the collected diagnostics; an empty vector means the workflow is valid.
///
/// Checks, in order: IR version, workflow name, non-empty node list,
/// per-node field validity, merge/reduce source resolution, duplicate ids,
/// start/end cardinality, edge target resolution, reachability from the
/// start node, and existence of a path from start to some end node.
fn validate(workflow: &WorkflowDefinition) -> Vec<Diagnostic> {
    let mut diagnostics = Vec::new();

    if workflow.version != WORKFLOW_IR_V0 {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::UnsupportedVersion,
            format!(
                "unsupported workflow IR version '{}'; expected '{}'",
                workflow.version, WORKFLOW_IR_V0
            ),
            None,
        ));
    }

    if workflow.name.is_empty() {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::EmptyWorkflowName,
            "workflow name must not be empty",
            None,
        ));
    }

    if workflow.nodes.is_empty() {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::EmptyWorkflow,
            "workflow must contain at least one node",
            None,
        ));
        // With no nodes, every structural check below is meaningless.
        return diagnostics;
    }

    // Maps node id -> node. On duplicate ids the later node wins, and both
    // ids are recorded in `duplicates` for reporting.
    let mut node_index = HashMap::with_capacity(workflow.nodes.len());
    // BTreeSet keeps duplicate-id diagnostics in a deterministic order.
    let mut duplicates = BTreeSet::new();
    let mut start_ids = Vec::new();
    let mut end_count = 0usize;

    for node in &workflow.nodes {
        if node.id.is_empty() {
            diagnostics.push(Diagnostic::error(
                DiagnosticCode::EmptyNodeId,
                "node id must not be empty",
                Some(node.id.clone()),
            ));
        }

        if let Some(previous_id) = node_index.insert(node.id.as_str(), node) {
            duplicates.insert(previous_id.id.clone());
            duplicates.insert(node.id.clone());
        }
        // Per-kind field checks; also records start ids and end count.
        validate_node_kind_fields(node, &mut diagnostics, &mut start_ids, &mut end_count);
    }

    // Merge/reduce sources are incoming references, not outgoing edges, so
    // they are resolved here rather than via `outgoing_edges()` below.
    for node in &workflow.nodes {
        match &node.kind {
            NodeKind::Merge { sources, .. } => {
                for source in sources {
                    if !node_index.contains_key(source.as_str()) {
                        diagnostics.push(Diagnostic::error(
                            DiagnosticCode::UnknownTarget,
                            format!("node '{}' references unknown source '{}'", node.id, source),
                            Some(node.id.clone()),
                        ));
                    }
                }
            }
            NodeKind::Reduce { source, .. } => {
                if !node_index.contains_key(source.as_str()) {
                    diagnostics.push(Diagnostic::error(
                        DiagnosticCode::UnknownTarget,
                        format!("node '{}' references unknown source '{}'", node.id, source),
                        Some(node.id.clone()),
                    ));
                }
            }
            _ => {}
        }
    }

    for duplicate_id in duplicates {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::DuplicateNodeId,
            format!("duplicate node id '{}'", duplicate_id),
            Some(duplicate_id),
        ));
    }

    if start_ids.is_empty() {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::MissingStart,
            "workflow must contain exactly one start node",
            None,
        ));
    } else if start_ids.len() > 1 {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::MultipleStart,
            format!(
                "workflow must contain exactly one start node, found {}",
                start_ids.len()
            ),
            None,
        ));
    }

    if end_count == 0 {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::MissingEnd,
            "workflow must contain at least one end node",
            None,
        ));
    }

    // Every outgoing edge must point at an existing node.
    for node in &workflow.nodes {
        for edge in node.outgoing_edges() {
            if !node_index.contains_key(edge) {
                diagnostics.push(Diagnostic::error(
                    DiagnosticCode::UnknownTarget,
                    format!("node '{}' references unknown target '{}'", node.id, edge),
                    Some(node.id.clone()),
                ));
            }
        }
    }

    // Reachability checks only make sense with an unambiguous start node.
    if start_ids.len() == 1 {
        let start_id = start_ids[0].as_str();
        let reachable = reachable_nodes(start_id, &node_index);

        for node in &workflow.nodes {
            if !reachable.contains(node.id.as_str()) {
                diagnostics.push(Diagnostic::error(
                    DiagnosticCode::UnreachableNode,
                    format!(
                        "node '{}' is unreachable from start node '{}'",
                        node.id, start_id
                    ),
                    Some(node.id.clone()),
                ));
            }
        }

        // The start node must be able to reach at least one End node.
        let has_path_to_end = reachable.iter().any(|id| {
            node_index
                .get(*id)
                .is_some_and(|node| matches!(node.kind, NodeKind::End))
        });

        if !has_path_to_end {
            diagnostics.push(Diagnostic::error(
                DiagnosticCode::NoPathToEnd,
                format!("start node '{}' cannot reach any end node", start_id),
                Some(start_id.to_string()),
            ));
        }
    }

    diagnostics
}
249
/// Validates the kind-specific fields of a single node: required strings
/// must be non-empty, optional strings must be non-empty when present,
/// collections must be non-empty and contain no empty entries, and numeric
/// limits must be positive. Also records start-node ids into `start_ids`
/// and counts `End` nodes into `end_count` for the caller's cardinality
/// checks. All findings are appended to `diagnostics` as `EmptyField`
/// errors (field-shape problems share one code; the message disambiguates).
fn validate_node_kind_fields(
    node: &crate::ir::Node,
    diagnostics: &mut Vec<Diagnostic>,
    start_ids: &mut Vec<String>,
    end_count: &mut usize,
) {
    // Shared emitter: every field-shape problem becomes an EmptyField error.
    fn emit_empty_field(diagnostics: &mut Vec<Diagnostic>, node_id: &str, message: &str) {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::EmptyField,
            message,
            Some(node_id.to_string()),
        ));
    }

    // Required string field: must be non-empty.
    fn require_non_empty(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        value: &str,
        message: &str,
    ) {
        if value.is_empty() {
            emit_empty_field(diagnostics, node_id, message);
        }
    }

    // Optional string field: may be absent, but must be non-empty if present.
    fn require_optional_non_empty(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        value: Option<&str>,
        message: &str,
    ) {
        if value.is_some_and(str::is_empty) {
            emit_empty_field(diagnostics, node_id, message);
        }
    }

    // Batch form of `require_non_empty` over (value, message) pairs.
    fn require_non_empty_fields(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        fields: &[(&str, &str)],
    ) {
        for (value, message) in fields {
            require_non_empty(diagnostics, node_id, value, message);
        }
    }

    // Batch form of `require_optional_non_empty` over (value, message) pairs.
    fn require_optional_non_empty_fields(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        fields: &[(Option<&str>, &str)],
    ) {
        for (value, message) in fields {
            require_optional_non_empty(diagnostics, node_id, *value, message);
        }
    }

    // Optional numeric limit: if present it must be > 0. Generic over any
    // Copy number that can represent zero via `From<u8>`.
    fn require_positive_when_present<T>(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        value: Option<T>,
        message: &str,
    ) where
        T: Copy + PartialEq + From<u8>,
    {
        if value.is_some_and(|limit| limit == T::from(0u8)) {
            emit_empty_field(diagnostics, node_id, message);
        }
    }

    // Collection field: must contain at least one entry.
    fn require_non_empty_collection<T>(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        values: &[T],
        message: &str,
    ) {
        if values.is_empty() {
            emit_empty_field(diagnostics, node_id, message);
        }
    }

    // String collection: no entry may be the empty string.
    fn require_no_empty_string_entries(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        values: &[String],
        message: &str,
    ) {
        if values.iter().any(String::is_empty) {
            emit_empty_field(diagnostics, node_id, message);
        }
    }

    // Merge-specific rule: `quorum` is required and must be in
    // 1..=sources.len() under the Quorum policy, and forbidden otherwise.
    fn require_merge_quorum_matches_policy(
        diagnostics: &mut Vec<Diagnostic>,
        node_id: &str,
        policy: &MergePolicy,
        quorum: Option<usize>,
        source_len: usize,
    ) {
        match policy {
            MergePolicy::Quorum => {
                let invalid_quorum = match quorum {
                    Some(value) => value == 0 || value > source_len,
                    None => true,
                };
                if invalid_quorum {
                    emit_empty_field(
                        diagnostics,
                        node_id,
                        "merge.quorum must be between 1 and merge.sources length for quorum policy",
                    );
                }
            }
            _ => {
                if quorum.is_some() {
                    emit_empty_field(
                        diagnostics,
                        node_id,
                        "merge.quorum is only valid with quorum policy",
                    );
                }
            }
        }
    }

    match &node.kind {
        NodeKind::Start { next } => {
            // Record the start id; the caller enforces exactly-one-start.
            start_ids.push(node.id.clone());
            require_non_empty(diagnostics, &node.id, next, "start.next must not be empty");
        }
        NodeKind::Llm {
            model,
            prompt,
            next: _,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (model, "llm.model must not be empty"),
                    (prompt, "llm.prompt must not be empty"),
                ],
            );
        }
        NodeKind::Tool { tool, .. } => {
            require_non_empty(diagnostics, &node.id, tool, "tool.tool must not be empty");
        }
        NodeKind::Condition {
            expression,
            on_true,
            on_false,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (expression, "condition.expression must not be empty"),
                    (on_true, "condition.on_true must not be empty"),
                    (on_false, "condition.on_false must not be empty"),
                ],
            );
        }
        NodeKind::Debounce {
            key_path,
            window_steps,
            next,
            on_suppressed,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (key_path, "debounce.key_path must not be empty"),
                    (next, "debounce.next must not be empty"),
                ],
            );
            // window_steps is mandatory (not Option), so check it inline.
            if *window_steps == 0 {
                emit_empty_field(
                    diagnostics,
                    &node.id,
                    "debounce.window_steps must be greater than zero",
                );
            }
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    on_suppressed.as_deref(),
                    "debounce.on_suppressed must not be empty when provided",
                )],
            );
        }
        NodeKind::Throttle {
            key_path,
            window_steps,
            next,
            on_throttled,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (key_path, "throttle.key_path must not be empty"),
                    (next, "throttle.next must not be empty"),
                ],
            );
            if *window_steps == 0 {
                emit_empty_field(
                    diagnostics,
                    &node.id,
                    "throttle.window_steps must be greater than zero",
                );
            }
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    on_throttled.as_deref(),
                    "throttle.on_throttled must not be empty when provided",
                )],
            );
        }
        NodeKind::RetryCompensate {
            tool,
            input: _,
            max_retries: _,
            compensate_tool,
            compensate_input: _,
            next,
            on_compensated,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (tool, "retry_compensate.tool must not be empty"),
                    (
                        compensate_tool,
                        "retry_compensate.compensate_tool must not be empty",
                    ),
                    (next, "retry_compensate.next must not be empty"),
                ],
            );
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    on_compensated.as_deref(),
                    "retry_compensate.on_compensated must not be empty when provided",
                )],
            );
        }
        NodeKind::HumanInTheLoop {
            decision_path,
            response_path,
            on_approve,
            on_reject,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (
                        decision_path,
                        "human_in_the_loop.decision_path must not be empty",
                    ),
                    (on_approve, "human_in_the_loop.on_approve must not be empty"),
                    (on_reject, "human_in_the_loop.on_reject must not be empty"),
                ],
            );
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    response_path.as_deref(),
                    "human_in_the_loop.response_path must not be empty when provided",
                )],
            );
        }
        NodeKind::CacheWrite {
            key_path,
            value_path,
            next,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (key_path, "cache_write.key_path must not be empty"),
                    (value_path, "cache_write.value_path must not be empty"),
                    (next, "cache_write.next must not be empty"),
                ],
            );
        }
        NodeKind::CacheRead {
            key_path,
            next,
            on_miss,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (key_path, "cache_read.key_path must not be empty"),
                    (next, "cache_read.next must not be empty"),
                ],
            );
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    on_miss.as_deref(),
                    "cache_read.on_miss must not be empty when provided",
                )],
            );
        }
        NodeKind::EventTrigger {
            event,
            event_path,
            next,
            on_mismatch,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (event, "event_trigger.event must not be empty"),
                    (event_path, "event_trigger.event_path must not be empty"),
                    (next, "event_trigger.next must not be empty"),
                ],
            );
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    on_mismatch.as_deref(),
                    "event_trigger.on_mismatch must not be empty when provided",
                )],
            );
        }
        NodeKind::Router { routes, default } => {
            require_non_empty_collection(
                diagnostics,
                &node.id,
                routes,
                "router.routes must contain at least one route",
            );
            // Each route needs both a condition and a destination.
            if routes
                .iter()
                .any(|route| route.when.is_empty() || route.next.is_empty())
            {
                emit_empty_field(
                    diagnostics,
                    &node.id,
                    "router.routes entries must include non-empty when and next",
                );
            }
            require_non_empty(
                diagnostics,
                &node.id,
                default,
                "router.default must not be empty",
            );
        }
        NodeKind::Transform { expression, next } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (expression, "transform.expression must not be empty"),
                    (next, "transform.next must not be empty"),
                ],
            );
        }
        NodeKind::Loop {
            condition,
            body,
            next,
            max_iterations,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (condition, "loop.condition must not be empty"),
                    (body, "loop.body must not be empty"),
                    (next, "loop.next must not be empty"),
                ],
            );
            require_positive_when_present(
                diagnostics,
                &node.id,
                *max_iterations,
                "loop.max_iterations must be greater than zero when provided",
            );
        }
        NodeKind::End => {
            // End nodes have no fields; just count them for the caller.
            *end_count += 1;
        }
        NodeKind::Subgraph { graph, next } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[(graph, "subgraph.graph must not be empty")],
            );
            require_optional_non_empty_fields(
                diagnostics,
                &node.id,
                &[(
                    next.as_deref(),
                    "subgraph.next must not be empty when provided",
                )],
            );
        }
        NodeKind::Batch { items_path, next } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (items_path, "batch.items_path must not be empty"),
                    (next, "batch.next must not be empty"),
                ],
            );
        }
        NodeKind::Filter {
            items_path,
            expression,
            next,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (items_path, "filter.items_path must not be empty"),
                    (expression, "filter.expression must not be empty"),
                    (next, "filter.next must not be empty"),
                ],
            );
        }
        NodeKind::Parallel {
            branches,
            next,
            max_in_flight,
        } => {
            require_non_empty_collection(
                diagnostics,
                &node.id,
                branches,
                "parallel.branches must contain at least one node id",
            );
            require_no_empty_string_entries(
                diagnostics,
                &node.id,
                branches,
                "parallel.branches must not contain empty node ids",
            );
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[(next, "parallel.next must not be empty")],
            );
            require_positive_when_present(
                diagnostics,
                &node.id,
                *max_in_flight,
                "parallel.max_in_flight must be greater than zero when provided",
            );
        }
        NodeKind::Merge {
            sources,
            policy,
            quorum,
            next,
        } => {
            require_non_empty_collection(
                diagnostics,
                &node.id,
                sources,
                "merge.sources must contain at least one node id",
            );
            require_no_empty_string_entries(
                diagnostics,
                &node.id,
                sources,
                "merge.sources must not contain empty node ids",
            );
            require_non_empty(diagnostics, &node.id, next, "merge.next must not be empty");
            require_merge_quorum_matches_policy(
                diagnostics,
                &node.id,
                policy,
                *quorum,
                sources.len(),
            );
        }
        NodeKind::Map {
            tool,
            items_path,
            next,
            max_in_flight,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (tool, "map.tool must not be empty"),
                    (items_path, "map.items_path must not be empty"),
                    (next, "map.next must not be empty"),
                ],
            );
            require_positive_when_present(
                diagnostics,
                &node.id,
                *max_in_flight,
                "map.max_in_flight must be greater than zero when provided",
            );
        }
        NodeKind::Reduce {
            source,
            operation: _,
            next,
        } => {
            require_non_empty_fields(
                diagnostics,
                &node.id,
                &[
                    (source, "reduce.source must not be empty"),
                    (next, "reduce.next must not be empty"),
                ],
            );
        }
    }
}
782
783fn reachable_nodes<'a>(
784 start_id: &'a str,
785 node_index: &HashMap<&'a str, &'a crate::ir::Node>,
786) -> HashSet<&'a str> {
787 let mut visited = HashSet::new();
788 let mut queue = VecDeque::from([start_id]);
789
790 while let Some(current) = queue.pop_front() {
791 if !visited.insert(current) {
792 continue;
793 }
794
795 if let Some(node) = node_index.get(current) {
796 for edge in node.outgoing_edges() {
797 if node_index.contains_key(edge) {
798 queue.push_back(edge);
799 }
800 }
801 }
802 }
803
804 visited
805}
806
#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use serde_json::json;

    use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
    use crate::validation::{validate_and_normalize, DiagnosticCode};

    /// Minimal valid workflow: start -> llm -> tool -> end.
    fn valid_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "basic".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Say hi".to_string(),
                        next: Some("tool".to_string()),
                    },
                },
                Node {
                    id: "tool".to_string(),
                    kind: NodeKind::Tool {
                        tool: "validator".to_string(),
                        input: json!({"strict": true}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }

    // A valid workflow passes and comes back normalized; normalized() appears
    // to order nodes by id ("end" first, "tool" last) — confirmed by asserts.
    #[test]
    fn validates_and_normalizes_valid_workflow() {
        let workflow = valid_workflow();
        let normalized = validate_and_normalize(&workflow).expect("workflow should validate");

        assert_eq!(normalized.nodes.first().map(|n| n.id.as_str()), Some("end"));
        assert_eq!(normalized.nodes.last().map(|n| n.id.as_str()), Some("tool"));
    }

    // An edge to a nonexistent node id yields UnknownTarget.
    #[test]
    fn reports_unknown_target() {
        let mut workflow = valid_workflow();
        workflow.nodes[0].kind = NodeKind::Start {
            next: "missing".to_string(),
        };

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::UnknownTarget));
    }

    // A node with no incoming path from start yields UnreachableNode.
    #[test]
    fn reports_unreachable_node() {
        let mut workflow = valid_workflow();
        workflow.nodes.push(Node {
            id: "orphan".to_string(),
            kind: NodeKind::End,
        });

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::UnreachableNode
                && d.node_id.as_deref() == Some("orphan")));
    }

    // Reusing an existing node id yields DuplicateNodeId.
    #[test]
    fn reports_duplicate_node_id() {
        let mut workflow = valid_workflow();
        workflow.nodes.push(Node {
            id: "llm".to_string(),
            kind: NodeKind::End,
        });

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::DuplicateNodeId));
    }

    // An end node exists but is unreachable from start -> NoPathToEnd.
    #[test]
    fn reports_no_path_to_end() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "no-end-path".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "test".to_string(),
                        next: None,
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::NoPathToEnd));
    }

    // Empty condition/body and zero max_iterations all surface as EmptyField.
    #[test]
    fn reports_invalid_loop_configuration() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-loop".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "loop".to_string(),
                    },
                },
                Node {
                    id: "loop".to_string(),
                    kind: NodeKind::Loop {
                        condition: "".to_string(),
                        body: "".to_string(),
                        next: "end".to_string(),
                        max_iterations: Some(0),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("loop validation should fail");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::EmptyField));
    }

    // quorum = 2 with only one source violates the quorum-policy bound.
    #[test]
    fn reports_invalid_merge_quorum_configuration() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-merge".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "merge".to_string(),
                    },
                },
                Node {
                    id: "source".to_string(),
                    kind: NodeKind::Tool {
                        tool: "echo".to_string(),
                        input: json!({}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["source".to_string()],
                        policy: MergePolicy::Quorum,
                        quorum: Some(2),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("merge quorum should fail");
        assert!(
            err.diagnostics
                .iter()
                .any(|d| d.code == DiagnosticCode::EmptyField
                    && d.node_id.as_deref() == Some("merge"))
        );
    }

    // A reduce.source that names no node yields UnknownTarget on the reduce node.
    #[test]
    fn reports_unknown_reduce_source() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-reduce".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "reduce".to_string(),
                    },
                },
                Node {
                    id: "reduce".to_string(),
                    kind: NodeKind::Reduce {
                        source: "missing".to_string(),
                        operation: ReduceOperation::Count,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("reduce source should fail");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::UnknownTarget
                && d.node_id.as_deref() == Some("reduce")));
    }

    // Misconfigured debounce/router/transform nodes all produce EmptyField.
    #[test]
    fn reports_invalid_extended_node_configuration() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "invalid-extended".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "debounce".to_string(),
                    },
                },
                Node {
                    id: "debounce".to_string(),
                    kind: NodeKind::Debounce {
                        key_path: "".to_string(),
                        window_steps: 0,
                        next: "router".to_string(),
                        on_suppressed: None,
                    },
                },
                Node {
                    id: "router".to_string(),
                    kind: NodeKind::Router {
                        routes: vec![],
                        default: "".to_string(),
                    },
                },
                Node {
                    id: "transform".to_string(),
                    kind: NodeKind::Transform {
                        expression: "".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err =
            validate_and_normalize(&workflow).expect_err("extended node validation should fail");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::EmptyField));
    }

    // One bad parallel node triggers three distinct EmptyField diagnostics:
    // empty branch id, empty next, and zero max_in_flight.
    #[test]
    fn reports_multiple_empty_field_diagnostics_for_parallel_node() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-parallel".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "parallel".to_string(),
                    },
                },
                Node {
                    id: "parallel".to_string(),
                    kind: NodeKind::Parallel {
                        branches: vec!["".to_string()],
                        next: "".to_string(),
                        max_in_flight: Some(0),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("parallel validation should fail");

        assert!(err.diagnostics.iter().any(|d| d.message
            == "parallel.branches must not contain empty node ids"
            && d.node_id.as_deref() == Some("parallel")));
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.message == "parallel.next must not be empty"
                && d.node_id.as_deref() == Some("parallel")));
        assert!(err.diagnostics.iter().any(|d| d.message
            == "parallel.max_in_flight must be greater than zero when provided"
            && d.node_id.as_deref() == Some("parallel")));
    }

    // Unknown merge source and invalid quorum are reported independently.
    #[test]
    fn reports_merge_source_unknown_and_quorum_policy_errors_together() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-merge-sources".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "merge".to_string(),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["missing".to_string()],
                        policy: MergePolicy::Quorum,
                        quorum: Some(0),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("merge validation should fail");

        assert!(err.diagnostics.iter().any(|d| {
            d.code == DiagnosticCode::UnknownTarget
                && d.node_id.as_deref() == Some("merge")
                && d.message.contains("unknown source 'missing'")
        }));

        assert!(err.diagnostics.iter().any(|d| {
            d.code == DiagnosticCode::EmptyField
                && d.node_id.as_deref() == Some("merge")
                && d.message
                    == "merge.quorum must be between 1 and merge.sources length for quorum policy"
        }));
    }

    proptest! {
        // Fuzz name/version: validation may reject, but must never panic.
        #[test]
        fn validate_and_normalize_never_panics(name in ".*", version in ".*") {
            let workflow = WorkflowDefinition {
                version,
                name,
                nodes: vec![
                    Node {
                        id: "start".to_string(),
                        kind: NodeKind::Start { next: "end".to_string() },
                    },
                    Node {
                        id: "end".to_string(),
                        kind: NodeKind::End,
                    },
                ],
            };

            let _ = validate_and_normalize(&workflow);
        }
    }
}