use super::validation::{FlowValidator, ValidationResult};
use super::{EdgeDefinition, FlowSettings, NodeDefinition, TriggerDefinition};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Declarative description of a flow: its triggers, nodes, the edges that
/// connect them, and flow-wide settings.
///
/// The type is (de)serialized with serde. Every field except `name` carries
/// `#[serde(default)]`, so a document containing only `name` deserializes
/// successfully with the remaining fields set to their defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDefinition {
    /// Name of the flow. This is the only field without a serde default, so
    /// it must be present in the input document.
    pub name: String,
    /// Optional version string; `None` when omitted from the input.
    #[serde(default)]
    pub version: Option<String>,
    /// Optional free-form description; `None` when omitted from the input.
    #[serde(default)]
    pub description: Option<String>,
    /// Triggers declared by the flow, in document order. Empty when omitted.
    #[serde(default)]
    pub triggers: Vec<TriggerDefinition>,
    /// Node definitions keyed by node id. Empty when omitted.
    #[serde(default)]
    pub nodes: HashMap<String, NodeDefinition>,
    /// Edges declared by the flow, in document order. Empty when omitted.
    #[serde(default)]
    pub edges: Vec<EdgeDefinition>,
    /// Flow-wide settings; falls back to `FlowSettings::default()` when omitted.
    #[serde(default)]
    pub settings: FlowSettings,
}
impl FlowDefinition {
    /// Version assigned by [`FlowDefinition::new`] and reported by
    /// [`FlowDefinition::effective_version`] when no version is set.
    ///
    /// Kept in one place so the constructor and the fallback cannot drift apart.
    const DEFAULT_VERSION: &'static str = "1.0";

    /// Creates an empty flow with the given name.
    ///
    /// The version is set to the default (`"1.0"`), there is no description,
    /// no triggers, nodes or edges, and the settings are
    /// `FlowSettings::default()`.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            version: Some(Self::DEFAULT_VERSION.to_string()),
            description: None,
            triggers: Vec::new(),
            nodes: HashMap::new(),
            edges: Vec::new(),
            settings: FlowSettings::default(),
        }
    }

    /// Deserializes a flow from a YAML string. No validation is performed.
    ///
    /// # Errors
    ///
    /// Returns the underlying `serde_yaml::Error` if the text cannot be
    /// deserialized into a `FlowDefinition`.
    pub fn from_yaml(yaml: &str) -> Result<Self, serde_yaml::Error> {
        serde_yaml::from_str(yaml)
    }

    /// Reads the file at `path` and deserializes it as YAML.
    /// No validation is performed.
    ///
    /// # Errors
    ///
    /// * [`FlowLoadError::Io`] if the file cannot be read.
    /// * [`FlowLoadError::Parse`] if the contents cannot be deserialized.
    ///
    /// Both variants carry a copy of `path`.
    pub fn from_file(path: &std::path::Path) -> Result<Self, FlowLoadError> {
        let content = std::fs::read_to_string(path).map_err(|e| FlowLoadError::Io {
            path: path.to_path_buf(),
            source: e,
        })?;
        Self::from_yaml(&content).map_err(|e| FlowLoadError::Parse {
            path: path.to_path_buf(),
            source: e,
        })
    }

    /// Serializes the flow to a YAML string.
    ///
    /// # Errors
    ///
    /// Returns the underlying `serde_yaml::Error` if serialization fails.
    pub fn to_yaml(&self) -> Result<String, serde_yaml::Error> {
        serde_yaml::to_string(self)
    }

    /// Runs a freshly constructed `FlowValidator` over this flow and returns
    /// its result.
    pub fn validate(&self) -> ValidationResult {
        FlowValidator::new().validate(self)
    }

    /// Deserializes a flow from a YAML string and validates it.
    ///
    /// # Errors
    ///
    /// * [`FlowLoadError::ParseString`] if the text cannot be deserialized.
    /// * [`FlowLoadError::Validation`] carrying the errors reported by
    ///   [`FlowDefinition::validate`] if validation fails.
    pub fn from_yaml_validated(yaml: &str) -> Result<Self, FlowLoadError> {
        let flow = Self::from_yaml(yaml).map_err(|e| FlowLoadError::ParseString { source: e })?;
        flow.validate()
            .map_err(|errors| FlowLoadError::Validation { errors })?;
        Ok(flow)
    }

    /// Builder: replaces the version.
    #[must_use]
    pub fn with_version(mut self, version: impl Into<String>) -> Self {
        self.version = Some(version.into());
        self
    }

    /// Builder: replaces the description.
    #[must_use]
    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
        self.description = Some(desc.into());
        self
    }

    /// Builder: appends a trigger.
    #[must_use]
    pub fn with_trigger(mut self, trigger: TriggerDefinition) -> Self {
        self.triggers.push(trigger);
        self
    }

    /// Builder: inserts a node under `id`.
    ///
    /// A node already stored under the same id is replaced.
    #[must_use]
    pub fn with_node(mut self, id: impl Into<String>, node: NodeDefinition) -> Self {
        self.nodes.insert(id.into(), node);
        self
    }

    /// Builder: appends an edge.
    #[must_use]
    pub fn with_edge(mut self, edge: EdgeDefinition) -> Self {
        self.edges.push(edge);
        self
    }

    /// Builder: replaces the settings.
    #[must_use]
    pub fn with_settings(mut self, settings: FlowSettings) -> Self {
        self.settings = settings;
        self
    }

    /// Returns the flow's version, or the default (`"1.0"`) when none is set.
    #[must_use]
    pub fn effective_version(&self) -> &str {
        self.version.as_deref().unwrap_or(Self::DEFAULT_VERSION)
    }

    /// Iterates over the ids of all nodes. The order is that of the
    /// underlying `HashMap` and is therefore unspecified.
    pub fn node_ids(&self) -> impl Iterator<Item = &str> {
        self.nodes.keys().map(|s| s.as_str())
    }

    /// Iterates over the ids of all triggers, in declaration order.
    pub fn trigger_ids(&self) -> impl Iterator<Item = &str> {
        self.triggers.iter().map(|t| t.id.as_str())
    }

    /// Looks up a node by id.
    #[must_use]
    pub fn get_node(&self, id: &str) -> Option<&NodeDefinition> {
        self.nodes.get(id)
    }

    /// Returns the first trigger whose id equals `id`, if any.
    #[must_use]
    pub fn get_trigger(&self, id: &str) -> Option<&TriggerDefinition> {
        self.triggers.iter().find(|t| t.id == id)
    }

    /// Returns `true` if a trigger with the given id exists.
    #[must_use]
    pub fn has_trigger(&self, id: &str) -> bool {
        self.triggers.iter().any(|t| t.id == id)
    }

    /// Returns `true` if a node with the given id exists.
    #[must_use]
    pub fn has_node(&self, id: &str) -> bool {
        self.nodes.contains_key(id)
    }

    /// Iterates over the triggers whose `enabled` flag is set.
    pub fn enabled_triggers(&self) -> impl Iterator<Item = &TriggerDefinition> {
        self.triggers.iter().filter(|t| t.enabled)
    }

    /// Iterates over `(id, node)` pairs for the nodes whose `enabled` flag is
    /// set. The order is that of the underlying `HashMap` and is unspecified.
    pub fn enabled_nodes(&self) -> impl Iterator<Item = (&str, &NodeDefinition)> {
        self.nodes
            .iter()
            .filter(|(_, n)| n.enabled)
            .map(|(k, v)| (k.as_str(), v))
    }

    /// Iterates over the edges whose source node, as reported by
    /// `EdgeDefinition::from_node`, equals `node_id`.
    pub fn edges_from(&self, node_id: &str) -> impl Iterator<Item = &EdgeDefinition> {
        self.edges.iter().filter(move |e| e.from_node() == node_id)
    }

    /// Iterates over the edges whose target node, as reported by
    /// `EdgeDefinition::to_node`, equals `node_id`.
    pub fn edges_to(&self, node_id: &str) -> impl Iterator<Item = &EdgeDefinition> {
        self.edges.iter().filter(move |e| e.to_node() == node_id)
    }
}
/// Errors produced while loading a [`FlowDefinition`] from a file or a YAML
/// string.
#[derive(Debug)]
pub enum FlowLoadError {
    /// The flow file could not be read (returned by `FlowDefinition::from_file`).
    Io {
        /// Path that was being read.
        path: std::path::PathBuf,
        /// Underlying I/O error.
        source: std::io::Error,
    },
    /// The contents of a flow file could not be deserialized as YAML
    /// (returned by `FlowDefinition::from_file`).
    Parse {
        /// Path of the file whose contents failed to parse.
        path: std::path::PathBuf,
        /// Underlying YAML error.
        source: serde_yaml::Error,
    },
    /// An in-memory YAML string could not be deserialized
    /// (returned by `FlowDefinition::from_yaml_validated`).
    ParseString { source: serde_yaml::Error },
    /// The flow deserialized successfully but failed validation
    /// (returned by `FlowDefinition::from_yaml_validated`).
    Validation {
        /// Errors reported by the validator.
        errors: Vec<super::validation::ValidationError>,
    },
}
/// Human-readable rendering of a [`FlowLoadError`].
///
/// The I/O and parse variants produce a single line that includes the
/// underlying error. The validation variant produces a header line followed
/// by one ` - ` prefixed line per validation error; every line, including the
/// last, is newline-terminated.
impl std::fmt::Display for FlowLoadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io { path, source } => write!(
                f,
                "failed to read flow file '{}': {}",
                path.display(),
                source
            ),
            Self::Parse { path, source } => write!(
                f,
                "failed to parse flow file '{}': {}",
                path.display(),
                source
            ),
            Self::ParseString { source } => write!(f, "failed to parse YAML: {}", source),
            Self::Validation { errors } => {
                writeln!(f, "flow validation failed with {} error(s):", errors.len())?;
                // Stops at, and returns, the first formatting failure.
                errors
                    .iter()
                    .try_for_each(|issue| writeln!(f, " - {}", issue))
            }
        }
    }
}
/// Exposes the wrapped I/O or YAML error as the error source; validation
/// failures have no underlying source.
impl std::error::Error for FlowLoadError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io { source: io_err, .. } => Some(io_err),
            // Both parse variants wrap the same `serde_yaml::Error` type.
            Self::Parse {
                source: yaml_err, ..
            }
            | Self::ParseString { source: yaml_err } => Some(yaml_err),
            Self::Validation { .. } => None,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A document that populates every top-level field deserializes into the
    /// expected name, version, description, triggers, nodes, edges and settings.
    #[test]
    fn parse_complete_flow() {
        // YAML is indentation-sensitive: trigger/node/edge bodies must be
        // nested under their parent keys for the document to describe a flow.
        let yaml = r#"
name: order_processing
version: "2.0"
description: Process orders with fraud detection
triggers:
  - id: api_webhook
    type: webhook
    params:
      port: 8080
      path: /orders
nodes:
  fraud_check:
    type: std::switch
    config:
      condition:
        type: greater_than
        field: risk_score
        value: 0.8
  process_order:
    type: std::log
    config:
      message: "Processing order"
  flag_fraud:
    type: std::log
    config:
      message: "Fraud detected"
edges:
  - from: api_webhook
    to: fraud_check
  - from: fraud_check.false
    to: process_order
  - from: fraud_check.true
    to: flag_fraud
settings:
  max_concurrent_executions: 50
  execution_timeout_ms: 30000
"#;
        let flow = FlowDefinition::from_yaml(yaml).unwrap();
        assert_eq!(flow.name, "order_processing");
        assert_eq!(flow.version, Some("2.0".to_string()));
        assert_eq!(
            flow.description,
            Some("Process orders with fraud detection".to_string())
        );
        assert_eq!(flow.triggers.len(), 1);
        assert_eq!(flow.triggers[0].id, "api_webhook");
        assert_eq!(flow.nodes.len(), 3);
        assert!(flow.has_node("fraud_check"));
        assert!(flow.has_node("process_order"));
        assert!(flow.has_node("flag_fraud"));
        assert_eq!(flow.edges.len(), 3);
        assert_eq!(flow.settings.max_concurrent_executions, 50);
        assert_eq!(flow.settings.execution_timeout_ms, 30000);
    }

    /// A document containing only `name` deserializes, with every collection
    /// left empty.
    #[test]
    fn parse_minimal_flow() {
        let yaml = r#"
name: simple
"#;
        let flow = FlowDefinition::from_yaml(yaml).unwrap();
        assert_eq!(flow.name, "simple");
        assert!(flow.triggers.is_empty());
        assert!(flow.nodes.is_empty());
        assert!(flow.edges.is_empty());
    }

    /// The builder methods accumulate triggers, nodes and edges.
    #[test]
    fn flow_builder() {
        let flow = FlowDefinition::new("test_flow")
            .with_version("1.0.0")
            .with_description("A test flow")
            .with_trigger(TriggerDefinition::new("webhook", "webhook"))
            .with_node("log", NodeDefinition::new("std::log"))
            .with_edge(EdgeDefinition::new("webhook", "log"));
        assert_eq!(flow.name, "test_flow");
        assert_eq!(flow.triggers.len(), 1);
        assert_eq!(flow.nodes.len(), 1);
        assert_eq!(flow.edges.len(), 1);
    }

    /// A small flow with one trigger, one node and one edge loads through
    /// `from_yaml_validated` without error.
    #[test]
    fn validate_and_parse() {
        let yaml = r#"
name: validated_flow
triggers:
  - id: webhook
    type: webhook
nodes:
  log:
    type: std::log
edges:
  - from: webhook
    to: log
"#;
        let result = FlowDefinition::from_yaml_validated(yaml);
        assert!(result.is_ok());
    }

    /// A flow that parses but is rejected by validation surfaces as
    /// `FlowLoadError::Validation` with at least one error. The fixture uses
    /// an empty name and a trigger type called `invalid_trigger_type`.
    #[test]
    fn validation_errors() {
        let yaml = r#"
name: ""
triggers:
  - id: test
    type: invalid_trigger_type
"#;
        let result = FlowDefinition::from_yaml_validated(yaml);
        assert!(result.is_err());
        if let Err(FlowLoadError::Validation { errors }) = result {
            assert!(!errors.is_empty());
        } else {
            panic!("Expected validation error");
        }
    }

    /// Serializing a flow and parsing the output preserves the name and the
    /// number of triggers and nodes.
    #[test]
    fn to_yaml_roundtrip() {
        let flow = FlowDefinition::new("roundtrip_test")
            .with_trigger(TriggerDefinition::new("webhook", "webhook"))
            .with_node("log", NodeDefinition::new("std::log"));
        let yaml = flow.to_yaml().unwrap();
        let parsed = FlowDefinition::from_yaml(&yaml).unwrap();
        assert_eq!(parsed.name, "roundtrip_test");
        assert_eq!(parsed.triggers.len(), 1);
        assert_eq!(parsed.nodes.len(), 1);
    }

    /// Exercises the lookup helpers: `has_trigger`, `has_node`, `edges_from`
    /// and `edges_to`.
    #[test]
    fn query_methods() {
        let flow = FlowDefinition::new("query_test")
            .with_trigger(TriggerDefinition::new("t1", "webhook"))
            .with_node("n1", NodeDefinition::new("std::log"))
            .with_node("n2", NodeDefinition::new("std::switch"))
            .with_edge(EdgeDefinition::new("t1", "n1"))
            .with_edge(EdgeDefinition::new("n1", "n2"));
        assert!(flow.has_trigger("t1"));
        assert!(!flow.has_trigger("nonexistent"));
        assert!(flow.has_node("n1"));
        assert!(!flow.has_node("nonexistent"));
        let edges_from_t1: Vec<_> = flow.edges_from("t1").collect();
        assert_eq!(edges_from_t1.len(), 1);
        let edges_to_n2: Vec<_> = flow.edges_to("n2").collect();
        assert_eq!(edges_to_n2.len(), 1);
    }
}