1use super::validation::{FlowValidator, ValidationResult};
4use super::{EdgeDefinition, FlowSettings, NodeDefinition, TriggerDefinition};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct FlowDefinition {
62 pub name: String,
64
65 #[serde(default)]
67 pub version: Option<String>,
68
69 #[serde(default)]
71 pub description: Option<String>,
72
73 #[serde(default)]
75 pub triggers: Vec<TriggerDefinition>,
76
77 #[serde(default)]
79 pub nodes: HashMap<String, NodeDefinition>,
80
81 #[serde(default)]
83 pub edges: Vec<EdgeDefinition>,
84
85 #[serde(default)]
87 pub settings: FlowSettings,
88}
89
90impl FlowDefinition {
91 pub fn new(name: impl Into<String>) -> Self {
93 Self {
94 name: name.into(),
95 version: Some("1.0".to_string()),
96 description: None,
97 triggers: Vec::new(),
98 nodes: HashMap::new(),
99 edges: Vec::new(),
100 settings: FlowSettings::default(),
101 }
102 }
103
104 pub fn from_yaml(yaml: &str) -> Result<Self, serde_yaml::Error> {
106 serde_yaml::from_str(yaml)
107 }
108
109 pub fn from_file(path: &std::path::Path) -> Result<Self, FlowLoadError> {
111 let content = std::fs::read_to_string(path).map_err(|e| FlowLoadError::Io {
112 path: path.to_path_buf(),
113 source: e,
114 })?;
115
116 Self::from_yaml(&content).map_err(|e| FlowLoadError::Parse {
117 path: path.to_path_buf(),
118 source: e,
119 })
120 }
121
122 pub fn to_yaml(&self) -> Result<String, serde_yaml::Error> {
124 serde_yaml::to_string(self)
125 }
126
127 pub fn validate(&self) -> ValidationResult {
129 FlowValidator::new().validate(self)
130 }
131
132 pub fn from_yaml_validated(yaml: &str) -> Result<Self, FlowLoadError> {
134 let flow = Self::from_yaml(yaml).map_err(|e| FlowLoadError::ParseString { source: e })?;
135
136 flow.validate()
137 .map_err(|errors| FlowLoadError::Validation { errors })?;
138
139 Ok(flow)
140 }
141
142 pub fn with_version(mut self, version: impl Into<String>) -> Self {
144 self.version = Some(version.into());
145 self
146 }
147
148 pub fn with_description(mut self, desc: impl Into<String>) -> Self {
150 self.description = Some(desc.into());
151 self
152 }
153
154 pub fn with_trigger(mut self, trigger: TriggerDefinition) -> Self {
156 self.triggers.push(trigger);
157 self
158 }
159
160 pub fn with_node(mut self, id: impl Into<String>, node: NodeDefinition) -> Self {
162 self.nodes.insert(id.into(), node);
163 self
164 }
165
166 pub fn with_edge(mut self, edge: EdgeDefinition) -> Self {
168 self.edges.push(edge);
169 self
170 }
171
172 pub fn with_settings(mut self, settings: FlowSettings) -> Self {
174 self.settings = settings;
175 self
176 }
177
178 pub fn effective_version(&self) -> &str {
180 self.version.as_deref().unwrap_or("1.0")
181 }
182
183 pub fn node_ids(&self) -> impl Iterator<Item = &str> {
185 self.nodes.keys().map(|s| s.as_str())
186 }
187
188 pub fn trigger_ids(&self) -> impl Iterator<Item = &str> {
190 self.triggers.iter().map(|t| t.id.as_str())
191 }
192
193 pub fn get_node(&self, id: &str) -> Option<&NodeDefinition> {
195 self.nodes.get(id)
196 }
197
198 pub fn get_trigger(&self, id: &str) -> Option<&TriggerDefinition> {
200 self.triggers.iter().find(|t| t.id == id)
201 }
202
203 pub fn has_trigger(&self, id: &str) -> bool {
205 self.triggers.iter().any(|t| t.id == id)
206 }
207
208 pub fn has_node(&self, id: &str) -> bool {
210 self.nodes.contains_key(id)
211 }
212
213 pub fn enabled_triggers(&self) -> impl Iterator<Item = &TriggerDefinition> {
215 self.triggers.iter().filter(|t| t.enabled)
216 }
217
218 pub fn enabled_nodes(&self) -> impl Iterator<Item = (&str, &NodeDefinition)> {
220 self.nodes
221 .iter()
222 .filter(|(_, n)| n.enabled)
223 .map(|(k, v)| (k.as_str(), v))
224 }
225
226 pub fn edges_from(&self, node_id: &str) -> impl Iterator<Item = &EdgeDefinition> {
228 self.edges.iter().filter(move |e| e.from_node() == node_id)
229 }
230
231 pub fn edges_to(&self, node_id: &str) -> impl Iterator<Item = &EdgeDefinition> {
233 self.edges.iter().filter(move |e| e.to_node() == node_id)
234 }
235}
236
237#[derive(Debug)]
239pub enum FlowLoadError {
240 Io {
242 path: std::path::PathBuf,
243 source: std::io::Error,
244 },
245 Parse {
247 path: std::path::PathBuf,
248 source: serde_yaml::Error,
249 },
250 ParseString { source: serde_yaml::Error },
252 Validation {
254 errors: Vec<super::validation::ValidationError>,
255 },
256}
257
258impl std::fmt::Display for FlowLoadError {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 match self {
261 Self::Io { path, source } => {
262 write!(
263 f,
264 "failed to read flow file '{}': {}",
265 path.display(),
266 source
267 )
268 }
269 Self::Parse { path, source } => {
270 write!(
271 f,
272 "failed to parse flow file '{}': {}",
273 path.display(),
274 source
275 )
276 }
277 Self::ParseString { source } => {
278 write!(f, "failed to parse YAML: {}", source)
279 }
280 Self::Validation { errors } => {
281 writeln!(f, "flow validation failed with {} error(s):", errors.len())?;
282 for error in errors {
283 writeln!(f, " - {}", error)?;
284 }
285 Ok(())
286 }
287 }
288 }
289}
290
291impl std::error::Error for FlowLoadError {
292 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
293 match self {
294 Self::Io { source, .. } => Some(source),
295 Self::Parse { source, .. } => Some(source),
296 Self::ParseString { source } => Some(source),
297 Self::Validation { .. } => None,
298 }
299 }
300}
301
302#[cfg(test)]
303mod tests {
304 use super::*;
305
306 #[test]
307 fn parse_complete_flow() {
308 let yaml = r#"
309name: order_processing
310version: "2.0"
311description: Process orders with fraud detection
312
313triggers:
314 - id: api_webhook
315 type: webhook
316 params:
317 port: 8080
318 path: /orders
319
320nodes:
321 fraud_check:
322 type: std::switch
323 config:
324 condition:
325 type: greater_than
326 field: risk_score
327 value: 0.8
328
329 process_order:
330 type: std::log
331 config:
332 message: "Processing order"
333
334 flag_fraud:
335 type: std::log
336 config:
337 message: "Fraud detected"
338
339edges:
340 - from: api_webhook
341 to: fraud_check
342 - from: fraud_check.false
343 to: process_order
344 - from: fraud_check.true
345 to: flag_fraud
346
347settings:
348 max_concurrent_executions: 50
349 execution_timeout_ms: 30000
350"#;
351
352 let flow = FlowDefinition::from_yaml(yaml).unwrap();
353
354 assert_eq!(flow.name, "order_processing");
355 assert_eq!(flow.version, Some("2.0".to_string()));
356 assert_eq!(
357 flow.description,
358 Some("Process orders with fraud detection".to_string())
359 );
360
361 assert_eq!(flow.triggers.len(), 1);
362 assert_eq!(flow.triggers[0].id, "api_webhook");
363
364 assert_eq!(flow.nodes.len(), 3);
365 assert!(flow.has_node("fraud_check"));
366 assert!(flow.has_node("process_order"));
367 assert!(flow.has_node("flag_fraud"));
368
369 assert_eq!(flow.edges.len(), 3);
370
371 assert_eq!(flow.settings.max_concurrent_executions, 50);
372 assert_eq!(flow.settings.execution_timeout_ms, 30000);
373 }
374
375 #[test]
376 fn parse_minimal_flow() {
377 let yaml = r#"
378name: simple
379"#;
380 let flow = FlowDefinition::from_yaml(yaml).unwrap();
381 assert_eq!(flow.name, "simple");
382 assert!(flow.triggers.is_empty());
383 assert!(flow.nodes.is_empty());
384 assert!(flow.edges.is_empty());
385 }
386
387 #[test]
388 fn flow_builder() {
389 let flow = FlowDefinition::new("test_flow")
390 .with_version("1.0.0")
391 .with_description("A test flow")
392 .with_trigger(TriggerDefinition::new("webhook", "webhook"))
393 .with_node("log", NodeDefinition::new("std::log"))
394 .with_edge(EdgeDefinition::new("webhook", "log"));
395
396 assert_eq!(flow.name, "test_flow");
397 assert_eq!(flow.triggers.len(), 1);
398 assert_eq!(flow.nodes.len(), 1);
399 assert_eq!(flow.edges.len(), 1);
400 }
401
402 #[test]
403 fn validate_and_parse() {
404 let yaml = r#"
405name: validated_flow
406triggers:
407 - id: webhook
408 type: webhook
409nodes:
410 log:
411 type: std::log
412edges:
413 - from: webhook
414 to: log
415"#;
416
417 let result = FlowDefinition::from_yaml_validated(yaml);
418 assert!(result.is_ok());
419 }
420
421 #[test]
422 fn validation_errors() {
423 let yaml = r#"
424name: ""
425triggers:
426 - id: test
427 type: invalid_trigger_type
428"#;
429
430 let result = FlowDefinition::from_yaml_validated(yaml);
431 assert!(result.is_err());
432
433 if let Err(FlowLoadError::Validation { errors }) = result {
434 assert!(!errors.is_empty());
435 } else {
436 panic!("Expected validation error");
437 }
438 }
439
440 #[test]
441 fn to_yaml_roundtrip() {
442 let flow = FlowDefinition::new("roundtrip_test")
443 .with_trigger(TriggerDefinition::new("webhook", "webhook"))
444 .with_node("log", NodeDefinition::new("std::log"));
445
446 let yaml = flow.to_yaml().unwrap();
447 let parsed = FlowDefinition::from_yaml(&yaml).unwrap();
448
449 assert_eq!(parsed.name, "roundtrip_test");
450 assert_eq!(parsed.triggers.len(), 1);
451 assert_eq!(parsed.nodes.len(), 1);
452 }
453
454 #[test]
455 fn query_methods() {
456 let flow = FlowDefinition::new("query_test")
457 .with_trigger(TriggerDefinition::new("t1", "webhook"))
458 .with_node("n1", NodeDefinition::new("std::log"))
459 .with_node("n2", NodeDefinition::new("std::switch"))
460 .with_edge(EdgeDefinition::new("t1", "n1"))
461 .with_edge(EdgeDefinition::new("n1", "n2"));
462
463 assert!(flow.has_trigger("t1"));
464 assert!(!flow.has_trigger("nonexistent"));
465
466 assert!(flow.has_node("n1"));
467 assert!(!flow.has_node("nonexistent"));
468
469 let edges_from_t1: Vec<_> = flow.edges_from("t1").collect();
470 assert_eq!(edges_from_t1.len(), 1);
471
472 let edges_to_n2: Vec<_> = flow.edges_to("n2").collect();
473 assert_eq!(edges_to_n2.len(), 1);
474 }
475}