minion_engine/steps/
repeat.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::cli::display;
7use crate::config::StepConfig;
8use crate::config::manager::ConfigManager;
9use crate::control_flow::ControlFlow;
10use crate::engine::context::Context;
11use crate::error::StepError;
12use crate::workflow::schema::{ScopeDef, StepDef};
13
14use super::{
15 call::{dispatch_scope_step_sandboxed, resolve_scope_step_config},
16 IterationOutput, ScopeOutput, SharedSandbox, StepExecutor, StepOutput,
17};
18
19pub struct RepeatExecutor {
20 scopes: HashMap<String, ScopeDef>,
21 sandbox: SharedSandbox,
22 config_manager: Option<Arc<ConfigManager>>,
23}
24
25impl RepeatExecutor {
26 pub fn new(scopes: &HashMap<String, ScopeDef>, sandbox: SharedSandbox) -> Self {
27 Self {
28 scopes: scopes.clone(),
29 sandbox,
30 config_manager: None,
31 }
32 }
33
34 pub fn with_config_manager(mut self, cm: Option<Arc<ConfigManager>>) -> Self {
35 self.config_manager = cm;
36 self
37 }
38}
39
40#[async_trait]
41impl StepExecutor for RepeatExecutor {
42 async fn execute(
43 &self,
44 step: &StepDef,
45 _config: &StepConfig,
46 ctx: &Context,
47 ) -> Result<StepOutput, StepError> {
48 let scope_name = step
49 .scope
50 .as_ref()
51 .ok_or_else(|| StepError::Fail("repeat step missing 'scope' field".into()))?;
52
53 let scope = self
54 .scopes
55 .get(scope_name)
56 .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?;
57
58 let max_iterations = step.max_iterations.unwrap_or(3);
59 let mut iterations = Vec::new();
60 let mut scope_value = step
61 .initial_value
62 .as_ref()
63 .map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
64 .unwrap_or(serde_json::Value::Null);
65
66 for i in 0..max_iterations {
67 display::iteration(i, max_iterations);
68
69 let mut child_ctx = Context::new(
71 ctx.get_var("target")
72 .and_then(|v| v.as_str())
73 .unwrap_or("")
74 .to_string(),
75 ctx.all_variables(),
76 );
77 child_ctx.scope_value = Some(scope_value.clone());
78 child_ctx.scope_index = i;
79 child_ctx.stack_info = ctx.get_stack_info().cloned();
80 child_ctx.prompts_dir = ctx.prompts_dir.clone();
81
82 let mut last_output = StepOutput::Empty;
83 let mut should_break = false;
84
85 for scope_step in &scope.steps {
86 let step_config = resolve_scope_step_config(&self.config_manager, scope_step);
87
88 let result = dispatch_scope_step_sandboxed(
89 scope_step, &step_config, &child_ctx, &self.scopes, &self.sandbox, &self.config_manager,
90 ).await;
91
92 match result {
93 Ok(output) => {
94 child_ctx.store(&scope_step.name, output.clone());
95 last_output = output;
96 }
97 Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
98 if let Some(v) = value {
99 last_output = v;
100 }
101 should_break = true;
102 break;
103 }
104 Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
105 child_ctx.store(&scope_step.name, StepOutput::Empty);
106 }
107 Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
108 break;
109 }
110 Err(e) => return Err(e),
111 }
112 }
113
114 let iter_output = if let Some(outputs_template) = &scope.outputs {
116 match child_ctx.render_template(outputs_template) {
117 Ok(rendered) => StepOutput::Cmd(super::CmdOutput {
118 stdout: rendered,
119 stderr: String::new(),
120 exit_code: 0,
121 duration: std::time::Duration::ZERO,
122 }),
123 Err(_) => last_output,
124 }
125 } else {
126 last_output
127 };
128
129 scope_value =
131 serde_json::Value::String(iter_output.text().to_string());
132
133 iterations.push(IterationOutput {
134 index: i,
135 output: iter_output,
136 });
137
138 if should_break {
139 break;
140 }
141 }
142
143 if iterations.len() == max_iterations {
144 tracing::warn!(
145 "repeat '{}': max iterations ({}) reached without break",
146 step.name,
147 max_iterations
148 );
149 }
150
151 let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
152
153 Ok(StepOutput::Scope(ScopeOutput {
154 iterations,
155 final_value,
156 }))
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use crate::config::StepConfig;
164 use crate::engine::context::Context;
165 use crate::workflow::parser;
166
167 #[tokio::test]
168 async fn repeat_runs_max_iterations_without_break() {
169 let yaml = r#"
170name: test
171scopes:
172 my_scope:
173 steps:
174 - name: step1
175 type: cmd
176 run: "echo hello"
177steps:
178 - name: repeat_step
179 type: repeat
180 scope: my_scope
181 max_iterations: 3
182"#;
183 let wf = parser::parse_str(yaml).unwrap();
184 let repeat_step = &wf.steps[0];
185 let executor = RepeatExecutor::new(&wf.scopes, None);
186 let ctx = Context::new(String::new(), HashMap::new());
187
188 let result = executor
189 .execute(repeat_step, &StepConfig::default(), &ctx)
190 .await
191 .unwrap();
192
193 if let StepOutput::Scope(scope_out) = result {
194 assert_eq!(scope_out.iterations.len(), 3);
195 } else {
196 panic!("Expected Scope output");
197 }
198 }
199
200 #[tokio::test]
201 async fn repeat_breaks_on_first_iteration_when_gate_passes() {
202 let yaml = r#"
203name: test
204scopes:
205 my_scope:
206 steps:
207 - name: step1
208 type: cmd
209 run: "echo hello"
210 - name: check
211 type: gate
212 condition: "true"
213 on_pass: break
214 message: "done"
215steps:
216 - name: repeat_step
217 type: repeat
218 scope: my_scope
219 max_iterations: 5
220"#;
221 let wf = parser::parse_str(yaml).unwrap();
222 let repeat_step = &wf.steps[0];
223 let executor = RepeatExecutor::new(&wf.scopes, None);
224 let ctx = Context::new(String::new(), HashMap::new());
225
226 let result = executor
227 .execute(repeat_step, &StepConfig::default(), &ctx)
228 .await
229 .unwrap();
230
231 if let StepOutput::Scope(scope_out) = result {
232 assert_eq!(scope_out.iterations.len(), 1, "Should break after 1 iteration");
233 } else {
234 panic!("Expected Scope output");
235 }
236 }
237
238 #[tokio::test]
239 async fn repeat_scope_index_increments_each_iteration() {
240 let yaml = r#"
241name: test
242scopes:
243 counter:
244 steps:
245 - name: output_index
246 type: cmd
247 run: "echo {{ scope.index }}"
248steps:
249 - name: repeat_step
250 type: repeat
251 scope: counter
252 max_iterations: 3
253"#;
254 let wf = parser::parse_str(yaml).unwrap();
255 let repeat_step = &wf.steps[0];
256 let executor = RepeatExecutor::new(&wf.scopes, None);
257 let ctx = Context::new(String::new(), HashMap::new());
258
259 let result = executor
260 .execute(repeat_step, &StepConfig::default(), &ctx)
261 .await
262 .unwrap();
263
264 if let StepOutput::Scope(scope_out) = result {
265 assert_eq!(scope_out.iterations.len(), 3);
266 assert_eq!(scope_out.iterations[0].output.text().trim(), "0");
267 assert_eq!(scope_out.iterations[1].output.text().trim(), "1");
268 assert_eq!(scope_out.iterations[2].output.text().trim(), "2");
269 } else {
270 panic!("Expected Scope output");
271 }
272 }
273
274 #[tokio::test]
275 async fn repeat_scope_value_flows_between_iterations() {
276 let yaml = r#"
278name: test
279scopes:
280 counter:
281 steps:
282 - name: echo_scope
283 type: cmd
284 run: "echo iter-{{ scope.index }}"
285steps:
286 - name: repeat_step
287 type: repeat
288 scope: counter
289 max_iterations: 3
290 initial_value: "start"
291"#;
292 let wf = parser::parse_str(yaml).unwrap();
293 let repeat_step = &wf.steps[0];
294 let executor = RepeatExecutor::new(&wf.scopes, None);
295 let ctx = Context::new(String::new(), HashMap::new());
296
297 let result = executor
298 .execute(repeat_step, &StepConfig::default(), &ctx)
299 .await
300 .unwrap();
301
302 if let StepOutput::Scope(scope_out) = result {
303 assert_eq!(scope_out.iterations.len(), 3);
304 assert_eq!(scope_out.iterations[0].output.text().trim(), "iter-0");
306 assert_eq!(scope_out.iterations[1].output.text().trim(), "iter-1");
307 assert_eq!(scope_out.iterations[2].output.text().trim(), "iter-2");
308 } else {
309 panic!("Expected Scope output");
310 }
311 }
312}