xerv_core/flow/
definition.rs

1//! Flow definition - the top-level YAML document.
2
3use super::validation::{FlowValidator, ValidationResult};
4use super::{EdgeDefinition, FlowSettings, NodeDefinition, TriggerDefinition};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8/// A complete flow definition from YAML.
9///
10/// This is the top-level structure representing a XERV flow document.
11///
12/// # Example
13///
14/// ```yaml
15/// name: order_processing
16/// version: "1.0"
17/// description: Process incoming orders with fraud detection
18///
19/// triggers:
20///   - id: api_webhook
21///     type: webhook
22///     params:
23///       port: 8080
24///       path: /orders
25///
26/// nodes:
27///   validate:
28///     type: std::json_parse
29///     config:
30///       strict: true
31///
32///   fraud_check:
33///     type: std::switch
34///     config:
35///       condition:
36///         type: greater_than
37///         field: risk_score
38///         value: 0.8
39///
40///   process_order:
41///     type: plugins::order_processor
42///
43///   flag_fraud:
44///     type: plugins::fraud_handler
45///
46/// edges:
47///   - from: api_webhook
48///     to: validate
49///   - from: validate
50///     to: fraud_check
51///   - from: fraud_check.true
52///     to: flag_fraud
53///   - from: fraud_check.false
54///     to: process_order
55///
56/// settings:
57///   max_concurrent_executions: 100
58///   execution_timeout_ms: 30000
59/// ```
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct FlowDefinition {
62    /// Flow name (required).
63    pub name: String,
64
65    /// Flow version (optional, defaults to "1.0").
66    #[serde(default)]
67    pub version: Option<String>,
68
69    /// Human-readable description.
70    #[serde(default)]
71    pub description: Option<String>,
72
73    /// Triggers that start this flow.
74    #[serde(default)]
75    pub triggers: Vec<TriggerDefinition>,
76
77    /// Nodes in the flow, keyed by node ID.
78    #[serde(default)]
79    pub nodes: HashMap<String, NodeDefinition>,
80
81    /// Edges connecting nodes.
82    #[serde(default)]
83    pub edges: Vec<EdgeDefinition>,
84
85    /// Runtime settings.
86    #[serde(default)]
87    pub settings: FlowSettings,
88}
89
90impl FlowDefinition {
91    /// Create a new flow definition.
92    pub fn new(name: impl Into<String>) -> Self {
93        Self {
94            name: name.into(),
95            version: Some("1.0".to_string()),
96            description: None,
97            triggers: Vec::new(),
98            nodes: HashMap::new(),
99            edges: Vec::new(),
100            settings: FlowSettings::default(),
101        }
102    }
103
104    /// Parse a flow definition from YAML string.
105    pub fn from_yaml(yaml: &str) -> Result<Self, serde_yaml::Error> {
106        serde_yaml::from_str(yaml)
107    }
108
109    /// Parse a flow definition from YAML file.
110    pub fn from_file(path: &std::path::Path) -> Result<Self, FlowLoadError> {
111        let content = std::fs::read_to_string(path).map_err(|e| FlowLoadError::Io {
112            path: path.to_path_buf(),
113            source: e,
114        })?;
115
116        Self::from_yaml(&content).map_err(|e| FlowLoadError::Parse {
117            path: path.to_path_buf(),
118            source: e,
119        })
120    }
121
122    /// Serialize to YAML string.
123    pub fn to_yaml(&self) -> Result<String, serde_yaml::Error> {
124        serde_yaml::to_string(self)
125    }
126
127    /// Validate the flow definition.
128    pub fn validate(&self) -> ValidationResult {
129        FlowValidator::new().validate(self)
130    }
131
132    /// Parse and validate in one step.
133    pub fn from_yaml_validated(yaml: &str) -> Result<Self, FlowLoadError> {
134        let flow = Self::from_yaml(yaml).map_err(|e| FlowLoadError::ParseString { source: e })?;
135
136        flow.validate()
137            .map_err(|errors| FlowLoadError::Validation { errors })?;
138
139        Ok(flow)
140    }
141
142    /// Set version.
143    pub fn with_version(mut self, version: impl Into<String>) -> Self {
144        self.version = Some(version.into());
145        self
146    }
147
148    /// Set description.
149    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
150        self.description = Some(desc.into());
151        self
152    }
153
154    /// Add a trigger.
155    pub fn with_trigger(mut self, trigger: TriggerDefinition) -> Self {
156        self.triggers.push(trigger);
157        self
158    }
159
160    /// Add a node.
161    pub fn with_node(mut self, id: impl Into<String>, node: NodeDefinition) -> Self {
162        self.nodes.insert(id.into(), node);
163        self
164    }
165
166    /// Add an edge.
167    pub fn with_edge(mut self, edge: EdgeDefinition) -> Self {
168        self.edges.push(edge);
169        self
170    }
171
172    /// Set settings.
173    pub fn with_settings(mut self, settings: FlowSettings) -> Self {
174        self.settings = settings;
175        self
176    }
177
178    /// Get the effective version (defaults to "1.0").
179    pub fn effective_version(&self) -> &str {
180        self.version.as_deref().unwrap_or("1.0")
181    }
182
183    /// Get all node IDs.
184    pub fn node_ids(&self) -> impl Iterator<Item = &str> {
185        self.nodes.keys().map(|s| s.as_str())
186    }
187
188    /// Get all trigger IDs.
189    pub fn trigger_ids(&self) -> impl Iterator<Item = &str> {
190        self.triggers.iter().map(|t| t.id.as_str())
191    }
192
193    /// Get a node by ID.
194    pub fn get_node(&self, id: &str) -> Option<&NodeDefinition> {
195        self.nodes.get(id)
196    }
197
198    /// Get a trigger by ID.
199    pub fn get_trigger(&self, id: &str) -> Option<&TriggerDefinition> {
200        self.triggers.iter().find(|t| t.id == id)
201    }
202
203    /// Check if a trigger with the given ID exists.
204    pub fn has_trigger(&self, id: &str) -> bool {
205        self.triggers.iter().any(|t| t.id == id)
206    }
207
208    /// Check if a node with the given ID exists.
209    pub fn has_node(&self, id: &str) -> bool {
210        self.nodes.contains_key(id)
211    }
212
213    /// Get enabled triggers.
214    pub fn enabled_triggers(&self) -> impl Iterator<Item = &TriggerDefinition> {
215        self.triggers.iter().filter(|t| t.enabled)
216    }
217
218    /// Get enabled nodes.
219    pub fn enabled_nodes(&self) -> impl Iterator<Item = (&str, &NodeDefinition)> {
220        self.nodes
221            .iter()
222            .filter(|(_, n)| n.enabled)
223            .map(|(k, v)| (k.as_str(), v))
224    }
225
226    /// Find edges from a given node.
227    pub fn edges_from(&self, node_id: &str) -> impl Iterator<Item = &EdgeDefinition> {
228        self.edges.iter().filter(move |e| e.from_node() == node_id)
229    }
230
231    /// Find edges to a given node.
232    pub fn edges_to(&self, node_id: &str) -> impl Iterator<Item = &EdgeDefinition> {
233        self.edges.iter().filter(move |e| e.to_node() == node_id)
234    }
235}
236
237/// Error loading a flow definition.
238#[derive(Debug)]
239pub enum FlowLoadError {
240    /// I/O error reading file.
241    Io {
242        path: std::path::PathBuf,
243        source: std::io::Error,
244    },
245    /// YAML parse error.
246    Parse {
247        path: std::path::PathBuf,
248        source: serde_yaml::Error,
249    },
250    /// YAML parse error (from string).
251    ParseString { source: serde_yaml::Error },
252    /// Validation errors.
253    Validation {
254        errors: Vec<super::validation::ValidationError>,
255    },
256}
257
258impl std::fmt::Display for FlowLoadError {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        match self {
261            Self::Io { path, source } => {
262                write!(
263                    f,
264                    "failed to read flow file '{}': {}",
265                    path.display(),
266                    source
267                )
268            }
269            Self::Parse { path, source } => {
270                write!(
271                    f,
272                    "failed to parse flow file '{}': {}",
273                    path.display(),
274                    source
275                )
276            }
277            Self::ParseString { source } => {
278                write!(f, "failed to parse YAML: {}", source)
279            }
280            Self::Validation { errors } => {
281                writeln!(f, "flow validation failed with {} error(s):", errors.len())?;
282                for error in errors {
283                    writeln!(f, "  - {}", error)?;
284                }
285                Ok(())
286            }
287        }
288    }
289}
290
291impl std::error::Error for FlowLoadError {
292    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
293        match self {
294            Self::Io { source, .. } => Some(source),
295            Self::Parse { source, .. } => Some(source),
296            Self::ParseString { source } => Some(source),
297            Self::Validation { .. } => None,
298        }
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305
306    #[test]
307    fn parse_complete_flow() {
308        let yaml = r#"
309name: order_processing
310version: "2.0"
311description: Process orders with fraud detection
312
313triggers:
314  - id: api_webhook
315    type: webhook
316    params:
317      port: 8080
318      path: /orders
319
320nodes:
321  fraud_check:
322    type: std::switch
323    config:
324      condition:
325        type: greater_than
326        field: risk_score
327        value: 0.8
328
329  process_order:
330    type: std::log
331    config:
332      message: "Processing order"
333
334  flag_fraud:
335    type: std::log
336    config:
337      message: "Fraud detected"
338
339edges:
340  - from: api_webhook
341    to: fraud_check
342  - from: fraud_check.false
343    to: process_order
344  - from: fraud_check.true
345    to: flag_fraud
346
347settings:
348  max_concurrent_executions: 50
349  execution_timeout_ms: 30000
350"#;
351
352        let flow = FlowDefinition::from_yaml(yaml).unwrap();
353
354        assert_eq!(flow.name, "order_processing");
355        assert_eq!(flow.version, Some("2.0".to_string()));
356        assert_eq!(
357            flow.description,
358            Some("Process orders with fraud detection".to_string())
359        );
360
361        assert_eq!(flow.triggers.len(), 1);
362        assert_eq!(flow.triggers[0].id, "api_webhook");
363
364        assert_eq!(flow.nodes.len(), 3);
365        assert!(flow.has_node("fraud_check"));
366        assert!(flow.has_node("process_order"));
367        assert!(flow.has_node("flag_fraud"));
368
369        assert_eq!(flow.edges.len(), 3);
370
371        assert_eq!(flow.settings.max_concurrent_executions, 50);
372        assert_eq!(flow.settings.execution_timeout_ms, 30000);
373    }
374
375    #[test]
376    fn parse_minimal_flow() {
377        let yaml = r#"
378name: simple
379"#;
380        let flow = FlowDefinition::from_yaml(yaml).unwrap();
381        assert_eq!(flow.name, "simple");
382        assert!(flow.triggers.is_empty());
383        assert!(flow.nodes.is_empty());
384        assert!(flow.edges.is_empty());
385    }
386
387    #[test]
388    fn flow_builder() {
389        let flow = FlowDefinition::new("test_flow")
390            .with_version("1.0.0")
391            .with_description("A test flow")
392            .with_trigger(TriggerDefinition::new("webhook", "webhook"))
393            .with_node("log", NodeDefinition::new("std::log"))
394            .with_edge(EdgeDefinition::new("webhook", "log"));
395
396        assert_eq!(flow.name, "test_flow");
397        assert_eq!(flow.triggers.len(), 1);
398        assert_eq!(flow.nodes.len(), 1);
399        assert_eq!(flow.edges.len(), 1);
400    }
401
402    #[test]
403    fn validate_and_parse() {
404        let yaml = r#"
405name: validated_flow
406triggers:
407  - id: webhook
408    type: webhook
409nodes:
410  log:
411    type: std::log
412edges:
413  - from: webhook
414    to: log
415"#;
416
417        let result = FlowDefinition::from_yaml_validated(yaml);
418        assert!(result.is_ok());
419    }
420
421    #[test]
422    fn validation_errors() {
423        let yaml = r#"
424name: ""
425triggers:
426  - id: test
427    type: invalid_trigger_type
428"#;
429
430        let result = FlowDefinition::from_yaml_validated(yaml);
431        assert!(result.is_err());
432
433        if let Err(FlowLoadError::Validation { errors }) = result {
434            assert!(!errors.is_empty());
435        } else {
436            panic!("Expected validation error");
437        }
438    }
439
440    #[test]
441    fn to_yaml_roundtrip() {
442        let flow = FlowDefinition::new("roundtrip_test")
443            .with_trigger(TriggerDefinition::new("webhook", "webhook"))
444            .with_node("log", NodeDefinition::new("std::log"));
445
446        let yaml = flow.to_yaml().unwrap();
447        let parsed = FlowDefinition::from_yaml(&yaml).unwrap();
448
449        assert_eq!(parsed.name, "roundtrip_test");
450        assert_eq!(parsed.triggers.len(), 1);
451        assert_eq!(parsed.nodes.len(), 1);
452    }
453
454    #[test]
455    fn query_methods() {
456        let flow = FlowDefinition::new("query_test")
457            .with_trigger(TriggerDefinition::new("t1", "webhook"))
458            .with_node("n1", NodeDefinition::new("std::log"))
459            .with_node("n2", NodeDefinition::new("std::switch"))
460            .with_edge(EdgeDefinition::new("t1", "n1"))
461            .with_edge(EdgeDefinition::new("n1", "n2"));
462
463        assert!(flow.has_trigger("t1"));
464        assert!(!flow.has_trigger("nonexistent"));
465
466        assert!(flow.has_node("n1"));
467        assert!(!flow.has_node("nonexistent"));
468
469        let edges_from_t1: Vec<_> = flow.edges_from("t1").collect();
470        assert_eq!(edges_from_t1.len(), 1);
471
472        let edges_to_n2: Vec<_> = flow.edges_to("n2").collect();
473        assert_eq!(edges_to_n2.len(), 1);
474    }
475}