a3s_flow/nodes/loop_node.rs
1//! `"loop"` node — while-loop over an inline sub-flow.
2//!
3//! Repeatedly executes a sub-flow until either a `break_condition` is satisfied
4//! or `max_iterations` is reached. The last iteration's collected output is
5//! returned. Each iteration receives the previous one's output as a variable,
6//! enabling accumulation and chaining patterns.
7//!
8//! # Config schema
9//!
10//! ```json
11//! {
12//! "flow": { "nodes": [...], "edges": [...] },
13//! "output_selector": "step.result",
14//! "max_iterations": 10,
15//! "break_condition": { "from": "step", "path": "done", "op": "eq", "value": true }
16//! }
17//! ```
18//!
19//! | Field | Type | Required | Description |
20//! |-------|------|:--------:|-------------|
21//! | `flow` | object | ✅ | Inline sub-flow definition (`{ "nodes", "edges" }`) |
22//! | `output_selector` | string | ✅ | Dot path into sub-flow outputs to collect each iteration (`"node_id"` or `"node_id.field"`) |
23//! | `max_iterations` | integer | — | Safety cap (default `10`, minimum `1`) |
24//! | `break_condition` | Condition | — | If provided, the loop stops when this condition evaluates to true against the sub-flow's outputs; without it the loop always runs `max_iterations` times |
25//!
26//! ## Variables injected into each iteration's sub-flow
27//!
28//! | Variable | Value |
29//! |----------|-------|
30//! | `iteration_index` | 0-based iteration counter |
31//! | `loop_output` | The previous iteration's collected output (`null` for the first iteration) |
32//!
33//! # Output schema
34//!
35//! ```json
36//! { "output": <last_output_selector_result>, "iterations": 3 }
37//! ```
38//!
39//! # Example — retry until success
40//!
41//! ```json
42//! {
43//! "id": "retry",
44//! "type": "loop",
45//! "data": {
46//! "max_iterations": 5,
47//! "output_selector": "check.ok",
48//! "break_condition": { "from": "check", "path": "ok", "op": "eq", "value": true },
49//! "flow": {
50//! "nodes": [
51//! { "id": "fetch", "type": "http-request", "data": { "url": "https://api.example.com/status" } },
52//! { "id": "check", "type": "if-else", "data": { "cases": [
53//! { "id": "ok", "conditions": [{ "from": "fetch", "path": "status", "op": "eq", "value": 200 }] }
54//! ]}}
55//! ],
56//! "edges": [{ "source": "fetch", "target": "check" }]
57//! }
58//! }
59//! }
60//! ```
61
62use std::collections::HashMap;
63use std::sync::Arc;
64
65use async_trait::async_trait;
66use serde_json::{json, Value};
67
68use crate::condition::{get_path, Condition};
69use crate::error::{FlowError, Result};
70use crate::graph::DagGraph;
71use crate::node::{ExecContext, Node};
72use crate::runner::FlowRunner;
73
/// Loop node — while-loop over an inline sub-flow.
///
/// The type carries no state of its own: it is a unit struct, and everything
/// it needs at run time is read from the `ExecContext` handed to `execute`.
/// Because it is stateless it is cheap to copy, and it derives `Debug` so it
/// can appear in logs and assertion messages.
#[derive(Debug, Clone, Copy, Default)]
pub struct LoopNode;
76
77#[async_trait]
78impl Node for LoopNode {
79 fn node_type(&self) -> &str {
80 "loop"
81 }
82
83 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
84 // ── Parse config ──────────────────────────────────────────────────
85 let sub_flow_def = ctx
86 .data
87 .get("flow")
88 .ok_or_else(|| FlowError::InvalidDefinition("loop: missing data.flow".into()))?;
89
90 let output_selector = ctx.data["output_selector"]
91 .as_str()
92 .ok_or_else(|| {
93 FlowError::InvalidDefinition("loop: missing data.output_selector".into())
94 })?
95 .to_string();
96
97 let max_iterations = ctx.data["max_iterations"].as_u64().unwrap_or(10).max(1) as usize;
98
99 let break_condition: Option<Condition> = ctx
100 .data
101 .get("break_condition")
102 .and_then(|v| serde_json::from_value(v.clone()).ok());
103
104 // ── Parse sub-flow DAG once ───────────────────────────────────────
105 let sub_dag = DagGraph::from_json(sub_flow_def)?;
106 let registry = Arc::clone(&ctx.registry);
107 let base_variables = ctx.variables.clone();
108
109 let mut loop_output = Value::Null;
110 let mut actual_iterations = 0usize;
111
112 for i in 0..max_iterations {
113 actual_iterations = i + 1;
114
115 let mut vars = base_variables.clone();
116 vars.insert("iteration_index".into(), json!(i));
117 vars.insert("loop_output".into(), loop_output.clone());
118
119 let runner = FlowRunner::with_arc_registry(sub_dag.clone(), Arc::clone(®istry));
120 let sub_result = runner.run(vars).await?;
121
122 // Collect this iteration's output via the selector.
123 loop_output = resolve_selector(&sub_result.outputs, &output_selector)
124 .cloned()
125 .unwrap_or(Value::Null);
126
127 // Evaluate break condition against sub-flow outputs.
128 if let Some(ref cond) = break_condition {
129 if cond.evaluate(&sub_result.outputs, &sub_result.skipped_nodes) {
130 break;
131 }
132 }
133 }
134
135 Ok(json!({
136 "output": loop_output,
137 "iterations": actual_iterations,
138 }))
139 }
140}
141
142// ── Internal helpers ───────────────────────────────────────────────────────
143
144/// Resolve `"node_id"` or `"node_id.field.subfield"` into the sub-flow outputs.
145fn resolve_selector<'a>(outputs: &'a HashMap<String, Value>, selector: &str) -> Option<&'a Value> {
146 let (node_id, rest) = match selector.find('.') {
147 Some(pos) => (&selector[..pos], &selector[pos + 1..]),
148 None => (selector, ""),
149 };
150 let node_out = outputs.get(node_id)?;
151 if rest.is_empty() {
152 Some(node_out)
153 } else {
154 get_path(node_out, rest)
155 }
156}
157
158// ── Tests ─────────────────────────────────────────────────────────────────────
159
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build an `ExecContext` whose `data` is the given node config; every
    /// other field takes its `Default` value.
    fn make_ctx(data: Value) -> ExecContext {
        ExecContext {
            data,
            ..Default::default()
        }
    }

    // ── Config validation ──────────────────────────────────────────────────

    #[tokio::test]
    async fn rejects_missing_flow() {
        let node = LoopNode;
        let err = node
            .execute(make_ctx(json!({ "output_selector": "n" })))
            .await
            .unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    #[tokio::test]
    async fn rejects_missing_output_selector() {
        let node = LoopNode;
        let err = node
            .execute(make_ctx(json!({
                "flow": { "nodes": [{ "id": "n", "type": "noop" }], "edges": [] }
            })))
            .await
            .unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    // ── Execution ──────────────────────────────────────────────────────────

    #[tokio::test]
    async fn runs_max_iterations_without_break_condition() {
        let node = LoopNode;
        let out = node
            .execute(make_ctx(json!({
                "output_selector": "step.output",
                "max_iterations": 3,
                "flow": {
                    "nodes": [{
                        "id": "step", "type": "code",
                        "data": { "language": "rhai", "code": "variables.iteration_index" }
                    }],
                    "edges": []
                }
            })))
            .await
            .unwrap();

        // Ran exactly 3 times; last index is 2.
        assert_eq!(out["iterations"], json!(3));
        assert_eq!(out["output"], json!(2));
    }

    #[tokio::test]
    async fn defaults_to_ten_iterations() {
        // No max_iterations specified → defaults to 10.
        let node = LoopNode;
        let out = node
            .execute(make_ctx(json!({
                "output_selector": "step.output",
                "flow": {
                    "nodes": [{
                        "id": "step", "type": "code",
                        "data": { "language": "rhai", "code": "variables.iteration_index" }
                    }],
                    "edges": []
                }
            })))
            .await
            .unwrap();

        assert_eq!(out["iterations"], json!(10));
    }

    #[tokio::test]
    async fn break_condition_stops_loop_early() {
        // The `gate` node outputs `true` once iteration_index >= 2.
        // break_condition: gate.output == true → the loop stops at the end of
        // the iteration with index 2, i.e. after three runs.
        let node = LoopNode;
        let out = node
            .execute(make_ctx(json!({
                "output_selector": "step.output",
                "max_iterations": 10,
                "break_condition": {
                    "from": "gate", "path": "output", "op": "eq", "value": true
                },
                "flow": {
                    "nodes": [
                        {
                            "id": "step", "type": "code",
                            "data": { "language": "rhai", "code": "variables.iteration_index" }
                        },
                        {
                            "id": "gate", "type": "code",
                            "data": { "language": "rhai", "code": "variables.iteration_index >= 2" }
                        }
                    ],
                    "edges": []
                }
            })))
            .await
            .unwrap();

        // Break fires when iteration_index == 2, so 3 iterations ran (0, 1, 2).
        assert_eq!(out["iterations"], json!(3));
        assert_eq!(out["output"], json!(2));
    }

    #[tokio::test]
    async fn loop_output_injected_into_next_iteration() {
        // The first iteration seeds the chain with 1 (its `loop_output` is
        // null, so it must not be read). Every later iteration returns the
        // previous iteration's collected output plus one. The final value is
        // 3 only if `loop_output` really carries the previous result forward.
        let node = LoopNode;
        let out = node
            .execute(make_ctx(json!({
                "output_selector": "collect.output",
                "max_iterations": 3,
                "flow": {
                    "nodes": [{
                        "id": "collect", "type": "code",
                        "data": {
                            "language": "rhai",
                            "code": "if variables.iteration_index == 0 { 1 } else { variables.loop_output + 1 }"
                        }
                    }],
                    "edges": []
                }
            })))
            .await
            .unwrap();

        // 3 iterations ran: 1 → 2 → 3.
        assert_eq!(out["iterations"], json!(3));
        assert_eq!(out["output"], json!(3));
    }

    #[tokio::test]
    async fn min_iterations_is_one() {
        // max_iterations: 0 is clamped to 1.
        let node = LoopNode;
        let out = node
            .execute(make_ctx(json!({
                "output_selector": "step.output",
                "max_iterations": 0,
                "flow": {
                    "nodes": [{
                        "id": "step", "type": "code",
                        "data": { "language": "rhai", "code": "42" }
                    }],
                    "edges": []
                }
            })))
            .await
            .unwrap();

        assert_eq!(out["iterations"], json!(1));
        assert_eq!(out["output"], json!(42));
    }
}
324}