1use std::collections::{HashMap, HashSet, VecDeque};
7use std::fmt;
8
9use serde::{Deserialize, Serialize};
10
11use crate::flowspec::branch::{BranchCondition, ParamRef};
12use crate::flowspec::topo;
13use crate::flowspec::{FlowSpec, NodeType};
14use crate::id::NodeId;
15
16pub fn validate(spec: &FlowSpec) -> Result<(), ValidationError> {
18 let mut diagnostics = Vec::new();
19
20 if spec.nodes.is_empty() {
22 diagnostics.push(ValidationDiagnostic {
23 rule: ValidationRule::NoNodes,
24 node_id: None,
25 message: "FlowSpec must have at least one node".into(),
26 });
27 return Err(ValidationError { errors: diagnostics });
28 }
29
30 let mut node_index: HashMap<NodeId, usize> = HashMap::new();
32 for (i, node) in spec.nodes.iter().enumerate() {
33 if let Some(_prev) = node_index.insert(node.id, i) {
34 diagnostics.push(ValidationDiagnostic {
35 rule: ValidationRule::DuplicateNodeId,
36 node_id: Some(node.id),
37 message: format!("Duplicate node ID: {}", node.id),
38 });
39 }
40 }
41
42 for node in &spec.nodes {
44 if let NodeType::Connector(ref c) = node.node_type {
45 if c.connector.is_empty() {
46 diagnostics.push(ValidationDiagnostic {
47 rule: ValidationRule::EmptyConnectorName,
48 node_id: Some(node.id),
49 message: format!("Node {} has an empty connector name", node.id),
50 });
51 }
52 }
53 }
54
55 for edge in &spec.edges {
57 if !node_index.contains_key(&edge.from) {
58 diagnostics.push(ValidationDiagnostic {
59 rule: ValidationRule::DanglingEdgeSource,
60 node_id: Some(edge.from),
61 message: format!("Edge source {} does not reference an existing node", edge.from),
62 });
63 }
64 if !node_index.contains_key(&edge.to) {
65 diagnostics.push(ValidationDiagnostic {
66 rule: ValidationRule::DanglingEdgeTarget,
67 node_id: Some(edge.to),
68 message: format!("Edge target {} does not reference an existing node", edge.to),
69 });
70 }
71 if edge.from == edge.to {
72 diagnostics.push(ValidationDiagnostic {
73 rule: ValidationRule::SelfLoop,
74 node_id: Some(edge.from),
75 message: format!("Edge creates a self-loop on node {}", edge.from),
76 });
77 }
78 }
79
80 let edge_set: HashSet<(NodeId, NodeId)> = spec.edges.iter().map(|e| (e.from, e.to)).collect();
82
83 for node in &spec.nodes {
84 if let NodeType::Branch(ref branch) = node.node_type {
85 if !node_index.contains_key(&branch.then_edge) {
87 diagnostics.push(ValidationDiagnostic {
88 rule: ValidationRule::BranchTargetMissing,
89 node_id: Some(node.id),
90 message: format!(
91 "Branch node {} then_edge references nonexistent node {}",
92 node.id, branch.then_edge
93 ),
94 });
95 } else {
96 if !edge_set.contains(&(node.id, branch.then_edge)) {
98 diagnostics.push(ValidationDiagnostic {
99 rule: ValidationRule::BranchTargetNotInEdges,
100 node_id: Some(node.id),
101 message: format!(
102 "Branch node {} then_edge {} has no corresponding edge",
103 node.id, branch.then_edge
104 ),
105 });
106 }
107 }
108
109 if let Some(else_target) = branch.else_edge {
110 if !node_index.contains_key(&else_target) {
111 diagnostics.push(ValidationDiagnostic {
112 rule: ValidationRule::BranchTargetMissing,
113 node_id: Some(node.id),
114 message: format!(
115 "Branch node {} else_edge references nonexistent node {}",
116 node.id, else_target
117 ),
118 });
119 } else if !edge_set.contains(&(node.id, else_target)) {
120 diagnostics.push(ValidationDiagnostic {
121 rule: ValidationRule::BranchTargetNotInEdges,
122 node_id: Some(node.id),
123 message: format!(
124 "Branch node {} else_edge {} has no corresponding edge",
125 node.id, else_target
126 ),
127 });
128 }
129 }
130 }
131 }
132
133 for node in &spec.nodes {
135 if let NodeType::Branch(ref branch) = node.node_type {
136 let refs = match &branch.condition {
137 BranchCondition::Exists(param_ref) => vec![param_ref],
138 BranchCondition::Equals { left, .. } => vec![left],
139 BranchCondition::Expression(_) => vec![],
140 };
141 for param_ref in refs {
142 if let ParamRef::NodeOutput { node_id, .. } = param_ref {
143 if !node_index.contains_key(node_id) {
144 diagnostics.push(ValidationDiagnostic {
145 rule: ValidationRule::InvalidParamRef,
146 node_id: Some(node.id),
147 message: format!(
148 "Branch node {} references nonexistent node {} in condition",
149 node.id, node_id
150 ),
151 });
152 }
153 }
154 }
155 }
156 }
157
158 let mut in_degree: HashMap<NodeId, usize> = HashMap::new();
160 let mut successors: HashMap<NodeId, Vec<NodeId>> = HashMap::new();
161 for node in &spec.nodes {
162 in_degree.entry(node.id).or_insert(0);
163 successors.entry(node.id).or_default();
164 }
165 for edge in &spec.edges {
166 if node_index.contains_key(&edge.from) && node_index.contains_key(&edge.to) {
167 *in_degree.entry(edge.to).or_insert(0) += 1;
168 successors.entry(edge.from).or_default().push(edge.to);
169 }
170 }
171
172 let roots: Vec<NodeId> =
174 in_degree.iter().filter(|(_, °)| deg == 0).map(|(&id, _)| id).collect();
175
176 if roots.len() > 1 {
177 diagnostics.push(ValidationDiagnostic {
178 rule: ValidationRule::MultipleRoots,
179 node_id: None,
180 message: format!(
181 "FlowSpec has {} root nodes (expected at most 1): {}",
182 roots.len(),
183 roots.iter().map(|id| id.to_string()).collect::<Vec<_>>().join(", ")
184 ),
185 });
186 }
187
188 if let Err(remaining) = topo::topological_sort(spec) {
190 diagnostics.push(ValidationDiagnostic {
191 rule: ValidationRule::CycleDetected,
192 node_id: None,
193 message: format!(
194 "FlowSpec contains a cycle ({} of {} nodes could not be topologically sorted)",
195 remaining.len(),
196 spec.nodes.len()
197 ),
198 });
199 }
200
201 if !roots.is_empty() {
204 let mut reachable: HashSet<NodeId> = HashSet::new();
205 let mut bfs_queue: VecDeque<NodeId> = roots.iter().copied().collect();
206 while let Some(node_id) = bfs_queue.pop_front() {
207 if reachable.insert(node_id) {
208 if let Some(succs) = successors.get(&node_id) {
209 for &succ in succs {
210 if !reachable.contains(&succ) {
211 bfs_queue.push_back(succ);
212 }
213 }
214 }
215 }
216 }
217
218 for node in &spec.nodes {
219 if !reachable.contains(&node.id) {
220 diagnostics.push(ValidationDiagnostic {
221 rule: ValidationRule::DisconnectedNode,
222 node_id: Some(node.id),
223 message: format!("Node {} is not reachable from any root", node.id),
224 });
225 }
226 }
227 }
228
229 if diagnostics.is_empty() {
230 Ok(())
231 } else {
232 Err(ValidationError { errors: diagnostics })
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ValidationError {
239 pub errors: Vec<ValidationDiagnostic>,
240}
241
242impl fmt::Display for ValidationError {
243 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244 writeln!(f, "FlowSpec validation failed with {} error(s):", self.errors.len())?;
245 for diag in &self.errors {
246 writeln!(f, " - [{}] {}", diag.rule, diag.message)?;
247 }
248 Ok(())
249 }
250}
251
252impl std::error::Error for ValidationError {}
253
254impl ValidationError {
255 pub fn has_rule(&self, rule: ValidationRule) -> bool {
257 self.errors.iter().any(|d| d.rule == rule)
258 }
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct ValidationDiagnostic {
264 pub rule: ValidationRule,
266 pub node_id: Option<NodeId>,
268 pub message: String,
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
274#[serde(rename_all = "snake_case")]
275pub enum ValidationRule {
276 DuplicateNodeId,
277 DanglingEdgeSource,
278 DanglingEdgeTarget,
279 CycleDetected,
280 DisconnectedNode,
281 NoNodes,
282 BranchTargetMissing,
283 BranchTargetNotInEdges,
284 MultipleRoots,
285 EmptyConnectorName,
286 SelfLoop,
287 InvalidParamRef,
288}
289
290impl fmt::Display for ValidationRule {
291 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292 match self {
293 Self::DuplicateNodeId => write!(f, "duplicate_node_id"),
294 Self::DanglingEdgeSource => write!(f, "dangling_edge_source"),
295 Self::DanglingEdgeTarget => write!(f, "dangling_edge_target"),
296 Self::CycleDetected => write!(f, "cycle_detected"),
297 Self::DisconnectedNode => write!(f, "disconnected_node"),
298 Self::NoNodes => write!(f, "no_nodes"),
299 Self::BranchTargetMissing => write!(f, "branch_target_missing"),
300 Self::BranchTargetNotInEdges => write!(f, "branch_target_not_in_edges"),
301 Self::MultipleRoots => write!(f, "multiple_roots"),
302 Self::EmptyConnectorName => write!(f, "empty_connector_name"),
303 Self::SelfLoop => write!(f, "self_loop"),
304 Self::InvalidParamRef => write!(f, "invalid_param_ref"),
305 }
306 }
307}
308
309#[cfg(test)]
310mod tests {
311 use serde_json::json;
312
313 use super::*;
314 use crate::flowspec::*;
315
316 fn connector_node(id: NodeId, name: &str) -> Node {
317 Node {
318 id,
319 label: None,
320 node_type: NodeType::Connector(ConnectorNode {
321 connector: name.into(),
322 params: json!({}),
323 idempotency_config: None,
324 }),
325 }
326 }
327
328 fn branch_node(id: NodeId, then_edge: NodeId, else_edge: Option<NodeId>) -> Node {
329 Node {
330 id,
331 label: None,
332 node_type: NodeType::Branch(BranchNode {
333 condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
334 then_edge,
335 else_edge,
336 }),
337 }
338 }
339
340 fn edge(from: NodeId, to: NodeId) -> Edge {
341 Edge { from, to, condition: None }
342 }
343
344 fn make_ids(n: usize) -> Vec<NodeId> {
345 (0..n).map(|_| NodeId::new()).collect()
346 }
347
348 #[test]
349 fn valid_linear_flow() {
350 let ids = make_ids(3);
351 let spec = FlowSpec {
352 id: None,
353 name: None,
354 nodes: vec![
355 connector_node(ids[0], "http.request"),
356 connector_node(ids[1], "transform"),
357 connector_node(ids[2], "fs.write"),
358 ],
359 edges: vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
360 params: None,
361 };
362 assert!(spec.validate().is_ok());
363 }
364
365 #[test]
366 fn valid_branch_flow() {
367 let ids = make_ids(4);
368 let spec = FlowSpec {
369 id: None,
370 name: None,
371 nodes: vec![
372 connector_node(ids[0], "http.request"),
373 branch_node(ids[1], ids[2], Some(ids[3])),
374 connector_node(ids[2], "fs.write"),
375 connector_node(ids[3], "delay"),
376 ],
377 edges: vec![
378 edge(ids[0], ids[1]),
379 Edge { from: ids[1], to: ids[2], condition: Some(EdgeCondition::BranchTrue) },
380 Edge { from: ids[1], to: ids[3], condition: Some(EdgeCondition::BranchFalse) },
381 ],
382 params: None,
383 };
384 assert!(spec.validate().is_ok());
385 }
386
387 #[test]
388 fn valid_single_node() {
389 let id = NodeId::new();
390 let spec = FlowSpec {
391 id: None,
392 name: None,
393 nodes: vec![connector_node(id, "delay")],
394 edges: vec![],
395 params: None,
396 };
397 assert!(spec.validate().is_ok());
398 }
399
400 #[test]
401 fn reject_no_nodes() {
402 let spec = FlowSpec { id: None, name: None, nodes: vec![], edges: vec![], params: None };
403 let err = spec.validate().unwrap_err();
404 assert!(err.has_rule(ValidationRule::NoNodes));
405 }
406
407 #[test]
408 fn reject_duplicate_node_id() {
409 let id = NodeId::new();
410 let spec = FlowSpec {
411 id: None,
412 name: None,
413 nodes: vec![connector_node(id, "a"), connector_node(id, "b")],
414 edges: vec![],
415 params: None,
416 };
417 let err = spec.validate().unwrap_err();
418 assert!(err.has_rule(ValidationRule::DuplicateNodeId));
419 }
420
421 #[test]
422 fn reject_dangling_source() {
423 let ids = make_ids(2);
424 let phantom = NodeId::new();
425 let spec = FlowSpec {
426 id: None,
427 name: None,
428 nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
429 edges: vec![edge(phantom, ids[1])],
430 params: None,
431 };
432 let err = spec.validate().unwrap_err();
433 assert!(err.has_rule(ValidationRule::DanglingEdgeSource));
434 }
435
436 #[test]
437 fn reject_dangling_target() {
438 let ids = make_ids(2);
439 let phantom = NodeId::new();
440 let spec = FlowSpec {
441 id: None,
442 name: None,
443 nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
444 edges: vec![edge(ids[0], phantom)],
445 params: None,
446 };
447 let err = spec.validate().unwrap_err();
448 assert!(err.has_rule(ValidationRule::DanglingEdgeTarget));
449 }
450
451 #[test]
452 fn reject_cycle_simple() {
453 let ids = make_ids(2);
454 let spec = FlowSpec {
455 id: None,
456 name: None,
457 nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
458 edges: vec![edge(ids[0], ids[1]), edge(ids[1], ids[0])],
459 params: None,
460 };
461 let err = spec.validate().unwrap_err();
462 assert!(err.has_rule(ValidationRule::CycleDetected));
463 }
464
465 #[test]
466 fn reject_cycle_complex() {
467 let ids = make_ids(3);
468 let spec = FlowSpec {
469 id: None,
470 name: None,
471 nodes: vec![
472 connector_node(ids[0], "a"),
473 connector_node(ids[1], "b"),
474 connector_node(ids[2], "c"),
475 ],
476 edges: vec![edge(ids[0], ids[1]), edge(ids[1], ids[2]), edge(ids[2], ids[0])],
477 params: None,
478 };
479 let err = spec.validate().unwrap_err();
480 assert!(err.has_rule(ValidationRule::CycleDetected));
481 }
482
483 #[test]
484 fn reject_self_loop() {
485 let id = NodeId::new();
486 let spec = FlowSpec {
487 id: None,
488 name: None,
489 nodes: vec![connector_node(id, "a")],
490 edges: vec![edge(id, id)],
491 params: None,
492 };
493 let err = spec.validate().unwrap_err();
494 assert!(err.has_rule(ValidationRule::SelfLoop));
495 }
496
497 #[test]
498 fn reject_disconnected_node() {
499 let ids = make_ids(4);
500 let spec = FlowSpec {
505 id: None,
506 name: None,
507 nodes: vec![
508 connector_node(ids[0], "a"),
509 connector_node(ids[1], "b"),
510 connector_node(ids[2], "c"),
511 connector_node(ids[3], "d"),
512 ],
513 edges: vec![edge(ids[0], ids[1]), edge(ids[2], ids[3]), edge(ids[3], ids[2])],
514 params: None,
515 };
516 let err = spec.validate().unwrap_err();
517 assert!(err.has_rule(ValidationRule::DisconnectedNode));
518 assert!(err.has_rule(ValidationRule::CycleDetected));
519 assert!(!err.has_rule(ValidationRule::MultipleRoots));
520 }
521
522 #[test]
523 fn reject_branch_target_missing() {
524 let ids = make_ids(2);
525 let phantom = NodeId::new();
526 let spec = FlowSpec {
527 id: None,
528 name: None,
529 nodes: vec![connector_node(ids[0], "a"), branch_node(ids[1], phantom, None)],
530 edges: vec![edge(ids[0], ids[1]), edge(ids[1], phantom)],
531 params: None,
532 };
533 let err = spec.validate().unwrap_err();
534 assert!(err.has_rule(ValidationRule::BranchTargetMissing));
535 }
536
537 #[test]
538 fn reject_branch_not_in_edges() {
539 let ids = make_ids(3);
540 let spec = FlowSpec {
541 id: None,
542 name: None,
543 nodes: vec![
544 connector_node(ids[0], "a"),
545 branch_node(ids[1], ids[2], None),
546 connector_node(ids[2], "b"),
547 ],
548 edges: vec![edge(ids[0], ids[1])],
550 params: None,
551 };
552 let err = spec.validate().unwrap_err();
553 assert!(err.has_rule(ValidationRule::BranchTargetNotInEdges));
554 }
555
556 #[test]
557 fn reject_multiple_roots() {
558 let ids = make_ids(2);
559 let spec = FlowSpec {
560 id: None,
561 name: None,
562 nodes: vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
563 edges: vec![],
564 params: None,
565 };
566 let err = spec.validate().unwrap_err();
567 assert!(err.has_rule(ValidationRule::MultipleRoots));
568 }
569
570 #[test]
571 fn reject_empty_connector_name() {
572 let id = NodeId::new();
573 let spec = FlowSpec {
574 id: None,
575 name: None,
576 nodes: vec![connector_node(id, "")],
577 edges: vec![],
578 params: None,
579 };
580 let err = spec.validate().unwrap_err();
581 assert!(err.has_rule(ValidationRule::EmptyConnectorName));
582 }
583
584 #[test]
585 fn reject_invalid_param_ref() {
586 let ids = make_ids(3);
587 let phantom = NodeId::new();
588 let spec = FlowSpec {
589 id: None,
590 name: None,
591 nodes: vec![
592 connector_node(ids[0], "a"),
593 Node {
594 id: ids[1],
595 label: None,
596 node_type: NodeType::Branch(BranchNode {
597 condition: BranchCondition::Exists(ParamRef::NodeOutput {
598 node_id: phantom,
599 path: "status".into(),
600 }),
601 then_edge: ids[2],
602 else_edge: None,
603 }),
604 },
605 connector_node(ids[2], "b"),
606 ],
607 edges: vec![
608 edge(ids[0], ids[1]),
609 Edge { from: ids[1], to: ids[2], condition: Some(EdgeCondition::BranchTrue) },
610 ],
611 params: None,
612 };
613 let err = spec.validate().unwrap_err();
614 assert!(err.has_rule(ValidationRule::InvalidParamRef));
615 }
616
617 #[test]
618 fn multiple_errors_reported() {
619 let id = NodeId::new();
620 let phantom = NodeId::new();
621 let spec = FlowSpec {
622 id: None,
623 name: None,
624 nodes: vec![connector_node(id, "")], edges: vec![
626 edge(id, id), edge(id, phantom), ],
629 params: None,
630 };
631 let err = spec.validate().unwrap_err();
632 assert!(err.errors.len() >= 3);
633 assert!(err.has_rule(ValidationRule::EmptyConnectorName));
634 assert!(err.has_rule(ValidationRule::SelfLoop));
635 assert!(err.has_rule(ValidationRule::DanglingEdgeTarget));
636 }
637}