Skip to main content

kaish_kernel/scheduler/
pipeline.rs

1//! Pipeline execution for kaish.
2//!
3//! Executes a sequence of commands connected by pipes, where the stdout
4//! of each command becomes the stdin of the next.
5//!
6//! Also handles scatter/gather pipelines for parallel execution.
7
8use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::scatter::{
20    parse_gather_options, parse_scatter_options, ScatterGatherRunner,
21};
22
23/// Apply redirects to an execution result.
24///
25/// Pre-execution redirects (Stdin, HereDoc) should be handled before calling.
26/// Post-execution redirects (stdout/stderr to file, merge) applied here.
27/// Redirects are processed left-to-right per POSIX.
28async fn apply_redirects(
29    mut result: ExecResult,
30    redirects: &[Redirect],
31    ctx: &ExecContext,
32) -> ExecResult {
33    for redir in redirects {
34        match redir.kind {
35            RedirectKind::MergeStderr => {
36                // 2>&1 - append stderr to stdout
37                if !result.err.is_empty() {
38                    result.out.push_str(&result.err);
39                    result.err.clear();
40                }
41            }
42            RedirectKind::MergeStdout => {
43                // 1>&2 or >&2 - append stdout to stderr
44                if !result.out.is_empty() {
45                    result.err.push_str(&result.out);
46                    result.out.clear();
47                }
48            }
49            RedirectKind::StdoutOverwrite => {
50                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
51                    if let Err(e) = tokio::fs::write(&path, &result.out).await {
52                        return ExecResult::failure(1, format!("redirect: {e}"));
53                    }
54                    result.out.clear();
55                }
56            }
57            RedirectKind::StdoutAppend => {
58                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
59                    let file = tokio::fs::OpenOptions::new()
60                        .append(true)
61                        .create(true)
62                        .open(&path)
63                        .await;
64                    match file {
65                        Ok(mut f) => {
66                            if let Err(e) = f.write_all(result.out.as_bytes()).await {
67                                return ExecResult::failure(1, format!("redirect: {e}"));
68                            }
69                        }
70                        Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
71                    }
72                    result.out.clear();
73                }
74            }
75            RedirectKind::Stderr => {
76                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
77                    if let Err(e) = tokio::fs::write(&path, &result.err).await {
78                        return ExecResult::failure(1, format!("redirect: {e}"));
79                    }
80                    result.err.clear();
81                }
82            }
83            RedirectKind::Both => {
84                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
85                    let combined = format!("{}{}", result.out, result.err);
86                    if let Err(e) = tokio::fs::write(&path, combined).await {
87                        return ExecResult::failure(1, format!("redirect: {e}"));
88                    }
89                    result.out.clear();
90                    result.err.clear();
91                }
92            }
93            // Pre-execution redirects - already handled before command execution
94            RedirectKind::Stdin | RedirectKind::HereDoc => {}
95        }
96    }
97    result
98}
99
100/// Evaluate a redirect target expression to get the file path.
101fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
102    eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
103}
104
105/// Set up stdin from redirects (< file, <<heredoc).
106/// Called before command execution.
107fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
108    for redir in &cmd.redirects {
109        match &redir.kind {
110            RedirectKind::Stdin => {
111                if let Some(path) = eval_redirect_target(&redir.target, ctx)
112                    && let Ok(content) = std::fs::read_to_string(&path) {
113                        ctx.set_stdin(content);
114                    }
115            }
116            RedirectKind::HereDoc => {
117                if let Expr::Literal(Value::String(content)) = &redir.target {
118                    ctx.set_stdin(content.clone());
119                }
120            }
121            _ => {}
122        }
123    }
124}
125
126/// Runs pipelines by spawning tasks and connecting them via channels.
127#[derive(Clone)]
128pub struct PipelineRunner {
129    tools: Arc<ToolRegistry>,
130}
131
132impl PipelineRunner {
133    /// Create a new pipeline runner with the given tool registry.
134    pub fn new(tools: Arc<ToolRegistry>) -> Self {
135        Self { tools }
136    }
137
138    /// Execute a pipeline of commands.
139    ///
140    /// Each command's stdout becomes the next command's stdin.
141    /// If the pipeline contains scatter/gather, delegates to ScatterGatherRunner.
142    /// Returns the result of the last command in the pipeline.
143    ///
144    /// The `dispatcher` handles the full command resolution chain (user tools,
145    /// builtins, scripts, external commands, backend tools). The runner handles
146    /// I/O routing: stdin redirects, piping between commands, and output redirects.
147    pub async fn run(
148        &self,
149        commands: &[Command],
150        ctx: &mut ExecContext,
151        dispatcher: &dyn CommandDispatcher,
152    ) -> ExecResult {
153        if commands.is_empty() {
154            return ExecResult::success("");
155        }
156
157        // Check for scatter/gather pipeline
158        if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
159            return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
160        }
161
162        self.run_sequential(commands, ctx, dispatcher).await
163    }
164
165    /// Execute commands sequentially without scatter/gather detection.
166    ///
167    /// Used by `ScatterGatherRunner` for pre_scatter, post_gather, and parallel
168    /// workers. Breaks the async recursion chain (`run` → scatter → `run`).
169    pub async fn run_sequential(
170        &self,
171        commands: &[Command],
172        ctx: &mut ExecContext,
173        dispatcher: &dyn CommandDispatcher,
174    ) -> ExecResult {
175        if commands.is_empty() {
176            return ExecResult::success("");
177        }
178
179        if commands.len() == 1 {
180            // Single command, no piping needed
181            return self.run_single(&commands[0], ctx, None, dispatcher).await;
182        }
183
184        // Multi-command pipeline
185        self.run_pipeline(commands, ctx, dispatcher).await
186    }
187
188    /// Run a scatter/gather pipeline.
189    async fn run_scatter_gather(
190        &self,
191        commands: &[Command],
192        scatter_idx: usize,
193        gather_idx: usize,
194        ctx: &mut ExecContext,
195        _dispatcher: &dyn CommandDispatcher,
196    ) -> ExecResult {
197        // Split pipeline into parts
198        let pre_scatter = &commands[..scatter_idx];
199        let scatter_cmd = &commands[scatter_idx];
200        let parallel = &commands[scatter_idx + 1..gather_idx];
201        let gather_cmd = &commands[gather_idx];
202        let post_gather = &commands[gather_idx + 1..];
203
204        // Parse options from scatter and gather commands
205        // These are builtins with simple key=value syntax, no schema-driven parsing needed
206        let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
207        let gather_schema = self.tools.get("gather").map(|t| t.schema());
208        let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
209        let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
210
211        // Create a BackendDispatcher for scatter workers.
212        // Parallel workers MUST use a stateless dispatcher to avoid data races
213        // on shared scope. BackendDispatcher is stateless (routes through backend.call_tool).
214        let scatter_dispatcher: Arc<dyn CommandDispatcher> =
215            Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
216
217        let runner = ScatterGatherRunner::new(self.tools.clone(), scatter_dispatcher);
218        runner
219            .run(
220                pre_scatter,
221                scatter_opts,
222                parallel,
223                gather_opts,
224                post_gather,
225                ctx,
226            )
227            .await
228    }
229
230    /// Run a single command with optional stdin.
231    ///
232    /// The dispatcher handles arg parsing, schema lookup, output format, and execution.
233    /// The runner handles stdin setup (redirects + pipeline) and output redirects.
234    async fn run_single(
235        &self,
236        cmd: &Command,
237        ctx: &mut ExecContext,
238        stdin: Option<String>,
239        dispatcher: &dyn CommandDispatcher,
240    ) -> ExecResult {
241        // Set up stdin from redirects (< file, <<heredoc)
242        setup_stdin_redirects(cmd, ctx);
243
244        // Set stdin from pipeline (overrides redirect stdin)
245        if let Some(input) = stdin {
246            ctx.set_stdin(input);
247        }
248
249        // Set pipeline position for stdio inheritance decisions
250        ctx.pipeline_position = PipelinePosition::Only;
251
252        // Execute via dispatcher (full resolution chain)
253        let result = match dispatcher.dispatch(cmd, ctx).await {
254            Ok(result) => result,
255            Err(e) => ExecResult::failure(1, e.to_string()),
256        };
257
258        // Apply post-execution redirects
259        apply_redirects(result, &cmd.redirects, ctx).await
260    }
261
262    /// Run a multi-command pipeline.
263    ///
264    /// Each command's stdout becomes the next command's stdin.
265    /// The dispatcher handles execution; the runner handles I/O routing.
266    async fn run_pipeline(
267        &self,
268        commands: &[Command],
269        ctx: &mut ExecContext,
270        dispatcher: &dyn CommandDispatcher,
271    ) -> ExecResult {
272        let mut current_stdin: Option<String> = None;
273        let mut current_data: Option<Value> = None;
274        let mut last_result = ExecResult::success("");
275        let last_idx = commands.len() - 1;
276
277        for (i, cmd) in commands.iter().enumerate() {
278            // Set up stdin from redirects (< file, <<heredoc)
279            setup_stdin_redirects(cmd, ctx);
280
281            // Set stdin from previous command's stdout (overrides redirect stdin)
282            // Also pass structured data if available from previous command
283            if let Some(input) = current_stdin.take() {
284                ctx.set_stdin_with_data(input, current_data.take());
285            }
286
287            // Set pipeline position for stdio inheritance decisions
288            ctx.pipeline_position = match i {
289                0 if last_idx == 0 => PipelinePosition::Only,
290                0 => PipelinePosition::First,
291                n if n == last_idx => PipelinePosition::Last,
292                _ => PipelinePosition::Middle,
293            };
294
295            // Execute via dispatcher (full resolution chain)
296            last_result = match dispatcher.dispatch(cmd, ctx).await {
297                Ok(result) => result,
298                Err(e) => ExecResult::failure(1, e.to_string()),
299            };
300
301            // Apply post-execution redirects
302            last_result = apply_redirects(last_result, &cmd.redirects, ctx).await;
303
304            // If command failed, stop the pipeline
305            if !last_result.ok() {
306                return last_result;
307            }
308
309            // Pass stdout and structured data to next command's stdin (unless last command)
310            if i < last_idx {
311                current_stdin = Some(last_result.out.clone());
312                current_data = last_result.data.clone();
313            }
314        }
315
316        last_result
317    }
318}
319
320/// Extract parameter types from a tool schema.
321///
322/// Returns a map from param name → param type (e.g., "verbose" → "bool", "output" → "string").
323/// Build a map from flag name → (canonical param name, param type).
324///
325/// Includes both primary names and aliases (with dashes stripped).
326/// For short flags like `-n` aliased to `lines`, maps `"n"` → `("lines", "int")`.
327pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str)> {
328    let mut map = HashMap::new();
329    for p in &schema.params {
330        map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str()));
331        for alias in &p.aliases {
332            let stripped = alias.trim_start_matches('-');
333            map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str()));
334        }
335    }
336    map
337}
338
339/// Check if a type is considered boolean.
340pub fn is_bool_type(param_type: &str) -> bool {
341    matches!(param_type.to_lowercase().as_str(), "bool" | "boolean")
342}
343
344/// Build ToolArgs from AST Args, evaluating expressions.
345///
346/// If a schema is provided, uses it to determine argument types:
347/// - For `--flag` where schema says type is non-bool: consume next positional as value
348/// - For `--flag` where schema says type is bool (or unknown): treat as boolean flag
349///
350/// This enables natural shell syntax like `mcp_tool --query "test" --limit 10`.
351pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
352    let mut tool_args = ToolArgs::new();
353    let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
354
355    // Track which positional indices have been consumed as flag values
356    let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
357    let mut past_double_dash = false;
358
359    // First pass: find positional args and their indices
360    let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
361    for (i, arg) in args.iter().enumerate() {
362        if let Arg::Positional(expr) = arg {
363            positional_indices.push((i, expr));
364        }
365    }
366
367    // Second pass: process all args
368    let mut i = 0;
369    while i < args.len() {
370        let arg = &args[i];
371
372        match arg {
373            Arg::DoubleDash => {
374                past_double_dash = true;
375            }
376            Arg::Positional(expr) => {
377                // Check if this positional was consumed by a preceding flag
378                if !consumed_positionals.contains(&i)
379                    && let Some(value) = eval_simple_expr(expr, ctx)
380                {
381                    tool_args.positional.push(value);
382                }
383            }
384            Arg::Named { key, value } => {
385                if let Some(val) = eval_simple_expr(value, ctx) {
386                    tool_args.named.insert(key.clone(), val);
387                }
388            }
389            Arg::ShortFlag(name) => {
390                if past_double_dash {
391                    tool_args.positional.push(Value::String(format!("-{name}")));
392                } else if name.len() == 1 {
393                    // Single-char short flag: look up schema to check if it takes a value.
394                    // e.g., `-n 5` where `-n` is an alias for `lines` (type: int)
395                    let flag_name = name.as_str();
396                    let lookup = param_lookup.get(flag_name);
397                    let is_bool = lookup
398                        .map(|(_, typ)| is_bool_type(typ))
399                        .unwrap_or(true);
400
401                    if is_bool {
402                        tool_args.flags.insert(flag_name.to_string());
403                    } else {
404                        // Non-bool: consume next positional as value, insert under canonical name
405                        let canonical = lookup.map(|(name, _)| *name).unwrap_or(flag_name);
406                        let next_positional = positional_indices
407                            .iter()
408                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
409
410                        if let Some((pos_idx, expr)) = next_positional {
411                            if let Some(value) = eval_simple_expr(expr, ctx) {
412                                tool_args.named.insert(canonical.to_string(), value);
413                                consumed_positionals.insert(*pos_idx);
414                            } else {
415                                tool_args.flags.insert(flag_name.to_string());
416                            }
417                        } else {
418                            tool_args.flags.insert(flag_name.to_string());
419                        }
420                    }
421                } else {
422                    // Multi-char combined flags like -la: always boolean
423                    for c in name.chars() {
424                        tool_args.flags.insert(c.to_string());
425                    }
426                }
427            }
428            Arg::LongFlag(name) => {
429                if past_double_dash {
430                    tool_args.positional.push(Value::String(format!("--{name}")));
431                } else {
432                    // Look up type in schema (checks name and aliases)
433                    let lookup = param_lookup.get(name.as_str());
434                    let is_bool = lookup
435                        .map(|(_, typ)| is_bool_type(typ))
436                        .unwrap_or(true); // Unknown params default to bool
437
438                    if is_bool {
439                        tool_args.flags.insert(name.clone());
440                    } else {
441                        // Non-bool: consume next positional as value, insert under canonical name
442                        let canonical = lookup.map(|(name, _)| *name).unwrap_or(name.as_str());
443                        let next_positional = positional_indices
444                            .iter()
445                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
446
447                        if let Some((pos_idx, expr)) = next_positional {
448                            if let Some(value) = eval_simple_expr(expr, ctx) {
449                                tool_args.named.insert(canonical.to_string(), value);
450                                consumed_positionals.insert(*pos_idx);
451                            } else {
452                                tool_args.flags.insert(name.clone());
453                            }
454                        } else {
455                            tool_args.flags.insert(name.clone());
456                        }
457                    }
458                }
459            }
460        }
461        i += 1;
462    }
463
464    tool_args
465}
466
467/// Simple expression evaluation for args (without full scope access).
468fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
469    match expr {
470        Expr::Literal(value) => Some(eval_literal(value, ctx)),
471        Expr::VarRef(path) => ctx.scope.resolve_path(path),
472        Expr::Interpolated(parts) => {
473            let mut result = String::new();
474            for part in parts {
475                match part {
476                    crate::ast::StringPart::Literal(s) => result.push_str(s),
477                    crate::ast::StringPart::Var(path) => {
478                        if let Some(value) = ctx.scope.resolve_path(path) {
479                            result.push_str(&value_to_string(&value));
480                        }
481                    }
482                    crate::ast::StringPart::VarWithDefault { name, default } => {
483                        match ctx.scope.get(name) {
484                            Some(value) => {
485                                let s = value_to_string(value);
486                                if s.is_empty() {
487                                    result.push_str(&eval_string_parts_sync(default, ctx));
488                                } else {
489                                    result.push_str(&s);
490                                }
491                            }
492                            None => result.push_str(&eval_string_parts_sync(default, ctx)),
493                        }
494                    }
495                    crate::ast::StringPart::VarLength(name) => {
496                        let len = match ctx.scope.get(name) {
497                            Some(value) => value_to_string(value).len(),
498                            None => 0,
499                        };
500                        result.push_str(&len.to_string());
501                    }
502                    crate::ast::StringPart::Positional(n) => {
503                        if let Some(s) = ctx.scope.get_positional(*n) {
504                            result.push_str(s);
505                        }
506                    }
507                    crate::ast::StringPart::AllArgs => {
508                        result.push_str(&ctx.scope.all_args().join(" "));
509                    }
510                    crate::ast::StringPart::ArgCount => {
511                        result.push_str(&ctx.scope.arg_count().to_string());
512                    }
513                    crate::ast::StringPart::Arithmetic(expr) => {
514                        // Evaluate arithmetic in pipeline context
515                        if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
516                            result.push_str(&value.to_string());
517                        }
518                    }
519                    crate::ast::StringPart::CommandSubst(_) => {
520                        // Command substitution requires async - skip in sync context
521                    }
522                    crate::ast::StringPart::LastExitCode => {
523                        result.push_str(&ctx.scope.last_result().code.to_string());
524                    }
525                    crate::ast::StringPart::CurrentPid => {
526                        result.push_str(&ctx.scope.pid().to_string());
527                    }
528                }
529            }
530            Some(Value::String(result))
531        }
532        _ => None, // Binary ops and command subst need more context
533    }
534}
535
536/// Evaluate a literal value.
537fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
538    value.clone()
539}
540
541/// Convert a value to a string for interpolation.
542fn value_to_string(value: &Value) -> String {
543    match value {
544        Value::Null => "".to_string(),
545        Value::Bool(b) => b.to_string(),
546        Value::Int(i) => i.to_string(),
547        Value::Float(f) => f.to_string(),
548        Value::String(s) => s.clone(),
549        Value::Json(json) => json.to_string(),
550        Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
551    }
552}
553
554/// Evaluate string parts synchronously (for pipeline context).
555/// Command substitutions are skipped as they require async.
556fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
557    let mut result = String::new();
558    for part in parts {
559        match part {
560            crate::ast::StringPart::Literal(s) => result.push_str(s),
561            crate::ast::StringPart::Var(path) => {
562                if let Some(value) = ctx.scope.resolve_path(path) {
563                    result.push_str(&value_to_string(&value));
564                }
565            }
566            crate::ast::StringPart::VarWithDefault { name, default } => {
567                match ctx.scope.get(name) {
568                    Some(value) => {
569                        let s = value_to_string(value);
570                        if s.is_empty() {
571                            result.push_str(&eval_string_parts_sync(default, ctx));
572                        } else {
573                            result.push_str(&s);
574                        }
575                    }
576                    None => result.push_str(&eval_string_parts_sync(default, ctx)),
577                }
578            }
579            crate::ast::StringPart::VarLength(name) => {
580                let len = match ctx.scope.get(name) {
581                    Some(value) => value_to_string(value).len(),
582                    None => 0,
583                };
584                result.push_str(&len.to_string());
585            }
586            crate::ast::StringPart::Positional(n) => {
587                if let Some(s) = ctx.scope.get_positional(*n) {
588                    result.push_str(s);
589                }
590            }
591            crate::ast::StringPart::AllArgs => {
592                result.push_str(&ctx.scope.all_args().join(" "));
593            }
594            crate::ast::StringPart::ArgCount => {
595                result.push_str(&ctx.scope.arg_count().to_string());
596            }
597            crate::ast::StringPart::Arithmetic(expr) => {
598                if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
599                    result.push_str(&value.to_string());
600                }
601            }
602            crate::ast::StringPart::CommandSubst(_) => {
603                // Command substitution requires async - skip in sync context
604            }
605            crate::ast::StringPart::LastExitCode => {
606                result.push_str(&ctx.scope.last_result().code.to_string());
607            }
608            crate::ast::StringPart::CurrentPid => {
609                result.push_str(&ctx.scope.pid().to_string());
610            }
611        }
612    }
613    result
614}
615
616/// Find scatter and gather commands in a pipeline.
617///
618/// Returns Some((scatter_index, gather_index)) if both are found with scatter before gather.
619/// Returns None if the pipeline doesn't have a valid scatter/gather pattern.
620fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
621    let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
622    let gather_idx = commands.iter().position(|c| c.name == "gather")?;
623
624    // Gather must come after scatter
625    if gather_idx > scatter_idx {
626        Some((scatter_idx, gather_idx))
627    } else {
628        None
629    }
630}
631
632#[cfg(test)]
633mod tests {
634    use super::*;
635    use crate::dispatch::BackendDispatcher;
636    use crate::tools::register_builtins;
637    use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
638    use std::path::Path;
639
640    async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
641        let mut tools = ToolRegistry::new();
642        register_builtins(&mut tools);
643        let tools = Arc::new(tools);
644        let runner = PipelineRunner::new(tools.clone());
645        let dispatcher = BackendDispatcher::new(tools.clone());
646
647        let mut vfs = VfsRouter::new();
648        let mem = MemoryFs::new();
649        mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
650        vfs.mount("/", mem);
651        let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
652
653        (runner, ctx, dispatcher)
654    }
655
656    fn make_cmd(name: &str, args: Vec<&str>) -> Command {
657        Command {
658            name: name.to_string(),
659            args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
660            redirects: vec![],
661        }
662    }
663
664    #[tokio::test]
665    async fn test_single_command() {
666        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
667        let cmd = make_cmd("echo", vec!["hello"]);
668
669        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
670        assert!(result.ok());
671        assert_eq!(result.out.trim(), "hello");
672    }
673
674    #[tokio::test]
675    async fn test_pipeline_echo_grep() {
676        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
677
678        // echo "hello\nworld" | grep pattern="world"
679        let echo_cmd = Command {
680            name: "echo".to_string(),
681            args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
682            redirects: vec![],
683        };
684        let grep_cmd = Command {
685            name: "grep".to_string(),
686            args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
687            redirects: vec![],
688        };
689
690        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
691        assert!(result.ok());
692        assert_eq!(result.out.trim(), "world");
693    }
694
695    #[tokio::test]
696    async fn test_pipeline_cat_grep() {
697        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
698
699        // cat /test.txt | grep pattern="hello"
700        let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
701        let grep_cmd = Command {
702            name: "grep".to_string(),
703            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
704            redirects: vec![],
705        };
706
707        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
708        assert!(result.ok());
709        assert!(result.out.contains("hello"));
710    }
711
712    #[tokio::test]
713    async fn test_command_not_found() {
714        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
715        let cmd = make_cmd("nonexistent", vec![]);
716
717        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
718        assert!(!result.ok());
719        assert_eq!(result.code, 127);
720        assert!(result.err.contains("not found"));
721    }
722
723    #[tokio::test]
724    async fn test_pipeline_stops_on_failure() {
725        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
726
727        // cat /nonexistent | grep "hello"
728        let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
729        let grep_cmd = Command {
730            name: "grep".to_string(),
731            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
732            redirects: vec![],
733        };
734
735        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
736        assert!(!result.ok());
737    }
738
739    #[tokio::test]
740    async fn test_empty_pipeline() {
741        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
742        let result = runner.run(&[], &mut ctx, &dispatcher).await;
743        assert!(result.ok());
744    }
745
746    // === Scatter/Gather Tests ===
747
748    #[test]
749    fn test_find_scatter_gather_both_present() {
750        let commands = vec![
751            make_cmd("echo", vec!["a"]),
752            make_cmd("scatter", vec![]),
753            make_cmd("process", vec![]),
754            make_cmd("gather", vec![]),
755        ];
756        let result = find_scatter_gather(&commands);
757        assert_eq!(result, Some((1, 3)));
758    }
759
760    #[test]
761    fn test_find_scatter_gather_no_scatter() {
762        let commands = vec![
763            make_cmd("echo", vec!["a"]),
764            make_cmd("gather", vec![]),
765        ];
766        let result = find_scatter_gather(&commands);
767        assert!(result.is_none());
768    }
769
770    #[test]
771    fn test_find_scatter_gather_no_gather() {
772        let commands = vec![
773            make_cmd("echo", vec!["a"]),
774            make_cmd("scatter", vec![]),
775        ];
776        let result = find_scatter_gather(&commands);
777        assert!(result.is_none());
778    }
779
780    #[test]
781    fn test_find_scatter_gather_wrong_order() {
782        let commands = vec![
783            make_cmd("gather", vec![]),
784            make_cmd("scatter", vec![]),
785        ];
786        let result = find_scatter_gather(&commands);
787        assert!(result.is_none());
788    }
789
790    #[tokio::test]
791    async fn test_scatter_gather_simple() {
792        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
793
794        // echo "a\nb\nc" | scatter | echo ${ITEM} | gather
795        let echo_cmd = Command {
796            name: "echo".to_string(),
797            args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb\nc".to_string())))],
798            redirects: vec![],
799        };
800        let scatter_cmd = make_cmd("scatter", vec![]);
801        let process_cmd = Command {
802            name: "echo".to_string(),
803            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
804            redirects: vec![],
805        };
806        let gather_cmd = make_cmd("gather", vec![]);
807
808        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
809        assert!(result.ok());
810        // Each echo should output the item
811        assert!(result.out.contains("a"));
812        assert!(result.out.contains("b"));
813        assert!(result.out.contains("c"));
814    }
815
816    #[tokio::test]
817    async fn test_scatter_gather_empty_input() {
818        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
819
820        // echo "" | scatter | echo ${ITEM} | gather
821        let echo_cmd = Command {
822            name: "echo".to_string(),
823            args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
824            redirects: vec![],
825        };
826        let scatter_cmd = make_cmd("scatter", vec![]);
827        let process_cmd = Command {
828            name: "echo".to_string(),
829            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
830            redirects: vec![],
831        };
832        let gather_cmd = make_cmd("gather", vec![]);
833
834        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
835        assert!(result.ok());
836        assert!(result.out.trim().is_empty());
837    }
838
839    #[tokio::test]
840    async fn test_scatter_gather_with_stdin() {
841        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
842
843        // Set stdin directly and use scatter | echo | gather
844        ctx.set_stdin("x\ny\nz".to_string());
845
846        let scatter_cmd = make_cmd("scatter", vec![]);
847        let process_cmd = Command {
848            name: "echo".to_string(),
849            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
850            redirects: vec![],
851        };
852        let gather_cmd = make_cmd("gather", vec![]);
853
854        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
855        assert!(result.ok());
856        assert!(result.out.contains("x"));
857        assert!(result.out.contains("y"));
858        assert!(result.out.contains("z"));
859    }
860
861    #[tokio::test]
862    async fn test_scatter_gather_json_input() {
863        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
864
865        // JSON array input
866        ctx.set_stdin(r#"["one", "two", "three"]"#.to_string());
867
868        let scatter_cmd = make_cmd("scatter", vec![]);
869        let process_cmd = Command {
870            name: "echo".to_string(),
871            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
872            redirects: vec![],
873        };
874        let gather_cmd = make_cmd("gather", vec![]);
875
876        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
877        assert!(result.ok());
878        assert!(result.out.contains("one"));
879        assert!(result.out.contains("two"));
880        assert!(result.out.contains("three"));
881    }
882
883    #[tokio::test]
884    async fn test_scatter_gather_with_post_gather() {
885        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
886
887        // echo "a\nb" | scatter | echo ${ITEM} | gather | grep "a"
888        let echo_cmd = Command {
889            name: "echo".to_string(),
890            args: vec![Arg::Positional(Expr::Literal(Value::String("a\nb".to_string())))],
891            redirects: vec![],
892        };
893        let scatter_cmd = make_cmd("scatter", vec![]);
894        let process_cmd = Command {
895            name: "echo".to_string(),
896            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
897            redirects: vec![],
898        };
899        let gather_cmd = make_cmd("gather", vec![]);
900        let grep_cmd = Command {
901            name: "grep".to_string(),
902            args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
903            redirects: vec![],
904        };
905
906        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
907        assert!(result.ok());
908        assert!(result.out.contains("a"));
909        assert!(!result.out.contains("b"));
910    }
911
912    #[tokio::test]
913    async fn test_scatter_custom_var_name() {
914        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
915
916        ctx.set_stdin("test1\ntest2".to_string());
917
918        // scatter as=URL | echo ${URL} | gather
919        let scatter_cmd = Command {
920            name: "scatter".to_string(),
921            args: vec![Arg::Named {
922                key: "as".to_string(),
923                value: Expr::Literal(Value::String("URL".to_string())),
924            }],
925            redirects: vec![],
926        };
927        let process_cmd = Command {
928            name: "echo".to_string(),
929            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
930            redirects: vec![],
931        };
932        let gather_cmd = make_cmd("gather", vec![]);
933
934        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
935        assert!(result.ok());
936        assert!(result.out.contains("test1"));
937        assert!(result.out.contains("test2"));
938    }
939
940    // === Backend Routing Tests ===
941
942    #[tokio::test]
943    async fn test_pipeline_routes_through_backend() {
944        use crate::backend::testing::MockBackend;
945        use std::sync::atomic::Ordering;
946
947        // Create mock backend
948        let (backend, call_count) = MockBackend::new();
949        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
950
951        // Create context with mock backend
952        let mut ctx = crate::tools::ExecContext::with_backend(backend);
953
954        // BackendDispatcher routes through backend.call_tool()
955        let tools = std::sync::Arc::new(ToolRegistry::new());
956        let runner = PipelineRunner::new(tools.clone());
957        let dispatcher = BackendDispatcher::new(tools);
958
959        // Single command should route through backend
960        let cmd = make_cmd("test-tool", vec!["arg1"]);
961        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
962
963        assert!(result.ok(), "Mock backend should return success");
964        assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
965        assert!(result.out.contains("mock executed"), "Output should be from mock backend");
966    }
967
968    #[tokio::test]
969    async fn test_multi_command_pipeline_routes_through_backend() {
970        use crate::backend::testing::MockBackend;
971        use std::sync::atomic::Ordering;
972
973        let (backend, call_count) = MockBackend::new();
974        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
975        let mut ctx = crate::tools::ExecContext::with_backend(backend);
976
977        let tools = std::sync::Arc::new(ToolRegistry::new());
978        let runner = PipelineRunner::new(tools.clone());
979        let dispatcher = BackendDispatcher::new(tools);
980
981        // Pipeline with 3 commands
982        let cmd1 = make_cmd("tool1", vec![]);
983        let cmd2 = make_cmd("tool2", vec![]);
984        let cmd3 = make_cmd("tool3", vec![]);
985
986        let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
987
988        assert!(result.ok());
989        assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
990    }
991
992    // === Schema-Aware Argument Parsing Tests ===
993
994    use crate::tools::{ParamSchema, ToolSchema};
995
996    fn make_test_schema() -> ToolSchema {
997        ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
998            .param(ParamSchema::required("query", "string", "Search query"))
999            .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1000            .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1001            .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1002    }
1003
1004    fn make_minimal_ctx() -> ExecContext {
1005        let mut vfs = VfsRouter::new();
1006        vfs.mount("/", MemoryFs::new());
1007        ExecContext::new(Arc::new(vfs))
1008    }
1009
1010    #[test]
1011    fn test_schema_aware_string_arg() {
1012        // --query "test" should become named: {"query": "test"}
1013        let args = vec![
1014            Arg::LongFlag("query".to_string()),
1015            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1016        ];
1017        let schema = make_test_schema();
1018        let ctx = make_minimal_ctx();
1019
1020        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1021
1022        assert!(tool_args.flags.is_empty(), "No flags should be set");
1023        assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1024        assert_eq!(
1025            tool_args.named.get("query"),
1026            Some(&Value::String("test".to_string())),
1027            "--query should consume 'test' as its value"
1028        );
1029    }
1030
1031    #[test]
1032    fn test_schema_aware_bool_flag() {
1033        // --verbose should remain a flag since schema says bool
1034        let args = vec![
1035            Arg::LongFlag("verbose".to_string()),
1036        ];
1037        let schema = make_test_schema();
1038        let ctx = make_minimal_ctx();
1039
1040        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1041
1042        assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1043        assert!(tool_args.named.is_empty(), "No named args");
1044        assert!(tool_args.positional.is_empty(), "No positionals");
1045    }
1046
1047    #[test]
1048    fn test_schema_aware_mixed() {
1049        // mcp_tool file.txt --output out.txt --verbose
1050        // Should produce: positional: ["file.txt"], named: {"output": "out.txt"}, flags: {"verbose"}
1051        let args = vec![
1052            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1053            Arg::LongFlag("output".to_string()),
1054            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1055            Arg::LongFlag("verbose".to_string()),
1056        ];
1057        let schema = make_test_schema();
1058        let ctx = make_minimal_ctx();
1059
1060        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1061
1062        assert_eq!(tool_args.positional, vec![Value::String("file.txt".to_string())]);
1063        assert_eq!(
1064            tool_args.named.get("output"),
1065            Some(&Value::String("out.txt".to_string()))
1066        );
1067        assert!(tool_args.flags.contains("verbose"));
1068    }
1069
1070    #[test]
1071    fn test_schema_aware_multiple_string_args() {
1072        // --query "test" --output "result.json" --verbose --limit 5
1073        let args = vec![
1074            Arg::LongFlag("query".to_string()),
1075            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1076            Arg::LongFlag("output".to_string()),
1077            Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1078            Arg::LongFlag("verbose".to_string()),
1079            Arg::LongFlag("limit".to_string()),
1080            Arg::Positional(Expr::Literal(Value::Int(5))),
1081        ];
1082        let schema = make_test_schema();
1083        let ctx = make_minimal_ctx();
1084
1085        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1086
1087        assert!(tool_args.positional.is_empty(), "All positionals consumed");
1088        assert_eq!(
1089            tool_args.named.get("query"),
1090            Some(&Value::String("test".to_string()))
1091        );
1092        assert_eq!(
1093            tool_args.named.get("output"),
1094            Some(&Value::String("result.json".to_string()))
1095        );
1096        assert_eq!(
1097            tool_args.named.get("limit"),
1098            Some(&Value::Int(5))
1099        );
1100        assert!(tool_args.flags.contains("verbose"));
1101    }
1102
1103    #[test]
1104    fn test_schema_aware_double_dash() {
1105        // --output out.txt -- --this-is-data
1106        // After --, everything is positional
1107        let args = vec![
1108            Arg::LongFlag("output".to_string()),
1109            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1110            Arg::DoubleDash,
1111            Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1112        ];
1113        let schema = make_test_schema();
1114        let ctx = make_minimal_ctx();
1115
1116        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1117
1118        assert_eq!(
1119            tool_args.named.get("output"),
1120            Some(&Value::String("out.txt".to_string()))
1121        );
1122        // After --, the --this-is-data is treated as a positional (it's a Positional in the args)
1123        assert_eq!(
1124            tool_args.positional,
1125            vec![Value::String("--this-is-data".to_string())]
1126        );
1127    }
1128
1129    #[test]
1130    fn test_no_schema_fallback() {
1131        // Without schema, all --flags are treated as bool flags
1132        let args = vec![
1133            Arg::LongFlag("query".to_string()),
1134            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1135        ];
1136        let ctx = make_minimal_ctx();
1137
1138        let tool_args = build_tool_args(&args, &ctx, None);
1139
1140        // Without schema, --query is a flag and "test" is a positional
1141        assert!(tool_args.flags.contains("query"), "--query should be a flag");
1142        assert_eq!(
1143            tool_args.positional,
1144            vec![Value::String("test".to_string())],
1145            "'test' should be a positional"
1146        );
1147    }
1148
1149    #[test]
1150    fn test_unknown_flag_in_schema() {
1151        // --unknown-flag value should treat --unknown-flag as bool (not in schema)
1152        let args = vec![
1153            Arg::LongFlag("unknown".to_string()),
1154            Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1155        ];
1156        let schema = make_test_schema();
1157        let ctx = make_minimal_ctx();
1158
1159        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1160
1161        // Unknown flag defaults to bool behavior
1162        assert!(tool_args.flags.contains("unknown"));
1163        assert_eq!(
1164            tool_args.positional,
1165            vec![Value::String("value".to_string())]
1166        );
1167    }
1168
1169    #[test]
1170    fn test_named_args_unchanged() {
1171        // key=value syntax should work regardless of schema
1172        let args = vec![
1173            Arg::Named {
1174                key: "query".to_string(),
1175                value: Expr::Literal(Value::String("test".to_string())),
1176            },
1177            Arg::LongFlag("verbose".to_string()),
1178        ];
1179        let schema = make_test_schema();
1180        let ctx = make_minimal_ctx();
1181
1182        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1183
1184        assert_eq!(
1185            tool_args.named.get("query"),
1186            Some(&Value::String("test".to_string()))
1187        );
1188        assert!(tool_args.flags.contains("verbose"));
1189    }
1190
1191    #[test]
1192    fn test_short_flags_unchanged() {
1193        // Short flags -la should expand regardless of schema
1194        let args = vec![
1195            Arg::ShortFlag("la".to_string()),
1196            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1197        ];
1198        let schema = make_test_schema();
1199        let ctx = make_minimal_ctx();
1200
1201        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1202
1203        assert!(tool_args.flags.contains("l"));
1204        assert!(tool_args.flags.contains("a"));
1205        assert_eq!(
1206            tool_args.positional,
1207            vec![Value::String("file.txt".to_string())]
1208        );
1209    }
1210
1211    #[test]
1212    fn test_flag_at_end_no_value() {
1213        // --output at end with no value available - treat as flag (lenient)
1214        let args = vec![
1215            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1216            Arg::LongFlag("output".to_string()),
1217        ];
1218        let schema = make_test_schema();
1219        let ctx = make_minimal_ctx();
1220
1221        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1222
1223        // output expects a value but none available after it, so it becomes a flag
1224        assert!(tool_args.flags.contains("output"));
1225        assert_eq!(
1226            tool_args.positional,
1227            vec![Value::String("file.txt".to_string())]
1228        );
1229    }
1230
1231    #[test]
1232    fn test_short_flag_with_alias_consumes_value() {
1233        // `-n 5` where `-n` is aliased to `lines` (type: int)
1234        // Should produce named: {"lines": 5}, not flags: {"n"} + positional: [5]
1235        let schema = ToolSchema::new("head", "Output first part of files")
1236            .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
1237                .with_aliases(["-n"]));
1238        let args = vec![
1239            Arg::ShortFlag("n".to_string()),
1240            Arg::Positional(Expr::Literal(Value::Int(5))),
1241            Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
1242        ];
1243        let ctx = make_minimal_ctx();
1244
1245        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1246
1247        assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
1248        assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
1249        assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
1250    }
1251
1252    // === Redirect Execution Tests ===
1253
1254    #[tokio::test]
1255    async fn test_merge_stderr_redirect() {
1256        // Test that 2>&1 merges stderr into stdout
1257        let result = ExecResult::from_output(0, "stdout content", "stderr content");
1258
1259        let redirects = vec![Redirect {
1260            kind: RedirectKind::MergeStderr,
1261            target: Expr::Literal(Value::Null),
1262        }];
1263
1264        let ctx = make_minimal_ctx();
1265        let result = apply_redirects(result, &redirects, &ctx).await;
1266
1267        assert_eq!(result.out, "stdout contentstderr content");
1268        assert!(result.err.is_empty());
1269    }
1270
1271    #[tokio::test]
1272    async fn test_merge_stderr_with_empty_stderr() {
1273        // Test that 2>&1 handles empty stderr gracefully
1274        let result = ExecResult::from_output(0, "stdout only", "");
1275
1276        let redirects = vec![Redirect {
1277            kind: RedirectKind::MergeStderr,
1278            target: Expr::Literal(Value::Null),
1279        }];
1280
1281        let ctx = make_minimal_ctx();
1282        let result = apply_redirects(result, &redirects, &ctx).await;
1283
1284        assert_eq!(result.out, "stdout only");
1285        assert!(result.err.is_empty());
1286    }
1287
1288    #[tokio::test]
1289    async fn test_merge_stderr_order_matters() {
1290        // Test redirect ordering: 2>&1 > file means:
1291        // 1. First merge stderr into stdout
1292        // 2. Then write stdout to file (leaving both empty for piping)
1293        // This verifies left-to-right processing
1294        let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1295
1296        // Just 2>&1 - should merge
1297        let redirects = vec![Redirect {
1298            kind: RedirectKind::MergeStderr,
1299            target: Expr::Literal(Value::Null),
1300        }];
1301
1302        let ctx = make_minimal_ctx();
1303        let result = apply_redirects(result, &redirects, &ctx).await;
1304
1305        assert_eq!(result.out, "stdout\nstderr\n");
1306        assert!(result.err.is_empty());
1307    }
1308
1309    #[tokio::test]
1310    async fn test_redirect_with_command_execution() {
1311        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1312
1313        // echo "hello" with 2>&1 redirect
1314        let cmd = Command {
1315            name: "echo".to_string(),
1316            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1317            redirects: vec![Redirect {
1318                kind: RedirectKind::MergeStderr,
1319                target: Expr::Literal(Value::Null),
1320            }],
1321        };
1322
1323        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1324        assert!(result.ok());
1325        // echo produces no stderr, so this just validates the redirect doesn't break anything
1326        assert!(result.out.contains("hello"));
1327    }
1328
1329    #[tokio::test]
1330    async fn test_merge_stderr_in_pipeline() {
1331        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1332
1333        // echo "output" 2>&1 | grep "output"
1334        // The 2>&1 should be applied to echo's result, then piped to grep
1335        let echo_cmd = Command {
1336            name: "echo".to_string(),
1337            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1338            redirects: vec![Redirect {
1339                kind: RedirectKind::MergeStderr,
1340                target: Expr::Literal(Value::Null),
1341            }],
1342        };
1343        let grep_cmd = Command {
1344            name: "grep".to_string(),
1345            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1346            redirects: vec![],
1347        };
1348
1349        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1350        assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1351        assert!(result.out.contains("output"));
1352    }
1353}