Skip to main content

kaish_kernel/scheduler/
pipeline.rs

1//! Pipeline execution for kaish.
2//!
3//! Executes a sequence of commands connected by pipes, where the stdout
4//! of each command becomes the stdin of the next.
5//!
6//! Also handles scatter/gather pipelines for parallel execution.
7
8use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21    parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24/// Apply redirects to an execution result.
25///
26/// Pre-execution redirects (Stdin, HereDoc) should be handled before calling.
27/// Post-execution redirects (stdout/stderr to file, merge) applied here.
28/// Redirects are processed left-to-right per POSIX.
29async fn apply_redirects(
30    mut result: ExecResult,
31    redirects: &[Redirect],
32    ctx: &ExecContext,
33) -> ExecResult {
34    // Defer materialization of OutputData → result.out to individual redirect
35    // handlers. File redirects (Overwrite/Append) can stream OutputData directly
36    // to disk via write_canonical(), avoiding OOM on large structured output.
37    // Merge redirects and the fallthrough path materialize on demand.
38    for redir in redirects {
39        match redir.kind {
40            RedirectKind::MergeStderr => {
41                // 2>&1 - append stderr to stdout
42                // Ensure output is materialized for merge
43                result.materialize();
44                if !result.err.is_empty() {
45                    let err = std::mem::take(&mut result.err);
46                    result.push_out(&err);
47                }
48            }
49            RedirectKind::MergeStdout => {
50                // 1>&2 or >&2 - append stdout to stderr
51                result.materialize();
52                if !result.text_out().is_empty() {
53                    let out = result.text_out().into_owned();
54                    result.err.push_str(&out);
55                    result.clear_out();
56                }
57            }
58            RedirectKind::StdoutOverwrite => {
59                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
60                    // Stream OutputData directly to file if available
61                    if let Some(output) = result.take_output_for_stream() {
62                        let mut buf = Vec::new();
63                        if let Err(e) = output.write_canonical(&mut buf, None) {
64                            return ExecResult::failure(1, format!("redirect: {e}"));
65                        }
66                        if let Err(e) = redirect_write(ctx, &path, &buf).await {
67                            return ExecResult::failure(1, format!("redirect: {e}"));
68                        }
69                    } else {
70                        if let Err(e) = redirect_write(ctx, &path, result.text_out().as_bytes()).await {
71                            return ExecResult::failure(1, format!("redirect: {e}"));
72                        }
73                    }
74                    result.clear_out();
75                    result.set_output(None);
76                }
77            }
78            RedirectKind::StdoutAppend => {
79                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
80                    // Stream OutputData directly if available
81                    if let Some(output) = result.take_output_for_stream() {
82                        let mut buf = Vec::new();
83                        if let Err(e) = output.write_canonical(&mut buf, None) {
84                            return ExecResult::failure(1, format!("redirect: {e}"));
85                        }
86                        if let Err(e) = redirect_append(ctx, &path, &buf).await {
87                            return ExecResult::failure(1, format!("redirect: {e}"));
88                        }
89                    } else {
90                        if let Err(e) = redirect_append(ctx, &path, result.text_out().as_bytes()).await {
91                            return ExecResult::failure(1, format!("redirect: {e}"));
92                        }
93                    }
94                    result.clear_out();
95                    result.set_output(None);
96                }
97            }
98            RedirectKind::Stderr => {
99                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
100                    if let Err(e) = redirect_write(ctx, &path, result.err.as_bytes()).await {
101                        return ExecResult::failure(1, format!("redirect: {e}"));
102                    }
103                    result.err.clear();
104                }
105            }
106            RedirectKind::Both => {
107                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
108                    let combined = format!("{}{}", result.text_out(), result.err);
109                    if let Err(e) = redirect_write(ctx, &path, combined.as_bytes()).await {
110                        return ExecResult::failure(1, format!("redirect: {e}"));
111                    }
112                    result.clear_out();
113                    result.set_output(None);
114                    result.err.clear();
115                }
116            }
117            // Pre-execution redirects - already handled before command execution
118            RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
119        }
120    }
121    // Materialize any remaining OutputData into result.out.
122    // Callers (accumulate_result, pipeline piping) expect .out to be populated
123    // after apply_redirects returns. File redirects above consume .output directly
124    // via streaming; this only fires when no redirect consumed it.
125    result.materialize();
126    result
127}
128
129/// Evaluate a redirect target expression to get the file path.
130fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
131    eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
132}
133
134/// Write data to a file via the VFS backend.
135async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
136    use crate::backend::WriteMode;
137    let p = std::path::Path::new(path);
138    ctx.backend.write(p, data, WriteMode::Overwrite).await.map_err(|e| e.to_string())
139}
140
141/// Append data to a file via the VFS backend.
142async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
143    let p = std::path::Path::new(path);
144    ctx.backend.append(p, data).await.map_err(|e| e.to_string())
145}
146
147/// Set up stdin from redirects (< file, <<heredoc).
148/// Called before command execution.
149fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
150    for redir in &cmd.redirects {
151        match &redir.kind {
152            RedirectKind::Stdin => {
153                if let Some(path) = eval_redirect_target(&redir.target, ctx)
154                    && let Ok(content) = std::fs::read_to_string(&path) {
155                        ctx.set_stdin(content);
156                    }
157            }
158            RedirectKind::HereDoc => {
159                match &redir.target {
160                    Expr::Literal(Value::String(content)) => {
161                        ctx.set_stdin(content.clone());
162                    }
163                    expr => {
164                        if let Some(value) = eval_simple_expr(expr, ctx) {
165                            ctx.set_stdin(value_to_string(&value));
166                        }
167                    }
168                }
169            }
170            RedirectKind::HereString => {
171                // Per bash, here-strings append a trailing newline to the
172                // expanded word so the command receives a terminated line.
173                if let Some(value) = eval_simple_expr(&redir.target, ctx) {
174                    let mut s = value_to_string(&value);
175                    s.push('\n');
176                    ctx.set_stdin(s);
177                }
178            }
179            _ => {}
180        }
181    }
182}
183
/// Runs pipelines by spawning tasks and connecting them via channels.
///
/// Holds only a shared handle to the tool registry, so `Clone` just clones
/// the `Arc`.
#[derive(Clone)]
pub struct PipelineRunner {
    /// Shared tool registry; consulted for the `scatter` / `gather` schemas
    /// and handed on to `ScatterGatherRunner`.
    tools: Arc<ToolRegistry>,
}
189
190impl PipelineRunner {
191    /// Create a new pipeline runner with the given tool registry.
192    pub fn new(tools: Arc<ToolRegistry>) -> Self {
193        Self { tools }
194    }
195
196    /// Execute a pipeline of commands.
197    ///
198    /// Each command's stdout becomes the next command's stdin.
199    /// If the pipeline contains scatter/gather, delegates to ScatterGatherRunner.
200    /// Returns the result of the last command in the pipeline.
201    ///
202    /// The `dispatcher` handles the full command resolution chain (user tools,
203    /// builtins, scripts, external commands, backend tools). The runner handles
204    /// I/O routing: stdin redirects, piping between commands, and output redirects.
205    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
206    pub async fn run(
207        &self,
208        commands: &[Command],
209        ctx: &mut ExecContext,
210        dispatcher: &dyn CommandDispatcher,
211    ) -> ExecResult {
212        if commands.is_empty() {
213            return ExecResult::success("");
214        }
215
216        // Check for scatter/gather pipeline
217        if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
218            return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
219        }
220
221        self.run_sequential(commands, ctx, dispatcher).await
222    }
223
224    /// Execute commands sequentially without scatter/gather detection.
225    ///
226    /// Used by `ScatterGatherRunner` for pre_scatter, post_gather, and parallel
227    /// workers. Breaks the async recursion chain (`run` → scatter → `run`).
228    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
229    pub async fn run_sequential(
230        &self,
231        commands: &[Command],
232        ctx: &mut ExecContext,
233        dispatcher: &dyn CommandDispatcher,
234    ) -> ExecResult {
235        if commands.is_empty() {
236            return ExecResult::success("");
237        }
238
239        if commands.len() == 1 {
240            // Single command, no piping needed
241            return self.run_single(&commands[0], ctx, None, dispatcher).await;
242        }
243
244        // Multi-command pipeline
245        self.run_pipeline(commands, ctx, dispatcher).await
246    }
247
248    /// Run a scatter/gather pipeline.
249    async fn run_scatter_gather(
250        &self,
251        commands: &[Command],
252        scatter_idx: usize,
253        gather_idx: usize,
254        ctx: &mut ExecContext,
255        dispatcher: &dyn CommandDispatcher,
256    ) -> ExecResult {
257        // Split pipeline into parts
258        let pre_scatter = &commands[..scatter_idx];
259        let scatter_cmd = &commands[scatter_idx];
260        let parallel = &commands[scatter_idx + 1..gather_idx];
261        let gather_cmd = &commands[gather_idx];
262        let post_gather = &commands[gather_idx + 1..];
263
264        // Parse options from scatter and gather commands
265        // These are builtins with simple key=value syntax, no schema-driven parsing needed
266        let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
267        let gather_schema = self.tools.get("gather").map(|t| t.schema());
268        let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
269        let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
270
271        // We need an `Arc<dyn CommandDispatcher>` to hand to `ScatterGatherRunner`.
272        // `fork_attached` produces a subkernel whose cancellation token is a
273        // child of the parent's, so a parent timeout/cancel cascades into
274        // the scatter pipeline (and into worker children via further forks).
275        let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
276
277        let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
278        runner
279            .run(
280                pre_scatter,
281                scatter_opts,
282                parallel,
283                gather_opts,
284                post_gather,
285                ctx,
286            )
287            .await
288    }
289
290    /// Run a single command with optional stdin.
291    ///
292    /// The dispatcher handles arg parsing, schema lookup, output format, and execution.
293    /// The runner handles stdin setup (redirects + pipeline) and output redirects.
294    #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
295    async fn run_single(
296        &self,
297        cmd: &Command,
298        ctx: &mut ExecContext,
299        stdin: Option<String>,
300        dispatcher: &dyn CommandDispatcher,
301    ) -> ExecResult {
302        // Set up stdin from redirects (< file, <<heredoc)
303        setup_stdin_redirects(cmd, ctx);
304
305        // Set stdin from pipeline (overrides redirect stdin)
306        if let Some(input) = stdin {
307            ctx.set_stdin(input);
308        }
309
310        // Set pipeline position for stdio inheritance decisions
311        ctx.pipeline_position = PipelinePosition::Only;
312
313        // Execute via dispatcher (full resolution chain)
314        let result = match dispatcher.dispatch(cmd, ctx).await {
315            Ok(result) => result,
316            Err(e) => ExecResult::failure(1, e.to_string()),
317        };
318
319        // Apply post-execution redirects
320        apply_redirects(result, &cmd.redirects, ctx).await
321    }
322
    /// Run a multi-command pipeline concurrently.
    ///
    /// Each stage runs in its own tokio task, connected by bounded pipe streams
    /// (64KB ring buffers with backpressure). This provides:
    /// - Bounded memory usage (no buffering entire outputs)
    /// - Backpressure (fast producers wait for slow consumers)
    /// - Early termination (e.g., `seq 1 1000000 | head -n 5`)
    ///
    /// Structured data (`stdin_data`) is passed via oneshot channels alongside pipes.
    ///
    /// Returns the last stage's result. The last stage's `scope`, `cwd`,
    /// `prev_cwd` and `aliases` are copied back into `ctx`. If any stage's
    /// task fails to join, the return value is instead a failure (code 1)
    /// listing the affected stages.
    ///
    /// Callers must pass at least one command: `stage_count - 1` underflows
    /// on an empty slice. `run_sequential` only calls this with two or more.
    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
    async fn run_pipeline(
        &self,
        commands: &[Command],
        ctx: &mut ExecContext,
        dispatcher: &dyn CommandDispatcher,
    ) -> ExecResult {
        let stage_count = commands.len();
        let last_idx = stage_count - 1;

        // Create N-1 pipe pairs connecting adjacent stages.
        // Each end is wrapped in Option so it can be moved out (`take`) into
        // exactly one stage below.
        let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
        let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();

        for _ in 0..last_idx {
            let (writer, reader) = pipe_stream_default();
            pipe_writers.push(Some(writer));
            pipe_readers.push(Some(reader));
        }

        // Create N-1 oneshot channels for structured data sideband
        let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
        let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();

        for _ in 0..last_idx {
            let (tx, rx) = tokio::sync::oneshot::channel();
            data_senders.push(Some(tx));
            data_receivers.push(Some(rx));
        }

        // One join handle per stage; each task hands back its result together
        // with its context so the last stage's state can be synced to `ctx`.
        let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);

        for (i, cmd) in commands.iter().enumerate() {
            let mut stage_ctx = ctx.child_for_pipeline();
            // Owned copy: the spawned task below takes it by move.
            let cmd = cmd.clone();

            // Fork attached: each concurrent pipeline stage needs independent
            // mutable state, but cancellation should still cascade from the
            // parent (so a request timeout kills externals running in any
            // stage, not just the foreground one).
            let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;

            // Set up stdin from redirects on the child context
            setup_stdin_redirects(&cmd, &mut stage_ctx);

            // Wire pipe_stdin: stage 0 gets parent stdin (if no redirect), others get pipe reader
            if i == 0 {
                // First stage inherits the parent's stdin, but only if redirects didn't
                // already set stdin (e.g., heredoc). Don't overwrite redirect-provided stdin.
                // `take()` moves the value, so the parent's stdin is empty afterwards.
                if stage_ctx.stdin.is_none() {
                    stage_ctx.stdin = ctx.stdin.take();
                }
                if stage_ctx.stdin_data.is_none() {
                    stage_ctx.stdin_data = ctx.stdin_data.take();
                }
            } else {
                // Intermediate/last stages read from pipe
                stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
                // Structured data, if any, is picked up from the oneshot inside the task.
            }

            // Wire pipe_stdout: last stage writes to ExecResult, others write to pipe
            if i < last_idx {
                stage_ctx.pipe_stdout = pipe_writers[i].take();
            }

            // Set pipeline position
            stage_ctx.pipeline_position = match i {
                0 => PipelinePosition::First,
                n if n == last_idx => PipelinePosition::Last,
                _ => PipelinePosition::Middle,
            };

            // Sideband endpoints: stage i sends on channel i and receives on channel i-1.
            let data_sender = if i < last_idx { data_senders[i].take() } else { None };
            let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };

            let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> = tokio::spawn(async move {
                // Receive structured data from previous stage (non-blocking).
                // Using try_recv avoids a deadlock: streaming builtins (e.g. grep)
                // write to their pipe_stdout during dispatch. If we blocked here
                // waiting for the upstream's oneshot (sent after dispatch), the
                // downstream couldn't start draining the pipe → circular wait.
                // Builtins that use stdin_data (e.g. jq) fall back to pipe text.
                if let Some(mut rx) = data_receiver {
                    if let Ok(data) = rx.try_recv() {
                        stage_ctx.stdin_data = data;
                    }
                    // Err → not ready yet; builtin will read from pipe text
                }

                // Execute the command; a dispatch error becomes a failure result.
                let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
                    Ok(result) => result,
                    Err(e) => ExecResult::failure(1, e.to_string()),
                };

                // Apply post-execution redirects
                result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;

                // Flush buffered stderr to the kernel's stderr stream.
                // This delivers error output from intermediate pipeline stages
                // in real-time (via the kernel drain) instead of silently discarding it.
                // Redirects like 2>&1 have already cleared result.err, so merged
                // stderr goes through the pipe as expected.
                // If the stage has no stderr stream, result.err is left in place.
                if !result.err.is_empty() {
                    if let Some(ref stderr) = stage_ctx.stderr {
                        stderr.write_str(&result.err);
                        result.err.clear();
                    }
                }

                // Send structured data to the next stage via oneshot before the
                // pipe write, since the pipe write may block on backpressure.
                // A send error (receiver already dropped) is ignored.
                // NOTE(review): the receiving side only polls with try_recv once at
                // task start and never awaits, so a value sent here (after dispatch)
                // is seen only if the consumer task had not yet polled — confirm
                // that this sideband is meant to be best-effort.
                if let Some(tx) = data_sender {
                    let _ = tx.send(result.data.clone());
                }

                // Write output to pipe for next stage (if not last).
                if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
                    let text = result.text_out();
                    if !text.is_empty() {
                        // Write result to pipe; ignore broken pipe (reader dropped early)
                        let _ = pipe_out.write_all(text.as_bytes()).await;
                        let _ = pipe_out.shutdown().await;
                    }
                    // Drop pipe_out signals EOF to next stage's reader
                }

                (result, stage_ctx)
            });

            handles.push(handle);
        }

        // Await all stages and return last stage's result.
        // Sync the last stage's scope back to the parent context so that
        // variable assignments in the last pipeline stage are visible
        // (e.g., `echo "Alice" | read NAME`).
        let mut last_result = ExecResult::success("");
        // Join errors, collected per stage so every one is reported.
        let mut panics: Vec<String> = Vec::new();
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await {
                Ok((result, stage_ctx)) => {
                    // Results of earlier stages are dropped; only the last one is returned.
                    if i == last_idx {
                        last_result = result;
                        // Sync last stage's scope and cwd changes back
                        ctx.scope = stage_ctx.scope;
                        ctx.cwd = stage_ctx.cwd;
                        ctx.prev_cwd = stage_ctx.prev_cwd;
                        ctx.aliases = stage_ctx.aliases;
                    }
                }
                Err(e) => {
                    panics.push(format!("stage {}: {}", i, e));
                }
            }
        }

        // Any join error overrides the last stage's result, even if that stage succeeded.
        if !panics.is_empty() {
            last_result = ExecResult::failure(
                1,
                format!("pipeline stage(s) panicked: {}", panics.join("; ")),
            );
        }

        last_result
    }
503}
504
505/// Extract parameter types from a tool schema.
506///
507/// Returns a map from param name → param type (e.g., "verbose" → "bool", "output" → "string").
508/// Build a map from flag name → (canonical param name, param type).
509///
510/// Includes both primary names and aliases (with dashes stripped).
511/// For short flags like `-n` aliased to `lines`, maps `"n"` → `("lines", "int", 1)`.
512/// The third tuple slot is `consumes`: how many positionals the flag pulls
513/// per occurrence (1 for standard `--flag value`, 2 for jq's `--arg NAME VAL`).
514pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
515    let mut map = HashMap::new();
516    for p in &schema.params {
517        map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
518        for alias in &p.aliases {
519            let stripped = alias.trim_start_matches('-');
520            map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
521        }
522    }
523    map
524}
525
/// Check whether a schema type name denotes a boolean.
///
/// Matches `"bool"` and `"boolean"` case-insensitively. The comparison is done
/// in place, without allocating a lowercased copy; ASCII-only case folding is
/// sufficient because neither target contains a letter that any non-ASCII
/// character lowercases to.
pub fn is_bool_type(param_type: &str) -> bool {
    ["bool", "boolean"]
        .iter()
        .any(|name| param_type.eq_ignore_ascii_case(name))
}
530
531/// Build ToolArgs from AST Args, evaluating expressions.
532///
533/// If a schema is provided, uses it to determine argument types:
534/// - For `--flag` where schema says type is non-bool: consume next positional as value
535/// - For `--flag` where schema says type is bool (or unknown): treat as boolean flag
536///
537/// This enables natural shell syntax like `mcp_tool --query "test" --limit 10`.
538pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
539    let mut tool_args = ToolArgs::new();
540    let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
541
542    // Track which positional indices have been consumed as flag values
543    let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
544    let mut past_double_dash = false;
545
546    // First pass: find positional args and their indices
547    let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
548    for (i, arg) in args.iter().enumerate() {
549        if let Arg::Positional(expr) = arg {
550            positional_indices.push((i, expr));
551        }
552    }
553
554    // Second pass: process all args
555    let mut i = 0;
556    while i < args.len() {
557        let arg = &args[i];
558
559        match arg {
560            Arg::DoubleDash => {
561                past_double_dash = true;
562            }
563            Arg::Positional(expr) => {
564                // Check if this positional was consumed by a preceding flag
565                if !consumed_positionals.contains(&i)
566                    && let Some(value) = eval_simple_expr(expr, ctx)
567                {
568                    tool_args.positional.push(value);
569                }
570            }
571            Arg::Named { key, value } => {
572                if let Some(val) = eval_simple_expr(value, ctx) {
573                    tool_args.named.insert(key.clone(), val);
574                }
575            }
576            Arg::ShortFlag(name) => {
577                if past_double_dash {
578                    tool_args.positional.push(Value::String(format!("-{name}")));
579                } else if name.len() == 1 {
580                    // Single-char short flag: look up schema to check if it takes a value.
581                    // e.g., `-n 5` where `-n` is an alias for `lines` (type: int)
582                    let flag_name = name.as_str();
583                    let lookup = param_lookup.get(flag_name);
584                    let is_bool = lookup
585                        .map(|(_, typ, _)| is_bool_type(typ))
586                        .unwrap_or(true);
587
588                    if is_bool {
589                        tool_args.flags.insert(flag_name.to_string());
590                    } else {
591                        // Non-bool: consume next positional as value, insert under canonical name
592                        let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
593                        let next_positional = positional_indices
594                            .iter()
595                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
596
597                        if let Some((pos_idx, expr)) = next_positional {
598                            if let Some(value) = eval_simple_expr(expr, ctx) {
599                                tool_args.named.insert(canonical.to_string(), value);
600                                consumed_positionals.insert(*pos_idx);
601                            } else {
602                                tool_args.flags.insert(flag_name.to_string());
603                            }
604                        } else {
605                            tool_args.flags.insert(flag_name.to_string());
606                        }
607                    }
608                } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
609                    // Multi-char short flag matches a schema param (POSIX style: -name value)
610                    if is_bool_type(typ) {
611                        tool_args.flags.insert(canonical.to_string());
612                    } else {
613                        let next_positional = positional_indices
614                            .iter()
615                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
616                        if let Some((pos_idx, expr)) = next_positional {
617                            if let Some(value) = eval_simple_expr(expr, ctx) {
618                                tool_args.named.insert(canonical.to_string(), value);
619                                consumed_positionals.insert(*pos_idx);
620                            } else {
621                                tool_args.flags.insert(name.clone());
622                            }
623                        } else {
624                            tool_args.flags.insert(name.clone());
625                        }
626                    }
627                } else {
628                    // Multi-char combined flags like -la: always boolean
629                    for c in name.chars() {
630                        tool_args.flags.insert(c.to_string());
631                    }
632                }
633            }
634            Arg::LongFlag(name) => {
635                if past_double_dash {
636                    tool_args.positional.push(Value::String(format!("--{name}")));
637                } else {
638                    // Look up type in schema (checks name and aliases)
639                    let lookup = param_lookup.get(name.as_str());
640                    let is_bool = lookup
641                        .map(|(_, typ, _)| is_bool_type(typ))
642                        .unwrap_or(true); // Unknown params default to bool
643
644                    if is_bool {
645                        tool_args.flags.insert(name.clone());
646                    } else {
647                        // Non-bool: consume next positional as value, insert under canonical name
648                        // Note: the sync build_tool_args does NOT honor `consumes > 1`. The async
649                        // build_args_async in kernel.rs is the only path that supports multi-consume
650                        // flags. Sync callers (scheduler pipelines for --json-marker plumbing) don't
651                        // yet need that; if they ever do, lift the logic via a shared helper.
652                        let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
653                        let next_positional = positional_indices
654                            .iter()
655                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
656
657                        if let Some((pos_idx, expr)) = next_positional {
658                            if let Some(value) = eval_simple_expr(expr, ctx) {
659                                tool_args.named.insert(canonical.to_string(), value);
660                                consumed_positionals.insert(*pos_idx);
661                            } else {
662                                tool_args.flags.insert(name.clone());
663                            }
664                        } else {
665                            tool_args.flags.insert(name.clone());
666                        }
667                    }
668                }
669            }
670        }
671        i += 1;
672    }
673
674    // Map remaining positionals to unfilled non-bool schema params (in order).
675    // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
676    // Positionals that appeared after `--` are never mapped (they're raw data).
677    // Only for MCP/external tools (map_positionals=true). Builtins handle their own positionals.
678    if let Some(schema) = schema.filter(|s| s.map_positionals) {
679        // Count how many positionals were added before `--`
680        let pre_dash_count = if past_double_dash {
681            // Find where the double-dash was in the original args to count pre-dash positionals
682            let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
683            // Count unconsumed positionals before the double-dash
684            positional_indices.iter()
685                .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
686                .count()
687        } else {
688            tool_args.positional.len()
689        };
690
691        let mut remaining = Vec::new();
692        let mut positional_iter = tool_args.positional.drain(..).enumerate();
693
694        for param in &schema.params {
695            if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
696                continue; // Already filled by a flag or named arg
697            }
698            if is_bool_type(&param.param_type) {
699                continue; // Bool params should only be set by flags
700            }
701            // Take from pre-dash positionals only
702            loop {
703                match positional_iter.next() {
704                    Some((idx, val)) if idx < pre_dash_count => {
705                        tool_args.named.insert(param.name.clone(), val);
706                        break;
707                    }
708                    Some((_, val)) => {
709                        remaining.push(val); // Post-dash or past limit, keep as positional
710                    }
711                    None => break,
712                }
713            }
714        }
715
716        // Any leftover positionals stay positional (e.g. `cat file1 file2`)
717        remaining.extend(positional_iter.map(|(_, v)| v));
718        tool_args.positional = remaining;
719    }
720
721    tool_args
722}
723
724/// Simple expression evaluation for args (without full scope access).
725fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
726    match expr {
727        Expr::Literal(value) => Some(eval_literal(value, ctx)),
728        Expr::VarRef(path) => ctx.scope.resolve_path(path),
729        Expr::Interpolated(parts) => {
730            let mut result = String::new();
731            for part in parts {
732                match part {
733                    crate::ast::StringPart::Literal(s) => result.push_str(s),
734                    crate::ast::StringPart::Var(path) => {
735                        if let Some(value) = ctx.scope.resolve_path(path) {
736                            result.push_str(&value_to_string(&value));
737                        }
738                    }
739                    crate::ast::StringPart::VarWithDefault { name, default } => {
740                        match ctx.scope.get(name) {
741                            Some(value) => {
742                                let s = value_to_string(value);
743                                if s.is_empty() {
744                                    result.push_str(&eval_string_parts_sync(default, ctx));
745                                } else {
746                                    result.push_str(&s);
747                                }
748                            }
749                            None => result.push_str(&eval_string_parts_sync(default, ctx)),
750                        }
751                    }
752                    crate::ast::StringPart::VarLength(name) => {
753                        let len = match ctx.scope.get(name) {
754                            Some(value) => value_to_string(value).len(),
755                            None => 0,
756                        };
757                        result.push_str(&len.to_string());
758                    }
759                    crate::ast::StringPart::Positional(n) => {
760                        if let Some(s) = ctx.scope.get_positional(*n) {
761                            result.push_str(s);
762                        }
763                    }
764                    crate::ast::StringPart::AllArgs => {
765                        result.push_str(&ctx.scope.all_args().join(" "));
766                    }
767                    crate::ast::StringPart::ArgCount => {
768                        result.push_str(&ctx.scope.arg_count().to_string());
769                    }
770                    crate::ast::StringPart::Arithmetic(expr) => {
771                        // Evaluate arithmetic in pipeline context
772                        if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
773                            result.push_str(&value.to_string());
774                        }
775                    }
776                    crate::ast::StringPart::CommandSubst(_) => {
777                        // Command substitution requires async - skip in sync context
778                    }
779                    crate::ast::StringPart::LastExitCode => {
780                        result.push_str(&ctx.scope.last_result().code.to_string());
781                    }
782                    crate::ast::StringPart::CurrentPid => {
783                        result.push_str(&ctx.scope.pid().to_string());
784                    }
785                }
786            }
787            Some(Value::String(result))
788        }
789        Expr::GlobPattern(s) => Some(Value::String(s.clone())),
790        Expr::HereDocBody { parts, strip_tabs } => {
791            // Heredoc body materialization for redirect targets. Reuses the
792            // shared sync part-walker; tab stripping is applied after the
793            // body is assembled, matching the interpreter's eval path.
794            let unwrapped: Vec<crate::ast::StringPart> =
795                parts.iter().map(|sp| sp.part.clone()).collect();
796            let raw = eval_string_parts_sync(&unwrapped, ctx);
797            let body = if *strip_tabs {
798                crate::interpreter::strip_leading_tabs(&raw)
799            } else {
800                raw
801            };
802            Some(Value::String(body))
803        }
804        _ => None, // Binary ops and command subst need more context
805    }
806}
807
808/// Evaluate a literal value.
809fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
810    value.clone()
811}
812
813/// Convert a value to a string for interpolation.
814fn value_to_string(value: &Value) -> String {
815    match value {
816        Value::Null => "".to_string(),
817        Value::Bool(b) => b.to_string(),
818        Value::Int(i) => i.to_string(),
819        Value::Float(f) => f.to_string(),
820        Value::String(s) => s.clone(),
821        Value::Json(json) => json.to_string(),
822        Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
823    }
824}
825
826/// Evaluate string parts synchronously (for pipeline context).
827/// Command substitutions are skipped as they require async.
828fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
829    let mut result = String::new();
830    for part in parts {
831        match part {
832            crate::ast::StringPart::Literal(s) => result.push_str(s),
833            crate::ast::StringPart::Var(path) => {
834                if let Some(value) = ctx.scope.resolve_path(path) {
835                    result.push_str(&value_to_string(&value));
836                }
837            }
838            crate::ast::StringPart::VarWithDefault { name, default } => {
839                match ctx.scope.get(name) {
840                    Some(value) => {
841                        let s = value_to_string(value);
842                        if s.is_empty() {
843                            result.push_str(&eval_string_parts_sync(default, ctx));
844                        } else {
845                            result.push_str(&s);
846                        }
847                    }
848                    None => result.push_str(&eval_string_parts_sync(default, ctx)),
849                }
850            }
851            crate::ast::StringPart::VarLength(name) => {
852                let len = match ctx.scope.get(name) {
853                    Some(value) => value_to_string(value).len(),
854                    None => 0,
855                };
856                result.push_str(&len.to_string());
857            }
858            crate::ast::StringPart::Positional(n) => {
859                if let Some(s) = ctx.scope.get_positional(*n) {
860                    result.push_str(s);
861                }
862            }
863            crate::ast::StringPart::AllArgs => {
864                result.push_str(&ctx.scope.all_args().join(" "));
865            }
866            crate::ast::StringPart::ArgCount => {
867                result.push_str(&ctx.scope.arg_count().to_string());
868            }
869            crate::ast::StringPart::Arithmetic(expr) => {
870                if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
871                    result.push_str(&value.to_string());
872                }
873            }
874            crate::ast::StringPart::CommandSubst(_) => {
875                // Command substitution requires async - skip in sync context
876            }
877            crate::ast::StringPart::LastExitCode => {
878                result.push_str(&ctx.scope.last_result().code.to_string());
879            }
880            crate::ast::StringPart::CurrentPid => {
881                result.push_str(&ctx.scope.pid().to_string());
882            }
883        }
884    }
885    result
886}
887
888/// Find scatter and gather commands in a pipeline.
889///
890/// Returns Some((scatter_index, gather_index)) if both are found with scatter before gather.
891/// Returns None if the pipeline doesn't have a valid scatter/gather pattern.
892fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
893    let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
894    let gather_idx = commands.iter().position(|c| c.name == "gather")?;
895
896    // Gather must come after scatter
897    if gather_idx > scatter_idx {
898        Some((scatter_idx, gather_idx))
899    } else {
900        None
901    }
902}
903
904#[cfg(test)]
905mod tests {
906    use super::*;
907    use crate::dispatch::BackendDispatcher;
908    use crate::tools::register_builtins;
909    use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
910    use std::path::Path;
911
912    async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
913        let mut tools = ToolRegistry::new();
914        register_builtins(&mut tools);
915        let tools = Arc::new(tools);
916        let runner = PipelineRunner::new(tools.clone());
917        let dispatcher = BackendDispatcher::new(tools.clone());
918
919        let mut vfs = VfsRouter::new();
920        let mem = MemoryFs::new();
921        mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
922        vfs.mount("/", mem);
923        let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
924
925        (runner, ctx, dispatcher)
926    }
927
928    fn make_cmd(name: &str, args: Vec<&str>) -> Command {
929        Command {
930            name: name.to_string(),
931            args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
932            redirects: vec![],
933        }
934    }
935
936    #[tokio::test]
937    async fn test_single_command() {
938        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
939        let cmd = make_cmd("echo", vec!["hello"]);
940
941        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
942        assert!(result.ok());
943        assert_eq!(result.text_out().trim(), "hello");
944    }
945
946    #[tokio::test]
947    async fn test_pipeline_echo_grep() {
948        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
949
950        // echo "hello\nworld" | grep pattern="world"
951        let echo_cmd = Command {
952            name: "echo".to_string(),
953            args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
954            redirects: vec![],
955        };
956        let grep_cmd = Command {
957            name: "grep".to_string(),
958            args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
959            redirects: vec![],
960        };
961
962        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
963        assert!(result.ok());
964        assert_eq!(result.text_out().trim(), "world");
965    }
966
967    #[tokio::test]
968    async fn test_pipeline_cat_grep() {
969        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
970
971        // cat /test.txt | grep pattern="hello"
972        let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
973        let grep_cmd = Command {
974            name: "grep".to_string(),
975            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
976            redirects: vec![],
977        };
978
979        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
980        assert!(result.ok());
981        assert!(result.text_out().contains("hello"));
982    }
983
984    #[tokio::test]
985    async fn test_command_not_found() {
986        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
987        let cmd = make_cmd("nonexistent", vec![]);
988
989        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
990        assert!(!result.ok());
991        assert_eq!(result.code, 127);
992        assert!(result.err.contains("not found"));
993    }
994
995    #[tokio::test]
996    async fn test_pipeline_continues_on_failure() {
997        // Standard shell semantics: pipeline runs all commands,
998        // exit code comes from the last command
999        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1000
1001        // cat /nonexistent | grep "hello"
1002        // cat fails but grep still runs (on empty input), grep returns 1 (no match)
1003        let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
1004        let grep_cmd = Command {
1005            name: "grep".to_string(),
1006            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1007            redirects: vec![],
1008        };
1009
1010        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1011        // Exit code comes from last command (grep), not from cat
1012        assert!(!result.ok());
1013    }
1014
1015    #[tokio::test]
1016    async fn test_pipeline_last_command_exit_code() {
1017        // echo hello | cat — both succeed, pipeline succeeds
1018        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1019
1020        let echo_cmd = make_cmd("echo", vec!["hello"]);
1021        let cat_cmd = make_cmd("cat", vec![]);
1022
1023        let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1024        assert!(result.ok());
1025        assert!(result.text_out().contains("hello"));
1026    }
1027
1028    #[tokio::test]
1029    async fn test_empty_pipeline() {
1030        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1031        let result = runner.run(&[], &mut ctx, &dispatcher).await;
1032        assert!(result.ok());
1033    }
1034
1035    // === Scatter/Gather Tests ===
1036
1037    #[test]
1038    fn test_find_scatter_gather_both_present() {
1039        let commands = vec![
1040            make_cmd("echo", vec!["a"]),
1041            make_cmd("scatter", vec![]),
1042            make_cmd("process", vec![]),
1043            make_cmd("gather", vec![]),
1044        ];
1045        let result = find_scatter_gather(&commands);
1046        assert_eq!(result, Some((1, 3)));
1047    }
1048
1049    #[test]
1050    fn test_find_scatter_gather_no_scatter() {
1051        let commands = vec![
1052            make_cmd("echo", vec!["a"]),
1053            make_cmd("gather", vec![]),
1054        ];
1055        let result = find_scatter_gather(&commands);
1056        assert!(result.is_none());
1057    }
1058
1059    #[test]
1060    fn test_find_scatter_gather_no_gather() {
1061        let commands = vec![
1062            make_cmd("echo", vec!["a"]),
1063            make_cmd("scatter", vec![]),
1064        ];
1065        let result = find_scatter_gather(&commands);
1066        assert!(result.is_none());
1067    }
1068
1069    #[test]
1070    fn test_find_scatter_gather_wrong_order() {
1071        let commands = vec![
1072            make_cmd("gather", vec![]),
1073            make_cmd("scatter", vec![]),
1074        ];
1075        let result = find_scatter_gather(&commands);
1076        assert!(result.is_none());
1077    }
1078
1079    #[tokio::test]
1080    async fn test_scatter_gather_simple() {
1081        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1082
1083        // split "a b c" | scatter | echo ${ITEM} | gather
1084        let split_cmd = Command {
1085            name: "split".to_string(),
1086            args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1087            redirects: vec![],
1088        };
1089        let scatter_cmd = make_cmd("scatter", vec![]);
1090        let process_cmd = Command {
1091            name: "echo".to_string(),
1092            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1093            redirects: vec![],
1094        };
1095        let gather_cmd = make_cmd("gather", vec![]);
1096
1097        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1098        assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1099        // Each echo should output the item
1100        assert!(result.text_out().contains("a"));
1101        assert!(result.text_out().contains("b"));
1102        assert!(result.text_out().contains("c"));
1103    }
1104
1105    #[tokio::test]
1106    async fn test_scatter_gather_empty_input() {
1107        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1108
1109        // echo "" | scatter | echo ${ITEM} | gather
1110        let echo_cmd = Command {
1111            name: "echo".to_string(),
1112            args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1113            redirects: vec![],
1114        };
1115        let scatter_cmd = make_cmd("scatter", vec![]);
1116        let process_cmd = Command {
1117            name: "echo".to_string(),
1118            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1119            redirects: vec![],
1120        };
1121        let gather_cmd = make_cmd("gather", vec![]);
1122
1123        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1124        assert!(result.ok());
1125        assert!(result.text_out().trim().is_empty());
1126    }
1127
1128    #[tokio::test]
1129    async fn test_scatter_gather_with_structured_stdin() {
1130        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1131
1132        // Set structured stdin data (as if piped from split/seq)
1133        let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1134        ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1135
1136        let scatter_cmd = make_cmd("scatter", vec![]);
1137        let process_cmd = Command {
1138            name: "echo".to_string(),
1139            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1140            redirects: vec![],
1141        };
1142        let gather_cmd = make_cmd("gather", vec![]);
1143
1144        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1145        assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1146        assert!(result.text_out().contains("x"));
1147        assert!(result.text_out().contains("y"));
1148        assert!(result.text_out().contains("z"));
1149    }
1150
1151    #[tokio::test]
1152    async fn test_scatter_gather_json_input() {
1153        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1154
1155        // Structured JSON array input (as if from split/seq)
1156        let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1157        ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1158
1159        let scatter_cmd = make_cmd("scatter", vec![]);
1160        let process_cmd = Command {
1161            name: "echo".to_string(),
1162            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1163            redirects: vec![],
1164        };
1165        let gather_cmd = make_cmd("gather", vec![]);
1166
1167        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1168        assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1169        assert!(result.text_out().contains("one"));
1170        assert!(result.text_out().contains("two"));
1171        assert!(result.text_out().contains("three"));
1172    }
1173
1174    #[tokio::test]
1175    async fn test_scatter_gather_with_post_gather() {
1176        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1177
1178        // split "a b" | scatter | echo ${ITEM} | gather | grep "a"
1179        let split_cmd = Command {
1180            name: "split".to_string(),
1181            args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1182            redirects: vec![],
1183        };
1184        let scatter_cmd = make_cmd("scatter", vec![]);
1185        let process_cmd = Command {
1186            name: "echo".to_string(),
1187            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1188            redirects: vec![],
1189        };
1190        let gather_cmd = make_cmd("gather", vec![]);
1191        let grep_cmd = Command {
1192            name: "grep".to_string(),
1193            args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1194            redirects: vec![],
1195        };
1196
1197        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1198        assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1199        assert!(result.text_out().contains("a"));
1200        assert!(!result.text_out().contains("b"));
1201    }
1202
1203    #[tokio::test]
1204    async fn test_scatter_custom_var_name() {
1205        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1206
1207        // Provide structured data (as if from split/seq)
1208        let data = Value::Json(serde_json::json!(["test1", "test2"]));
1209        ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1210
1211        // scatter as=URL | echo ${URL} | gather
1212        let scatter_cmd = Command {
1213            name: "scatter".to_string(),
1214            args: vec![Arg::Named {
1215                key: "as".to_string(),
1216                value: Expr::Literal(Value::String("URL".to_string())),
1217            }],
1218            redirects: vec![],
1219        };
1220        let process_cmd = Command {
1221            name: "echo".to_string(),
1222            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1223            redirects: vec![],
1224        };
1225        let gather_cmd = make_cmd("gather", vec![]);
1226
1227        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1228        assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1229        assert!(result.text_out().contains("test1"));
1230        assert!(result.text_out().contains("test2"));
1231    }
1232
1233    // === Backend Routing Tests ===
1234
1235    #[tokio::test]
1236    async fn test_pipeline_routes_through_backend() {
1237        use crate::backend::testing::MockBackend;
1238        use std::sync::atomic::Ordering;
1239
1240        // Create mock backend
1241        let (backend, call_count) = MockBackend::new();
1242        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1243
1244        // Create context with mock backend
1245        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1246
1247        // BackendDispatcher routes through backend.call_tool()
1248        let tools = std::sync::Arc::new(ToolRegistry::new());
1249        let runner = PipelineRunner::new(tools.clone());
1250        let dispatcher = BackendDispatcher::new(tools);
1251
1252        // Single command should route through backend
1253        let cmd = make_cmd("test-tool", vec!["arg1"]);
1254        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1255
1256        assert!(result.ok(), "Mock backend should return success");
1257        assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1258        assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
1259    }
1260
1261    #[tokio::test]
1262    async fn test_multi_command_pipeline_routes_through_backend() {
1263        use crate::backend::testing::MockBackend;
1264        use std::sync::atomic::Ordering;
1265
1266        let (backend, call_count) = MockBackend::new();
1267        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1268        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1269
1270        let tools = std::sync::Arc::new(ToolRegistry::new());
1271        let runner = PipelineRunner::new(tools.clone());
1272        let dispatcher = BackendDispatcher::new(tools);
1273
1274        // Pipeline with 3 commands
1275        let cmd1 = make_cmd("tool1", vec![]);
1276        let cmd2 = make_cmd("tool2", vec![]);
1277        let cmd3 = make_cmd("tool3", vec![]);
1278
1279        let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1280
1281        assert!(result.ok());
1282        assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1283    }
1284
1285    // === Schema-Aware Argument Parsing Tests ===
1286
1287    use crate::tools::{ParamSchema, ToolSchema};
1288
1289    fn make_test_schema() -> ToolSchema {
1290        ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1291            .param(ParamSchema::required("query", "string", "Search query"))
1292            .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1293            .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1294            .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1295            .with_positional_mapping()
1296    }
1297
1298    fn make_minimal_ctx() -> ExecContext {
1299        let mut vfs = VfsRouter::new();
1300        vfs.mount("/", MemoryFs::new());
1301        ExecContext::new(Arc::new(vfs))
1302    }
1303
1304    #[test]
1305    fn test_schema_aware_string_arg() {
1306        // --query "test" should become named: {"query": "test"}
1307        let args = vec![
1308            Arg::LongFlag("query".to_string()),
1309            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1310        ];
1311        let schema = make_test_schema();
1312        let ctx = make_minimal_ctx();
1313
1314        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1315
1316        assert!(tool_args.flags.is_empty(), "No flags should be set");
1317        assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1318        assert_eq!(
1319            tool_args.named.get("query"),
1320            Some(&Value::String("test".to_string())),
1321            "--query should consume 'test' as its value"
1322        );
1323    }
1324
1325    #[test]
1326    fn test_schema_aware_bool_flag() {
1327        // --verbose should remain a flag since schema says bool
1328        let args = vec![
1329            Arg::LongFlag("verbose".to_string()),
1330        ];
1331        let schema = make_test_schema();
1332        let ctx = make_minimal_ctx();
1333
1334        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1335
1336        assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1337        assert!(tool_args.named.is_empty(), "No named args");
1338        assert!(tool_args.positional.is_empty(), "No positionals");
1339    }
1340
1341    #[test]
1342    fn test_schema_aware_mixed() {
1343        // mcp_tool file.txt --output out.txt --verbose
1344        // file.txt maps to "query" (first unfilled non-bool schema param)
1345        let args = vec![
1346            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1347            Arg::LongFlag("output".to_string()),
1348            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1349            Arg::LongFlag("verbose".to_string()),
1350        ];
1351        let schema = make_test_schema();
1352        let ctx = make_minimal_ctx();
1353
1354        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1355
1356        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1357        assert_eq!(
1358            tool_args.named.get("query"),
1359            Some(&Value::String("file.txt".to_string()))
1360        );
1361        assert_eq!(
1362            tool_args.named.get("output"),
1363            Some(&Value::String("out.txt".to_string()))
1364        );
1365        assert!(tool_args.flags.contains("verbose"));
1366    }
1367
1368    #[test]
1369    fn test_schema_aware_multiple_string_args() {
1370        // --query "test" --output "result.json" --verbose --limit 5
1371        let args = vec![
1372            Arg::LongFlag("query".to_string()),
1373            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1374            Arg::LongFlag("output".to_string()),
1375            Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1376            Arg::LongFlag("verbose".to_string()),
1377            Arg::LongFlag("limit".to_string()),
1378            Arg::Positional(Expr::Literal(Value::Int(5))),
1379        ];
1380        let schema = make_test_schema();
1381        let ctx = make_minimal_ctx();
1382
1383        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1384
1385        assert!(tool_args.positional.is_empty(), "All positionals consumed");
1386        assert_eq!(
1387            tool_args.named.get("query"),
1388            Some(&Value::String("test".to_string()))
1389        );
1390        assert_eq!(
1391            tool_args.named.get("output"),
1392            Some(&Value::String("result.json".to_string()))
1393        );
1394        assert_eq!(
1395            tool_args.named.get("limit"),
1396            Some(&Value::Int(5))
1397        );
1398        assert!(tool_args.flags.contains("verbose"));
1399    }
1400
1401    #[test]
1402    fn test_schema_aware_double_dash() {
1403        // --output out.txt -- --this-is-data
1404        // After --, everything is positional
1405        let args = vec![
1406            Arg::LongFlag("output".to_string()),
1407            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1408            Arg::DoubleDash,
1409            Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1410        ];
1411        let schema = make_test_schema();
1412        let ctx = make_minimal_ctx();
1413
1414        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1415
1416        assert_eq!(
1417            tool_args.named.get("output"),
1418            Some(&Value::String("out.txt".to_string()))
1419        );
1420        // After --, the --this-is-data is treated as a positional (it's a Positional in the args)
1421        assert_eq!(
1422            tool_args.positional,
1423            vec![Value::String("--this-is-data".to_string())]
1424        );
1425    }
1426
1427    #[test]
1428    fn test_no_schema_fallback() {
1429        // Without schema, all --flags are treated as bool flags
1430        let args = vec![
1431            Arg::LongFlag("query".to_string()),
1432            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1433        ];
1434        let ctx = make_minimal_ctx();
1435
1436        let tool_args = build_tool_args(&args, &ctx, None);
1437
1438        // Without schema, --query is a flag and "test" is a positional
1439        assert!(tool_args.flags.contains("query"), "--query should be a flag");
1440        assert_eq!(
1441            tool_args.positional,
1442            vec![Value::String("test".to_string())],
1443            "'test' should be a positional"
1444        );
1445    }
1446
1447    #[test]
1448    fn test_unknown_flag_in_schema() {
1449        // --unknown-flag value: --unknown is bool (not in schema), "value" maps to query
1450        let args = vec![
1451            Arg::LongFlag("unknown".to_string()),
1452            Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1453        ];
1454        let schema = make_test_schema();
1455        let ctx = make_minimal_ctx();
1456
1457        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1458
1459        assert!(tool_args.flags.contains("unknown"));
1460        assert!(tool_args.positional.is_empty(), "value consumed as query param");
1461        assert_eq!(
1462            tool_args.named.get("query"),
1463            Some(&Value::String("value".to_string()))
1464        );
1465    }
1466
1467    #[test]
1468    fn test_named_args_unchanged() {
1469        // key=value syntax should work regardless of schema
1470        let args = vec![
1471            Arg::Named {
1472                key: "query".to_string(),
1473                value: Expr::Literal(Value::String("test".to_string())),
1474            },
1475            Arg::LongFlag("verbose".to_string()),
1476        ];
1477        let schema = make_test_schema();
1478        let ctx = make_minimal_ctx();
1479
1480        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1481
1482        assert_eq!(
1483            tool_args.named.get("query"),
1484            Some(&Value::String("test".to_string()))
1485        );
1486        assert!(tool_args.flags.contains("verbose"));
1487    }
1488
1489    #[test]
1490    fn test_short_flags_unchanged() {
1491        // Short flags -la should expand regardless of schema; file.txt maps to query
1492        let args = vec![
1493            Arg::ShortFlag("la".to_string()),
1494            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1495        ];
1496        let schema = make_test_schema();
1497        let ctx = make_minimal_ctx();
1498
1499        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1500
1501        assert!(tool_args.flags.contains("l"));
1502        assert!(tool_args.flags.contains("a"));
1503        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1504        assert_eq!(
1505            tool_args.named.get("query"),
1506            Some(&Value::String("file.txt".to_string()))
1507        );
1508    }
1509
1510    #[test]
1511    fn test_flag_at_end_no_value() {
1512        // --output at end with no value available - treat as flag (lenient)
1513        // file.txt maps to query (first unfilled non-bool param)
1514        let args = vec![
1515            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1516            Arg::LongFlag("output".to_string()),
1517        ];
1518        let schema = make_test_schema();
1519        let ctx = make_minimal_ctx();
1520
1521        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1522
1523        // output expects a value but none available after it, so it becomes a flag
1524        assert!(tool_args.flags.contains("output"));
1525        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1526        assert_eq!(
1527            tool_args.named.get("query"),
1528            Some(&Value::String("file.txt".to_string()))
1529        );
1530    }
1531
1532    #[test]
1533    fn test_positional_skips_bool_params() {
1534        // Schema: [query: string, verbose: bool, output: string]
1535        // Args: "val1" "val2"
1536        // Expected: query="val1", verbose unset, output="val2"
1537        let schema = ToolSchema::new("test", "")
1538            .param(ParamSchema::required("query", "string", ""))
1539            .param(ParamSchema::optional(
1540                "verbose",
1541                "bool",
1542                Value::Bool(false),
1543                "",
1544            ))
1545            .param(ParamSchema::optional(
1546                "output",
1547                "string",
1548                Value::Null,
1549                "",
1550            ))
1551            .with_positional_mapping();
1552        let args = vec![
1553            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1554            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1555        ];
1556        let ctx = make_minimal_ctx();
1557
1558        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1559
1560        assert_eq!(
1561            tool_args.named.get("query"),
1562            Some(&Value::String("val1".to_string()))
1563        );
1564        assert_eq!(
1565            tool_args.named.get("output"),
1566            Some(&Value::String("val2".to_string()))
1567        );
1568        assert!(!tool_args.flags.contains("verbose"));
1569        assert!(tool_args.positional.is_empty());
1570    }
1571
1572    #[test]
1573    fn test_positionals_fill_available_slots() {
1574        // Schema has query (string), limit (int), verbose (bool), output (string).
1575        // Three positionals fill the 3 non-bool slots.
1576        let args = vec![
1577            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1578            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1579            Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1580        ];
1581        let schema = make_test_schema(); // query, limit(int), verbose(bool), output
1582        let ctx = make_minimal_ctx();
1583
1584        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1585
1586        // val1 → query, val2 → limit (int param but receives string — tool decides),
1587        // val3 → output
1588        assert_eq!(
1589            tool_args.named.get("query"),
1590            Some(&Value::String("val1".to_string()))
1591        );
1592        assert_eq!(
1593            tool_args.named.get("limit"),
1594            Some(&Value::String("val2".to_string()))
1595        );
1596        assert_eq!(
1597            tool_args.named.get("output"),
1598            Some(&Value::String("val3".to_string()))
1599        );
1600        assert!(tool_args.positional.is_empty());
1601    }
1602
1603    #[test]
1604    fn test_truly_excess_positionals() {
1605        // More positionals than non-bool schema params — leftovers stay positional
1606        let schema = ToolSchema::new("test", "")
1607            .param(ParamSchema::required("name", "string", ""))
1608            .with_positional_mapping();
1609        let args = vec![
1610            Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1611            Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1612            Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1613        ];
1614        let ctx = make_minimal_ctx();
1615
1616        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1617
1618        assert_eq!(
1619            tool_args.named.get("name"),
1620            Some(&Value::String("first".to_string()))
1621        );
1622        assert_eq!(
1623            tool_args.positional,
1624            vec![
1625                Value::String("second".to_string()),
1626                Value::String("third".to_string()),
1627            ]
1628        );
1629    }
1630
1631    #[test]
1632    fn test_double_dash_positional_not_mapped() {
1633        // `tool val1 -- val2` — val1 maps to query, val2 stays positional (post-dash)
1634        let args = vec![
1635            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1636            Arg::DoubleDash,
1637            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1638        ];
1639        let schema = make_test_schema();
1640        let ctx = make_minimal_ctx();
1641
1642        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1643
1644        assert_eq!(
1645            tool_args.named.get("query"),
1646            Some(&Value::String("val1".to_string()))
1647        );
1648        // val2 is after --, should NOT be mapped even though schema has unfilled params
1649        assert_eq!(
1650            tool_args.positional,
1651            vec![Value::String("val2".to_string())]
1652        );
1653    }
1654
1655    #[test]
1656    fn test_all_params_filled_by_flags() {
1657        // All schema params satisfied by explicit flags — no positional mapping needed
1658        let args = vec![
1659            Arg::LongFlag("query".to_string()),
1660            Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1661            Arg::LongFlag("output".to_string()),
1662            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1663            Arg::LongFlag("verbose".to_string()),
1664        ];
1665        let schema = make_test_schema();
1666        let ctx = make_minimal_ctx();
1667
1668        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1669
1670        assert_eq!(
1671            tool_args.named.get("query"),
1672            Some(&Value::String("search".to_string()))
1673        );
1674        assert_eq!(
1675            tool_args.named.get("output"),
1676            Some(&Value::String("out.txt".to_string()))
1677        );
1678        assert!(tool_args.flags.contains("verbose"));
1679        assert!(tool_args.positional.is_empty());
1680    }
1681
1682    #[test]
1683    fn test_mixed_flags_and_positional_fill() {
1684        // --output foo val1 — output is explicit, val1 maps to query
1685        let args = vec![
1686            Arg::LongFlag("output".to_string()),
1687            Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
1688            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1689        ];
1690        let schema = make_test_schema();
1691        let ctx = make_minimal_ctx();
1692
1693        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1694
1695        assert_eq!(
1696            tool_args.named.get("output"),
1697            Some(&Value::String("foo".to_string()))
1698        );
1699        assert_eq!(
1700            tool_args.named.get("query"),
1701            Some(&Value::String("val1".to_string()))
1702        );
1703        assert!(tool_args.positional.is_empty());
1704    }
1705
1706    #[test]
1707    fn test_alias_flag_prevents_mapping_overwrite() {
1708        // -q "search" "out.txt" — -q is alias for query, so out.txt should map to output
1709        let schema = ToolSchema::new("test", "")
1710            .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
1711            .param(ParamSchema::required("output", "string", ""))
1712            .with_positional_mapping();
1713        let args = vec![
1714            Arg::ShortFlag("q".to_string()),
1715            Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1716            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1717        ];
1718        let ctx = make_minimal_ctx();
1719
1720        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1721
1722        assert_eq!(
1723            tool_args.named.get("query"),
1724            Some(&Value::String("search".to_string()))
1725        );
1726        assert_eq!(
1727            tool_args.named.get("output"),
1728            Some(&Value::String("out.txt".to_string()))
1729        );
1730        assert!(tool_args.positional.is_empty());
1731    }
1732
1733    #[test]
1734    fn test_builtin_schema_no_positional_mapping() {
1735        // Builtins have map_positionals=false — positionals stay positional
1736        let schema = ToolSchema::new("echo", "")
1737            .param(ParamSchema::optional("args", "any", Value::Null, ""))
1738            .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
1739        // Note: no .with_positional_mapping() — this is a builtin
1740        let args = vec![
1741            Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
1742            Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
1743        ];
1744        let ctx = make_minimal_ctx();
1745
1746        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1747
1748        // Positionals should NOT be consumed as named params
1749        assert_eq!(
1750            tool_args.positional,
1751            vec![
1752                Value::String("hello".to_string()),
1753                Value::String("world".to_string()),
1754            ]
1755        );
1756        assert!(tool_args.named.get("args").is_none());
1757    }
1758
1759    #[test]
1760    fn test_short_flag_with_alias_consumes_value() {
1761        // `-n 5` where `-n` is aliased to `lines` (type: int)
1762        // Should produce named: {"lines": 5}, not flags: {"n"} + positional: [5]
1763        let schema = ToolSchema::new("head", "Output first part of files")
1764            .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
1765                .with_aliases(["-n"]));
1766        let args = vec![
1767            Arg::ShortFlag("n".to_string()),
1768            Arg::Positional(Expr::Literal(Value::Int(5))),
1769            Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
1770        ];
1771        let ctx = make_minimal_ctx();
1772
1773        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1774
1775        assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
1776        assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
1777        assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
1778    }
1779
1780    // === Redirect Execution Tests ===
1781
1782    #[tokio::test]
1783    async fn test_merge_stderr_redirect() {
1784        // Test that 2>&1 merges stderr into stdout
1785        let result = ExecResult::from_output(0, "stdout content", "stderr content");
1786
1787        let redirects = vec![Redirect {
1788            kind: RedirectKind::MergeStderr,
1789            target: Expr::Literal(Value::Null),
1790        }];
1791
1792        let ctx = make_minimal_ctx();
1793        let result = apply_redirects(result, &redirects, &ctx).await;
1794
1795        assert_eq!(&*result.text_out(), "stdout contentstderr content");
1796        assert!(result.err.is_empty());
1797    }
1798
1799    #[tokio::test]
1800    async fn test_merge_stderr_with_empty_stderr() {
1801        // Test that 2>&1 handles empty stderr gracefully
1802        let result = ExecResult::from_output(0, "stdout only", "");
1803
1804        let redirects = vec![Redirect {
1805            kind: RedirectKind::MergeStderr,
1806            target: Expr::Literal(Value::Null),
1807        }];
1808
1809        let ctx = make_minimal_ctx();
1810        let result = apply_redirects(result, &redirects, &ctx).await;
1811
1812        assert_eq!(&*result.text_out(), "stdout only");
1813        assert!(result.err.is_empty());
1814    }
1815
1816    #[tokio::test]
1817    async fn test_merge_stderr_order_matters() {
1818        // Test redirect ordering: 2>&1 > file means:
1819        // 1. First merge stderr into stdout
1820        // 2. Then write stdout to file (leaving both empty for piping)
1821        // This verifies left-to-right processing
1822        let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1823
1824        // Just 2>&1 - should merge
1825        let redirects = vec![Redirect {
1826            kind: RedirectKind::MergeStderr,
1827            target: Expr::Literal(Value::Null),
1828        }];
1829
1830        let ctx = make_minimal_ctx();
1831        let result = apply_redirects(result, &redirects, &ctx).await;
1832
1833        assert_eq!(&*result.text_out(), "stdout\nstderr\n");
1834        assert!(result.err.is_empty());
1835    }
1836
1837    #[tokio::test]
1838    async fn test_redirect_with_command_execution() {
1839        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1840
1841        // echo "hello" with 2>&1 redirect
1842        let cmd = Command {
1843            name: "echo".to_string(),
1844            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1845            redirects: vec![Redirect {
1846                kind: RedirectKind::MergeStderr,
1847                target: Expr::Literal(Value::Null),
1848            }],
1849        };
1850
1851        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1852        assert!(result.ok());
1853        // echo produces no stderr, so this just validates the redirect doesn't break anything
1854        assert!(result.text_out().contains("hello"));
1855    }
1856
1857    #[tokio::test]
1858    async fn test_merge_stderr_in_pipeline() {
1859        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1860
1861        // echo "output" 2>&1 | grep "output"
1862        // The 2>&1 should be applied to echo's result, then piped to grep
1863        let echo_cmd = Command {
1864            name: "echo".to_string(),
1865            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1866            redirects: vec![Redirect {
1867                kind: RedirectKind::MergeStderr,
1868                target: Expr::Literal(Value::Null),
1869            }],
1870        };
1871        let grep_cmd = Command {
1872            name: "grep".to_string(),
1873            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1874            redirects: vec![],
1875        };
1876
1877        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1878        assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1879        assert!(result.text_out().contains("output"));
1880    }
1881}