Skip to main content

worldinterface_core/flowspec/
validate.rs

1//! FlowSpec validation.
2//!
3//! Validation is a pure function that checks a FlowSpec for structural
4//! correctness. It collects all errors rather than failing on the first one.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::fmt;
8
9use serde::{Deserialize, Serialize};
10
11use crate::flowspec::branch::{BranchCondition, ParamRef};
12use crate::flowspec::topo;
13use crate::flowspec::{FlowSpec, NodeType};
14use crate::id::NodeId;
15
16/// Validate a FlowSpec, returning all errors found.
17pub fn validate(spec: &FlowSpec) -> Result<(), ValidationError> {
18    let mut diagnostics = Vec::new();
19
20    // V-6: Must have at least one node
21    if spec.nodes.is_empty() {
22        diagnostics.push(ValidationDiagnostic {
23            rule: ValidationRule::NoNodes,
24            node_id: None,
25            message: "FlowSpec must have at least one node".into(),
26        });
27        return Err(ValidationError { errors: diagnostics });
28    }
29
30    // V-1: Build node index and detect duplicate IDs
31    let mut node_index: HashMap<NodeId, usize> = HashMap::new();
32    for (i, node) in spec.nodes.iter().enumerate() {
33        if let Some(_prev) = node_index.insert(node.id, i) {
34            diagnostics.push(ValidationDiagnostic {
35                rule: ValidationRule::DuplicateNodeId,
36                node_id: Some(node.id),
37                message: format!("Duplicate node ID: {}", node.id),
38            });
39        }
40    }
41
42    // V-10: Empty connector names
43    for node in &spec.nodes {
44        if let NodeType::Connector(ref c) = node.node_type {
45            if c.connector.is_empty() {
46                diagnostics.push(ValidationDiagnostic {
47                    rule: ValidationRule::EmptyConnectorName,
48                    node_id: Some(node.id),
49                    message: format!("Node {} has an empty connector name", node.id),
50                });
51            }
52        }
53    }
54
55    // V-2, V-3, V-11: Validate edges
56    for edge in &spec.edges {
57        if !node_index.contains_key(&edge.from) {
58            diagnostics.push(ValidationDiagnostic {
59                rule: ValidationRule::DanglingEdgeSource,
60                node_id: Some(edge.from),
61                message: format!("Edge source {} does not reference an existing node", edge.from),
62            });
63        }
64        if !node_index.contains_key(&edge.to) {
65            diagnostics.push(ValidationDiagnostic {
66                rule: ValidationRule::DanglingEdgeTarget,
67                node_id: Some(edge.to),
68                message: format!("Edge target {} does not reference an existing node", edge.to),
69            });
70        }
71        if edge.from == edge.to {
72            diagnostics.push(ValidationDiagnostic {
73                rule: ValidationRule::SelfLoop,
74                node_id: Some(edge.from),
75                message: format!("Edge creates a self-loop on node {}", edge.from),
76            });
77        }
78    }
79
80    // V-7, V-8: Validate branch node targets
81    let edge_set: HashSet<(NodeId, NodeId)> = spec.edges.iter().map(|e| (e.from, e.to)).collect();
82
83    for node in &spec.nodes {
84        if let NodeType::Branch(ref branch) = node.node_type {
85            // V-7: then_edge must reference an existing node
86            if !node_index.contains_key(&branch.then_edge) {
87                diagnostics.push(ValidationDiagnostic {
88                    rule: ValidationRule::BranchTargetMissing,
89                    node_id: Some(node.id),
90                    message: format!(
91                        "Branch node {} then_edge references nonexistent node {}",
92                        node.id, branch.then_edge
93                    ),
94                });
95            } else {
96                // V-8: must have a corresponding edge
97                if !edge_set.contains(&(node.id, branch.then_edge)) {
98                    diagnostics.push(ValidationDiagnostic {
99                        rule: ValidationRule::BranchTargetNotInEdges,
100                        node_id: Some(node.id),
101                        message: format!(
102                            "Branch node {} then_edge {} has no corresponding edge",
103                            node.id, branch.then_edge
104                        ),
105                    });
106                }
107            }
108
109            if let Some(else_target) = branch.else_edge {
110                if !node_index.contains_key(&else_target) {
111                    diagnostics.push(ValidationDiagnostic {
112                        rule: ValidationRule::BranchTargetMissing,
113                        node_id: Some(node.id),
114                        message: format!(
115                            "Branch node {} else_edge references nonexistent node {}",
116                            node.id, else_target
117                        ),
118                    });
119                } else if !edge_set.contains(&(node.id, else_target)) {
120                    diagnostics.push(ValidationDiagnostic {
121                        rule: ValidationRule::BranchTargetNotInEdges,
122                        node_id: Some(node.id),
123                        message: format!(
124                            "Branch node {} else_edge {} has no corresponding edge",
125                            node.id, else_target
126                        ),
127                    });
128                }
129            }
130        }
131    }
132
133    // V-12: ParamRef::NodeOutput references must point to existing nodes
134    for node in &spec.nodes {
135        if let NodeType::Branch(ref branch) = node.node_type {
136            let refs = match &branch.condition {
137                BranchCondition::Exists(param_ref) => vec![param_ref],
138                BranchCondition::Equals { left, .. } => vec![left],
139                BranchCondition::Expression(_) => vec![],
140            };
141            for param_ref in refs {
142                if let ParamRef::NodeOutput { node_id, .. } = param_ref {
143                    if !node_index.contains_key(node_id) {
144                        diagnostics.push(ValidationDiagnostic {
145                            rule: ValidationRule::InvalidParamRef,
146                            node_id: Some(node.id),
147                            message: format!(
148                                "Branch node {} references nonexistent node {} in condition",
149                                node.id, node_id
150                            ),
151                        });
152                    }
153                }
154            }
155        }
156    }
157
158    // Build adjacency structures for root and reachability analysis
159    let mut in_degree: HashMap<NodeId, usize> = HashMap::new();
160    let mut successors: HashMap<NodeId, Vec<NodeId>> = HashMap::new();
161    for node in &spec.nodes {
162        in_degree.entry(node.id).or_insert(0);
163        successors.entry(node.id).or_default();
164    }
165    for edge in &spec.edges {
166        if node_index.contains_key(&edge.from) && node_index.contains_key(&edge.to) {
167            *in_degree.entry(edge.to).or_insert(0) += 1;
168            successors.entry(edge.from).or_default().push(edge.to);
169        }
170    }
171
172    // V-9: Multiple roots (nodes with in-degree 0)
173    let roots: Vec<NodeId> =
174        in_degree.iter().filter(|(_, &deg)| deg == 0).map(|(&id, _)| id).collect();
175
176    if roots.len() > 1 {
177        diagnostics.push(ValidationDiagnostic {
178            rule: ValidationRule::MultipleRoots,
179            node_id: None,
180            message: format!(
181                "FlowSpec has {} root nodes (expected at most 1): {}",
182                roots.len(),
183                roots.iter().map(|id| id.to_string()).collect::<Vec<_>>().join(", ")
184            ),
185        });
186    }
187
188    // V-4: Cycle detection via topological sort
189    if let Err(remaining) = topo::topological_sort(spec) {
190        diagnostics.push(ValidationDiagnostic {
191            rule: ValidationRule::CycleDetected,
192            node_id: None,
193            message: format!(
194                "FlowSpec contains a cycle ({} of {} nodes could not be topologically sorted)",
195                remaining.len(),
196                spec.nodes.len()
197            ),
198        });
199    }
200
201    // V-5: Disconnected nodes (not reachable from any root)
202    // BFS from all roots
203    if !roots.is_empty() {
204        let mut reachable: HashSet<NodeId> = HashSet::new();
205        let mut bfs_queue: VecDeque<NodeId> = roots.iter().copied().collect();
206        while let Some(node_id) = bfs_queue.pop_front() {
207            if reachable.insert(node_id) {
208                if let Some(succs) = successors.get(&node_id) {
209                    for &succ in succs {
210                        if !reachable.contains(&succ) {
211                            bfs_queue.push_back(succ);
212                        }
213                    }
214                }
215            }
216        }
217
218        for node in &spec.nodes {
219            if !reachable.contains(&node.id) {
220                diagnostics.push(ValidationDiagnostic {
221                    rule: ValidationRule::DisconnectedNode,
222                    node_id: Some(node.id),
223                    message: format!("Node {} is not reachable from any root", node.id),
224                });
225            }
226        }
227    }
228
229    if diagnostics.is_empty() {
230        Ok(())
231    } else {
232        Err(ValidationError { errors: diagnostics })
233    }
234}
235
236/// All validation errors found in a FlowSpec.
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ValidationError {
239    pub errors: Vec<ValidationDiagnostic>,
240}
241
242impl fmt::Display for ValidationError {
243    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244        writeln!(f, "FlowSpec validation failed with {} error(s):", self.errors.len())?;
245        for diag in &self.errors {
246            writeln!(f, "  - [{}] {}", diag.rule, diag.message)?;
247        }
248        Ok(())
249    }
250}
251
252impl std::error::Error for ValidationError {}
253
254impl ValidationError {
255    /// Check whether a specific rule was violated.
256    pub fn has_rule(&self, rule: ValidationRule) -> bool {
257        self.errors.iter().any(|d| d.rule == rule)
258    }
259}
260
261/// A single validation diagnostic.
262#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct ValidationDiagnostic {
264    /// Which validation rule was violated.
265    pub rule: ValidationRule,
266    /// The node that caused the error, if applicable.
267    pub node_id: Option<NodeId>,
268    /// Human-readable description of the error.
269    pub message: String,
270}
271
272/// Identifies which validation rule was violated.
273#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
274#[serde(rename_all = "snake_case")]
275pub enum ValidationRule {
276    DuplicateNodeId,
277    DanglingEdgeSource,
278    DanglingEdgeTarget,
279    CycleDetected,
280    DisconnectedNode,
281    NoNodes,
282    BranchTargetMissing,
283    BranchTargetNotInEdges,
284    MultipleRoots,
285    EmptyConnectorName,
286    SelfLoop,
287    InvalidParamRef,
288}
289
290impl fmt::Display for ValidationRule {
291    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292        match self {
293            Self::DuplicateNodeId => write!(f, "duplicate_node_id"),
294            Self::DanglingEdgeSource => write!(f, "dangling_edge_source"),
295            Self::DanglingEdgeTarget => write!(f, "dangling_edge_target"),
296            Self::CycleDetected => write!(f, "cycle_detected"),
297            Self::DisconnectedNode => write!(f, "disconnected_node"),
298            Self::NoNodes => write!(f, "no_nodes"),
299            Self::BranchTargetMissing => write!(f, "branch_target_missing"),
300            Self::BranchTargetNotInEdges => write!(f, "branch_target_not_in_edges"),
301            Self::MultipleRoots => write!(f, "multiple_roots"),
302            Self::EmptyConnectorName => write!(f, "empty_connector_name"),
303            Self::SelfLoop => write!(f, "self_loop"),
304            Self::InvalidParamRef => write!(f, "invalid_param_ref"),
305        }
306    }
307}
308
309#[cfg(test)]
310mod tests {
311    use serde_json::json;
312
313    use super::*;
314    use crate::flowspec::*;
315
316    fn connector_node(id: NodeId, name: &str) -> Node {
317        Node {
318            id,
319            label: None,
320            node_type: NodeType::Connector(ConnectorNode {
321                connector: name.into(),
322                params: json!({}),
323                idempotency_config: None,
324            }),
325        }
326    }
327
328    fn branch_node(id: NodeId, then_edge: NodeId, else_edge: Option<NodeId>) -> Node {
329        Node {
330            id,
331            label: None,
332            node_type: NodeType::Branch(BranchNode {
333                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
334                then_edge,
335                else_edge,
336            }),
337        }
338    }
339
340    fn edge(from: NodeId, to: NodeId) -> Edge {
341        Edge { from, to, condition: None }
342    }
343
344    fn make_ids(n: usize) -> Vec<NodeId> {
345        (0..n).map(|_| NodeId::new()).collect()
346    }
347
348    #[test]
349    fn valid_linear_flow() {
350        let ids = make_ids(3);
351        let spec = FlowSpec {
352            id: None,
353            name: None,
354            nodes: vec![
355                connector_node(ids[0], "http.request"),
356                connector_node(ids[1], "transform"),
357                connector_node(ids[2], "fs.write"),
358            ],
359            edges: vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
360            params: None,
361        };
362        assert!(spec.validate().is_ok());
363    }
364
365    #[test]
366    fn valid_branch_flow() {
367        let ids = make_ids(4);
368        let spec = FlowSpec {
369            id: None,
370            name: None,
371            nodes: vec![
372                connector_node(ids[0], "http.request"),
373                branch_node(ids[1], ids[2], Some(ids[3])),
374                connector_node(ids[2], "fs.write"),
375                connector_node(ids[3], "delay"),
376            ],
377            edges: vec![
378                edge(ids[0], ids[1]),
379                Edge { from: ids[1], to: ids[2], condition: Some(EdgeCondition::BranchTrue) },
380                Edge { from: ids[1], to: ids[3], condition: Some(EdgeCondition::BranchFalse) },
381            ],
382            params: None,
383        };
384        assert!(spec.validate().is_ok());
385    }
386
387    #[test]
388    fn valid_single_node() {
389        let id = NodeId::new();
390        let spec = FlowSpec {
391            id: None,
392            name: None,
393            nodes: vec![connector_node(id, "delay")],
394            edges: vec![],
395            params: None,
396        };
397        assert!(spec.validate().is_ok());
398    }
399
400    #[test]
401    fn reject_no_nodes() {
402        let spec = FlowSpec { id: None, name: None, nodes: vec![], edges: vec![], params: None };
403        let err = spec.validate().unwrap_err();
404        assert!(err.has_rule(ValidationRule::NoNodes));
405    }
406
407    #[test]
408    fn reject_duplicate_node_id() {
409        let id = NodeId::new();
410        let spec = FlowSpec {
411            id: None,
412            name: None,
413            nodes: vec![connector_node(id, "a"), connector_node(id, "b")],
414            edges: vec![],
415            params: None,
416        };
417        let err = spec.validate().unwrap_err();
418        assert!(err.has_rule(ValidationRule::DuplicateNodeId));
419    }
420
421    #[test]
422    fn reject_dangling_source() {
423        let ids = make_ids(2);
424        let phantom = NodeId::new();
425        let spec = FlowSpec {
426            id: None,
427            name: None,
428            nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
429            edges: vec![edge(phantom, ids[1])],
430            params: None,
431        };
432        let err = spec.validate().unwrap_err();
433        assert!(err.has_rule(ValidationRule::DanglingEdgeSource));
434    }
435
436    #[test]
437    fn reject_dangling_target() {
438        let ids = make_ids(2);
439        let phantom = NodeId::new();
440        let spec = FlowSpec {
441            id: None,
442            name: None,
443            nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
444            edges: vec![edge(ids[0], phantom)],
445            params: None,
446        };
447        let err = spec.validate().unwrap_err();
448        assert!(err.has_rule(ValidationRule::DanglingEdgeTarget));
449    }
450
451    #[test]
452    fn reject_cycle_simple() {
453        let ids = make_ids(2);
454        let spec = FlowSpec {
455            id: None,
456            name: None,
457            nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
458            edges: vec![edge(ids[0], ids[1]), edge(ids[1], ids[0])],
459            params: None,
460        };
461        let err = spec.validate().unwrap_err();
462        assert!(err.has_rule(ValidationRule::CycleDetected));
463    }
464
465    #[test]
466    fn reject_cycle_complex() {
467        let ids = make_ids(3);
468        let spec = FlowSpec {
469            id: None,
470            name: None,
471            nodes: vec![
472                connector_node(ids[0], "a"),
473                connector_node(ids[1], "b"),
474                connector_node(ids[2], "c"),
475            ],
476            edges: vec![edge(ids[0], ids[1]), edge(ids[1], ids[2]), edge(ids[2], ids[0])],
477            params: None,
478        };
479        let err = spec.validate().unwrap_err();
480        assert!(err.has_rule(ValidationRule::CycleDetected));
481    }
482
483    #[test]
484    fn reject_self_loop() {
485        let id = NodeId::new();
486        let spec = FlowSpec {
487            id: None,
488            name: None,
489            nodes: vec![connector_node(id, "a")],
490            edges: vec![edge(id, id)],
491            params: None,
492        };
493        let err = spec.validate().unwrap_err();
494        assert!(err.has_rule(ValidationRule::SelfLoop));
495    }
496
497    #[test]
498    fn reject_disconnected_node() {
499        let ids = make_ids(4);
500        // A→B is the main flow (A is the sole root).
501        // C→D→C forms an isolated cycle: both have in-degree > 0 (not roots)
502        // but are unreachable from root A. This triggers DisconnectedNode
503        // without MultipleRoots.
504        let spec = FlowSpec {
505            id: None,
506            name: None,
507            nodes: vec![
508                connector_node(ids[0], "a"),
509                connector_node(ids[1], "b"),
510                connector_node(ids[2], "c"),
511                connector_node(ids[3], "d"),
512            ],
513            edges: vec![edge(ids[0], ids[1]), edge(ids[2], ids[3]), edge(ids[3], ids[2])],
514            params: None,
515        };
516        let err = spec.validate().unwrap_err();
517        assert!(err.has_rule(ValidationRule::DisconnectedNode));
518        assert!(err.has_rule(ValidationRule::CycleDetected));
519        assert!(!err.has_rule(ValidationRule::MultipleRoots));
520    }
521
522    #[test]
523    fn reject_branch_target_missing() {
524        let ids = make_ids(2);
525        let phantom = NodeId::new();
526        let spec = FlowSpec {
527            id: None,
528            name: None,
529            nodes: vec![connector_node(ids[0], "a"), branch_node(ids[1], phantom, None)],
530            edges: vec![edge(ids[0], ids[1]), edge(ids[1], phantom)],
531            params: None,
532        };
533        let err = spec.validate().unwrap_err();
534        assert!(err.has_rule(ValidationRule::BranchTargetMissing));
535    }
536
537    #[test]
538    fn reject_branch_not_in_edges() {
539        let ids = make_ids(3);
540        let spec = FlowSpec {
541            id: None,
542            name: None,
543            nodes: vec![
544                connector_node(ids[0], "a"),
545                branch_node(ids[1], ids[2], None),
546                connector_node(ids[2], "b"),
547            ],
548            // Edge from A→Branch exists, but no edge from Branch→B
549            edges: vec![edge(ids[0], ids[1])],
550            params: None,
551        };
552        let err = spec.validate().unwrap_err();
553        assert!(err.has_rule(ValidationRule::BranchTargetNotInEdges));
554    }
555
556    #[test]
557    fn reject_multiple_roots() {
558        let ids = make_ids(2);
559        let spec = FlowSpec {
560            id: None,
561            name: None,
562            nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
563            edges: vec![],
564            params: None,
565        };
566        let err = spec.validate().unwrap_err();
567        assert!(err.has_rule(ValidationRule::MultipleRoots));
568    }
569
570    #[test]
571    fn reject_empty_connector_name() {
572        let id = NodeId::new();
573        let spec = FlowSpec {
574            id: None,
575            name: None,
576            nodes: vec![connector_node(id, "")],
577            edges: vec![],
578            params: None,
579        };
580        let err = spec.validate().unwrap_err();
581        assert!(err.has_rule(ValidationRule::EmptyConnectorName));
582    }
583
584    #[test]
585    fn reject_invalid_param_ref() {
586        let ids = make_ids(3);
587        let phantom = NodeId::new();
588        let spec = FlowSpec {
589            id: None,
590            name: None,
591            nodes: vec![
592                connector_node(ids[0], "a"),
593                Node {
594                    id: ids[1],
595                    label: None,
596                    node_type: NodeType::Branch(BranchNode {
597                        condition: BranchCondition::Exists(ParamRef::NodeOutput {
598                            node_id: phantom,
599                            path: "status".into(),
600                        }),
601                        then_edge: ids[2],
602                        else_edge: None,
603                    }),
604                },
605                connector_node(ids[2], "b"),
606            ],
607            edges: vec![
608                edge(ids[0], ids[1]),
609                Edge { from: ids[1], to: ids[2], condition: Some(EdgeCondition::BranchTrue) },
610            ],
611            params: None,
612        };
613        let err = spec.validate().unwrap_err();
614        assert!(err.has_rule(ValidationRule::InvalidParamRef));
615    }
616
617    #[test]
618    fn multiple_errors_reported() {
619        let id = NodeId::new();
620        let phantom = NodeId::new();
621        let spec = FlowSpec {
622            id: None,
623            name: None,
624            nodes: vec![connector_node(id, "")], // V-10: empty name
625            edges: vec![
626                edge(id, id),      // V-11: self-loop
627                edge(id, phantom), // V-3: dangling target
628            ],
629            params: None,
630        };
631        let err = spec.validate().unwrap_err();
632        assert!(err.errors.len() >= 3);
633        assert!(err.has_rule(ValidationRule::EmptyConnectorName));
634        assert!(err.has_rule(ValidationRule::SelfLoop));
635        assert!(err.has_rule(ValidationRule::DanglingEdgeTarget));
636    }
637}