Skip to main content

minion_engine/steps/
map.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use tokio::sync::Semaphore;
6use tokio::task::JoinSet;
7
8use crate::config::StepConfig;
9use crate::config::manager::ConfigManager;
10use crate::control_flow::ControlFlow;
11use crate::engine::context::Context;
12use crate::error::StepError;
13use crate::workflow::schema::{ScopeDef, StepDef};
14
15use super::{
16    call::{dispatch_scope_step_sandboxed, resolve_scope_step_config},
17    CmdOutput, IterationOutput, ScopeOutput, SharedSandbox,
18    StepExecutor, StepOutput,
19};
20
21/// Apply a reduce operation to ScopeOutput iterations (Story 7.2)
22fn apply_reduce(
23    scope: &ScopeOutput,
24    reducer: &str,
25    condition_template: Option<&str>,
26) -> Result<StepOutput, crate::error::StepError> {
27    let iterations = &scope.iterations;
28
29    match reducer {
30        "concat" => {
31            let joined = iterations
32                .iter()
33                .map(|it| it.output.text().to_string())
34                .collect::<Vec<_>>()
35                .join("\n");
36            Ok(StepOutput::Cmd(CmdOutput {
37                stdout: joined,
38                stderr: String::new(),
39                exit_code: 0,
40                duration: std::time::Duration::ZERO,
41            }))
42        }
43        "sum" => {
44            let sum: f64 = iterations
45                .iter()
46                .map(|it| it.output.text().trim().parse::<f64>().unwrap_or(0.0))
47                .sum();
48            // Format without trailing .0 if integer
49            let result = if sum.fract() == 0.0 {
50                format!("{}", sum as i64)
51            } else {
52                format!("{}", sum)
53            };
54            Ok(StepOutput::Cmd(CmdOutput {
55                stdout: result,
56                stderr: String::new(),
57                exit_code: 0,
58                duration: std::time::Duration::ZERO,
59            }))
60        }
61        "count" => {
62            Ok(StepOutput::Cmd(CmdOutput {
63                stdout: iterations.len().to_string(),
64                stderr: String::new(),
65                exit_code: 0,
66                duration: std::time::Duration::ZERO,
67            }))
68        }
69        "min" => {
70            let min_val = iterations
71                .iter()
72                .filter_map(|it| it.output.text().trim().parse::<f64>().ok())
73                .fold(f64::INFINITY, f64::min);
74            let result = if min_val.fract() == 0.0 {
75                format!("{}", min_val as i64)
76            } else {
77                format!("{}", min_val)
78            };
79            Ok(StepOutput::Cmd(CmdOutput {
80                stdout: result,
81                stderr: String::new(),
82                exit_code: 0,
83                duration: std::time::Duration::ZERO,
84            }))
85        }
86        "max" => {
87            let max_val = iterations
88                .iter()
89                .filter_map(|it| it.output.text().trim().parse::<f64>().ok())
90                .fold(f64::NEG_INFINITY, f64::max);
91            let result = if max_val.fract() == 0.0 {
92                format!("{}", max_val as i64)
93            } else {
94                format!("{}", max_val)
95            };
96            Ok(StepOutput::Cmd(CmdOutput {
97                stdout: result,
98                stderr: String::new(),
99                exit_code: 0,
100                duration: std::time::Duration::ZERO,
101            }))
102        }
103        "filter" => {
104            let tmpl = condition_template.ok_or_else(|| {
105                crate::error::StepError::Fail(
106                    "reduce: 'filter' requires 'reduce_condition' to be set".to_string(),
107                )
108            })?;
109
110            let mut kept = Vec::new();
111            for it in iterations {
112                // Build a mini-context with item.output accessible
113                let mut vars = std::collections::HashMap::new();
114                vars.insert(
115                    "item_output".to_string(),
116                    serde_json::Value::String(it.output.text().to_string()),
117                );
118                // Render condition; treat "true" / non-empty as pass
119                // Replace {{item.output}} with item_output for simple template resolution
120                let simplified_tmpl = tmpl
121                    .replace("{{item.output}}", "{{ item_output }}")
122                    .replace("{{ item.output }}", "{{ item_output }}");
123                let child_ctx =
124                    crate::engine::context::Context::new(String::new(), vars);
125                let rendered = child_ctx
126                    .render_template(&simplified_tmpl)
127                    .unwrap_or_default();
128                let passes = !rendered.trim().is_empty()
129                    && rendered.trim() != "false"
130                    && rendered.trim() != "0";
131                if passes {
132                    kept.push(it.output.text().to_string());
133                }
134            }
135
136            let joined = kept.join("\n");
137            Ok(StepOutput::Cmd(CmdOutput {
138                stdout: joined,
139                stderr: String::new(),
140                exit_code: 0,
141                duration: std::time::Duration::ZERO,
142            }))
143        }
144        other => Err(crate::error::StepError::Fail(format!(
145            "unknown reduce operation '{}'; expected concat, sum, count, filter, min, max",
146            other
147        ))),
148    }
149}
150
151/// Apply a collect transformation to ScopeOutput (Story 7.1)
152fn apply_collect(scope: ScopeOutput, mode: &str) -> Result<StepOutput, crate::error::StepError> {
153    match mode {
154        "text" => {
155            let joined = scope
156                .iterations
157                .iter()
158                .map(|it| it.output.text().to_string())
159                .collect::<Vec<_>>()
160                .join("\n");
161            Ok(StepOutput::Cmd(CmdOutput {
162                stdout: joined,
163                stderr: String::new(),
164                exit_code: 0,
165                duration: std::time::Duration::ZERO,
166            }))
167        }
168        "all" | "json" => {
169            let arr: Vec<serde_json::Value> = scope
170                .iterations
171                .iter()
172                .map(|it| serde_json::Value::String(it.output.text().to_string()))
173                .collect();
174            let json = serde_json::to_string(&arr)
175                .map_err(|e| crate::error::StepError::Fail(format!("collect serialize error: {e}")))?;
176            Ok(StepOutput::Cmd(CmdOutput {
177                stdout: json,
178                stderr: String::new(),
179                exit_code: 0,
180                duration: std::time::Duration::ZERO,
181            }))
182        }
183        other => Err(crate::error::StepError::Fail(format!(
184            "unknown collect mode '{}'; expected all, text, or json",
185            other
186        ))),
187    }
188}
189
190pub struct MapExecutor {
191    scopes: HashMap<String, ScopeDef>,
192    sandbox: SharedSandbox,
193    config_manager: Option<Arc<ConfigManager>>,
194}
195
196impl MapExecutor {
197    pub fn new(scopes: &HashMap<String, ScopeDef>, sandbox: SharedSandbox) -> Self {
198        Self {
199            scopes: scopes.clone(),
200            sandbox,
201            config_manager: None,
202        }
203    }
204
205    pub fn with_config_manager(mut self, cm: Option<Arc<ConfigManager>>) -> Self {
206        self.config_manager = cm;
207        self
208    }
209}
210
211#[async_trait]
212impl StepExecutor for MapExecutor {
213    async fn execute(
214        &self,
215        step: &StepDef,
216        _config: &StepConfig,
217        ctx: &Context,
218    ) -> Result<StepOutput, StepError> {
219        let items_template = step
220            .items
221            .as_ref()
222            .ok_or_else(|| StepError::Fail("map step missing 'items' field".into()))?;
223
224        let scope_name = step
225            .scope
226            .as_ref()
227            .ok_or_else(|| StepError::Fail("map step missing 'scope' field".into()))?;
228
229        let scope = self
230            .scopes
231            .get(scope_name)
232            .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?
233            .clone();
234
235        let rendered_items = ctx.render_template(items_template)?;
236
237        // Parse items: try JSON array first, then split by lines
238        let items: Vec<String> = if rendered_items.trim().starts_with('[') {
239            serde_json::from_str::<Vec<serde_json::Value>>(&rendered_items)
240                .map(|arr| {
241                    arr.into_iter()
242                        .map(|v| match v {
243                            serde_json::Value::String(s) => s,
244                            other => other.to_string(),
245                        })
246                        .collect()
247                })
248                .unwrap_or_else(|_| {
249                    rendered_items
250                        .lines()
251                        .filter(|l| !l.trim().is_empty())
252                        .map(|l| l.to_string())
253                        .collect()
254                })
255        } else {
256            rendered_items
257                .lines()
258                .filter(|l| !l.trim().is_empty())
259                .map(|l| l.to_string())
260                .collect()
261        };
262
263        let parallel_count = step.parallel.unwrap_or(0);
264
265        let scope_output = if parallel_count == 0 {
266            // Serial execution
267            serial_execute(items, &scope, ctx, &self.scopes, &self.sandbox, &self.config_manager).await?
268        } else {
269            // Parallel execution with semaphore
270            parallel_execute(items, &scope, ctx, &self.scopes, parallel_count, &self.sandbox, &self.config_manager).await?
271        };
272
273        // Story 7.2: Apply reduce if configured (takes precedence over collect)
274        let reduce_mode = _config.get_str("reduce").map(|s| s.to_string());
275        if let Some(ref reducer) = reduce_mode {
276            if let StepOutput::Scope(ref s) = scope_output {
277                let condition = _config.get_str("reduce_condition");
278                return apply_reduce(s, reducer, condition);
279            }
280        }
281
282        // Story 7.1: Apply collect transformation if configured
283        let collect_mode = _config.get_str("collect").map(|s| s.to_string());
284        match (scope_output, collect_mode) {
285            (StepOutput::Scope(s), Some(mode)) => apply_collect(s, &mode),
286            (output, _) => Ok(output),
287        }
288    }
289}
290
291async fn serial_execute(
292    items: Vec<String>,
293    scope: &ScopeDef,
294    ctx: &Context,
295    scopes: &HashMap<String, ScopeDef>,
296    sandbox: &SharedSandbox,
297    config_manager: &Option<Arc<ConfigManager>>,
298) -> Result<StepOutput, StepError> {
299    let mut iterations = Vec::new();
300
301    for (i, item) in items.iter().enumerate() {
302        let mut child_ctx = make_child_ctx(ctx, Some(serde_json::Value::String(item.clone())), i);
303
304        let iter_output = execute_scope_steps(scope, &mut child_ctx, scopes, sandbox, config_manager).await?;
305
306        iterations.push(IterationOutput {
307            index: i,
308            output: iter_output,
309        });
310    }
311
312    let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
313    Ok(StepOutput::Scope(ScopeOutput {
314        iterations,
315        final_value,
316    }))
317}
318
319async fn parallel_execute(
320    items: Vec<String>,
321    scope: &ScopeDef,
322    ctx: &Context,
323    scopes: &HashMap<String, ScopeDef>,
324    parallel_count: usize,
325    sandbox: &SharedSandbox,
326    config_manager: &Option<Arc<ConfigManager>>,
327) -> Result<StepOutput, StepError> {
328    let sem = Arc::new(Semaphore::new(parallel_count));
329    let mut set: JoinSet<(usize, Result<StepOutput, StepError>)> = JoinSet::new();
330
331    for (i, item) in items.iter().enumerate() {
332        let sem = Arc::clone(&sem);
333        let item_val = serde_json::Value::String(item.clone());
334        let child_ctx = make_child_ctx(ctx, Some(item_val), i);
335        let scope_clone = scope.clone();
336        let scopes_clone = scopes.clone();
337        let sandbox_clone = sandbox.clone();
338        let cm_clone = config_manager.clone();
339
340        set.spawn(async move {
341            let _permit = sem.acquire().await.expect("semaphore closed");
342            let result = execute_scope_steps_owned(scope_clone, child_ctx, scopes_clone, sandbox_clone, cm_clone).await;
343            (i, result)
344        });
345    }
346
347    let mut results: Vec<Option<StepOutput>> = vec![None; items.len()];
348
349    while let Some(res) = set.join_next().await {
350        match res {
351            Ok((i, Ok(output))) => {
352                results[i] = Some(output);
353            }
354            Ok((_, Err(e))) => {
355                set.abort_all();
356                return Err(e);
357            }
358            Err(e) => {
359                set.abort_all();
360                return Err(StepError::Fail(format!("Task panicked: {e}")));
361            }
362        }
363    }
364
365    let iterations: Vec<IterationOutput> = results
366        .into_iter()
367        .enumerate()
368        .map(|(i, opt)| IterationOutput {
369            index: i,
370            output: opt.unwrap_or(StepOutput::Empty),
371        })
372        .collect();
373
374    let final_value = iterations.last().map(|i| Box::new(i.output.clone()));
375    Ok(StepOutput::Scope(ScopeOutput {
376        iterations,
377        final_value,
378    }))
379}
380
381fn make_child_ctx(
382    parent: &Context,
383    scope_value: Option<serde_json::Value>,
384    index: usize,
385) -> Context {
386    let target = parent
387        .get_var("target")
388        .and_then(|v| v.as_str())
389        .unwrap_or("")
390        .to_string();
391    let mut ctx = Context::new(target, parent.all_variables());
392    ctx.scope_value = scope_value;
393    ctx.scope_index = index;
394    ctx.stack_info = parent.get_stack_info().cloned();
395    ctx.prompts_dir = parent.prompts_dir.clone();
396    ctx
397}
398
399async fn execute_scope_steps(
400    scope: &ScopeDef,
401    child_ctx: &mut Context,
402    scopes: &HashMap<String, ScopeDef>,
403    sandbox: &SharedSandbox,
404    config_manager: &Option<Arc<ConfigManager>>,
405) -> Result<StepOutput, StepError> {
406    let mut last_output = StepOutput::Empty;
407
408    for scope_step in &scope.steps {
409        let config = resolve_scope_step_config(config_manager, scope_step);
410        let result = dispatch_scope_step_sandboxed(scope_step, &config, child_ctx, scopes, sandbox, config_manager).await;
411
412        match result {
413            Ok(output) => {
414                child_ctx.store(&scope_step.name, output.clone());
415                last_output = output;
416            }
417            Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
418                if let Some(v) = value {
419                    last_output = v;
420                }
421                break;
422            }
423            Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
424                child_ctx.store(&scope_step.name, StepOutput::Empty);
425            }
426            Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
427                break;
428            }
429            Err(e) => return Err(e),
430        }
431    }
432
433    // Apply scope outputs if defined
434    if let Some(outputs_template) = &scope.outputs {
435        if let Ok(rendered) = child_ctx.render_template(outputs_template) {
436            return Ok(StepOutput::Cmd(CmdOutput {
437                stdout: rendered,
438                stderr: String::new(),
439                exit_code: 0,
440                duration: std::time::Duration::ZERO,
441            }));
442        }
443    }
444
445    Ok(last_output)
446}
447
448async fn execute_scope_steps_owned(
449    scope: ScopeDef,
450    mut child_ctx: Context,
451    scopes: HashMap<String, ScopeDef>,
452    sandbox: SharedSandbox,
453    config_manager: Option<Arc<ConfigManager>>,
454) -> Result<StepOutput, StepError> {
455    execute_scope_steps(&scope, &mut child_ctx, &scopes, &sandbox, &config_manager).await
456}
457
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use crate::workflow::schema::{ScopeDef, StepType};

    /// A step of the given type with every optional field unset.
    fn blank_step(name: &str, step_type: StepType) -> StepDef {
        StepDef {
            name: name.to_string(),
            step_type,
            run: None,
            prompt: None,
            condition: None,
            on_pass: None,
            on_fail: None,
            message: None,
            scope: None,
            max_iterations: None,
            initial_value: None,
            items: None,
            parallel: None,
            steps: None,
            config: HashMap::new(),
            outputs: None,
            output_type: None,
            async_exec: None,
        }
    }

    /// A `cmd` step that runs `run`.
    fn cmd_step(name: &str, run: &str) -> StepDef {
        let mut step = blank_step(name, StepType::Cmd);
        step.run = Some(run.to_string());
        step
    }

    /// A `map` step over `items` using the scope called `scope`.
    fn map_step(name: &str, items: &str, scope: &str, parallel: Option<usize>) -> StepDef {
        let mut step = blank_step(name, StepType::Map);
        step.scope = Some(scope.to_string());
        step.items = Some(items.to_string());
        step.parallel = parallel;
        step
    }

    /// A serial `map` step that also carries step-level config values.
    fn map_step_with_config(
        name: &str,
        items: &str,
        scope: &str,
        config_values: HashMap<String, serde_yaml::Value>,
    ) -> StepDef {
        let mut step = map_step(name, items, scope, None);
        step.config = config_values;
        step
    }

    /// A scope with a single `cmd` step that echoes the scope value.
    fn single_echo_scope(step_name: &str) -> ScopeDef {
        ScopeDef {
            steps: vec![cmd_step(step_name, "echo {{ scope.value }}")],
            outputs: None,
        }
    }

    fn echo_scope() -> ScopeDef {
        single_echo_scope("echo")
    }

    /// Scope map that registers `scope` under the name `echo_scope`.
    fn scopes_with(scope: ScopeDef) -> HashMap<String, ScopeDef> {
        let mut scopes = HashMap::new();
        scopes.insert("echo_scope".to_string(), scope);
        scopes
    }

    /// A `StepConfig` made of string-valued entries.
    fn string_config(entries: &[(&str, &str)]) -> StepConfig {
        let mut config_values = HashMap::new();
        for &(key, value) in entries {
            config_values.insert(
                key.to_string(),
                serde_json::Value::String(value.to_string()),
            );
        }
        crate::config::StepConfig { values: config_values }
    }

    /// Execute `step` with a sandbox-less executor and an empty context.
    async fn run_map(
        scopes: &HashMap<String, ScopeDef>,
        step: &StepDef,
        config: &StepConfig,
    ) -> StepOutput {
        let executor = MapExecutor::new(scopes, None);
        let ctx = Context::new(String::new(), HashMap::new());
        executor.execute(step, config, &ctx).await.unwrap()
    }

    #[tokio::test]
    async fn map_three_items_serial() {
        let scopes = scopes_with(echo_scope());
        let step = map_step("map_test", "alpha\nbeta\ngamma", "echo_scope", None);

        let result = run_map(&scopes, &step, &StepConfig::default()).await;
        match &result {
            StepOutput::Scope(scope_out) => {
                assert_eq!(scope_out.iterations.len(), 3);
                assert!(scope_out.iterations[0].output.text().contains("alpha"));
                assert!(scope_out.iterations[1].output.text().contains("beta"));
                assert!(scope_out.iterations[2].output.text().contains("gamma"));
            }
            _ => panic!("Expected Scope output"),
        }
    }

    #[tokio::test]
    async fn map_three_items_parallel() {
        let scopes = scopes_with(echo_scope());
        let step = map_step("map_parallel", "a\nb\nc", "echo_scope", Some(3));

        let result = run_map(&scopes, &step, &StepConfig::default()).await;
        match &result {
            StepOutput::Scope(scope_out) => assert_eq!(scope_out.iterations.len(), 3),
            _ => panic!("Expected Scope output"),
        }
    }

    #[tokio::test]
    async fn map_collect_text_joins_with_newlines() {
        let scopes = scopes_with(echo_scope());

        let mut cfg = HashMap::new();
        cfg.insert(
            "collect".to_string(),
            serde_yaml::Value::String("text".to_string()),
        );
        let step = map_step_with_config("map_collect_text", "alpha\nbeta\ngamma", "echo_scope", cfg);

        // The executor reads the collect mode from the StepConfig.
        let config = string_config(&[("collect", "text")]);

        let result = run_map(&scopes, &step, &config).await;
        // Should be a Cmd output with newline-joined text
        assert!(matches!(result, StepOutput::Cmd(_)));
        let text = result.text();
        assert!(text.contains("alpha"), "Missing alpha in: {}", text);
        assert!(text.contains("beta"), "Missing beta in: {}", text);
        assert!(text.contains("gamma"), "Missing gamma in: {}", text);
    }

    #[tokio::test]
    async fn map_collect_all_produces_json_array() {
        let scopes = scopes_with(echo_scope());
        let step = map_step_with_config(
            "map_collect_all",
            "x\ny\nz",
            "echo_scope",
            HashMap::new(),
        );
        let config = string_config(&[("collect", "all")]);

        let result = run_map(&scopes, &step, &config).await;
        assert!(matches!(result, StepOutput::Cmd(_)));
        let text = result.text();
        // Should be a valid JSON array
        let arr: Vec<serde_json::Value> = serde_json::from_str(text).expect("Expected JSON array");
        assert_eq!(arr.len(), 3);
    }

    #[tokio::test]
    async fn map_no_collect_returns_scope_output() {
        let scopes = scopes_with(echo_scope());
        let step = map_step("map_no_collect", "a\nb", "echo_scope", None);

        let result = run_map(&scopes, &step, &StepConfig::default()).await;
        // Without collect, should still be a Scope output
        assert!(matches!(result, StepOutput::Scope(_)));
    }

    #[tokio::test]
    async fn map_reduce_concat_joins_outputs() {
        let scopes = scopes_with(echo_scope());
        let step = map_step("map_reduce_concat", "hello\nworld", "echo_scope", None);
        let config = string_config(&[("reduce", "concat")]);

        let result = run_map(&scopes, &step, &config).await;
        assert!(matches!(result, StepOutput::Cmd(_)));
        let text = result.text();
        assert!(text.contains("hello"), "Missing hello: {}", text);
        assert!(text.contains("world"), "Missing world: {}", text);
    }

    #[tokio::test]
    async fn map_reduce_sum_adds_numbers() {
        // Each scope step echos the item value (which will be a number string)
        let scopes = scopes_with(single_echo_scope("echo_val"));
        let step = map_step("map_reduce_sum", "10\n20\n30", "echo_scope", None);
        let config = string_config(&[("reduce", "sum")]);

        let result = run_map(&scopes, &step, &config).await;
        assert!(matches!(result, StepOutput::Cmd(_)));
        let text = result.text().trim().to_string();
        assert_eq!(text, "60", "Expected 60, got: {}", text);
    }

    #[tokio::test]
    async fn map_reduce_filter_removes_empty() {
        // Scope that outputs the item value (some will be empty, some not)
        let scopes = scopes_with(single_echo_scope("echo_val"));
        let step = map_step("map_reduce_filter", "hello\n\nworld", "echo_scope", None);
        let config = string_config(&[
            ("reduce", "filter"),
            ("reduce_condition", "{{ item.output }}"),
        ]);

        let result = run_map(&scopes, &step, &config).await;
        assert!(matches!(result, StepOutput::Cmd(_)));
        let text = result.text();
        // Empty lines should be filtered out
        let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect();
        assert!(lines.len() <= 3, "Should have at most 3 lines: {:?}", lines);
    }

    #[tokio::test]
    async fn map_order_preserved_parallel() {
        let scopes = scopes_with(echo_scope());
        let step = map_step("map_order", "first\nsecond\nthird", "echo_scope", Some(3));

        let result = run_map(&scopes, &step, &StepConfig::default()).await;
        match &result {
            StepOutput::Scope(scope_out) => {
                assert_eq!(scope_out.iterations[0].index, 0);
                assert_eq!(scope_out.iterations[1].index, 1);
                assert_eq!(scope_out.iterations[2].index, 2);
                assert!(scope_out.iterations[0].output.text().contains("first"));
                assert!(scope_out.iterations[1].output.text().contains("second"));
                assert!(scope_out.iterations[2].output.text().contains("third"));
            }
            _ => panic!("Expected Scope output"),
        }
    }
}