Skip to main content

kaish_kernel/scheduler/
pipeline.rs

1//! Pipeline execution for kaish.
2//!
3//! Executes a sequence of commands connected by pipes, where the stdout
4//! of each command becomes the stdin of the next.
5//!
6//! Also handles scatter/gather pipelines for parallel execution.
7
8use std::sync::Arc;
9
10use std::collections::HashMap;
11
12use crate::arithmetic;
13use crate::ast::{Arg, Command, Expr, Redirect, RedirectKind, Value};
14use crate::dispatch::{CommandDispatcher, PipelinePosition};
15use crate::interpreter::ExecResult;
16use crate::tools::{ExecContext, ToolArgs, ToolRegistry, ToolSchema};
17use tokio::io::AsyncWriteExt;
18
19use super::pipe_stream::pipe_stream_default;
20use super::scatter::{
21    parse_gather_options, parse_scatter_options, ScatterGatherRunner,
22};
23
24/// Apply redirects to an execution result.
25///
26/// Pre-execution redirects (Stdin, HereDoc) should be handled before calling.
27/// Post-execution redirects (stdout/stderr to file, merge) applied here.
28/// Redirects are processed left-to-right per POSIX.
29async fn apply_redirects(
30    mut result: ExecResult,
31    redirects: &[Redirect],
32    ctx: &ExecContext,
33) -> ExecResult {
34    // Defer materialization of OutputData → result.out to individual redirect
35    // handlers. File redirects (Overwrite/Append) can stream OutputData directly
36    // to disk via write_canonical(), avoiding OOM on large structured output.
37    // Merge redirects and the fallthrough path materialize on demand.
38    for redir in redirects {
39        match redir.kind {
40            RedirectKind::MergeStderr => {
41                // 2>&1 - append stderr to stdout
42                // Ensure output is materialized for merge
43                result.materialize();
44                if !result.err.is_empty() {
45                    let err = std::mem::take(&mut result.err);
46                    result.push_out(&err);
47                }
48            }
49            RedirectKind::MergeStdout => {
50                // 1>&2 or >&2 - append stdout to stderr
51                result.materialize();
52                if !result.text_out().is_empty() {
53                    let out = result.text_out().into_owned();
54                    result.err.push_str(&out);
55                    result.clear_out();
56                }
57            }
58            RedirectKind::StdoutOverwrite => {
59                let path = match eval_redirect_target(&redir.target, ctx).await {
60                    Ok(p) => p,
61                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
62                };
63                // Stream OutputData directly to file if available
64                if let Some(output) = result.take_output_for_stream() {
65                    let mut buf = Vec::new();
66                    if let Err(e) = output.write_canonical(&mut buf, None) {
67                        return ExecResult::failure(1, format!("redirect: {e}"));
68                    }
69                    if let Err(e) = redirect_write(ctx, &path, &buf).await {
70                        return ExecResult::failure(1, format!("redirect: {e}"));
71                    }
72                } else {
73                    if let Err(e) = redirect_write(ctx, &path, result.text_out().as_bytes()).await {
74                        return ExecResult::failure(1, format!("redirect: {e}"));
75                    }
76                }
77                result.clear_out();
78                result.set_output(None);
79            }
80            RedirectKind::StdoutAppend => {
81                let path = match eval_redirect_target(&redir.target, ctx).await {
82                    Ok(p) => p,
83                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
84                };
85                // Stream OutputData directly if available
86                if let Some(output) = result.take_output_for_stream() {
87                    let mut buf = Vec::new();
88                    if let Err(e) = output.write_canonical(&mut buf, None) {
89                        return ExecResult::failure(1, format!("redirect: {e}"));
90                    }
91                    if let Err(e) = redirect_append(ctx, &path, &buf).await {
92                        return ExecResult::failure(1, format!("redirect: {e}"));
93                    }
94                } else {
95                    if let Err(e) = redirect_append(ctx, &path, result.text_out().as_bytes()).await {
96                        return ExecResult::failure(1, format!("redirect: {e}"));
97                    }
98                }
99                result.clear_out();
100                result.set_output(None);
101            }
102            RedirectKind::Stderr => {
103                let path = match eval_redirect_target(&redir.target, ctx).await {
104                    Ok(p) => p,
105                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
106                };
107                if let Err(e) = redirect_write(ctx, &path, result.err.as_bytes()).await {
108                    return ExecResult::failure(1, format!("redirect: {e}"));
109                }
110                result.err.clear();
111            }
112            RedirectKind::Both => {
113                let path = match eval_redirect_target(&redir.target, ctx).await {
114                    Ok(p) => p,
115                    Err(e) => return ExecResult::failure(1, format!("redirect: {e}")),
116                };
117                let combined = format!("{}{}", result.text_out(), result.err);
118                if let Err(e) = redirect_write(ctx, &path, combined.as_bytes()).await {
119                    return ExecResult::failure(1, format!("redirect: {e}"));
120                }
121                result.clear_out();
122                result.set_output(None);
123                result.err.clear();
124            }
125            // Pre-execution redirects - already handled before command execution
126            RedirectKind::Stdin | RedirectKind::HereDoc | RedirectKind::HereString => {}
127        }
128    }
129    // Materialize any remaining OutputData into result.out.
130    // Callers (accumulate_result, pipeline piping) expect .out to be populated
131    // after apply_redirects returns. File redirects above consume .output directly
132    // via streaming; this only fires when no redirect consumed it.
133    result.materialize();
134    result
135}
136
137/// Evaluate a redirect target expression to get the file path (or heredoc body).
138///
139/// Routes through `ctx.dispatcher` so command substitution (`$(...)`) in the
140/// target runs — e.g. `cat < $(echo f)`, `echo x > $(echo f)`, and `$(...)`
141/// inside a heredoc body. Falls back to the sync evaluator (which skips
142/// command substitution) only when no dispatcher is attached.
143async fn eval_redirect_target(expr: &Expr, ctx: &ExecContext) -> Result<String, String> {
144    if let Some(dispatcher) = &ctx.dispatcher {
145        dispatcher
146            .eval_expr(expr, ctx)
147            .await
148            .map(|v| value_to_string(&v))
149            .map_err(|e| e.to_string())
150    } else {
151        eval_simple_expr(expr, ctx)
152            .map(|v| value_to_string(&v))
153            .ok_or_else(|| "could not evaluate redirect target".to_string())
154    }
155}
156
157/// Write data to a file via the VFS backend.
158///
159/// The redirect target is resolved against `ctx.cwd` (like every other path
160/// operand — see `cat`/`cp`/etc.), so a relative `> f` write and a later
161/// relative read agree on the same `$PWD/f`. Without this the router would
162/// normalize a bare relative path to `/f`, diverging from cwd-resolved reads.
163async fn redirect_write(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
164    use crate::backend::WriteMode;
165    let resolved = ctx.resolve_path(path);
166    ctx.backend.write(&resolved, data, WriteMode::Overwrite).await.map_err(|e| e.to_string())
167}
168
169/// Append data to a file via the VFS backend.
170///
171/// Resolves the target against `ctx.cwd` for the same reason as `redirect_write`.
172async fn redirect_append(ctx: &ExecContext, path: &str, data: &[u8]) -> Result<(), String> {
173    let resolved = ctx.resolve_path(path);
174    ctx.backend.append(&resolved, data).await.map_err(|e| e.to_string())
175}
176
177/// Set up stdin from redirects (< file, <<heredoc).
178/// Called before command execution.
179///
180/// `< file` reads through the VFS backend (not the host filesystem) with the
181/// target resolved against `ctx.cwd`, mirroring how `cat` and the output
182/// redirects resolve their operands. A missing/unreadable file or non-UTF-8
183/// content is a hard error — we never silently feed the command empty stdin.
184async fn setup_stdin_redirects(cmd: &Command, ctx: &mut ExecContext) -> Result<(), String> {
185    use std::path::Path;
186    for redir in &cmd.redirects {
187        match &redir.kind {
188            RedirectKind::Stdin => {
189                let path = eval_redirect_target(&redir.target, ctx).await?;
190                let resolved = ctx.resolve_path(&path);
191                let data = ctx
192                    .backend
193                    .read(Path::new(&resolved), None)
194                    .await
195                    .map_err(|e| format!("redirect: {path}: {e}"))?;
196                let content = String::from_utf8(data)
197                    .map_err(|_| format!("redirect: {path}: invalid UTF-8"))?;
198                ctx.set_stdin(content);
199            }
200            RedirectKind::HereDoc => {
201                match &redir.target {
202                    Expr::Literal(Value::String(content)) => {
203                        ctx.set_stdin(content.clone());
204                    }
205                    // Heredoc bodies may contain `$(...)`; route through the
206                    // dispatcher so command substitution runs.
207                    expr => {
208                        let body = eval_redirect_target(expr, ctx).await?;
209                        ctx.set_stdin(body);
210                    }
211                }
212            }
213            RedirectKind::HereString => {
214                // Per bash, here-strings append a trailing newline to the
215                // expanded word so the command receives a terminated line.
216                let mut s = eval_redirect_target(&redir.target, ctx).await?;
217                s.push('\n');
218                ctx.set_stdin(s);
219            }
220            _ => {}
221        }
222    }
223    Ok(())
224}
225
/// Runs pipelines by spawning tasks and connecting them via channels.
///
/// The runner owns only I/O routing (stdin redirects, piping between stages,
/// output redirects); command resolution and execution are delegated to the
/// `CommandDispatcher` passed to each `run*` call. Cloning is cheap: the only
/// state is a shared handle to the tool registry.
#[derive(Clone)]
pub struct PipelineRunner {
    // Shared tool registry; consulted for the `scatter`/`gather` schemas and
    // handed on to `ScatterGatherRunner`.
    tools: Arc<ToolRegistry>,
}
231
232impl PipelineRunner {
233    /// Create a new pipeline runner with the given tool registry.
234    pub fn new(tools: Arc<ToolRegistry>) -> Self {
235        Self { tools }
236    }
237
238    /// Execute a pipeline of commands.
239    ///
240    /// Each command's stdout becomes the next command's stdin.
241    /// If the pipeline contains scatter/gather, delegates to ScatterGatherRunner.
242    /// Returns the result of the last command in the pipeline.
243    ///
244    /// The `dispatcher` handles the full command resolution chain (user tools,
245    /// builtins, scripts, external commands, backend tools). The runner handles
246    /// I/O routing: stdin redirects, piping between commands, and output redirects.
247    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
248    pub async fn run(
249        &self,
250        commands: &[Command],
251        ctx: &mut ExecContext,
252        dispatcher: &dyn CommandDispatcher,
253    ) -> ExecResult {
254        if commands.is_empty() {
255            return ExecResult::success("");
256        }
257
258        // Check for scatter/gather pipeline
259        if let Some((scatter_idx, gather_idx)) = find_scatter_gather(commands) {
260            return self.run_scatter_gather(commands, scatter_idx, gather_idx, ctx, dispatcher).await;
261        }
262
263        self.run_sequential(commands, ctx, dispatcher).await
264    }
265
266    /// Execute commands sequentially without scatter/gather detection.
267    ///
268    /// Used by `ScatterGatherRunner` for pre_scatter, post_gather, and parallel
269    /// workers. Breaks the async recursion chain (`run` → scatter → `run`).
270    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(command_count = commands.len()))]
271    pub async fn run_sequential(
272        &self,
273        commands: &[Command],
274        ctx: &mut ExecContext,
275        dispatcher: &dyn CommandDispatcher,
276    ) -> ExecResult {
277        if commands.is_empty() {
278            return ExecResult::success("");
279        }
280
281        if commands.len() == 1 {
282            // Single command, no piping needed
283            return self.run_single(&commands[0], ctx, None, dispatcher).await;
284        }
285
286        // Multi-command pipeline
287        self.run_pipeline(commands, ctx, dispatcher).await
288    }
289
290    /// Run a scatter/gather pipeline.
291    async fn run_scatter_gather(
292        &self,
293        commands: &[Command],
294        scatter_idx: usize,
295        gather_idx: usize,
296        ctx: &mut ExecContext,
297        dispatcher: &dyn CommandDispatcher,
298    ) -> ExecResult {
299        // Split pipeline into parts
300        let pre_scatter = &commands[..scatter_idx];
301        let scatter_cmd = &commands[scatter_idx];
302        let parallel = &commands[scatter_idx + 1..gather_idx];
303        let gather_cmd = &commands[gather_idx];
304        let post_gather = &commands[gather_idx + 1..];
305
306        // Parse options from scatter and gather commands
307        // These are builtins with simple key=value syntax, no schema-driven parsing needed
308        let scatter_schema = self.tools.get("scatter").map(|t| t.schema());
309        let gather_schema = self.tools.get("gather").map(|t| t.schema());
310        let scatter_opts = parse_scatter_options(&build_tool_args(&scatter_cmd.args, ctx, scatter_schema.as_ref()));
311        let gather_opts = parse_gather_options(&build_tool_args(&gather_cmd.args, ctx, gather_schema.as_ref()));
312
313        // We need an `Arc<dyn CommandDispatcher>` to hand to `ScatterGatherRunner`.
314        // `fork_attached` produces a subkernel whose cancellation token is a
315        // child of the parent's, so a parent timeout/cancel cascades into
316        // the scatter pipeline (and into worker children via further forks).
317        let sequential_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
318
319        let runner = ScatterGatherRunner::new(self.tools.clone(), sequential_dispatcher);
320        runner
321            .run(
322                pre_scatter,
323                scatter_opts,
324                parallel,
325                gather_opts,
326                post_gather,
327                ctx,
328            )
329            .await
330    }
331
332    /// Run a single command with optional stdin.
333    ///
334    /// The dispatcher handles arg parsing, schema lookup, output format, and execution.
335    /// The runner handles stdin setup (redirects + pipeline) and output redirects.
336    #[tracing::instrument(level = "debug", skip(self, cmd, ctx, stdin, dispatcher), fields(command = %cmd.name))]
337    async fn run_single(
338        &self,
339        cmd: &Command,
340        ctx: &mut ExecContext,
341        stdin: Option<String>,
342        dispatcher: &dyn CommandDispatcher,
343    ) -> ExecResult {
344        // Set up stdin from redirects (< file, <<heredoc)
345        if let Err(e) = setup_stdin_redirects(cmd, ctx).await {
346            return ExecResult::failure(1, e);
347        }
348
349        // Set stdin from pipeline (overrides redirect stdin)
350        if let Some(input) = stdin {
351            ctx.set_stdin(input);
352        }
353
354        // Set pipeline position for stdio inheritance decisions
355        ctx.pipeline_position = PipelinePosition::Only;
356
357        // Execute via dispatcher (full resolution chain)
358        let result = match dispatcher.dispatch(cmd, ctx).await {
359            Ok(result) => result,
360            Err(e) => ExecResult::failure(1, e.to_string()),
361        };
362
363        // Apply post-execution redirects
364        apply_redirects(result, &cmd.redirects, ctx).await
365    }
366
367    /// Run a multi-command pipeline concurrently.
368    ///
369    /// Each stage runs in its own tokio task, connected by bounded pipe streams
370    /// (64KB ring buffers with backpressure). This provides:
371    /// - Bounded memory usage (no buffering entire outputs)
372    /// - Backpressure (fast producers wait for slow consumers)
373    /// - Early termination (e.g., `seq 1 1000000 | head -n 5`)
374    ///
375    /// Structured data (`stdin_data`) is passed via oneshot channels alongside pipes.
376    #[tracing::instrument(level = "debug", skip(self, commands, ctx, dispatcher), fields(stage_count = commands.len()))]
377    async fn run_pipeline(
378        &self,
379        commands: &[Command],
380        ctx: &mut ExecContext,
381        dispatcher: &dyn CommandDispatcher,
382    ) -> ExecResult {
383        let stage_count = commands.len();
384        let last_idx = stage_count - 1;
385
386        // Create N-1 pipe pairs connecting adjacent stages
387        let mut pipe_writers: Vec<Option<super::pipe_stream::PipeWriter>> = Vec::new();
388        let mut pipe_readers: Vec<Option<super::pipe_stream::PipeReader>> = Vec::new();
389
390        for _ in 0..last_idx {
391            let (writer, reader) = pipe_stream_default();
392            pipe_writers.push(Some(writer));
393            pipe_readers.push(Some(reader));
394        }
395
396        // Create N-1 oneshot channels for structured data sideband
397        let mut data_senders: Vec<Option<tokio::sync::oneshot::Sender<Option<Value>>>> = Vec::new();
398        let mut data_receivers: Vec<Option<tokio::sync::oneshot::Receiver<Option<Value>>>> = Vec::new();
399
400        for _ in 0..last_idx {
401            let (tx, rx) = tokio::sync::oneshot::channel();
402            data_senders.push(Some(tx));
403            data_receivers.push(Some(rx));
404        }
405
406        let mut handles: Vec<tokio::task::JoinHandle<(ExecResult, ExecContext)>> = Vec::with_capacity(stage_count);
407
408        for (i, cmd) in commands.iter().enumerate() {
409            let mut stage_ctx = ctx.child_for_pipeline();
410            let cmd = cmd.clone();
411
412            // Fork attached: each concurrent pipeline stage needs independent
413            // mutable state, but cancellation should still cascade from the
414            // parent (so a request timeout kills externals running in any
415            // stage, not just the foreground one).
416            let task_dispatcher: Arc<dyn CommandDispatcher> = dispatcher.fork_attached().await;
417
418            // Set up stdin from redirects on the child context. A failure here
419            // (e.g. `cmd < missing`) fails this stage; surface it from inside
420            // the spawned task so the normal join/collection path reports it.
421            let stdin_setup = setup_stdin_redirects(&cmd, &mut stage_ctx).await;
422
423            // Wire pipe_stdin: stage 0 gets parent stdin (if no redirect), others get pipe reader
424            if i == 0 {
425                // First stage inherits the parent's stdin, but only if redirects didn't
426                // already set stdin (e.g., heredoc). Don't overwrite redirect-provided stdin.
427                if stage_ctx.stdin.is_none() {
428                    stage_ctx.stdin = ctx.stdin.take();
429                }
430                if stage_ctx.stdin_data.is_none() {
431                    stage_ctx.stdin_data = ctx.stdin_data.take();
432                }
433            } else {
434                // Intermediate/last stages read from pipe
435                stage_ctx.pipe_stdin = pipe_readers[i - 1].take();
436                // Structured data received via oneshot (resolved at start of execution)
437            }
438
439            // Wire pipe_stdout: last stage writes to ExecResult, others write to pipe
440            if i < last_idx {
441                stage_ctx.pipe_stdout = pipe_writers[i].take();
442            }
443
444            // Set pipeline position
445            stage_ctx.pipeline_position = match i {
446                0 => PipelinePosition::First,
447                n if n == last_idx => PipelinePosition::Last,
448                _ => PipelinePosition::Middle,
449            };
450
451            let data_sender = if i < last_idx { data_senders[i].take() } else { None };
452            let data_receiver = if i > 0 { data_receivers[i - 1].take() } else { None };
453
454            // Propagate the embedder's trace context across the spawn boundary
455            // so each concurrent stage's spans stay in the same trace.
456            let handle: tokio::task::JoinHandle<(ExecResult, ExecContext)> =
457                tokio::spawn(crate::telemetry::bind_current_context(async move {
458                // A stdin-redirect setup failure short-circuits this stage.
459                if let Err(e) = stdin_setup {
460                    return (ExecResult::failure(1, e), stage_ctx);
461                }
462
463                // Receive structured data from previous stage (non-blocking).
464                // Using try_recv avoids a deadlock: streaming builtins (e.g. grep)
465                // write to their pipe_stdout during dispatch. If we blocked here
466                // waiting for the upstream's oneshot (sent after dispatch), the
467                // downstream couldn't start draining the pipe → circular wait.
468                // Builtins that use stdin_data (e.g. jq) fall back to pipe text.
469                if let Some(mut rx) = data_receiver {
470                    if let Ok(data) = rx.try_recv() {
471                        stage_ctx.stdin_data = data;
472                    }
473                    // Err → not ready yet; builtin will read from pipe text
474                }
475
476                // Execute the command
477                let mut result = match task_dispatcher.dispatch(&cmd, &mut stage_ctx).await {
478                    Ok(result) => result,
479                    Err(e) => ExecResult::failure(1, e.to_string()),
480                };
481
482                // Apply post-execution redirects
483                result = apply_redirects(result, &cmd.redirects, &stage_ctx).await;
484
485                // Flush buffered stderr to the kernel's stderr stream.
486                // This delivers error output from intermediate pipeline stages
487                // in real-time (via the kernel drain) instead of silently discarding it.
488                // Redirects like 2>&1 have already cleared result.err, so merged
489                // stderr goes through the pipe as expected.
490                if !result.err.is_empty() {
491                    if let Some(ref stderr) = stage_ctx.stderr {
492                        stderr.write_str(&result.err);
493                        result.err.clear();
494                    }
495                }
496
497                // Send structured data to next stage via oneshot BEFORE pipe write.
498                // The pipe write may block on backpressure (>64KB output), and the
499                // consumer awaits this oneshot before starting execution. Sending
500                // first prevents a circular wait (producer blocked on pipe write,
501                // consumer blocked on oneshot).
502                if let Some(tx) = data_sender {
503                    let _ = tx.send(result.data.clone());
504                }
505
506                // Write output to pipe for next stage (if not last).
507                // Consumer is now unblocked and can drain concurrently.
508                if let Some(mut pipe_out) = stage_ctx.pipe_stdout.take() {
509                    let text = result.text_out();
510                    if !text.is_empty() {
511                        // Write result to pipe; ignore broken pipe (reader dropped early)
512                        let _ = pipe_out.write_all(text.as_bytes()).await;
513                        let _ = pipe_out.shutdown().await;
514                    }
515                    // Drop pipe_out signals EOF to next stage's reader
516                }
517
518                (result, stage_ctx)
519            }));
520
521            handles.push(handle);
522        }
523
524        // Await all stages and return last stage's result.
525        // Sync the last stage's scope back to the parent context so that
526        // variable assignments in the last pipeline stage are visible
527        // (e.g., `echo "Alice" | read NAME`).
528        let mut last_result = ExecResult::success("");
529        let mut panics: Vec<String> = Vec::new();
530        for (i, handle) in handles.into_iter().enumerate() {
531            match handle.await {
532                Ok((result, stage_ctx)) => {
533                    if i == last_idx {
534                        last_result = result;
535                        // Sync last stage's scope and cwd changes back
536                        ctx.scope = stage_ctx.scope;
537                        ctx.cwd = stage_ctx.cwd;
538                        ctx.prev_cwd = stage_ctx.prev_cwd;
539                        ctx.aliases = stage_ctx.aliases;
540                    }
541                }
542                Err(e) => {
543                    panics.push(format!("stage {}: {}", i, e));
544                }
545            }
546        }
547
548        if !panics.is_empty() {
549            last_result = ExecResult::failure(
550                1,
551                format!("pipeline stage(s) panicked: {}", panics.join("; ")),
552            );
553        }
554
555        last_result
556    }
557}
558
559/// Extract parameter types from a tool schema.
560///
561/// Returns a map from param name → param type (e.g., "verbose" → "bool", "output" → "string").
562/// Build a map from flag name → (canonical param name, param type).
563///
564/// Includes both primary names and aliases (with dashes stripped).
565/// For short flags like `-n` aliased to `lines`, maps `"n"` → `("lines", "int", 1)`.
566/// The third tuple slot is `consumes`: how many positionals the flag pulls
567/// per occurrence (1 for standard `--flag value`, 2 for jq's `--arg NAME VAL`).
568///
569/// Positional params (`positional: true`) are excluded — they're not flags,
570/// and including them would mis-route `cat --paths foo.txt` from positional
571/// to named, regressing builtins that read from `args.positional`.
572/// Walk leading positionals to select the active subcommand leaf of a schema.
573///
574/// A flat tool (`schema.subcommands` empty) returns the root immediately —
575/// today's single-leaf behavior. For a subcommand-aware tool each leading
576/// positional, in order, must name a child (by `name` or a command-level
577/// alias) to descend; the first positional that names no child is the leaf's
578/// own argument, and selection stops there. Multi-level trees fall out by
579/// construction (`block edit insert` → two descents).
580///
581/// Routing is **literal-only**: a subcommand selector must be a bareword or
582/// quoted string (both parse to `Expr::Literal(Value::String)`). A *computed*
583/// positional (`$(…)`, `$VAR`, a glob) sitting where a subcommand is required
584/// is an **error**, not a silent guess — kaish can't see its value at parse
585/// time, so picking a leaf from it would misroute the flags that bind against
586/// the leaf's params. The fix is to spell the subcommand out, or use the
587/// `--flag=value` form (which binds without any schema lookup).
588///
589/// Returned leaf borrows from `schema`, so its `params`/`subcommands` outlive
590/// any `schema_param_lookup` taken from it.
591///
592/// **Global value flags.** A space-form value flag declared on the *root*
593/// (e.g. kj's global `--confirm <nonce>`) can legitimately precede the
594/// subcommand path. Its value is a positional in the AST, so routing must not
595/// mistake it for a subcommand selector — `select_leaf` skips the value of any
596/// root-declared non-bool flag it sees. Leaf-specific value flags can't precede
597/// their own subcommand by construction, so only the root's flags need this.
598pub fn select_leaf<'a>(schema: &'a ToolSchema, args: &[Arg]) -> anyhow::Result<&'a ToolSchema> {
599    // Names + aliases of root-declared value (non-bool, non-positional) flags,
600    // whose space-form value is a positional we must skip while routing.
601    let root_lookup = schema_param_lookup(schema);
602    let is_root_value_flag = |name: &str| -> bool {
603        root_lookup.get(name).is_some_and(|(_, typ, _)| !is_bool_type(typ))
604    };
605
606    let mut node = schema;
607    let mut skip_next_positional = false;
608    for arg in args {
609        match arg {
610            // Tokens past `--` are raw data, never subcommand selectors.
611            Arg::DoubleDash => break,
612            // A root value flag in space form consumes the next positional as
613            // its value — don't route on that positional.
614            Arg::LongFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
615            Arg::ShortFlag(name) if is_root_value_flag(name) => skip_next_positional = true,
616            Arg::Positional(expr) => {
617                if skip_next_positional {
618                    skip_next_positional = false;
619                    continue; // this positional is the preceding flag's value
620                }
621                if node.subcommands.is_empty() {
622                    break; // leaf reached — remaining positionals are its args
623                }
624                match classify_subcommand_positional(expr) {
625                    SubcommandWord::Word(word) => {
626                        match node.subcommands.iter().find(|c| c.matches_command(word)) {
627                            Some(child) => node = child, // descend
628                            None => break,               // not a subcommand → leaf's own arg
629                        }
630                    }
631                    // A non-string literal (number/bool) can't be a subcommand
632                    // name but its value *is* known; treat it as the leaf's own
633                    // positional and stop — no misroute risk.
634                    SubcommandWord::OtherLiteral => break,
635                    SubcommandWord::Computed(kind) => anyhow::bail!(
636                        "{}: a subcommand name is required here, but got {kind}. \
637                         Subcommands must be literal words — spell it out \
638                         (e.g. `{} <subcommand> …`) or use the `--flag=value` form.",
639                        node.name,
640                        schema.name
641                    ),
642                }
643            }
644            // Flags are skipped during routing; they bind against the leaf.
645            _ => {}
646        }
647    }
648    Ok(node)
649}
650
/// How a positional reads when a subcommand selector is expected.
///
/// Produced by `classify_subcommand_positional` and consumed by `select_leaf`
/// to decide whether to descend, stop, or reject the argument.
enum SubcommandWord<'a> {
    /// A literal word that may name a child. Borrows the string from the
    /// expression it was classified from.
    Word(&'a str),
    /// A literal but non-string value — a known value, never a subcommand.
    OtherLiteral,
    /// A value computed at runtime; the payload is a human-readable
    /// description of what kind of expression it was, for the error message.
    Computed(&'static str),
}
660
661fn classify_subcommand_positional(expr: &Expr) -> SubcommandWord<'_> {
662    match expr {
663        Expr::Literal(Value::String(s)) => SubcommandWord::Word(s),
664        Expr::Literal(_) => SubcommandWord::OtherLiteral,
665        Expr::CommandSubst(_) | Expr::Command(_) => SubcommandWord::Computed("a command substitution `$(…)`"),
666        Expr::VarRef(_)
667        | Expr::VarWithDefault { .. }
668        | Expr::VarLength(_)
669        | Expr::Positional(_)
670        | Expr::AllArgs
671        | Expr::ArgCount
672        | Expr::CurrentPid
673        | Expr::LastExitCode => SubcommandWord::Computed("a variable reference"),
674        Expr::Interpolated(_) | Expr::HereDocBody { .. } => SubcommandWord::Computed("an interpolated string"),
675        Expr::GlobPattern(_) => SubcommandWord::Computed("a glob pattern"),
676        Expr::Arithmetic(_) => SubcommandWord::Computed("an arithmetic expansion"),
677        _ => SubcommandWord::Computed("a value computed at runtime"),
678    }
679}
680
681pub fn schema_param_lookup(schema: &ToolSchema) -> HashMap<String, (&str, &str, usize)> {
682    let mut map = HashMap::new();
683    for p in schema.params.iter().filter(|p| !p.positional) {
684        map.insert(p.name.clone(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
685        for alias in &p.aliases {
686            let stripped = alias.trim_start_matches('-');
687            map.insert(stripped.to_string(), (p.name.as_str(), p.param_type.as_str(), p.consumes));
688        }
689    }
690    map
691}
692
/// Report whether a schema type name denotes a boolean.
///
/// Accepts `bool` and `boolean` in any letter case.
pub fn is_bool_type(param_type: &str) -> bool {
    ["bool", "boolean"]
        .iter()
        .any(|candidate| param_type.eq_ignore_ascii_case(candidate))
}
697
698/// Build ToolArgs from AST Args, evaluating expressions.
699///
700/// If a schema is provided, uses it to determine argument types:
701/// - For `--flag` where schema says type is non-bool: consume next positional as value
702/// - For `--flag` where schema says type is bool (or unknown): treat as boolean flag
703///
704/// This enables natural shell syntax like `mcp_tool --query "test" --limit 10`.
705pub fn build_tool_args(args: &[Arg], ctx: &ExecContext, schema: Option<&ToolSchema>) -> ToolArgs {
706    let mut tool_args = ToolArgs::new();
707    let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
708    let accepts_word_assign = schema
709        .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
710        .unwrap_or(false);
711
712    // Track which positional indices have been consumed as flag values
713    let mut consumed_positionals: std::collections::HashSet<usize> = std::collections::HashSet::new();
714    let mut past_double_dash = false;
715
716    // First pass: find positional args and their indices
717    let mut positional_indices: Vec<(usize, &Expr)> = Vec::new();
718    for (i, arg) in args.iter().enumerate() {
719        if let Arg::Positional(expr) = arg {
720            positional_indices.push((i, expr));
721        }
722    }
723
724    // Second pass: process all args
725    let mut i = 0;
726    while i < args.len() {
727        let arg = &args[i];
728
729        match arg {
730            Arg::DoubleDash => {
731                past_double_dash = true;
732            }
733            Arg::Positional(expr) => {
734                // Check if this positional was consumed by a preceding flag
735                if !consumed_positionals.contains(&i)
736                    && let Some(value) = eval_simple_expr(expr, ctx)
737                {
738                    tool_args.positional.push(value);
739                }
740            }
741            Arg::Named { key, value } => {
742                if let Some(val) = eval_simple_expr(value, ctx) {
743                    tool_args.named.insert(key.clone(), val);
744                }
745            }
746            Arg::WordAssign { key, value } => {
747                if let Some(val) = eval_simple_expr(value, ctx) {
748                    if accepts_word_assign {
749                        tool_args.named.insert(key.clone(), val);
750                    } else {
751                        let val_str = crate::interpreter::value_to_string(&val);
752                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
753                    }
754                }
755            }
756            Arg::ShortFlag(name) => {
757                if past_double_dash {
758                    tool_args.positional.push(Value::String(format!("-{name}")));
759                } else if name.len() == 1 {
760                    // Single-char short flag: look up schema to check if it takes a value.
761                    // e.g., `-n 5` where `-n` is an alias for `lines` (type: int)
762                    let flag_name = name.as_str();
763                    let lookup = param_lookup.get(flag_name);
764                    let is_bool = lookup
765                        .map(|(_, typ, _)| is_bool_type(typ))
766                        .unwrap_or(true);
767
768                    if is_bool {
769                        tool_args.flags.insert(flag_name.to_string());
770                    } else {
771                        // Non-bool: consume next positional as value, insert under canonical name
772                        let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
773                        let next_positional = positional_indices
774                            .iter()
775                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
776
777                        if let Some((pos_idx, expr)) = next_positional {
778                            if let Some(value) = eval_simple_expr(expr, ctx) {
779                                tool_args.named.insert(canonical.to_string(), value);
780                                consumed_positionals.insert(*pos_idx);
781                            } else {
782                                tool_args.flags.insert(flag_name.to_string());
783                            }
784                        } else {
785                            tool_args.flags.insert(flag_name.to_string());
786                        }
787                    }
788                } else if let Some(&(canonical, typ, _)) = param_lookup.get(name.as_str()) {
789                    // Multi-char short flag matches a schema param (POSIX style: -name value)
790                    if is_bool_type(typ) {
791                        tool_args.flags.insert(canonical.to_string());
792                    } else {
793                        let next_positional = positional_indices
794                            .iter()
795                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
796                        if let Some((pos_idx, expr)) = next_positional {
797                            if let Some(value) = eval_simple_expr(expr, ctx) {
798                                tool_args.named.insert(canonical.to_string(), value);
799                                consumed_positionals.insert(*pos_idx);
800                            } else {
801                                tool_args.flags.insert(name.clone());
802                            }
803                        } else {
804                            tool_args.flags.insert(name.clone());
805                        }
806                    }
807                } else {
808                    // Multi-char combined flags like -la: always boolean
809                    for c in name.chars() {
810                        tool_args.flags.insert(c.to_string());
811                    }
812                }
813            }
814            Arg::LongFlag(name) => {
815                if past_double_dash {
816                    tool_args.positional.push(Value::String(format!("--{name}")));
817                } else {
818                    // Look up type in schema (checks name and aliases)
819                    let lookup = param_lookup.get(name.as_str());
820                    let is_bool = lookup
821                        .map(|(_, typ, _)| is_bool_type(typ))
822                        .unwrap_or(true); // Unknown params default to bool
823
824                    if is_bool {
825                        tool_args.flags.insert(name.clone());
826                    } else {
827                        // Non-bool: consume next positional as value, insert under canonical name
828                        // Note: the sync build_tool_args does NOT honor `consumes > 1`. The async
829                        // build_args_async in kernel.rs is the only path that supports multi-consume
830                        // flags. Sync callers (scheduler pipelines for --json-marker plumbing) don't
831                        // yet need that; if they ever do, lift the logic via a shared helper.
832                        let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
833                        let next_positional = positional_indices
834                            .iter()
835                            .find(|(idx, _)| *idx > i && !consumed_positionals.contains(idx));
836
837                        if let Some((pos_idx, expr)) = next_positional {
838                            if let Some(value) = eval_simple_expr(expr, ctx) {
839                                tool_args.named.insert(canonical.to_string(), value);
840                                consumed_positionals.insert(*pos_idx);
841                            } else {
842                                tool_args.flags.insert(name.clone());
843                            }
844                        } else {
845                            tool_args.flags.insert(name.clone());
846                        }
847                    }
848                }
849            }
850        }
851        i += 1;
852    }
853
854    // Map remaining positionals to unfilled non-bool schema params (in order).
855    // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
856    // Positionals that appeared after `--` are never mapped (they're raw data).
857    // Only for MCP/external tools (map_positionals=true). Builtins handle their own positionals.
858    if let Some(schema) = schema.filter(|s| s.map_positionals) {
859        // Count how many positionals were added before `--`
860        let pre_dash_count = if past_double_dash {
861            // Find where the double-dash was in the original args to count pre-dash positionals
862            let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
863            // Count unconsumed positionals before the double-dash
864            positional_indices.iter()
865                .filter(|(idx, _)| *idx < dash_pos && !consumed_positionals.contains(idx))
866                .count()
867        } else {
868            tool_args.positional.len()
869        };
870
871        let mut remaining = Vec::new();
872        let mut positional_iter = tool_args.positional.drain(..).enumerate();
873
874        for param in &schema.params {
875            if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
876                continue; // Already filled by a flag or named arg
877            }
878            if is_bool_type(&param.param_type) {
879                continue; // Bool params should only be set by flags
880            }
881            // Take from pre-dash positionals only
882            loop {
883                match positional_iter.next() {
884                    Some((idx, val)) if idx < pre_dash_count => {
885                        tool_args.named.insert(param.name.clone(), val);
886                        break;
887                    }
888                    Some((_, val)) => {
889                        remaining.push(val); // Post-dash or past limit, keep as positional
890                    }
891                    None => break,
892                }
893            }
894        }
895
896        // Any leftover positionals stay positional (e.g. `cat file1 file2`)
897        remaining.extend(positional_iter.map(|(_, v)| v));
898        tool_args.positional = remaining;
899    }
900
901    tool_args
902}
903
904/// Simple expression evaluation for args (without full scope access).
905pub(crate) fn eval_simple_expr(expr: &Expr, ctx: &ExecContext) -> Option<Value> {
906    match expr {
907        Expr::Literal(value) => Some(eval_literal(value, ctx)),
908        Expr::VarRef(path) => ctx.scope.resolve_path(path),
909        Expr::Interpolated(parts) => {
910            let mut result = String::new();
911            for part in parts {
912                match part {
913                    crate::ast::StringPart::Literal(s) => result.push_str(s),
914                    crate::ast::StringPart::Var(path) => {
915                        if let Some(value) = ctx.scope.resolve_path(path) {
916                            result.push_str(&value_to_string(&value));
917                        }
918                    }
919                    crate::ast::StringPart::VarWithDefault { name, default } => {
920                        match ctx.scope.get(name) {
921                            Some(value) => {
922                                let s = value_to_string(value);
923                                if s.is_empty() {
924                                    result.push_str(&eval_string_parts_sync(default, ctx));
925                                } else {
926                                    result.push_str(&s);
927                                }
928                            }
929                            None => result.push_str(&eval_string_parts_sync(default, ctx)),
930                        }
931                    }
932                    crate::ast::StringPart::VarLength(name) => {
933                        let len = match ctx.scope.get(name) {
934                            Some(value) => value_to_string(value).len(),
935                            None => 0,
936                        };
937                        result.push_str(&len.to_string());
938                    }
939                    crate::ast::StringPart::Positional(n) => {
940                        if let Some(s) = ctx.scope.get_positional(*n) {
941                            result.push_str(s);
942                        }
943                    }
944                    crate::ast::StringPart::AllArgs => {
945                        result.push_str(&ctx.scope.all_args().join(" "));
946                    }
947                    crate::ast::StringPart::ArgCount => {
948                        result.push_str(&ctx.scope.arg_count().to_string());
949                    }
950                    crate::ast::StringPart::Arithmetic(expr) => {
951                        // Evaluate arithmetic in pipeline context
952                        if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
953                            result.push_str(&value.to_string());
954                        }
955                    }
956                    crate::ast::StringPart::CommandSubst(_) => {
957                        // Command substitution requires async - skip in sync context
958                    }
959                    crate::ast::StringPart::LastExitCode => {
960                        result.push_str(&ctx.scope.last_result().code.to_string());
961                    }
962                    crate::ast::StringPart::CurrentPid => {
963                        result.push_str(&ctx.scope.pid().to_string());
964                    }
965                }
966            }
967            Some(Value::String(result))
968        }
969        Expr::GlobPattern(s) => Some(Value::String(s.clone())),
970        Expr::HereDocBody { parts, strip_tabs } => {
971            // Heredoc body materialization for redirect targets. Reuses the
972            // shared sync part-walker; tab stripping is applied after the
973            // body is assembled, matching the interpreter's eval path.
974            let unwrapped: Vec<crate::ast::StringPart> =
975                parts.iter().map(|sp| sp.part.clone()).collect();
976            let raw = eval_string_parts_sync(&unwrapped, ctx);
977            let body = if *strip_tabs {
978                crate::interpreter::strip_leading_tabs(&raw)
979            } else {
980                raw
981            };
982            Some(Value::String(body))
983        }
984        _ => None, // Binary ops and command subst need more context
985    }
986}
987
988/// Evaluate a literal value.
989fn eval_literal(value: &Value, _ctx: &ExecContext) -> Value {
990    value.clone()
991}
992
993/// Convert a value to a string for interpolation.
994fn value_to_string(value: &Value) -> String {
995    match value {
996        Value::Null => "".to_string(),
997        Value::Bool(b) => b.to_string(),
998        Value::Int(i) => i.to_string(),
999        Value::Float(f) => f.to_string(),
1000        Value::String(s) => s.clone(),
1001        Value::Json(json) => json.to_string(),
1002        Value::Blob(blob) => format!("[blob: {} {}]", blob.formatted_size(), blob.content_type),
1003    }
1004}
1005
1006/// Evaluate string parts synchronously (for pipeline context).
1007/// Command substitutions are skipped as they require async.
1008fn eval_string_parts_sync(parts: &[crate::ast::StringPart], ctx: &ExecContext) -> String {
1009    let mut result = String::new();
1010    for part in parts {
1011        match part {
1012            crate::ast::StringPart::Literal(s) => result.push_str(s),
1013            crate::ast::StringPart::Var(path) => {
1014                if let Some(value) = ctx.scope.resolve_path(path) {
1015                    result.push_str(&value_to_string(&value));
1016                }
1017            }
1018            crate::ast::StringPart::VarWithDefault { name, default } => {
1019                match ctx.scope.get(name) {
1020                    Some(value) => {
1021                        let s = value_to_string(value);
1022                        if s.is_empty() {
1023                            result.push_str(&eval_string_parts_sync(default, ctx));
1024                        } else {
1025                            result.push_str(&s);
1026                        }
1027                    }
1028                    None => result.push_str(&eval_string_parts_sync(default, ctx)),
1029                }
1030            }
1031            crate::ast::StringPart::VarLength(name) => {
1032                let len = match ctx.scope.get(name) {
1033                    Some(value) => value_to_string(value).len(),
1034                    None => 0,
1035                };
1036                result.push_str(&len.to_string());
1037            }
1038            crate::ast::StringPart::Positional(n) => {
1039                if let Some(s) = ctx.scope.get_positional(*n) {
1040                    result.push_str(s);
1041                }
1042            }
1043            crate::ast::StringPart::AllArgs => {
1044                result.push_str(&ctx.scope.all_args().join(" "));
1045            }
1046            crate::ast::StringPart::ArgCount => {
1047                result.push_str(&ctx.scope.arg_count().to_string());
1048            }
1049            crate::ast::StringPart::Arithmetic(expr) => {
1050                if let Ok(value) = arithmetic::eval_arithmetic(expr, &ctx.scope) {
1051                    result.push_str(&value.to_string());
1052                }
1053            }
1054            crate::ast::StringPart::CommandSubst(_) => {
1055                // Command substitution requires async - skip in sync context
1056            }
1057            crate::ast::StringPart::LastExitCode => {
1058                result.push_str(&ctx.scope.last_result().code.to_string());
1059            }
1060            crate::ast::StringPart::CurrentPid => {
1061                result.push_str(&ctx.scope.pid().to_string());
1062            }
1063        }
1064    }
1065    result
1066}
1067
1068/// Find scatter and gather commands in a pipeline.
1069///
1070/// Returns Some((scatter_index, gather_index)) if both are found with scatter before gather.
1071/// Returns None if the pipeline doesn't have a valid scatter/gather pattern.
1072fn find_scatter_gather(commands: &[Command]) -> Option<(usize, usize)> {
1073    let scatter_idx = commands.iter().position(|c| c.name == "scatter")?;
1074    let gather_idx = commands.iter().position(|c| c.name == "gather")?;
1075
1076    // Gather must come after scatter
1077    if gather_idx > scatter_idx {
1078        Some((scatter_idx, gather_idx))
1079    } else {
1080        None
1081    }
1082}
1083
#[cfg(test)]
mod select_leaf_tests {
    //! Routing tests for `select_leaf`: which schema node a command line
    //! resolves to, given its positionals and flags.

    use super::*;
    use crate::tools::ParamSchema;

    /// `kj`-shaped tree: kj → context (alias ctx) → {list (alias ls), create}.
    /// Root carries a global `--confirm <nonce>` value flag and a `--verbose`
    /// bool; `create` carries a leaf `--type` value flag — enough to exercise
    /// global-flag skipping and leaf binding.
    fn kj_schema() -> ToolSchema {
        let list = ToolSchema::new("list", "list").with_command_aliases(["ls"]);
        let create = ToolSchema::new("create", "create")
            .param(ParamSchema::new("type", "string").with_aliases(["t"]));
        let context = ToolSchema::new("context", "context ops")
            .with_command_aliases(["ctx"])
            .subcommand(list)
            .subcommand(create);

        ToolSchema::new("kj", "kaijutsu")
            .param(ParamSchema::new("confirm", "string"))
            .param(ParamSchema::new("verbose", "bool"))
            .subcommand(context)
    }

    /// A literal string positional.
    fn word(s: &str) -> Arg {
        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
    }

    /// A `--name` long flag.
    fn long_flag(name: &str) -> Arg {
        Arg::LongFlag(name.into())
    }

    /// A positional holding a command substitution of a bare `echo`.
    fn echo_subst() -> Arg {
        let echo = crate::ast::Command {
            name: "echo".into(),
            args: vec![],
            redirects: vec![],
        };
        Arg::Positional(Expr::CommandSubst(vec![crate::ast::Stmt::Command(echo)]))
    }

    #[test]
    fn flat_tool_returns_root() {
        // A schema without subcommands always resolves to itself.
        let cat = ToolSchema::new("cat", "concat")
            .param(ParamSchema::required("path", "string", "f").positional());
        let args = [word("foo.txt")];
        let leaf = select_leaf(&cat, &args).expect("flat ok");
        assert_eq!(leaf.name, "cat");
    }

    #[test]
    fn single_hop() {
        let tree = kj_schema();
        let args = [word("context")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "context");
    }

    #[test]
    fn two_hops() {
        let tree = kj_schema();
        let args = [word("context"), word("create")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "create");
        assert!(leaf.params.iter().any(|p| p.name == "type"), "leaf has --type");
    }

    #[test]
    fn alias_hops_route() {
        let tree = kj_schema();
        // `kj ctx ls` → context.list via command aliases.
        let args = [word("ctx"), word("ls")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "list");
    }

    #[test]
    fn unknown_subcommand_stops_at_current_node() {
        let tree = kj_schema();
        // `context nonesuch` — `nonesuch` names no child, so context is the leaf
        // and `nonesuch` is context's own positional. No error.
        let args = [word("context"), word("nonesuch")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "context");
    }

    #[test]
    fn root_bool_flag_before_path_does_not_disrupt_routing() {
        let tree = kj_schema();
        // `kj --verbose context create` — a root bool flag is skipped, both
        // positionals route to create.
        let args = [long_flag("verbose"), word("context"), word("create")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "create");
    }

    #[test]
    fn root_value_flag_space_form_before_path_skips_its_value() {
        let tree = kj_schema();
        // `kj --confirm nonce context create` — `nonce` is --confirm's value,
        // NOT a subcommand selector; routing skips it and reaches create.
        let args = [
            long_flag("confirm"),
            word("nonce"),
            word("context"),
            word("create"),
        ];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "create");
    }

    #[test]
    fn leaf_value_flag_after_path_routes_to_leaf() {
        let tree = kj_schema();
        // `kj context create --type x` — the natural form: path first, leaf flag
        // after. Routing reaches create; --type then binds against create.
        let args = [word("context"), word("create"), long_flag("type"), word("x")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "create");
        assert!(leaf.params.iter().any(|p| p.name == "type"));
    }

    #[test]
    fn double_dash_stops_routing() {
        let tree = kj_schema();
        // `kj -- context` — after `--`, `context` is raw data, not a subcommand.
        let args = [Arg::DoubleDash, word("context")];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "kj");
    }

    #[test]
    fn computed_subcommand_selector_errors() {
        let tree = kj_schema();
        // `kj $(echo context)` — a command substitution where a subcommand name
        // is required must fail loud, not silently pick a leaf.
        let args = [echo_subst()];
        let err = select_leaf(&tree, &args).expect_err("must error");
        let msg = err.to_string();
        assert!(msg.contains("subcommand name is required"), "got: {msg}");
        assert!(msg.contains("command substitution"), "names the cause: {msg}");
    }

    #[test]
    fn variable_subcommand_selector_errors() {
        let tree = kj_schema();
        // A variable reference in selector position is rejected the same way.
        let args = [Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("sub")))];
        let err = select_leaf(&tree, &args).expect_err("must error");
        assert!(err.to_string().contains("variable reference"), "got: {err}");
    }

    #[test]
    fn computed_positional_after_leaf_is_fine() {
        let tree = kj_schema();
        // `kj context list $(echo x)` — once at a leaf (list has no children),
        // a computed positional is just an argument; routing already stopped.
        let args = [word("context"), word("list"), echo_subst()];
        let leaf = select_leaf(&tree, &args).expect("ok");
        assert_eq!(leaf.name, "list");
    }
}
1244
1245#[cfg(test)]
1246mod tests {
1247    use super::*;
1248    use crate::dispatch::BackendDispatcher;
1249    use crate::tools::register_builtins;
1250    use crate::vfs::{Filesystem, MemoryFs, VfsRouter};
1251    use std::path::Path;
1252
1253    async fn make_runner_and_ctx() -> (PipelineRunner, ExecContext, BackendDispatcher) {
1254        let mut tools = ToolRegistry::new();
1255        register_builtins(&mut tools);
1256        let tools = Arc::new(tools);
1257        let runner = PipelineRunner::new(tools.clone());
1258        let dispatcher = BackendDispatcher::new(tools.clone());
1259
1260        let mut vfs = VfsRouter::new();
1261        let mem = MemoryFs::new();
1262        mem.write(Path::new("test.txt"), b"hello\nworld\nfoo").await.unwrap();
1263        vfs.mount("/", mem);
1264        let ctx = ExecContext::with_vfs_and_tools(Arc::new(vfs), tools);
1265
1266        (runner, ctx, dispatcher)
1267    }
1268
1269    fn make_cmd(name: &str, args: Vec<&str>) -> Command {
1270        Command {
1271            name: name.to_string(),
1272            args: args.iter().map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string())))).collect(),
1273            redirects: vec![],
1274        }
1275    }
1276
1277    #[tokio::test]
1278    async fn test_single_command() {
1279        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1280        let cmd = make_cmd("echo", vec!["hello"]);
1281
1282        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1283        assert!(result.ok());
1284        assert_eq!(result.text_out().trim(), "hello");
1285    }
1286
1287    #[tokio::test]
1288    async fn test_pipeline_echo_grep() {
1289        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1290
1291        // echo "hello\nworld" | grep pattern="world"
1292        let echo_cmd = Command {
1293            name: "echo".to_string(),
1294            args: vec![Arg::Positional(Expr::Literal(Value::String("hello\nworld".to_string())))],
1295            redirects: vec![],
1296        };
1297        let grep_cmd = Command {
1298            name: "grep".to_string(),
1299            args: vec![Arg::Positional(Expr::Literal(Value::String("world".to_string())))],
1300            redirects: vec![],
1301        };
1302
1303        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1304        assert!(result.ok());
1305        assert_eq!(result.text_out().trim(), "world");
1306    }
1307
1308    #[tokio::test]
1309    async fn test_pipeline_cat_grep() {
1310        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1311
1312        // cat /test.txt | grep pattern="hello"
1313        let cat_cmd = make_cmd("cat", vec!["/test.txt"]);
1314        let grep_cmd = Command {
1315            name: "grep".to_string(),
1316            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1317            redirects: vec![],
1318        };
1319
1320        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1321        assert!(result.ok());
1322        assert!(result.text_out().contains("hello"));
1323    }
1324
1325    #[tokio::test]
1326    async fn test_command_not_found() {
1327        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1328        let cmd = make_cmd("nonexistent", vec![]);
1329
1330        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1331        assert!(!result.ok());
1332        assert_eq!(result.code, 127);
1333        assert!(result.err.contains("not found"));
1334    }
1335
1336    #[tokio::test]
1337    async fn test_pipeline_continues_on_failure() {
1338        // Standard shell semantics: pipeline runs all commands,
1339        // exit code comes from the last command
1340        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1341
1342        // cat /nonexistent | grep "hello"
1343        // cat fails but grep still runs (on empty input), grep returns 1 (no match)
1344        let cat_cmd = make_cmd("cat", vec!["/nonexistent"]);
1345        let grep_cmd = Command {
1346            name: "grep".to_string(),
1347            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
1348            redirects: vec![],
1349        };
1350
1351        let result = runner.run(&[cat_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1352        // Exit code comes from last command (grep), not from cat
1353        assert!(!result.ok());
1354    }
1355
1356    #[tokio::test]
1357    async fn test_pipeline_last_command_exit_code() {
1358        // echo hello | cat — both succeed, pipeline succeeds
1359        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1360
1361        let echo_cmd = make_cmd("echo", vec!["hello"]);
1362        let cat_cmd = make_cmd("cat", vec![]);
1363
1364        let result = runner.run(&[echo_cmd, cat_cmd], &mut ctx, &dispatcher).await;
1365        assert!(result.ok());
1366        assert!(result.text_out().contains("hello"));
1367    }
1368
1369    #[tokio::test]
1370    async fn test_empty_pipeline() {
1371        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1372        let result = runner.run(&[], &mut ctx, &dispatcher).await;
1373        assert!(result.ok());
1374    }
1375
1376    // === Scatter/Gather Tests ===
1377
/// With both markers present, the indices of `scatter` and `gather` are returned.
#[test]
fn test_find_scatter_gather_both_present() {
    let pipeline = vec![
        make_cmd("echo", vec!["a"]),
        make_cmd("scatter", vec![]),
        make_cmd("process", vec![]),
        make_cmd("gather", vec![]),
    ];

    // `scatter` sits at index 1 and `gather` at index 3.
    assert_eq!(find_scatter_gather(&pipeline), Some((1, 3)));
}
1389
/// A `gather` without any `scatter` yields no scatter/gather pair.
#[test]
fn test_find_scatter_gather_no_scatter() {
    let pipeline = vec![make_cmd("echo", vec!["a"]), make_cmd("gather", vec![])];

    assert_eq!(find_scatter_gather(&pipeline), None);
}
1399
/// A `scatter` without any `gather` yields no scatter/gather pair.
#[test]
fn test_find_scatter_gather_no_gather() {
    let pipeline = vec![make_cmd("echo", vec!["a"]), make_cmd("scatter", vec![])];

    assert_eq!(find_scatter_gather(&pipeline), None);
}
1409
/// `gather` appearing before `scatter` is not recognised as a pair.
#[test]
fn test_find_scatter_gather_wrong_order() {
    let pipeline = vec![make_cmd("gather", vec![]), make_cmd("scatter", vec![])];

    assert_eq!(find_scatter_gather(&pipeline), None);
}
1419
1420    #[tokio::test]
1421    async fn test_scatter_gather_simple() {
1422        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1423
1424        // split "a b c" | scatter | echo ${ITEM} | gather
1425        let split_cmd = Command {
1426            name: "split".to_string(),
1427            args: vec![Arg::Positional(Expr::Literal(Value::String("a b c".to_string())))],
1428            redirects: vec![],
1429        };
1430        let scatter_cmd = make_cmd("scatter", vec![]);
1431        let process_cmd = Command {
1432            name: "echo".to_string(),
1433            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1434            redirects: vec![],
1435        };
1436        let gather_cmd = make_cmd("gather", vec![]);
1437
1438        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1439        assert!(result.ok(), "scatter with structured data should succeed: {}", result.err);
1440        // Each echo should output the item
1441        assert!(result.text_out().contains("a"));
1442        assert!(result.text_out().contains("b"));
1443        assert!(result.text_out().contains("c"));
1444    }
1445
1446    #[tokio::test]
1447    async fn test_scatter_gather_empty_input() {
1448        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1449
1450        // echo "" | scatter | echo ${ITEM} | gather
1451        let echo_cmd = Command {
1452            name: "echo".to_string(),
1453            args: vec![Arg::Positional(Expr::Literal(Value::String("".to_string())))],
1454            redirects: vec![],
1455        };
1456        let scatter_cmd = make_cmd("scatter", vec![]);
1457        let process_cmd = Command {
1458            name: "echo".to_string(),
1459            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1460            redirects: vec![],
1461        };
1462        let gather_cmd = make_cmd("gather", vec![]);
1463
1464        let result = runner.run(&[echo_cmd, scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1465        assert!(result.ok());
1466        assert!(result.text_out().trim().is_empty());
1467    }
1468
1469    #[tokio::test]
1470    async fn test_scatter_gather_with_structured_stdin() {
1471        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1472
1473        // Set structured stdin data (as if piped from split/seq)
1474        let data = Value::Json(serde_json::json!(["x", "y", "z"]));
1475        ctx.set_stdin_with_data("x\ny\nz".to_string(), Some(data));
1476
1477        let scatter_cmd = make_cmd("scatter", vec![]);
1478        let process_cmd = Command {
1479            name: "echo".to_string(),
1480            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1481            redirects: vec![],
1482        };
1483        let gather_cmd = make_cmd("gather", vec![]);
1484
1485        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1486        assert!(result.ok(), "scatter with structured stdin should succeed: {}", result.err);
1487        assert!(result.text_out().contains("x"));
1488        assert!(result.text_out().contains("y"));
1489        assert!(result.text_out().contains("z"));
1490    }
1491
1492    #[tokio::test]
1493    async fn test_scatter_gather_json_input() {
1494        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1495
1496        // Structured JSON array input (as if from split/seq)
1497        let data = Value::Json(serde_json::json!(["one", "two", "three"]));
1498        ctx.set_stdin_with_data(r#"["one", "two", "three"]"#.to_string(), Some(data));
1499
1500        let scatter_cmd = make_cmd("scatter", vec![]);
1501        let process_cmd = Command {
1502            name: "echo".to_string(),
1503            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1504            redirects: vec![],
1505        };
1506        let gather_cmd = make_cmd("gather", vec![]);
1507
1508        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1509        assert!(result.ok(), "scatter with JSON data should succeed: {}", result.err);
1510        assert!(result.text_out().contains("one"));
1511        assert!(result.text_out().contains("two"));
1512        assert!(result.text_out().contains("three"));
1513    }
1514
1515    #[tokio::test]
1516    async fn test_scatter_gather_with_post_gather() {
1517        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1518
1519        // split "a b" | scatter | echo ${ITEM} | gather | grep "a"
1520        let split_cmd = Command {
1521            name: "split".to_string(),
1522            args: vec![Arg::Positional(Expr::Literal(Value::String("a b".to_string())))],
1523            redirects: vec![],
1524        };
1525        let scatter_cmd = make_cmd("scatter", vec![]);
1526        let process_cmd = Command {
1527            name: "echo".to_string(),
1528            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("ITEM")))],
1529            redirects: vec![],
1530        };
1531        let gather_cmd = make_cmd("gather", vec![]);
1532        let grep_cmd = Command {
1533            name: "grep".to_string(),
1534            args: vec![Arg::Positional(Expr::Literal(Value::String("a".to_string())))],
1535            redirects: vec![],
1536        };
1537
1538        let result = runner.run(&[split_cmd, scatter_cmd, process_cmd, gather_cmd, grep_cmd], &mut ctx, &dispatcher).await;
1539        assert!(result.ok(), "scatter with post_gather should succeed: {}", result.err);
1540        assert!(result.text_out().contains("a"));
1541        assert!(!result.text_out().contains("b"));
1542    }
1543
1544    #[tokio::test]
1545    async fn test_scatter_custom_var_name() {
1546        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
1547
1548        // Provide structured data (as if from split/seq)
1549        let data = Value::Json(serde_json::json!(["test1", "test2"]));
1550        ctx.set_stdin_with_data("test1\ntest2".to_string(), Some(data));
1551
1552        // scatter --as URL | echo ${URL} | gather
1553        let scatter_cmd = Command {
1554            name: "scatter".to_string(),
1555            args: vec![Arg::Named {
1556                key: "as".to_string(),
1557                value: Expr::Literal(Value::String("URL".to_string())),
1558            }],
1559            redirects: vec![],
1560        };
1561        let process_cmd = Command {
1562            name: "echo".to_string(),
1563            args: vec![Arg::Positional(Expr::VarRef(crate::ast::VarPath::simple("URL")))],
1564            redirects: vec![],
1565        };
1566        let gather_cmd = make_cmd("gather", vec![]);
1567
1568        let result = runner.run(&[scatter_cmd, process_cmd, gather_cmd], &mut ctx, &dispatcher).await;
1569        assert!(result.ok(), "scatter with custom var should succeed: {}", result.err);
1570        assert!(result.text_out().contains("test1"));
1571        assert!(result.text_out().contains("test2"));
1572    }
1573
1574    // === Backend Routing Tests ===
1575
1576    #[tokio::test]
1577    async fn test_pipeline_routes_through_backend() {
1578        use crate::backend::testing::MockBackend;
1579        use std::sync::atomic::Ordering;
1580
1581        // Create mock backend
1582        let (backend, call_count) = MockBackend::new();
1583        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1584
1585        // Create context with mock backend
1586        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1587
1588        // BackendDispatcher routes through backend.call_tool()
1589        let tools = std::sync::Arc::new(ToolRegistry::new());
1590        let runner = PipelineRunner::new(tools.clone());
1591        let dispatcher = BackendDispatcher::new(tools);
1592
1593        // Single command should route through backend
1594        let cmd = make_cmd("test-tool", vec!["arg1"]);
1595        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
1596
1597        assert!(result.ok(), "Mock backend should return success");
1598        assert_eq!(call_count.load(Ordering::SeqCst), 1, "call_tool should be invoked once");
1599        assert!(result.text_out().contains("mock executed"), "Output should be from mock backend");
1600    }
1601
1602    #[tokio::test]
1603    async fn test_multi_command_pipeline_routes_through_backend() {
1604        use crate::backend::testing::MockBackend;
1605        use std::sync::atomic::Ordering;
1606
1607        let (backend, call_count) = MockBackend::new();
1608        let backend: std::sync::Arc<dyn crate::backend::KernelBackend> = std::sync::Arc::new(backend);
1609        let mut ctx = crate::tools::ExecContext::with_backend(backend);
1610
1611        let tools = std::sync::Arc::new(ToolRegistry::new());
1612        let runner = PipelineRunner::new(tools.clone());
1613        let dispatcher = BackendDispatcher::new(tools);
1614
1615        // Pipeline with 3 commands
1616        let cmd1 = make_cmd("tool1", vec![]);
1617        let cmd2 = make_cmd("tool2", vec![]);
1618        let cmd3 = make_cmd("tool3", vec![]);
1619
1620        let result = runner.run(&[cmd1, cmd2, cmd3], &mut ctx, &dispatcher).await;
1621
1622        assert!(result.ok());
1623        assert_eq!(call_count.load(Ordering::SeqCst), 3, "call_tool should be invoked for each command");
1624    }
1625
1626    // === Schema-Aware Argument Parsing Tests ===
1627
1628    use crate::tools::{ParamSchema, ToolSchema};
1629
1630    fn make_test_schema() -> ToolSchema {
1631        ToolSchema::new("test-tool", "A test tool for schema-aware parsing")
1632            .param(ParamSchema::required("query", "string", "Search query"))
1633            .param(ParamSchema::optional("limit", "int", Value::Int(10), "Max results"))
1634            .param(ParamSchema::optional("verbose", "bool", Value::Bool(false), "Verbose output"))
1635            .param(ParamSchema::optional("output", "string", Value::String("stdout".into()), "Output destination"))
1636            .with_positional_mapping()
1637    }
1638
1639    fn make_minimal_ctx() -> ExecContext {
1640        let mut vfs = VfsRouter::new();
1641        vfs.mount("/", MemoryFs::new());
1642        ExecContext::new(Arc::new(vfs))
1643    }
1644
/// A long flag whose schema type is string consumes the following positional as its value.
#[test]
fn test_schema_aware_string_arg() {
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    // --query "test"  =>  named: {"query": "test"}
    let args = vec![
        Arg::LongFlag(String::from("query")),
        Arg::Positional(Expr::Literal(Value::String(String::from("test")))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.is_empty(), "No flags should be set");
    assert!(parsed.positional.is_empty(), "No positionals - consumed by --query");
    let expected = Value::String(String::from("test"));
    assert_eq!(
        parsed.named.get("query"),
        Some(&expected),
        "--query should consume 'test' as its value"
    );
}
1665
/// A long flag whose schema type is bool stays a plain flag.
#[test]
fn test_schema_aware_bool_flag() {
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    // --verbose (declared as bool in the schema)
    let args = vec![Arg::LongFlag(String::from("verbose"))];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("verbose"), "--verbose should be a flag");
    assert!(parsed.named.is_empty(), "No named args");
    assert!(parsed.positional.is_empty(), "No positionals");
}
1681
/// Positional, value-taking flag and bool flag mixed in one invocation.
#[test]
fn test_schema_aware_mixed() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let long = |name: &str| Arg::LongFlag(name.to_string());
    let string_value = |s: &str| Value::String(s.to_string());

    // mcp_tool file.txt --output out.txt --verbose
    // file.txt lands in "query", the first unfilled non-bool schema param.
    let args = vec![text("file.txt"), long("output"), text("out.txt"), long("verbose")];
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.positional.is_empty(), "file.txt consumed as query param");
    assert_eq!(parsed.named.get("query"), Some(&string_value("file.txt")));
    assert_eq!(parsed.named.get("output"), Some(&string_value("out.txt")));
    assert!(parsed.flags.contains("verbose"));
}
1708
/// Several value-taking flags and one bool flag are all resolved from the schema.
#[test]
fn test_schema_aware_multiple_string_args() {
    let long = |name: &str| Arg::LongFlag(name.to_string());
    let literal = |value: Value| Arg::Positional(Expr::Literal(value));

    // --query "test" --output "result.json" --verbose --limit 5
    let args = vec![
        long("query"),
        literal(Value::String("test".to_string())),
        long("output"),
        literal(Value::String("result.json".to_string())),
        long("verbose"),
        long("limit"),
        literal(Value::Int(5)),
    ];
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.positional.is_empty(), "All positionals consumed");

    let expected_query = Value::String("test".to_string());
    let expected_output = Value::String("result.json".to_string());
    let expected_limit = Value::Int(5);
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert_eq!(parsed.named.get("limit"), Some(&expected_limit));
    assert!(parsed.flags.contains("verbose"));
}
1741
/// Everything after `--` is kept as positional data.
#[test]
fn test_schema_aware_double_dash() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));

    // --output out.txt -- --this-is-data
    let args = vec![
        Arg::LongFlag("output".to_string()),
        text("out.txt"),
        Arg::DoubleDash,
        text("--this-is-data"),
    ];
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_output = Value::String("out.txt".to_string());
    assert_eq!(parsed.named.get("output"), Some(&expected_output));

    // The post-dash argument is a Positional in the input and stays positional.
    let expected_rest = vec![Value::String("--this-is-data".to_string())];
    assert_eq!(parsed.positional, expected_rest);
}
1767
/// Without a schema every `--flag` is a bool flag and positionals stay positional.
#[test]
fn test_no_schema_fallback() {
    let ctx = make_minimal_ctx();

    // --query test, parsed with no schema available
    let args = vec![
        Arg::LongFlag(String::from("query")),
        Arg::Positional(Expr::Literal(Value::String(String::from("test")))),
    ];

    let parsed = build_tool_args(&args, &ctx, None);

    // --query cannot take a value here, so "test" is left over as a positional.
    assert!(parsed.flags.contains("query"), "--query should be a flag");
    let expected_positional = vec![Value::String(String::from("test"))];
    assert_eq!(
        parsed.positional,
        expected_positional,
        "'test' should be a positional"
    );
}
1787
/// A flag missing from the schema is treated as bool; the next positional is mapped normally.
#[test]
fn test_unknown_flag_in_schema() {
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    // --unknown value: --unknown is not in the schema so it is a bool flag,
    // and "value" maps to query.
    let args = vec![
        Arg::LongFlag(String::from("unknown")),
        Arg::Positional(Expr::Literal(Value::String(String::from("value")))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.contains("unknown"));
    assert!(parsed.positional.is_empty(), "value consumed as query param");
    let expected_query = Value::String(String::from("value"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
}
1807
/// `key=value` arguments are passed through as named args when a schema is present.
#[test]
fn test_named_args_unchanged() {
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    // query=test --verbose
    let query_arg = Arg::Named {
        key: String::from("query"),
        value: Expr::Literal(Value::String(String::from("test"))),
    };
    let args = vec![query_arg, Arg::LongFlag(String::from("verbose"))];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String(String::from("test"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert!(parsed.flags.contains("verbose"));
}
1829
/// Combined short flags expand into individual flags even with a schema.
#[test]
fn test_short_flags_unchanged() {
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    // -la file.txt: -la expands to `l` and `a`; file.txt maps to query
    let args = vec![
        Arg::ShortFlag(String::from("la")),
        Arg::Positional(Expr::Literal(Value::String(String::from("file.txt")))),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    for flag in ["l", "a"] {
        assert!(parsed.flags.contains(flag));
    }
    assert!(parsed.positional.is_empty(), "file.txt consumed as query param");
    let expected_query = Value::String(String::from("file.txt"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
}
1850
/// A value-taking flag at the very end, with nothing to consume, degrades to a flag.
#[test]
fn test_flag_at_end_no_value() {
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    // file.txt --output
    // file.txt maps to query (first unfilled non-bool param); --output has no value.
    let args = vec![
        Arg::Positional(Expr::Literal(Value::String(String::from("file.txt")))),
        Arg::LongFlag(String::from("output")),
    ];

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // Lenient handling: output expects a value but none follows, so it becomes a flag.
    assert!(parsed.flags.contains("output"));
    assert!(parsed.positional.is_empty(), "file.txt consumed as query param");
    let expected_query = Value::String(String::from("file.txt"));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
}
1872
/// Positional mapping skips bool params and fills only the non-bool ones.
#[test]
fn test_positional_skips_bool_params() {
    // Schema: [query: string, verbose: bool, output: string]
    // Args:   "val1" "val2"
    // Result: query="val1", verbose unset, output="val2"
    let query = ParamSchema::required("query", "string", "");
    let verbose = ParamSchema::optional("verbose", "bool", Value::Bool(false), "");
    let output = ParamSchema::optional("output", "string", Value::Null, "");
    let schema = ToolSchema::new("test", "")
        .param(query)
        .param(verbose)
        .param(output)
        .with_positional_mapping();

    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![text("val1"), text("val2")];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String("val1".to_string());
    let expected_output = Value::String("val2".to_string());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert!(!parsed.flags.contains("verbose"));
    assert!(parsed.positional.is_empty());
}
1912
/// Three positionals fill the three non-bool params of the shared test schema.
#[test]
fn test_positionals_fill_available_slots() {
    // Shared schema: query (string), limit (int), verbose (bool), output (string).
    let schema = make_test_schema();
    let ctx = make_minimal_ctx();

    let args: Vec<Arg> = ["val1", "val2", "val3"]
        .iter()
        .map(|s| Arg::Positional(Expr::Literal(Value::String(s.to_string()))))
        .collect();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // val1 → query, val2 → limit (an int param that receives a string; the
    // tool decides what to do with it), val3 → output.
    let expected_mapping = [("query", "val1"), ("limit", "val2"), ("output", "val3")];
    for (param, value) in expected_mapping {
        assert_eq!(
            parsed.named.get(param),
            Some(&Value::String(value.to_string()))
        );
    }
    assert!(parsed.positional.is_empty());
}
1943
/// Positionals beyond the available non-bool params remain positional.
#[test]
fn test_truly_excess_positionals() {
    // One string param only, so only the first positional can be mapped.
    let name_param = ParamSchema::required("name", "string", "");
    let schema = ToolSchema::new("test", "")
        .param(name_param)
        .with_positional_mapping();

    let string_value = |s: &str| Value::String(s.to_string());
    let args = vec![
        Arg::Positional(Expr::Literal(string_value("first"))),
        Arg::Positional(Expr::Literal(string_value("second"))),
        Arg::Positional(Expr::Literal(string_value("third"))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert_eq!(parsed.named.get("name"), Some(&string_value("first")));
    // Leftovers keep their original order.
    let leftovers = vec![string_value("second"), string_value("third")];
    assert_eq!(parsed.positional, leftovers);
}
1971
/// Positionals after `--` are never mapped onto schema params.
#[test]
fn test_double_dash_positional_not_mapped() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));

    // `tool val1 -- val2`: val1 maps to query, val2 (post-dash) stays positional
    let args = vec![text("val1"), Arg::DoubleDash, text("val2")];
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String("val1".to_string());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));

    // Unfilled schema params exist, yet val2 must not be mapped because it follows `--`.
    let expected_rest = vec![Value::String("val2".to_string())];
    assert_eq!(parsed.positional, expected_rest);
}
1995
/// When explicit flags supply the values, nothing is left for positional mapping.
#[test]
fn test_all_params_filled_by_flags() {
    let long = |name: &str| Arg::LongFlag(name.to_string());
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));

    // --query search --output out.txt --verbose
    let args = vec![
        long("query"),
        text("search"),
        long("output"),
        text("out.txt"),
        long("verbose"),
    ];
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String("search".to_string());
    let expected_output = Value::String("out.txt".to_string());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert!(parsed.flags.contains("verbose"));
    assert!(parsed.positional.is_empty());
}
2022
/// An explicit flag value and a mapped positional can coexist.
#[test]
fn test_mixed_flags_and_positional_fill() {
    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));

    // --output foo val1: output is given explicitly, val1 maps to query
    let args = vec![Arg::LongFlag("output".to_string()), text("foo"), text("val1")];
    let ctx = make_minimal_ctx();
    let schema = make_test_schema();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_output = Value::String("foo".to_string());
    let expected_query = Value::String("val1".to_string());
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert!(parsed.positional.is_empty());
}
2046
/// A param filled through its short alias is not overwritten by positional mapping.
#[test]
fn test_alias_flag_prevents_mapping_overwrite() {
    // -q "search" "out.txt": -q is an alias for query, so out.txt must go to output.
    let query_param = ParamSchema::required("query", "string", "").with_aliases(["-q"]);
    let output_param = ParamSchema::required("output", "string", "");
    let schema = ToolSchema::new("test", "")
        .param(query_param)
        .param(output_param)
        .with_positional_mapping();

    let text = |s: &str| Arg::Positional(Expr::Literal(Value::String(s.to_string())));
    let args = vec![Arg::ShortFlag("q".to_string()), text("search"), text("out.txt")];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    let expected_query = Value::String("search".to_string());
    let expected_output = Value::String("out.txt".to_string());
    assert_eq!(parsed.named.get("query"), Some(&expected_query));
    assert_eq!(parsed.named.get("output"), Some(&expected_output));
    assert!(parsed.positional.is_empty());
}
2073
/// Without positional mapping enabled on the schema, positionals are left untouched.
#[test]
fn test_builtin_schema_no_positional_mapping() {
    // Builtin-style schema: `.with_positional_mapping()` is deliberately not
    // called, so map_positionals stays false.
    let args_param = ParamSchema::optional("args", "any", Value::Null, "");
    let newline_param = ParamSchema::optional("no_newline", "bool", Value::Bool(false), "");
    let schema = ToolSchema::new("echo", "")
        .param(args_param)
        .param(newline_param);

    let string_value = |s: &str| Value::String(s.to_string());
    let args = vec![
        Arg::Positional(Expr::Literal(string_value("hello"))),
        Arg::Positional(Expr::Literal(string_value("world"))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    // Both values stay positional; nothing was consumed into the "args" param.
    let expected_positional = vec![string_value("hello"), string_value("world")];
    assert_eq!(parsed.positional, expected_positional);
    assert_eq!(parsed.named.get("args"), None);
}
2099
/// A short flag aliased to a non-bool param consumes the next value under the canonical name.
#[test]
fn test_short_flag_with_alias_consumes_value() {
    // `-n 5` where `-n` is an alias of `lines` (type: int).
    // Expected: named {"lines": 5}, rather than flags {"n"} plus positional [5].
    let lines_param =
        ParamSchema::optional("lines", "int", Value::Int(10), "Number of lines").with_aliases(["-n"]);
    let schema = ToolSchema::new("head", "Output first part of files").param(lines_param);

    let args = vec![
        Arg::ShortFlag(String::from("n")),
        Arg::Positional(Expr::Literal(Value::Int(5))),
        Arg::Positional(Expr::Literal(Value::String(String::from("/tmp/file.txt")))),
    ];
    let ctx = make_minimal_ctx();

    let parsed = build_tool_args(&args, &ctx, Some(&schema));

    assert!(parsed.flags.is_empty(), "no boolean flags: {:?}", parsed.flags);
    let expected_lines = Value::Int(5);
    assert_eq!(parsed.named.get("lines"), Some(&expected_lines), "should resolve alias to canonical name");
    // The file path is not part of the flag and stays positional.
    let expected_positional = vec![Value::String(String::from("/tmp/file.txt"))];
    assert_eq!(parsed.positional, expected_positional);
}
2120
2121    // === Redirect Execution Tests ===
2122
2123    #[tokio::test]
2124    async fn test_merge_stderr_redirect() {
2125        // Test that 2>&1 merges stderr into stdout
2126        let result = ExecResult::from_output(0, "stdout content", "stderr content");
2127
2128        let redirects = vec![Redirect {
2129            kind: RedirectKind::MergeStderr,
2130            target: Expr::Literal(Value::Null),
2131        }];
2132
2133        let ctx = make_minimal_ctx();
2134        let result = apply_redirects(result, &redirects, &ctx).await;
2135
2136        assert_eq!(&*result.text_out(), "stdout contentstderr content");
2137        assert!(result.err.is_empty());
2138    }
2139
2140    #[tokio::test]
2141    async fn test_merge_stderr_with_empty_stderr() {
2142        // Test that 2>&1 handles empty stderr gracefully
2143        let result = ExecResult::from_output(0, "stdout only", "");
2144
2145        let redirects = vec![Redirect {
2146            kind: RedirectKind::MergeStderr,
2147            target: Expr::Literal(Value::Null),
2148        }];
2149
2150        let ctx = make_minimal_ctx();
2151        let result = apply_redirects(result, &redirects, &ctx).await;
2152
2153        assert_eq!(&*result.text_out(), "stdout only");
2154        assert!(result.err.is_empty());
2155    }
2156
2157    #[tokio::test]
2158    async fn test_merge_stderr_order_matters() {
2159        // Test redirect ordering: 2>&1 > file means:
2160        // 1. First merge stderr into stdout
2161        // 2. Then write stdout to file (leaving both empty for piping)
2162        // This verifies left-to-right processing
2163        let result = ExecResult::from_output(0, "stdout\n", "stderr\n");
2164
2165        // Just 2>&1 - should merge
2166        let redirects = vec![Redirect {
2167            kind: RedirectKind::MergeStderr,
2168            target: Expr::Literal(Value::Null),
2169        }];
2170
2171        let ctx = make_minimal_ctx();
2172        let result = apply_redirects(result, &redirects, &ctx).await;
2173
2174        assert_eq!(&*result.text_out(), "stdout\nstderr\n");
2175        assert!(result.err.is_empty());
2176    }
2177
2178    #[tokio::test]
2179    async fn test_redirect_with_command_execution() {
2180        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2181
2182        // echo "hello" with 2>&1 redirect
2183        let cmd = Command {
2184            name: "echo".to_string(),
2185            args: vec![Arg::Positional(Expr::Literal(Value::String("hello".to_string())))],
2186            redirects: vec![Redirect {
2187                kind: RedirectKind::MergeStderr,
2188                target: Expr::Literal(Value::Null),
2189            }],
2190        };
2191
2192        let result = runner.run(&[cmd], &mut ctx, &dispatcher).await;
2193        assert!(result.ok());
2194        // echo produces no stderr, so this just validates the redirect doesn't break anything
2195        assert!(result.text_out().contains("hello"));
2196    }
2197
2198    #[tokio::test]
2199    async fn test_merge_stderr_in_pipeline() {
2200        let (runner, mut ctx, dispatcher) = make_runner_and_ctx().await;
2201
2202        // echo "output" 2>&1 | grep "output"
2203        // The 2>&1 should be applied to echo's result, then piped to grep
2204        let echo_cmd = Command {
2205            name: "echo".to_string(),
2206            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2207            redirects: vec![Redirect {
2208                kind: RedirectKind::MergeStderr,
2209                target: Expr::Literal(Value::Null),
2210            }],
2211        };
2212        let grep_cmd = Command {
2213            name: "grep".to_string(),
2214            args: vec![Arg::Positional(Expr::Literal(Value::String("output".to_string())))],
2215            redirects: vec![],
2216        };
2217
2218        let result = runner.run(&[echo_cmd, grep_cmd], &mut ctx, &dispatcher).await;
2219        assert!(result.ok(), "result failed: code={}, err={}", result.code, result.err);
2220        assert!(result.text_out().contains("output"));
2221    }
2222}