Skip to main content

a3s_flow/nodes/
loop_node.rs

1//! `"loop"` node — while-loop over an inline sub-flow.
2//!
3//! Repeatedly executes a sub-flow until either a `break_condition` is satisfied
4//! or `max_iterations` is reached. The last iteration's collected output is
5//! returned. Each iteration receives the previous one's output as a variable,
6//! enabling accumulation and chaining patterns.
7//!
8//! # Config schema
9//!
10//! ```json
11//! {
12//!   "flow":            { "nodes": [...], "edges": [...] },
13//!   "output_selector": "step.result",
14//!   "max_iterations":  10,
15//!   "break_condition": { "from": "step", "path": "done", "op": "eq", "value": true }
16//! }
17//! ```
18//!
19//! | Field | Type | Required | Description |
20//! |-------|------|:--------:|-------------|
21//! | `flow` | object | ✅ | Inline sub-flow definition (`{ "nodes", "edges" }`) |
22//! | `output_selector` | string | ✅ | Dot path into sub-flow outputs to collect each iteration (`"node_id"` or `"node_id.field"`) |
23//! | `max_iterations` | integer | — | Safety cap (default `10`, minimum `1`) |
24//! | `break_condition` | Condition | — | If provided, the loop stops when this condition evaluates to true against the sub-flow's outputs; without it the loop always runs `max_iterations` times |
25//!
26//! ## Variables injected into each iteration's sub-flow
27//!
28//! | Variable | Value |
29//! |----------|-------|
30//! | `iteration_index` | 0-based iteration counter |
31//! | `loop_output` | The previous iteration's collected output (`null` for the first iteration) |
32//!
33//! # Output schema
34//!
35//! ```json
36//! { "output": <last_output_selector_result>, "iterations": 3 }
37//! ```
38//!
39//! # Example — retry until success
40//!
41//! ```json
42//! {
43//!   "id": "retry",
44//!   "type": "loop",
45//!   "data": {
46//!     "max_iterations": 5,
47//!     "output_selector": "check.ok",
48//!     "break_condition": { "from": "check", "path": "ok", "op": "eq", "value": true },
49//!     "flow": {
50//!       "nodes": [
51//!         { "id": "fetch", "type": "http-request", "data": { "url": "https://api.example.com/status" } },
52//!         { "id": "check", "type": "if-else", "data": { "cases": [
53//!           { "id": "ok", "conditions": [{ "from": "fetch", "path": "status", "op": "eq", "value": 200 }] }
54//!         ]}}
55//!       ],
56//!       "edges": [{ "source": "fetch", "target": "check" }]
57//!     }
58//!   }
59//! }
60//! ```
61
62use std::collections::HashMap;
63use std::sync::Arc;
64
65use async_trait::async_trait;
66use serde_json::{json, Value};
67
68use crate::condition::{get_path, Condition};
69use crate::error::{FlowError, Result};
70use crate::graph::DagGraph;
71use crate::node::{ExecContext, Node};
72use crate::runner::FlowRunner;
73
74/// Loop node — while-loop over a sub-flow.
75pub struct LoopNode;
76
77#[async_trait]
78impl Node for LoopNode {
79    fn node_type(&self) -> &str {
80        "loop"
81    }
82
83    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
84        // ── Parse config ──────────────────────────────────────────────────
85        let sub_flow_def = ctx
86            .data
87            .get("flow")
88            .ok_or_else(|| FlowError::InvalidDefinition("loop: missing data.flow".into()))?;
89
90        let output_selector = ctx.data["output_selector"]
91            .as_str()
92            .ok_or_else(|| {
93                FlowError::InvalidDefinition("loop: missing data.output_selector".into())
94            })?
95            .to_string();
96
97        let max_iterations = ctx.data["max_iterations"].as_u64().unwrap_or(10).max(1) as usize;
98
99        let break_condition: Option<Condition> = ctx
100            .data
101            .get("break_condition")
102            .and_then(|v| serde_json::from_value(v.clone()).ok());
103
104        // ── Parse sub-flow DAG once ───────────────────────────────────────
105        let sub_dag = DagGraph::from_json(sub_flow_def)?;
106        let registry = Arc::clone(&ctx.registry);
107        let base_variables = ctx.variables.clone();
108
109        let mut loop_output = Value::Null;
110        let mut actual_iterations = 0usize;
111
112        for i in 0..max_iterations {
113            actual_iterations = i + 1;
114
115            let mut vars = base_variables.clone();
116            vars.insert("iteration_index".into(), json!(i));
117            vars.insert("loop_output".into(), loop_output.clone());
118
119            let runner = FlowRunner::with_arc_registry(sub_dag.clone(), Arc::clone(&registry));
120            let sub_result = runner.run(vars).await?;
121
122            // Collect this iteration's output via the selector.
123            loop_output = resolve_selector(&sub_result.outputs, &output_selector)
124                .cloned()
125                .unwrap_or(Value::Null);
126
127            // Evaluate break condition against sub-flow outputs.
128            if let Some(ref cond) = break_condition {
129                if cond.evaluate(&sub_result.outputs, &sub_result.skipped_nodes) {
130                    break;
131                }
132            }
133        }
134
135        Ok(json!({
136            "output":     loop_output,
137            "iterations": actual_iterations,
138        }))
139    }
140}
141
142// ── Internal helpers ───────────────────────────────────────────────────────
143
144/// Resolve `"node_id"` or `"node_id.field.subfield"` into the sub-flow outputs.
145fn resolve_selector<'a>(outputs: &'a HashMap<String, Value>, selector: &str) -> Option<&'a Value> {
146    let (node_id, rest) = match selector.find('.') {
147        Some(pos) => (&selector[..pos], &selector[pos + 1..]),
148        None => (selector, ""),
149    };
150    let node_out = outputs.get(node_id)?;
151    if rest.is_empty() {
152        Some(node_out)
153    } else {
154        get_path(node_out, rest)
155    }
156}
157
158// ── Tests ─────────────────────────────────────────────────────────────────────
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163    use serde_json::json;
164
165    fn make_ctx(data: Value) -> ExecContext {
166        ExecContext {
167            data,
168            ..Default::default()
169        }
170    }
171
172    // ── Config validation ──────────────────────────────────────────────────
173
174    #[tokio::test]
175    async fn rejects_missing_flow() {
176        let node = LoopNode;
177        let err = node
178            .execute(make_ctx(json!({ "output_selector": "n" })))
179            .await
180            .unwrap_err();
181        assert!(matches!(err, FlowError::InvalidDefinition(_)));
182    }
183
184    #[tokio::test]
185    async fn rejects_missing_output_selector() {
186        let node = LoopNode;
187        let err = node
188            .execute(make_ctx(json!({
189                "flow": { "nodes": [{ "id": "n", "type": "noop" }], "edges": [] }
190            })))
191            .await
192            .unwrap_err();
193        assert!(matches!(err, FlowError::InvalidDefinition(_)));
194    }
195
196    // ── Execution ──────────────────────────────────────────────────────────
197
198    #[tokio::test]
199    async fn runs_max_iterations_without_break_condition() {
200        let node = LoopNode;
201        let out = node
202            .execute(make_ctx(json!({
203                "output_selector": "step.output",
204                "max_iterations":  3,
205                "flow": {
206                    "nodes": [{
207                        "id": "step", "type": "code",
208                        "data": { "language": "rhai", "code": "variables.iteration_index" }
209                    }],
210                    "edges": []
211                }
212            })))
213            .await
214            .unwrap();
215
216        // Ran exactly 3 times; last index is 2.
217        assert_eq!(out["iterations"], json!(3));
218        assert_eq!(out["output"], json!(2));
219    }
220
221    #[tokio::test]
222    async fn defaults_to_ten_iterations() {
223        // No max_iterations specified → defaults to 10.
224        let node = LoopNode;
225        let out = node
226            .execute(make_ctx(json!({
227                "output_selector": "step.output",
228                "flow": {
229                    "nodes": [{
230                        "id": "step", "type": "code",
231                        "data": { "language": "rhai", "code": "variables.iteration_index" }
232                    }],
233                    "edges": []
234                }
235            })))
236            .await
237            .unwrap();
238
239        assert_eq!(out["iterations"], json!(10));
240    }
241
242    #[tokio::test]
243    async fn break_condition_stops_loop_early() {
244        // Sub-flow returns { "done": true } when iteration_index >= 2.
245        // break_condition: done == true → loop ends after iteration 2 (index 2).
246        let node = LoopNode;
247        let out = node
248            .execute(make_ctx(json!({
249                "output_selector": "step.output",
250                "max_iterations":  10,
251                "break_condition": {
252                    "from": "gate", "path": "output", "op": "eq", "value": true
253                },
254                "flow": {
255                    "nodes": [
256                        {
257                            "id": "step", "type": "code",
258                            "data": { "language": "rhai", "code": "variables.iteration_index" }
259                        },
260                        {
261                            "id": "gate", "type": "code",
262                            "data": { "language": "rhai", "code": "variables.iteration_index >= 2" }
263                        }
264                    ],
265                    "edges": []
266                }
267            })))
268            .await
269            .unwrap();
270
271        // Break fires when iteration_index == 2, so 3 iterations ran (0, 1, 2).
272        assert_eq!(out["iterations"], json!(3));
273        assert_eq!(out["output"], json!(2));
274    }
275
276    #[tokio::test]
277    async fn loop_output_injected_into_next_iteration() {
278        // Each iteration's output is its iteration_index.
279        // loop_output for iteration N is the output of iteration N-1.
280        // We collect loop_output from iteration index=2 (the 3rd and final run).
281        let node = LoopNode;
282        let out = node
283            .execute(make_ctx(json!({
284                "output_selector": "collect.output",
285                "max_iterations":  3,
286                "flow": {
287                    "nodes": [{
288                        // Return iteration_index as a proxy for the collected value.
289                        "id": "collect", "type": "code",
290                        "data": { "language": "rhai", "code": "variables.iteration_index" }
291                    }],
292                    "edges": []
293                }
294            })))
295            .await
296            .unwrap();
297
298        // 3 iterations ran; final output = index of last = 2.
299        assert_eq!(out["output"], json!(2));
300    }
301
302    #[tokio::test]
303    async fn min_iterations_is_one() {
304        // max_iterations: 0 is clamped to 1.
305        let node = LoopNode;
306        let out = node
307            .execute(make_ctx(json!({
308                "output_selector": "step.output",
309                "max_iterations":  0,
310                "flow": {
311                    "nodes": [{
312                        "id": "step", "type": "code",
313                        "data": { "language": "rhai", "code": "42" }
314                    }],
315                    "edges": []
316                }
317            })))
318            .await
319            .unwrap();
320
321        assert_eq!(out["iterations"], json!(1));
322        assert_eq!(out["output"], json!(42));
323    }
324}