xerv_core/flow/
node.rs

1//! Node definition from YAML.
2
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
6/// A node definition from YAML.
7///
8/// # Example
9///
10/// ```yaml
11/// nodes:
12///   fraud_check:
13///     type: std::switch
14///     description: Route based on risk score
15///     config:
16///       condition:
17///         type: greater_than
18///         field: risk_score
19///         value: 0.8
20///
21///   log_result:
22///     type: std::log
23///     config:
24///       message: "Processing complete"
25///       level: info
26///
27///   aggregate_stats:
28///     type: std::aggregate
29///     config:
30///       operation: sum
31///       field: amount
32/// ```
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct NodeDefinition {
35    /// Node type (e.g., "std::switch", "std::merge", "plugins::fraud_model").
36    #[serde(rename = "type")]
37    pub node_type: String,
38
39    /// Node-specific configuration.
40    #[serde(default)]
41    pub config: serde_yaml::Value,
42
43    /// Optional description.
44    #[serde(default)]
45    pub description: Option<String>,
46
47    /// Whether the node is enabled.
48    #[serde(default = "default_enabled")]
49    pub enabled: bool,
50
51    /// Retry configuration.
52    #[serde(default)]
53    pub retry: Option<RetryConfig>,
54
55    /// Timeout override for this node (milliseconds).
56    #[serde(default)]
57    pub timeout_ms: Option<u64>,
58
59    /// Custom labels for this node.
60    #[serde(default)]
61    pub labels: HashMap<String, String>,
62}
63
/// Serde fallback for `NodeDefinition::enabled`: a node whose YAML omits the
/// `enabled` key is treated as enabled.
fn default_enabled() -> bool {
    const ENABLED_WHEN_OMITTED: bool = true;
    ENABLED_WHEN_OMITTED
}
67
68/// Retry configuration for a node.
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct RetryConfig {
71    /// Maximum number of retry attempts.
72    #[serde(default = "default_max_retries")]
73    pub max_attempts: u32,
74
75    /// Initial delay between retries in milliseconds.
76    #[serde(default = "default_retry_delay_ms")]
77    pub initial_delay_ms: u64,
78
79    /// Maximum delay between retries in milliseconds.
80    #[serde(default = "default_max_delay_ms")]
81    pub max_delay_ms: u64,
82
83    /// Multiplier for exponential backoff.
84    #[serde(default = "default_backoff_multiplier")]
85    pub backoff_multiplier: f64,
86
87    /// Whether to add jitter to delay.
88    #[serde(default = "default_jitter")]
89    pub jitter: bool,
90}
91
/// Fallback for `RetryConfig::max_attempts` when the key is not given.
fn default_max_retries() -> u32 {
    const MAX_ATTEMPTS: u32 = 3;
    MAX_ATTEMPTS
}
/// Fallback for `RetryConfig::initial_delay_ms`, in milliseconds.
fn default_retry_delay_ms() -> u64 {
    const INITIAL_DELAY_MS: u64 = 1_000;
    INITIAL_DELAY_MS
}
/// Fallback for `RetryConfig::max_delay_ms`, in milliseconds.
fn default_max_delay_ms() -> u64 {
    const MAX_DELAY_MS: u64 = 30_000;
    MAX_DELAY_MS
}
/// Fallback for `RetryConfig::backoff_multiplier`.
fn default_backoff_multiplier() -> f64 {
    const BACKOFF_MULTIPLIER: f64 = 2.0;
    BACKOFF_MULTIPLIER
}
/// Fallback for `RetryConfig::jitter`: jitter is on unless turned off.
fn default_jitter() -> bool {
    const JITTER_WHEN_OMITTED: bool = true;
    JITTER_WHEN_OMITTED
}
107
108impl Default for RetryConfig {
109    fn default() -> Self {
110        Self {
111            max_attempts: default_max_retries(),
112            initial_delay_ms: default_retry_delay_ms(),
113            max_delay_ms: default_max_delay_ms(),
114            backoff_multiplier: default_backoff_multiplier(),
115            jitter: default_jitter(),
116        }
117    }
118}
119
120impl NodeDefinition {
121    /// Create a new node definition.
122    pub fn new(node_type: impl Into<String>) -> Self {
123        Self {
124            node_type: node_type.into(),
125            config: serde_yaml::Value::Null,
126            description: None,
127            enabled: true,
128            retry: None,
129            timeout_ms: None,
130            labels: HashMap::new(),
131        }
132    }
133
134    /// Set configuration.
135    pub fn with_config(mut self, config: serde_yaml::Value) -> Self {
136        self.config = config;
137        self
138    }
139
140    /// Set description.
141    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
142        self.description = Some(desc.into());
143        self
144    }
145
146    /// Set retry configuration.
147    pub fn with_retry(mut self, retry: RetryConfig) -> Self {
148        self.retry = Some(retry);
149        self
150    }
151
152    /// Set timeout override.
153    pub fn with_timeout_ms(mut self, ms: u64) -> Self {
154        self.timeout_ms = Some(ms);
155        self
156    }
157
158    /// Add a label.
159    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
160        self.labels.insert(key.into(), value.into());
161        self
162    }
163
164    /// Disable the node.
165    pub fn disabled(mut self) -> Self {
166        self.enabled = false;
167        self
168    }
169
170    /// Check if this is a standard library node.
171    pub fn is_std(&self) -> bool {
172        self.node_type.starts_with("std::")
173    }
174
175    /// Check if this is a trigger node.
176    pub fn is_trigger(&self) -> bool {
177        self.node_type.starts_with("trigger::")
178    }
179
180    /// Get a string config value.
181    pub fn get_string(&self, key: &str) -> Option<&str> {
182        self.config.get(key).and_then(|v| v.as_str())
183    }
184
185    /// Get an integer config value.
186    pub fn get_i64(&self, key: &str) -> Option<i64> {
187        self.config.get(key).and_then(|v| v.as_i64())
188    }
189
190    /// Get a boolean config value.
191    pub fn get_bool(&self, key: &str) -> Option<bool> {
192        self.config.get(key).and_then(|v| v.as_bool())
193    }
194
195    /// Get a nested config value.
196    pub fn get_nested(&self, path: &[&str]) -> Option<&serde_yaml::Value> {
197        let mut current = &self.config;
198        for key in path {
199            current = current.get(key)?;
200        }
201        Some(current)
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// A node with only `type` and `config` parses, and every omitted key
    /// takes its fallback value.
    #[test]
    fn deserialize_simple_node() {
        let yaml = r#"
type: std::log
config:
  message: "Hello"
  level: info
"#;
        let node: NodeDefinition = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(node.node_type, "std::log");
        assert_eq!(node.get_string("message"), Some("Hello"));
        assert_eq!(node.get_string("level"), Some("info"));
        assert_eq!(node.get_string("missing"), None);
        assert!(node.enabled);
        assert!(node.is_std());
        assert!(!node.is_trigger());

        // Keys left out of the YAML must come back unset/empty.
        assert!(node.description.is_none());
        assert!(node.retry.is_none());
        assert!(node.timeout_ms.is_none());
        assert!(node.labels.is_empty());
    }

    /// A partial `retry` block keeps the given values and fills the rest
    /// from `RetryConfig::default()`.
    #[test]
    fn deserialize_node_with_retry() {
        let yaml = r#"
type: plugins::http_call
config:
  url: "https://api.example.com"
retry:
  max_attempts: 5
  initial_delay_ms: 500
"#;
        let node: NodeDefinition = serde_yaml::from_str(yaml).unwrap();
        assert!(!node.is_std());
        assert_eq!(node.get_string("url"), Some("https://api.example.com"));

        let retry = node.retry.expect("retry block should deserialize");
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.initial_delay_ms, 500);

        // Keys omitted from the retry block fall back to the defaults.
        let defaults = RetryConfig::default();
        assert_eq!(retry.max_delay_ms, defaults.max_delay_ms);
        assert_eq!(retry.backoff_multiplier, defaults.backoff_multiplier);
        assert_eq!(retry.jitter, defaults.jitter);
    }

    /// Labels deserialize into the `labels` map.
    #[test]
    fn deserialize_node_with_labels() {
        let yaml = r#"
type: std::switch
labels:
  team: payments
  tier: critical
"#;
        let node: NodeDefinition = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(node.labels.len(), 2);
        assert_eq!(
            node.labels.get("team").map(String::as_str),
            Some("payments")
        );
        assert_eq!(
            node.labels.get("tier").map(String::as_str),
            Some("critical")
        );
    }

    /// The builder sets exactly what was requested and leaves the rest alone.
    #[test]
    fn node_builder() {
        let node = NodeDefinition::new("std::merge")
            .with_description("Wait for all inputs")
            .with_timeout_ms(5000)
            .with_label("category", "flow-control");

        assert_eq!(node.node_type, "std::merge");
        assert_eq!(node.description, Some("Wait for all inputs".to_string()));
        assert_eq!(node.timeout_ms, Some(5000));
        assert_eq!(
            node.labels.get("category").map(String::as_str),
            Some("flow-control")
        );
        assert!(node.enabled);
        assert!(node.retry.is_none());
        assert!(node.is_std());
    }

    /// `get_nested` walks mappings, and reports `None` for unresolved paths.
    #[test]
    fn nested_config_access() {
        let yaml = r#"
type: std::switch
config:
  condition:
    type: greater_than
    field: amount
    value: 100
"#;
        let node: NodeDefinition = serde_yaml::from_str(yaml).unwrap();
        let condition_type = node
            .get_nested(&["condition", "type"])
            .and_then(|v| v.as_str());
        assert_eq!(condition_type, Some("greater_than"));

        let threshold = node
            .get_nested(&["condition", "value"])
            .and_then(|v| v.as_i64());
        assert_eq!(threshold, Some(100));

        // A missing segment anywhere along the path gives `None`.
        assert!(node.get_nested(&["condition", "missing"]).is_none());
        assert!(node.get_nested(&["missing", "type"]).is_none());

        // An empty path returns the whole config value.
        assert_eq!(node.get_nested(&[]), Some(&node.config));
    }

    /// `enabled: false` in YAML and the `disabled()` builder both turn a node off.
    #[test]
    fn disabled_node() {
        let yaml = r#"
type: std::log
enabled: false
"#;
        let node: NodeDefinition = serde_yaml::from_str(yaml).unwrap();
        assert!(!node.enabled);

        let built = NodeDefinition::new("std::log").disabled();
        assert!(!built.enabled);
    }
}
293}