Skip to main content

simple_agents_workflow/
validation.rs

1use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
2
3use thiserror::Error;
4
5use crate::ir::{MergePolicy, NodeKind, WorkflowDefinition, WORKFLOW_IR_V0};
6
7/// Validates and returns a deterministic normalized workflow definition.
8pub fn validate_and_normalize(
9    input: &WorkflowDefinition,
10) -> Result<WorkflowDefinition, ValidationErrors> {
11    let normalized = input.normalized();
12    let diagnostics = validate(&normalized);
13
14    if diagnostics.is_empty() {
15        Ok(normalized)
16    } else {
17        Err(ValidationErrors { diagnostics })
18    }
19}
20
/// Validation diagnostic severity.
///
/// Only one level exists at present: every diagnostic produced by this module is an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Severity {
    /// A hard validation failure.
    Error,
}
27
/// Stable diagnostic codes for workflow validation.
///
/// Each code identifies one class of problem; the accompanying [`Diagnostic::message`] carries
/// the human-readable details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiagnosticCode {
    /// Unsupported IR version (the workflow version differs from `WORKFLOW_IR_V0`).
    UnsupportedVersion,
    /// Empty workflow name.
    EmptyWorkflowName,
    /// Workflow contains no nodes.
    EmptyWorkflow,
    /// Duplicate node id.
    DuplicateNodeId,
    /// Node id is empty.
    EmptyNodeId,
    /// Node edge references unknown node id.
    ///
    /// Also reported when a merge or reduce node names a source that is not a known node id.
    UnknownTarget,
    /// Missing start node.
    MissingStart,
    /// More than one start node found.
    MultipleStart,
    /// No terminal node found.
    MissingEnd,
    /// Node is unreachable from start.
    UnreachableNode,
    /// Start node cannot reach an end node.
    NoPathToEnd,
    /// Node has an empty required field.
    ///
    /// Also used for other per-field rule violations: numeric limits that must be greater than
    /// zero and a merge quorum that is missing, out of range, or set without the quorum policy.
    EmptyField,
}
56
/// A workflow validation diagnostic.
///
/// Describes a single problem found while validating a workflow definition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Diagnostic {
    /// Severity.
    pub severity: Severity,
    /// Stable code.
    pub code: DiagnosticCode,
    /// Human-readable message.
    pub message: String,
    /// Optional node id where the issue occurred.
    ///
    /// `None` for workflow-level problems (version, name, start/end cardinality). Note that an
    /// empty-node-id diagnostic carries `Some` of the empty string.
    pub node_id: Option<String>,
}
69
70impl Diagnostic {
71    fn error(code: DiagnosticCode, message: impl Into<String>, node_id: Option<String>) -> Self {
72        Self {
73            severity: Severity::Error,
74            code,
75            message: message.into(),
76            node_id,
77        }
78    }
79}
80
/// Aggregated validation errors.
///
/// The `Display` output is the fixed text "workflow validation failed"; inspect
/// [`ValidationErrors::diagnostics`] for the individual problems.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
#[error("workflow validation failed")]
pub struct ValidationErrors {
    /// Collected diagnostics.
    pub diagnostics: Vec<Diagnostic>,
}
88
/// Canonical validation error alias.
///
/// Singular-named alias for [`ValidationErrors`]; both names refer to the same type.
pub type ValidationError = ValidationErrors;
91
92fn validate(workflow: &WorkflowDefinition) -> Vec<Diagnostic> {
93    let mut diagnostics = Vec::new();
94
95    if workflow.version != WORKFLOW_IR_V0 {
96        diagnostics.push(Diagnostic::error(
97            DiagnosticCode::UnsupportedVersion,
98            format!(
99                "unsupported workflow IR version '{}'; expected '{}'",
100                workflow.version, WORKFLOW_IR_V0
101            ),
102            None,
103        ));
104    }
105
106    if workflow.name.is_empty() {
107        diagnostics.push(Diagnostic::error(
108            DiagnosticCode::EmptyWorkflowName,
109            "workflow name must not be empty",
110            None,
111        ));
112    }
113
114    if workflow.nodes.is_empty() {
115        diagnostics.push(Diagnostic::error(
116            DiagnosticCode::EmptyWorkflow,
117            "workflow must contain at least one node",
118            None,
119        ));
120        return diagnostics;
121    }
122
123    let mut node_index = HashMap::with_capacity(workflow.nodes.len());
124    let mut duplicates = BTreeSet::new();
125    let mut start_ids = Vec::new();
126    let mut end_count = 0usize;
127
128    for node in &workflow.nodes {
129        if node.id.is_empty() {
130            diagnostics.push(Diagnostic::error(
131                DiagnosticCode::EmptyNodeId,
132                "node id must not be empty",
133                Some(node.id.clone()),
134            ));
135        }
136
137        if let Some(previous_id) = node_index.insert(node.id.as_str(), node) {
138            duplicates.insert(previous_id.id.clone());
139            duplicates.insert(node.id.clone());
140        }
141        validate_node_kind_fields(node, &mut diagnostics, &mut start_ids, &mut end_count);
142    }
143
144    for node in &workflow.nodes {
145        match &node.kind {
146            NodeKind::Merge { sources, .. } => {
147                for source in sources {
148                    if !node_index.contains_key(source.as_str()) {
149                        diagnostics.push(Diagnostic::error(
150                            DiagnosticCode::UnknownTarget,
151                            format!("node '{}' references unknown source '{}'", node.id, source),
152                            Some(node.id.clone()),
153                        ));
154                    }
155                }
156            }
157            NodeKind::Reduce { source, .. } => {
158                if !node_index.contains_key(source.as_str()) {
159                    diagnostics.push(Diagnostic::error(
160                        DiagnosticCode::UnknownTarget,
161                        format!("node '{}' references unknown source '{}'", node.id, source),
162                        Some(node.id.clone()),
163                    ));
164                }
165            }
166            _ => {}
167        }
168    }
169
170    for duplicate_id in duplicates {
171        diagnostics.push(Diagnostic::error(
172            DiagnosticCode::DuplicateNodeId,
173            format!("duplicate node id '{}'", duplicate_id),
174            Some(duplicate_id),
175        ));
176    }
177
178    if start_ids.is_empty() {
179        diagnostics.push(Diagnostic::error(
180            DiagnosticCode::MissingStart,
181            "workflow must contain exactly one start node",
182            None,
183        ));
184    } else if start_ids.len() > 1 {
185        diagnostics.push(Diagnostic::error(
186            DiagnosticCode::MultipleStart,
187            format!(
188                "workflow must contain exactly one start node, found {}",
189                start_ids.len()
190            ),
191            None,
192        ));
193    }
194
195    if end_count == 0 {
196        diagnostics.push(Diagnostic::error(
197            DiagnosticCode::MissingEnd,
198            "workflow must contain at least one end node",
199            None,
200        ));
201    }
202
203    for node in &workflow.nodes {
204        for edge in node.outgoing_edges() {
205            if !node_index.contains_key(edge) {
206                diagnostics.push(Diagnostic::error(
207                    DiagnosticCode::UnknownTarget,
208                    format!("node '{}' references unknown target '{}'", node.id, edge),
209                    Some(node.id.clone()),
210                ));
211            }
212        }
213    }
214
215    if start_ids.len() == 1 {
216        let start_id = start_ids[0].as_str();
217        let reachable = reachable_nodes(start_id, &node_index);
218
219        for node in &workflow.nodes {
220            if !reachable.contains(node.id.as_str()) {
221                diagnostics.push(Diagnostic::error(
222                    DiagnosticCode::UnreachableNode,
223                    format!(
224                        "node '{}' is unreachable from start node '{}'",
225                        node.id, start_id
226                    ),
227                    Some(node.id.clone()),
228                ));
229            }
230        }
231
232        let has_path_to_end = reachable.iter().any(|id| {
233            node_index
234                .get(*id)
235                .is_some_and(|node| matches!(node.kind, NodeKind::End))
236        });
237
238        if !has_path_to_end {
239            diagnostics.push(Diagnostic::error(
240                DiagnosticCode::NoPathToEnd,
241                format!("start node '{}' cannot reach any end node", start_id),
242                Some(start_id.to_string()),
243            ));
244        }
245    }
246
247    diagnostics
248}
249
250fn validate_node_kind_fields(
251    node: &crate::ir::Node,
252    diagnostics: &mut Vec<Diagnostic>,
253    start_ids: &mut Vec<String>,
254    end_count: &mut usize,
255) {
256    match &node.kind {
257        NodeKind::Start { next } => {
258            start_ids.push(node.id.clone());
259            if next.is_empty() {
260                diagnostics.push(Diagnostic::error(
261                    DiagnosticCode::EmptyField,
262                    "start.next must not be empty",
263                    Some(node.id.clone()),
264                ));
265            }
266        }
267        NodeKind::Llm {
268            model,
269            prompt,
270            next: _,
271        } => {
272            if model.is_empty() {
273                diagnostics.push(Diagnostic::error(
274                    DiagnosticCode::EmptyField,
275                    "llm.model must not be empty",
276                    Some(node.id.clone()),
277                ));
278            }
279            if prompt.is_empty() {
280                diagnostics.push(Diagnostic::error(
281                    DiagnosticCode::EmptyField,
282                    "llm.prompt must not be empty",
283                    Some(node.id.clone()),
284                ));
285            }
286        }
287        NodeKind::Tool { tool, .. } => {
288            if tool.is_empty() {
289                diagnostics.push(Diagnostic::error(
290                    DiagnosticCode::EmptyField,
291                    "tool.tool must not be empty",
292                    Some(node.id.clone()),
293                ));
294            }
295        }
296        NodeKind::Condition {
297            expression,
298            on_true,
299            on_false,
300        } => {
301            if expression.is_empty() {
302                diagnostics.push(Diagnostic::error(
303                    DiagnosticCode::EmptyField,
304                    "condition.expression must not be empty",
305                    Some(node.id.clone()),
306                ));
307            }
308            if on_true.is_empty() {
309                diagnostics.push(Diagnostic::error(
310                    DiagnosticCode::EmptyField,
311                    "condition.on_true must not be empty",
312                    Some(node.id.clone()),
313                ));
314            }
315            if on_false.is_empty() {
316                diagnostics.push(Diagnostic::error(
317                    DiagnosticCode::EmptyField,
318                    "condition.on_false must not be empty",
319                    Some(node.id.clone()),
320                ));
321            }
322        }
323        NodeKind::Debounce {
324            key_path,
325            window_steps,
326            next,
327            on_suppressed,
328        } => {
329            if key_path.is_empty() {
330                diagnostics.push(Diagnostic::error(
331                    DiagnosticCode::EmptyField,
332                    "debounce.key_path must not be empty",
333                    Some(node.id.clone()),
334                ));
335            }
336            if *window_steps == 0 {
337                diagnostics.push(Diagnostic::error(
338                    DiagnosticCode::EmptyField,
339                    "debounce.window_steps must be greater than zero",
340                    Some(node.id.clone()),
341                ));
342            }
343            if next.is_empty() {
344                diagnostics.push(Diagnostic::error(
345                    DiagnosticCode::EmptyField,
346                    "debounce.next must not be empty",
347                    Some(node.id.clone()),
348                ));
349            }
350            if on_suppressed.as_ref().is_some_and(String::is_empty) {
351                diagnostics.push(Diagnostic::error(
352                    DiagnosticCode::EmptyField,
353                    "debounce.on_suppressed must not be empty when provided",
354                    Some(node.id.clone()),
355                ));
356            }
357        }
358        NodeKind::Throttle {
359            key_path,
360            window_steps,
361            next,
362            on_throttled,
363        } => {
364            if key_path.is_empty() {
365                diagnostics.push(Diagnostic::error(
366                    DiagnosticCode::EmptyField,
367                    "throttle.key_path must not be empty",
368                    Some(node.id.clone()),
369                ));
370            }
371            if *window_steps == 0 {
372                diagnostics.push(Diagnostic::error(
373                    DiagnosticCode::EmptyField,
374                    "throttle.window_steps must be greater than zero",
375                    Some(node.id.clone()),
376                ));
377            }
378            if next.is_empty() {
379                diagnostics.push(Diagnostic::error(
380                    DiagnosticCode::EmptyField,
381                    "throttle.next must not be empty",
382                    Some(node.id.clone()),
383                ));
384            }
385            if on_throttled.as_ref().is_some_and(String::is_empty) {
386                diagnostics.push(Diagnostic::error(
387                    DiagnosticCode::EmptyField,
388                    "throttle.on_throttled must not be empty when provided",
389                    Some(node.id.clone()),
390                ));
391            }
392        }
393        NodeKind::RetryCompensate {
394            tool,
395            input: _,
396            max_retries: _,
397            compensate_tool,
398            compensate_input: _,
399            next,
400            on_compensated,
401        } => {
402            if tool.is_empty() {
403                diagnostics.push(Diagnostic::error(
404                    DiagnosticCode::EmptyField,
405                    "retry_compensate.tool must not be empty",
406                    Some(node.id.clone()),
407                ));
408            }
409            if compensate_tool.is_empty() {
410                diagnostics.push(Diagnostic::error(
411                    DiagnosticCode::EmptyField,
412                    "retry_compensate.compensate_tool must not be empty",
413                    Some(node.id.clone()),
414                ));
415            }
416            if next.is_empty() {
417                diagnostics.push(Diagnostic::error(
418                    DiagnosticCode::EmptyField,
419                    "retry_compensate.next must not be empty",
420                    Some(node.id.clone()),
421                ));
422            }
423            if on_compensated.as_ref().is_some_and(String::is_empty) {
424                diagnostics.push(Diagnostic::error(
425                    DiagnosticCode::EmptyField,
426                    "retry_compensate.on_compensated must not be empty when provided",
427                    Some(node.id.clone()),
428                ));
429            }
430        }
431        NodeKind::HumanInTheLoop {
432            decision_path,
433            response_path,
434            on_approve,
435            on_reject,
436        } => {
437            if decision_path.is_empty() {
438                diagnostics.push(Diagnostic::error(
439                    DiagnosticCode::EmptyField,
440                    "human_in_the_loop.decision_path must not be empty",
441                    Some(node.id.clone()),
442                ));
443            }
444            if response_path.as_ref().is_some_and(String::is_empty) {
445                diagnostics.push(Diagnostic::error(
446                    DiagnosticCode::EmptyField,
447                    "human_in_the_loop.response_path must not be empty when provided",
448                    Some(node.id.clone()),
449                ));
450            }
451            if on_approve.is_empty() {
452                diagnostics.push(Diagnostic::error(
453                    DiagnosticCode::EmptyField,
454                    "human_in_the_loop.on_approve must not be empty",
455                    Some(node.id.clone()),
456                ));
457            }
458            if on_reject.is_empty() {
459                diagnostics.push(Diagnostic::error(
460                    DiagnosticCode::EmptyField,
461                    "human_in_the_loop.on_reject must not be empty",
462                    Some(node.id.clone()),
463                ));
464            }
465        }
466        NodeKind::CacheWrite {
467            key_path,
468            value_path,
469            next,
470        } => {
471            if key_path.is_empty() {
472                diagnostics.push(Diagnostic::error(
473                    DiagnosticCode::EmptyField,
474                    "cache_write.key_path must not be empty",
475                    Some(node.id.clone()),
476                ));
477            }
478            if value_path.is_empty() {
479                diagnostics.push(Diagnostic::error(
480                    DiagnosticCode::EmptyField,
481                    "cache_write.value_path must not be empty",
482                    Some(node.id.clone()),
483                ));
484            }
485            if next.is_empty() {
486                diagnostics.push(Diagnostic::error(
487                    DiagnosticCode::EmptyField,
488                    "cache_write.next must not be empty",
489                    Some(node.id.clone()),
490                ));
491            }
492        }
493        NodeKind::CacheRead {
494            key_path,
495            next,
496            on_miss,
497        } => {
498            if key_path.is_empty() {
499                diagnostics.push(Diagnostic::error(
500                    DiagnosticCode::EmptyField,
501                    "cache_read.key_path must not be empty",
502                    Some(node.id.clone()),
503                ));
504            }
505            if next.is_empty() {
506                diagnostics.push(Diagnostic::error(
507                    DiagnosticCode::EmptyField,
508                    "cache_read.next must not be empty",
509                    Some(node.id.clone()),
510                ));
511            }
512            if on_miss.as_ref().is_some_and(String::is_empty) {
513                diagnostics.push(Diagnostic::error(
514                    DiagnosticCode::EmptyField,
515                    "cache_read.on_miss must not be empty when provided",
516                    Some(node.id.clone()),
517                ));
518            }
519        }
520        NodeKind::EventTrigger {
521            event,
522            event_path,
523            next,
524            on_mismatch,
525        } => {
526            if event.is_empty() {
527                diagnostics.push(Diagnostic::error(
528                    DiagnosticCode::EmptyField,
529                    "event_trigger.event must not be empty",
530                    Some(node.id.clone()),
531                ));
532            }
533            if event_path.is_empty() {
534                diagnostics.push(Diagnostic::error(
535                    DiagnosticCode::EmptyField,
536                    "event_trigger.event_path must not be empty",
537                    Some(node.id.clone()),
538                ));
539            }
540            if next.is_empty() {
541                diagnostics.push(Diagnostic::error(
542                    DiagnosticCode::EmptyField,
543                    "event_trigger.next must not be empty",
544                    Some(node.id.clone()),
545                ));
546            }
547            if on_mismatch.as_ref().is_some_and(String::is_empty) {
548                diagnostics.push(Diagnostic::error(
549                    DiagnosticCode::EmptyField,
550                    "event_trigger.on_mismatch must not be empty when provided",
551                    Some(node.id.clone()),
552                ));
553            }
554        }
555        NodeKind::Router { routes, default } => {
556            if routes.is_empty() {
557                diagnostics.push(Diagnostic::error(
558                    DiagnosticCode::EmptyField,
559                    "router.routes must contain at least one route",
560                    Some(node.id.clone()),
561                ));
562            }
563            if routes
564                .iter()
565                .any(|route| route.when.is_empty() || route.next.is_empty())
566            {
567                diagnostics.push(Diagnostic::error(
568                    DiagnosticCode::EmptyField,
569                    "router.routes entries must include non-empty when and next",
570                    Some(node.id.clone()),
571                ));
572            }
573            if default.is_empty() {
574                diagnostics.push(Diagnostic::error(
575                    DiagnosticCode::EmptyField,
576                    "router.default must not be empty",
577                    Some(node.id.clone()),
578                ));
579            }
580        }
581        NodeKind::Transform { expression, next } => {
582            if expression.is_empty() {
583                diagnostics.push(Diagnostic::error(
584                    DiagnosticCode::EmptyField,
585                    "transform.expression must not be empty",
586                    Some(node.id.clone()),
587                ));
588            }
589            if next.is_empty() {
590                diagnostics.push(Diagnostic::error(
591                    DiagnosticCode::EmptyField,
592                    "transform.next must not be empty",
593                    Some(node.id.clone()),
594                ));
595            }
596        }
597        NodeKind::Loop {
598            condition,
599            body,
600            next,
601            max_iterations,
602        } => {
603            if condition.is_empty() {
604                diagnostics.push(Diagnostic::error(
605                    DiagnosticCode::EmptyField,
606                    "loop.condition must not be empty",
607                    Some(node.id.clone()),
608                ));
609            }
610            if body.is_empty() {
611                diagnostics.push(Diagnostic::error(
612                    DiagnosticCode::EmptyField,
613                    "loop.body must not be empty",
614                    Some(node.id.clone()),
615                ));
616            }
617            if next.is_empty() {
618                diagnostics.push(Diagnostic::error(
619                    DiagnosticCode::EmptyField,
620                    "loop.next must not be empty",
621                    Some(node.id.clone()),
622                ));
623            }
624            if max_iterations.is_some_and(|limit| limit == 0) {
625                diagnostics.push(Diagnostic::error(
626                    DiagnosticCode::EmptyField,
627                    "loop.max_iterations must be greater than zero when provided",
628                    Some(node.id.clone()),
629                ));
630            }
631        }
632        NodeKind::End => {
633            *end_count += 1;
634        }
635        NodeKind::Subgraph { graph, next } => {
636            if graph.is_empty() {
637                diagnostics.push(Diagnostic::error(
638                    DiagnosticCode::EmptyField,
639                    "subgraph.graph must not be empty",
640                    Some(node.id.clone()),
641                ));
642            }
643            if next.as_ref().is_some_and(String::is_empty) {
644                diagnostics.push(Diagnostic::error(
645                    DiagnosticCode::EmptyField,
646                    "subgraph.next must not be empty when provided",
647                    Some(node.id.clone()),
648                ));
649            }
650        }
651        NodeKind::Batch { items_path, next } => {
652            if items_path.is_empty() {
653                diagnostics.push(Diagnostic::error(
654                    DiagnosticCode::EmptyField,
655                    "batch.items_path must not be empty",
656                    Some(node.id.clone()),
657                ));
658            }
659            if next.is_empty() {
660                diagnostics.push(Diagnostic::error(
661                    DiagnosticCode::EmptyField,
662                    "batch.next must not be empty",
663                    Some(node.id.clone()),
664                ));
665            }
666        }
667        NodeKind::Filter {
668            items_path,
669            expression,
670            next,
671        } => {
672            if items_path.is_empty() {
673                diagnostics.push(Diagnostic::error(
674                    DiagnosticCode::EmptyField,
675                    "filter.items_path must not be empty",
676                    Some(node.id.clone()),
677                ));
678            }
679            if expression.is_empty() {
680                diagnostics.push(Diagnostic::error(
681                    DiagnosticCode::EmptyField,
682                    "filter.expression must not be empty",
683                    Some(node.id.clone()),
684                ));
685            }
686            if next.is_empty() {
687                diagnostics.push(Diagnostic::error(
688                    DiagnosticCode::EmptyField,
689                    "filter.next must not be empty",
690                    Some(node.id.clone()),
691                ));
692            }
693        }
694        NodeKind::Parallel {
695            branches,
696            next,
697            max_in_flight,
698        } => {
699            if branches.is_empty() {
700                diagnostics.push(Diagnostic::error(
701                    DiagnosticCode::EmptyField,
702                    "parallel.branches must contain at least one node id",
703                    Some(node.id.clone()),
704                ));
705            }
706            if branches.iter().any(String::is_empty) {
707                diagnostics.push(Diagnostic::error(
708                    DiagnosticCode::EmptyField,
709                    "parallel.branches must not contain empty node ids",
710                    Some(node.id.clone()),
711                ));
712            }
713            if next.is_empty() {
714                diagnostics.push(Diagnostic::error(
715                    DiagnosticCode::EmptyField,
716                    "parallel.next must not be empty",
717                    Some(node.id.clone()),
718                ));
719            }
720            if max_in_flight.is_some_and(|limit| limit == 0) {
721                diagnostics.push(Diagnostic::error(
722                    DiagnosticCode::EmptyField,
723                    "parallel.max_in_flight must be greater than zero when provided",
724                    Some(node.id.clone()),
725                ));
726            }
727        }
728        NodeKind::Merge {
729            sources,
730            policy,
731            quorum,
732            next,
733        } => {
734            if sources.is_empty() {
735                diagnostics.push(Diagnostic::error(
736                    DiagnosticCode::EmptyField,
737                    "merge.sources must contain at least one node id",
738                    Some(node.id.clone()),
739                ));
740            }
741            if sources.iter().any(String::is_empty) {
742                diagnostics.push(Diagnostic::error(
743                    DiagnosticCode::EmptyField,
744                    "merge.sources must not contain empty node ids",
745                    Some(node.id.clone()),
746                ));
747            }
748            if next.is_empty() {
749                diagnostics.push(Diagnostic::error(
750                    DiagnosticCode::EmptyField,
751                    "merge.next must not be empty",
752                    Some(node.id.clone()),
753                ));
754            }
755            match policy {
756                MergePolicy::Quorum => {
757                    let invalid_quorum = match quorum {
758                        Some(value) => *value == 0 || *value > sources.len(),
759                        None => true,
760                    };
761                    if invalid_quorum {
762                        diagnostics.push(Diagnostic::error(
763                            DiagnosticCode::EmptyField,
764                            "merge.quorum must be between 1 and merge.sources length for quorum policy",
765                            Some(node.id.clone()),
766                        ));
767                    }
768                }
769                _ => {
770                    if quorum.is_some() {
771                        diagnostics.push(Diagnostic::error(
772                            DiagnosticCode::EmptyField,
773                            "merge.quorum is only valid with quorum policy",
774                            Some(node.id.clone()),
775                        ));
776                    }
777                }
778            }
779        }
780        NodeKind::Map {
781            tool,
782            items_path,
783            next,
784            max_in_flight,
785        } => {
786            if tool.is_empty() {
787                diagnostics.push(Diagnostic::error(
788                    DiagnosticCode::EmptyField,
789                    "map.tool must not be empty",
790                    Some(node.id.clone()),
791                ));
792            }
793            if items_path.is_empty() {
794                diagnostics.push(Diagnostic::error(
795                    DiagnosticCode::EmptyField,
796                    "map.items_path must not be empty",
797                    Some(node.id.clone()),
798                ));
799            }
800            if next.is_empty() {
801                diagnostics.push(Diagnostic::error(
802                    DiagnosticCode::EmptyField,
803                    "map.next must not be empty",
804                    Some(node.id.clone()),
805                ));
806            }
807            if max_in_flight.is_some_and(|limit| limit == 0) {
808                diagnostics.push(Diagnostic::error(
809                    DiagnosticCode::EmptyField,
810                    "map.max_in_flight must be greater than zero when provided",
811                    Some(node.id.clone()),
812                ));
813            }
814        }
815        NodeKind::Reduce {
816            source,
817            operation: _,
818            next,
819        } => {
820            if source.is_empty() {
821                diagnostics.push(Diagnostic::error(
822                    DiagnosticCode::EmptyField,
823                    "reduce.source must not be empty",
824                    Some(node.id.clone()),
825                ));
826            }
827            if next.is_empty() {
828                diagnostics.push(Diagnostic::error(
829                    DiagnosticCode::EmptyField,
830                    "reduce.next must not be empty",
831                    Some(node.id.clone()),
832                ));
833            }
834        }
835    }
836}
837
838fn reachable_nodes<'a>(
839    start_id: &'a str,
840    node_index: &HashMap<&'a str, &'a crate::ir::Node>,
841) -> HashSet<&'a str> {
842    let mut visited = HashSet::new();
843    let mut queue = VecDeque::from([start_id]);
844
845    while let Some(current) = queue.pop_front() {
846        if !visited.insert(current) {
847            continue;
848        }
849
850        if let Some(node) = node_index.get(current) {
851            for edge in node.outgoing_edges() {
852                if node_index.contains_key(edge) {
853                    queue.push_back(edge);
854                }
855            }
856        }
857    }
858
859    visited
860}
861
#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use serde_json::json;

    use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
    use crate::validation::{
        validate_and_normalize, Diagnostic, DiagnosticCode, ValidationErrors,
    };

    /// Builds a node from a borrowed id and its kind.
    fn node(id: &str, kind: NodeKind) -> Node {
        Node {
            id: id.to_string(),
            kind,
        }
    }

    /// Builds the `start` node pointing at `next`.
    fn start_to(next: &str) -> Node {
        node(
            "start",
            NodeKind::Start {
                next: next.to_string(),
            },
        )
    }

    /// Builds the terminal `end` node.
    fn end() -> Node {
        node("end", NodeKind::End)
    }

    /// Builds a `v0` workflow with the given name and nodes.
    fn workflow(name: &str, nodes: Vec<Node>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: name.to_string(),
            nodes,
        }
    }

    /// Returns true when at least one diagnostic satisfies `matches`.
    fn any_diagnostic(err: &ValidationErrors, matches: impl Fn(&Diagnostic) -> bool) -> bool {
        err.diagnostics.iter().any(|d| matches(d))
    }

    /// Returns true when some diagnostic carries `code`.
    fn has_code(err: &ValidationErrors, code: DiagnosticCode) -> bool {
        any_diagnostic(err, |d| d.code == code)
    }

    /// Returns true when some diagnostic carries `code` and is attached to `node_id`.
    fn has_code_on(err: &ValidationErrors, code: DiagnosticCode, node_id: &str) -> bool {
        any_diagnostic(err, |d| {
            d.code == code && d.node_id.as_deref() == Some(node_id)
        })
    }

    /// Returns true when some diagnostic has exactly `message` and is attached to `node_id`.
    fn has_message_on(err: &ValidationErrors, message: &str, node_id: &str) -> bool {
        any_diagnostic(err, |d| {
            d.message == message && d.node_id.as_deref() == Some(node_id)
        })
    }

    /// A minimal well-formed workflow: start -> llm -> tool -> end.
    fn valid_workflow() -> WorkflowDefinition {
        workflow(
            "basic",
            vec![
                start_to("llm"),
                node(
                    "llm",
                    NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Say hi".to_string(),
                        next: Some("tool".to_string()),
                    },
                ),
                node(
                    "tool",
                    NodeKind::Tool {
                        tool: "validator".to_string(),
                        input: json!({"strict": true}),
                        next: Some("end".to_string()),
                    },
                ),
                end(),
            ],
        )
    }

    #[test]
    fn validates_and_normalizes_valid_workflow() {
        let normalized =
            validate_and_normalize(&valid_workflow()).expect("workflow should validate");

        // The test expects "end" first and "tool" last in the normalized node list.
        let ids: Vec<&str> = normalized.nodes.iter().map(|n| n.id.as_str()).collect();
        assert_eq!(ids.first().copied(), Some("end"));
        assert_eq!(ids.last().copied(), Some("tool"));
    }

    #[test]
    fn reports_unknown_target() {
        let mut broken = valid_workflow();
        // Index 0 is the start node; redirect it to a node that does not exist.
        broken.nodes[0].kind = NodeKind::Start {
            next: "missing".to_string(),
        };

        let err = validate_and_normalize(&broken).expect_err("should fail validation");
        assert!(has_code(&err, DiagnosticCode::UnknownTarget));
    }

    #[test]
    fn reports_unreachable_node() {
        let mut broken = valid_workflow();
        broken.nodes.push(node("orphan", NodeKind::End));

        let err = validate_and_normalize(&broken).expect_err("should fail validation");
        assert!(has_code_on(&err, DiagnosticCode::UnreachableNode, "orphan"));
    }

    #[test]
    fn reports_duplicate_node_id() {
        let mut broken = valid_workflow();
        // Reuses the id of the existing "llm" node.
        broken.nodes.push(node("llm", NodeKind::End));

        let err = validate_and_normalize(&broken).expect_err("should fail validation");
        assert!(has_code(&err, DiagnosticCode::DuplicateNodeId));
    }

    #[test]
    fn reports_no_path_to_end() {
        // The llm node has no successor, so nothing links start to end.
        let broken = workflow(
            "no-end-path",
            vec![
                start_to("llm"),
                node(
                    "llm",
                    NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "test".to_string(),
                        next: None,
                    },
                ),
                end(),
            ],
        );

        let err = validate_and_normalize(&broken).expect_err("should fail validation");
        assert!(has_code(&err, DiagnosticCode::NoPathToEnd));
    }

    #[test]
    fn reports_invalid_loop_configuration() {
        let broken = workflow(
            "bad-loop",
            vec![
                start_to("loop"),
                node(
                    "loop",
                    NodeKind::Loop {
                        condition: String::new(),
                        body: String::new(),
                        next: "end".to_string(),
                        max_iterations: Some(0),
                    },
                ),
                end(),
            ],
        );

        let err = validate_and_normalize(&broken).expect_err("loop validation should fail");
        assert!(has_code(&err, DiagnosticCode::EmptyField));
    }

    #[test]
    fn reports_invalid_merge_quorum_configuration() {
        // Quorum of 2 with a single merge source.
        let broken = workflow(
            "bad-merge",
            vec![
                start_to("merge"),
                node(
                    "source",
                    NodeKind::Tool {
                        tool: "echo".to_string(),
                        input: json!({}),
                        next: Some("end".to_string()),
                    },
                ),
                node(
                    "merge",
                    NodeKind::Merge {
                        sources: vec!["source".to_string()],
                        policy: MergePolicy::Quorum,
                        quorum: Some(2),
                        next: "end".to_string(),
                    },
                ),
                end(),
            ],
        );

        let err = validate_and_normalize(&broken).expect_err("merge quorum should fail");
        assert!(has_code_on(&err, DiagnosticCode::EmptyField, "merge"));
    }

    #[test]
    fn reports_unknown_reduce_source() {
        let broken = workflow(
            "bad-reduce",
            vec![
                start_to("reduce"),
                node(
                    "reduce",
                    NodeKind::Reduce {
                        source: "missing".to_string(),
                        operation: ReduceOperation::Count,
                        next: "end".to_string(),
                    },
                ),
                end(),
            ],
        );

        let err = validate_and_normalize(&broken).expect_err("reduce source should fail");
        assert!(has_code_on(&err, DiagnosticCode::UnknownTarget, "reduce"));
    }

    #[test]
    fn reports_invalid_extended_node_configuration() {
        let broken = workflow(
            "invalid-extended",
            vec![
                start_to("debounce"),
                node(
                    "debounce",
                    NodeKind::Debounce {
                        key_path: String::new(),
                        window_steps: 0,
                        next: "router".to_string(),
                        on_suppressed: None,
                    },
                ),
                node(
                    "router",
                    NodeKind::Router {
                        routes: vec![],
                        default: String::new(),
                    },
                ),
                node(
                    "transform",
                    NodeKind::Transform {
                        expression: String::new(),
                        next: "end".to_string(),
                    },
                ),
                end(),
            ],
        );

        let err =
            validate_and_normalize(&broken).expect_err("extended node validation should fail");
        assert!(has_code(&err, DiagnosticCode::EmptyField));
    }

    #[test]
    fn reports_multiple_empty_field_diagnostics_for_parallel_node() {
        let broken = workflow(
            "bad-parallel",
            vec![
                start_to("parallel"),
                node(
                    "parallel",
                    NodeKind::Parallel {
                        branches: vec![String::new()],
                        next: String::new(),
                        max_in_flight: Some(0),
                    },
                ),
                end(),
            ],
        );

        let err = validate_and_normalize(&broken).expect_err("parallel validation should fail");

        // Each of the three problems must be reported on the parallel node.
        assert!(has_message_on(
            &err,
            "parallel.branches must not contain empty node ids",
            "parallel",
        ));
        assert!(has_message_on(
            &err,
            "parallel.next must not be empty",
            "parallel",
        ));
        assert!(has_message_on(
            &err,
            "parallel.max_in_flight must be greater than zero when provided",
            "parallel",
        ));
    }

    #[test]
    fn reports_merge_source_unknown_and_quorum_policy_errors_together() {
        let broken = workflow(
            "bad-merge-sources",
            vec![
                start_to("merge"),
                node(
                    "merge",
                    NodeKind::Merge {
                        sources: vec!["missing".to_string()],
                        policy: MergePolicy::Quorum,
                        quorum: Some(0),
                        next: "end".to_string(),
                    },
                ),
                end(),
            ],
        );

        let err = validate_and_normalize(&broken).expect_err("merge validation should fail");

        assert!(any_diagnostic(&err, |d| {
            d.code == DiagnosticCode::UnknownTarget
                && d.node_id.as_deref() == Some("merge")
                && d.message.contains("unknown source 'missing'")
        }));

        assert!(any_diagnostic(&err, |d| {
            d.code == DiagnosticCode::EmptyField
                && d.node_id.as_deref() == Some("merge")
                && d.message
                    == "merge.quorum must be between 1 and merge.sources length for quorum policy"
        }));
    }

    proptest! {
        /// Arbitrary names and versions may be rejected, but must never panic.
        #[test]
        fn validate_and_normalize_never_panics(name in ".*", version in ".*") {
            let candidate = WorkflowDefinition {
                version,
                name,
                nodes: vec![start_to("end"), end()],
            };

            // Only the absence of a panic matters; the outcome is discarded.
            let _ = validate_and_normalize(&candidate);
        }
    }
}