xerv_nodes/flow/
loop_node.rs

1//! Loop node (controlled iteration).
2//!
3//! Provides controlled iteration with configurable exit conditions.
4//! Loop back-edges must be declared in the flow graph to avoid
5//! cycle detection errors.
6
7use std::collections::HashMap;
8use xerv_core::traits::{Context, Node, NodeFuture, NodeInfo, NodeOutput, Port, PortDirection};
9use xerv_core::types::RelPtr;
10use xerv_core::value::Value;
11
12/// Exit condition for the loop.
13#[derive(Debug, Clone)]
14pub enum LoopCondition {
15    /// Exit after a fixed number of iterations.
16    MaxIterations(u32),
17    /// Exit when a field equals a value.
18    UntilFieldEquals { field: String, value: String },
19    /// Exit when a boolean field becomes true.
20    UntilTrue { field: String },
21    /// Exit when a boolean field becomes false.
22    UntilFalse { field: String },
23    /// Exit based on expression evaluation.
24    UntilExpression(String),
25}
26
27impl Default for LoopCondition {
28    fn default() -> Self {
29        Self::MaxIterations(10)
30    }
31}
32
33/// Loop node - controlled iteration.
34///
35/// Implements a loop construct for repeated execution. The loop
36/// continues until an exit condition is met. Loop back-edges must
37/// be declared in the flow topology to be excluded from cycle detection.
38///
39/// # Ports
40/// - Input: "in" - Initial input / data from loop body
41/// - Output: "continue" - Activated to continue the loop
42/// - Output: "exit" - Activated when loop terminates
43///
44/// # Example Configuration
45/// ```yaml
46/// nodes:
47///   retry_loop:
48///     type: std::loop
49///     config:
50///       max_iterations: 3
51///       # Or: until_true: $.success
52///       # Or: until_expression: "${iteration} >= 3 || ${result.success}"
53///     inputs:
54///       - from: start.out -> in
55///       - from: retry_action.out -> in  # back-edge from loop body
56///     outputs:
57///       continue: -> retry_action.in
58///       exit: -> done.in
59/// ```
60///
61/// # Back-edge Declaration
62/// ```yaml
63/// topology:
64///   loop_edges:
65///     - from: retry_action
66///       to: retry_loop  # This edge is treated as a back-edge
67/// ```
68#[derive(Debug)]
69pub struct LoopNode {
70    /// Exit condition.
71    condition: LoopCondition,
72    /// Maximum iterations (safety limit).
73    max_iterations: u32,
74}
75
76impl LoopNode {
77    /// Create a loop with max iterations exit condition.
78    pub fn with_max_iterations(max: u32) -> Self {
79        Self {
80            condition: LoopCondition::MaxIterations(max),
81            max_iterations: max,
82        }
83    }
84
85    /// Create a loop that runs until a field equals a value.
86    pub fn until_field_equals(
87        field: impl Into<String>,
88        value: impl Into<String>,
89        max_iterations: u32,
90    ) -> Self {
91        Self {
92            condition: LoopCondition::UntilFieldEquals {
93                field: field.into(),
94                value: value.into(),
95            },
96            max_iterations,
97        }
98    }
99
100    /// Create a loop that runs until a boolean field is true.
101    pub fn until_true(field: impl Into<String>, max_iterations: u32) -> Self {
102        Self {
103            condition: LoopCondition::UntilTrue {
104                field: field.into(),
105            },
106            max_iterations,
107        }
108    }
109
110    /// Create a loop with a custom exit expression.
111    pub fn with_expression(expr: impl Into<String>, max_iterations: u32) -> Self {
112        Self {
113            condition: LoopCondition::UntilExpression(expr.into()),
114            max_iterations,
115        }
116    }
117
118    /// Evaluate whether the loop should exit.
119    ///
120    /// Returns `true` if the loop should exit, `false` to continue.
121    /// The max_iterations limit is always checked as a safety mechanism.
122    fn should_exit(&self, value: &Value, iteration: u32) -> bool {
123        // Check max iterations safety limit
124        if iteration >= self.max_iterations {
125            tracing::debug!(
126                iteration = iteration,
127                max = self.max_iterations,
128                "Loop hit max iterations safety limit"
129            );
130            return true;
131        }
132
133        // Check condition-specific exit
134        match &self.condition {
135            LoopCondition::MaxIterations(max) => {
136                let should_exit = iteration >= *max;
137                tracing::debug!(
138                    iteration = iteration,
139                    max = max,
140                    should_exit = should_exit,
141                    "Evaluating max_iterations"
142                );
143                should_exit
144            }
145
146            LoopCondition::UntilFieldEquals {
147                field,
148                value: expected,
149            } => {
150                let should_exit = value.field_equals(field, expected);
151                tracing::debug!(
152                    field = %field,
153                    expected = %expected,
154                    iteration = iteration,
155                    should_exit = should_exit,
156                    "Evaluated until_field_equals"
157                );
158                should_exit
159            }
160
161            LoopCondition::UntilTrue { field } => {
162                let should_exit = value.field_is_true(field);
163                tracing::debug!(
164                    field = %field,
165                    iteration = iteration,
166                    should_exit = should_exit,
167                    "Evaluated until_true"
168                );
169                should_exit
170            }
171
172            LoopCondition::UntilFalse { field } => {
173                let should_exit = value.field_is_false(field);
174                tracing::debug!(
175                    field = %field,
176                    iteration = iteration,
177                    should_exit = should_exit,
178                    "Evaluated until_false"
179                );
180                should_exit
181            }
182
183            LoopCondition::UntilExpression(expr) => {
184                let should_exit = self.evaluate_expression(expr, value, iteration);
185                tracing::debug!(
186                    expr = %expr,
187                    iteration = iteration,
188                    should_exit = should_exit,
189                    "Evaluated until_expression"
190                );
191                should_exit
192            }
193        }
194    }
195
196    /// Evaluate a simple expression for loop exit condition.
197    ///
198    /// Supports:
199    /// - `${field} == "value"` -> field equality
200    /// - `${field} > number` -> numeric comparison
201    /// - `${field}` -> boolean check
202    /// - `${iteration} >= number` -> iteration count check
203    fn evaluate_expression(&self, expr: &str, value: &Value, iteration: u32) -> bool {
204        let expr = expr.trim();
205
206        // Handle special ${iteration} variable
207        if expr.contains("${iteration}") {
208            return self.evaluate_iteration_expr(expr, iteration);
209        }
210
211        // Try to parse comparison expressions
212        if let Some((field, op, rhs)) = self.parse_comparison(expr) {
213            match op {
214                "==" | "=" => {
215                    let rhs = rhs.trim_matches('"').trim_matches('\'');
216                    value.field_equals(&field, rhs)
217                }
218                "!=" => {
219                    let rhs = rhs.trim_matches('"').trim_matches('\'');
220                    !value.field_equals(&field, rhs)
221                }
222                ">" => {
223                    if let Ok(threshold) = rhs.parse::<f64>() {
224                        value.field_greater_than(&field, threshold)
225                    } else {
226                        false
227                    }
228                }
229                "<" => {
230                    if let Ok(threshold) = rhs.parse::<f64>() {
231                        value.field_less_than(&field, threshold)
232                    } else {
233                        false
234                    }
235                }
236                ">=" => {
237                    if let Ok(threshold) = rhs.parse::<f64>() {
238                        value.get_f64(&field).map_or(false, |v| v >= threshold)
239                    } else {
240                        false
241                    }
242                }
243                "<=" => {
244                    if let Ok(threshold) = rhs.parse::<f64>() {
245                        value.get_f64(&field).map_or(false, |v| v <= threshold)
246                    } else {
247                        false
248                    }
249                }
250                _ => false,
251            }
252        } else if let Some(field) = self.parse_field_ref(expr) {
253            // Just a field reference - check if truthy (exit when true)
254            value.field_is_true(&field)
255        } else {
256            tracing::warn!(expr = %expr, "Unrecognized expression format");
257            false
258        }
259    }
260
261    /// Evaluate an expression containing ${iteration}.
262    fn evaluate_iteration_expr(&self, expr: &str, iteration: u32) -> bool {
263        // Parse patterns like "${iteration} >= 3" or "${iteration} == 5"
264        let operators = [">=", "<=", "==", "!=", ">", "<"];
265
266        for op in operators {
267            if let Some(pos) = expr.find(op) {
268                let lhs = expr[..pos].trim();
269                let rhs = expr[pos + op.len()..].trim();
270
271                if lhs == "${iteration}" {
272                    if let Ok(threshold) = rhs.parse::<u32>() {
273                        return match op {
274                            ">=" => iteration >= threshold,
275                            "<=" => iteration <= threshold,
276                            "==" => iteration == threshold,
277                            "!=" => iteration != threshold,
278                            ">" => iteration > threshold,
279                            "<" => iteration < threshold,
280                            _ => false,
281                        };
282                    }
283                }
284            }
285        }
286
287        false
288    }
289
290    /// Parse a comparison expression like "${field} > 0.5"
291    fn parse_comparison<'a>(&self, expr: &'a str) -> Option<(String, &'a str, &'a str)> {
292        let operators = [">=", "<=", "==", "!=", ">", "<", "="];
293
294        for op in operators {
295            if let Some(pos) = expr.find(op) {
296                let lhs = expr[..pos].trim();
297                let rhs = expr[pos + op.len()..].trim();
298
299                if let Some(field) = self.parse_field_ref(lhs) {
300                    return Some((field, op, rhs));
301                }
302            }
303        }
304        None
305    }
306
307    /// Parse a field reference like "${field}" or "$.field"
308    fn parse_field_ref(&self, s: &str) -> Option<String> {
309        let s = s.trim();
310
311        // Skip ${iteration} - it's handled separately
312        if s == "${iteration}" {
313            return None;
314        }
315
316        // ${field.path} format
317        if s.starts_with("${") && s.ends_with('}') {
318            return Some(s[2..s.len() - 1].to_string());
319        }
320
321        // $.field.path format
322        if s.starts_with("$.") {
323            return Some(s[2..].to_string());
324        }
325
326        // Plain field name
327        if !s.is_empty()
328            && s.chars()
329                .all(|c| c.is_alphanumeric() || c == '_' || c == '.')
330        {
331            return Some(s.to_string());
332        }
333
334        None
335    }
336}
337
338impl Node for LoopNode {
339    fn info(&self) -> NodeInfo {
340        NodeInfo::new("std", "loop")
341            .with_description("Controlled iteration with configurable exit conditions")
342            .with_inputs(vec![Port::input("Any")])
343            .with_outputs(vec![
344                Port::named("continue", PortDirection::Output, "Any")
345                    .with_description("Activated to continue loop iteration"),
346                Port::named("exit", PortDirection::Output, "Any")
347                    .with_description("Activated when loop terminates"),
348                Port::error(),
349            ])
350    }
351
352    fn execute<'a>(&'a self, ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a> {
353        Box::pin(async move {
354            let input = inputs.get("in").copied().unwrap_or_else(RelPtr::null);
355
356            // Read and parse input data from arena
357            let value = if input.is_null() {
358                Value::null()
359            } else {
360                match ctx.read_bytes(input) {
361                    Ok(bytes) => Value::from_bytes(&bytes).unwrap_or_else(|e| {
362                        tracing::warn!(error = %e, "Failed to parse input as JSON, using null");
363                        Value::null()
364                    }),
365                    Err(e) => {
366                        tracing::warn!(error = %e, "Failed to read input from arena, using null");
367                        Value::null()
368                    }
369                }
370            };
371
372            // In a real implementation, we'd track iteration count in the trace state.
373            // The executor would call ctx.loop_iteration() to get/increment.
374            // For now, we demonstrate the structure with a placeholder.
375            // TODO: Implement iteration tracking in executor/trace state
376            let iteration = 0_u32; // Would be: ctx.get_loop_iteration(node_id)
377
378            let should_exit = self.should_exit(&value, iteration);
379
380            tracing::debug!(
381                iteration = iteration,
382                should_exit = should_exit,
383                "Loop iteration decision"
384            );
385
386            if should_exit {
387                // Exit the loop
388                Ok(NodeOutput::new("exit", input))
389            } else {
390                // Continue looping
391                Ok(NodeOutput::new("continue", input))
392            }
393        })
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400    use serde_json::json;
401
402    #[test]
403    fn loop_node_info() {
404        let node = LoopNode::with_max_iterations(5);
405        let info = node.info();
406
407        assert_eq!(info.name, "std::loop");
408        assert_eq!(info.inputs.len(), 1);
409        assert_eq!(info.outputs.len(), 3);
410        assert_eq!(info.outputs[0].name, "continue");
411        assert_eq!(info.outputs[1].name, "exit");
412    }
413
414    #[test]
415    fn loop_max_iterations() {
416        let node = LoopNode::with_max_iterations(3);
417        let value = Value::null();
418
419        // Should not exit on iteration 0, 1, 2
420        assert!(!node.should_exit(&value, 0));
421        assert!(!node.should_exit(&value, 1));
422        assert!(!node.should_exit(&value, 2));
423
424        // Should exit on iteration 3+
425        assert!(node.should_exit(&value, 3));
426        assert!(node.should_exit(&value, 10));
427    }
428
429    #[test]
430    fn loop_condition_default() {
431        let condition = LoopCondition::default();
432        assert!(matches!(condition, LoopCondition::MaxIterations(10)));
433    }
434
435    #[test]
436    fn loop_until_field_equals() {
437        let node = LoopNode::until_field_equals("status", "complete", 100);
438
439        // Should continue when status != "complete"
440        let value = Value(json!({"status": "pending"}));
441        assert!(!node.should_exit(&value, 0));
442
443        // Should exit when status == "complete"
444        let value = Value(json!({"status": "complete"}));
445        assert!(node.should_exit(&value, 0));
446    }
447
448    #[test]
449    fn loop_until_true() {
450        let node = LoopNode::until_true("done", 100);
451
452        // Should continue when done == false
453        let value = Value(json!({"done": false}));
454        assert!(!node.should_exit(&value, 0));
455
456        // Should exit when done == true
457        let value = Value(json!({"done": true}));
458        assert!(node.should_exit(&value, 0));
459    }
460
461    #[test]
462    fn loop_until_false() {
463        let node = LoopNode {
464            condition: LoopCondition::UntilFalse {
465                field: "running".to_string(),
466            },
467            max_iterations: 100,
468        };
469
470        // Should continue when running == true
471        let value = Value(json!({"running": true}));
472        assert!(!node.should_exit(&value, 0));
473
474        // Should exit when running == false
475        let value = Value(json!({"running": false}));
476        assert!(node.should_exit(&value, 0));
477    }
478
479    #[test]
480    fn loop_expression_iteration_count() {
481        let node = LoopNode::with_expression("${iteration} >= 3", 100);
482
483        let value = Value::null();
484
485        // Should continue when iteration < 3
486        assert!(!node.should_exit(&value, 0));
487        assert!(!node.should_exit(&value, 1));
488        assert!(!node.should_exit(&value, 2));
489
490        // Should exit when iteration >= 3
491        assert!(node.should_exit(&value, 3));
492        assert!(node.should_exit(&value, 5));
493    }
494
495    #[test]
496    fn loop_expression_field_comparison() {
497        let node = LoopNode::with_expression("${error_count} > 5", 100);
498
499        // Should continue when error_count <= 5
500        let value = Value(json!({"error_count": 3}));
501        assert!(!node.should_exit(&value, 0));
502
503        // Should exit when error_count > 5
504        let value = Value(json!({"error_count": 7}));
505        assert!(node.should_exit(&value, 0));
506    }
507
508    #[test]
509    fn loop_safety_limit_overrides_condition() {
510        // Even if condition says continue, safety limit kicks in
511        let node = LoopNode::until_field_equals("status", "complete", 5);
512
513        // status is never "complete", but safety limit is 5
514        let value = Value(json!({"status": "pending"}));
515
516        assert!(!node.should_exit(&value, 0));
517        assert!(!node.should_exit(&value, 4));
518        assert!(node.should_exit(&value, 5)); // safety limit
519    }
520
521    #[test]
522    fn loop_nested_field_access() {
523        let node = LoopNode::until_field_equals("result.status", "success", 100);
524
525        let value = Value(json!({"result": {"status": "pending"}}));
526        assert!(!node.should_exit(&value, 0));
527
528        let value = Value(json!({"result": {"status": "success"}}));
529        assert!(node.should_exit(&value, 0));
530    }
531
532    #[test]
533    fn loop_expression_boolean_field() {
534        let node = LoopNode::with_expression("${is_valid}", 100);
535
536        // Should continue when is_valid is false
537        let value = Value(json!({"is_valid": false}));
538        assert!(!node.should_exit(&value, 0));
539
540        // Should exit when is_valid is true
541        let value = Value(json!({"is_valid": true}));
542        assert!(node.should_exit(&value, 0));
543    }
544
545    #[test]
546    fn loop_expression_equality() {
547        let node = LoopNode::with_expression("${state} == \"done\"", 100);
548
549        let value = Value(json!({"state": "running"}));
550        assert!(!node.should_exit(&value, 0));
551
552        let value = Value(json!({"state": "done"}));
553        assert!(node.should_exit(&value, 0));
554    }
555}