Skip to main content

kaish_kernel/scheduler/
pipeline.rs

1//! Pipeline execution for kaish.
2//!
3//! Executes a sequence of commands connected by pipes, where the stdout
4//! of each command becomes the stdin of the next.
5//!
6//! Also handles scatter/gather pipelines for parallel execution.
7
8use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21    parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24/// Apply redirects to an execution result.
25///
26/// Pre-execution redirects (Stdin, HereDoc) should be handled before calling.
27/// Post-execution redirects (stdout/stderr to file, merge) applied here.
28/// Redirects are processed left-to-right per POSIX.
29async fn apply_redirects(
30    mut result: ExecResult,
31    redirects: &[Redirect],
32    ctx: &ExecContext,
33) -> ExecResult {
34    // Defer materialization of OutputData → result.out to individual redirect
35    // handlers. File redirects (Overwrite/Append) can stream OutputData directly
36    // to disk via write_canonical(), avoiding OOM on large structured output.
37    // Merge redirects and the fallthrough path materialize on demand.
38    for redir in redirects {
39        match redir.kind {
40            RedirectKind::MergeStderr => {
41                // 2>&1 - append stderr to stdout
42                // Ensure output is materialized for merge
43                if result.out.is_empty() {
44                    if let Some(ref output) = result.output {
45                        result.out = output.to_canonical_string();
46                    }
47                }
48                if !result.err.is_empty() {
49                    result.out.push_str(&result.err);
50                    result.err.clear();
51                }
52            }
53            RedirectKind::MergeStdout => {
54                // 1>&2 or >&2 - append stdout to stderr
55                if result.out.is_empty() {
56                    if let Some(ref output) = result.output {
57                        result.out = output.to_canonical_string();
58                    }
59                }
60                if !result.out.is_empty() {
61                    result.err.push_str(&result.out);
62                    result.out.clear();
63                }
64            }
65            RedirectKind::StdoutOverwrite => {
66                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
67                    // Stream OutputData directly to file if available
68                    if result.out.is_empty() && result.output.is_some() {
69                        match std::fs::File::create(&path) {
70                            Ok(mut file) => {
71                                if let Some(ref output) = result.output {
72                                    if let Err(e) = output.write_canonical(&mut file, None) {
73                                        return ExecResult::failure(1, format!("redirect: {e}"));
74                                    }
75                                }
76                            }
77                            Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
78                        }
79                    } else {
80                        if let Err(e) = tokio::fs::write(&path, &result.out).await {
81                            return ExecResult::failure(1, format!("redirect: {e}"));
82                        }
83                    }
84                    result.out.clear();
85                    result.output = None;
86                }
87            }
88            RedirectKind::StdoutAppend => {
89                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
90                    // Stream OutputData directly if available
91                    if result.out.is_empty() && result.output.is_some() {
92                        match std::fs::OpenOptions::new().append(true).create(true).open(&path) {
93                            Ok(mut file) => {
94                                if let Some(ref output) = result.output {
95                                    if let Err(e) = output.write_canonical(&mut file, None) {
96                                        return ExecResult::failure(1, format!("redirect: {e}"));
97                                    }
98                                }
99                            }
100                            Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
101                        }
102                    } else {
103                        let file = tokio::fs::OpenOptions::new()
104                            .append(true)
105                            .create(true)
106                            .open(&path)
107                            .await;
108                        match file {
109                            Ok(mut f) => {
110                                if let Err(e) = f.write_all(result.out.as_bytes()).await {
111                                    return ExecResult::failure(1, format!("redirect: {e}"));
112                                }
113                            }
114                            Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
115                        }
116                    }
117                    result.out.clear();
118                    result.output = None;
119                }
120            }
121            RedirectKind::Stderr => {
122                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
123                    if let Err(e) = tokio::fs::write(&path, &result.err).await {
124                        return ExecResult::failure(1, format!("redirect: {e}"));
125                    }
126                    result.err.clear();
127                }
128            }
129            RedirectKind::Both => {
130                if let Some(path) = eval_redirect_target(&redir.target, ctx) {
131                    let combined = format!("{}{}", result.out, result.err);
132                    if let Err(e) = tokio::fs::write(&path, combined).await {
133                        return ExecResult::failure(1, format!("redirect: {e}"));
134                    }
135                    result.out.clear();
136                    result.err.clear();
137                }
138            }
139            // Pre-execution redirects - already handled before command execution
140            RedirectKind::Stdin | RedirectKind::HereDoc => {}
141        }
142    }
143    // Materialize any remaining OutputData into result.out.
144    // Callers (accumulate_result, pipeline piping) expect .out to be populated
145    // after apply_redirects returns. File redirects above consume .output directly
146    // via streaming; this only fires when no redirect consumed it.
147    if result.out.is_empty() {
148        if let Some(ref output) = result.output {
149            result.out = output.to_canonical_string();
150        }
151    }
152    result
153}
154
155/// Evaluate a redirect target expression to get the file path.
156fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Option<String> {
157    eval_simple_expr(expr, ctx).map(|v| value_to_string(&v))
158}
159
160/// Set up stdin from redirects (< file, <<heredoc).
161/// Called before command execution.
162fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) {
163    for redir in &cmd.redirects {
164        match &redir.kind {
165            RedirectKind::Stdin => {
166                if let Some(path) = eval_redirect_target(&redir.target, ctx)
167                    && let Ok(content) = std::fs::read_to_string(&path) {
168                        ctx.set_stdin(content);
169                    }
170            }
171            RedirectKind::HereDoc => {
172                match &redir.target {
173                    Expr::Literal(Value::String(content)) => {
174                        ctx.set_stdin(content.clone());
175                    }
176                    expr => {
177                        if let Some(value) = eval_simple_expr(expr, ctx) {
178                            ctx.set_stdin(value_to_string(&value));
179                        }
180                    }
181                }
182            }
183            _ => {}
184        }
185    }
186}
187
188/// Runs pipelines by spawning tasks and connecting them via channels.
189#[derive(Clone)]
190pub struct PipelineRunner {
191    tools: Arc<ToolRegistry>,
192}
193
194impl PipelineRunner {
195    /// Create a new pipeline runner with the given tool registry.
196    pub fn new(tools: Arc<ToolRegistry>) -> Self {
197        Self { tools }
198    }
199
200    /// Execute a pipeline of commands.
201    ///
202    /// Each command's stdout becomes the next command's stdin.
203    /// If the pipeline contains scatter/gather, delegates to ScatterGatherRunner.
204    /// Returns the result of the last command in the pipeline.
205    ///
206    /// The `dispatcher` handles the full command resolution chain (user tools,
207    /// builtins, scripts, external commands, backend tools). The runner handles
208    /// I/O routing: stdin redirects, piping between commands, and output redirects.
209    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
210    pub async fn run(
211        &self,
212        commands: &[Command],
213        ctx: &mut ExecContext,
214        dispatcher: &dyn CommandDispatcher,
215    ) -> ExecResult {
216        if commands.is_empty() {
217            return ExecResult::success("");
218        }
219
220        // Check for scatter/gather pipeline
221        if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
222            return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
223        }
224
225        self.run_sequential(commands, ctx, dispatcher).await
226    }
227
228    /// Execute commands sequentially without scatter/gather detection.
229    ///
230    /// Used by `ScatterGatherRunner` for pre_scatter, post_gather, and parallel
231    /// workers. Breaks the async recursion chain (`run` → scatter → `run`).
232    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
233    pub async fn run_sequential(
234        &self,
235        commands: &[Command],
236        ctx: &mut ExecContext,
237        dispatcher: &dyn CommandDispatcher,
238    ) -> ExecResult {
239        if commands.is_empty() {
240            return ExecResult::success("");
241        }
242
243        if commands.len() == 1 {
244            // Single command, no piping needed
245            return self.run_single(&commands[0], ctx, None, dispatcher).await;
246        }
247
248        // Multi-command pipeline
249        self.run_pipeline(commands, ctx, dispatcher).await
250    }
251
252    /// Run a scatter/gather pipeline.
253    async fn run_scatter_gather(
254        &self,
255        commands: &[Command],
256        scatter_idx: usize,
257        gather_idx: usize,
258        ctx: &mut ExecContext,
259        _dispatcher: &dyn CommandDispatcher,
260    ) -> ExecResult {
261        // Split pipeline into parts
262        let pre_scatter = &commands[..scatter_idx];
263        let scatter_cmd = &commands[scatter_idx];
264        let parallel = &commands[scatter_idx + 1..gather_idx];
265        let gather_cmd = &commands[gather_idx];
266        let post_gather = &commands[gather_idx + 1..];
267
268        // Parse options from scatter and gather commands
269        // These are builtins with simple key=value syntax, no schema-driven parsing needed
270        let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
271        let gather_schema = self.tools.get("gather").map(|t| t.schema());
272        let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
273        let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
274
275        // Create a BackendDispatcher for scatter workers.
276        // Parallel workers MUST use a stateless dispatcher to avoid data races
277        // on shared scope. BackendDispatcher is stateless (routes through backend.call_tool).
278        let scatter_dispatcher: Arc<dyn CommandDispatcher> =
279            Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
280
281        let runner = ScatterGatherRunner::new(self.tools.clone(), scatter_dispatcher);
282        runner
283            .run(
284                pre_scatter,
285                scatter_opts,
286                parallel,
287                gather_opts,
288                post_gather,
289                ctx,
290            )
291            .await
292    }
293
294    /// Run a single command with optional stdin.
295    ///
296    /// The dispatcher handles arg parsing, schema lookup, output format, and execution.
297    /// The runner handles stdin setup (redirects + pipeline) and output redirects.
298    #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
299    async fn run_single(
300        &self,
301        cmd: &Command,
302        ctx: &mut ExecContext,
303        stdin: Option<String>,
304        dispatcher: &dyn CommandDispatcher,
305    ) -> ExecResult {
306        // Set up stdin from redirects (< file, <<heredoc)
307        setup_stdin_redirects(cmd, ctx);
308
309        // Set stdin from pipeline (overrides redirect stdin)
310        if let Some(input) = stdin {
311            ctx.set_stdin(input);
312        }
313
314        // Set pipeline position for stdio inheritance decisions
315        ctx.pipeline_position = PipelinePosition::Only;
316
317        // Execute via dispatcher (full resolution chain)
318        let result = match dispatcher.dispatch(cmd, ctx).await {
319            Ok(result) => result,
320            Err(e) => ExecResult::failure(1, e.to_string()),
321        };
322
323        // Apply post-execution redirects
324        apply_redirects(result, &cmd.redirects, ctx).await
325    }
326
327    /// Run a multi-command pipeline concurrently.
328    ///
329    /// Each stage runs in its own tokio task, connected by bounded pipe streams
330    /// (64KB ring buffers with backpressure). This provides:
331    /// - Bounded memory usage (no buffering entire outputs)
332    /// - Backpressure (fast producers wait for slow consumers)
333    /// - Early termination (e.g., `seq 1 1000000 | head -n 5`)
334    ///
335    /// Structured data (`stdin_data`) is passed via oneshot channels alongside pipes.
336    #[tracing::instrument(level = "debug", skip(self, commands, ctx, _dispatcher), fields(stage_count = commands.len()))]
337    async fn run_pipeline(
338        &self,
339        commands: &[Command],
340        ctx: &mut ExecContext,
341        _dispatcher: &dyn CommandDispatcher,
342    ) -> ExecResult {
343        let stage_count = commands.len();
344        let last_idx = stage_count - 1;
345
346        // Create N-1 pipe pairs connecting adjacent stages
347        let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
348        let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
349
350        for _ in 0..last_idx {
351            let (writer, reader) = pipe_stream_default();
352            pipe_writers.push(Some(writer));
353            pipe_readers.push(Some(reader));
354        }
355
356        // Create N-1 oneshot channels for structured data sideband
357        let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
358        let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
359
360        for _ in 0..last_idx {
361            let (tx, rx) = tokio::sync::oneshot::channel();
362            data_senders.push(Some(tx));
363            data_receivers.push(Some(rx));
364        }
365
366        // Concurrent pipeline stages use BackendDispatcher (builtins + backend tools).
367        // The original dispatcher (which may be the full Kernel dispatcher) can't be
368        // shared across tokio::spawn boundaries since it's not 'static.
369        // This is the same limitation scatter workers have.
370        let stage_dispatcher: Arc<dyn CommandDispatcher> =
371            Arc::new(crate::dispatch::BackendDispatcher::new(self.tools.clone()));
372
373        let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
374
375        for (i, cmd) in commands.iter().enumerate() {
376            let mut stage_ctx = ctx.child_for_pipeline();
377            let cmd = cmd.clone();
378
379            // Set up stdin from redirects on the child context
380            setup_stdin_redirects(&cmd, &mut stage_ctx);
381
382            // Wire pipe_stdin: stage 0 gets parent stdin (if no redirect), others get pipe reader
383            if i == 0 {
384                // First stage inherits the parent's stdin, but only if redirects didn't
385                // already set stdin (e.g., heredoc). Don't overwrite redirect-provided stdin.
386                if stage_ctx.stdin.is_none() {
387                    stage_ctx.stdin = ctx.stdin.take();
388                }
389                if stage_ctx.stdin_data.is_none() {
390                    stage_ctx.stdin_data = ctx.stdin_data.take();
391                }
392            } else {
393                // Intermediate/last stages read from pipe
394                stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
395                // Structured data received via oneshot (resolved at start of execution)
396            }
397
398            // Wire pipe_stdout: last stage writes to ExecResult, others write to pipe
399            if i < last_idx {
400                stage_ctx.pipe_stdout = pipe_writers[i].take();
401            }
402
403            // Set pipeline position
404            stage_ctx.pipeline_position = match i {
405                0 => PipelinePosition::First,
406                n if n == last_idx => PipelinePosition::Last,
407                _ => PipelinePosition::Middle,
408            };
409
410            let data_sender = if i < last_idx { data_senders[i].take() } else { None };
411            let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
412            let task_dispatcher = stage_dispatcher.clone();
413
414            let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> = tokio::spawn(async move {
415                // Receive structured data from previous stage (non-blocking).
416                // Using try_recv avoids a deadlock: streaming builtins (e.g. grep)
417                // write to their pipe_stdout during dispatch. If we blocked here
418                // waiting for the upstream's oneshot (sent after dispatch), the
419                // downstream couldn't start draining the pipe → circular wait.
420                // Builtins that use stdin_data (e.g. jq) fall back to pipe text.
421                if let Some(mut rx) = data_receiver {
422                    if let Ok(data) = rx.try_recv() {
423                        stage_ctx.stdin_data = data;
424                    }
425                    // Err → not ready yet; builtin will read from pipe text
426                }
427
428                // Execute the command
429                let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
430                    Ok(result) => result,
431                    Err(e) => ExecResult::failure(1, e.to_string()),
432                };
433
434                // Apply post-execution redirects
435                result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
436
437                // Flush buffered stderr to the kernel's stderr stream.
438                // This delivers error output from intermediate pipeline stages
439                // in real-time (via the kernel drain) instead of silently discarding it.
440                // Redirects like 2>&1 have already cleared result.err, so merged
441                // stderr goes through the pipe as expected.
442                if !result.err.is_empty() {
443                    if let Some(ref stderr) = stage_ctx.stderr {
444                        stderr.write_str(&result.err);
445                        result.err.clear();
446                    }
447                }
448
449                // Send structured data to next stage via oneshot BEFORE pipe write.
450                // The pipe write may block on backpressure (>64KB output), and the
451                // consumer awaits this oneshot before starting execution. Sending
452                // first prevents a circular wait (producer blocked on pipe write,
453                // consumer blocked on oneshot).
454                if let Some(tx) = data_sender {
455                    let _ = tx.send(result.data.clone());
456                }
457
458                // Write output to pipe for next stage (if not last).
459                // Consumer is now unblocked and can drain concurrently.
460                if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
461                    let text = result.text_out();
462                    if !text.is_empty() {
463                        // Write result to pipe; ignore broken pipe (reader dropped early)
464                        let _ = pipe_out.write_all(text.as_bytes()).await;
465                        let _ = pipe_out.shutdown().await;
466                    }
467                    // Drop pipe_out signals EOF to next stage's reader
468                }
469
470                (result, stage_ctx)
471            });
472
473            handles.push(handle);
474        }
475
476        // Await all stages and return last stage's result.
477        // Sync the last stage's scope back to the parent context so that
478        // variable assignments in the last pipeline stage are visible
479        // (e.g., `echo "Alice" | read NAME`).
480        let mut last_result = ExecResult::success("");
481        let mut panics: Vec<String> = Vec::new();
482        for (i, handle) in handles.into_iter().enumerate() {
483            match handle.await {
484                Ok((result, stage_ctx)) => {
485                    if i == last_idx {
486                        last_result = result;
487                        // Sync last stage's scope and cwd changes back
488                        ctx.scope = stage_ctx.scope;
489                        ctx.cwd = stage_ctx.cwd;
490                        ctx.prev_cwd = stage_ctx.prev_cwd;
491                        ctx.aliases = stage_ctx.aliases;
492                    }
493                }
494                Err(e) => {
495                    panics.push(format!("stage {}: {}", i, e));
496                }
497            }
498        }
499
500        if !panics.is_empty() {
501            last_result = ExecResult::failure(
502                1,
503                format!("pipeline stage(s) panicked: {}", panics.join("; ")),
504            );
505        }
506
507        last_result
508    }
509}
510
511/// Extract parameter types from a tool schema.
512///
513/// Returns a map from param name → param type (e.g., "verbose" → "bool", "output" → "string").
514/// Build a map from flag name → (canonical param name, param type).
515///
516/// Includes both primary names and aliases (with dashes stripped).
517/// For short flags like `-n` aliased to `lines`, maps `"n"` → `("lines", "int")`.
518pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str)> {
519    let mut map = HashMap::new();
520    for p in &schema.params {
521        map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str()));
522        for alias in &p.aliases {
523            let stripped = alias.trim_start_matches('-');
524            map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str()));
525        }
526    }
527    map
528}
529
530/// Check if a type is considered boolean.
531pub fn is_bool_type(param_type: &str) -> bool {
532    matches!(param_type.to_lowercase().as_str(), "bool" | "boolean")
533}
534
535/// Build ToolArgs from AST Args, evaluating expressions.
536///
537/// If a schema is provided, uses it to determine argument types:
538/// - For `--flag` where schema says type is non-bool: consume next positional as value
539/// - For `--flag` where schema says type is bool (or unknown): treat as boolean flag
540///
541/// This enables natural shell syntax like `mcp_tool --query "test" --limit 10`.
542pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
543    let mut tool_args = ToolArgs::new();
544    let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
545
546    // Track which positional indices have been consumed as flag values
547    let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
548    let mut past_double_dash = false;
549
550    // First pass: find positional args and their indices
551    let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
552    for (i, arg) in args.iter().enumerate() {
553        if let Arg::Positional(expr) = arg {
554            positional_indices.push((i, expr));
555        }
556    }
557
558    // Second pass: process all args
559    let mut i = 0;
560    while i < args.len() {
561        let arg = &args[i];
562
563        match arg {
564            Arg::DoubleDash => {
565                past_double_dash = true;
566            }
567            Arg::Positional(expr) => {
568                // Check if this positional was consumed by a preceding flag
569                if !consumed_positionals.contains(&i)
570                    && let Some(value) = eval_simple_expr(expr, ctx)
571                {
572                    tool_args.positional.push(value);
573                }
574            }
575            Arg::Named { key, value } => {
576                if let Some(val) = eval_simple_expr(value, ctx) {
577                    tool_args.named.insert(key.clone(), val);
578                }
579            }
580            Arg::ShortFlag(name) => {
581                if past_double_dash {
582                    tool_args.positional.push(Value::String(format!("-{name}")));
583                } else if name.len() == 1 {
584                    // Single-char short flag: look up schema to check if it takes a value.
585                    // e.g., `-n 5` where `-n` is an alias for `lines` (type: int)
586                    let flag_name = name.as_str();
587                    let lookup = param_lookup.get(flag_name);
588                    let is_bool = lookup
589                        .map(|(_, typ)| is_bool_type(typ))
590                        .unwrap_or(true);
591
592                    if is_bool {
593                        tool_args.flags.insert(flag_name.to_string());
594                    } else {
595                        // Non-bool: consume next positional as value, insert under canonical name
596                        let canonical = lookup.map(|(name, _)| *name).unwrap_or(flag_name);
597                        let next_positional = positional_indices
598                            .iter()
599                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
600
601                        if let Some((pos_idx, expr)) = next_positional {
602                            if let Some(value) = eval_simple_expr(expr, ctx) {
603                                tool_args.named.insert(canonical.to_string(), value);
604                                consumed_positionals.insert(*pos_idx);
605                            } else {
606                                tool_args.flags.insert(flag_name.to_string());
607                            }
608                        } else {
609                            tool_args.flags.insert(flag_name.to_string());
610                        }
611                    }
612                } else if let Some(&(canonical, typ)) = param_lookup.get(name.as_str()) {
613                    // Multi-char short flag matches a schema param (POSIX style: -name value)
614                    if is_bool_type(typ) {
615                        tool_args.flags.insert(canonical.to_string());
616                    } else {
617                        let next_positional = positional_indices
618                            .iter()
619                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
620                        if let Some((pos_idx, expr)) = next_positional {
621                            if let Some(value) = eval_simple_expr(expr, ctx) {
622                                tool_args.named.insert(canonical.to_string(), value);
623                                consumed_positionals.insert(*pos_idx);
624                            } else {
625                                tool_args.flags.insert(name.clone());
626                            }
627                        } else {
628                            tool_args.flags.insert(name.clone());
629                        }
630                    }
631                } else {
632                    // Multi-char combined flags like -la: always boolean
633                    for c in name.chars() {
634                        tool_args.flags.insert(c.to_string());
635                    }
636                }
637            }
638            Arg::LongFlag(name) => {
639                if past_double_dash {
640                    tool_args.positional.push(Value::String(format!("--{name}")));
641                } else {
642                    // Look up type in schema (checks name and aliases)
643                    let lookup = param_lookup.get(name.as_str());
644                    let is_bool = lookup
645                        .map(|(_, typ)| is_bool_type(typ))
646                        .unwrap_or(true); // Unknown params default to bool
647
648                    if is_bool {
649                        tool_args.flags.insert(name.clone());
650                    } else {
651                        // Non-bool: consume next positional as value, insert under canonical name
652                        let canonical = lookup.map(|(name, _)| *name).unwrap_or(name.as_str());
653                        let next_positional = positional_indices
654                            .iter()
655                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
656
657                        if let Some((pos_idx, expr)) = next_positional {
658                            if let Some(value) = eval_simple_expr(expr, ctx) {
659                                tool_args.named.insert(canonical.to_string(), value);
660                                consumed_positionals.insert(*pos_idx);
661                            } else {
662                                tool_args.flags.insert(name.clone());
663                            }
664                        } else {
665                            tool_args.flags.insert(name.clone());
666                        }
667                    }
668                }
669            }
670        }
671        i += 1;
672    }
673
674    // Map remaining positionals to unfilled non-bool schema params (in order).
675    // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
676    // Positionals that appeared after `--` are never mapped (they're raw data).
677    // Only for MCP/external tools (map_positionals=true). Builtins handle their own positionals.
678    if let Some(schema) = schema.filter(|s| s.map_positionals) {
679        // Count how many positionals were added before `--`
680        let pre_dash_count = if past_double_dash {
681            // Find where the double-dash was in the original args to count pre-dash positionals
682            let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
683            // Count unconsumed positionals before the double-dash
684            positional_indices.iter()
685                .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
686                .count()
687        } else {
688            tool_args.positional.len()
689        };
690
691        let mut remaining = Vec::new();
692        let mut positional_iter = tool_args.positional.drain(..).enumerate();
693
694        for param in &schema.params {
695            if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
696                continue; // Already filled by a flag or named arg
697            }
698            if is_bool_type(&param.param_type) {
699                continue; // Bool params should only be set by flags
700            }
701            // Take from pre-dash positionals only
702            loop {
703                match positional_iter.next() {
704                    Some((idx, val)) if idx < pre_dash_count => {
705                        tool_args.named.insert(param.name.clone(), val);
706                        break;
707                    }
708                    Some((_, val)) => {
709                        remaining.push(val); // Post-dash or past limit, keep as positional
710                    }
711                    None => break,
712                }
713            }
714        }
715
716        // Any leftover positionals stay positional (e.g. `cat file1 file2`)
717        remaining.extend(positional_iter.map(|(_, v)| v));
718        tool_args.positional = remaining;
719    }
720
721    tool_args
722}
723
724/// Simple expression evaluation for args (without full scope access).
725fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
726    match expr {
727        Expr::Literal(value) => Some(eval_literal(value, ctx)),
728        Expr::VarRef(path) => ctx.scope.resolve_path(path),
729        Expr::Interpolated(parts) => {
730            let mut result = String::new();
731            for part in parts {
732                match part {
733                    crate::ast::StringPart::Literal(s) => result.push_str(s),
734                    crate::ast::StringPart::Var(path) => {
735                        if let Some(value) = ctx.scope.resolve_path(path) {
736                            result.push_str(&value_to_string(&value));
737                        }
738                    }
739                    crate::ast::StringPart::VarWithDefault { name, default } => {
740                        match ctx.scope.get(name) {
741                            Some(value) => {
742                                let s = value_to_string(value);
743                                if s.is_empty() {
744                                    result.push_str(&eval_string_parts_sync(default, ctx));
745                                } else {
746                                    result.push_str(&s);
747                                }
748                            }
749                            None => result.push_str(&eval_string_parts_sync(default, ctx)),
750                        }
751                    }
752                    crate::ast::StringPart::VarLength(name) => {
753                        let len = match ctx.scope.get(name) {
754                            Some(value) => value_to_string(value).len(),
755                            None => 0,
756                        };
757                        result.push_str(&len.to_string());
758                    }
759                    crate::ast::StringPart::Positional(n) => {
760                        if let Some(s) = ctx.scope.get_positional(*n) {
761                            result.push_str(s);
762                        }
763                    }
764                    crate::ast::StringPart::AllArgs => {
765                        result.push_str(&ctx.scope.all_args().join(" "));
766                    }
767                    crate::ast::StringPart::ArgCount => {
768                        result.push_str(&ctx.scope.arg_count().to_string());
769                    }
770                    crate::ast::StringPart::Arithmetic(expr) => {
771                        // Evaluate arithmetic in pipeline context
772                        if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
773                            result.push_str(&value.to_string());
774                        }
775                    }
776                    crate::ast::StringPart::CommandSubst(_) => {
777                        // Command substitution requires async - skip in sync context
778                    }
779                    crate::ast::StringPart::LastExitCode => {
780                        result.push_str(&ctx.scope.last_result().code.to_string());
781                    }
782                    crate::ast::StringPart::CurrentPid => {
783                        result.push_str(&ctx.scope.pid().to_string());
784                    }
785                }
786            }
787            Some(Value::String(result))
788        }
789        _ => None, // Binary ops and command subst need more context
790    }
791}
792
793/// Evaluate a literal value.
794fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
795    value.clone()
796}
797
798/// Convert a value to a string for interpolation.
799fn value_to_string(value: &Value) -> String {
800    match value {
801        Value::Null => "".to_string(),
802        Value::Bool(b) => b.to_string(),
803        Value::Int(i) => i.to_string(),
804        Value::Float(f) => f.to_string(),
805        Value::String(s) => s.clone(),
806        Value::Json(json) => json.to_string(),
807        Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
808    }
809}
810
811/// Evaluate string parts synchronously (for pipeline context).
812/// Command substitutions are skipped as they require async.
813fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
814    let mut result = String::new();
815    for part in parts {
816        match part {
817            crate::ast::StringPart::Literal(s) => result.push_str(s),
818            crate::ast::StringPart::Var(path) => {
819                if let Some(value) = ctx.scope.resolve_path(path) {
820                    result.push_str(&value_to_string(&value));
821                }
822            }
823            crate::ast::StringPart::VarWithDefault { name, default } => {
824                match ctx.scope.get(name) {
825                    Some(value) => {
826                        let s = value_to_string(value);
827                        if s.is_empty() {
828                            result.push_str(&eval_string_parts_sync(default, ctx));
829                        } else {
830                            result.push_str(&s);
831                        }
832                    }
833                    None => result.push_str(&eval_string_parts_sync(default, ctx)),
834                }
835            }
836            crate::ast::StringPart::VarLength(name) => {
837                let len = match ctx.scope.get(name) {
838                    Some(value) => value_to_string(value).len(),
839                    None => 0,
840                };
841                result.push_str(&len.to_string());
842            }
843            crate::ast::StringPart::Positional(n) => {
844                if let Some(s) = ctx.scope.get_positional(*n) {
845                    result.push_str(s);
846                }
847            }
848            crate::ast::StringPart::AllArgs => {
849                result.push_str(&ctx.scope.all_args().join(" "));
850            }
851            crate::ast::StringPart::ArgCount => {
852                result.push_str(&ctx.scope.arg_count().to_string());
853            }
854            crate::ast::StringPart::Arithmetic(expr) => {
855                if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
856                    result.push_str(&value.to_string());
857                }
858            }
859            crate::ast::StringPart::CommandSubst(_) => {
860                // Command substitution requires async - skip in sync context
861            }
862            crate::ast::StringPart::LastExitCode => {
863                result.push_str(&ctx.scope.last_result().code.to_string());
864            }
865            crate::ast::StringPart::CurrentPid => {
866                result.push_str(&ctx.scope.pid().to_string());
867            }
868        }
869    }
870    result
871}
872
873/// Find scatter and gather commands in a pipeline.
874///
875/// Returns Some((scatter_index, gather_index)) if both are found with scatter before gather.
876/// Returns None if the pipeline doesn't have a valid scatter/gather pattern.
877fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
878    let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
879    let gather_idx = commands.iter().position(|c| c.name == "gather")?;
880
881    // Gather must come after scatter
882    if gather_idx > scatter_idx {
883        Some((scatter_idx, gather_idx))
884    } else {
885        None
886    }
887}
888
889#[cfg(test)]
890mod tests {
891    use super::*;
892    use crate::dispatch::BackendDispatcher;
893    use crate::tools::register_builtins;
894    use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
895    use std::path::Path;
896
897    async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
898        let mut tools = ToolRegistry::new();
899        register_builtins(&mut tools);
900        let tools = Arc::new(tools);
901        let runner = PipelineRunner::new(tools.clone());
902        let dispatcher = BackendDispatcher::new(tools.clone());
903
904        let mut vfs = VfsRouter::new();
905        let mem = MemoryFs::new();
906        mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
907        vfs.mount("/", mem);
908        let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
909
910        (runner, ctx, dispatcher)
911    }
912
913    fn make_cmd(name: &str, args: Vec<&str>) -> Command {
914        Command {
915            name: name.to_string(),
916            args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
917            redirects: vec![],
918        }
919    }
920
921    #[tokio::test]
922    async fn test_single_command() {
923        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
924        let cmd = make_cmd("echo", vec!["hello"]);
925
926        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
927        assert!(result.ok());
928        assert_eq!(result.out.trim(), "hello");
929    }
930
931    #[tokio::test]
932    async fn test_pipeline_echo_grep() {
933        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
934
935        // echo "hello\nworld" | grep pattern="world"
936        let echo_cmd = Command {
937            name: "echo".to_string(),
938            args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
939            redirects: vec![],
940        };
941        let grep_cmd = Command {
942            name: "grep".to_string(),
943            args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
944            redirects: vec![],
945        };
946
947        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
948        assert!(result.ok());
949        assert_eq!(result.out.trim(), "world");
950    }
951
952    #[tokio::test]
953    async fn test_pipeline_cat_grep() {
954        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
955
956        // cat /test.txt | grep pattern="hello"
957        let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
958        let grep_cmd = Command {
959            name: "grep".to_string(),
960            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
961            redirects: vec![],
962        };
963
964        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
965        assert!(result.ok());
966        assert!(result.out.contains("hello"));
967    }
968
969    #[tokio::test]
970    async fn test_command_not_found() {
971        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
972        let cmd = make_cmd("nonexistent", vec![]);
973
974        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
975        assert!(!result.ok());
976        assert_eq!(result.code, 127);
977        assert!(result.err.contains("not found"));
978    }
979
980    #[tokio::test]
981    async fn test_pipeline_continues_on_failure() {
982        // Standard shell semantics: pipeline runs all commands,
983        // exit code comes from the last command
984        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
985
986        // cat /nonexistent | grep "hello"
987        // cat fails but grep still runs (on empty input), grep returns 1 (no match)
988        let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
989        let grep_cmd = Command {
990            name: "grep".to_string(),
991            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
992            redirects: vec![],
993        };
994
995        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
996        // Exit code comes from last command (grep), not from cat
997        assert!(!result.ok());
998    }
999
1000    #[tokio::test]
1001    async fn test_pipeline_last_command_exit_code() {
1002        // echo hello | cat — both succeed, pipeline succeeds
1003        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1004
1005        let echo_cmd = make_cmd("echo", vec!["hello"]);
1006        let cat_cmd = make_cmd("cat", vec![]);
1007
1008        let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1009        assert!(result.ok());
1010        assert!(result.out.contains("hello"));
1011    }
1012
1013    #[tokio::test]
1014    async fn test_empty_pipeline() {
1015        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1016        let result = runner.run(&[], &mut ctx, &dispatcher).await;
1017        assert!(result.ok());
1018    }
1019
1020    // === Scatter/Gather Tests ===
1021
1022    #[test]
1023    fn test_find_scatter_gather_both_present() {
1024        let commands = vec![
1025            make_cmd("echo", vec!["a"]),
1026            make_cmd("scatter", vec![]),
1027            make_cmd("process", vec![]),
1028            make_cmd("gather", vec![]),
1029        ];
1030        let result = find_scatter_gather(&commands);
1031        assert_eq!(result, Some((1, 3)));
1032    }
1033
1034    #[test]
1035    fn test_find_scatter_gather_no_scatter() {
1036        let commands = vec![
1037            make_cmd("echo", vec!["a"]),
1038            make_cmd("gather", vec![]),
1039        ];
1040        let result = find_scatter_gather(&commands);
1041        assert!(result.is_none());
1042    }
1043
1044    #[test]
1045    fn test_find_scatter_gather_no_gather() {
1046        let commands = vec![
1047            make_cmd("echo", vec!["a"]),
1048            make_cmd("scatter", vec![]),
1049        ];
1050        let result = find_scatter_gather(&commands);
1051        assert!(result.is_none());
1052    }
1053
1054    #[test]
1055    fn test_find_scatter_gather_wrong_order() {
1056        let commands = vec![
1057            make_cmd("gather", vec![]),
1058            make_cmd("scatter", vec![]),
1059        ];
1060        let result = find_scatter_gather(&commands);
1061        assert!(result.is_none());
1062    }
1063
1064    #[tokio::test]
1065    async fn test_scatter_gather_simple() {
1066        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1067
1068        // split "a b c" | scatter | echo ${ITEM} | gather
1069        let split_cmd = Command {
1070            name: "split".to_string(),
1071            args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1072            redirects: vec![],
1073        };
1074        let scatter_cmd = make_cmd("scatter", vec![]);
1075        let process_cmd = Command {
1076            name: "echo".to_string(),
1077            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1078            redirects: vec![],
1079        };
1080        let gather_cmd = make_cmd("gather", vec![]);
1081
1082        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1083        assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1084        // Each echo should output the item
1085        assert!(result.out.contains("a"));
1086        assert!(result.out.contains("b"));
1087        assert!(result.out.contains("c"));
1088    }
1089
1090    #[tokio::test]
1091    async fn test_scatter_gather_empty_input() {
1092        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1093
1094        // echo "" | scatter | echo ${ITEM} | gather
1095        let echo_cmd = Command {
1096            name: "echo".to_string(),
1097            args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1098            redirects: vec![],
1099        };
1100        let scatter_cmd = make_cmd("scatter", vec![]);
1101        let process_cmd = Command {
1102            name: "echo".to_string(),
1103            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1104            redirects: vec![],
1105        };
1106        let gather_cmd = make_cmd("gather", vec![]);
1107
1108        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1109        assert!(result.ok());
1110        assert!(result.out.trim().is_empty());
1111    }
1112
1113    #[tokio::test]
1114    async fn test_scatter_gather_with_structured_stdin() {
1115        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1116
1117        // Set structured stdin data (as if piped from split/seq)
1118        let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1119        ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1120
1121        let scatter_cmd = make_cmd("scatter", vec![]);
1122        let process_cmd = Command {
1123            name: "echo".to_string(),
1124            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1125            redirects: vec![],
1126        };
1127        let gather_cmd = make_cmd("gather", vec![]);
1128
1129        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1130        assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1131        assert!(result.out.contains("x"));
1132        assert!(result.out.contains("y"));
1133        assert!(result.out.contains("z"));
1134    }
1135
1136    #[tokio::test]
1137    async fn test_scatter_gather_json_input() {
1138        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1139
1140        // Structured JSON array input (as if from split/seq)
1141        let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1142        ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1143
1144        let scatter_cmd = make_cmd("scatter", vec![]);
1145        let process_cmd = Command {
1146            name: "echo".to_string(),
1147            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1148            redirects: vec![],
1149        };
1150        let gather_cmd = make_cmd("gather", vec![]);
1151
1152        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1153        assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1154        assert!(result.out.contains("one"));
1155        assert!(result.out.contains("two"));
1156        assert!(result.out.contains("three"));
1157    }
1158
1159    #[tokio::test]
1160    async fn test_scatter_gather_with_post_gather() {
1161        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1162
1163        // split "a b" | scatter | echo ${ITEM} | gather | grep "a"
1164        let split_cmd = Command {
1165            name: "split".to_string(),
1166            args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1167            redirects: vec![],
1168        };
1169        let scatter_cmd = make_cmd("scatter", vec![]);
1170        let process_cmd = Command {
1171            name: "echo".to_string(),
1172            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1173            redirects: vec![],
1174        };
1175        let gather_cmd = make_cmd("gather", vec![]);
1176        let grep_cmd = Command {
1177            name: "grep".to_string(),
1178            args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1179            redirects: vec![],
1180        };
1181
1182        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1183        assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1184        assert!(result.out.contains("a"));
1185        assert!(!result.out.contains("b"));
1186    }
1187
1188    #[tokio::test]
1189    async fn test_scatter_custom_var_name() {
1190        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1191
1192        // Provide structured data (as if from split/seq)
1193        let data = Value::Json(serde_json::json!(["test1", "test2"]));
1194        ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1195
1196        // scatter as=URL | echo ${URL} | gather
1197        let scatter_cmd = Command {
1198            name: "scatter".to_string(),
1199            args: vec![Arg::Named {
1200                key: "as".to_string(),
1201                value: Expr::Literal(Value::String("URL".to_string())),
1202            }],
1203            redirects: vec![],
1204        };
1205        let process_cmd = Command {
1206            name: "echo".to_string(),
1207            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1208            redirects: vec![],
1209        };
1210        let gather_cmd = make_cmd("gather", vec![]);
1211
1212        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1213        assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1214        assert!(result.out.contains("test1"));
1215        assert!(result.out.contains("test2"));
1216    }
1217
1218    // === Backend Routing Tests ===
1219
1220    #[tokio::test]
1221    async fn test_pipeline_routes_through_backend() {
1222        use crate::backend::testing::MockBackend;
1223        use std::sync::atomic::Ordering;
1224
1225        // Create mock backend
1226        let (backend, call_count) = MockBackend::new();
1227        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1228
1229        // Create context with mock backend
1230        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1231
1232        // BackendDispatcher routes through backend.call_tool()
1233        let tools = std::sync::Arc::new(ToolRegistry::new());
1234        let runner = PipelineRunner::new(tools.clone());
1235        let dispatcher = BackendDispatcher::new(tools);
1236
1237        // Single command should route through backend
1238        let cmd = make_cmd("test-tool", vec!["arg1"]);
1239        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1240
1241        assert!(result.ok(), "Mock backend should return success");
1242        assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1243        assert!(result.out.contains("mock executed"), "Output should be from mock backend");
1244    }
1245
1246    #[tokio::test]
1247    async fn test_multi_command_pipeline_routes_through_backend() {
1248        use crate::backend::testing::MockBackend;
1249        use std::sync::atomic::Ordering;
1250
1251        let (backend, call_count) = MockBackend::new();
1252        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1253        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1254
1255        let tools = std::sync::Arc::new(ToolRegistry::new());
1256        let runner = PipelineRunner::new(tools.clone());
1257        let dispatcher = BackendDispatcher::new(tools);
1258
1259        // Pipeline with 3 commands
1260        let cmd1 = make_cmd("tool1", vec![]);
1261        let cmd2 = make_cmd("tool2", vec![]);
1262        let cmd3 = make_cmd("tool3", vec![]);
1263
1264        let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1265
1266        assert!(result.ok());
1267        assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1268    }
1269
1270    // === Schema-Aware Argument Parsing Tests ===
1271
1272    use crate::tools::{ParamSchema, ToolSchema};
1273
1274    fn make_test_schema() -> ToolSchema {
1275        ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1276            .param(ParamSchema::required("query", "string", "Search query"))
1277            .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1278            .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1279            .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1280            .with_positional_mapping()
1281    }
1282
1283    fn make_minimal_ctx() -> ExecContext {
1284        let mut vfs = VfsRouter::new();
1285        vfs.mount("/", MemoryFs::new());
1286        ExecContext::new(Arc::new(vfs))
1287    }
1288
1289    #[test]
1290    fn test_schema_aware_string_arg() {
1291        // --query "test" should become named: {"query": "test"}
1292        let args = vec![
1293            Arg::LongFlag("query".to_string()),
1294            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1295        ];
1296        let schema = make_test_schema();
1297        let ctx = make_minimal_ctx();
1298
1299        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1300
1301        assert!(tool_args.flags.is_empty(), "No flags should be set");
1302        assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1303        assert_eq!(
1304            tool_args.named.get("query"),
1305            Some(&Value::String("test".to_string())),
1306            "--query should consume 'test' as its value"
1307        );
1308    }
1309
1310    #[test]
1311    fn test_schema_aware_bool_flag() {
1312        // --verbose should remain a flag since schema says bool
1313        let args = vec![
1314            Arg::LongFlag("verbose".to_string()),
1315        ];
1316        let schema = make_test_schema();
1317        let ctx = make_minimal_ctx();
1318
1319        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1320
1321        assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1322        assert!(tool_args.named.is_empty(), "No named args");
1323        assert!(tool_args.positional.is_empty(), "No positionals");
1324    }
1325
1326    #[test]
1327    fn test_schema_aware_mixed() {
1328        // mcp_tool file.txt --output out.txt --verbose
1329        // file.txt maps to "query" (first unfilled non-bool schema param)
1330        let args = vec![
1331            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1332            Arg::LongFlag("output".to_string()),
1333            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1334            Arg::LongFlag("verbose".to_string()),
1335        ];
1336        let schema = make_test_schema();
1337        let ctx = make_minimal_ctx();
1338
1339        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1340
1341        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1342        assert_eq!(
1343            tool_args.named.get("query"),
1344            Some(&Value::String("file.txt".to_string()))
1345        );
1346        assert_eq!(
1347            tool_args.named.get("output"),
1348            Some(&Value::String("out.txt".to_string()))
1349        );
1350        assert!(tool_args.flags.contains("verbose"));
1351    }
1352
1353    #[test]
1354    fn test_schema_aware_multiple_string_args() {
1355        // --query "test" --output "result.json" --verbose --limit 5
1356        let args = vec![
1357            Arg::LongFlag("query".to_string()),
1358            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1359            Arg::LongFlag("output".to_string()),
1360            Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1361            Arg::LongFlag("verbose".to_string()),
1362            Arg::LongFlag("limit".to_string()),
1363            Arg::Positional(Expr::Literal(Value::Int(5))),
1364        ];
1365        let schema = make_test_schema();
1366        let ctx = make_minimal_ctx();
1367
1368        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1369
1370        assert!(tool_args.positional.is_empty(), "All positionals consumed");
1371        assert_eq!(
1372            tool_args.named.get("query"),
1373            Some(&Value::String("test".to_string()))
1374        );
1375        assert_eq!(
1376            tool_args.named.get("output"),
1377            Some(&Value::String("result.json".to_string()))
1378        );
1379        assert_eq!(
1380            tool_args.named.get("limit"),
1381            Some(&Value::Int(5))
1382        );
1383        assert!(tool_args.flags.contains("verbose"));
1384    }
1385
1386    #[test]
1387    fn test_schema_aware_double_dash() {
1388        // --output out.txt -- --this-is-data
1389        // After --, everything is positional
1390        let args = vec![
1391            Arg::LongFlag("output".to_string()),
1392            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1393            Arg::DoubleDash,
1394            Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1395        ];
1396        let schema = make_test_schema();
1397        let ctx = make_minimal_ctx();
1398
1399        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1400
1401        assert_eq!(
1402            tool_args.named.get("output"),
1403            Some(&Value::String("out.txt".to_string()))
1404        );
1405        // After --, the --this-is-data is treated as a positional (it's a Positional in the args)
1406        assert_eq!(
1407            tool_args.positional,
1408            vec![Value::String("--this-is-data".to_string())]
1409        );
1410    }
1411
1412    #[test]
1413    fn test_no_schema_fallback() {
1414        // Without schema, all --flags are treated as bool flags
1415        let args = vec![
1416            Arg::LongFlag("query".to_string()),
1417            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1418        ];
1419        let ctx = make_minimal_ctx();
1420
1421        let tool_args = build_tool_args(&args, &ctx, None);
1422
1423        // Without schema, --query is a flag and "test" is a positional
1424        assert!(tool_args.flags.contains("query"), "--query should be a flag");
1425        assert_eq!(
1426            tool_args.positional,
1427            vec![Value::String("test".to_string())],
1428            "'test' should be a positional"
1429        );
1430    }
1431
1432    #[test]
1433    fn test_unknown_flag_in_schema() {
1434        // --unknown-flag value: --unknown is bool (not in schema), "value" maps to query
1435        let args = vec![
1436            Arg::LongFlag("unknown".to_string()),
1437            Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1438        ];
1439        let schema = make_test_schema();
1440        let ctx = make_minimal_ctx();
1441
1442        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1443
1444        assert!(tool_args.flags.contains("unknown"));
1445        assert!(tool_args.positional.is_empty(), "value consumed as query param");
1446        assert_eq!(
1447            tool_args.named.get("query"),
1448            Some(&Value::String("value".to_string()))
1449        );
1450    }
1451
1452    #[test]
1453    fn test_named_args_unchanged() {
1454        // key=value syntax should work regardless of schema
1455        let args = vec![
1456            Arg::Named {
1457                key: "query".to_string(),
1458                value: Expr::Literal(Value::String("test".to_string())),
1459            },
1460            Arg::LongFlag("verbose".to_string()),
1461        ];
1462        let schema = make_test_schema();
1463        let ctx = make_minimal_ctx();
1464
1465        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1466
1467        assert_eq!(
1468            tool_args.named.get("query"),
1469            Some(&Value::String("test".to_string()))
1470        );
1471        assert!(tool_args.flags.contains("verbose"));
1472    }
1473
1474    #[test]
1475    fn test_short_flags_unchanged() {
1476        // Short flags -la should expand regardless of schema; file.txt maps to query
1477        let args = vec![
1478            Arg::ShortFlag("la".to_string()),
1479            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1480        ];
1481        let schema = make_test_schema();
1482        let ctx = make_minimal_ctx();
1483
1484        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1485
1486        assert!(tool_args.flags.contains("l"));
1487        assert!(tool_args.flags.contains("a"));
1488        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1489        assert_eq!(
1490            tool_args.named.get("query"),
1491            Some(&Value::String("file.txt".to_string()))
1492        );
1493    }
1494
1495    #[test]
1496    fn test_flag_at_end_no_value() {
1497        // --output at end with no value available - treat as flag (lenient)
1498        // file.txt maps to query (first unfilled non-bool param)
1499        let args = vec![
1500            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1501            Arg::LongFlag("output".to_string()),
1502        ];
1503        let schema = make_test_schema();
1504        let ctx = make_minimal_ctx();
1505
1506        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1507
1508        // output expects a value but none available after it, so it becomes a flag
1509        assert!(tool_args.flags.contains("output"));
1510        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1511        assert_eq!(
1512            tool_args.named.get("query"),
1513            Some(&Value::String("file.txt".to_string()))
1514        );
1515    }
1516
1517    #[test]
1518    fn test_positional_skips_bool_params() {
1519        // Schema: [query: string, verbose: bool, output: string]
1520        // Args: "val1" "val2"
1521        // Expected: query="val1", verbose unset, output="val2"
1522        let schema = ToolSchema::new("test", "")
1523            .param(ParamSchema::required("query", "string", ""))
1524            .param(ParamSchema::optional(
1525                "verbose",
1526                "bool",
1527                Value::Bool(false),
1528                "",
1529            ))
1530            .param(ParamSchema::optional(
1531                "output",
1532                "string",
1533                Value::Null,
1534                "",
1535            ))
1536            .with_positional_mapping();
1537        let args = vec![
1538            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1539            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1540        ];
1541        let ctx = make_minimal_ctx();
1542
1543        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1544
1545        assert_eq!(
1546            tool_args.named.get("query"),
1547            Some(&Value::String("val1".to_string()))
1548        );
1549        assert_eq!(
1550            tool_args.named.get("output"),
1551            Some(&Value::String("val2".to_string()))
1552        );
1553        assert!(!tool_args.flags.contains("verbose"));
1554        assert!(tool_args.positional.is_empty());
1555    }
1556
1557    #[test]
1558    fn test_positionals_fill_available_slots() {
1559        // Schema has query (string), limit (int), verbose (bool), output (string).
1560        // Three positionals fill the 3 non-bool slots.
1561        let args = vec![
1562            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1563            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1564            Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1565        ];
1566        let schema = make_test_schema(); // query, limit(int), verbose(bool), output
1567        let ctx = make_minimal_ctx();
1568
1569        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1570
1571        // val1 → query, val2 → limit (int param but receives string — tool decides),
1572        // val3 → output
1573        assert_eq!(
1574            tool_args.named.get("query"),
1575            Some(&Value::String("val1".to_string()))
1576        );
1577        assert_eq!(
1578            tool_args.named.get("limit"),
1579            Some(&Value::String("val2".to_string()))
1580        );
1581        assert_eq!(
1582            tool_args.named.get("output"),
1583            Some(&Value::String("val3".to_string()))
1584        );
1585        assert!(tool_args.positional.is_empty());
1586    }
1587
1588    #[test]
1589    fn test_truly_excess_positionals() {
1590        // More positionals than non-bool schema params — leftovers stay positional
1591        let schema = ToolSchema::new("test", "")
1592            .param(ParamSchema::required("name", "string", ""))
1593            .with_positional_mapping();
1594        let args = vec![
1595            Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1596            Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1597            Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1598        ];
1599        let ctx = make_minimal_ctx();
1600
1601        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1602
1603        assert_eq!(
1604            tool_args.named.get("name"),
1605            Some(&Value::String("first".to_string()))
1606        );
1607        assert_eq!(
1608            tool_args.positional,
1609            vec![
1610                Value::String("second".to_string()),
1611                Value::String("third".to_string()),
1612            ]
1613        );
1614    }
1615
1616    #[test]
1617    fn test_double_dash_positional_not_mapped() {
1618        // `tool val1 -- val2` — val1 maps to query, val2 stays positional (post-dash)
1619        let args = vec![
1620            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1621            Arg::DoubleDash,
1622            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1623        ];
1624        let schema = make_test_schema();
1625        let ctx = make_minimal_ctx();
1626
1627        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1628
1629        assert_eq!(
1630            tool_args.named.get("query"),
1631            Some(&Value::String("val1".to_string()))
1632        );
1633        // val2 is after --, should NOT be mapped even though schema has unfilled params
1634        assert_eq!(
1635            tool_args.positional,
1636            vec![Value::String("val2".to_string())]
1637        );
1638    }
1639
1640    #[test]
1641    fn test_all_params_filled_by_flags() {
1642        // All schema params satisfied by explicit flags — no positional mapping needed
1643        let args = vec![
1644            Arg::LongFlag("query".to_string()),
1645            Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1646            Arg::LongFlag("output".to_string()),
1647            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1648            Arg::LongFlag("verbose".to_string()),
1649        ];
1650        let schema = make_test_schema();
1651        let ctx = make_minimal_ctx();
1652
1653        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1654
1655        assert_eq!(
1656            tool_args.named.get("query"),
1657            Some(&Value::String("search".to_string()))
1658        );
1659        assert_eq!(
1660            tool_args.named.get("output"),
1661            Some(&Value::String("out.txt".to_string()))
1662        );
1663        assert!(tool_args.flags.contains("verbose"));
1664        assert!(tool_args.positional.is_empty());
1665    }
1666
1667    #[test]
1668    fn test_mixed_flags_and_positional_fill() {
1669        // --output foo val1 — output is explicit, val1 maps to query
1670        let args = vec![
1671            Arg::LongFlag("output".to_string()),
1672            Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
1673            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1674        ];
1675        let schema = make_test_schema();
1676        let ctx = make_minimal_ctx();
1677
1678        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1679
1680        assert_eq!(
1681            tool_args.named.get("output"),
1682            Some(&Value::String("foo".to_string()))
1683        );
1684        assert_eq!(
1685            tool_args.named.get("query"),
1686            Some(&Value::String("val1".to_string()))
1687        );
1688        assert!(tool_args.positional.is_empty());
1689    }
1690
1691    #[test]
1692    fn test_alias_flag_prevents_mapping_overwrite() {
1693        // -q "search" "out.txt" — -q is alias for query, so out.txt should map to output
1694        let schema = ToolSchema::new("test", "")
1695            .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
1696            .param(ParamSchema::required("output", "string", ""))
1697            .with_positional_mapping();
1698        let args = vec![
1699            Arg::ShortFlag("q".to_string()),
1700            Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
1701            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1702        ];
1703        let ctx = make_minimal_ctx();
1704
1705        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1706
1707        assert_eq!(
1708            tool_args.named.get("query"),
1709            Some(&Value::String("search".to_string()))
1710        );
1711        assert_eq!(
1712            tool_args.named.get("output"),
1713            Some(&Value::String("out.txt".to_string()))
1714        );
1715        assert!(tool_args.positional.is_empty());
1716    }
1717
1718    #[test]
1719    fn test_builtin_schema_no_positional_mapping() {
1720        // Builtins have map_positionals=false — positionals stay positional
1721        let schema = ToolSchema::new("echo", "")
1722            .param(ParamSchema::optional("args", "any", Value::Null, ""))
1723            .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
1724        // Note: no .with_positional_mapping() — this is a builtin
1725        let args = vec![
1726            Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
1727            Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
1728        ];
1729        let ctx = make_minimal_ctx();
1730
1731        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1732
1733        // Positionals should NOT be consumed as named params
1734        assert_eq!(
1735            tool_args.positional,
1736            vec![
1737                Value::String("hello".to_string()),
1738                Value::String("world".to_string()),
1739            ]
1740        );
1741        assert!(tool_args.named.get("args").is_none());
1742    }
1743
1744    #[test]
1745    fn test_short_flag_with_alias_consumes_value() {
1746        // `-n 5` where `-n` is aliased to `lines` (type: int)
1747        // Should produce named: {"lines": 5}, not flags: {"n"} + positional: [5]
1748        let schema = ToolSchema::new("head", "Output first part of files")
1749            .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
1750                .with_aliases(["-n"]));
1751        let args = vec![
1752            Arg::ShortFlag("n".to_string()),
1753            Arg::Positional(Expr::Literal(Value::Int(5))),
1754            Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
1755        ];
1756        let ctx = make_minimal_ctx();
1757
1758        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1759
1760        assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
1761        assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
1762        assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
1763    }
1764
1765    // === Redirect Execution Tests ===
1766
1767    #[tokio::test]
1768    async fn test_merge_stderr_redirect() {
1769        // Test that 2>&1 merges stderr into stdout
1770        let result = ExecResult::from_output(0, "stdout content", "stderr content");
1771
1772        let redirects = vec![Redirect {
1773            kind: RedirectKind::MergeStderr,
1774            target: Expr::Literal(Value::Null),
1775        }];
1776
1777        let ctx = make_minimal_ctx();
1778        let result = apply_redirects(result, &redirects, &ctx).await;
1779
1780        assert_eq!(result.out, "stdout contentstderr content");
1781        assert!(result.err.is_empty());
1782    }
1783
1784    #[tokio::test]
1785    async fn test_merge_stderr_with_empty_stderr() {
1786        // Test that 2>&1 handles empty stderr gracefully
1787        let result = ExecResult::from_output(0, "stdout only", "");
1788
1789        let redirects = vec![Redirect {
1790            kind: RedirectKind::MergeStderr,
1791            target: Expr::Literal(Value::Null),
1792        }];
1793
1794        let ctx = make_minimal_ctx();
1795        let result = apply_redirects(result, &redirects, &ctx).await;
1796
1797        assert_eq!(result.out, "stdout only");
1798        assert!(result.err.is_empty());
1799    }
1800
1801    #[tokio::test]
1802    async fn test_merge_stderr_order_matters() {
1803        // Test redirect ordering: 2>&1 > file means:
1804        // 1. First merge stderr into stdout
1805        // 2. Then write stdout to file (leaving both empty for piping)
1806        // This verifies left-to-right processing
1807        let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
1808
1809        // Just 2>&1 - should merge
1810        let redirects = vec![Redirect {
1811            kind: RedirectKind::MergeStderr,
1812            target: Expr::Literal(Value::Null),
1813        }];
1814
1815        let ctx = make_minimal_ctx();
1816        let result = apply_redirects(result, &redirects, &ctx).await;
1817
1818        assert_eq!(result.out, "stdout\nstderr\n");
1819        assert!(result.err.is_empty());
1820    }
1821
1822    #[tokio::test]
1823    async fn test_redirect_with_command_execution() {
1824        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1825
1826        // echo "hello" with 2>&1 redirect
1827        let cmd = Command {
1828            name: "echo".to_string(),
1829            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1830            redirects: vec![Redirect {
1831                kind: RedirectKind::MergeStderr,
1832                target: Expr::Literal(Value::Null),
1833            }],
1834        };
1835
1836        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1837        assert!(result.ok());
1838        // echo produces no stderr, so this just validates the redirect doesn't break anything
1839        assert!(result.out.contains("hello"));
1840    }
1841
1842    #[tokio::test]
1843    async fn test_merge_stderr_in_pipeline() {
1844        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1845
1846        // echo "output" 2>&1 | grep "output"
1847        // The 2>&1 should be applied to echo's result, then piped to grep
1848        let echo_cmd = Command {
1849            name: "echo".to_string(),
1850            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1851            redirects: vec![Redirect {
1852                kind: RedirectKind::MergeStderr,
1853                target: Expr::Literal(Value::Null),
1854            }],
1855        };
1856        let grep_cmd = Command {
1857            name: "grep".to_string(),
1858            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
1859            redirects: vec![],
1860        };
1861
1862        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1863        assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
1864        assert!(result.out.contains("output"));
1865    }
1866}