minion_engine/steps/
repeat.rs1use std::collections::HashMap;
2
3use async_trait::async_trait;
4
5use crate::cli::display;
6use crate::config::StepConfig;
7use crate::control_flow::ControlFlow;
8use crate::engine::context::Context;
9use crate::error::StepError;
10use crate::workflow::schema::{ScopeDef, StepDef};
11
12use super::{
13 cmd::CmdExecutor, agent::AgentExecutor, gate::GateExecutor,
14 IterationOutput, ScopeOutput, StepExecutor, StepOutput,
15};
16
17pub struct RepeatExecutor {
18 scopes: HashMap<String, ScopeDef>,
19}
20
21impl RepeatExecutor {
22 pub fn new(scopes: &HashMap<String, ScopeDef>) -> Self {
23 Self {
24 scopes: scopes.clone(),
25 }
26 }
27}
28
29#[async_trait]
30impl StepExecutor for RepeatExecutor {
31 async fn execute(
32 &self,
33 step: &StepDef,
34 _config: &StepConfig,
35 ctx: &Context,
36 ) -> Result<StepOutput, StepError> {
37 let scope_name = step
38 .scope
39 .as_ref()
40 .ok_or_else(|| StepError::Fail("repeat step missing 'scope' field".into()))?;
41
42 let scope = self
43 .scopes
44 .get(scope_name)
45 .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?;
46
47 let max_iterations = step.max_iterations.unwrap_or(3);
48 let mut iterations = Vec::new();
49 let mut scope_value = step
50 .initial_value
51 .as_ref()
52 .map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
53 .unwrap_or(serde_json::Value::Null);
54
55 for i in 0..max_iterations {
56 display::iteration(i, max_iterations);
57
58 let mut child_ctx = Context::new(
60 ctx.get_var("target")
61 .and_then(|v| v.as_str())
62 .unwrap_or("")
63 .to_string(),
64 ctx.all_variables(),
65 );
66 child_ctx.scope_value = Some(scope_value.clone());
67 child_ctx.scope_index = i;
68 child_ctx.stack_info = ctx.get_stack_info().cloned();
69 child_ctx.prompts_dir = ctx.prompts_dir.clone();
70
71 let mut last_output = StepOutput::Empty;
72 let mut should_break = false;
73
74 for scope_step in &scope.steps {
75 let step_config = StepConfig::default();
76
77 let result = match scope_step.step_type {
78 crate::workflow::schema::StepType::Cmd => {
79 CmdExecutor.execute(scope_step, &step_config, &child_ctx).await
80 }
81 crate::workflow::schema::StepType::Agent => {
82 AgentExecutor.execute(scope_step, &step_config, &child_ctx).await
83 }
84 crate::workflow::schema::StepType::Gate => {
85 GateExecutor.execute(scope_step, &step_config, &child_ctx).await
86 }
87 _ => Err(StepError::Fail(format!(
88 "Step type '{}' not supported in repeat scope",
89 scope_step.step_type
90 ))),
91 };
92
93 match result {
94 Ok(output) => {
95 child_ctx.store(&scope_step.name, output.clone());
96 last_output = output;
97 }
98 Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
99 if let Some(v) = value {
100 last_output = v;
101 }
102 should_break = true;
103 break;
104 }
105 Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
106 child_ctx.store(&scope_step.name, StepOutput::Empty);
107 }
108 Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
109 break;
110 }
111 Err(e) => return Err(e),
112 }
113 }
114
115 let iter_output = if let Some(outputs_template) = &scope.outputs {
117 match child_ctx.render_template(outputs_template) {
118 Ok(rendered) => StepOutput::Cmd(super::CmdOutput {
119 stdout: rendered,
120 stderr: String::new(),
121 exit_code: 0,
122 duration: std::time::Duration::ZERO,
123 }),
124 Err(_) => last_output,
125 }
126 } else {
127 last_output
128 };
129
130 scope_value =
132 serde_json::Value::String(iter_output.text().to_string());
133
134 iterations.push(IterationOutput {
135 index: i,
136 output: iter_output,
137 });
138
139 if should_break {
140 break;
141 }
142 }
143
144 if iterations.len() == max_iterations {
145 tracing::warn!(
146 "repeat '{}': max iterations ({}) reached without break",
147 step.name,
148 max_iterations
149 );
150 }
151
152 let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
153
154 Ok(StepOutput::Scope(ScopeOutput {
155 iterations,
156 final_value,
157 }))
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StepConfig;
    use crate::engine::context::Context;
    use crate::workflow::parser;

    /// Parses `yaml`, runs its first top-level step through a
    /// `RepeatExecutor` built from the workflow's scopes, and returns the
    /// resulting scope output. Panics if parsing or execution fails, or if the
    /// step does not produce a `StepOutput::Scope`.
    async fn run_first_step(yaml: &str) -> ScopeOutput {
        let workflow = parser::parse_str(yaml).unwrap();
        let repeat_step = &workflow.steps[0];
        let executor = RepeatExecutor::new(&workflow.scopes);
        let ctx = Context::new(String::new(), HashMap::new());

        let outcome = executor
            .execute(repeat_step, &StepConfig::default(), &ctx)
            .await
            .unwrap();

        match outcome {
            StepOutput::Scope(scope_out) => scope_out,
            _ => panic!("Expected Scope output"),
        }
    }

    /// Asserts that the trimmed text of each iteration matches `expected`,
    /// position by position.
    fn assert_iteration_texts(scope_out: &ScopeOutput, expected: &[&str]) {
        for (iteration, want) in scope_out.iterations.iter().zip(expected.iter()) {
            assert_eq!(iteration.output.text().trim(), *want);
        }
    }

    /// Without any break, the scope runs exactly `max_iterations` times.
    #[tokio::test]
    async fn repeat_runs_max_iterations_without_break() {
        let yaml = r#"
name: test
scopes:
  my_scope:
    steps:
      - name: step1
        type: cmd
        run: "echo hello"
steps:
  - name: repeat_step
    type: repeat
    scope: my_scope
    max_iterations: 3
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 3);
    }

    /// A gate that passes with `on_pass: break` stops the repeat after the
    /// first iteration.
    #[tokio::test]
    async fn repeat_breaks_on_first_iteration_when_gate_passes() {
        let yaml = r#"
name: test
scopes:
  my_scope:
    steps:
      - name: step1
        type: cmd
        run: "echo hello"
      - name: check
        type: gate
        condition: "true"
        on_pass: break
        message: "done"
steps:
  - name: repeat_step
    type: repeat
    scope: my_scope
    max_iterations: 5
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 1, "Should break after 1 iteration");
    }

    /// `scope.index` rendered inside the scope counts up from 0.
    #[tokio::test]
    async fn repeat_scope_index_increments_each_iteration() {
        let yaml = r#"
name: test
scopes:
  counter:
    steps:
      - name: output_index
        type: cmd
        run: "echo {{ scope.index }}"
steps:
  - name: repeat_step
    type: repeat
    scope: counter
    max_iterations: 3
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 3);
        assert_iteration_texts(&scope_out, &["0", "1", "2"]);
    }

    /// Runs a repeat with an `initial_value` and checks each iteration's text.
    ///
    /// NOTE(review): the scope only echoes `scope.index`, so these assertions
    /// never observe the value carried between iterations (nor the
    /// `initial_value`); consider asserting on the scope value itself.
    #[tokio::test]
    async fn repeat_scope_value_flows_between_iterations() {
        let yaml = r#"
name: test
scopes:
  counter:
    steps:
      - name: echo_scope
        type: cmd
        run: "echo iter-{{ scope.index }}"
steps:
  - name: repeat_step
    type: repeat
    scope: counter
    max_iterations: 3
    initial_value: "start"
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 3);
        assert_iteration_texts(&scope_out, &["iter-0", "iter-1", "iter-2"]);
    }
}