1use super::{FlowDefinition, NodeDefinition, TriggerDefinition};
4use std::collections::HashSet;
5
/// Outcome of a validation run: `Ok(())` when no problem was recorded,
/// otherwise every [`ValidationError`] that was collected.
pub type ValidationResult = Result<(), Vec<ValidationError>>;

/// A single problem found while validating a flow definition.
#[derive(Debug, Clone)]
pub struct ValidationError {
    /// Machine-readable category of the problem.
    pub kind: ValidationErrorKind,
    /// Where the problem was found, as a path-like string.
    pub location: String,
    /// Human-readable description of the problem.
    pub message: String,
}

/// Category of a [`ValidationError`].
///
/// The [`Display`](std::fmt::Display) form of each variant is its
/// `SCREAMING_SNAKE_CASE` code (for example `MISSING_FIELD`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationErrorKind {
    /// A required field is absent or empty.
    MissingField,
    /// A field is present but its value is not acceptable.
    InvalidValue,
    /// An identifier is used more than once.
    DuplicateId,
    /// A reference names something that does not exist.
    InvalidReference,
    /// A trigger declares a type that is not recognised.
    InvalidTriggerType,
    /// A node declares a type that is not recognised.
    InvalidNodeType,
    /// The flow graph contains a cycle.
    CycleDetected,
    /// A node cannot be reached.
    UnreachableNode,
    /// A selector expression is malformed.
    InvalidSelector,
}

impl std::fmt::Display for ValidationError {
    /// Renders the error as `[KIND] location: message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}: {}", self.kind, self.location, self.message)
    }
}

// Implementing `Error` lets callers box the error, attach it as a `source`,
// or propagate it with `?` into `Box<dyn Error>`-style results.
impl std::error::Error for ValidationError {}

impl std::fmt::Display for ValidationErrorKind {
    /// Writes the stable upper-case code for this kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = match self {
            Self::MissingField => "MISSING_FIELD",
            Self::InvalidValue => "INVALID_VALUE",
            Self::DuplicateId => "DUPLICATE_ID",
            Self::InvalidReference => "INVALID_REFERENCE",
            Self::InvalidTriggerType => "INVALID_TRIGGER_TYPE",
            Self::InvalidNodeType => "INVALID_NODE_TYPE",
            Self::CycleDetected => "CYCLE_DETECTED",
            Self::UnreachableNode => "UNREACHABLE_NODE",
            Self::InvalidSelector => "INVALID_SELECTOR",
        };
        f.write_str(code)
    }
}

impl ValidationError {
    /// Builds an error from its three parts.
    pub fn new(
        kind: ValidationErrorKind,
        location: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        Self {
            kind,
            location: location.into(),
            message: message.into(),
        }
    }

    /// Builds a [`ValidationErrorKind::MissingField`] error whose message
    /// names the missing `field`.
    pub fn missing_field(location: impl Into<String>, field: &str) -> Self {
        Self::new(
            ValidationErrorKind::MissingField,
            location,
            format!("missing required field '{}'", field),
        )
    }

    /// Builds a [`ValidationErrorKind::InvalidValue`] error with a
    /// caller-supplied message.
    pub fn invalid_value(location: impl Into<String>, message: impl Into<String>) -> Self {
        Self::new(ValidationErrorKind::InvalidValue, location, message)
    }

    /// Builds a [`ValidationErrorKind::DuplicateId`] error whose message
    /// quotes the repeated `id`.
    pub fn duplicate_id(location: impl Into<String>, id: &str) -> Self {
        Self::new(
            ValidationErrorKind::DuplicateId,
            location,
            format!("duplicate identifier '{}'", id),
        )
    }

    /// Builds a [`ValidationErrorKind::InvalidReference`] error whose message
    /// quotes the unknown `reference`.
    pub fn invalid_reference(location: impl Into<String>, reference: &str) -> Self {
        Self::new(
            ValidationErrorKind::InvalidReference,
            location,
            format!("reference to non-existent node '{}'", reference),
        )
    }
}
112
113pub struct FlowValidator {
115 errors: Vec<ValidationError>,
116}
117
118impl FlowValidator {
119 pub fn new() -> Self {
121 Self { errors: Vec::new() }
122 }
123
124 pub fn validate(mut self, flow: &FlowDefinition) -> ValidationResult {
126 self.validate_metadata(flow);
127 self.validate_triggers(flow);
128 self.validate_nodes(flow);
129 self.validate_edges(flow);
130 self.validate_references(flow);
131
132 if self.errors.is_empty() {
133 Ok(())
134 } else {
135 Err(self.errors)
136 }
137 }
138
139 fn add_error(&mut self, error: ValidationError) {
140 self.errors.push(error);
141 }
142
143 fn validate_metadata(&mut self, flow: &FlowDefinition) {
144 if flow.name.is_empty() {
145 self.add_error(ValidationError::missing_field("flow", "name"));
146 }
147
148 if let Some(ref version) = flow.version {
149 if version.is_empty() {
150 self.add_error(ValidationError::invalid_value(
151 "flow.version",
152 "version cannot be empty string",
153 ));
154 }
155 }
156 }
157
158 fn validate_triggers(&mut self, flow: &FlowDefinition) {
159 let mut seen_ids = HashSet::new();
160
161 for (idx, trigger) in flow.triggers.iter().enumerate() {
162 let location = format!("triggers[{}]", idx);
163
164 if !seen_ids.insert(&trigger.id) {
166 self.add_error(ValidationError::duplicate_id(&location, &trigger.id));
167 }
168
169 if trigger.id.is_empty() {
171 self.add_error(ValidationError::missing_field(&location, "id"));
172 }
173
174 if trigger.parsed_type().is_none() {
176 self.add_error(ValidationError::new(
177 ValidationErrorKind::InvalidTriggerType,
178 &location,
179 format!("unknown trigger type '{}'", trigger.trigger_type),
180 ));
181 }
182
183 self.validate_trigger_params(trigger, &location);
185 }
186 }
187
188 fn validate_trigger_params(&mut self, trigger: &TriggerDefinition, location: &str) {
189 match trigger.trigger_type.as_str() {
190 "webhook" | "trigger::webhook" => {
191 if let Some(port) = trigger.get_i64("port") {
193 if port < 1 || port > 65535 {
194 self.add_error(ValidationError::invalid_value(
195 format!("{}.params.port", location),
196 format!("port must be between 1 and 65535, got {}", port),
197 ));
198 }
199 }
200 }
201 "cron" | "trigger::cron" => {
202 if trigger.get_string("schedule").is_none() {
204 self.add_error(ValidationError::missing_field(
205 format!("{}.params", location),
206 "schedule",
207 ));
208 }
209 }
210 "filesystem" | "trigger::filesystem" => {
211 if trigger.get_string("path").is_none() {
213 self.add_error(ValidationError::missing_field(
214 format!("{}.params", location),
215 "path",
216 ));
217 }
218 }
219 _ => {}
220 }
221 }
222
223 fn validate_nodes(&mut self, flow: &FlowDefinition) {
224 let mut seen_ids = HashSet::new();
225
226 for (node_id, node) in &flow.nodes {
227 let location = format!("nodes.{}", node_id);
228
229 if !seen_ids.insert(node_id) {
231 self.add_error(ValidationError::duplicate_id(&location, node_id));
232 }
233
234 for trigger in &flow.triggers {
236 if &trigger.id == node_id {
237 self.add_error(ValidationError::new(
238 ValidationErrorKind::DuplicateId,
239 &location,
240 format!("node ID conflicts with trigger ID '{}'", node_id),
241 ));
242 }
243 }
244
245 if node.node_type.is_empty() {
247 self.add_error(ValidationError::missing_field(&location, "type"));
248 }
249
250 self.validate_node_config(node, node_id, &location);
252 }
253 }
254
255 fn validate_node_config(&mut self, node: &NodeDefinition, _node_id: &str, location: &str) {
256 match node.node_type.as_str() {
257 "std::switch" => {
258 if node.get_nested(&["condition"]).is_none()
260 && node.get_string("expression").is_none()
261 {
262 self.add_error(ValidationError::missing_field(
263 format!("{}.config", location),
264 "condition or expression",
265 ));
266 }
267 }
268 "std::loop" => {
269 if node.get_i64("max_iterations").is_none()
271 && node.get_nested(&["condition"]).is_none()
272 {
273 self.add_error(ValidationError::missing_field(
274 format!("{}.config", location),
275 "max_iterations or condition",
276 ));
277 }
278 }
279 "std::merge" => {
280 }
282 "std::aggregate" => {
283 if node.get_string("operation").is_none() {
285 self.add_error(ValidationError::missing_field(
286 format!("{}.config", location),
287 "operation",
288 ));
289 }
290 }
291 _ => {}
292 }
293 }
294
295 fn validate_edges(&mut self, flow: &FlowDefinition) {
296 for (idx, edge) in flow.edges.iter().enumerate() {
297 let location = format!("edges[{}]", idx);
298
299 if edge.from.is_empty() {
301 self.add_error(ValidationError::missing_field(&location, "from"));
302 }
303
304 if edge.to.is_empty() {
306 self.add_error(ValidationError::missing_field(&location, "to"));
307 }
308
309 if let Some(ref condition) = edge.condition {
311 self.validate_selector_syntax(condition, &format!("{}.condition", location));
312 }
313 }
314 }
315
316 fn validate_references(&mut self, flow: &FlowDefinition) {
317 let mut valid_ids: HashSet<&str> = flow.nodes.keys().map(|s| s.as_str()).collect();
319
320 for trigger in &flow.triggers {
322 valid_ids.insert(&trigger.id);
323 }
324
325 for (idx, edge) in flow.edges.iter().enumerate() {
327 let location = format!("edges[{}]", idx);
328
329 let from_node = edge.from_node();
330 if !valid_ids.contains(from_node) {
331 self.add_error(ValidationError::invalid_reference(
332 format!("{}.from", location),
333 from_node,
334 ));
335 }
336
337 let to_node = edge.to_node();
338 if !valid_ids.contains(to_node) {
339 self.add_error(ValidationError::invalid_reference(
340 format!("{}.to", location),
341 to_node,
342 ));
343 }
344 }
345 }
346
347 fn validate_selector_syntax(&mut self, selector: &str, location: &str) {
348 let mut in_selector = false;
351 let mut brace_depth = 0;
352
353 for c in selector.chars() {
354 match c {
355 '$' => {
356 }
358 '{' if in_selector || selector.contains("${") => {
359 brace_depth += 1;
360 in_selector = true;
361 }
362 '}' if in_selector => {
363 brace_depth -= 1;
364 if brace_depth == 0 {
365 in_selector = false;
366 }
367 }
368 _ => {}
369 }
370 }
371
372 if brace_depth != 0 {
373 self.add_error(ValidationError::new(
374 ValidationErrorKind::InvalidSelector,
375 location,
376 "unbalanced braces in selector",
377 ));
378 }
379 }
380}
381
382impl Default for FlowValidator {
383 fn default() -> Self {
384 Self::new()
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::EdgeDefinition;
    use std::collections::HashMap;

    /// A flow with a name, a version and a single webhook trigger.
    fn minimal_flow() -> FlowDefinition {
        let webhook = TriggerDefinition::new("webhook", "webhook");
        FlowDefinition {
            name: "test".to_string(),
            version: Some("1.0".to_string()),
            description: None,
            triggers: vec![webhook],
            nodes: HashMap::new(),
            edges: vec![],
            settings: Default::default(),
        }
    }

    /// Validates `flow`, asserts that it was rejected, and returns the errors.
    fn rejected(flow: &FlowDefinition) -> Vec<ValidationError> {
        let result = FlowValidator::new().validate(flow);
        assert!(result.is_err());
        result.unwrap_err()
    }

    /// True when at least one error has the given kind.
    fn has_kind(errors: &[ValidationError], kind: ValidationErrorKind) -> bool {
        errors.iter().any(|e| e.kind == kind)
    }

    #[test]
    fn validate_minimal_flow() {
        let outcome = FlowValidator::new().validate(&minimal_flow());
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_missing_name() {
        let mut flow = minimal_flow();
        flow.name = String::new();

        let errors = rejected(&flow);
        assert!(
            errors
                .iter()
                .any(|e| e.kind == ValidationErrorKind::MissingField && e.location == "flow")
        );
    }

    #[test]
    fn validate_duplicate_trigger_ids() {
        let mut flow = minimal_flow();
        flow.triggers = vec![
            TriggerDefinition::new("dup_id", "webhook"),
            TriggerDefinition::new("dup_id", "cron"),
        ];

        let errors = rejected(&flow);
        assert!(has_kind(&errors, ValidationErrorKind::DuplicateId));
    }

    #[test]
    fn validate_invalid_trigger_type() {
        let mut flow = minimal_flow();
        flow.triggers = vec![TriggerDefinition::new("test", "invalid_type")];

        let errors = rejected(&flow);
        assert!(has_kind(&errors, ValidationErrorKind::InvalidTriggerType));
    }

    #[test]
    fn validate_invalid_edge_reference() {
        let mut flow = minimal_flow();
        flow.edges = vec![EdgeDefinition::new("nonexistent", "also_nonexistent")];

        let errors = rejected(&flow);
        assert!(has_kind(&errors, ValidationErrorKind::InvalidReference));
    }

    #[test]
    fn validate_valid_edge_reference() {
        let mut flow = minimal_flow();
        flow.nodes
            .insert("processor".to_string(), NodeDefinition::new("std::log"));
        flow.edges = vec![EdgeDefinition::new("webhook", "processor")];

        let outcome = FlowValidator::new().validate(&flow);
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_cron_requires_schedule() {
        let mut flow = minimal_flow();
        flow.triggers = vec![TriggerDefinition::new("cron_trigger", "cron")];

        let errors = rejected(&flow);
        assert!(
            errors
                .iter()
                .any(|e| e.kind == ValidationErrorKind::MissingField
                    && e.message.contains("schedule"))
        );
    }
}