Skip to main content

kaish_kernel/scheduler/
pipeline.rs

1//! Pipeline execution for kaish.
2//!
3//! Executes a sequence of commands connected by pipes, where the stdout
4//! of each command becomes the stdin of the next.
5//!
6//! Also handles scatter/gather pipelines for parallel execution.
7
8use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21    parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24/// Apply redirects to an execution result.
25///
26/// Pre-execution redirects (Stdin, HereDoc) should be handled before calling.
27/// Post-execution redirects (stdout/stderr to file, merge) applied here.
28/// Redirects are processed left-to-right per POSIX.
29async fn apply_redirects(
30    mut result: ExecResult,
31    redirects: &[Redirect],
32    ctx: &ExecContext,
33) -> ExecResult {
34    // Defer materialization of OutputData → result.out to individual redirect
35    // handlers. File redirects (Overwrite/Append) can stream OutputData directly
36    // to disk via write_canonical(), avoiding OOM on large structured output.
37    // Merge redirects and the fallthrough path materialize on demand.
38    for redir in redirects {
39        match redir.kind {
40            RedirectKind::MergeStderr => {
41                // 2>&1 - append stderr to stdout
42                // Ensure output is materialized for merge
43                result.materialize();
44                if !result.err.is_empty() {
45                    let err = std::mem::take(&mut result.err);
46                    result.push_out(&err);
47                }
48            }
49            RedirectKind::MergeStdout => {
50                // 1>&2 or >&2 - append stdout to stderr (a text stream).
51                // Binary stdout can't be folded into text stderr without
52                // corruption — fail loud instead.
53                if result.is_bytes() {
54                    return ExecResult::failure(
55                        1,
56                        "redirect: cannot merge binary stdout into stderr (1>&2) — \
57                         redirect it to a file or pipe through base64/xxd",
58                    );
59                }
60                result.materialize();
61                if !result.text_out().is_empty() {
62                    let out = result.text_out().into_owned();
63                    result.err.push_str(&out);
64                    result.clear_out();
65                }
66            }
67            RedirectKind::StdoutOverwrite => {
68                let path = match eval_redirect_target(&redir.target, ctx).await {
69                    Ok(p) => p,
70                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
71                };
72                // A binary result writes its raw bytes (no lossy decode).
73                if let Some(bytes) = result.out_bytes() {
74                    if let Err(e) = redirect_write(ctx, &path, bytes).await {
75                        return ExecResult::failure(1, format!("redirect: {e}"));
76                    }
77                } else if let Some(output) = result.take_output_for_stream() {
78                    // Stream OutputData directly to file if available
79                    let mut buf = Vec::new();
80                    if let Err(e) = output.write_canonical(&mut buf, None) {
81                        return ExecResult::failure(1, format!("redirect: {e}"));
82                    }
83                    if let Err(e) = redirect_write(ctx, &path, &buf).await {
84                        return ExecResult::failure(1, format!("redirect: {e}"));
85                    }
86                } else if let Err(e) = redirect_write(ctx, &path, result.text_out().as_bytes()).await {
87                    return ExecResult::failure(1, format!("redirect: {e}"));
88                }
89                result.clear_out();
90                result.set_output(None);
91            }
92            RedirectKind::StdoutAppend => {
93                let path = match eval_redirect_target(&redir.target, ctx).await {
94                    Ok(p) => p,
95                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
96                };
97                // A binary result appends its raw bytes (no lossy decode).
98                if let Some(bytes) = result.out_bytes() {
99                    if let Err(e) = redirect_append(ctx, &path, bytes).await {
100                        return ExecResult::failure(1, format!("redirect: {e}"));
101                    }
102                } else if let Some(output) = result.take_output_for_stream() {
103                    // Stream OutputData directly if available
104                    let mut buf = Vec::new();
105                    if let Err(e) = output.write_canonical(&mut buf, None) {
106                        return ExecResult::failure(1, format!("redirect: {e}"));
107                    }
108                    if let Err(e) = redirect_append(ctx, &path, &buf).await {
109                        return ExecResult::failure(1, format!("redirect: {e}"));
110                    }
111                } else if let Err(e) = redirect_append(ctx, &path, result.text_out().as_bytes()).await {
112                    return ExecResult::failure(1, format!("redirect: {e}"));
113                }
114                result.clear_out();
115                result.set_output(None);
116            }
117            RedirectKind::Stderr => {
118                let path = match eval_redirect_target(&redir.target, ctx).await {
119                    Ok(p) => p,
120                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
121                };
122                if let Err(e) = redirect_write(ctx, &path, result.err.as_bytes()).await {
123                    return ExecResult::failure(1, format!("redirect: {e}"));
124                }
125                result.err.clear();
126            }
127            RedirectKind::Both => {
128                let path = match eval_redirect_target(&redir.target, ctx).await {
129                    Ok(p) => p,
130                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
131                };
132                // Build the combined bytes: raw binary stdout (no lossy decode)
133                // or text stdout, followed by stderr.
134                let mut combined: Vec<u8> = match result.out_bytes() {
135                    Some(b) => b.to_vec(),
136                    None => result.text_out().into_owned().into_bytes(),
137                };
138                combined.extend_from_slice(result.err.as_bytes());
139                if let Err(e) = redirect_write(ctx, &path, &combined).await {
140                    return ExecResult::failure(1, format!("redirect: {e}"));
141                }
142                result.clear_out();
143                result.set_output(None);
144                result.err.clear();
145            }
146            // Pre-execution redirects - already handled before command execution
147            RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
148        }
149    }
150    // Materialize any remaining OutputData into result.out.
151    // Callers (accumulate_result, pipeline piping) expect .out to be populated
152    // after apply_redirects returns. File redirects above consume .output directly
153    // via streaming; this only fires when no redirect consumed it.
154    result.materialize();
155    result
156}
157
158/// Evaluate a redirect target expression to get the file path (or heredoc body).
159///
160/// Routes through `ctx.dispatcher` so command substitution (`$(...)`) in the
161/// target runs — e.g. `cat < $(echo f)`, `echo x > $(echo f)`, and `$(...)`
162/// inside a heredoc body. Falls back to the sync evaluator (which skips
163/// command substitution) only when no dispatcher is attached.
164async fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Result<String, String> {
165    if let Some(dispatcher) = &ctx.dispatcher {
166        dispatcher
167            .eval_expr(expr, ctx)
168            .await
169            .map(|v| value_to_string(&v))
170            .map_err(|e| e.to_string())
171    } else {
172        eval_simple_expr(expr, ctx)
173            .map(|v| value_to_string(&v))
174            .ok_or_else(|| "could not evaluate redirect target".to_string())
175    }
176}
177
178/// Write data to a file via the VFS backend.
179///
180/// The redirect target is resolved against `ctx.cwd` (like every other path
181/// operand — see `cat`/`cp`/etc.), so a relative `> f` write and a later
182/// relative read agree on the same `$PWD/f`. Without this the router would
183/// normalize a bare relative path to `/f`, diverging from cwd-resolved reads.
184async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
185    use crate::backend::WriteMode;
186    let resolved = ctx.resolve_path(path);
187    ctx.backend.write(&resolved, data, WriteMode::Overwrite).await.map_err(|e| e.to_string())
188}
189
190/// Append data to a file via the VFS backend.
191///
192/// Resolves the target against `ctx.cwd` for the same reason as `redirect_write`.
193async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
194    let resolved = ctx.resolve_path(path);
195    ctx.backend.append(&resolved, data).await.map_err(|e| e.to_string())
196}
197
198/// Set up stdin from redirects (< file, <<heredoc).
199/// Called before command execution.
200///
201/// `< file` reads through the VFS backend (not the host filesystem) with the
202/// target resolved against `ctx.cwd`, mirroring how `cat` and the output
203/// redirects resolve their operands. A missing/unreadable file or non-UTF-8
204/// content is a hard error — we never silently feed the command empty stdin.
205async fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) -> Result<(), String> {
206    use std::path::Path;
207    for redir in &cmd.redirects {
208        match &redir.kind {
209            RedirectKind::Stdin => {
210                let path = eval_redirect_target(&redir.target, ctx).await?;
211                let resolved = ctx.resolve_path(&path);
212                let data = ctx
213                    .backend
214                    .read(Path::new(&resolved), None)
215                    .await
216                    .map_err(|e| format!("redirect: {path}: {e}"))?;
217                let content = String::from_utf8(data)
218                    .map_err(|_| format!("redirect: {path}: invalid UTF-8"))?;
219                ctx.set_stdin(content);
220            }
221            RedirectKind::HereDoc => {
222                match &redir.target {
223                    Expr::Literal(Value::String(content)) => {
224                        ctx.set_stdin(content.clone());
225                    }
226                    // Heredoc bodies may contain `$(...)`; route through the
227                    // dispatcher so command substitution runs.
228                    expr => {
229                        let body = eval_redirect_target(expr, ctx).await?;
230                        ctx.set_stdin(body);
231                    }
232                }
233            }
234            RedirectKind::HereString => {
235                // Per bash, here-strings append a trailing newline to the
236                // expanded word so the command receives a terminated line.
237                let mut s = eval_redirect_target(&redir.target, ctx).await?;
238                s.push('\n');
239                ctx.set_stdin(s);
240            }
241            _ => {}
242        }
243    }
244    Ok(())
245}
246
247/// Runs pipelines by spawning tasks and connecting them via channels.
248#[derive(Clone)]
249pub struct PipelineRunner {
250    tools: Arc<ToolRegistry>,
251}
252
253impl PipelineRunner {
254    /// Create a new pipeline runner with the given tool registry.
255    pub fn new(tools: Arc<ToolRegistry>) -> Self {
256        Self { tools }
257    }
258
259    /// Execute a pipeline of commands.
260    ///
261    /// Each command's stdout becomes the next command's stdin.
262    /// If the pipeline contains scatter/gather, delegates to ScatterGatherRunner.
263    /// Returns the result of the last command in the pipeline.
264    ///
265    /// The `dispatcher` handles the full command resolution chain (user tools,
266    /// builtins, scripts, external commands, backend tools). The runner handles
267    /// I/O routing: stdin redirects, piping between commands, and output redirects.
268    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
269    pub async fn run(
270        &self,
271        commands: &[Command],
272        ctx: &mut ExecContext,
273        dispatcher: &dyn CommandDispatcher,
274    ) -> ExecResult {
275        if commands.is_empty() {
276            return ExecResult::success("");
277        }
278
279        // Check for scatter/gather pipeline
280        if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
281            return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
282        }
283
284        self.run_sequential(commands, ctx, dispatcher).await
285    }
286
287    /// Execute commands sequentially without scatter/gather detection.
288    ///
289    /// Used by `ScatterGatherRunner` for pre_scatter, post_gather, and parallel
290    /// workers. Breaks the async recursion chain (`run` → scatter → `run`).
291    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
292    pub async fn run_sequential(
293        &self,
294        commands: &[Command],
295        ctx: &mut ExecContext,
296        dispatcher: &dyn CommandDispatcher,
297    ) -> ExecResult {
298        if commands.is_empty() {
299            return ExecResult::success("");
300        }
301
302        if commands.len() == 1 {
303            // Single command, no piping needed
304            return self.run_single(&commands[0], ctx, None, dispatcher).await;
305        }
306
307        // Multi-command pipeline
308        self.run_pipeline(commands, ctx, dispatcher).await
309    }
310
311    /// Run a scatter/gather pipeline.
312    async fn run_scatter_gather(
313        &self,
314        commands: &[Command],
315        scatter_idx: usize,
316        gather_idx: usize,
317        ctx: &mut ExecContext,
318        dispatcher: &dyn CommandDispatcher,
319    ) -> ExecResult {
320        // Split pipeline into parts
321        let pre_scatter = &commands[..scatter_idx];
322        let scatter_cmd = &commands[scatter_idx];
323        let parallel = &commands[scatter_idx + 1..gather_idx];
324        let gather_cmd = &commands[gather_idx];
325        let post_gather = &commands[gather_idx + 1..];
326
327        // Parse options from scatter and gather commands
328        // These are builtins with simple key=value syntax, no schema-driven parsing needed
329        let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
330        let gather_schema = self.tools.get("gather").map(|t| t.schema());
331        let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
332        let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
333
334        // We need an `Arc<dyn CommandDispatcher>` to hand to `ScatterGatherRunner`.
335        // `fork_attached` produces a subkernel whose cancellation token is a
336        // child of the parent's, so a parent timeout/cancel cascades into
337        // the scatter pipeline (and into worker children via further forks).
338        let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
339
340        let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
341        runner
342            .run(
343                pre_scatter,
344                scatter_opts,
345                parallel,
346                gather_opts,
347                post_gather,
348                ctx,
349            )
350            .await
351    }
352
353    /// Run a single command with optional stdin.
354    ///
355    /// The dispatcher handles arg parsing, schema lookup, output format, and execution.
356    /// The runner handles stdin setup (redirects + pipeline) and output redirects.
357    #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
358    async fn run_single(
359        &self,
360        cmd: &Command,
361        ctx: &mut ExecContext,
362        stdin: Option<String>,
363        dispatcher: &dyn CommandDispatcher,
364    ) -> ExecResult {
365        // Set up stdin from redirects (< file, <<heredoc)
366        if let Err(e) = setup_stdin_redirects(cmd, ctx).await {
367            return ExecResult::failure(1, e);
368        }
369
370        // Set stdin from pipeline (overrides redirect stdin)
371        if let Some(input) = stdin {
372            ctx.set_stdin(input);
373        }
374
375        // Set pipeline position for stdio inheritance decisions
376        ctx.pipeline_position = PipelinePosition::Only;
377
378        // Execute via dispatcher (full resolution chain)
379        let result = match dispatcher.dispatch(cmd, ctx).await {
380            Ok(result) => result,
381            Err(e) => ExecResult::failure(1, e.to_string()),
382        };
383
384        // Apply post-execution redirects
385        apply_redirects(result, &cmd.redirects, ctx).await
386    }
387
388    /// Run a multi-command pipeline concurrently.
389    ///
390    /// Each stage runs in its own tokio task, connected by bounded pipe streams
391    /// (64KB ring buffers with backpressure). This provides:
392    /// - Bounded memory usage (no buffering entire outputs)
393    /// - Backpressure (fast producers wait for slow consumers)
394    /// - Early termination (e.g., `seq 1 1000000 | head -n 5`)
395    ///
396    /// Structured data (`stdin_data`) is passed via oneshot channels alongside pipes.
397    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
398    async fn run_pipeline(
399        &self,
400        commands: &[Command],
401        ctx: &mut ExecContext,
402        dispatcher: &dyn CommandDispatcher,
403    ) -> ExecResult {
404        let stage_count = commands.len();
405        let last_idx = stage_count - 1;
406
407        // Create N-1 pipe pairs connecting adjacent stages
408        let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
409        let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
410
411        for _ in 0..last_idx {
412            let (writer, reader) = pipe_stream_default();
413            pipe_writers.push(Some(writer));
414            pipe_readers.push(Some(reader));
415        }
416
417        // Create N-1 oneshot channels for structured data sideband
418        let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
419        let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
420
421        for _ in 0..last_idx {
422            let (tx, rx) = tokio::sync::oneshot::channel();
423            data_senders.push(Some(tx));
424            data_receivers.push(Some(rx));
425        }
426
427        let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
428
429        for (i, cmd) in commands.iter().enumerate() {
430            let mut stage_ctx = ctx.child_for_pipeline();
431            let cmd = cmd.clone();
432
433            // Fork attached: each concurrent pipeline stage needs independent
434            // mutable state, but cancellation should still cascade from the
435            // parent (so a request timeout kills externals running in any
436            // stage, not just the foreground one).
437            let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
438
439            // Set up stdin from redirects on the child context. A failure here
440            // (e.g. `cmd < missing`) fails this stage; surface it from inside
441            // the spawned task so the normal join/collection path reports it.
442            let stdin_setup = setup_stdin_redirects(&cmd, &mut stage_ctx).await;
443
444            // Wire pipe_stdin: stage 0 gets parent stdin (if no redirect), others get pipe reader
445            if i == 0 {
446                // First stage inherits the parent's stdin, but only if redirects didn't
447                // already set stdin (e.g., heredoc). Don't overwrite redirect-provided stdin.
448                if stage_ctx.stdin.is_none() {
449                    stage_ctx.stdin = ctx.stdin.take();
450                }
451                if stage_ctx.stdin_data.is_none() {
452                    stage_ctx.stdin_data = ctx.stdin_data.take();
453                }
454            } else {
455                // Intermediate/last stages read from pipe
456                stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
457                // Structured data received via oneshot (resolved at start of execution)
458            }
459
460            // Wire pipe_stdout: last stage writes to ExecResult, others write to pipe
461            if i < last_idx {
462                stage_ctx.pipe_stdout = pipe_writers[i].take();
463            }
464
465            // Set pipeline position
466            stage_ctx.pipeline_position = match i {
467                0 => PipelinePosition::First,
468                n if n == last_idx => PipelinePosition::Last,
469                _ => PipelinePosition::Middle,
470            };
471
472            let data_sender = if i < last_idx { data_senders[i].take() } else { None };
473            let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
474
475            // Propagate the embedder's trace context across the spawn boundary
476            // so each concurrent stage's spans stay in the same trace.
477            let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> =
478                tokio::spawn(crate::telemetry::bind_current_context(async move {
479                // A stdin-redirect setup failure short-circuits this stage.
480                if let Err(e) = stdin_setup {
481                    return (ExecResult::failure(1, e), stage_ctx);
482                }
483
484                // Receive structured data from previous stage (non-blocking).
485                // Using try_recv avoids a deadlock: streaming builtins (e.g. grep)
486                // write to their pipe_stdout during dispatch. If we blocked here
487                // waiting for the upstream's oneshot (sent after dispatch), the
488                // downstream couldn't start draining the pipe → circular wait.
489                // Builtins that use stdin_data (e.g. jq) fall back to pipe text.
490                if let Some(mut rx) = data_receiver {
491                    if let Ok(data) = rx.try_recv() {
492                        stage_ctx.stdin_data = data;
493                    }
494                    // Err → not ready yet; builtin will read from pipe text
495                }
496
497                // Execute the command
498                let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
499                    Ok(result) => result,
500                    Err(e) => ExecResult::failure(1, e.to_string()),
501                };
502
503                // Apply post-execution redirects
504                result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
505
506                // Flush buffered stderr to the kernel's stderr stream.
507                // This delivers error output from intermediate pipeline stages
508                // in real-time (via the kernel drain) instead of silently discarding it.
509                // Redirects like 2>&1 have already cleared result.err, so merged
510                // stderr goes through the pipe as expected.
511                if !result.err.is_empty() {
512                    if let Some(ref stderr) = stage_ctx.stderr {
513                        stderr.write_str(&result.err);
514                        result.err.clear();
515                    }
516                }
517
518                // Send structured data to next stage via oneshot BEFORE pipe write.
519                // The pipe write may block on backpressure (>64KB output), and the
520                // consumer awaits this oneshot before starting execution. Sending
521                // first prevents a circular wait (producer blocked on pipe write,
522                // consumer blocked on oneshot).
523                if let Some(tx) = data_sender {
524                    let _ = tx.send(result.data.clone());
525                }
526
527                // Write output to pipe for next stage (if not last).
528                // Consumer is now unblocked and can drain concurrently.
529                if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
530                    // A binary result flows through the pipe as raw bytes; text
531                    // results as their UTF-8 bytes. Either way the next stage
532                    // gets exactly what was produced — no lossy round-trip.
533                    let bytes: Vec<u8> = match result.out_bytes() {
534                        Some(b) => b.to_vec(),
535                        None => result.text_out().into_owned().into_bytes(),
536                    };
537                    if !bytes.is_empty() {
538                        // Write result to pipe; ignore broken pipe (reader dropped early)
539                        let _ = pipe_out.write_all(&bytes).await;
540                        let _ = pipe_out.shutdown().await;
541                    }
542                    // Drop pipe_out signals EOF to next stage's reader
543                }
544
545                (result, stage_ctx)
546            }));
547
548            handles.push(handle);
549        }
550
551        // Await all stages and return last stage's result.
552        // Sync the last stage's scope back to the parent context so that
553        // variable assignments in the last pipeline stage are visible
554        // (e.g., `echo "Alice" | read NAME`).
555        let mut last_result = ExecResult::success("");
556        let mut panics: Vec<String> = Vec::new();
557        for (i, handle) in handles.into_iter().enumerate() {
558            match handle.await {
559                Ok((result, stage_ctx)) => {
560                    if i == last_idx {
561                        last_result = result;
562                        // Sync last stage's scope and cwd changes back
563                        ctx.scope = stage_ctx.scope;
564                        ctx.cwd = stage_ctx.cwd;
565                        ctx.prev_cwd = stage_ctx.prev_cwd;
566                        ctx.aliases = stage_ctx.aliases;
567                    }
568                }
569                Err(e) => {
570                    panics.push(format!("stage {}: {}", i, e));
571                }
572            }
573        }
574
575        if !panics.is_empty() {
576            last_result = ExecResult::failure(
577                1,
578                format!("pipeline stage(s) panicked: {}", panics.join("; ")),
579            );
580        }
581
582        last_result
583    }
584}
585
586/// Extract parameter types from a tool schema.
587///
588/// Returns a map from param name → param type (e.g., "verbose" → "bool", "output" → "string").
589/// Build a map from flag name → (canonical param name, param type).
590///
591/// Includes both primary names and aliases (with dashes stripped).
592/// For short flags like `-n` aliased to `lines`, maps `"n"` → `("lines", "int", 1)`.
593/// The third tuple slot is `consumes`: how many positionals the flag pulls
594/// per occurrence (1 for standard `--flag value`, 2 for jq's `--arg NAME VAL`).
595///
596/// Positional params (`positional: true`) are excluded — they're not flags,
597/// and including them would mis-route `cat --paths foo.txt` from positional
598/// to named, regressing builtins that read from `args.positional`.
599/// Walk leading positionals to select the active subcommand leaf of a schema.
600///
601/// A flat tool (`schema.subcommands` empty) returns the root immediately —
602/// today's single-leaf behavior. For a subcommand-aware tool each leading
603/// positional, in order, must name a child (by `name` or a command-level
604/// alias) to descend; the first positional that names no child is the leaf's
605/// own argument, and selection stops there. Multi-level trees fall out by
606/// construction (`block edit insert` → two descents).
607///
608/// Routing is **literal-only**: a subcommand selector must be a bareword or
609/// quoted string (both parse to `Expr::Literal(Value::String)`). A *computed*
610/// positional (`$(…)`, `$VAR`, a glob) sitting where a subcommand is required
611/// is an **error**, not a silent guess — kaish can't see its value at parse
612/// time, so picking a leaf from it would misroute the flags that bind against
613/// the leaf's params. The fix is to spell the subcommand out, or use the
614/// `--flag=value` form (which binds without any schema lookup).
615///
616/// Returned leaf borrows from `schema`, so its `params`/`subcommands` outlive
617/// any `schema_param_lookup` taken from it.
618///
619/// **Global value flags.** A space-form value flag declared on the *root*
620/// (e.g. kj's global `--confirm <nonce>`) can legitimately precede the
621/// subcommand path. Its value is a positional in the AST, so routing must not
622/// mistake it for a subcommand selector — `select_leaf` skips the value of any
623/// root-declared non-bool flag it sees. Leaf-specific value flags can't precede
624/// their own subcommand by construction, so only the root's flags need this.
625pub fn select_leaf<'a>(schema: &'a ToolSchema, args: &[Arg]) -> anyhow::Result<&'a ToolSchema> {
626    // Names + aliases of root-declared value (non-bool, non-positional) flags,
627    // whose space-form value is a positional we must skip while routing.
628    let root_lookup = schema_param_lookup(schema);
629    let is_root_value_flag = |name: &str| -> bool {
630        root_lookup.get(name).is_some_and(|(_, typ, _)| !is_bool_type(typ))
631    };
632
633    let mut node = schema;
634    let mut skip_next_positional = false;
635    for arg in args {
636        match arg {
637            // Tokens past `--` are raw data, never subcommand selectors.
638            Arg::DoubleDash => break,
639            // A root value flag in space form consumes the next positional as
640            // its value — don't route on that positional.
641            Arg::LongFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
642            Arg::ShortFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
643            Arg::Positional(expr) => {
644                if skip_next_positional {
645                    skip_next_positional = false;
646                    continue; // this positional is the preceding flag's value
647                }
648                if node.subcommands.is_empty() {
649                    break; // leaf reached — remaining positionals are its args
650                }
651                match classify_subcommand_positional(expr) {
652                    SubcommandWord::Word(word) => {
653                        match node.subcommands.iter().find(|c| c.matches_command(word)) {
654                            Some(child) => node = child, // descend
655                            None => break,               // not a subcommand → leaf's own arg
656                        }
657                    }
658                    // A non-string literal (number/bool) can't be a subcommand
659                    // name but its value *is* known; treat it as the leaf's own
660                    // positional and stop — no misroute risk.
661                    SubcommandWord::OtherLiteral => break,
662                    SubcommandWord::Computed(kind) => anyhow::bail!(
663                        "{}: a subcommand name is required here, but got {kind}. \
664                         Subcommands must be literal words — spell it out \
665                         (e.g. `{} <subcommand> …`) or use the `--flag=value` form.",
666                        node.name,
667                        schema.name
668                    ),
669                }
670            }
671            // Flags are skipped during routing; they bind against the leaf.
672            _ => {}
673        }
674    }
675    Ok(node)
676}
677
/// How a positional reads when a subcommand selector is expected.
///
/// Produced by `classify_subcommand_positional` and consumed by `select_leaf`
/// to decide whether to descend, stop routing, or report an error.
enum SubcommandWord<'a> {
    /// A literal word that may name a child.
    /// Borrows the string out of the expression it was classified from.
    Word(&'a str),
    /// A literal but non-string value — a known value, never a subcommand.
    OtherLiteral,
    /// A value computed at runtime; `kind` describes it for the error.
    /// The description is a fixed phrase (e.g. "a variable reference").
    Computed(&'static str),
}
687
688fn classify_subcommand_positional(expr: &Expr) -> SubcommandWord<'_> {
689    match expr {
690        Expr::Literal(Value::String(s)) => SubcommandWord::Word(s),
691        Expr::Literal(_) => SubcommandWord::OtherLiteral,
692        Expr::CommandSubst(_) | Expr::Command(_) => SubcommandWord::Computed("a command substitution `$(…)`"),
693        Expr::VarRef(_)
694        | Expr::VarWithDefault { .. }
695        | Expr::VarLength(_)
696        | Expr::Positional(_)
697        | Expr::AllArgs
698        | Expr::ArgCount
699        | Expr::CurrentPid
700        | Expr::LastExitCode => SubcommandWord::Computed("a variable reference"),
701        Expr::Interpolated(_) | Expr::HereDocBody { .. } => SubcommandWord::Computed("an interpolated string"),
702        Expr::GlobPattern(_) => SubcommandWord::Computed("a glob pattern"),
703        Expr::Arithmetic(_) => SubcommandWord::Computed("an arithmetic expansion"),
704        _ => SubcommandWord::Computed("a value computed at runtime"),
705    }
706}
707
708pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
709    let mut map = HashMap::new();
710    for p in schema.params.iter().filter(|p| !p.positional) {
711        map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
712        for alias in &p.aliases {
713            let stripped = alias.trim_start_matches('-');
714            map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
715        }
716    }
717    map
718}
719
/// Check if a type is considered boolean.
///
/// Matches `"bool"` and `"boolean"` case-insensitively. The comparison is done
/// in place, so no lowercased copy of `param_type` is allocated.
pub fn is_bool_type(param_type: &str) -> bool {
    ["bool", "boolean"]
        .iter()
        .any(|spelling| param_type.eq_ignore_ascii_case(spelling))
}
724
725/// Build ToolArgs from AST Args, evaluating expressions.
726///
727/// If a schema is provided, uses it to determine argument types:
728/// - For `--flag` where schema says type is non-bool: consume next positional as value
729/// - For `--flag` where schema says type is bool (or unknown): treat as boolean flag
730///
731/// This enables natural shell syntax like `mcp_tool --query "test" --limit 10`.
732pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
733    let mut tool_args = ToolArgs::new();
734    let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
735    let accepts_word_assign = schema
736        .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
737        .unwrap_or(false);
738
739    // Track which positional indices have been consumed as flag values
740    let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
741    let mut past_double_dash = false;
742
743    // First pass: find positional args and their indices
744    let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
745    for (i, arg) in args.iter().enumerate() {
746        if let Arg::Positional(expr) = arg {
747            positional_indices.push((i, expr));
748        }
749    }
750
751    // Second pass: process all args
752    let mut i = 0;
753    while i < args.len() {
754        let arg = &args[i];
755
756        match arg {
757            Arg::DoubleDash => {
758                past_double_dash = true;
759            }
760            Arg::Positional(expr) => {
761                // Check if this positional was consumed by a preceding flag
762                if !consumed_positionals.contains(&i)
763                    && let Some(value) = eval_simple_expr(expr, ctx)
764                {
765                    tool_args.positional.push(value);
766                }
767            }
768            Arg::Named { key, value } => {
769                if let Some(val) = eval_simple_expr(value, ctx) {
770                    tool_args.named.insert(key.clone(), val);
771                }
772            }
773            Arg::WordAssign { key, value } => {
774                if let Some(val) = eval_simple_expr(value, ctx) {
775                    if accepts_word_assign {
776                        tool_args.named.insert(key.clone(), val);
777                    } else {
778                        let val_str = crate::interpreter::value_to_string(&val);
779                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
780                    }
781                }
782            }
783            Arg::ShortFlag(name) => {
784                if past_double_dash {
785                    tool_args.positional.push(Value::String(format!("-{name}")));
786                } else if name.len() == 1 {
787                    // Single-char short flag: look up schema to check if it takes a value.
788                    // e.g., `-n 5` where `-n` is an alias for `lines` (type: int)
789                    let flag_name = name.as_str();
790                    let lookup = param_lookup.get(flag_name);
791                    let is_bool = lookup
792                        .map(|(_, typ, _)| is_bool_type(typ))
793                        .unwrap_or(true);
794
795                    if is_bool {
796                        tool_args.flags.insert(flag_name.to_string());
797                    } else {
798                        // Non-bool: consume next positional as value, insert under canonical name
799                        let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
800                        let next_positional = positional_indices
801                            .iter()
802                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
803
804                        if let Some((pos_idx, expr)) = next_positional {
805                            if let Some(value) = eval_simple_expr(expr, ctx) {
806                                tool_args.named.insert(canonical.to_string(), value);
807                                consumed_positionals.insert(*pos_idx);
808                            } else {
809                                tool_args.flags.insert(flag_name.to_string());
810                            }
811                        } else {
812                            tool_args.flags.insert(flag_name.to_string());
813                        }
814                    }
815                } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
816                    // Multi-char short flag matches a schema param (POSIX style: -name value)
817                    if is_bool_type(typ) {
818                        tool_args.flags.insert(canonical.to_string());
819                    } else {
820                        let next_positional = positional_indices
821                            .iter()
822                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
823                        if let Some((pos_idx, expr)) = next_positional {
824                            if let Some(value) = eval_simple_expr(expr, ctx) {
825                                tool_args.named.insert(canonical.to_string(), value);
826                                consumed_positionals.insert(*pos_idx);
827                            } else {
828                                tool_args.flags.insert(name.clone());
829                            }
830                        } else {
831                            tool_args.flags.insert(name.clone());
832                        }
833                    }
834                } else {
835                    // Multi-char combined flags like -la: always boolean
836                    for c in name.chars() {
837                        tool_args.flags.insert(c.to_string());
838                    }
839                }
840            }
841            Arg::LongFlag(name) => {
842                if past_double_dash {
843                    tool_args.positional.push(Value::String(format!("--{name}")));
844                } else {
845                    // Look up type in schema (checks name and aliases)
846                    let lookup = param_lookup.get(name.as_str());
847                    let is_bool = lookup
848                        .map(|(_, typ, _)| is_bool_type(typ))
849                        .unwrap_or(true); // Unknown params default to bool
850
851                    if is_bool {
852                        tool_args.flags.insert(name.clone());
853                    } else {
854                        // Non-bool: consume next positional as value, insert under canonical name
855                        // Note: the sync build_tool_args does NOT honor `consumes > 1`. The async
856                        // build_args_async in kernel.rs is the only path that supports multi-consume
857                        // flags. Sync callers (scheduler pipelines for --json-marker plumbing) don't
858                        // yet need that; if they ever do, lift the logic via a shared helper.
859                        let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
860                        let next_positional = positional_indices
861                            .iter()
862                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
863
864                        if let Some((pos_idx, expr)) = next_positional {
865                            if let Some(value) = eval_simple_expr(expr, ctx) {
866                                tool_args.named.insert(canonical.to_string(), value);
867                                consumed_positionals.insert(*pos_idx);
868                            } else {
869                                tool_args.flags.insert(name.clone());
870                            }
871                        } else {
872                            tool_args.flags.insert(name.clone());
873                        }
874                    }
875                }
876            }
877        }
878        i += 1;
879    }
880
881    // Map remaining positionals to unfilled non-bool schema params (in order).
882    // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
883    // Positionals that appeared after `--` are never mapped (they're raw data).
884    // Only for MCP/external tools (map_positionals=true). Builtins handle their own positionals.
885    if let Some(schema) = schema.filter(|s| s.map_positionals) {
886        // Count how many positionals were added before `--`
887        let pre_dash_count = if past_double_dash {
888            // Find where the double-dash was in the original args to count pre-dash positionals
889            let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
890            // Count unconsumed positionals before the double-dash
891            positional_indices.iter()
892                .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
893                .count()
894        } else {
895            tool_args.positional.len()
896        };
897
898        let mut remaining = Vec::new();
899        let mut positional_iter = tool_args.positional.drain(..).enumerate();
900
901        for param in &schema.params {
902            if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
903                continue; // Already filled by a flag or named arg
904            }
905            if is_bool_type(&param.param_type) {
906                continue; // Bool params should only be set by flags
907            }
908            // Take from pre-dash positionals only
909            loop {
910                match positional_iter.next() {
911                    Some((idx, val)) if idx < pre_dash_count => {
912                        tool_args.named.insert(param.name.clone(), val);
913                        break;
914                    }
915                    Some((_, val)) => {
916                        remaining.push(val); // Post-dash or past limit, keep as positional
917                    }
918                    None => break,
919                }
920            }
921        }
922
923        // Any leftover positionals stay positional (e.g. `cat file1 file2`)
924        remaining.extend(positional_iter.map(|(_, v)| v));
925        tool_args.positional = remaining;
926    }
927
928    tool_args
929}
930
931/// Simple expression evaluation for args (without full scope access).
932pub(crate) fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
933    match expr {
934        Expr::Literal(value) => Some(eval_literal(value, ctx)),
935        Expr::VarRef(path) => ctx.scope.resolve_path(path),
936        Expr::Interpolated(parts) => {
937            let mut result = String::new();
938            for part in parts {
939                match part {
940                    crate::ast::StringPart::Literal(s) => result.push_str(s),
941                    crate::ast::StringPart::Var(path) => {
942                        if let Some(value) = ctx.scope.resolve_path(path) {
943                            result.push_str(&value_to_string(&value));
944                        }
945                    }
946                    crate::ast::StringPart::VarWithDefault { name, default } => {
947                        match ctx.scope.get(name) {
948                            Some(value) => {
949                                let s = value_to_string(value);
950                                if s.is_empty() {
951                                    result.push_str(&eval_string_parts_sync(default, ctx));
952                                } else {
953                                    result.push_str(&s);
954                                }
955                            }
956                            None => result.push_str(&eval_string_parts_sync(default, ctx)),
957                        }
958                    }
959                    crate::ast::StringPart::VarLength(name) => {
960                        let len = match ctx.scope.get(name) {
961                            Some(value) => value_to_string(value).len(),
962                            None => 0,
963                        };
964                        result.push_str(&len.to_string());
965                    }
966                    crate::ast::StringPart::Positional(n) => {
967                        if let Some(s) = ctx.scope.get_positional(*n) {
968                            result.push_str(s);
969                        }
970                    }
971                    crate::ast::StringPart::AllArgs => {
972                        result.push_str(&ctx.scope.all_args().join(" "));
973                    }
974                    crate::ast::StringPart::ArgCount => {
975                        result.push_str(&ctx.scope.arg_count().to_string());
976                    }
977                    crate::ast::StringPart::Arithmetic(expr) => {
978                        // Evaluate arithmetic in pipeline context
979                        if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
980                            result.push_str(&value.to_string());
981                        }
982                    }
983                    crate::ast::StringPart::CommandSubst(_) => {
984                        // Command substitution requires async - skip in sync context
985                    }
986                    crate::ast::StringPart::LastExitCode => {
987                        result.push_str(&ctx.scope.last_result().code.to_string());
988                    }
989                    crate::ast::StringPart::CurrentPid => {
990                        result.push_str(&ctx.scope.pid().to_string());
991                    }
992                }
993            }
994            Some(Value::String(result))
995        }
996        Expr::GlobPattern(s) => Some(Value::String(s.clone())),
997        Expr::HereDocBody { parts, strip_tabs } => {
998            // Heredoc body materialization for redirect targets. Reuses the
999            // shared sync part-walker; tab stripping is applied after the
1000            // body is assembled, matching the interpreter's eval path.
1001            let unwrapped: Vec<crate::ast::StringPart> =
1002                parts.iter().map(|sp| sp.part.clone()).collect();
1003            let raw = eval_string_parts_sync(&unwrapped, ctx);
1004            let body = if *strip_tabs {
1005                crate::interpreter::strip_leading_tabs(&raw)
1006            } else {
1007                raw
1008            };
1009            Some(Value::String(body))
1010        }
1011        _ => None, // Binary ops and command subst need more context
1012    }
1013}
1014
1015/// Evaluate a literal value.
1016fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
1017    value.clone()
1018}
1019
1020/// Convert a value to a string for interpolation.
1021fn value_to_string(value: &Value) -> String {
1022    match value {
1023        Value::Null => "".to_string(),
1024        Value::Bool(b) => b.to_string(),
1025        Value::Int(i) => i.to_string(),
1026        Value::Float(f) => f.to_string(),
1027        Value::String(s) => s.clone(),
1028        Value::Json(json) => json.to_string(),
1029        Value::Bytes(b) => format!("[binary: {} bytes]", b.len()),
1030    }
1031}
1032
1033/// Evaluate string parts synchronously (for pipeline context).
1034/// Command substitutions are skipped as they require async.
1035fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
1036    let mut result = String::new();
1037    for part in parts {
1038        match part {
1039            crate::ast::StringPart::Literal(s) => result.push_str(s),
1040            crate::ast::StringPart::Var(path) => {
1041                if let Some(value) = ctx.scope.resolve_path(path) {
1042                    result.push_str(&value_to_string(&value));
1043                }
1044            }
1045            crate::ast::StringPart::VarWithDefault { name, default } => {
1046                match ctx.scope.get(name) {
1047                    Some(value) => {
1048                        let s = value_to_string(value);
1049                        if s.is_empty() {
1050                            result.push_str(&eval_string_parts_sync(default, ctx));
1051                        } else {
1052                            result.push_str(&s);
1053                        }
1054                    }
1055                    None => result.push_str(&eval_string_parts_sync(default, ctx)),
1056                }
1057            }
1058            crate::ast::StringPart::VarLength(name) => {
1059                let len = match ctx.scope.get(name) {
1060                    Some(value) => value_to_string(value).len(),
1061                    None => 0,
1062                };
1063                result.push_str(&len.to_string());
1064            }
1065            crate::ast::StringPart::Positional(n) => {
1066                if let Some(s) = ctx.scope.get_positional(*n) {
1067                    result.push_str(s);
1068                }
1069            }
1070            crate::ast::StringPart::AllArgs => {
1071                result.push_str(&ctx.scope.all_args().join(" "));
1072            }
1073            crate::ast::StringPart::ArgCount => {
1074                result.push_str(&ctx.scope.arg_count().to_string());
1075            }
1076            crate::ast::StringPart::Arithmetic(expr) => {
1077                if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
1078                    result.push_str(&value.to_string());
1079                }
1080            }
1081            crate::ast::StringPart::CommandSubst(_) => {
1082                // Command substitution requires async - skip in sync context
1083            }
1084            crate::ast::StringPart::LastExitCode => {
1085                result.push_str(&ctx.scope.last_result().code.to_string());
1086            }
1087            crate::ast::StringPart::CurrentPid => {
1088                result.push_str(&ctx.scope.pid().to_string());
1089            }
1090        }
1091    }
1092    result
1093}
1094
1095/// Find scatter and gather commands in a pipeline.
1096///
1097/// Returns Some((scatter_index, gather_index)) if both are found with scatter before gather.
1098/// Returns None if the pipeline doesn't have a valid scatter/gather pattern.
1099fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
1100    let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
1101    let gather_idx = commands.iter().position(|c| c.name == "gather")?;
1102
1103    // Gather must come after scatter
1104    if gather_idx > scatter_idx {
1105        Some((scatter_idx, gather_idx))
1106    } else {
1107        None
1108    }
1109}
1110
#[cfg(test)]
mod select_leaf_tests {
    use super::*;
    use crate::tools::ParamSchema;

    /// `kj`-shaped tree: kj → context (alias ctx) → {list (alias ls), create}.
    /// The root declares a global `--confirm <nonce>` value flag and a
    /// `--verbose` bool; `create` declares a leaf `--type` value flag. That is
    /// enough to cover global-flag skipping and leaf binding.
    fn kj_schema() -> ToolSchema {
        let list = ToolSchema::new("list", "list").with_command_aliases(["ls"]);
        let create = ToolSchema::new("create", "create")
            .param(ParamSchema::new("type", "string").with_aliases(["t"]));
        let context = ToolSchema::new("context", "context ops")
            .with_command_aliases(["ctx"])
            .subcommand(list)
            .subcommand(create);

        ToolSchema::new("kj", "kaijutsu")
            .param(ParamSchema::new("confirm", "string"))
            .param(ParamSchema::new("verbose", "bool"))
            .subcommand(context)
    }

    /// A positional holding a literal string.
    fn word(s: &str) -> Arg {
        Arg::Positional(Expr::Literal(Value::String(s.to_owned())))
    }

    /// A `--name` long flag.
    fn long_flag(name: &str) -> Arg {
        Arg::LongFlag(name.into())
    }

    /// A positional holding the command substitution `$(echo)`.
    fn echo_subst() -> Arg {
        let echo = crate::ast::Command {
            name: "echo".into(),
            args: vec![],
            redirects: vec![],
        };
        Arg::Positional(Expr::CommandSubst(vec![crate::ast::Stmt::Command(echo)]))
    }

    #[test]
    fn flat_tool_returns_root() {
        let schema = ToolSchema::new("cat", "concat")
            .param(ParamSchema::required("path", "string", "f").positional());
        let args = [word("foo.txt")];
        let leaf = select_leaf(&schema, &args).expect("flat ok");
        assert_eq!(leaf.name, "cat");
    }

    #[test]
    fn single_hop() {
        let schema = kj_schema();
        let args = [word("context")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "context");
    }

    #[test]
    fn two_hops() {
        let schema = kj_schema();
        let args = [word("context"), word("create")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
        assert!(leaf.params.iter().any(|p| p.name == "type"), "leaf has --type");
    }

    #[test]
    fn alias_hops_route() {
        let schema = kj_schema();
        // `kj ctx ls` resolves to context.list through command aliases.
        let args = [word("ctx"), word("ls")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "list");
    }

    #[test]
    fn unknown_subcommand_stops_at_current_node() {
        let schema = kj_schema();
        // `context nonesuch`: no child is called `nonesuch`, so routing stops
        // at context and `nonesuch` is its own positional. Not an error.
        let args = [word("context"), word("nonesuch")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "context");
    }

    #[test]
    fn root_bool_flag_before_path_does_not_disrupt_routing() {
        let schema = kj_schema();
        // `kj --verbose context create`: the root bool flag is ignored by
        // routing and both words select their subcommands.
        let args = [long_flag("verbose"), word("context"), word("create")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
    }

    #[test]
    fn root_value_flag_space_form_before_path_skips_its_value() {
        let schema = kj_schema();
        // `kj --confirm nonce context create`: `nonce` is the value of
        // --confirm, not a subcommand selector, so routing skips over it.
        let args = [
            long_flag("confirm"),
            word("nonce"),
            word("context"),
            word("create"),
        ];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
    }

    #[test]
    fn leaf_value_flag_after_path_routes_to_leaf() {
        let schema = kj_schema();
        // `kj context create --type x`: path first, leaf flag afterwards.
        // Routing lands on create, where --type is declared.
        let args = [
            word("context"),
            word("create"),
            long_flag("type"),
            word("x"),
        ];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "create");
        assert!(leaf.params.iter().any(|p| p.name == "type"));
    }

    #[test]
    fn double_dash_stops_routing() {
        let schema = kj_schema();
        // `kj -- context`: past `--`, `context` is raw data rather than a subcommand.
        let args = [Arg::DoubleDash, word("context")];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "kj");
    }

    #[test]
    fn computed_subcommand_selector_errors() {
        let schema = kj_schema();
        // `kj $(echo context)`: a command substitution in subcommand position
        // must fail loudly instead of silently picking a leaf.
        let args = [echo_subst()];
        let err = select_leaf(&schema, &args).expect_err("must error");
        let msg = err.to_string();
        assert!(msg.contains("subcommand name is required"), "got: {msg}");
        assert!(msg.contains("command substitution"), "names the cause: {msg}");
    }

    #[test]
    fn variable_subcommand_selector_errors() {
        let schema = kj_schema();
        let args = [Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("sub")))];
        let err = select_leaf(&schema, &args).expect_err("must error");
        assert!(err.to_string().contains("variable reference"), "got: {err}");
    }

    #[test]
    fn computed_positional_after_leaf_is_fine() {
        let schema = kj_schema();
        // `kj context list $(echo x)`: list has no children, so by the time the
        // computed positional is seen it is just an argument of the leaf.
        let args = [word("context"), word("list"), echo_subst()];
        let leaf = select_leaf(&schema, &args).expect("ok");
        assert_eq!(leaf.name, "list");
    }
}
1271
1272#[cfg(test)]
1273mod tests {
1274    use super::*;
1275    use crate::dispatch::BackendDispatcher;
1276    use crate::tools::register_builtins;
1277    use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
1278    use std::path::Path;
1279
1280    async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
1281        let mut tools = ToolRegistry::new();
1282        register_builtins(&mut tools);
1283        let tools = Arc::new(tools);
1284        let runner = PipelineRunner::new(tools.clone());
1285        let dispatcher = BackendDispatcher::new(tools.clone());
1286
1287        let mut vfs = VfsRouter::new();
1288        let mem = MemoryFs::new();
1289        mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
1290        vfs.mount("/", mem);
1291        let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
1292
1293        (runner, ctx, dispatcher)
1294    }
1295
1296    fn make_cmd(name: &str, args: Vec<&str>) -> Command {
1297        Command {
1298            name: name.to_string(),
1299            args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
1300            redirects: vec![],
1301        }
1302    }
1303
1304    #[tokio::test]
1305    async fn test_single_command() {
1306        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1307        let cmd = make_cmd("echo", vec!["hello"]);
1308
1309        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1310        assert!(result.ok());
1311        assert_eq!(result.text_out().trim(), "hello");
1312    }
1313
1314    #[tokio::test]
1315    async fn test_pipeline_echo_grep() {
1316        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1317
1318        // echo "hello\nworld" | grep pattern="world"
1319        let echo_cmd = Command {
1320            name: "echo".to_string(),
1321            args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
1322            redirects: vec![],
1323        };
1324        let grep_cmd = Command {
1325            name: "grep".to_string(),
1326            args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
1327            redirects: vec![],
1328        };
1329
1330        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1331        assert!(result.ok());
1332        assert_eq!(result.text_out().trim(), "world");
1333    }
1334
1335    #[tokio::test]
1336    async fn test_pipeline_cat_grep() {
1337        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1338
1339        // cat /test.txt | grep pattern="hello"
1340        let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
1341        let grep_cmd = Command {
1342            name: "grep".to_string(),
1343            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1344            redirects: vec![],
1345        };
1346
1347        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1348        assert!(result.ok());
1349        assert!(result.text_out().contains("hello"));
1350    }
1351
1352    #[tokio::test]
1353    async fn test_command_not_found() {
1354        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1355        let cmd = make_cmd("nonexistent", vec![]);
1356
1357        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1358        assert!(!result.ok());
1359        assert_eq!(result.code, 127);
1360        assert!(result.err.contains("not found"));
1361    }
1362
1363    #[tokio::test]
1364    async fn test_pipeline_continues_on_failure() {
1365        // Standard shell semantics: pipeline runs all commands,
1366        // exit code comes from the last command
1367        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1368
1369        // cat /nonexistent | grep "hello"
1370        // cat fails but grep still runs (on empty input), grep returns 1 (no match)
1371        let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
1372        let grep_cmd = Command {
1373            name: "grep".to_string(),
1374            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1375            redirects: vec![],
1376        };
1377
1378        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1379        // Exit code comes from last command (grep), not from cat
1380        assert!(!result.ok());
1381    }
1382
1383    #[tokio::test]
1384    async fn test_pipeline_last_command_exit_code() {
1385        // echo hello | cat — both succeed, pipeline succeeds
1386        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1387
1388        let echo_cmd = make_cmd("echo", vec!["hello"]);
1389        let cat_cmd = make_cmd("cat", vec![]);
1390
1391        let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1392        assert!(result.ok());
1393        assert!(result.text_out().contains("hello"));
1394    }
1395
1396    #[tokio::test]
1397    async fn test_empty_pipeline() {
1398        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1399        let result = runner.run(&[], &mut ctx, &dispatcher).await;
1400        assert!(result.ok());
1401    }
1402
1403    // === Scatter/Gather Tests ===
1404
1405    #[test]
1406    fn test_find_scatter_gather_both_present() {
1407        let commands = vec![
1408            make_cmd("echo", vec!["a"]),
1409            make_cmd("scatter", vec![]),
1410            make_cmd("process", vec![]),
1411            make_cmd("gather", vec![]),
1412        ];
1413        let result = find_scatter_gather(&commands);
1414        assert_eq!(result, Some((1, 3)));
1415    }
1416
1417    #[test]
1418    fn test_find_scatter_gather_no_scatter() {
1419        let commands = vec![
1420            make_cmd("echo", vec!["a"]),
1421            make_cmd("gather", vec![]),
1422        ];
1423        let result = find_scatter_gather(&commands);
1424        assert!(result.is_none());
1425    }
1426
1427    #[test]
1428    fn test_find_scatter_gather_no_gather() {
1429        let commands = vec![
1430            make_cmd("echo", vec!["a"]),
1431            make_cmd("scatter", vec![]),
1432        ];
1433        let result = find_scatter_gather(&commands);
1434        assert!(result.is_none());
1435    }
1436
1437    #[test]
1438    fn test_find_scatter_gather_wrong_order() {
1439        let commands = vec![
1440            make_cmd("gather", vec![]),
1441            make_cmd("scatter", vec![]),
1442        ];
1443        let result = find_scatter_gather(&commands);
1444        assert!(result.is_none());
1445    }
1446
1447    #[tokio::test]
1448    async fn test_scatter_gather_simple() {
1449        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1450
1451        // split "a b c" | scatter | echo ${ITEM} | gather
1452        let split_cmd = Command {
1453            name: "split".to_string(),
1454            args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1455            redirects: vec![],
1456        };
1457        let scatter_cmd = make_cmd("scatter", vec![]);
1458        let process_cmd = Command {
1459            name: "echo".to_string(),
1460            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1461            redirects: vec![],
1462        };
1463        let gather_cmd = make_cmd("gather", vec![]);
1464
1465        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1466        assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1467        // Each echo should output the item
1468        assert!(result.text_out().contains("a"));
1469        assert!(result.text_out().contains("b"));
1470        assert!(result.text_out().contains("c"));
1471    }
1472
1473    #[tokio::test]
1474    async fn test_scatter_gather_empty_input() {
1475        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1476
1477        // echo "" | scatter | echo ${ITEM} | gather
1478        let echo_cmd = Command {
1479            name: "echo".to_string(),
1480            args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1481            redirects: vec![],
1482        };
1483        let scatter_cmd = make_cmd("scatter", vec![]);
1484        let process_cmd = Command {
1485            name: "echo".to_string(),
1486            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1487            redirects: vec![],
1488        };
1489        let gather_cmd = make_cmd("gather", vec![]);
1490
1491        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1492        assert!(result.ok());
1493        assert!(result.text_out().trim().is_empty());
1494    }
1495
1496    #[tokio::test]
1497    async fn test_scatter_gather_with_structured_stdin() {
1498        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1499
1500        // Set structured stdin data (as if piped from split/seq)
1501        let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1502        ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1503
1504        let scatter_cmd = make_cmd("scatter", vec![]);
1505        let process_cmd = Command {
1506            name: "echo".to_string(),
1507            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1508            redirects: vec![],
1509        };
1510        let gather_cmd = make_cmd("gather", vec![]);
1511
1512        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1513        assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1514        assert!(result.text_out().contains("x"));
1515        assert!(result.text_out().contains("y"));
1516        assert!(result.text_out().contains("z"));
1517    }
1518
1519    #[tokio::test]
1520    async fn test_scatter_gather_json_input() {
1521        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1522
1523        // Structured JSON array input (as if from split/seq)
1524        let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1525        ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1526
1527        let scatter_cmd = make_cmd("scatter", vec![]);
1528        let process_cmd = Command {
1529            name: "echo".to_string(),
1530            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1531            redirects: vec![],
1532        };
1533        let gather_cmd = make_cmd("gather", vec![]);
1534
1535        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1536        assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1537        assert!(result.text_out().contains("one"));
1538        assert!(result.text_out().contains("two"));
1539        assert!(result.text_out().contains("three"));
1540    }
1541
1542    #[tokio::test]
1543    async fn test_scatter_gather_with_post_gather() {
1544        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1545
1546        // split "a b" | scatter | echo ${ITEM} | gather | grep "a"
1547        let split_cmd = Command {
1548            name: "split".to_string(),
1549            args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1550            redirects: vec![],
1551        };
1552        let scatter_cmd = make_cmd("scatter", vec![]);
1553        let process_cmd = Command {
1554            name: "echo".to_string(),
1555            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1556            redirects: vec![],
1557        };
1558        let gather_cmd = make_cmd("gather", vec![]);
1559        let grep_cmd = Command {
1560            name: "grep".to_string(),
1561            args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1562            redirects: vec![],
1563        };
1564
1565        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1566        assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1567        assert!(result.text_out().contains("a"));
1568        assert!(!result.text_out().contains("b"));
1569    }
1570
1571    #[tokio::test]
1572    async fn test_scatter_custom_var_name() {
1573        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1574
1575        // Provide structured data (as if from split/seq)
1576        let data = Value::Json(serde_json::json!(["test1", "test2"]));
1577        ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1578
1579        // scatter --as URL | echo ${URL} | gather
1580        let scatter_cmd = Command {
1581            name: "scatter".to_string(),
1582            args: vec![Arg::Named {
1583                key: "as".to_string(),
1584                value: Expr::Literal(Value::String("URL".to_string())),
1585            }],
1586            redirects: vec![],
1587        };
1588        let process_cmd = Command {
1589            name: "echo".to_string(),
1590            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1591            redirects: vec![],
1592        };
1593        let gather_cmd = make_cmd("gather", vec![]);
1594
1595        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1596        assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1597        assert!(result.text_out().contains("test1"));
1598        assert!(result.text_out().contains("test2"));
1599    }
1600
1601    // === Backend Routing Tests ===
1602
1603    #[tokio::test]
1604    async fn test_pipeline_routes_through_backend() {
1605        use crate::backend::testing::MockBackend;
1606        use std::sync::atomic::Ordering;
1607
1608        // Create mock backend
1609        let (backend, call_count) = MockBackend::new();
1610        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1611
1612        // Create context with mock backend
1613        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1614
1615        // BackendDispatcher routes through backend.call_tool()
1616        let tools = std::sync::Arc::new(ToolRegistry::new());
1617        let runner = PipelineRunner::new(tools.clone());
1618        let dispatcher = BackendDispatcher::new(tools);
1619
1620        // Single command should route through backend
1621        let cmd = make_cmd("test-tool", vec!["arg1"]);
1622        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1623
1624        assert!(result.ok(), "Mock backend should return success");
1625        assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1626        assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
1627    }
1628
1629    #[tokio::test]
1630    async fn test_multi_command_pipeline_routes_through_backend() {
1631        use crate::backend::testing::MockBackend;
1632        use std::sync::atomic::Ordering;
1633
1634        let (backend, call_count) = MockBackend::new();
1635        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1636        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1637
1638        let tools = std::sync::Arc::new(ToolRegistry::new());
1639        let runner = PipelineRunner::new(tools.clone());
1640        let dispatcher = BackendDispatcher::new(tools);
1641
1642        // Pipeline with 3 commands
1643        let cmd1 = make_cmd("tool1", vec![]);
1644        let cmd2 = make_cmd("tool2", vec![]);
1645        let cmd3 = make_cmd("tool3", vec![]);
1646
1647        let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1648
1649        assert!(result.ok());
1650        assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1651    }
1652
1653    // === Schema-Aware Argument Parsing Tests ===
1654
1655    use crate::tools::{ParamSchema, ToolSchema};
1656
1657    fn make_test_schema() -> ToolSchema {
1658        ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1659            .param(ParamSchema::required("query", "string", "Search query"))
1660            .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1661            .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1662            .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1663            .with_positional_mapping()
1664    }
1665
1666    fn make_minimal_ctx() -> ExecContext {
1667        let mut vfs = VfsRouter::new();
1668        vfs.mount("/", MemoryFs::new());
1669        ExecContext::new(Arc::new(vfs))
1670    }
1671
1672    #[test]
1673    fn test_schema_aware_string_arg() {
1674        // --query "test" should become named: {"query": "test"}
1675        let args = vec![
1676            Arg::LongFlag("query".to_string()),
1677            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1678        ];
1679        let schema = make_test_schema();
1680        let ctx = make_minimal_ctx();
1681
1682        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1683
1684        assert!(tool_args.flags.is_empty(), "No flags should be set");
1685        assert!(tool_args.positional.is_empty(), "No positionals - consumed by --query");
1686        assert_eq!(
1687            tool_args.named.get("query"),
1688            Some(&Value::String("test".to_string())),
1689            "--query should consume 'test' as its value"
1690        );
1691    }
1692
1693    #[test]
1694    fn test_schema_aware_bool_flag() {
1695        // --verbose should remain a flag since schema says bool
1696        let args = vec![
1697            Arg::LongFlag("verbose".to_string()),
1698        ];
1699        let schema = make_test_schema();
1700        let ctx = make_minimal_ctx();
1701
1702        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1703
1704        assert!(tool_args.flags.contains("verbose"), "--verbose should be a flag");
1705        assert!(tool_args.named.is_empty(), "No named args");
1706        assert!(tool_args.positional.is_empty(), "No positionals");
1707    }
1708
1709    #[test]
1710    fn test_schema_aware_mixed() {
1711        // mcp_tool file.txt --output out.txt --verbose
1712        // file.txt maps to "query" (first unfilled non-bool schema param)
1713        let args = vec![
1714            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1715            Arg::LongFlag("output".to_string()),
1716            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1717            Arg::LongFlag("verbose".to_string()),
1718        ];
1719        let schema = make_test_schema();
1720        let ctx = make_minimal_ctx();
1721
1722        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1723
1724        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1725        assert_eq!(
1726            tool_args.named.get("query"),
1727            Some(&Value::String("file.txt".to_string()))
1728        );
1729        assert_eq!(
1730            tool_args.named.get("output"),
1731            Some(&Value::String("out.txt".to_string()))
1732        );
1733        assert!(tool_args.flags.contains("verbose"));
1734    }
1735
1736    #[test]
1737    fn test_schema_aware_multiple_string_args() {
1738        // --query "test" --output "result.json" --verbose --limit 5
1739        let args = vec![
1740            Arg::LongFlag("query".to_string()),
1741            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1742            Arg::LongFlag("output".to_string()),
1743            Arg::Positional(Expr::Literal(Value::String("result.json".to_string()))),
1744            Arg::LongFlag("verbose".to_string()),
1745            Arg::LongFlag("limit".to_string()),
1746            Arg::Positional(Expr::Literal(Value::Int(5))),
1747        ];
1748        let schema = make_test_schema();
1749        let ctx = make_minimal_ctx();
1750
1751        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1752
1753        assert!(tool_args.positional.is_empty(), "All positionals consumed");
1754        assert_eq!(
1755            tool_args.named.get("query"),
1756            Some(&Value::String("test".to_string()))
1757        );
1758        assert_eq!(
1759            tool_args.named.get("output"),
1760            Some(&Value::String("result.json".to_string()))
1761        );
1762        assert_eq!(
1763            tool_args.named.get("limit"),
1764            Some(&Value::Int(5))
1765        );
1766        assert!(tool_args.flags.contains("verbose"));
1767    }
1768
1769    #[test]
1770    fn test_schema_aware_double_dash() {
1771        // --output out.txt -- --this-is-data
1772        // After --, everything is positional
1773        let args = vec![
1774            Arg::LongFlag("output".to_string()),
1775            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
1776            Arg::DoubleDash,
1777            Arg::Positional(Expr::Literal(Value::String("--this-is-data".to_string()))),
1778        ];
1779        let schema = make_test_schema();
1780        let ctx = make_minimal_ctx();
1781
1782        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1783
1784        assert_eq!(
1785            tool_args.named.get("output"),
1786            Some(&Value::String("out.txt".to_string()))
1787        );
1788        // After --, the --this-is-data is treated as a positional (it's a Positional in the args)
1789        assert_eq!(
1790            tool_args.positional,
1791            vec![Value::String("--this-is-data".to_string())]
1792        );
1793    }
1794
1795    #[test]
1796    fn test_no_schema_fallback() {
1797        // Without schema, all --flags are treated as bool flags
1798        let args = vec![
1799            Arg::LongFlag("query".to_string()),
1800            Arg::Positional(Expr::Literal(Value::String("test".to_string()))),
1801        ];
1802        let ctx = make_minimal_ctx();
1803
1804        let tool_args = build_tool_args(&args, &ctx, None);
1805
1806        // Without schema, --query is a flag and "test" is a positional
1807        assert!(tool_args.flags.contains("query"), "--query should be a flag");
1808        assert_eq!(
1809            tool_args.positional,
1810            vec![Value::String("test".to_string())],
1811            "'test' should be a positional"
1812        );
1813    }
1814
1815    #[test]
1816    fn test_unknown_flag_in_schema() {
1817        // --unknown-flag value: --unknown is bool (not in schema), "value" maps to query
1818        let args = vec![
1819            Arg::LongFlag("unknown".to_string()),
1820            Arg::Positional(Expr::Literal(Value::String("value".to_string()))),
1821        ];
1822        let schema = make_test_schema();
1823        let ctx = make_minimal_ctx();
1824
1825        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1826
1827        assert!(tool_args.flags.contains("unknown"));
1828        assert!(tool_args.positional.is_empty(), "value consumed as query param");
1829        assert_eq!(
1830            tool_args.named.get("query"),
1831            Some(&Value::String("value".to_string()))
1832        );
1833    }
1834
1835    #[test]
1836    fn test_named_args_unchanged() {
1837        // key=value syntax should work regardless of schema
1838        let args = vec![
1839            Arg::Named {
1840                key: "query".to_string(),
1841                value: Expr::Literal(Value::String("test".to_string())),
1842            },
1843            Arg::LongFlag("verbose".to_string()),
1844        ];
1845        let schema = make_test_schema();
1846        let ctx = make_minimal_ctx();
1847
1848        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1849
1850        assert_eq!(
1851            tool_args.named.get("query"),
1852            Some(&Value::String("test".to_string()))
1853        );
1854        assert!(tool_args.flags.contains("verbose"));
1855    }
1856
1857    #[test]
1858    fn test_short_flags_unchanged() {
1859        // Short flags -la should expand regardless of schema; file.txt maps to query
1860        let args = vec![
1861            Arg::ShortFlag("la".to_string()),
1862            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1863        ];
1864        let schema = make_test_schema();
1865        let ctx = make_minimal_ctx();
1866
1867        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1868
1869        assert!(tool_args.flags.contains("l"));
1870        assert!(tool_args.flags.contains("a"));
1871        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1872        assert_eq!(
1873            tool_args.named.get("query"),
1874            Some(&Value::String("file.txt".to_string()))
1875        );
1876    }
1877
1878    #[test]
1879    fn test_flag_at_end_no_value() {
1880        // --output at end with no value available - treat as flag (lenient)
1881        // file.txt maps to query (first unfilled non-bool param)
1882        let args = vec![
1883            Arg::Positional(Expr::Literal(Value::String("file.txt".to_string()))),
1884            Arg::LongFlag("output".to_string()),
1885        ];
1886        let schema = make_test_schema();
1887        let ctx = make_minimal_ctx();
1888
1889        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1890
1891        // output expects a value but none available after it, so it becomes a flag
1892        assert!(tool_args.flags.contains("output"));
1893        assert!(tool_args.positional.is_empty(), "file.txt consumed as query param");
1894        assert_eq!(
1895            tool_args.named.get("query"),
1896            Some(&Value::String("file.txt".to_string()))
1897        );
1898    }
1899
1900    #[test]
1901    fn test_positional_skips_bool_params() {
1902        // Schema: [query: string, verbose: bool, output: string]
1903        // Args: "val1" "val2"
1904        // Expected: query="val1", verbose unset, output="val2"
1905        let schema = ToolSchema::new("test", "")
1906            .param(ParamSchema::required("query", "string", ""))
1907            .param(ParamSchema::optional(
1908                "verbose",
1909                "bool",
1910                Value::Bool(false),
1911                "",
1912            ))
1913            .param(ParamSchema::optional(
1914                "output",
1915                "string",
1916                Value::Null,
1917                "",
1918            ))
1919            .with_positional_mapping();
1920        let args = vec![
1921            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1922            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1923        ];
1924        let ctx = make_minimal_ctx();
1925
1926        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1927
1928        assert_eq!(
1929            tool_args.named.get("query"),
1930            Some(&Value::String("val1".to_string()))
1931        );
1932        assert_eq!(
1933            tool_args.named.get("output"),
1934            Some(&Value::String("val2".to_string()))
1935        );
1936        assert!(!tool_args.flags.contains("verbose"));
1937        assert!(tool_args.positional.is_empty());
1938    }
1939
1940    #[test]
1941    fn test_positionals_fill_available_slots() {
1942        // Schema has query (string), limit (int), verbose (bool), output (string).
1943        // Three positionals fill the 3 non-bool slots.
1944        let args = vec![
1945            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
1946            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
1947            Arg::Positional(Expr::Literal(Value::String("val3".to_string()))),
1948        ];
1949        let schema = make_test_schema(); // query, limit(int), verbose(bool), output
1950        let ctx = make_minimal_ctx();
1951
1952        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1953
1954        // val1 → query, val2 → limit (int param but receives string — tool decides),
1955        // val3 → output
1956        assert_eq!(
1957            tool_args.named.get("query"),
1958            Some(&Value::String("val1".to_string()))
1959        );
1960        assert_eq!(
1961            tool_args.named.get("limit"),
1962            Some(&Value::String("val2".to_string()))
1963        );
1964        assert_eq!(
1965            tool_args.named.get("output"),
1966            Some(&Value::String("val3".to_string()))
1967        );
1968        assert!(tool_args.positional.is_empty());
1969    }
1970
1971    #[test]
1972    fn test_truly_excess_positionals() {
1973        // More positionals than non-bool schema params — leftovers stay positional
1974        let schema = ToolSchema::new("test", "")
1975            .param(ParamSchema::required("name", "string", ""))
1976            .with_positional_mapping();
1977        let args = vec![
1978            Arg::Positional(Expr::Literal(Value::String("first".to_string()))),
1979            Arg::Positional(Expr::Literal(Value::String("second".to_string()))),
1980            Arg::Positional(Expr::Literal(Value::String("third".to_string()))),
1981        ];
1982        let ctx = make_minimal_ctx();
1983
1984        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
1985
1986        assert_eq!(
1987            tool_args.named.get("name"),
1988            Some(&Value::String("first".to_string()))
1989        );
1990        assert_eq!(
1991            tool_args.positional,
1992            vec![
1993                Value::String("second".to_string()),
1994                Value::String("third".to_string()),
1995            ]
1996        );
1997    }
1998
1999    #[test]
2000    fn test_double_dash_positional_not_mapped() {
2001        // `tool val1 -- val2` — val1 maps to query, val2 stays positional (post-dash)
2002        let args = vec![
2003            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
2004            Arg::DoubleDash,
2005            Arg::Positional(Expr::Literal(Value::String("val2".to_string()))),
2006        ];
2007        let schema = make_test_schema();
2008        let ctx = make_minimal_ctx();
2009
2010        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2011
2012        assert_eq!(
2013            tool_args.named.get("query"),
2014            Some(&Value::String("val1".to_string()))
2015        );
2016        // val2 is after --, should NOT be mapped even though schema has unfilled params
2017        assert_eq!(
2018            tool_args.positional,
2019            vec![Value::String("val2".to_string())]
2020        );
2021    }
2022
2023    #[test]
2024    fn test_all_params_filled_by_flags() {
2025        // All schema params satisfied by explicit flags — no positional mapping needed
2026        let args = vec![
2027            Arg::LongFlag("query".to_string()),
2028            Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
2029            Arg::LongFlag("output".to_string()),
2030            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
2031            Arg::LongFlag("verbose".to_string()),
2032        ];
2033        let schema = make_test_schema();
2034        let ctx = make_minimal_ctx();
2035
2036        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2037
2038        assert_eq!(
2039            tool_args.named.get("query"),
2040            Some(&Value::String("search".to_string()))
2041        );
2042        assert_eq!(
2043            tool_args.named.get("output"),
2044            Some(&Value::String("out.txt".to_string()))
2045        );
2046        assert!(tool_args.flags.contains("verbose"));
2047        assert!(tool_args.positional.is_empty());
2048    }
2049
2050    #[test]
2051    fn test_mixed_flags_and_positional_fill() {
2052        // --output foo val1 — output is explicit, val1 maps to query
2053        let args = vec![
2054            Arg::LongFlag("output".to_string()),
2055            Arg::Positional(Expr::Literal(Value::String("foo".to_string()))),
2056            Arg::Positional(Expr::Literal(Value::String("val1".to_string()))),
2057        ];
2058        let schema = make_test_schema();
2059        let ctx = make_minimal_ctx();
2060
2061        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2062
2063        assert_eq!(
2064            tool_args.named.get("output"),
2065            Some(&Value::String("foo".to_string()))
2066        );
2067        assert_eq!(
2068            tool_args.named.get("query"),
2069            Some(&Value::String("val1".to_string()))
2070        );
2071        assert!(tool_args.positional.is_empty());
2072    }
2073
2074    #[test]
2075    fn test_alias_flag_prevents_mapping_overwrite() {
2076        // -q "search" "out.txt" — -q is alias for query, so out.txt should map to output
2077        let schema = ToolSchema::new("test", "")
2078            .param(ParamSchema::required("query", "string", "").with_aliases(["-q"]))
2079            .param(ParamSchema::required("output", "string", ""))
2080            .with_positional_mapping();
2081        let args = vec![
2082            Arg::ShortFlag("q".to_string()),
2083            Arg::Positional(Expr::Literal(Value::String("search".to_string()))),
2084            Arg::Positional(Expr::Literal(Value::String("out.txt".to_string()))),
2085        ];
2086        let ctx = make_minimal_ctx();
2087
2088        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2089
2090        assert_eq!(
2091            tool_args.named.get("query"),
2092            Some(&Value::String("search".to_string()))
2093        );
2094        assert_eq!(
2095            tool_args.named.get("output"),
2096            Some(&Value::String("out.txt".to_string()))
2097        );
2098        assert!(tool_args.positional.is_empty());
2099    }
2100
2101    #[test]
2102    fn test_builtin_schema_no_positional_mapping() {
2103        // Builtins have map_positionals=false — positionals stay positional
2104        let schema = ToolSchema::new("echo", "")
2105            .param(ParamSchema::optional("args", "any", Value::Null, ""))
2106            .param(ParamSchema::optional("no_newline", "bool", Value::Bool(false), ""));
2107        // Note: no .with_positional_mapping() — this is a builtin
2108        let args = vec![
2109            Arg::Positional(Expr::Literal(Value::String("hello".to_string()))),
2110            Arg::Positional(Expr::Literal(Value::String("world".to_string()))),
2111        ];
2112        let ctx = make_minimal_ctx();
2113
2114        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2115
2116        // Positionals should NOT be consumed as named params
2117        assert_eq!(
2118            tool_args.positional,
2119            vec![
2120                Value::String("hello".to_string()),
2121                Value::String("world".to_string()),
2122            ]
2123        );
2124        assert!(!tool_args.named.contains_key("args"));
2125    }
2126
2127    #[test]
2128    fn test_short_flag_with_alias_consumes_value() {
2129        // `-n 5` where `-n` is aliased to `lines` (type: int)
2130        // Should produce named: {"lines": 5}, not flags: {"n"} + positional: [5]
2131        let schema = ToolSchema::new("head", "Output first part of files")
2132            .param(ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines")
2133                .with_aliases(["-n"]));
2134        let args = vec![
2135            Arg::ShortFlag("n".to_string()),
2136            Arg::Positional(Expr::Literal(Value::Int(5))),
2137            Arg::Positional(Expr::Literal(Value::String("/tmp/file.txt".to_string()))),
2138        ];
2139        let ctx = make_minimal_ctx();
2140
2141        let tool_args = build_tool_args(&args, &ctx, Some(&schema));
2142
2143        assert!(tool_args.flags.is_empty(), "no boolean flags: {:?}", tool_args.flags);
2144        assert_eq!(tool_args.named.get("lines"), Some(&Value::Int(5)), "should resolve alias to canonical name");
2145        assert_eq!(tool_args.positional, vec![Value::String("/tmp/file.txt".to_string())]);
2146    }
2147
2148    // === Redirect Execution Tests ===
2149
2150    #[tokio::test]
2151    async fn test_merge_stderr_redirect() {
2152        // Test that 2>&1 merges stderr into stdout
2153        let result = ExecResult::from_output(0, "stdout content", "stderr content");
2154
2155        let redirects = vec![Redirect {
2156            kind: RedirectKind::MergeStderr,
2157            target: Expr::Literal(Value::Null),
2158        }];
2159
2160        let ctx = make_minimal_ctx();
2161        let result = apply_redirects(result, &redirects, &ctx).await;
2162
2163        assert_eq!(&*result.text_out(), "stdout contentstderr content");
2164        assert!(result.err.is_empty());
2165    }
2166
2167    #[tokio::test]
2168    async fn test_merge_stderr_with_empty_stderr() {
2169        // Test that 2>&1 handles empty stderr gracefully
2170        let result = ExecResult::from_output(0, "stdout only", "");
2171
2172        let redirects = vec![Redirect {
2173            kind: RedirectKind::MergeStderr,
2174            target: Expr::Literal(Value::Null),
2175        }];
2176
2177        let ctx = make_minimal_ctx();
2178        let result = apply_redirects(result, &redirects, &ctx).await;
2179
2180        assert_eq!(&*result.text_out(), "stdout only");
2181        assert!(result.err.is_empty());
2182    }
2183
2184    #[tokio::test]
2185    async fn test_merge_stderr_order_matters() {
2186        // Test redirect ordering: 2>&1 > file means:
2187        // 1. First merge stderr into stdout
2188        // 2. Then write stdout to file (leaving both empty for piping)
2189        // This verifies left-to-right processing
2190        let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
2191
2192        // Just 2>&1 - should merge
2193        let redirects = vec![Redirect {
2194            kind: RedirectKind::MergeStderr,
2195            target: Expr::Literal(Value::Null),
2196        }];
2197
2198        let ctx = make_minimal_ctx();
2199        let result = apply_redirects(result, &redirects, &ctx).await;
2200
2201        assert_eq!(&*result.text_out(), "stdout\nstderr\n");
2202        assert!(result.err.is_empty());
2203    }
2204
2205    #[tokio::test]
2206    async fn test_redirect_with_command_execution() {
2207        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2208
2209        // echo "hello" with 2>&1 redirect
2210        let cmd = Command {
2211            name: "echo".to_string(),
2212            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
2213            redirects: vec![Redirect {
2214                kind: RedirectKind::MergeStderr,
2215                target: Expr::Literal(Value::Null),
2216            }],
2217        };
2218
2219        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
2220        assert!(result.ok());
2221        // echo produces no stderr, so this just validates the redirect doesn't break anything
2222        assert!(result.text_out().contains("hello"));
2223    }
2224
2225    #[tokio::test]
2226    async fn test_merge_stderr_in_pipeline() {
2227        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2228
2229        // echo "output" 2>&1 | grep "output"
2230        // The 2>&1 should be applied to echo's result, then piped to grep
2231        let echo_cmd = Command {
2232            name: "echo".to_string(),
2233            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2234            redirects: vec![Redirect {
2235                kind: RedirectKind::MergeStderr,
2236                target: Expr::Literal(Value::Null),
2237            }],
2238        };
2239        let grep_cmd = Command {
2240            name: "grep".to_string(),
2241            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2242            redirects: vec![],
2243        };
2244
2245        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
2246        assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
2247        assert!(result.text_out().contains("output"));
2248    }
2249}