minion_engine/steps/
repeat.rs1use std::collections::HashMap;
2
3use async_trait::async_trait;
4
5use crate::cli::display;
6use crate::config::StepConfig;
7use crate::control_flow::ControlFlow;
8use crate::engine::context::Context;
9use crate::error::StepError;
10use crate::workflow::schema::{ScopeDef, StepDef};
11
12use super::{
13 cmd::CmdExecutor, agent::AgentExecutor, gate::GateExecutor,
14 IterationOutput, ScopeOutput, StepExecutor, StepOutput,
15};
16
17pub struct RepeatExecutor {
18 scopes: HashMap<String, ScopeDef>,
19}
20
21impl RepeatExecutor {
22 pub fn new(scopes: &HashMap<String, ScopeDef>) -> Self {
23 Self {
24 scopes: scopes.clone(),
25 }
26 }
27}
28
29#[async_trait]
30impl StepExecutor for RepeatExecutor {
31 async fn execute(
32 &self,
33 step: &StepDef,
34 _config: &StepConfig,
35 ctx: &Context,
36 ) -> Result<StepOutput, StepError> {
37 let scope_name = step
38 .scope
39 .as_ref()
40 .ok_or_else(|| StepError::Fail("repeat step missing 'scope' field".into()))?;
41
42 let scope = self
43 .scopes
44 .get(scope_name)
45 .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?;
46
47 let max_iterations = step.max_iterations.unwrap_or(3);
48 let mut iterations = Vec::new();
49 let mut scope_value = step
50 .initial_value
51 .as_ref()
52 .map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
53 .unwrap_or(serde_json::Value::Null);
54
55 for i in 0..max_iterations {
56 display::iteration(i, max_iterations);
57
58 let mut child_ctx = Context::new(
62 ctx.get_var("target")
63 .and_then(|v| v.as_str())
64 .unwrap_or("")
65 .to_string(),
66 HashMap::new(),
67 );
68 child_ctx.scope_value = Some(scope_value.clone());
69 child_ctx.scope_index = i;
70
71 let mut last_output = StepOutput::Empty;
75 let mut should_break = false;
76
77 for scope_step in &scope.steps {
78 let step_config = StepConfig::default();
79
80 let result = match scope_step.step_type {
81 crate::workflow::schema::StepType::Cmd => {
82 CmdExecutor.execute(scope_step, &step_config, &child_ctx).await
83 }
84 crate::workflow::schema::StepType::Agent => {
85 AgentExecutor.execute(scope_step, &step_config, &child_ctx).await
86 }
87 crate::workflow::schema::StepType::Gate => {
88 GateExecutor.execute(scope_step, &step_config, &child_ctx).await
89 }
90 _ => Err(StepError::Fail(format!(
91 "Step type '{}' not supported in repeat scope",
92 scope_step.step_type
93 ))),
94 };
95
96 match result {
97 Ok(output) => {
98 child_ctx.store(&scope_step.name, output.clone());
99 last_output = output;
100 }
101 Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
102 if let Some(v) = value {
103 last_output = v;
104 }
105 should_break = true;
106 break;
107 }
108 Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
109 child_ctx.store(&scope_step.name, StepOutput::Empty);
110 }
111 Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
112 break;
113 }
114 Err(e) => return Err(e),
115 }
116 }
117
118 let iter_output = if let Some(outputs_template) = &scope.outputs {
120 match child_ctx.render_template(outputs_template) {
121 Ok(rendered) => StepOutput::Cmd(super::CmdOutput {
122 stdout: rendered,
123 stderr: String::new(),
124 exit_code: 0,
125 duration: std::time::Duration::ZERO,
126 }),
127 Err(_) => last_output,
128 }
129 } else {
130 last_output
131 };
132
133 scope_value =
135 serde_json::Value::String(iter_output.text().to_string());
136
137 iterations.push(IterationOutput {
138 index: i,
139 output: iter_output,
140 });
141
142 if should_break {
143 break;
144 }
145 }
146
147 if iterations.len() == max_iterations {
148 tracing::warn!(
149 "repeat '{}': max iterations ({}) reached without break",
150 step.name,
151 max_iterations
152 );
153 }
154
155 let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
156
157 Ok(StepOutput::Scope(ScopeOutput {
158 iterations,
159 final_value,
160 }))
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StepConfig;
    use crate::engine::context::Context;
    use crate::workflow::parser;

    /// Parses `yaml`, runs its first top-level step through a
    /// [`RepeatExecutor`] built from the workflow's scopes, and returns the
    /// resulting scope output.
    ///
    /// Panics when parsing or execution fails, or when the step does not
    /// produce a `StepOutput::Scope`.
    async fn run_first_step(yaml: &str) -> ScopeOutput {
        let wf = parser::parse_str(yaml).unwrap();
        let executor = RepeatExecutor::new(&wf.scopes);
        let ctx = Context::new(String::new(), HashMap::new());

        let result = executor
            .execute(&wf.steps[0], &StepConfig::default(), &ctx)
            .await
            .unwrap();

        match result {
            StepOutput::Scope(scope_out) => scope_out,
            _ => panic!("Expected Scope output"),
        }
    }

    /// With no gate to break out, the scope runs exactly `max_iterations` times.
    #[tokio::test]
    async fn repeat_runs_max_iterations_without_break() {
        let yaml = r#"
name: test
scopes:
  my_scope:
    steps:
      - name: step1
        type: cmd
        run: "echo hello"
steps:
  - name: repeat_step
    type: repeat
    scope: my_scope
    max_iterations: 3
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 3);
    }

    /// A gate that passes with `on_pass: break` stops the repeat after the
    /// first iteration even though more iterations are allowed.
    #[tokio::test]
    async fn repeat_breaks_on_first_iteration_when_gate_passes() {
        let yaml = r#"
name: test
scopes:
  my_scope:
    steps:
      - name: step1
        type: cmd
        run: "echo hello"
      - name: check
        type: gate
        condition: "true"
        on_pass: break
        message: "done"
steps:
  - name: repeat_step
    type: repeat
    scope: my_scope
    max_iterations: 5
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 1, "Should break after 1 iteration");
    }

    /// `scope.index` seen by the scope's steps counts up from 0.
    #[tokio::test]
    async fn repeat_scope_index_increments_each_iteration() {
        let yaml = r#"
name: test
scopes:
  counter:
    steps:
      - name: output_index
        type: cmd
        run: "echo {{ scope.index }}"
steps:
  - name: repeat_step
    type: repeat
    scope: counter
    max_iterations: 3
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 3);
        assert_eq!(scope_out.iterations[0].output.text().trim(), "0");
        assert_eq!(scope_out.iterations[1].output.text().trim(), "1");
        assert_eq!(scope_out.iterations[2].output.text().trim(), "2");
    }

    /// Runs a repeat with an `initial_value` and checks each iteration's output.
    ///
    /// NOTE(review): the assertions only look at per-iteration stdout; the
    /// scope never reads the scope value back, so propagation of the value
    /// between iterations is exercised but not actually asserted here.
    #[tokio::test]
    async fn repeat_scope_value_flows_between_iterations() {
        let yaml = r#"
name: test
scopes:
  counter:
    steps:
      - name: echo_scope
        type: cmd
        run: "echo iter-{{ scope.index }}"
steps:
  - name: repeat_step
    type: repeat
    scope: counter
    max_iterations: 3
    initial_value: "start"
"#;
        let scope_out = run_first_step(yaml).await;

        assert_eq!(scope_out.iterations.len(), 3);
        assert_eq!(scope_out.iterations[0].output.text().trim(), "iter-0");
        assert_eq!(scope_out.iterations[1].output.text().trim(), "iter-1");
        assert_eq!(scope_out.iterations[2].output.text().trim(), "iter-2");
    }
}