1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use tokio::sync::Semaphore;
6use tokio::task::JoinSet;
7
8use crate::config::StepConfig;
9use crate::config::manager::ConfigManager;
10use crate::control_flow::ControlFlow;
11use crate::engine::context::Context;
12use crate::error::StepError;
13use crate::workflow::schema::{ScopeDef, StepDef};
14
15use super::{
16 call::{dispatch_scope_step_sandboxed, resolve_scope_step_config},
17 CmdOutput, IterationOutput, ScopeOutput, SharedSandbox,
18 StepExecutor, StepOutput,
19};
20
21fn apply_reduce(
23 scope: &ScopeOutput,
24 reducer: &str,
25 condition_template: Option<&str>,
26) -> Result<StepOutput, crate::error::StepError> {
27 let iterations = &scope.iterations;
28
29 match reducer {
30 "concat" => {
31 let joined = iterations
32 .iter()
33 .map(|it| it.output.text().to_string())
34 .collect::<Vec<_>>()
35 .join("\n");
36 Ok(StepOutput::Cmd(CmdOutput {
37 stdout: joined,
38 stderr: String::new(),
39 exit_code: 0,
40 duration: std::time::Duration::ZERO,
41 }))
42 }
43 "sum" => {
44 let sum: f64 = iterations
45 .iter()
46 .map(|it| it.output.text().trim().parse::<f64>().unwrap_or(0.0))
47 .sum();
48 let result = if sum.fract() == 0.0 {
50 format!("{}", sum as i64)
51 } else {
52 format!("{}", sum)
53 };
54 Ok(StepOutput::Cmd(CmdOutput {
55 stdout: result,
56 stderr: String::new(),
57 exit_code: 0,
58 duration: std::time::Duration::ZERO,
59 }))
60 }
61 "count" => {
62 Ok(StepOutput::Cmd(CmdOutput {
63 stdout: iterations.len().to_string(),
64 stderr: String::new(),
65 exit_code: 0,
66 duration: std::time::Duration::ZERO,
67 }))
68 }
69 "min" => {
70 let min_val = iterations
71 .iter()
72 .filter_map(|it| it.output.text().trim().parse::<f64>().ok())
73 .fold(f64::INFINITY, f64::min);
74 let result = if min_val.fract() == 0.0 {
75 format!("{}", min_val as i64)
76 } else {
77 format!("{}", min_val)
78 };
79 Ok(StepOutput::Cmd(CmdOutput {
80 stdout: result,
81 stderr: String::new(),
82 exit_code: 0,
83 duration: std::time::Duration::ZERO,
84 }))
85 }
86 "max" => {
87 let max_val = iterations
88 .iter()
89 .filter_map(|it| it.output.text().trim().parse::<f64>().ok())
90 .fold(f64::NEG_INFINITY, f64::max);
91 let result = if max_val.fract() == 0.0 {
92 format!("{}", max_val as i64)
93 } else {
94 format!("{}", max_val)
95 };
96 Ok(StepOutput::Cmd(CmdOutput {
97 stdout: result,
98 stderr: String::new(),
99 exit_code: 0,
100 duration: std::time::Duration::ZERO,
101 }))
102 }
103 "filter" => {
104 let tmpl = condition_template.ok_or_else(|| {
105 crate::error::StepError::Fail(
106 "reduce: 'filter' requires 'reduce_condition' to be set".to_string(),
107 )
108 })?;
109
110 let mut kept = Vec::new();
111 for it in iterations {
112 let mut vars = std::collections::HashMap::new();
114 vars.insert(
115 "item_output".to_string(),
116 serde_json::Value::String(it.output.text().to_string()),
117 );
118 let simplified_tmpl = tmpl
121 .replace("{{item.output}}", "{{ item_output }}")
122 .replace("{{ item.output }}", "{{ item_output }}");
123 let child_ctx =
124 crate::engine::context::Context::new(String::new(), vars);
125 let rendered = child_ctx
126 .render_template(&simplified_tmpl)
127 .unwrap_or_default();
128 let passes = !rendered.trim().is_empty()
129 && rendered.trim() != "false"
130 && rendered.trim() != "0";
131 if passes {
132 kept.push(it.output.text().to_string());
133 }
134 }
135
136 let joined = kept.join("\n");
137 Ok(StepOutput::Cmd(CmdOutput {
138 stdout: joined,
139 stderr: String::new(),
140 exit_code: 0,
141 duration: std::time::Duration::ZERO,
142 }))
143 }
144 other => Err(crate::error::StepError::Fail(format!(
145 "unknown reduce operation '{}'; expected concat, sum, count, filter, min, max",
146 other
147 ))),
148 }
149}
150
151fn apply_collect(scope: ScopeOutput, mode: &str) -> Result<StepOutput, crate::error::StepError> {
153 match mode {
154 "text" => {
155 let joined = scope
156 .iterations
157 .iter()
158 .map(|it| it.output.text().to_string())
159 .collect::<Vec<_>>()
160 .join("\n");
161 Ok(StepOutput::Cmd(CmdOutput {
162 stdout: joined,
163 stderr: String::new(),
164 exit_code: 0,
165 duration: std::time::Duration::ZERO,
166 }))
167 }
168 "all" | "json" => {
169 let arr: Vec<serde_json::Value> = scope
170 .iterations
171 .iter()
172 .map(|it| serde_json::Value::String(it.output.text().to_string()))
173 .collect();
174 let json = serde_json::to_string(&arr)
175 .map_err(|e| crate::error::StepError::Fail(format!("collect serialize error: {e}")))?;
176 Ok(StepOutput::Cmd(CmdOutput {
177 stdout: json,
178 stderr: String::new(),
179 exit_code: 0,
180 duration: std::time::Duration::ZERO,
181 }))
182 }
183 other => Err(crate::error::StepError::Fail(format!(
184 "unknown collect mode '{}'; expected all, text, or json",
185 other
186 ))),
187 }
188}
189
190pub struct MapExecutor {
191 scopes: HashMap<String, ScopeDef>,
192 sandbox: SharedSandbox,
193 config_manager: Option<Arc<ConfigManager>>,
194}
195
196impl MapExecutor {
197 pub fn new(scopes: &HashMap<String, ScopeDef>, sandbox: SharedSandbox) -> Self {
198 Self {
199 scopes: scopes.clone(),
200 sandbox,
201 config_manager: None,
202 }
203 }
204
205 pub fn with_config_manager(mut self, cm: Option<Arc<ConfigManager>>) -> Self {
206 self.config_manager = cm;
207 self
208 }
209}
210
211#[async_trait]
212impl StepExecutor for MapExecutor {
213 async fn execute(
214 &self,
215 step: &StepDef,
216 _config: &StepConfig,
217 ctx: &Context,
218 ) -> Result<StepOutput, StepError> {
219 let items_template = step
220 .items
221 .as_ref()
222 .ok_or_else(|| StepError::Fail("map step missing 'items' field".into()))?;
223
224 let scope_name = step
225 .scope
226 .as_ref()
227 .ok_or_else(|| StepError::Fail("map step missing 'scope' field".into()))?;
228
229 let scope = self
230 .scopes
231 .get(scope_name)
232 .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?
233 .clone();
234
235 let rendered_items = ctx.render_template(items_template)?;
236
237 let items: Vec<String> = if rendered_items.trim().starts_with('[') {
239 serde_json::from_str::<Vec<serde_json::Value>>(&rendered_items)
240 .map(|arr| {
241 arr.into_iter()
242 .map(|v| match v {
243 serde_json::Value::String(s) => s,
244 other => other.to_string(),
245 })
246 .collect()
247 })
248 .unwrap_or_else(|_| {
249 rendered_items
250 .lines()
251 .filter(|l| !l.trim().is_empty())
252 .map(|l| l.to_string())
253 .collect()
254 })
255 } else {
256 rendered_items
257 .lines()
258 .filter(|l| !l.trim().is_empty())
259 .map(|l| l.to_string())
260 .collect()
261 };
262
263 let parallel_count = step.parallel.unwrap_or(0);
264
265 let scope_output = if parallel_count == 0 {
266 serial_execute(items, &scope, ctx, &self.scopes, &self.sandbox, &self.config_manager).await?
268 } else {
269 parallel_execute(items, &scope, ctx, &self.scopes, parallel_count, &self.sandbox, &self.config_manager).await?
271 };
272
273 let reduce_mode = _config.get_str("reduce").map(|s| s.to_string());
275 if let Some(ref reducer) = reduce_mode {
276 if let StepOutput::Scope(ref s) = scope_output {
277 let condition = _config.get_str("reduce_condition");
278 return apply_reduce(s, reducer, condition);
279 }
280 }
281
282 let collect_mode = _config.get_str("collect").map(|s| s.to_string());
284 match (scope_output, collect_mode) {
285 (StepOutput::Scope(s), Some(mode)) => apply_collect(s, &mode),
286 (output, _) => Ok(output),
287 }
288 }
289}
290
291async fn serial_execute(
292 items: Vec<String>,
293 scope: &ScopeDef,
294 ctx: &Context,
295 scopes: &HashMap<String, ScopeDef>,
296 sandbox: &SharedSandbox,
297 config_manager: &Option<Arc<ConfigManager>>,
298) -> Result<StepOutput, StepError> {
299 let mut iterations = Vec::new();
300
301 for (i, item) in items.iter().enumerate() {
302 let mut child_ctx = make_child_ctx(ctx, Some(serde_json::Value::String(item.clone())), i);
303
304 let iter_output = execute_scope_steps(scope, &mut child_ctx, scopes, sandbox, config_manager).await?;
305
306 iterations.push(IterationOutput {
307 index: i,
308 output: iter_output,
309 });
310 }
311
312 let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
313 Ok(StepOutput::Scope(ScopeOutput {
314 iterations,
315 final_value,
316 }))
317}
318
319async fn parallel_execute(
320 items: Vec<String>,
321 scope: &ScopeDef,
322 ctx: &Context,
323 scopes: &HashMap<String, ScopeDef>,
324 parallel_count: usize,
325 sandbox: &SharedSandbox,
326 config_manager: &Option<Arc<ConfigManager>>,
327) -> Result<StepOutput, StepError> {
328 let sem = Arc::new(Semaphore::new(parallel_count));
329 let mut set: JoinSet<(usize, Result<StepOutput, StepError>)> = JoinSet::new();
330
331 for (i, item) in items.iter().enumerate() {
332 let sem = Arc::clone(&sem);
333 let item_val = serde_json::Value::String(item.clone());
334 let child_ctx = make_child_ctx(ctx, Some(item_val), i);
335 let scope_clone = scope.clone();
336 let scopes_clone = scopes.clone();
337 let sandbox_clone = sandbox.clone();
338 let cm_clone = config_manager.clone();
339
340 set.spawn(async move {
341 let _permit = sem.acquire().await.expect("semaphore closed");
342 let result = execute_scope_steps_owned(scope_clone, child_ctx, scopes_clone, sandbox_clone, cm_clone).await;
343 (i, result)
344 });
345 }
346
347 let mut results: Vec<Option<StepOutput>> = vec![None; items.len()];
348
349 while let Some(res) = set.join_next().await {
350 match res {
351 Ok((i, Ok(output))) => {
352 results[i] = Some(output);
353 }
354 Ok((_, Err(e))) => {
355 set.abort_all();
356 return Err(e);
357 }
358 Err(e) => {
359 set.abort_all();
360 return Err(StepError::Fail(format!("Task panicked: {e}")));
361 }
362 }
363 }
364
365 let iterations: Vec<IterationOutput> = results
366 .into_iter()
367 .enumerate()
368 .map(|(i, opt)| IterationOutput {
369 index: i,
370 output: opt.unwrap_or(StepOutput::Empty),
371 })
372 .collect();
373
374 let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
375 Ok(StepOutput::Scope(ScopeOutput {
376 iterations,
377 final_value,
378 }))
379}
380
381fn make_child_ctx(
382 parent: &Context,
383 scope_value: Option<serde_json::Value>,
384 index: usize,
385) -> Context {
386 let target = parent
387 .get_var("target")
388 .and_then(|v| v.as_str())
389 .unwrap_or("")
390 .to_string();
391 let mut ctx = Context::new(target, parent.all_variables());
392 ctx.scope_value = scope_value;
393 ctx.scope_index = index;
394 ctx.stack_info = parent.get_stack_info().cloned();
395 ctx.prompts_dir = parent.prompts_dir.clone();
396 ctx
397}
398
399async fn execute_scope_steps(
400 scope: &ScopeDef,
401 child_ctx: &mut Context,
402 scopes: &HashMap<String, ScopeDef>,
403 sandbox: &SharedSandbox,
404 config_manager: &Option<Arc<ConfigManager>>,
405) -> Result<StepOutput, StepError> {
406 let mut last_output = StepOutput::Empty;
407
408 for scope_step in &scope.steps {
409 let config = resolve_scope_step_config(config_manager, scope_step);
410 let result = dispatch_scope_step_sandboxed(scope_step, &config, child_ctx, scopes, sandbox, config_manager).await;
411
412 match result {
413 Ok(output) => {
414 child_ctx.store(&scope_step.name, output.clone());
415 last_output = output;
416 }
417 Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
418 if let Some(v) = value {
419 last_output = v;
420 }
421 break;
422 }
423 Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
424 child_ctx.store(&scope_step.name, StepOutput::Empty);
425 }
426 Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
427 break;
428 }
429 Err(e) => return Err(e),
430 }
431 }
432
433 if let Some(outputs_template) = &scope.outputs {
435 if let Ok(rendered) = child_ctx.render_template(outputs_template) {
436 return Ok(StepOutput::Cmd(CmdOutput {
437 stdout: rendered,
438 stderr: String::new(),
439 exit_code: 0,
440 duration: std::time::Duration::ZERO,
441 }));
442 }
443 }
444
445 Ok(last_output)
446}
447
448async fn execute_scope_steps_owned(
449 scope: ScopeDef,
450 mut child_ctx: Context,
451 scopes: HashMap<String, ScopeDef>,
452 sandbox: SharedSandbox,
453 config_manager: Option<Arc<ConfigManager>>,
454) -> Result<StepOutput, StepError> {
455 execute_scope_steps(&scope, &mut child_ctx, &scopes, &sandbox, &config_manager).await
456}
457
458#[cfg(test)]
459mod tests {
460 use super::*;
461 use std::collections::HashMap;
462 use crate::workflow::schema::{ScopeDef, StepType};
463
464 fn cmd_step(name: &str, run: &str) -> StepDef {
465 StepDef {
466 name: name.to_string(),
467 step_type: StepType::Cmd,
468 run: Some(run.to_string()),
469 prompt: None,
470 condition: None,
471 on_pass: None,
472 on_fail: None,
473 message: None,
474 scope: None,
475 max_iterations: None,
476 initial_value: None,
477 items: None,
478 parallel: None,
479 steps: None,
480 config: HashMap::new(),
481 outputs: None,
482 output_type: None,
483 async_exec: None,
484 }
485 }
486
487 fn map_step(name: &str, items: &str, scope: &str, parallel: Option<usize>) -> StepDef {
488 StepDef {
489 name: name.to_string(),
490 step_type: StepType::Map,
491 run: None,
492 prompt: None,
493 condition: None,
494 on_pass: None,
495 on_fail: None,
496 message: None,
497 scope: Some(scope.to_string()),
498 max_iterations: None,
499 initial_value: None,
500 items: Some(items.to_string()),
501 parallel,
502 steps: None,
503 config: HashMap::new(),
504 outputs: None,
505 output_type: None,
506 async_exec: None,
507 }
508 }
509
510 fn echo_scope() -> ScopeDef {
511 ScopeDef {
512 steps: vec![cmd_step("echo", "echo {{ scope.value }}")],
513 outputs: None,
514 }
515 }
516
517 #[tokio::test]
518 async fn map_three_items_serial() {
519 let mut scopes = HashMap::new();
520 scopes.insert("echo_scope".to_string(), echo_scope());
521
522 let step = map_step("map_test", "alpha\nbeta\ngamma", "echo_scope", None);
523 let executor = MapExecutor::new(&scopes, None);
524 let config = StepConfig::default();
525 let ctx = Context::new(String::new(), HashMap::new());
526
527 let result = executor.execute(&step, &config, &ctx).await.unwrap();
528 if let StepOutput::Scope(scope_out) = &result {
529 assert_eq!(scope_out.iterations.len(), 3);
530 assert!(scope_out.iterations[0].output.text().contains("alpha"));
531 assert!(scope_out.iterations[1].output.text().contains("beta"));
532 assert!(scope_out.iterations[2].output.text().contains("gamma"));
533 } else {
534 panic!("Expected Scope output");
535 }
536 }
537
538 #[tokio::test]
539 async fn map_three_items_parallel() {
540 let mut scopes = HashMap::new();
541 scopes.insert("echo_scope".to_string(), echo_scope());
542
543 let step = map_step("map_parallel", "a\nb\nc", "echo_scope", Some(3));
544 let executor = MapExecutor::new(&scopes, None);
545 let config = StepConfig::default();
546 let ctx = Context::new(String::new(), HashMap::new());
547
548 let result = executor.execute(&step, &config, &ctx).await.unwrap();
549 if let StepOutput::Scope(scope_out) = &result {
550 assert_eq!(scope_out.iterations.len(), 3);
551 } else {
552 panic!("Expected Scope output");
553 }
554 }
555
556 fn map_step_with_config(
557 name: &str,
558 items: &str,
559 scope: &str,
560 config_values: HashMap<String, serde_yaml::Value>,
561 ) -> StepDef {
562 StepDef {
563 name: name.to_string(),
564 step_type: StepType::Map,
565 run: None,
566 prompt: None,
567 condition: None,
568 on_pass: None,
569 on_fail: None,
570 message: None,
571 scope: Some(scope.to_string()),
572 max_iterations: None,
573 initial_value: None,
574 items: Some(items.to_string()),
575 parallel: None,
576 steps: None,
577 config: config_values,
578 outputs: None,
579 output_type: None,
580 async_exec: None,
581 }
582 }
583
584 #[tokio::test]
585 async fn map_collect_text_joins_with_newlines() {
586 let mut scopes = HashMap::new();
587 scopes.insert("echo_scope".to_string(), echo_scope());
588
589 let mut cfg = HashMap::new();
590 cfg.insert(
591 "collect".to_string(),
592 serde_yaml::Value::String("text".to_string()),
593 );
594 let step = map_step_with_config("map_collect_text", "alpha\nbeta\ngamma", "echo_scope", cfg);
595 let executor = MapExecutor::new(&scopes, None);
596
597 let mut config_values = HashMap::new();
599 config_values.insert(
600 "collect".to_string(),
601 serde_json::Value::String("text".to_string()),
602 );
603 let config = crate::config::StepConfig { values: config_values };
604 let ctx = Context::new(String::new(), HashMap::new());
605
606 let result = executor.execute(&step, &config, &ctx).await.unwrap();
607 assert!(matches!(result, StepOutput::Cmd(_)));
609 let text = result.text();
610 assert!(text.contains("alpha"), "Missing alpha in: {}", text);
611 assert!(text.contains("beta"), "Missing beta in: {}", text);
612 assert!(text.contains("gamma"), "Missing gamma in: {}", text);
613 }
614
615 #[tokio::test]
616 async fn map_collect_all_produces_json_array() {
617 let mut scopes = HashMap::new();
618 scopes.insert("echo_scope".to_string(), echo_scope());
619
620 let step = map_step_with_config(
621 "map_collect_all",
622 "x\ny\nz",
623 "echo_scope",
624 HashMap::new(),
625 );
626 let executor = MapExecutor::new(&scopes, None);
627
628 let mut config_values = HashMap::new();
629 config_values.insert(
630 "collect".to_string(),
631 serde_json::Value::String("all".to_string()),
632 );
633 let config = crate::config::StepConfig { values: config_values };
634 let ctx = Context::new(String::new(), HashMap::new());
635
636 let result = executor.execute(&step, &config, &ctx).await.unwrap();
637 assert!(matches!(result, StepOutput::Cmd(_)));
638 let text = result.text();
639 let arr: Vec<serde_json::Value> = serde_json::from_str(text).expect("Expected JSON array");
641 assert_eq!(arr.len(), 3);
642 }
643
644 #[tokio::test]
645 async fn map_no_collect_returns_scope_output() {
646 let mut scopes = HashMap::new();
647 scopes.insert("echo_scope".to_string(), echo_scope());
648
649 let step = map_step("map_no_collect", "a\nb", "echo_scope", None);
650 let executor = MapExecutor::new(&scopes, None);
651 let config = StepConfig::default();
652 let ctx = Context::new(String::new(), HashMap::new());
653
654 let result = executor.execute(&step, &config, &ctx).await.unwrap();
655 assert!(matches!(result, StepOutput::Scope(_)));
657 }
658
659 #[tokio::test]
660 async fn map_reduce_concat_joins_outputs() {
661 let mut scopes = HashMap::new();
662 scopes.insert("echo_scope".to_string(), echo_scope());
663
664 let step = map_step("map_reduce_concat", "hello\nworld", "echo_scope", None);
665 let executor = MapExecutor::new(&scopes, None);
666
667 let mut config_values = HashMap::new();
668 config_values.insert(
669 "reduce".to_string(),
670 serde_json::Value::String("concat".to_string()),
671 );
672 let config = crate::config::StepConfig { values: config_values };
673 let ctx = Context::new(String::new(), HashMap::new());
674
675 let result = executor.execute(&step, &config, &ctx).await.unwrap();
676 assert!(matches!(result, StepOutput::Cmd(_)));
677 let text = result.text();
678 assert!(text.contains("hello"), "Missing hello: {}", text);
679 assert!(text.contains("world"), "Missing world: {}", text);
680 }
681
682 #[tokio::test]
683 async fn map_reduce_sum_adds_numbers() {
684 let mut scopes = HashMap::new();
685 scopes.insert(
687 "echo_scope".to_string(),
688 ScopeDef {
689 steps: vec![cmd_step("echo_val", "echo {{ scope.value }}")],
690 outputs: None,
691 },
692 );
693
694 let step = map_step("map_reduce_sum", "10\n20\n30", "echo_scope", None);
695 let executor = MapExecutor::new(&scopes, None);
696
697 let mut config_values = HashMap::new();
698 config_values.insert(
699 "reduce".to_string(),
700 serde_json::Value::String("sum".to_string()),
701 );
702 let config = crate::config::StepConfig { values: config_values };
703 let ctx = Context::new(String::new(), HashMap::new());
704
705 let result = executor.execute(&step, &config, &ctx).await.unwrap();
706 assert!(matches!(result, StepOutput::Cmd(_)));
707 let text = result.text().trim().to_string();
708 assert_eq!(text, "60", "Expected 60, got: {}", text);
709 }
710
711 #[tokio::test]
712 async fn map_reduce_filter_removes_empty() {
713 let mut scopes = HashMap::new();
714 scopes.insert(
716 "echo_scope".to_string(),
717 ScopeDef {
718 steps: vec![cmd_step("echo_val", "echo {{ scope.value }}")],
719 outputs: None,
720 },
721 );
722
723 let step = map_step("map_reduce_filter", "hello\n\nworld", "echo_scope", None);
724 let executor = MapExecutor::new(&scopes, None);
725
726 let mut config_values = HashMap::new();
727 config_values.insert(
728 "reduce".to_string(),
729 serde_json::Value::String("filter".to_string()),
730 );
731 config_values.insert(
732 "reduce_condition".to_string(),
733 serde_json::Value::String("{{ item.output }}".to_string()),
734 );
735 let config = crate::config::StepConfig { values: config_values };
736 let ctx = Context::new(String::new(), HashMap::new());
737
738 let result = executor.execute(&step, &config, &ctx).await.unwrap();
739 assert!(matches!(result, StepOutput::Cmd(_)));
740 let text = result.text();
741 let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect();
743 assert!(lines.len() <= 3, "Should have at most 3 lines: {:?}", lines);
744 }
745
746 #[tokio::test]
747 async fn map_order_preserved_parallel() {
748 let mut scopes = HashMap::new();
749 scopes.insert("echo_scope".to_string(), echo_scope());
750
751 let step = map_step("map_order", "first\nsecond\nthird", "echo_scope", Some(3));
752 let executor = MapExecutor::new(&scopes, None);
753 let config = StepConfig::default();
754 let ctx = Context::new(String::new(), HashMap::new());
755
756 let result = executor.execute(&step, &config, &ctx).await.unwrap();
757 if let StepOutput::Scope(scope_out) = &result {
758 assert_eq!(scope_out.iterations[0].index, 0);
759 assert_eq!(scope_out.iterations[1].index, 1);
760 assert_eq!(scope_out.iterations[2].index, 2);
761 assert!(scope_out.iterations[0].output.text().contains("first"));
762 assert!(scope_out.iterations[1].output.text().contains("second"));
763 assert!(scope_out.iterations[2].output.text().contains("third"));
764 } else {
765 panic!("Expected Scope output");
766 }
767 }
768}