1use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
2
3use thiserror::Error;
4
5use crate::ir::{MergePolicy, NodeKind, WorkflowDefinition, WORKFLOW_IR_V0};
6
7pub fn validate_and_normalize(
9 input: &WorkflowDefinition,
10) -> Result<WorkflowDefinition, ValidationErrors> {
11 let normalized = input.normalized();
12 let diagnostics = validate(&normalized);
13
14 if diagnostics.is_empty() {
15 Ok(normalized)
16 } else {
17 Err(ValidationErrors { diagnostics })
18 }
19}
20
/// Severity of a [`Diagnostic`].
///
/// Validation currently only ever produces `Error`; the enum leaves room
/// for softer severities to be added later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Severity {
    /// A hard validation failure.
    Error,
}
27
/// Machine-readable category for a [`Diagnostic`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiagnosticCode {
    /// `workflow.version` does not match the supported `WORKFLOW_IR_V0`.
    UnsupportedVersion,
    /// `workflow.name` is empty.
    EmptyWorkflowName,
    /// The workflow contains no nodes at all.
    EmptyWorkflow,
    /// Two or more nodes share the same id.
    DuplicateNodeId,
    /// A node has an empty id.
    EmptyNodeId,
    /// An edge, merge source, or reduce source references a node id that
    /// does not exist in the workflow.
    UnknownTarget,
    /// No start node was found (exactly one is required).
    MissingStart,
    /// More than one start node was found (exactly one is required).
    MultipleStart,
    /// No end node was found (at least one is required).
    MissingEnd,
    /// A node cannot be reached from the start node.
    UnreachableNode,
    /// The start node cannot reach any end node.
    NoPathToEnd,
    /// A kind-specific field is empty or otherwise invalid (also used for
    /// zero window/iteration counts and bad merge quorum values).
    EmptyField,
}
56
/// A single validation finding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Diagnostic {
    /// How serious the finding is (always `Error` today).
    pub severity: Severity,
    /// Machine-readable category of the finding.
    pub code: DiagnosticCode,
    /// Human-readable description of the problem.
    pub message: String,
    /// Id of the offending node, when the finding is node-specific.
    pub node_id: Option<String>,
}
69
70impl Diagnostic {
71 fn error(code: DiagnosticCode, message: impl Into<String>, node_id: Option<String>) -> Self {
72 Self {
73 severity: Severity::Error,
74 code,
75 message: message.into(),
76 node_id,
77 }
78 }
79}
80
/// Error returned by [`validate_and_normalize`] when validation fails.
///
/// Carries every diagnostic collected in the validation pass rather than
/// just the first failure.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[error("workflow validation failed")]
pub struct ValidationErrors {
    /// All findings; non-empty whenever this error is returned.
    pub diagnostics: Vec<Diagnostic>,
}
88
/// Alias for [`ValidationErrors`] under a singular name.
pub type ValidationError = ValidationErrors;
91
/// Runs every structural check against `workflow` and returns all
/// diagnostics found; an empty vector means the workflow is valid.
///
/// Diagnostics are accumulated rather than short-circuited so callers see
/// every problem at once. The only early exit is an empty node list, since
/// the remaining checks are meaningless without nodes.
fn validate(workflow: &WorkflowDefinition) -> Vec<Diagnostic> {
    let mut diagnostics = Vec::new();

    // The IR version must match the single supported revision.
    if workflow.version != WORKFLOW_IR_V0 {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::UnsupportedVersion,
            format!(
                "unsupported workflow IR version '{}'; expected '{}'",
                workflow.version, WORKFLOW_IR_V0
            ),
            None,
        ));
    }

    if workflow.name.is_empty() {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::EmptyWorkflowName,
            "workflow name must not be empty",
            None,
        ));
    }

    if workflow.nodes.is_empty() {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::EmptyWorkflow,
            "workflow must contain at least one node",
            None,
        ));
        // None of the per-node or graph checks below apply to an empty workflow.
        return diagnostics;
    }

    // Index nodes by id for edge and reachability checks. On an id collision
    // the later node wins in the index; the collision itself is reported below.
    let mut node_index = HashMap::with_capacity(workflow.nodes.len());
    // BTreeSet dedups and yields ids in sorted order, keeping the diagnostic
    // output deterministic.
    let mut duplicates = BTreeSet::new();
    let mut start_ids = Vec::new();
    let mut end_count = 0usize;

    for node in &workflow.nodes {
        if node.id.is_empty() {
            diagnostics.push(Diagnostic::error(
                DiagnosticCode::EmptyNodeId,
                "node id must not be empty",
                Some(node.id.clone()),
            ));
        }

        if let Some(previous_id) = node_index.insert(node.id.as_str(), node) {
            // Both inserts resolve to the same id string (the map key matched);
            // the set collapses them into one entry.
            duplicates.insert(previous_id.id.clone());
            duplicates.insert(node.id.clone());
        }
        // Kind-specific field checks; also records start ids and the end count.
        validate_node_kind_fields(node, &mut diagnostics, &mut start_ids, &mut end_count);
    }

    // Merge/Reduce reference upstream nodes via dedicated source fields that
    // are not outgoing edges, so they need their own existence checks.
    for node in &workflow.nodes {
        match &node.kind {
            NodeKind::Merge { sources, .. } => {
                for source in sources {
                    if !node_index.contains_key(source.as_str()) {
                        diagnostics.push(Diagnostic::error(
                            DiagnosticCode::UnknownTarget,
                            format!("node '{}' references unknown source '{}'", node.id, source),
                            Some(node.id.clone()),
                        ));
                    }
                }
            }
            NodeKind::Reduce { source, .. } => {
                if !node_index.contains_key(source.as_str()) {
                    diagnostics.push(Diagnostic::error(
                        DiagnosticCode::UnknownTarget,
                        format!("node '{}' references unknown source '{}'", node.id, source),
                        Some(node.id.clone()),
                    ));
                }
            }
            _ => {}
        }
    }

    for duplicate_id in duplicates {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::DuplicateNodeId,
            format!("duplicate node id '{}'", duplicate_id),
            Some(duplicate_id),
        ));
    }

    // Exactly one start node is required.
    if start_ids.is_empty() {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::MissingStart,
            "workflow must contain exactly one start node",
            None,
        ));
    } else if start_ids.len() > 1 {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::MultipleStart,
            format!(
                "workflow must contain exactly one start node, found {}",
                start_ids.len()
            ),
            None,
        ));
    }

    // At least one end node is required.
    if end_count == 0 {
        diagnostics.push(Diagnostic::error(
            DiagnosticCode::MissingEnd,
            "workflow must contain at least one end node",
            None,
        ));
    }

    // Every outgoing edge must point at a known node id.
    for node in &workflow.nodes {
        for edge in node.outgoing_edges() {
            if !node_index.contains_key(edge) {
                diagnostics.push(Diagnostic::error(
                    DiagnosticCode::UnknownTarget,
                    format!("node '{}' references unknown target '{}'", node.id, edge),
                    Some(node.id.clone()),
                ));
            }
        }
    }

    // Graph-level checks only make sense with an unambiguous entry point.
    if start_ids.len() == 1 {
        let start_id = start_ids[0].as_str();
        let reachable = reachable_nodes(start_id, &node_index);

        // Every node must be reachable from the start node.
        for node in &workflow.nodes {
            if !reachable.contains(node.id.as_str()) {
                diagnostics.push(Diagnostic::error(
                    DiagnosticCode::UnreachableNode,
                    format!(
                        "node '{}' is unreachable from start node '{}'",
                        node.id, start_id
                    ),
                    Some(node.id.clone()),
                ));
            }
        }

        // The start node must be able to reach at least one End node.
        let has_path_to_end = reachable.iter().any(|id| {
            node_index
                .get(*id)
                .is_some_and(|node| matches!(node.kind, NodeKind::End))
        });

        if !has_path_to_end {
            diagnostics.push(Diagnostic::error(
                DiagnosticCode::NoPathToEnd,
                format!("start node '{}' cannot reach any end node", start_id),
                Some(start_id.to_string()),
            ));
        }
    }

    diagnostics
}
249
250fn validate_node_kind_fields(
251 node: &crate::ir::Node,
252 diagnostics: &mut Vec<Diagnostic>,
253 start_ids: &mut Vec<String>,
254 end_count: &mut usize,
255) {
256 match &node.kind {
257 NodeKind::Start { next } => {
258 start_ids.push(node.id.clone());
259 if next.is_empty() {
260 diagnostics.push(Diagnostic::error(
261 DiagnosticCode::EmptyField,
262 "start.next must not be empty",
263 Some(node.id.clone()),
264 ));
265 }
266 }
267 NodeKind::Llm {
268 model,
269 prompt,
270 next: _,
271 } => {
272 if model.is_empty() {
273 diagnostics.push(Diagnostic::error(
274 DiagnosticCode::EmptyField,
275 "llm.model must not be empty",
276 Some(node.id.clone()),
277 ));
278 }
279 if prompt.is_empty() {
280 diagnostics.push(Diagnostic::error(
281 DiagnosticCode::EmptyField,
282 "llm.prompt must not be empty",
283 Some(node.id.clone()),
284 ));
285 }
286 }
287 NodeKind::Tool { tool, .. } => {
288 if tool.is_empty() {
289 diagnostics.push(Diagnostic::error(
290 DiagnosticCode::EmptyField,
291 "tool.tool must not be empty",
292 Some(node.id.clone()),
293 ));
294 }
295 }
296 NodeKind::Condition {
297 expression,
298 on_true,
299 on_false,
300 } => {
301 if expression.is_empty() {
302 diagnostics.push(Diagnostic::error(
303 DiagnosticCode::EmptyField,
304 "condition.expression must not be empty",
305 Some(node.id.clone()),
306 ));
307 }
308 if on_true.is_empty() {
309 diagnostics.push(Diagnostic::error(
310 DiagnosticCode::EmptyField,
311 "condition.on_true must not be empty",
312 Some(node.id.clone()),
313 ));
314 }
315 if on_false.is_empty() {
316 diagnostics.push(Diagnostic::error(
317 DiagnosticCode::EmptyField,
318 "condition.on_false must not be empty",
319 Some(node.id.clone()),
320 ));
321 }
322 }
323 NodeKind::Debounce {
324 key_path,
325 window_steps,
326 next,
327 on_suppressed,
328 } => {
329 if key_path.is_empty() {
330 diagnostics.push(Diagnostic::error(
331 DiagnosticCode::EmptyField,
332 "debounce.key_path must not be empty",
333 Some(node.id.clone()),
334 ));
335 }
336 if *window_steps == 0 {
337 diagnostics.push(Diagnostic::error(
338 DiagnosticCode::EmptyField,
339 "debounce.window_steps must be greater than zero",
340 Some(node.id.clone()),
341 ));
342 }
343 if next.is_empty() {
344 diagnostics.push(Diagnostic::error(
345 DiagnosticCode::EmptyField,
346 "debounce.next must not be empty",
347 Some(node.id.clone()),
348 ));
349 }
350 if on_suppressed.as_ref().is_some_and(String::is_empty) {
351 diagnostics.push(Diagnostic::error(
352 DiagnosticCode::EmptyField,
353 "debounce.on_suppressed must not be empty when provided",
354 Some(node.id.clone()),
355 ));
356 }
357 }
358 NodeKind::Throttle {
359 key_path,
360 window_steps,
361 next,
362 on_throttled,
363 } => {
364 if key_path.is_empty() {
365 diagnostics.push(Diagnostic::error(
366 DiagnosticCode::EmptyField,
367 "throttle.key_path must not be empty",
368 Some(node.id.clone()),
369 ));
370 }
371 if *window_steps == 0 {
372 diagnostics.push(Diagnostic::error(
373 DiagnosticCode::EmptyField,
374 "throttle.window_steps must be greater than zero",
375 Some(node.id.clone()),
376 ));
377 }
378 if next.is_empty() {
379 diagnostics.push(Diagnostic::error(
380 DiagnosticCode::EmptyField,
381 "throttle.next must not be empty",
382 Some(node.id.clone()),
383 ));
384 }
385 if on_throttled.as_ref().is_some_and(String::is_empty) {
386 diagnostics.push(Diagnostic::error(
387 DiagnosticCode::EmptyField,
388 "throttle.on_throttled must not be empty when provided",
389 Some(node.id.clone()),
390 ));
391 }
392 }
393 NodeKind::RetryCompensate {
394 tool,
395 input: _,
396 max_retries: _,
397 compensate_tool,
398 compensate_input: _,
399 next,
400 on_compensated,
401 } => {
402 if tool.is_empty() {
403 diagnostics.push(Diagnostic::error(
404 DiagnosticCode::EmptyField,
405 "retry_compensate.tool must not be empty",
406 Some(node.id.clone()),
407 ));
408 }
409 if compensate_tool.is_empty() {
410 diagnostics.push(Diagnostic::error(
411 DiagnosticCode::EmptyField,
412 "retry_compensate.compensate_tool must not be empty",
413 Some(node.id.clone()),
414 ));
415 }
416 if next.is_empty() {
417 diagnostics.push(Diagnostic::error(
418 DiagnosticCode::EmptyField,
419 "retry_compensate.next must not be empty",
420 Some(node.id.clone()),
421 ));
422 }
423 if on_compensated.as_ref().is_some_and(String::is_empty) {
424 diagnostics.push(Diagnostic::error(
425 DiagnosticCode::EmptyField,
426 "retry_compensate.on_compensated must not be empty when provided",
427 Some(node.id.clone()),
428 ));
429 }
430 }
431 NodeKind::HumanInTheLoop {
432 decision_path,
433 response_path,
434 on_approve,
435 on_reject,
436 } => {
437 if decision_path.is_empty() {
438 diagnostics.push(Diagnostic::error(
439 DiagnosticCode::EmptyField,
440 "human_in_the_loop.decision_path must not be empty",
441 Some(node.id.clone()),
442 ));
443 }
444 if response_path.as_ref().is_some_and(String::is_empty) {
445 diagnostics.push(Diagnostic::error(
446 DiagnosticCode::EmptyField,
447 "human_in_the_loop.response_path must not be empty when provided",
448 Some(node.id.clone()),
449 ));
450 }
451 if on_approve.is_empty() {
452 diagnostics.push(Diagnostic::error(
453 DiagnosticCode::EmptyField,
454 "human_in_the_loop.on_approve must not be empty",
455 Some(node.id.clone()),
456 ));
457 }
458 if on_reject.is_empty() {
459 diagnostics.push(Diagnostic::error(
460 DiagnosticCode::EmptyField,
461 "human_in_the_loop.on_reject must not be empty",
462 Some(node.id.clone()),
463 ));
464 }
465 }
466 NodeKind::CacheWrite {
467 key_path,
468 value_path,
469 next,
470 } => {
471 if key_path.is_empty() {
472 diagnostics.push(Diagnostic::error(
473 DiagnosticCode::EmptyField,
474 "cache_write.key_path must not be empty",
475 Some(node.id.clone()),
476 ));
477 }
478 if value_path.is_empty() {
479 diagnostics.push(Diagnostic::error(
480 DiagnosticCode::EmptyField,
481 "cache_write.value_path must not be empty",
482 Some(node.id.clone()),
483 ));
484 }
485 if next.is_empty() {
486 diagnostics.push(Diagnostic::error(
487 DiagnosticCode::EmptyField,
488 "cache_write.next must not be empty",
489 Some(node.id.clone()),
490 ));
491 }
492 }
493 NodeKind::CacheRead {
494 key_path,
495 next,
496 on_miss,
497 } => {
498 if key_path.is_empty() {
499 diagnostics.push(Diagnostic::error(
500 DiagnosticCode::EmptyField,
501 "cache_read.key_path must not be empty",
502 Some(node.id.clone()),
503 ));
504 }
505 if next.is_empty() {
506 diagnostics.push(Diagnostic::error(
507 DiagnosticCode::EmptyField,
508 "cache_read.next must not be empty",
509 Some(node.id.clone()),
510 ));
511 }
512 if on_miss.as_ref().is_some_and(String::is_empty) {
513 diagnostics.push(Diagnostic::error(
514 DiagnosticCode::EmptyField,
515 "cache_read.on_miss must not be empty when provided",
516 Some(node.id.clone()),
517 ));
518 }
519 }
520 NodeKind::EventTrigger {
521 event,
522 event_path,
523 next,
524 on_mismatch,
525 } => {
526 if event.is_empty() {
527 diagnostics.push(Diagnostic::error(
528 DiagnosticCode::EmptyField,
529 "event_trigger.event must not be empty",
530 Some(node.id.clone()),
531 ));
532 }
533 if event_path.is_empty() {
534 diagnostics.push(Diagnostic::error(
535 DiagnosticCode::EmptyField,
536 "event_trigger.event_path must not be empty",
537 Some(node.id.clone()),
538 ));
539 }
540 if next.is_empty() {
541 diagnostics.push(Diagnostic::error(
542 DiagnosticCode::EmptyField,
543 "event_trigger.next must not be empty",
544 Some(node.id.clone()),
545 ));
546 }
547 if on_mismatch.as_ref().is_some_and(String::is_empty) {
548 diagnostics.push(Diagnostic::error(
549 DiagnosticCode::EmptyField,
550 "event_trigger.on_mismatch must not be empty when provided",
551 Some(node.id.clone()),
552 ));
553 }
554 }
555 NodeKind::Router { routes, default } => {
556 if routes.is_empty() {
557 diagnostics.push(Diagnostic::error(
558 DiagnosticCode::EmptyField,
559 "router.routes must contain at least one route",
560 Some(node.id.clone()),
561 ));
562 }
563 if routes
564 .iter()
565 .any(|route| route.when.is_empty() || route.next.is_empty())
566 {
567 diagnostics.push(Diagnostic::error(
568 DiagnosticCode::EmptyField,
569 "router.routes entries must include non-empty when and next",
570 Some(node.id.clone()),
571 ));
572 }
573 if default.is_empty() {
574 diagnostics.push(Diagnostic::error(
575 DiagnosticCode::EmptyField,
576 "router.default must not be empty",
577 Some(node.id.clone()),
578 ));
579 }
580 }
581 NodeKind::Transform { expression, next } => {
582 if expression.is_empty() {
583 diagnostics.push(Diagnostic::error(
584 DiagnosticCode::EmptyField,
585 "transform.expression must not be empty",
586 Some(node.id.clone()),
587 ));
588 }
589 if next.is_empty() {
590 diagnostics.push(Diagnostic::error(
591 DiagnosticCode::EmptyField,
592 "transform.next must not be empty",
593 Some(node.id.clone()),
594 ));
595 }
596 }
597 NodeKind::Loop {
598 condition,
599 body,
600 next,
601 max_iterations,
602 } => {
603 if condition.is_empty() {
604 diagnostics.push(Diagnostic::error(
605 DiagnosticCode::EmptyField,
606 "loop.condition must not be empty",
607 Some(node.id.clone()),
608 ));
609 }
610 if body.is_empty() {
611 diagnostics.push(Diagnostic::error(
612 DiagnosticCode::EmptyField,
613 "loop.body must not be empty",
614 Some(node.id.clone()),
615 ));
616 }
617 if next.is_empty() {
618 diagnostics.push(Diagnostic::error(
619 DiagnosticCode::EmptyField,
620 "loop.next must not be empty",
621 Some(node.id.clone()),
622 ));
623 }
624 if max_iterations.is_some_and(|limit| limit == 0) {
625 diagnostics.push(Diagnostic::error(
626 DiagnosticCode::EmptyField,
627 "loop.max_iterations must be greater than zero when provided",
628 Some(node.id.clone()),
629 ));
630 }
631 }
632 NodeKind::End => {
633 *end_count += 1;
634 }
635 NodeKind::Subgraph { graph, next } => {
636 if graph.is_empty() {
637 diagnostics.push(Diagnostic::error(
638 DiagnosticCode::EmptyField,
639 "subgraph.graph must not be empty",
640 Some(node.id.clone()),
641 ));
642 }
643 if next.as_ref().is_some_and(String::is_empty) {
644 diagnostics.push(Diagnostic::error(
645 DiagnosticCode::EmptyField,
646 "subgraph.next must not be empty when provided",
647 Some(node.id.clone()),
648 ));
649 }
650 }
651 NodeKind::Batch { items_path, next } => {
652 if items_path.is_empty() {
653 diagnostics.push(Diagnostic::error(
654 DiagnosticCode::EmptyField,
655 "batch.items_path must not be empty",
656 Some(node.id.clone()),
657 ));
658 }
659 if next.is_empty() {
660 diagnostics.push(Diagnostic::error(
661 DiagnosticCode::EmptyField,
662 "batch.next must not be empty",
663 Some(node.id.clone()),
664 ));
665 }
666 }
667 NodeKind::Filter {
668 items_path,
669 expression,
670 next,
671 } => {
672 if items_path.is_empty() {
673 diagnostics.push(Diagnostic::error(
674 DiagnosticCode::EmptyField,
675 "filter.items_path must not be empty",
676 Some(node.id.clone()),
677 ));
678 }
679 if expression.is_empty() {
680 diagnostics.push(Diagnostic::error(
681 DiagnosticCode::EmptyField,
682 "filter.expression must not be empty",
683 Some(node.id.clone()),
684 ));
685 }
686 if next.is_empty() {
687 diagnostics.push(Diagnostic::error(
688 DiagnosticCode::EmptyField,
689 "filter.next must not be empty",
690 Some(node.id.clone()),
691 ));
692 }
693 }
694 NodeKind::Parallel {
695 branches,
696 next,
697 max_in_flight,
698 } => {
699 if branches.is_empty() {
700 diagnostics.push(Diagnostic::error(
701 DiagnosticCode::EmptyField,
702 "parallel.branches must contain at least one node id",
703 Some(node.id.clone()),
704 ));
705 }
706 if branches.iter().any(String::is_empty) {
707 diagnostics.push(Diagnostic::error(
708 DiagnosticCode::EmptyField,
709 "parallel.branches must not contain empty node ids",
710 Some(node.id.clone()),
711 ));
712 }
713 if next.is_empty() {
714 diagnostics.push(Diagnostic::error(
715 DiagnosticCode::EmptyField,
716 "parallel.next must not be empty",
717 Some(node.id.clone()),
718 ));
719 }
720 if max_in_flight.is_some_and(|limit| limit == 0) {
721 diagnostics.push(Diagnostic::error(
722 DiagnosticCode::EmptyField,
723 "parallel.max_in_flight must be greater than zero when provided",
724 Some(node.id.clone()),
725 ));
726 }
727 }
728 NodeKind::Merge {
729 sources,
730 policy,
731 quorum,
732 next,
733 } => {
734 if sources.is_empty() {
735 diagnostics.push(Diagnostic::error(
736 DiagnosticCode::EmptyField,
737 "merge.sources must contain at least one node id",
738 Some(node.id.clone()),
739 ));
740 }
741 if sources.iter().any(String::is_empty) {
742 diagnostics.push(Diagnostic::error(
743 DiagnosticCode::EmptyField,
744 "merge.sources must not contain empty node ids",
745 Some(node.id.clone()),
746 ));
747 }
748 if next.is_empty() {
749 diagnostics.push(Diagnostic::error(
750 DiagnosticCode::EmptyField,
751 "merge.next must not be empty",
752 Some(node.id.clone()),
753 ));
754 }
755 match policy {
756 MergePolicy::Quorum => {
757 let invalid_quorum = match quorum {
758 Some(value) => *value == 0 || *value > sources.len(),
759 None => true,
760 };
761 if invalid_quorum {
762 diagnostics.push(Diagnostic::error(
763 DiagnosticCode::EmptyField,
764 "merge.quorum must be between 1 and merge.sources length for quorum policy",
765 Some(node.id.clone()),
766 ));
767 }
768 }
769 _ => {
770 if quorum.is_some() {
771 diagnostics.push(Diagnostic::error(
772 DiagnosticCode::EmptyField,
773 "merge.quorum is only valid with quorum policy",
774 Some(node.id.clone()),
775 ));
776 }
777 }
778 }
779 }
780 NodeKind::Map {
781 tool,
782 items_path,
783 next,
784 max_in_flight,
785 } => {
786 if tool.is_empty() {
787 diagnostics.push(Diagnostic::error(
788 DiagnosticCode::EmptyField,
789 "map.tool must not be empty",
790 Some(node.id.clone()),
791 ));
792 }
793 if items_path.is_empty() {
794 diagnostics.push(Diagnostic::error(
795 DiagnosticCode::EmptyField,
796 "map.items_path must not be empty",
797 Some(node.id.clone()),
798 ));
799 }
800 if next.is_empty() {
801 diagnostics.push(Diagnostic::error(
802 DiagnosticCode::EmptyField,
803 "map.next must not be empty",
804 Some(node.id.clone()),
805 ));
806 }
807 if max_in_flight.is_some_and(|limit| limit == 0) {
808 diagnostics.push(Diagnostic::error(
809 DiagnosticCode::EmptyField,
810 "map.max_in_flight must be greater than zero when provided",
811 Some(node.id.clone()),
812 ));
813 }
814 }
815 NodeKind::Reduce {
816 source,
817 operation: _,
818 next,
819 } => {
820 if source.is_empty() {
821 diagnostics.push(Diagnostic::error(
822 DiagnosticCode::EmptyField,
823 "reduce.source must not be empty",
824 Some(node.id.clone()),
825 ));
826 }
827 if next.is_empty() {
828 diagnostics.push(Diagnostic::error(
829 DiagnosticCode::EmptyField,
830 "reduce.next must not be empty",
831 Some(node.id.clone()),
832 ));
833 }
834 }
835 }
836}
837
838fn reachable_nodes<'a>(
839 start_id: &'a str,
840 node_index: &HashMap<&'a str, &'a crate::ir::Node>,
841) -> HashSet<&'a str> {
842 let mut visited = HashSet::new();
843 let mut queue = VecDeque::from([start_id]);
844
845 while let Some(current) = queue.pop_front() {
846 if !visited.insert(current) {
847 continue;
848 }
849
850 if let Some(node) = node_index.get(current) {
851 for edge in node.outgoing_edges() {
852 if node_index.contains_key(edge) {
853 queue.push_back(edge);
854 }
855 }
856 }
857 }
858
859 visited
860}
861
#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use serde_json::json;

    use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
    use crate::validation::{validate_and_normalize, DiagnosticCode};

    // Minimal valid fixture: start -> llm -> tool -> end.
    fn valid_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "basic".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Say hi".to_string(),
                        next: Some("tool".to_string()),
                    },
                },
                Node {
                    id: "tool".to_string(),
                    kind: NodeKind::Tool {
                        tool: "validator".to_string(),
                        input: json!({"strict": true}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }

    // Happy path: a valid workflow passes and comes back normalized.
    // NOTE(review): the assertions imply normalized() reorders nodes
    // (consistent with id-sorted order: end < llm < start < tool) —
    // confirm against WorkflowDefinition::normalized.
    #[test]
    fn validates_and_normalizes_valid_workflow() {
        let workflow = valid_workflow();
        let normalized = validate_and_normalize(&workflow).expect("workflow should validate");

        assert_eq!(normalized.nodes.first().map(|n| n.id.as_str()), Some("end"));
        assert_eq!(normalized.nodes.last().map(|n| n.id.as_str()), Some("tool"));
    }

    // An edge pointing at a nonexistent node id yields UnknownTarget.
    #[test]
    fn reports_unknown_target() {
        let mut workflow = valid_workflow();
        workflow.nodes[0].kind = NodeKind::Start {
            next: "missing".to_string(),
        };

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::UnknownTarget));
    }

    // A node with no path from the start node yields UnreachableNode,
    // attributed to that node.
    #[test]
    fn reports_unreachable_node() {
        let mut workflow = valid_workflow();
        workflow.nodes.push(Node {
            id: "orphan".to_string(),
            kind: NodeKind::End,
        });

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::UnreachableNode
                && d.node_id.as_deref() == Some("orphan")));
    }

    // Reusing an existing node id yields DuplicateNodeId.
    #[test]
    fn reports_duplicate_node_id() {
        let mut workflow = valid_workflow();
        workflow.nodes.push(Node {
            id: "llm".to_string(),
            kind: NodeKind::End,
        });

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::DuplicateNodeId));
    }

    // An end node exists but is unreachable from start: NoPathToEnd.
    #[test]
    fn reports_no_path_to_end() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "no-end-path".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "test".to_string(),
                        // Dead end: the llm node has no successor.
                        next: None,
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("should fail validation");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::NoPathToEnd));
    }

    // Empty condition/body and a zero max_iterations all map to EmptyField.
    #[test]
    fn reports_invalid_loop_configuration() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-loop".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "loop".to_string(),
                    },
                },
                Node {
                    id: "loop".to_string(),
                    kind: NodeKind::Loop {
                        condition: "".to_string(),
                        body: "".to_string(),
                        next: "end".to_string(),
                        max_iterations: Some(0),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("loop validation should fail");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::EmptyField));
    }

    // Quorum larger than the number of sources is rejected.
    #[test]
    fn reports_invalid_merge_quorum_configuration() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-merge".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "merge".to_string(),
                    },
                },
                Node {
                    id: "source".to_string(),
                    kind: NodeKind::Tool {
                        tool: "echo".to_string(),
                        input: json!({}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["source".to_string()],
                        policy: MergePolicy::Quorum,
                        // Quorum of 2 with only one source: invalid.
                        quorum: Some(2),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("merge quorum should fail");
        assert!(
            err.diagnostics
                .iter()
                .any(|d| d.code == DiagnosticCode::EmptyField
                    && d.node_id.as_deref() == Some("merge"))
        );
    }

    // reduce.source referencing a nonexistent node yields UnknownTarget
    // attributed to the reduce node.
    #[test]
    fn reports_unknown_reduce_source() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-reduce".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "reduce".to_string(),
                    },
                },
                Node {
                    id: "reduce".to_string(),
                    kind: NodeKind::Reduce {
                        source: "missing".to_string(),
                        operation: ReduceOperation::Count,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("reduce source should fail");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::UnknownTarget
                && d.node_id.as_deref() == Some("reduce")));
    }

    // Empty fields across the extended node kinds (debounce, router,
    // transform) all surface as EmptyField diagnostics.
    #[test]
    fn reports_invalid_extended_node_configuration() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "invalid-extended".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "debounce".to_string(),
                    },
                },
                Node {
                    id: "debounce".to_string(),
                    kind: NodeKind::Debounce {
                        key_path: "".to_string(),
                        window_steps: 0,
                        next: "router".to_string(),
                        on_suppressed: None,
                    },
                },
                Node {
                    id: "router".to_string(),
                    kind: NodeKind::Router {
                        routes: vec![],
                        default: "".to_string(),
                    },
                },
                Node {
                    id: "transform".to_string(),
                    kind: NodeKind::Transform {
                        expression: "".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err =
            validate_and_normalize(&workflow).expect_err("extended node validation should fail");
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.code == DiagnosticCode::EmptyField));
    }

    // Validation accumulates: one bad parallel node produces all three
    // expected diagnostics, not just the first.
    #[test]
    fn reports_multiple_empty_field_diagnostics_for_parallel_node() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-parallel".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "parallel".to_string(),
                    },
                },
                Node {
                    id: "parallel".to_string(),
                    kind: NodeKind::Parallel {
                        branches: vec!["".to_string()],
                        next: "".to_string(),
                        max_in_flight: Some(0),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("parallel validation should fail");

        assert!(err.diagnostics.iter().any(|d| d.message
            == "parallel.branches must not contain empty node ids"
            && d.node_id.as_deref() == Some("parallel")));
        assert!(err
            .diagnostics
            .iter()
            .any(|d| d.message == "parallel.next must not be empty"
                && d.node_id.as_deref() == Some("parallel")));
        assert!(err.diagnostics.iter().any(|d| d.message
            == "parallel.max_in_flight must be greater than zero when provided"
            && d.node_id.as_deref() == Some("parallel")));
    }

    // A merge node with both an unknown source and a zero quorum reports
    // both problems in the same pass.
    #[test]
    fn reports_merge_source_unknown_and_quorum_policy_errors_together() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "bad-merge-sources".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "merge".to_string(),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["missing".to_string()],
                        policy: MergePolicy::Quorum,
                        quorum: Some(0),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let err = validate_and_normalize(&workflow).expect_err("merge validation should fail");

        assert!(err.diagnostics.iter().any(|d| {
            d.code == DiagnosticCode::UnknownTarget
                && d.node_id.as_deref() == Some("merge")
                && d.message.contains("unknown source 'missing'")
        }));

        assert!(err.diagnostics.iter().any(|d| {
            d.code == DiagnosticCode::EmptyField
                && d.node_id.as_deref() == Some("merge")
                && d.message
                    == "merge.quorum must be between 1 and merge.sources length for quorum policy"
        }));
    }

    proptest! {
        // Fuzz name/version with arbitrary strings: validation may fail but
        // must never panic.
        #[test]
        fn validate_and_normalize_never_panics(name in ".*", version in ".*") {
            let workflow = WorkflowDefinition {
                version,
                name,
                nodes: vec![
                    Node {
                        id: "start".to_string(),
                        kind: NodeKind::Start { next: "end".to_string() },
                    },
                    Node {
                        id: "end".to_string(),
                        kind: NodeKind::End,
                    },
                ],
            };

            let _ = validate_and_normalize(&workflow);
        }
    }
}