use super::{FlowDefinition, NodeDefinition, TriggerDefinition};
use std::collections::HashSet;
/// Outcome of validating a flow: `Ok(())` when no problem was found, otherwise
/// every [`ValidationError`] that was collected.
pub type ValidationResult = Result<(), Vec<ValidationError>>;
/// A single problem found while validating a flow definition.
#[derive(Debug, Clone)]
pub struct ValidationError {
/// Category of the problem.
pub kind: ValidationErrorKind,
/// Where the problem was found, as a path-like string such as `flow`,
/// `triggers[0]` or `nodes.<id>`.
pub location: String,
/// Human-readable description of the problem.
pub message: String,
}
/// Category of a [`ValidationError`]. Its `Display` form is an
/// upper-snake-case code such as `MISSING_FIELD`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationErrorKind {
/// A required field is absent or empty.
MissingField,
/// A field is present but its value is not acceptable (for example an
/// out-of-range port or an empty version string).
InvalidValue,
/// An identifier is used more than once, or a node ID collides with a
/// trigger ID.
DuplicateId,
/// An edge endpoint names something that is neither a node nor a trigger.
InvalidReference,
/// A trigger's type string could not be parsed into a known trigger type.
InvalidTriggerType,
/// NOTE(review): no check visible in this file emits this kind; presumably
/// reserved for unknown node types. Confirm before relying on it.
InvalidNodeType,
/// NOTE(review): no check visible in this file emits this kind; presumably
/// reserved for cycle detection. Confirm before relying on it.
CycleDetected,
/// NOTE(review): no check visible in this file emits this kind; presumably
/// reserved for reachability analysis. Confirm before relying on it.
UnreachableNode,
/// A selector expression (such as an edge condition) has unbalanced braces.
InvalidSelector,
}
impl std::fmt::Display for ValidationError {
    /// Renders the error as `[KIND] location: message`, where `KIND` is the
    /// `Display` form of [`ValidationErrorKind`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            kind,
            location,
            message,
        } = self;
        write!(f, "[{}] {}: {}", kind, location, message)
    }
}
impl std::fmt::Display for ValidationErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::MissingField => "MISSING_FIELD",
Self::InvalidValue => "INVALID_VALUE",
Self::DuplicateId => "DUPLICATE_ID",
Self::InvalidReference => "INVALID_REFERENCE",
Self::InvalidTriggerType => "INVALID_TRIGGER_TYPE",
Self::InvalidNodeType => "INVALID_NODE_TYPE",
Self::CycleDetected => "CYCLE_DETECTED",
Self::UnreachableNode => "UNREACHABLE_NODE",
Self::InvalidSelector => "INVALID_SELECTOR",
};
write!(f, "{}", s)
}
}
impl ValidationError {
pub fn new(
kind: ValidationErrorKind,
location: impl Into<String>,
message: impl Into<String>,
) -> Self {
Self {
kind,
location: location.into(),
message: message.into(),
}
}
pub fn missing_field(location: impl Into<String>, field: &str) -> Self {
Self::new(
ValidationErrorKind::MissingField,
location,
format!("missing required field '{}'", field),
)
}
pub fn invalid_value(location: impl Into<String>, message: impl Into<String>) -> Self {
Self::new(ValidationErrorKind::InvalidValue, location, message)
}
pub fn duplicate_id(location: impl Into<String>, id: &str) -> Self {
Self::new(
ValidationErrorKind::DuplicateId,
location,
format!("duplicate identifier '{}'", id),
)
}
pub fn invalid_reference(location: impl Into<String>, reference: &str) -> Self {
Self::new(
ValidationErrorKind::InvalidReference,
location,
format!("reference to non-existent node '{}'", reference),
)
}
}
/// Accumulates validation errors while a flow definition is being checked.
pub struct FlowValidator {
/// Problems found so far, in the order they were detected.
errors: Vec<ValidationError>,
}
impl FlowValidator {
pub fn new() -> Self {
Self { errors: Vec::new() }
}
pub fn validate(mut self, flow: &FlowDefinition) -> ValidationResult {
self.validate_metadata(flow);
self.validate_triggers(flow);
self.validate_nodes(flow);
self.validate_edges(flow);
self.validate_references(flow);
if self.errors.is_empty() {
Ok(())
} else {
Err(self.errors)
}
}
fn add_error(&mut self, error: ValidationError) {
self.errors.push(error);
}
fn validate_metadata(&mut self, flow: &FlowDefinition) {
if flow.name.is_empty() {
self.add_error(ValidationError::missing_field("flow", "name"));
}
if let Some(ref version) = flow.version {
if version.is_empty() {
self.add_error(ValidationError::invalid_value(
"flow.version",
"version cannot be empty string",
));
}
}
}
fn validate_triggers(&mut self, flow: &FlowDefinition) {
let mut seen_ids = HashSet::new();
for (idx, trigger) in flow.triggers.iter().enumerate() {
let location = format!("triggers[{}]", idx);
if !seen_ids.insert(&trigger.id) {
self.add_error(ValidationError::duplicate_id(&location, &trigger.id));
}
if trigger.id.is_empty() {
self.add_error(ValidationError::missing_field(&location, "id"));
}
if trigger.parsed_type().is_none() {
self.add_error(ValidationError::new(
ValidationErrorKind::InvalidTriggerType,
&location,
format!("unknown trigger type '{}'", trigger.trigger_type),
));
}
self.validate_trigger_params(trigger, &location);
}
}
fn validate_trigger_params(&mut self, trigger: &TriggerDefinition, location: &str) {
match trigger.trigger_type.as_str() {
"webhook" | "trigger::webhook" => {
if let Some(port) = trigger.get_i64("port") {
if port < 1 || port > 65535 {
self.add_error(ValidationError::invalid_value(
format!("{}.params.port", location),
format!("port must be between 1 and 65535, got {}", port),
));
}
}
}
"cron" | "trigger::cron" => {
if trigger.get_string("schedule").is_none() {
self.add_error(ValidationError::missing_field(
format!("{}.params", location),
"schedule",
));
}
}
"filesystem" | "trigger::filesystem" => {
if trigger.get_string("path").is_none() {
self.add_error(ValidationError::missing_field(
format!("{}.params", location),
"path",
));
}
}
_ => {}
}
}
fn validate_nodes(&mut self, flow: &FlowDefinition) {
let mut seen_ids = HashSet::new();
for (node_id, node) in &flow.nodes {
let location = format!("nodes.{}", node_id);
if !seen_ids.insert(node_id) {
self.add_error(ValidationError::duplicate_id(&location, node_id));
}
for trigger in &flow.triggers {
if &trigger.id == node_id {
self.add_error(ValidationError::new(
ValidationErrorKind::DuplicateId,
&location,
format!("node ID conflicts with trigger ID '{}'", node_id),
));
}
}
if node.node_type.is_empty() {
self.add_error(ValidationError::missing_field(&location, "type"));
}
self.validate_node_config(node, node_id, &location);
}
}
fn validate_node_config(&mut self, node: &NodeDefinition, _node_id: &str, location: &str) {
match node.node_type.as_str() {
"std::switch" => {
if node.get_nested(&["condition"]).is_none()
&& node.get_string("expression").is_none()
{
self.add_error(ValidationError::missing_field(
format!("{}.config", location),
"condition or expression",
));
}
}
"std::loop" => {
if node.get_i64("max_iterations").is_none()
&& node.get_nested(&["condition"]).is_none()
{
self.add_error(ValidationError::missing_field(
format!("{}.config", location),
"max_iterations or condition",
));
}
}
"std::merge" => {
}
"std::aggregate" => {
if node.get_string("operation").is_none() {
self.add_error(ValidationError::missing_field(
format!("{}.config", location),
"operation",
));
}
}
_ => {}
}
}
fn validate_edges(&mut self, flow: &FlowDefinition) {
for (idx, edge) in flow.edges.iter().enumerate() {
let location = format!("edges[{}]", idx);
if edge.from.is_empty() {
self.add_error(ValidationError::missing_field(&location, "from"));
}
if edge.to.is_empty() {
self.add_error(ValidationError::missing_field(&location, "to"));
}
if let Some(ref condition) = edge.condition {
self.validate_selector_syntax(condition, &format!("{}.condition", location));
}
}
}
fn validate_references(&mut self, flow: &FlowDefinition) {
let mut valid_ids: HashSet<&str> = flow.nodes.keys().map(|s| s.as_str()).collect();
for trigger in &flow.triggers {
valid_ids.insert(&trigger.id);
}
for (idx, edge) in flow.edges.iter().enumerate() {
let location = format!("edges[{}]", idx);
let from_node = edge.from_node();
if !valid_ids.contains(from_node) {
self.add_error(ValidationError::invalid_reference(
format!("{}.from", location),
from_node,
));
}
let to_node = edge.to_node();
if !valid_ids.contains(to_node) {
self.add_error(ValidationError::invalid_reference(
format!("{}.to", location),
to_node,
));
}
}
}
fn validate_selector_syntax(&mut self, selector: &str, location: &str) {
let mut in_selector = false;
let mut brace_depth = 0;
for c in selector.chars() {
match c {
'$' => {
}
'{' if in_selector || selector.contains("${") => {
brace_depth += 1;
in_selector = true;
}
'}' if in_selector => {
brace_depth -= 1;
if brace_depth == 0 {
in_selector = false;
}
}
_ => {}
}
}
if brace_depth != 0 {
self.add_error(ValidationError::new(
ValidationErrorKind::InvalidSelector,
location,
"unbalanced braces in selector",
));
}
}
}
impl Default for FlowValidator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow::EdgeDefinition;
    use std::collections::HashMap;

    /// Builds the baseline flow used by every test: a name, a version and a
    /// single webhook trigger, with no nodes or edges.
    fn minimal_flow() -> FlowDefinition {
        let webhook = TriggerDefinition::new("webhook", "webhook");
        FlowDefinition {
            name: String::from("test"),
            version: Some(String::from("1.0")),
            description: None,
            triggers: vec![webhook],
            nodes: HashMap::new(),
            edges: Vec::new(),
            settings: Default::default(),
        }
    }

    /// The baseline flow validates cleanly.
    #[test]
    fn validate_minimal_flow() {
        let outcome = FlowValidator::new().validate(&minimal_flow());
        assert!(outcome.is_ok());
    }

    /// An empty flow name is reported as a missing field at `flow`.
    #[test]
    fn validate_missing_name() {
        let flow = FlowDefinition {
            name: String::new(),
            ..minimal_flow()
        };
        let errors = FlowValidator::new()
            .validate(&flow)
            .expect_err("a flow without a name must be rejected");
        let reported = errors
            .iter()
            .any(|err| err.location == "flow" && err.kind == ValidationErrorKind::MissingField);
        assert!(reported);
    }

    /// Two triggers sharing an ID produce a duplicate-ID error.
    #[test]
    fn validate_duplicate_trigger_ids() {
        let flow = FlowDefinition {
            triggers: vec![
                TriggerDefinition::new("dup_id", "webhook"),
                TriggerDefinition::new("dup_id", "cron"),
            ],
            ..minimal_flow()
        };
        let errors = FlowValidator::new()
            .validate(&flow)
            .expect_err("duplicate trigger ids must be rejected");
        let reported = errors
            .iter()
            .any(|err| err.kind == ValidationErrorKind::DuplicateId);
        assert!(reported);
    }

    /// An unrecognised trigger type is reported.
    #[test]
    fn validate_invalid_trigger_type() {
        let flow = FlowDefinition {
            triggers: vec![TriggerDefinition::new("test", "invalid_type")],
            ..minimal_flow()
        };
        let errors = FlowValidator::new()
            .validate(&flow)
            .expect_err("an unknown trigger type must be rejected");
        let reported = errors
            .iter()
            .any(|err| err.kind == ValidationErrorKind::InvalidTriggerType);
        assert!(reported);
    }

    /// Edges pointing at unknown endpoints are reported as invalid references.
    #[test]
    fn validate_invalid_edge_reference() {
        let flow = FlowDefinition {
            edges: vec![EdgeDefinition::new("nonexistent", "also_nonexistent")],
            ..minimal_flow()
        };
        let errors = FlowValidator::new()
            .validate(&flow)
            .expect_err("edges to unknown endpoints must be rejected");
        let reported = errors
            .iter()
            .any(|err| err.kind == ValidationErrorKind::InvalidReference);
        assert!(reported);
    }

    /// An edge from an existing trigger to an existing node is accepted.
    #[test]
    fn validate_valid_edge_reference() {
        let mut flow = minimal_flow();
        let previous = flow
            .nodes
            .insert(String::from("processor"), NodeDefinition::new("std::log"));
        drop(previous);
        flow.edges = vec![EdgeDefinition::new("webhook", "processor")];
        let outcome = FlowValidator::new().validate(&flow);
        assert!(outcome.is_ok());
    }

    /// A cron trigger without a `schedule` parameter is reported.
    #[test]
    fn validate_cron_requires_schedule() {
        let flow = FlowDefinition {
            triggers: vec![TriggerDefinition::new("cron_trigger", "cron")],
            ..minimal_flow()
        };
        let errors = FlowValidator::new()
            .validate(&flow)
            .expect_err("a cron trigger without a schedule must be rejected");
        let reported = errors.iter().any(|err| {
            err.kind == ValidationErrorKind::MissingField && err.message.contains("schedule")
        });
        assert!(reported);
    }
}