Skip to main content

kaish_kernel/scheduler/
pipeline.rs

1//! Pipeline execution for kaish.
2//!
3//! Executes a sequence of commands connected by pipes, where the stdout
4//! of each command becomes the stdin of the next.
5//!
6//! Also handles scatter/gather pipelines for parallel execution.
7
8use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::interpreter::{apply_output_format, ExecResult};
15use crate::tools::{extract_output_format, ExecContext, ToolArgs, ToolRegistry, ToolSchema};
16use tokio::io::AsyncWriteExt;
17
18use super::scatter::{
19    parse_gather_options, parse_scatter_options, ScatterGatherRunner,
20};
21
22/// Apply redirects to an execution result.
23///
24/// Pre-execution redirects (Stdin, HereDoc) should be handled before calling.
25/// Post-execution redirects (stdout/stderr to file, merge) applied here.
26/// Redirects are processed left-to-right per POSIX.
27async fn apply_redirects(
28    mut result: ExecResult,
29    redirects: &[Redirect],
30    ctx: &ExecContext,
31) -> ExecResult {
32    for redir in redirects {
33        match redir.kind {
34            RedirectKind::MergeStderr => {
35                // 2>&1 - append stderr to stdout
36                if !result.err.is_empty() {
37                    result.out.push_str(&result.err);
38                    result.err.clear();
39                }
40            }
41            RedirectKind::MergeStdout => {
42                // 1>&2 or >&2 - append stdout to stderr
43                if !result.out.is_empty() {
44                    result.err.push_str(&result.out);
45                    result.out.clear();
46                }
47            }
48            RedirectKind::StdoutOverwrite => {
49                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
50                    if let Err(e) = tokio::fs::write(&path, &result.out).await {
51                        return ExecResult::failure(1, format!("redirect: {e}"));
52                    }
53                    result.out.clear();
54                }
55            }
56            RedirectKind::StdoutAppend => {
57                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
58                    let file = tokio::fs::OpenOptions::new()
59                        .append(true)
60                        .create(true)
61                        .open(&path)
62                        .await;
63                    match file {
64                        Ok(mut f) => {
65                            if let Err(e) = f.write_all(result.out.as_bytes()).await {
66                                return ExecResult::failure(1, format!("redirect: {e}"));
67                            }
68                        }
69                        Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
70                    }
71                    result.out.clear();
72                }
73            }
74            RedirectKind::Stderr => {
75                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
76                    if let Err(e) = tokio::fs::write(&path, &result.err).await {
77                        return ExecResult::failure(1, format!("redirect: {e}"));
78                    }
79                    result.err.clear();
80                }
81            }
82            RedirectKind::Both => {
83                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
84                    let combined = format!("{}{}", result.out, result.err);
85                    if let Err(e) = tokio::fs::write(&path, combined).await {
86                        return ExecResult::failure(1, format!("redirect: {e}"));
87                    }
88                    result.out.clear();
89                    result.err.clear();
90                }
91            }
92            // Pre-execution redirects - already handled before command execution
93            RedirectKind::Stdin | RedirectKind::HereDoc => {}
94        }
95    }
96    result
97}
98
99/// Evaluate a redirect target expression to get the file path.
100fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
101    eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
102}
103
104/// Set up stdin from redirects (< file, <<heredoc).
105/// Called before command execution.
106fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
107    for redir in &cmd.redirects {
108        match &redir.kind {
109            RedirectKind::Stdin => {
110                if let Some(path) = eval_redirect_target(&redir.target, ctx)
111                    && let Ok(content) = std::fs::read_to_string(&path) {
112                        ctx.set_stdin(content);
113                    }
114            }
115            RedirectKind::HereDoc => {
116                if let Expr::Literal(Value::String(content)) = &redir.target {
117                    ctx.set_stdin(content.clone());
118                }
119            }
120            _ => {}
121        }
122    }
123}
124
/// Runs command pipelines against a shared tool registry.
///
/// Cloning is cheap: the only state is an `Arc` handle to the registry.
///
/// NOTE(review): this type was previously documented as spawning tasks
/// connected via channels, but the pipeline loop visible in this file feeds
/// each command's stdout to the next one sequentially — confirm and keep the
/// docs in sync if true concurrency is added.
#[derive(Clone)]
pub struct PipelineRunner {
    // Used for per-command schema lookup and handed to the scatter/gather runner.
    tools: Arc<ToolRegistry>,
}
130
131impl PipelineRunner {
132    /// Create a new pipeline runner with the given tool registry.
133    pub fn new(tools: Arc<ToolRegistry>) -> Self {
134        Self { tools }
135    }
136
137    /// Execute a pipeline of commands.
138    ///
139    /// Each command's stdout becomes the next command's stdin.
140    /// If the pipeline contains scatter/gather, delegates to ScatterGatherRunner.
141    /// Returns the result of the last command in the pipeline.
142    pub async fn run(
143        &self,
144        commands: &[Command],
145        ctx: &mut ExecContext,
146    ) -> ExecResult {
147        if commands.is_empty() {
148            return ExecResult::success("");
149        }
150
151        // Check for scatter/gather pipeline
152        if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
153            return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx).await;
154        }
155
156        if commands.len() == 1 {
157            // Single command, no piping needed
158            return self.run_single(&commands[0], ctx, None).await;
159        }
160
161        // Multi-command pipeline
162        self.run_pipeline(commands, ctx).await
163    }
164
165    /// Run a scatter/gather pipeline.
166    async fn run_scatter_gather(
167        &self,
168        commands: &[Command],
169        scatter_idx: usize,
170        gather_idx: usize,
171        ctx: &mut ExecContext,
172    ) -> ExecResult {
173        // Split pipeline into parts
174        let pre_scatter = &commands[..scatter_idx];
175        let scatter_cmd = &commands[scatter_idx];
176        let parallel = &commands[scatter_idx + 1..gather_idx];
177        let gather_cmd = &commands[gather_idx];
178        let post_gather = &commands[gather_idx + 1..];
179
180        // Parse options from scatter and gather commands
181        // These are builtins with simple key=value syntax, no schema-driven parsing needed
182        let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
183        let gather_schema = self.tools.get("gather").map(|t| t.schema());
184        let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
185        let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
186
187        // Run with ScatterGatherRunner
188        let runner = ScatterGatherRunner::new(self.tools.clone());
189        runner
190            .run(
191                pre_scatter,
192                scatter_opts,
193                parallel,
194                gather_opts,
195                post_gather,
196                ctx,
197            )
198            .await
199    }
200
201    /// Run a single command with optional stdin.
202    async fn run_single(
203        &self,
204        cmd: &Command,
205        ctx: &mut ExecContext,
206        stdin: Option<String>,
207    ) -> ExecResult {
208        // Handle built-in true/false
209        match cmd.name.as_str() {
210            "true" => return ExecResult::success(""),
211            "false" => return ExecResult::failure(1, ""),
212            _ => {}
213        }
214
215        // Build tool args with schema-aware parsing
216        let schema = self.tools.get(&cmd.name).map(|t| t.schema());
217        let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
218        let output_format = extract_output_format(&mut tool_args, schema.as_ref());
219
220        // Set up stdin from redirects (< file, <<heredoc)
221        setup_stdin_redirects(cmd, ctx);
222
223        // Set stdin from pipeline (overrides redirect stdin)
224        if let Some(input) = stdin {
225            ctx.set_stdin(input);
226        }
227
228        // Execute via backend (clone Arc to avoid borrow conflict)
229        let backend = ctx.backend.clone();
230        let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
231            Ok(tool_result) => {
232                let mut exec = ExecResult::from_output(
233                    tool_result.code as i64, tool_result.stdout, tool_result.stderr,
234                );
235                exec.output = tool_result.output;
236                exec
237            }
238            Err(e) => ExecResult::failure(127, e.to_string()),
239        };
240
241        // Apply output format transform
242        let result = match output_format {
243            Some(format) => apply_output_format(result, format),
244            None => result,
245        };
246
247        // Apply post-execution redirects
248        apply_redirects(result, &cmd.redirects, ctx).await
249    }
250
251    /// Run a multi-command pipeline.
252    async fn run_pipeline(
253        &self,
254        commands: &[Command],
255        ctx: &mut ExecContext,
256    ) -> ExecResult {
257        // We'll run commands sequentially for simplicity, passing stdout as stdin
258        // A more sophisticated implementation would use tokio::spawn for true parallelism
259
260        let mut current_stdin: Option<String> = None;
261        let mut current_data: Option<Value> = None;
262        let mut last_result = ExecResult::success("");
263
264        for (i, cmd) in commands.iter().enumerate() {
265            // Build tool args with schema-aware parsing
266            let schema = self.tools.get(&cmd.name).map(|t| t.schema());
267            let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
268            let output_format = extract_output_format(&mut tool_args, schema.as_ref());
269
270            // Set up stdin from redirects (< file, <<heredoc)
271            setup_stdin_redirects(cmd, ctx);
272
273            // Set stdin from previous command's stdout (overrides redirect stdin)
274            // Also pass structured data if available from previous command
275            if let Some(input) = current_stdin.take() {
276                ctx.set_stdin_with_data(input, current_data.take());
277            }
278
279            // Execute via backend (clone Arc to avoid borrow conflict)
280            let backend = ctx.backend.clone();
281            last_result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
282                Ok(tool_result) => {
283                    let mut exec = ExecResult::from_output(
284                        tool_result.code as i64, tool_result.stdout, tool_result.stderr,
285                    );
286                    exec.output = tool_result.output;
287                    exec
288                }
289                Err(e) => ExecResult::failure(127, e.to_string()),
290            };
291
292            // Apply output format transform
293            last_result = match output_format {
294                Some(format) => apply_output_format(last_result, format),
295                None => last_result,
296            };
297
298            // Apply post-execution redirects
299            last_result = apply_redirects(last_result, &cmd.redirects, ctx).await;
300
301            // If command failed, stop the pipeline
302            if !last_result.ok() {
303                return last_result;
304            }
305
306            // Pass stdout and structured data to next command's stdin (unless last command)
307            if i < commands.len() - 1 {
308                current_stdin = Some(last_result.out.clone());
309                current_data = last_result.data.clone();
310            }
311        }
312
313        last_result
314    }
315}
316
317/// Extract parameter types from a tool schema.
318///
319/// Returns a map from param name → param type (e.g., "verbose" → "bool", "output" → "string").
320fn schema_param_types(schema: &ToolSchema) -> HashMap<&str, &str> {
321    schema
322        .params
323        .iter()
324        .map(|p| (p.name.as_str(), p.param_type.as_str()))
325        .collect()
326}
327
/// Report whether a schema type name denotes a boolean.
///
/// The comparison ignores case; both "bool" and "boolean" are accepted.
fn is_bool_type(param_type: &str) -> bool {
    let normalized = param_type.to_lowercase();
    normalized == "bool" || normalized == "boolean"
}
332
333/// Build ToolArgs from AST Args, evaluating expressions.
334///
335/// If a schema is provided, uses it to determine argument types:
336/// - For `--flag` where schema says type is non-bool: consume next positional as value
337/// - For `--flag` where schema says type is bool (or unknown): treat as boolean flag
338///
339/// This enables natural shell syntax like `mcp_tool --query "test" --limit 10`.
340pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
341    let mut tool_args = ToolArgs::new();
342    let param_types = schema.map(schema_param_types).unwrap_or_default();
343
344    // Track which positional indices have been consumed as flag values
345    let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
346    let mut past_double_dash = false;
347
348    // First pass: find positional args and their indices
349    let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
350    for (i, arg) in args.iter().enumerate() {
351        if let Arg::Positional(expr) = arg {
352            positional_indices.push((i, expr));
353        }
354    }
355
356    // Second pass: process all args
357    let mut i = 0;
358    while i < args.len() {
359        let arg = &args[i];
360
361        match arg {
362            Arg::DoubleDash => {
363                past_double_dash = true;
364            }
365            Arg::Positional(expr) => {
366                // Check if this positional was consumed by a preceding flag
367                if !consumed_positionals.contains(&i)
368                    && let Some(value) = eval_simple_expr(expr, ctx)
369                {
370                    tool_args.positional.push(value);
371                }
372            }
373            Arg::Named { key, value } => {
374                if let Some(val) = eval_simple_expr(value, ctx) {
375                    tool_args.named.insert(key.clone(), val);
376                }
377            }
378            Arg::ShortFlag(name) => {
379                if past_double_dash {
380                    // After --, treat as positional (though parser wouldn't emit this)
381                    tool_args.positional.push(Value::String(format!("-{name}")));
382                } else {
383                    // Expand combined flags like -la
384                    for c in name.chars() {
385                        tool_args.flags.insert(c.to_string());
386                    }
387                }
388            }
389            Arg::LongFlag(name) => {
390                if past_double_dash {
391                    // After --, treat as positional (though parser wouldn't emit this)
392                    tool_args.positional.push(Value::String(format!("--{name}")));
393                } else {
394                    // Look up type in schema
395                    let is_bool = param_types
396                        .get(name.as_str())
397                        .map(|t| is_bool_type(t))
398                        .unwrap_or(true); // Unknown params default to bool (backward compat)
399
400                    if is_bool {
401                        // Boolean flag - just add to flags set
402                        tool_args.flags.insert(name.clone());
403                    } else {
404                        // Non-bool flag - try to consume next positional as value
405                        // Look for the next positional arg after this flag
406                        let next_positional = positional_indices
407                            .iter()
408                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
409
410                        if let Some((pos_idx, expr)) = next_positional {
411                            if let Some(value) = eval_simple_expr(expr, ctx) {
412                                tool_args.named.insert(name.clone(), value);
413                                consumed_positionals.insert(*pos_idx);
414                            } else {
415                                // Expression didn't evaluate - treat as bool flag
416                                tool_args.flags.insert(name.clone());
417                            }
418                        } else {
419                            // No positional available - treat as bool flag
420                            // (Could be an error, but we're lenient for compatibility)
421                            tool_args.flags.insert(name.clone());
422                        }
423                    }
424                }
425            }
426        }
427        i += 1;
428    }
429
430    tool_args
431}
432
433/// Simple expression evaluation for args (without full scope access).
434fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
435    match expr {
436        Expr::Literal(value) => Some(eval_literal(value, ctx)),
437        Expr::VarRef(path) => ctx.scope.resolve_path(path),
438        Expr::Interpolated(parts) => {
439            let mut result = String::new();
440            for part in parts {
441                match part {
442                    crate::ast::StringPart::Literal(s) => result.push_str(s),
443                    crate::ast::StringPart::Var(path) => {
444                        if let Some(value) = ctx.scope.resolve_path(path) {
445                            result.push_str(&value_to_string(&value));
446                        }
447                    }
448                    crate::ast::StringPart::VarWithDefault { name, default } => {
449                        match ctx.scope.get(name) {
450                            Some(value) => {
451                                let s = value_to_string(value);
452                                if s.is_empty() {
453                                    result.push_str(&eval_string_parts_sync(default, ctx));
454                                } else {
455                                    result.push_str(&s);
456                                }
457                            }
458                            None => result.push_str(&eval_string_parts_sync(default, ctx)),
459                        }
460                    }
461                    crate::ast::StringPart::VarLength(name) => {
462                        let len = match ctx.scope.get(name) {
463                            Some(value) => value_to_string(value).len(),
464                            None => 0,
465                        };
466                        result.push_str(&len.to_string());
467                    }
468                    crate::ast::StringPart::Positional(n) => {
469                        if let Some(s) = ctx.scope.get_positional(*n) {
470                            result.push_str(s);
471                        }
472                    }
473                    crate::ast::StringPart::AllArgs => {
474                        result.push_str(&ctx.scope.all_args().join(" "));
475                    }
476                    crate::ast::StringPart::ArgCount => {
477                        result.push_str(&ctx.scope.arg_count().to_string());
478                    }
479                    crate::ast::StringPart::Arithmetic(expr) => {
480                        // Evaluate arithmetic in pipeline context
481                        if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
482                            result.push_str(&value.to_string());
483                        }
484                    }
485                    crate::ast::StringPart::CommandSubst(_) => {
486                        // Command substitution requires async - skip in sync context
487                    }
488                    crate::ast::StringPart::LastExitCode => {
489                        result.push_str(&ctx.scope.last_result().code.to_string());
490                    }
491                    crate::ast::StringPart::CurrentPid => {
492                        result.push_str(&ctx.scope.pid().to_string());
493                    }
494                }
495            }
496            Some(Value::String(result))
497        }
498        _ => None, // Binary ops and command subst need more context
499    }
500}
501
502/// Evaluate a literal value.
503fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
504    value.clone()
505}
506
507/// Convert a value to a string for interpolation.
508fn value_to_string(value: &Value) -> String {
509    match value {
510        Value::Null => "".to_string(),
511        Value::Bool(b) => b.to_string(),
512        Value::Int(i) => i.to_string(),
513        Value::Float(f) => f.to_string(),
514        Value::String(s) => s.clone(),
515        Value::Json(json) => json.to_string(),
516        Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
517    }
518}
519
520/// Evaluate string parts synchronously (for pipeline context).
521/// Command substitutions are skipped as they require async.
522fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
523    let mut result = String::new();
524    for part in parts {
525        match part {
526            crate::ast::StringPart::Literal(s) => result.push_str(s),
527            crate::ast::StringPart::Var(path) => {
528                if let Some(value) = ctx.scope.resolve_path(path) {
529                    result.push_str(&value_to_string(&value));
530                }
531            }
532            crate::ast::StringPart::VarWithDefault { name, default } => {
533                match ctx.scope.get(name) {
534                    Some(value) => {
535                        let s = value_to_string(value);
536                        if s.is_empty() {
537                            result.push_str(&eval_string_parts_sync(default, ctx));
538                        } else {
539                            result.push_str(&s);
540                        }
541                    }
542                    None => result.push_str(&eval_string_parts_sync(default, ctx)),
543                }
544            }
545            crate::ast::StringPart::VarLength(name) => {
546                let len = match ctx.scope.get(name) {
547                    Some(value) => value_to_string(value).len(),
548                    None => 0,
549                };
550                result.push_str(&len.to_string());
551            }
552            crate::ast::StringPart::Positional(n) => {
553                if let Some(s) = ctx.scope.get_positional(*n) {
554                    result.push_str(s);
555                }
556            }
557            crate::ast::StringPart::AllArgs => {
558                result.push_str(&ctx.scope.all_args().join(" "));
559            }
560            crate::ast::StringPart::ArgCount => {
561                result.push_str(&ctx.scope.arg_count().to_string());
562            }
563            crate::ast::StringPart::Arithmetic(expr) => {
564                if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
565                    result.push_str(&value.to_string());
566                }
567            }
568            crate::ast::StringPart::CommandSubst(_) => {
569                // Command substitution requires async - skip in sync context
570            }
571            crate::ast::StringPart::LastExitCode => {
572                result.push_str(&ctx.scope.last_result().code.to_string());
573            }
574            crate::ast::StringPart::CurrentPid => {
575                result.push_str(&ctx.scope.pid().to_string());
576            }
577        }
578    }
579    result
580}
581
582/// Find scatter and gather commands in a pipeline.
583///
584/// Returns Some((scatter_index, gather_index)) if both are found with scatter before gather.
585/// Returns None if the pipeline doesn't have a valid scatter/gather pattern.
586fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
587    let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
588    let gather_idx = commands.iter().position(|c| c.name == "gather")?;
589
590    // Gather must come after scatter
591    if gather_idx > scatter_idx {
592        Some((scatter_idx, gather_idx))
593    } else {
594        None
595    }
596}
597
598/// Run a pipeline sequentially without scatter/gather detection.
599///
600/// This is used internally by ScatterGatherRunner to avoid recursion.
601/// The `tools` parameter is used for schema lookup to enable schema-aware argument parsing.
602pub async fn run_sequential_pipeline(
603    tools: &Arc<ToolRegistry>,
604    commands: &[Command],
605    ctx: &mut ExecContext,
606) -> ExecResult {
607    if commands.is_empty() {
608        return ExecResult::success("");
609    }
610
611    let mut current_stdin: Option<String> = ctx.take_stdin();
612    let mut current_data: Option<Value> = ctx.take_stdin_data();
613    let mut last_result = ExecResult::success("");
614
615    for (i, cmd) in commands.iter().enumerate() {
616        // Handle built-in true/false
617        match cmd.name.as_str() {
618            "true" => {
619                last_result = ExecResult::success("");
620                if i < commands.len() - 1 {
621                    current_stdin = Some(last_result.out.clone());
622                    current_data = last_result.data.clone();
623                }
624                continue;
625            }
626            "false" => {
627                return ExecResult::failure(1, "");
628            }
629            _ => {}
630        }
631
632        // Build tool args with schema-aware parsing
633        let schema = tools.get(&cmd.name).map(|t| t.schema());
634        let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
635        let output_format = extract_output_format(&mut tool_args, schema.as_ref());
636
637        // Set up stdin from redirects (< file, <<heredoc)
638        setup_stdin_redirects(cmd, ctx);
639
640        // Set stdin from previous command's stdout (overrides redirect stdin)
641        // Also pass structured data if available from previous command
642        if let Some(input) = current_stdin.take() {
643            ctx.set_stdin_with_data(input, current_data.take());
644        }
645
646        // Execute via backend (clone Arc to avoid borrow conflict)
647        let backend = ctx.backend.clone();
648        last_result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
649            Ok(tool_result) => {
650                let mut exec = ExecResult::from_output(
651                    tool_result.code as i64, tool_result.stdout, tool_result.stderr,
652                );
653                exec.output = tool_result.output;
654                exec
655            }
656            Err(e) => ExecResult::failure(127, e.to_string()),
657        };
658
659        // Apply output format transform
660        last_result = match output_format {
661            Some(format) => apply_output_format(last_result, format),
662            None => last_result,
663        };
664
665        // Apply post-execution redirects
666        last_result = apply_redirects(last_result, &cmd.redirects, ctx).await;
667
668        // If command failed, stop the pipeline
669        if !last_result.ok() {
670            return last_result;
671        }
672
673        // Pass stdout and structured data to next command's stdin (unless last command)
674        if i < commands.len() - 1 {
675            current_stdin = Some(last_result.out.clone());
676            current_data = last_result.data.clone();
677        }
678    }
679
680    last_result
681}
682
683/// Run a pipeline sequentially with owned data (for spawned tasks).
684///
685/// This is used by scatter workers to run commands in parallel.
686pub async fn run_sequential_pipeline_owned(
687    tools: Arc<ToolRegistry>,
688    commands: Vec<Command>,
689    ctx: &mut ExecContext,
690) -> ExecResult {
691    run_sequential_pipeline(&tools, &commands, ctx).await
692}
693
694#[cfg(test)]
695mod tests {
696    use super::*;
697    use crate::tools::register_builtins;
698    use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
699    use std::path::Path;
700
701    async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext) {
702        let mut tools = ToolRegistry::new();
703        register_builtins(&mut tools);
704        let tools = Arc::new(tools);
705        let runner = PipelineRunner::new(tools.clone());
706
707        let mut vfs = VfsRouter::new();
708        let mem = MemoryFs::new();
709        mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
710        vfs.mount("/", mem);
711        let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
712
713        (runner, ctx)
714    }
715
716    fn make_cmd(name: &str, args: Vec<&str>) -> Command {
717        Command {
718            name: name.to_string(),
719            args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
720            redirects: vec![],
721        }
722    }
723
724    #[tokio::test]
725    async fn test_single_command() {
726        let (runner, mut ctx) = make_runner_and_ctx().await;
727        let cmd = make_cmd("echo", vec!["hello"]);
728
729        let result = runner.run(&[cmd], &mut ctx).await;
730        assert!(result.ok());
731        assert_eq!(result.out.trim(), "hello");
732    }
733
734    #[tokio::test]
735    async fn test_pipeline_echo_grep() {
736        let (runner, mut ctx) = make_runner_and_ctx().await;
737
738        // echo "hello\nworld" | grep pattern="world"
739        let echo_cmd = Command {
740            name: "echo".to_string(),
741            args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
742            redirects: vec![],
743        };
744        let grep_cmd = Command {
745            name: "grep".to_string(),
746            args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
747            redirects: vec![],
748        };
749
750        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx).await;
751        assert!(result.ok());
752        assert_eq!(result.out.trim(), "world");
753    }
754
755    #[tokio::test]
756    async fn test_pipeline_cat_grep() {
757        let (runner, mut ctx) = make_runner_and_ctx().await;
758
759        // cat /test.txt | grep pattern="hello"
760        let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
761        let grep_cmd = Command {
762            name: "grep".to_string(),
763            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
764            redirects: vec![],
765        };
766
767        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx).await;
768        assert!(result.ok());
769        assert!(result.out.contains("hello"));
770    }
771
772    #[tokio::test]
773    async fn test_command_not_found() {
774        let (runner, mut ctx) = make_runner_and_ctx().await;
775        let cmd = make_cmd("nonexistent", vec![]);
776
777        let result = runner.run(&[cmd], &mut ctx).await;
778        assert!(!result.ok());
779        assert_eq!(result.code, 127);
780        assert!(result.err.contains("not found"));
781    }
782
783    #[tokio::test]
784    async fn test_pipeline_stops_on_failure() {
785        let (runner, mut ctx) = make_runner_and_ctx().await;
786
787        // cat /nonexistent | grep "hello"
788        let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
789        let grep_cmd = Command {
790            name: "grep".to_string(),
791            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
792            redirects: vec![],
793        };
794
795        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx).await;
796        assert!(!result.ok());
797    }
798
799    #[tokio::test]
800    async fn test_empty_pipeline() {
801        let (runner, mut ctx) = make_runner_and_ctx().await;
802        let result = runner.run(&[], &mut ctx).await;
803        assert!(result.ok());
804    }
805
806    // === Scatter/Gather Tests ===
807
/// With `scatter` at index 1 and `gather` at index 3, both positions are
/// reported as a pair.
#[test]
fn test_find_scatter_gather_both_present() {
    let pipeline = vec![
        make_cmd("echo", vec!["a"]),
        make_cmd("scatter", Vec::new()),
        make_cmd("process", Vec::new()),
        make_cmd("gather", Vec::new()),
    ];

    let located = find_scatter_gather(&pipeline);

    assert_eq!(located, Some((1, 3)));
}
819
/// A `gather` without any `scatter` yields no match.
#[test]
fn test_find_scatter_gather_no_scatter() {
    let pipeline = vec![
        make_cmd("echo", vec!["a"]),
        make_cmd("gather", Vec::new()),
    ];

    let located = find_scatter_gather(&pipeline);

    assert!(located.is_none());
}
829
/// A `scatter` without any `gather` yields no match.
#[test]
fn test_find_scatter_gather_no_gather() {
    let pipeline = vec![
        make_cmd("echo", vec!["a"]),
        make_cmd("scatter", Vec::new()),
    ];

    let located = find_scatter_gather(&pipeline);

    assert!(located.is_none());
}
839
/// A `gather` that appears before the `scatter` yields no match.
#[test]
fn test_find_scatter_gather_wrong_order() {
    let pipeline = vec![
        make_cmd("gather", Vec::new()),
        make_cmd("scatter", Vec::new()),
    ];

    let located = find_scatter_gather(&pipeline);

    assert!(located.is_none());
}
849
850    #[tokio::test]
851    async fn test_scatter_gather_simple() {
852        let (runner, mut ctx) = make_runner_and_ctx().await;
853
854        // echo "a\nb\nc" | scatter | echo ${ITEM} | gather
855        let echo_cmd = Command {
856            name: "echo".to_string(),
857            args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb\nc".to_string())))],
858            redirects: vec![],
859        };
860        let scatter_cmd = make_cmd("scatter", vec![]);
861        let process_cmd = Command {
862            name: "echo".to_string(),
863            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
864            redirects: vec![],
865        };
866        let gather_cmd = make_cmd("gather", vec![]);
867
868        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
869        assert!(result.ok());
870        // Each echo should output the item
871        assert!(result.out.contains("a"));
872        assert!(result.out.contains("b"));
873        assert!(result.out.contains("c"));
874    }
875
876    #[tokio::test]
877    async fn test_scatter_gather_empty_input() {
878        let (runner, mut ctx) = make_runner_and_ctx().await;
879
880        // echo "" | scatter | echo ${ITEM} | gather
881        let echo_cmd = Command {
882            name: "echo".to_string(),
883            args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
884            redirects: vec![],
885        };
886        let scatter_cmd = make_cmd("scatter", vec![]);
887        let process_cmd = Command {
888            name: "echo".to_string(),
889            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
890            redirects: vec![],
891        };
892        let gather_cmd = make_cmd("gather", vec![]);
893
894        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
895        assert!(result.ok());
896        assert!(result.out.trim().is_empty());
897    }
898
899    #[tokio::test]
900    async fn test_scatter_gather_with_stdin() {
901        let (runner, mut ctx) = make_runner_and_ctx().await;
902
903        // Set stdin directly and use scatter | echo | gather
904        ctx.set_stdin("x\ny\nz".to_string());
905
906        let scatter_cmd = make_cmd("scatter", vec![]);
907        let process_cmd = Command {
908            name: "echo".to_string(),
909            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
910            redirects: vec![],
911        };
912        let gather_cmd = make_cmd("gather", vec![]);
913
914        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
915        assert!(result.ok());
916        assert!(result.out.contains("x"));
917        assert!(result.out.contains("y"));
918        assert!(result.out.contains("z"));
919    }
920
921    #[tokio::test]
922    async fn test_scatter_gather_json_input() {
923        let (runner, mut ctx) = make_runner_and_ctx().await;
924
925        // JSON array input
926        ctx.set_stdin(r#"["one", "two", "three"]"#.to_string());
927
928        let scatter_cmd = make_cmd("scatter", vec![]);
929        let process_cmd = Command {
930            name: "echo".to_string(),
931            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
932            redirects: vec![],
933        };
934        let gather_cmd = make_cmd("gather", vec![]);
935
936        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
937        assert!(result.ok());
938        assert!(result.out.contains("one"));
939        assert!(result.out.contains("two"));
940        assert!(result.out.contains("three"));
941    }
942
943    #[tokio::test]
944    async fn test_scatter_gather_with_post_gather() {
945        let (runner, mut ctx) = make_runner_and_ctx().await;
946
947        // echo "a\nb" | scatter | echo ${ITEM} | gather | grep "a"
948        let echo_cmd = Command {
949            name: "echo".to_string(),
950            args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb".to_string())))],
951            redirects: vec![],
952        };
953        let scatter_cmd = make_cmd("scatter", vec![]);
954        let process_cmd = Command {
955            name: "echo".to_string(),
956            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
957            redirects: vec![],
958        };
959        let gather_cmd = make_cmd("gather", vec![]);
960        let grep_cmd = Command {
961            name: "grep".to_string(),
962            args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
963            redirects: vec![],
964        };
965
966        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx).await;
967        assert!(result.ok());
968        assert!(result.out.contains("a"));
969        assert!(!result.out.contains("b"));
970    }
971
972    #[tokio::test]
973    async fn test_scatter_custom_var_name() {
974        let (runner, mut ctx) = make_runner_and_ctx().await;
975
976        ctx.set_stdin("test1\ntest2".to_string());
977
978        // scatter as=URL | echo ${URL} | gather
979        let scatter_cmd = Command {
980            name: "scatter".to_string(),
981            args: vec![Arg::Named {
982                key: "as".to_string(),
983                value: Expr::Literal(Value::String("URL".to_string())),
984            }],
985            redirects: vec![],
986        };
987        let process_cmd = Command {
988            name: "echo".to_string(),
989            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
990            redirects: vec![],
991        };
992        let gather_cmd = make_cmd("gather", vec![]);
993
994        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx).await;
995        assert!(result.ok());
996        assert!(result.out.contains("test1"));
997        assert!(result.out.contains("test2"));
998    }
999
1000    // === Backend Routing Tests ===
1001
1002    #[tokio::test]
1003    async fn test_pipeline_routes_through_backend() {
1004        use crate::backend::testing::MockBackend;
1005        use std::sync::atomic::Ordering;
1006
1007        // Create mock backend
1008        let (backend, call_count) = MockBackend::new();
1009        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1010
1011        // Create context with mock backend
1012        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1013
1014        // Create a dummy tool registry (not used since backend intercepts)
1015        let tools = std::sync::Arc::new(ToolRegistry::new());
1016        let runner = PipelineRunner::new(tools);
1017
1018        // Single command should route through backend
1019        let cmd = make_cmd("test-tool", vec!["arg1"]);
1020        let result = runner.run(&[cmd], &mut ctx).await;
1021
1022        assert!(result.ok(), "Mock backend should return success");
1023        assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1024        assert!(result.out.contains("mock executed"), "Output should be from mock backend");
1025    }
1026
1027    #[tokio::test]
1028    async fn test_multi_command_pipeline_routes_through_backend() {
1029        use crate::backend::testing::MockBackend;
1030        use std::sync::atomic::Ordering;
1031
1032        let (backend, call_count) = MockBackend::new();
1033        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1034        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1035
1036        let tools = std::sync::Arc::new(ToolRegistry::new());
1037        let runner = PipelineRunner::new(tools);
1038
1039        // Pipeline with 3 commands
1040        let cmd1 = make_cmd("tool1", vec![]);
1041        let cmd2 = make_cmd("tool2", vec![]);
1042        let cmd3 = make_cmd("tool3", vec![]);
1043
1044        let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx).await;
1045
1046        assert!(result.ok());
1047        assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1048    }
1049
1050    // === Schema-Aware Argument Parsing Tests ===
1051
1052    use crate::tools::{ParamSchema, ToolSchema};
1053
1054    fn make_test_schema() -> ToolSchema {
1055        ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1056            .param(ParamSchema::required("query", "string", "Search query"))
1057            .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1058            .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1059            .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1060    }
1061
1062    fn make_minimal_ctx() -> ExecContext {
1063        let mut vfs = VfsRouter::new();
1064        vfs.mount("/", MemoryFs::new());
1065        ExecContext::new(Arc::new(vfs))
1066    }
1067
/// `--query test`: because the schema declares `query` as a string, the
/// flag consumes the following positional and becomes a named argument.
#[test]
fn test_schema_aware_string_arg() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag(String::from("query")),
        Arg::Positional(Expr::Literal(Value::String(String::from("test")))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String(String::from("test"));
    assert!(parsed.flags.is_empty(), "No flags should be set");
    assert!(parsed.positional.is_empty(), "No positionals - consumed by --query");
    assert_eq!(
        parsed.named.get("query"),
        Some(&expected_query),
        "--query should consume 'test' as its value"
    );
}
1088
/// `--verbose` is declared as a bool in the schema, so it stays a plain
/// flag and produces neither named nor positional arguments.
#[test]
fn test_schema_aware_bool_flag() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![Arg::LongFlag(String::from("verbose"))];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("verbose"), "--verbose should be a flag");
    assert!(parsed.named.is_empty(), "No named args");
    assert!(parsed.positional.is_empty(), "No positionals");
}
1104
/// `file.txt --output out.txt --verbose` splits into one positional
/// (`file.txt`), one named argument (`output = out.txt`) and one flag
/// (`verbose`).
#[test]
fn test_schema_aware_mixed() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let long = |name: &str| Arg::LongFlag(name.to_string());

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        text("file.txt"),
        long("output"),
        text("out.txt"),
        long("verbose"),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_output = Value::String("out.txt".to_string());
    assert_eq!(parsed.positional, vec![Value::String("file.txt".to_string())]);
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert!(parsed.flags.contains("verbose"));
}
1127
/// `--query test --output result.json --verbose --limit 5`: each
/// value-taking flag consumes its following positional, `--verbose` stays a
/// flag, and no positionals are left over.
#[test]
fn test_schema_aware_multiple_string_args() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let long = |name: &str| Arg::LongFlag(name.to_string());

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        long("query"),
        text("test"),
        long("output"),
        text("result.json"),
        long("verbose"),
        long("limit"),
        Arg::Positional(Expr::Literal(Value::Int(5))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String("test".to_string());
    let expected_output = Value::String("result.json".to_string());
    let expected_limit = Value::Int(5);

    assert!(parsed.positional.is_empty(), "All positionals consumed");
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert_eq!(parsed.named.get("limit"), Some(&expected_limit));
    assert!(parsed.flags.contains("verbose"));
}
1160
/// `--output out.txt -- --this-is-data`: the named argument before `--` is
/// parsed normally, and the positional after `--` is kept as a positional.
#[test]
fn test_schema_aware_double_dash() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));

    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag("output".to_string()),
        text("out.txt"),
        Arg::DoubleDash,
        text("--this-is-data"),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_output = Value::String("out.txt".to_string());
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    // The item after `--` is already an `Arg::Positional`, and it must come
    // through as a positional value.
    assert_eq!(
        parsed.positional,
        vec![Value::String("--this-is-data".to_string())]
    );
}
1186
/// With no schema supplied, `--query` is treated as a boolean flag and the
/// following value remains a positional.
#[test]
fn test_no_schema_fallback() {
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag(String::from("query")),
        Arg::Positional(Expr::Literal(Value::String(String::from("test")))),
    ];

    let parsed = build_tool_args(&args, &ctx, None);

    assert!(parsed.flags.contains("query"), "--query should be a flag");
    assert_eq!(
        parsed.positional,
        vec![Value::String(String::from("test"))],
        "'test' should be a positional"
    );
}
1206
/// `--unknown value`: a long flag that the schema does not declare is
/// treated as a boolean flag, leaving `value` as a positional.
#[test]
fn test_unknown_flag_in_schema() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::LongFlag(String::from("unknown")),
        Arg::Positional(Expr::Literal(Value::String(String::from("value")))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // Not in the schema, so it falls back to flag behavior.
    assert!(parsed.flags.contains("unknown"));
    assert_eq!(parsed.positional, vec![Value::String(String::from("value"))]);
}
1226
/// `query=test --verbose`: `key=value` arguments land in `named` and a bool
/// long flag next to them still lands in `flags`.
#[test]
fn test_named_args_unchanged() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let query_arg = Arg::Named {
        key: String::from("query"),
        value: Expr::Literal(Value::String(String::from("test"))),
    };
    let args = vec![query_arg, Arg::LongFlag(String::from("verbose"))];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String(String::from("test"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert!(parsed.flags.contains("verbose"));
}
1248
/// `-la file.txt`: a combined short flag expands into the individual flags
/// `l` and `a`, and the trailing value stays positional.
#[test]
fn test_short_flags_unchanged() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::ShortFlag(String::from("la")),
        Arg::Positional(Expr::Literal(Value::String(String::from("file.txt")))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("l"));
    assert!(parsed.flags.contains("a"));
    assert_eq!(parsed.positional, vec![Value::String(String::from("file.txt"))]);
}
1268
/// `file.txt --output`: a value-taking flag with nothing after it is
/// leniently recorded as a plain flag, and the earlier positional is kept.
#[test]
fn test_flag_at_end_no_value() {
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();
    let args = vec![
        Arg::Positional(Expr::Literal(Value::String(String::from("file.txt")))),
        Arg::LongFlag(String::from("output")),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // No value follows `--output`, so it degrades to a flag.
    assert!(parsed.flags.contains("output"));
    assert_eq!(parsed.positional, vec![Value::String(String::from("file.txt"))]);
}
1288
1289    // === Redirect Execution Tests ===
1290
1291    #[tokio::test]
1292    async fn test_merge_stderr_redirect() {
1293        // Test that 2>&1 merges stderr into stdout
1294        let result = ExecResult::from_output(0, "stdout content", "stderr content");
1295
1296        let redirects = vec![Redirect {
1297            kind: RedirectKind::MergeStderr,
1298            target: Expr::Literal(Value::Null),
1299        }];
1300
1301        let ctx = make_minimal_ctx();
1302        let result = apply_redirects(result, &redirects, &ctx).await;
1303
1304        assert_eq!(result.out, "stdout contentstderr content");
1305        assert!(result.err.is_empty());
1306    }
1307
1308    #[tokio::test]
1309    async fn test_merge_stderr_with_empty_stderr() {
1310        // Test that 2>&1 handles empty stderr gracefully
1311        let result = ExecResult::from_output(0, "stdout only", "");
1312
1313        let redirects = vec![Redirect {
1314            kind: RedirectKind::MergeStderr,
1315            target: Expr::Literal(Value::Null),
1316        }];
1317
1318        let ctx = make_minimal_ctx();
1319        let result = apply_redirects(result, &redirects, &ctx).await;
1320
1321        assert_eq!(result.out, "stdout only");
1322        assert!(result.err.is_empty());
1323    }
1324
1325    #[tokio::test]
1326    async fn test_merge_stderr_order_matters() {
1327        // Test redirect ordering: 2>&1 > file means:
1328        // 1. First merge stderr into stdout
1329        // 2. Then write stdout to file (leaving both empty for piping)
1330        // This verifies left-to-right processing
1331        let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1332
1333        // Just 2>&1 - should merge
1334        let redirects = vec![Redirect {
1335            kind: RedirectKind::MergeStderr,
1336            target: Expr::Literal(Value::Null),
1337        }];
1338
1339        let ctx = make_minimal_ctx();
1340        let result = apply_redirects(result, &redirects, &ctx).await;
1341
1342        assert_eq!(result.out, "stdout\nstderr\n");
1343        assert!(result.err.is_empty());
1344    }
1345
1346    #[tokio::test]
1347    async fn test_redirect_with_command_execution() {
1348        let (runner, mut ctx) = make_runner_and_ctx().await;
1349
1350        // echo "hello" with 2>&1 redirect
1351        let cmd = Command {
1352            name: "echo".to_string(),
1353            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1354            redirects: vec![Redirect {
1355                kind: RedirectKind::MergeStderr,
1356                target: Expr::Literal(Value::Null),
1357            }],
1358        };
1359
1360        let result = runner.run(&[cmd], &mut ctx).await;
1361        assert!(result.ok());
1362        // echo produces no stderr, so this just validates the redirect doesn't break anything
1363        assert!(result.out.contains("hello"));
1364    }
1365
1366    #[tokio::test]
1367    async fn test_merge_stderr_in_pipeline() {
1368        let (runner, mut ctx) = make_runner_and_ctx().await;
1369
1370        // echo "output" 2>&1 | grep "output"
1371        // The 2>&1 should be applied to echo's result, then piped to grep
1372        let echo_cmd = Command {
1373            name: "echo".to_string(),
1374            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1375            redirects: vec![Redirect {
1376                kind: RedirectKind::MergeStderr,
1377                target: Expr::Literal(Value::Null),
1378            }],
1379        };
1380        let grep_cmd = Command {
1381            name: "grep".to_string(),
1382            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1383            redirects: vec![],
1384        };
1385
1386        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx).await;
1387        assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1388        assert!(result.out.contains("output"));
1389    }
1390}