Skip to main content

kaish_kernel/
kernel.rs

//! The Kernel (核) — the heart of kaish.
//!
//! The Kernel owns and coordinates all core components:
//! - Interpreter state (scope, $?)
//! - Tool registry (builtins, user tools)
//! - VFS router (mount points)
//! - Job manager (background jobs)
//!
//! # Architecture
//!
//! ```text
//! ┌────────────────────────────────────────────────────────────┐
//! │                         Kernel (核)                         │
//! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
//! │  │   Scope      │  │ ToolRegistry │  │  VfsRouter       │  │
//! │  │  (variables) │  │  (builtins,  │  │  (mount points)  │  │
//! │  │              │  │   user tools)│  │                  │  │
//! │  └──────────────┘  └──────────────┘  └──────────────────┘  │
//! │  ┌──────────────────────────────┐  ┌──────────────────┐    │
//! │  │  JobManager (background)     │  │  ExecResult ($?) │    │
//! │  └──────────────────────────────┘  └──────────────────┘    │
//! └────────────────────────────────────────────────────────────┘
//! ```
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::Duration;
30
31use anyhow::{Context, Result};
32use tokio::sync::RwLock;
33
/// Monotonic counter assigned to each Kernel at construction time, exposed
/// via `$$` / `${$}`. Starts at 1; each new Kernel gets the next value.
/// `Kernel::fork()` inherits the parent's value (matching bash's "subshell
/// keeps parent's $$" semantics) because forks clone the parent's Scope
/// rather than calling `set_pid` again.
///
/// Deliberately *not* the OS PID — kaish runs as a long-lived MCP server
/// or embedded inside other binaries (kaijutsu), where the host PID is
/// meaningless to the script. The design rationale lives in an
/// author-local note:
/// `~/.claude/projects/-home-atobey-src-kaish/memory/lang_dollar_dollar_identifier.md`.
static KERNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
46
47use async_trait::async_trait;
48
49use crate::ast::{Arg, Command, Expr, FileTestOp, Stmt, StringPart, TestExpr, ToolDef, Value, BinaryOp};
50pub use kaish_types::ExecuteOptions;
51use crate::backend::{BackendError, KernelBackend};
52use kaish_glob::glob_match;
53use crate::dispatch::{CommandDispatcher, PipelinePosition};
54use crate::interpreter::{apply_output_format, eval_expr, expand_tilde, json_to_value, value_to_bool, value_to_string, ControlFlow, ExecResult, Scope};
55use crate::parser::parse;
56use crate::scheduler::{is_bool_type, schema_param_lookup, select_leaf, stderr_stream, BoundedStream, JobManager, PipelineRunner, StderrReceiver};
57#[cfg(feature = "subprocess")]
58use crate::scheduler::{drain_to_stream, DEFAULT_STREAM_MAX_SIZE};
59use crate::tools::{register_builtins, ExecContext, GlobalFlags, ToolArgs, ToolRegistry};
60#[cfg(feature = "subprocess")]
61use crate::tools::resolve_in_path;
62use crate::validator::{Severity, Validator};
63#[cfg(feature = "localfs")]
64use crate::vfs::LocalFs;
65use crate::vfs::{BuiltinFs, JobFs, MemoryFs, VfsRouter};
66use kaish_vfs::ByteBudget;
67#[cfg(all(feature = "localfs", feature = "overlay"))]
68use kaish_vfs::OverlayFs;
69
/// VFS mount mode determines how the local filesystem is exposed.
///
/// Different modes trade off convenience vs. security:
/// - `Passthrough` gives native path access (best for human REPL use)
/// - `Sandboxed` restricts access to a subtree (safer for agents)
/// - `NoLocal` provides complete isolation (tests, pure memory mode)
///
/// `Passthrough` and `Sandboxed` only exist when the `localfs` feature is
/// enabled; `NoLocal` is always available.
#[derive(Debug, Clone)]
pub enum VfsMountMode {
    /// LocalFs at "/" — native paths work directly.
    ///
    /// Full filesystem access. Use for human-operated REPL sessions where
    /// native paths like `/home/user/project` should just work.
    ///
    /// Mounts:
    /// - `/` → LocalFs("/")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Passthrough,

    /// Transparent sandbox — paths look native but access is restricted.
    ///
    /// The local filesystem is mounted at its real path (e.g., `/home/user`),
    /// so `/home/user/src/project` just works. But paths outside the sandbox
    /// root are not accessible.
    ///
    /// **Note:** This only restricts VFS (builtin) operations. External commands
    /// bypass the sandbox entirely — see [`KernelConfig::allow_external_commands`].
    ///
    /// Mounts:
    /// - `/` → MemoryFs (catches paths outside sandbox)
    /// - `{root}` → LocalFs(root)  (e.g., `/home/user` → LocalFs)
    /// - `/tmp` → LocalFs("/tmp")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Sandboxed {
        /// Root path for local filesystem. Defaults to `$HOME`.
        /// Can be restricted further, e.g., `~/src`.
        root: Option<PathBuf>,
    },

    /// No local filesystem. Memory only.
    ///
    /// Complete isolation — no access to the host filesystem.
    /// Useful for tests or pure sandboxed execution.
    ///
    /// Output spill is forced to [`SpillMode::Memory`](crate::output_limit::SpillMode::Memory)
    /// for this mode at kernel construction: with no host filesystem mounted,
    /// large output must not write a host spill file (`paths::spill_dir()`
    /// bypasses the VFS). This overrides any explicit `SpillMode::Disk`.
    ///
    /// Mounts:
    /// - `/` → MemoryFs
    /// - `/tmp` → MemoryFs
    /// - `/v` → MemoryFs
    NoLocal,
}
126
127#[allow(clippy::derivable_impls)] // native has multiple variants; not derivable cross-feature
128impl Default for VfsMountMode {
129    fn default() -> Self {
130        #[cfg(feature = "localfs")]
131        { VfsMountMode::Sandboxed { root: None } }
132        #[cfg(not(feature = "localfs"))]
133        { VfsMountMode::NoLocal }
134    }
135}
136
137/// Configuration for kernel initialization.
138#[derive(Debug, Clone)]
139pub struct KernelConfig {
140    /// Name of this kernel (for identification).
141    pub name: String,
142
143    /// VFS mount mode — controls how local filesystem is exposed.
144    pub vfs_mode: VfsMountMode,
145
146    /// Initial working directory (VFS path).
147    pub cwd: PathBuf,
148
149    /// Whether to skip pre-execution validation.
150    ///
151    /// When false (default), scripts are validated before execution to catch
152    /// errors early. Set to true to skip validation for performance or to
153    /// allow dynamic/external commands.
154    pub skip_validation: bool,
155
156    /// When true, standalone external commands inherit stdio for real-time output.
157    ///
158    /// Set by script runner and REPL for human-visible output.
159    /// Not set by MCP server (output must be captured for structured responses).
160    pub interactive: bool,
161
162    /// Ignore file configuration for file-walking tools.
163    pub ignore_config: crate::ignore_config::IgnoreConfig,
164
165    /// Output size limit configuration for agent safety.
166    pub output_limit: crate::output_limit::OutputLimitConfig,
167
168    /// Whether external command execution (PATH lookup, `exec`, `spawn`) is allowed.
169    ///
170    /// When `true` (default), commands not found as builtins are resolved via PATH
171    /// and executed as child processes. When `false`, only kaish builtins and
172    /// backend-registered tools are available.
173    ///
174    /// **Security:** External commands bypass the VFS sandbox entirely — they see
175    /// the real filesystem, network, and environment. Set to `false` when running
176    /// untrusted input.
177    pub allow_external_commands: bool,
178
179    /// Enable confirmation latch for dangerous operations (set -o latch).
180    ///
181    /// When enabled, destructive operations like `rm` require nonce confirmation.
182    /// Can also be enabled at runtime with `set -o latch` or via `KAISH_LATCH=1`.
183    pub latch_enabled: bool,
184
185    /// Enable trash-on-delete for rm (set -o trash).
186    ///
187    /// When enabled, small files are moved to freedesktop.org Trash instead of
188    /// being permanently deleted. Can also be enabled at runtime with `set -o trash`
189    /// or via `KAISH_TRASH=1`.
190    pub trash_enabled: bool,
191
192    /// Shared nonce store for cross-request confirmation latch.
193    ///
194    /// When `Some`, the kernel uses this store instead of creating a fresh one.
195    /// This allows nonces issued in one MCP `execute()` call to be validated
196    /// in a subsequent call. When `None` (default), a fresh store is created.
197    pub nonce_store: Option<crate::nonce::NonceStore>,
198
199    /// Variables to populate the root scope with at construction, all marked
200    /// for export to child processes.
201    ///
202    /// The kernel itself is hermetic — it never reads `std::env::vars()` —
203    /// so frontends that want OS-env passthrough (REPL, MCP) populate this
204    /// from `std::env::vars()`. Embedders that want isolation pass nothing
205    /// (or only the keys they curate).
206    pub initial_vars: HashMap<String, Value>,
207
208    /// Default per-request timeout. When `Some`, every `execute_with_options`
209    /// call without an explicit `ExecuteOptions::timeout` uses this duration.
210    /// When elapsed, the kernel cancels the request, kills any external
211    /// children with the configured grace, and returns exit code 124.
212    ///
213    /// `None` means no default timeout — only explicit per-call timeouts apply.
214    pub request_timeout: Option<Duration>,
215
216    /// Grace period between SIGTERM and SIGKILL when killing an external
217    /// child on cancellation or timeout.
218    ///
219    /// Defaults to 2 seconds. Set to `Duration::ZERO` to escalate immediately
220    /// to SIGKILL. Long-shutdown processes (databases, etc.) may need more.
221    pub kill_grace: Duration,
222
223    /// Cap on memory-resident bytes across all kernel-owned `MemoryFs` mounts.
224    ///
225    /// One shared `ByteBudget` (labeled `"vfs-memory"`) is created at kernel
226    /// construction and handed to every `MemoryFs` the kernel builds in
227    /// `setup_vfs` (Passthrough `/v`; Sandboxed `/` and `/v`; NoLocal `/`,
228    /// `/tmp`, `/v`). Writes that would exceed the cap fail loudly with
229    /// `StorageFull` — an in-band error a model reads and adapts to; fail
230    /// loud over quietly eating RAM.
231    ///
232    /// **Why MCP is bounded by default:** each `execute()` call creates a fresh
233    /// kernel (see `server/execute.rs`), so the 64 MiB cap is per-call, not
234    /// per-session. Embedders that know their workload needs more opt out with
235    /// `without_vfs_budget()` or raise the cap with `with_vfs_budget(bytes)` —
236    /// protection on by default, opt out knowingly. All other profiles default
237    /// to `None` (unbounded).
238    ///
239    /// Follows the same pattern as `OutputLimitConfig`: MCP bounded, rest unbounded.
240    pub vfs_budget_bytes: Option<u64>,
241
242    /// Enable copy-on-write overlay mode (opt-in).
243    ///
244    /// When `true`, the primary local filesystem mount is wrapped in an
245    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
246    /// Use `kaish-vfs status/diff/commit/reset` to inspect and manage the
247    /// overlay transaction.
248    ///
249    /// **Passthrough:** `/` becomes `OverlayFs over LocalFs::read_only("/")`.
250    /// **Sandboxed{root}:** the `{root}` mount becomes
251    /// `OverlayFs over LocalFs::read_only(root)`; the `/tmp` and XDG runtime
252    /// mounts stay as real `LocalFs` (real writes escape the transaction —
253    /// see `docs/kaish-overlayfs.md` for the escape-hatch inventory).
254    /// **NoLocal:** incompatible — construction fails loudly (everything is
255    /// already virtual; an overlay adds no value and no lower layer to wrap).
256    /// **with_backend:** incompatible — the embedder controls the VFS; the
257    /// kernel cannot wrap it without bypassing the embedder's semantics.
258    ///
259    /// **Not default-on for MCP:** each `execute()` call gets a fresh kernel,
260    /// making the overlay a per-call transaction — `kaish-vfs commit` must run
261    /// in the same call as the writes, or the transaction is discarded on drop.
262    /// Frontends (REPL, MCP) expose `--overlay` as an explicit opt-in flag.
263    pub overlay: bool,
264}
265
/// Get the default sandbox root (`$HOME`).
///
/// Reads `HOME` as an OS string so a home directory whose path is not valid
/// UTF-8 is still honoured; rejecting it would silently widen the sandbox
/// root to `/`. Falls back to `/` only when `HOME` is not set at all.
#[cfg(feature = "localfs")]
fn default_sandbox_root() -> PathBuf {
    std::env::var_os("HOME")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("/"))
}
273
274impl Default for KernelConfig {
275    fn default() -> Self {
276        #[cfg(feature = "localfs")]
277        {
278            let home = default_sandbox_root();
279            Self {
280                name: "default".to_string(),
281                vfs_mode: VfsMountMode::Sandboxed { root: None },
282                cwd: home,
283                skip_validation: false,
284                interactive: false,
285                ignore_config: crate::ignore_config::IgnoreConfig::none(),
286                output_limit: crate::output_limit::OutputLimitConfig::none(),
287                allow_external_commands: cfg!(feature = "subprocess"),
288                latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
289                trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
290                nonce_store: None,
291                initial_vars: HashMap::new(),
292                request_timeout: None,
293                kill_grace: Duration::from_secs(2),
294                vfs_budget_bytes: None,
295                overlay: false,
296            }
297        }
298        #[cfg(not(feature = "localfs"))]
299        {
300            Self {
301                name: "default".to_string(),
302                vfs_mode: VfsMountMode::NoLocal,
303                cwd: PathBuf::from("/"),
304                skip_validation: false,
305                interactive: false,
306                ignore_config: crate::ignore_config::IgnoreConfig::none(),
307                output_limit: crate::output_limit::OutputLimitConfig::none(),
308                allow_external_commands: false,
309                latch_enabled: false,
310                trash_enabled: false,
311                nonce_store: None,
312                initial_vars: HashMap::new(),
313                request_timeout: None,
314                kill_grace: Duration::from_secs(2),
315                vfs_budget_bytes: None,
316                overlay: false,
317            }
318        }
319    }
320}
321
322impl KernelConfig {
323    /// Create a transient kernel config (sandboxed, for temporary use).
324    #[cfg(feature = "localfs")]
325    pub fn transient() -> Self {
326        let home = default_sandbox_root();
327        Self {
328            name: "transient".to_string(),
329            vfs_mode: VfsMountMode::Sandboxed { root: None },
330            cwd: home,
331            skip_validation: false,
332            interactive: false,
333            ignore_config: crate::ignore_config::IgnoreConfig::none(),
334            output_limit: crate::output_limit::OutputLimitConfig::none(),
335            allow_external_commands: cfg!(feature = "subprocess"),
336            latch_enabled: false,
337            trash_enabled: false,
338            nonce_store: None,
339            initial_vars: HashMap::new(),
340            request_timeout: None,
341            kill_grace: Duration::from_secs(2),
342            vfs_budget_bytes: None,
343            overlay: false,
344        }
345    }
346
347    /// Create a transient kernel config (isolated, no-default-features).
348    #[cfg(not(feature = "localfs"))]
349    pub fn transient() -> Self {
350        Self::isolated()
351    }
352
353    /// Create a kernel config with the given name (sandboxed by default).
354    #[cfg(feature = "localfs")]
355    pub fn named(name: &str) -> Self {
356        let home = default_sandbox_root();
357        Self {
358            name: name.to_string(),
359            vfs_mode: VfsMountMode::Sandboxed { root: None },
360            cwd: home,
361            skip_validation: false,
362            interactive: false,
363            ignore_config: crate::ignore_config::IgnoreConfig::none(),
364            output_limit: crate::output_limit::OutputLimitConfig::none(),
365            allow_external_commands: cfg!(feature = "subprocess"),
366            latch_enabled: false,
367            trash_enabled: false,
368            nonce_store: None,
369            initial_vars: HashMap::new(),
370            request_timeout: None,
371            kill_grace: Duration::from_secs(2),
372            vfs_budget_bytes: None,
373            overlay: false,
374        }
375    }
376
377    /// Create a kernel config with the given name (isolated, no-default-features).
378    #[cfg(not(feature = "localfs"))]
379    pub fn named(name: &str) -> Self {
380        Self {
381            name: name.to_string(),
382            ..Self::isolated()
383        }
384    }
385
386    /// Create a REPL config with passthrough filesystem access.
387    ///
388    /// Native paths like `/home/user/project` work directly.
389    /// The cwd is set to the actual current working directory.
390    #[cfg(feature = "localfs")]
391    pub fn repl() -> Self {
392        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
393        Self {
394            name: "repl".to_string(),
395            vfs_mode: VfsMountMode::Passthrough,
396            cwd,
397            skip_validation: false,
398            interactive: false,
399            ignore_config: crate::ignore_config::IgnoreConfig::none(),
400            output_limit: crate::output_limit::OutputLimitConfig::none(),
401            allow_external_commands: cfg!(feature = "subprocess"),
402            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
403            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
404            nonce_store: None,
405            initial_vars: HashMap::new(),
406            request_timeout: None,
407            kill_grace: Duration::from_secs(2),
408            vfs_budget_bytes: None,
409            overlay: false,
410        }
411    }
412
413    /// Create an MCP server config with sandboxed filesystem access.
414    ///
415    /// Local filesystem is accessible at its real path (e.g., `/home/user`),
416    /// but sandboxed to `$HOME`. Paths outside the sandbox are not accessible
417    /// through builtins. External commands still access the real filesystem —
418    /// use `.with_allow_external_commands(false)` to block them.
419    ///
420    /// VFS memory is bounded at 64 MiB per `execute()` call by default
421    /// (MCP creates a fresh kernel per call). Raise or remove with
422    /// `with_vfs_budget` / `without_vfs_budget`.
423    #[cfg(feature = "localfs")]
424    pub fn mcp() -> Self {
425        let home = default_sandbox_root();
426        Self {
427            name: "mcp".to_string(),
428            vfs_mode: VfsMountMode::Sandboxed { root: None },
429            cwd: home,
430            skip_validation: false,
431            interactive: false,
432            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
433            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
434            allow_external_commands: cfg!(feature = "subprocess"),
435            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
436            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
437            nonce_store: None,
438            initial_vars: HashMap::new(),
439            request_timeout: None,
440            kill_grace: Duration::from_secs(2),
441            vfs_budget_bytes: Some(64 * 1024 * 1024),
442            overlay: false,
443        }
444    }
445
446    /// Create an MCP server config with a custom sandbox root.
447    ///
448    /// Use this to restrict access to a subdirectory like `~/src`.
449    ///
450    /// VFS memory is bounded at 64 MiB per `execute()` call by default.
451    /// Raise or remove with `with_vfs_budget` / `without_vfs_budget`.
452    #[cfg(feature = "localfs")]
453    pub fn mcp_with_root(root: PathBuf) -> Self {
454        Self {
455            name: "mcp".to_string(),
456            vfs_mode: VfsMountMode::Sandboxed { root: Some(root.clone()) },
457            cwd: root,
458            skip_validation: false,
459            interactive: false,
460            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
461            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
462            allow_external_commands: cfg!(feature = "subprocess"),
463            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
464            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
465            nonce_store: None,
466            initial_vars: HashMap::new(),
467            request_timeout: None,
468            kill_grace: Duration::from_secs(2),
469            vfs_budget_bytes: Some(64 * 1024 * 1024),
470            overlay: false,
471        }
472    }
473
474    /// Create a config with no local filesystem (memory only).
475    ///
476    /// Complete isolation: no local filesystem and external commands are disabled.
477    /// Useful for tests or pure sandboxed execution.
478    pub fn isolated() -> Self {
479        Self {
480            name: "isolated".to_string(),
481            vfs_mode: VfsMountMode::NoLocal,
482            cwd: PathBuf::from("/"),
483            skip_validation: false,
484            interactive: false,
485            ignore_config: crate::ignore_config::IgnoreConfig::none(),
486            output_limit: crate::output_limit::OutputLimitConfig::none(),
487            allow_external_commands: false,
488            latch_enabled: false,
489            trash_enabled: false,
490            nonce_store: None,
491            initial_vars: HashMap::new(),
492            request_timeout: None,
493            kill_grace: Duration::from_secs(2),
494            vfs_budget_bytes: None,
495            overlay: false,
496        }
497    }
498
499    /// Set the VFS mount mode.
500    pub fn with_vfs_mode(mut self, mode: VfsMountMode) -> Self {
501        self.vfs_mode = mode;
502        self
503    }
504
505    /// Set the initial working directory.
506    pub fn with_cwd(mut self, cwd: PathBuf) -> Self {
507        self.cwd = cwd;
508        self
509    }
510
511    /// Skip pre-execution validation.
512    pub fn with_skip_validation(mut self, skip: bool) -> Self {
513        self.skip_validation = skip;
514        self
515    }
516
517    /// Enable interactive mode (external commands inherit stdio).
518    pub fn with_interactive(mut self, interactive: bool) -> Self {
519        self.interactive = interactive;
520        self
521    }
522
523    /// Set the ignore file configuration.
524    pub fn with_ignore_config(mut self, config: crate::ignore_config::IgnoreConfig) -> Self {
525        self.ignore_config = config;
526        self
527    }
528
529    /// Set the output limit configuration.
530    pub fn with_output_limit(mut self, config: crate::output_limit::OutputLimitConfig) -> Self {
531        self.output_limit = config;
532        self
533    }
534
535    /// Set whether external command execution is allowed.
536    ///
537    /// When `false`, commands not found as builtins produce "command not found"
538    /// instead of searching PATH. The `exec` and `spawn` builtins also return
539    /// errors. Use this to prevent VFS sandbox bypass via external binaries.
540    pub fn with_allow_external_commands(mut self, allow: bool) -> Self {
541        self.allow_external_commands = allow;
542        self
543    }
544
545    /// Enable or disable confirmation latch at startup.
546    pub fn with_latch(mut self, enabled: bool) -> Self {
547        self.latch_enabled = enabled;
548        self
549    }
550
551    /// Enable or disable trash-on-delete at startup.
552    pub fn with_trash(mut self, enabled: bool) -> Self {
553        self.trash_enabled = enabled;
554        self
555    }
556
557    /// Use a shared nonce store for cross-request confirmation latch.
558    ///
559    /// Pass a `NonceStore` that outlives individual kernel instances so nonces
560    /// issued in one MCP `execute()` call can be validated in subsequent calls.
561    pub fn with_nonce_store(mut self, store: crate::nonce::NonceStore) -> Self {
562        self.nonce_store = Some(store);
563        self
564    }
565
566    /// Add a single initial variable; marked exported when the kernel boots.
567    ///
568    /// Repeated calls add (last write wins on key collision).
569    pub fn with_var(mut self, name: impl Into<String>, value: Value) -> Self {
570        self.initial_vars.insert(name.into(), value);
571        self
572    }
573
574    /// Replace the entire initial-vars map. All entries are marked exported.
575    pub fn with_initial_vars(mut self, vars: HashMap<String, Value>) -> Self {
576        self.initial_vars = vars;
577        self
578    }
579
580    /// Extend the initial-vars map with the given entries (last write wins).
581    pub fn with_vars(mut self, vars: HashMap<String, Value>) -> Self {
582        self.initial_vars.extend(vars);
583        self
584    }
585
586    /// Set the default per-request timeout (kernel-wide).
587    ///
588    /// Each `execute_with_options` call without an explicit timeout uses
589    /// this. On elapsed, the kernel cancels and returns exit code 124.
590    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
591        self.request_timeout = Some(timeout);
592        self
593    }
594
595    /// Set the SIGTERM-to-SIGKILL grace period for child kills.
596    pub fn with_kill_grace(mut self, grace: Duration) -> Self {
597        self.kill_grace = grace;
598        self
599    }
600
601    /// Cap VFS memory-resident bytes at `bytes` across all kernel-owned
602    /// `MemoryFs` mounts. A shared `ByteBudget` labeled `"vfs-memory"` is
603    /// created at kernel construction and passed to every `MemoryFs` the
604    /// kernel builds (see `setup_vfs` and `with_backend`).
605    ///
606    /// Writes that would exceed the cap fail loudly with `StorageFull` — an
607    /// in-band error a model reads and adapts to; fail loud over quietly eating
608    /// RAM. Use `without_vfs_budget` to remove the cap entirely.
609    pub fn with_vfs_budget(mut self, bytes: u64) -> Self {
610        self.vfs_budget_bytes = Some(bytes);
611        self
612    }
613
614    /// Remove the VFS memory budget — all `MemoryFs` mounts are unbounded.
615    ///
616    /// Use when the caller knows the workload and the default 64 MiB cap
617    /// (set by `KernelConfig::mcp`) is too conservative.
618    pub fn without_vfs_budget(mut self) -> Self {
619        self.vfs_budget_bytes = None;
620        self
621    }
622
623    /// Enable or disable copy-on-write overlay mode.
624    ///
625    /// When `true`, the primary local filesystem mount is wrapped in an
626    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
627    /// Incompatible with `VfsMountMode::NoLocal` (fails loudly at construction)
628    /// and `with_backend` kernels (same — the embedder controls the VFS).
629    pub fn with_overlay(mut self, overlay: bool) -> Self {
630        self.overlay = overlay;
631        self
632    }
633}
634
/// Handle to an active overlay session, kept on the kernel and shared to
/// `ExecContext` so the `kaish-vfs` builtin can reach the `OverlayFs`.
///
/// The `mount_path` is the VFS prefix the overlay was mounted under (e.g.
/// `/home/user`); `commit_root` is the real filesystem path the overlay's
/// lower is backed by (used as the target for `kaish-vfs commit`).
///
/// Only compiled when both the `localfs` and `overlay` features are enabled.
#[cfg(all(feature = "localfs", feature = "overlay"))]
#[derive(Clone)]
pub struct OverlayHandle {
    /// The mounted `OverlayFs`, Arc-shared so the builtin can call inspection
    /// methods without holding a VfsRouter lock.
    pub fs: Arc<OverlayFs>,
    /// VFS path this overlay is mounted at (e.g. `/home/user`).
    pub mount_path: PathBuf,
    /// Real filesystem root to commit into. Same as the lower's root.
    pub commit_root: PathBuf,
}
652
653/// The Kernel (核) — executes kaish code.
654///
655/// This is the primary interface for running kaish commands. It owns all
656/// the runtime state: variables, tools, VFS, jobs, and persistence.
657pub struct Kernel {
658    /// Kernel name.
659    name: String,
660    /// Variable scope.
661    scope: RwLock<Scope>,
662    /// Tool registry.
663    tools: Arc<ToolRegistry>,
664    /// User-defined tools (from `tool name { body }` statements).
665    user_tools: RwLock<HashMap<String, ToolDef>>,
666    /// Virtual filesystem router.
667    vfs: Arc<VfsRouter>,
668    /// Background job manager.
669    jobs: Arc<JobManager>,
670    /// Pipeline runner.
671    runner: PipelineRunner,
672    /// Execution context (cwd, stdin, etc.).
673    exec_ctx: RwLock<ExecContext>,
674    /// Whether to skip pre-execution validation.
675    skip_validation: bool,
676    /// When true, standalone external commands inherit stdio for real-time output.
677    interactive: bool,
678    /// Whether external command execution is allowed.
679    allow_external_commands: bool,
680    /// Shared memory budget for all kernel-owned `MemoryFs` mounts.
681    ///
682    /// `None` when `KernelConfig::vfs_budget_bytes` was `None` (unbounded).
683    /// `Some` is Arc-cloned into forks so all concurrent execution draws from
684    /// the same pool — a background job's writes reduce the same cap as
685    /// foreground writes, which is the correct behaviour.
686    vfs_budget: Option<Arc<kaish_vfs::ByteBudget>>,
687    /// Active overlay session handle, if this kernel was constructed with
688    /// `overlay: true`. Arc-shared so `ExecContext` (and thus the
689    /// `kaish-vfs` builtin) can inspect and mutate the overlay without
690    /// holding a kernel write lock. Propagated to forks via `fork_inner`
691    /// and `child_for_pipeline` so `kaish-vfs` works inside background
692    /// jobs, scatter workers, and pipeline stages.
693    #[cfg(all(feature = "localfs", feature = "overlay"))]
694    overlay_handle: Option<Arc<OverlayHandle>>,
695    /// Default per-request timeout (None = no default).
696    request_timeout: Option<Duration>,
697    /// SIGTERM-to-SIGKILL grace period for child kills.
698    kill_grace: Duration,
699    /// Receiver for the kernel stderr stream.
700    ///
701    /// Pipeline stages write to the corresponding `StderrStream` (set on ExecContext).
702    /// The kernel drains this after each statement in `execute_streaming`.
703    stderr_receiver: tokio::sync::Mutex<StderrReceiver>,
704    /// Cancellation token for interrupting execution (Ctrl-C).
705    ///
706    /// Protected by `std::sync::Mutex` (not tokio) because the SIGINT handler
707    /// needs sync access. Each `execute()` call gets a fresh child token;
708    /// `cancel()` cancels the current token and replaces it.
709    cancel_token: std::sync::Mutex<tokio_util::sync::CancellationToken>,
710    /// Terminal state for job control (interactive mode only, Unix only).
711    #[cfg(all(unix, feature = "subprocess"))]
712    terminal_state: Option<Arc<crate::terminal::TerminalState>>,
713    /// Weak self-reference for handing out `Arc<dyn CommandDispatcher>`.
714    ///
715    /// Set by `into_arc()`. Allows builtins to re-dispatch inner commands
716    /// through the full Kernel resolution chain.
717    self_weak: std::sync::OnceLock<std::sync::Weak<Self>>,
718    /// Background job this kernel (a fork) is executing on behalf of, if any.
719    /// Set on the fork created by `execute_background` and inherited by all its
720    /// sub-forks (pipeline stages, scatter workers), so an external command
721    /// spawned anywhere under a background job can record its process group on
722    /// that job for `kill -<sig> %N`. `None` for foreground execution.
723    bg_job_id: Option<crate::scheduler::JobId>,
724    /// Serializes concurrent `execute()` / `execute_streaming()` callers on
725    /// this Kernel instance. Tokio's Mutex is fair (FIFO) and acts as the
726    /// queue. Background jobs, scatter workers, and concurrent pipeline
727    /// stages do NOT take this lock — they run against a *forked* Kernel
728    /// (see [`Kernel::fork`]) so they never contend with the foreground.
729    execute_lock: tokio::sync::Mutex<()>,
730}
731
732/// Internal result of [`Kernel::setup_vfs`].
733struct VfsSetupResult {
734    vfs: VfsRouter,
735    budget: Option<Arc<ByteBudget>>,
736    #[cfg(all(feature = "localfs", feature = "overlay"))]
737    overlay_handle: Option<Arc<OverlayHandle>>,
738}
739
740impl Kernel {
741    /// Create a new kernel with the given configuration.
742    pub fn new(config: KernelConfig) -> Result<Self> {
743        let mut setup = Self::setup_vfs(&config)?;
744        let jobs = Arc::new(JobManager::new());
745
746        // Mount JobFs for job observability at /v/jobs
747        setup.vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
748
749        #[cfg(all(feature = "localfs", feature = "overlay"))]
750        let overlay_handle = setup.overlay_handle.take();
751
752        // Mode-based construction: the kernel owns its host mounts, so whether
753        // host side channels are allowed is decided by the VFS mode inside
754        // `assemble` (NoLocal forbids them).
755        let kernel = Self::assemble(config, setup.vfs, jobs, false, setup.budget, |_| {}, |vfs_ref, tools| {
756            ExecContext::with_vfs_and_tools(vfs_ref.clone(), tools.clone())
757        })?;
758
759        #[cfg(all(feature = "localfs", feature = "overlay"))]
760        {
761            let mut kernel = kernel;
762            kernel.overlay_handle = overlay_handle;
763            // Also set it on the ExecContext so builtins can access it.
764            if let Some(ref handle) = kernel.overlay_handle {
765                kernel.exec_ctx.get_mut().overlay_handle = Some(Arc::clone(handle));
766            }
767            return Ok(kernel);
768        }
769
770        #[allow(unreachable_code)]
771        Ok(kernel)
772    }
773
774    /// Set up VFS based on mount mode.
775    ///
776    /// Returns the router, the budget handle (if bounded), and an optional
777    /// overlay handle when `config.overlay` is true. The budget is Arc-shared:
778    /// every `MemoryFs` the kernel creates here holds a clone of the same
779    /// `Arc<ByteBudget>`, so the total charged against it is the sum of all
780    /// in-memory content across all kernel-owned memory mounts.
781    ///
782    /// # Errors
783    /// Returns `Err` if `config.overlay` is true and the mode is `NoLocal`
784    /// (overlay is meaningless when everything is already virtual — there is
785    /// no real lower layer to wrap). The caller (`Kernel::new`) propagates
786    /// this as an `anyhow::Error`.
787    fn setup_vfs(config: &KernelConfig) -> Result<VfsSetupResult> {
788        let mut vfs = VfsRouter::new();
789
790        // One budget for all memory mounts this kernel owns — labeled so the
791        // error message tells the user exactly which knob to raise.
792        let budget: Option<Arc<ByteBudget>> = config
793            .vfs_budget_bytes
794            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));
795
796        /// Helper: construct a `MemoryFs` wired to `budget` if present.
797        fn mem(budget: &Option<Arc<ByteBudget>>) -> MemoryFs {
798            match budget {
799                Some(b) => MemoryFs::with_budget(Arc::clone(b)),
800                None => MemoryFs::new(),
801            }
802        }
803
804        // Overlay handle — populated below if config.overlay is true.
805        #[cfg(all(feature = "localfs", feature = "overlay"))]
806        let mut overlay_handle: Option<Arc<OverlayHandle>> = None;
807
808        match &config.vfs_mode {
809            #[cfg(feature = "localfs")]
810            VfsMountMode::Passthrough => {
811                #[cfg(feature = "overlay")]
812                if config.overlay {
813                    // Wrap "/" in an OverlayFs so writes are virtual.
814                    let lower = Arc::new(LocalFs::read_only(PathBuf::from("/")));
815                    let overlay_fs = Arc::new(match &budget {
816                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
817                        None => OverlayFs::over(lower),
818                    });
819                    let handle = Arc::new(OverlayHandle {
820                        fs: Arc::clone(&overlay_fs),
821                        mount_path: PathBuf::from("/"),
822                        commit_root: PathBuf::from("/"),
823                    });
824                    vfs.mount_arc("/", overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
825                    overlay_handle = Some(handle);
826                } else {
827                    // LocalFs at "/" — native paths work directly
828                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
829                }
830                #[cfg(not(feature = "overlay"))]
831                {
832                    if config.overlay {
833                        return Err(anyhow::anyhow!(
834                            "overlay=true requires the `overlay` feature, but this build \
835                             was compiled without it. Recompile with --features overlay \
836                             (or the default feature set) to enable overlay mode."
837                        ));
838                    }
839                    // LocalFs at "/" — native paths work directly
840                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
841                }
842                // Memory for blobs
843                vfs.mount("/v", mem(&budget));
844            }
845            #[cfg(feature = "localfs")]
846            VfsMountMode::Sandboxed { root } => {
847                // Memory at root for safety (catches paths outside sandbox).
848                // Note: /tmp and the XDG runtime dir are LocalFs — writes
849                // there escape the VFS budget and are NOT virtual. This is
850                // intentional: /tmp interop with other processes matters more
851                // than accounting for scratch files there.
852                vfs.mount("/", mem(&budget));
853                vfs.mount("/v", mem(&budget));
854
855                // Real /tmp for interop with other processes
856                vfs.mount("/tmp", LocalFs::new(PathBuf::from("/tmp")));
857
858                // Mount XDG runtime dir for spill files and socket access
859                let runtime = crate::paths::xdg_runtime_dir();
860                if runtime.exists() {
861                    let runtime_str = runtime.to_string_lossy().to_string();
862                    vfs.mount(&runtime_str, LocalFs::new(runtime));
863                }
864
865                // Resolve the sandbox root (defaults to $HOME)
866                let local_root = root.clone().unwrap_or_else(|| {
867                    std::env::var("HOME")
868                        .map(PathBuf::from)
869                        .unwrap_or_else(|_| PathBuf::from("/"))
870                });
871
872                let mount_point = local_root.to_string_lossy().to_string();
873
874                #[cfg(feature = "overlay")]
875                if config.overlay {
876                    // Wrap the sandbox root in an OverlayFs.
877                    let lower = Arc::new(LocalFs::read_only(local_root.clone()));
878                    let overlay_fs = Arc::new(match &budget {
879                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
880                        None => OverlayFs::over(lower),
881                    });
882                    let handle = Arc::new(OverlayHandle {
883                        fs: Arc::clone(&overlay_fs),
884                        mount_path: PathBuf::from(&mount_point),
885                        commit_root: local_root,
886                    });
887                    vfs.mount_arc(&mount_point, overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
888                    overlay_handle = Some(handle);
889                } else {
890                    // Mount at the real path for transparent access
891                    // e.g., /home/atobey → LocalFs("/home/atobey")
892                    // so /home/atobey/src/kaish just works
893                    vfs.mount(&mount_point, LocalFs::new(local_root));
894                }
895                #[cfg(not(feature = "overlay"))]
896                {
897                    if config.overlay {
898                        return Err(anyhow::anyhow!(
899                            "overlay=true requires the `overlay` feature, but this build \
900                             was compiled without it. Recompile with --features overlay \
901                             (or the default feature set) to enable overlay mode."
902                        ));
903                    }
904                    // Mount at the real path for transparent access
905                    vfs.mount(&mount_point, LocalFs::new(local_root));
906                }
907            }
908            VfsMountMode::NoLocal => {
909                if config.overlay {
910                    return Err(anyhow::anyhow!(
911                        "overlay=true is incompatible with VfsMountMode::NoLocal: \
912                         everything is already virtual, there is no real lower layer \
913                         to wrap. Use with_overlay(false) or switch to a Passthrough \
914                         or Sandboxed VFS mode."
915                    ));
916                }
917                // Pure memory mode — no local filesystem
918                vfs.mount("/", mem(&budget));
919                vfs.mount("/tmp", mem(&budget));
920                vfs.mount("/v", mem(&budget));
921            }
922        }
923
924        Ok(VfsSetupResult {
925            vfs,
926            budget,
927            #[cfg(all(feature = "localfs", feature = "overlay"))]
928            overlay_handle,
929        })
930    }
931
932    /// Create a transient kernel (no persistence).
933    pub fn transient() -> Result<Self> {
934        Self::new(KernelConfig::transient())
935    }
936
937    /// Create a kernel with a custom backend and `/v/*` virtual path support.
938    ///
939    /// This is the constructor for embedding kaish in other systems that provide
940    /// their own storage backend (e.g., CRDT-backed storage in kaijutsu).
941    ///
942    /// A `VirtualOverlayBackend` routes paths automatically:
943    /// - `/v/*` → Internal VFS (JobFs at `/v/jobs`, MemoryFs at `/v/blobs`)
944    /// - Everything else → Your custom backend
945    ///
946    /// The optional `configure_vfs` closure lets you add additional virtual mounts
947    /// (e.g., `/v/docs` for CRDT blocks) after the built-in mounts are set up.
948    ///
949    /// **Note:** The config's `vfs_mode` is ignored — all non-`/v/*` path routing
950    /// is handled by your custom backend. The config is only used for `name`, `cwd`,
951    /// `skip_validation`, and `interactive`.
952    ///
953    /// # Example
954    ///
955    /// ```ignore
956    /// // Simple: default /v/* mounts only
957    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |_| {})?;
958    ///
959    /// // With custom mounts
960    /// let kernel = Kernel::with_backend(backend, config, |vfs| {
961    ///     vfs.mount_arc("/v/docs", docs_fs);
962    ///     vfs.mount_arc("/v/g", git_fs);
963    /// }, |_| {})?;
964    ///
965    /// // With custom tools
966    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |tools| {
967    ///     tools.register(MyCustomTool::new());
968    /// })?;
969    /// ```
970    pub fn with_backend(
971        backend: Arc<dyn KernelBackend>,
972        config: KernelConfig,
973        configure_vfs: impl FnOnce(&mut VfsRouter),
974        configure_tools: impl FnOnce(&mut ToolRegistry),
975    ) -> Result<Self> {
976        use crate::backend::VirtualOverlayBackend;
977
978        // overlay=true is incompatible with with_backend: the embedder controls
979        // the VFS and the kernel cannot wrap it without bypassing the embedder's
980        // semantics. Fail loudly rather than silently ignoring the flag.
981        if config.overlay {
982            return Err(anyhow::anyhow!(
983                "overlay=true is incompatible with Kernel::with_backend: the embedder \
984                 controls the VFS; the kernel cannot wrap it with an OverlayFs without \
985                 bypassing the embedder's storage semantics. Use KernelConfig::with_overlay(false)."
986            ));
987        }
988
989        let mut vfs = VfsRouter::new();
990        let jobs = Arc::new(JobManager::new());
991
992        // Create the budget from config so `with_vfs_budget` / `without_vfs_budget`
993        // work for `with_backend` callers too. The /v/blobs MemoryFs is the only
994        // kernel-owned memory mount here — embedders own the rest of the VFS.
995        let vfs_budget: Option<Arc<ByteBudget>> = config
996            .vfs_budget_bytes
997            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));
998
999        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
1000        let blobs_fs = match &vfs_budget {
1001            Some(b) => MemoryFs::with_budget(Arc::clone(b)),
1002            None => MemoryFs::new(),
1003        };
1004        vfs.mount("/v/blobs", blobs_fs);
1005
1006        // Let caller add custom mounts (e.g., /v/docs, /v/g)
1007        configure_vfs(&mut vfs);
1008
1009        // A custom-backend kernel owns no host mounts — the embedder supplies
1010        // the entire VFS — so any kernel write to a host filesystem via
1011        // `std::fs` (output spill, job output files) bypasses that VFS and its
1012        // read-only guarantees. Forbid host side channels unconditionally.
1013        Self::assemble(config, vfs, jobs, true, vfs_budget, configure_tools, |vfs_arc: &Arc<VfsRouter>, _: &Arc<ToolRegistry>| {
1014            let overlay: Arc<dyn KernelBackend> =
1015                Arc::new(VirtualOverlayBackend::new(backend, vfs_arc.clone()));
1016            ExecContext::with_backend(overlay)
1017        })
1018    }
1019
1020    /// Shared assembly: wires up tools, runner, scope, and ExecContext.
1021    ///
1022    /// The `make_ctx` closure receives the VFS and tools so backends that need
1023    /// them (like `LocalBackend::with_tools`) can capture them. Custom backends
1024    /// that already have their own storage can ignore these parameters.
1025    fn assemble(
1026        config: KernelConfig,
1027        mut vfs: VfsRouter,
1028        jobs: Arc<JobManager>,
1029        no_host_filesystem: bool,
1030        vfs_budget: Option<Arc<ByteBudget>>,
1031        configure_tools: impl FnOnce(&mut ToolRegistry),
1032        make_ctx: impl FnOnce(&Arc<VfsRouter>, &Arc<ToolRegistry>) -> ExecContext,
1033    ) -> Result<Self> {
1034        // A kernel with no host filesystem of its own must never write to one
1035        // through a side channel. Two paths bypass the VFS by going straight to
1036        // `std::fs`: output spill (`paths::spill_dir()` → host temp/cache) and
1037        // background-job output files (`Job::write_output_file` → host temp).
1038        // Both would punch through the isolation, so force them off:
1039        // in-memory truncation for spill, no host file for job output.
1040        //
1041        // This is true for a `NoLocal` kernel (mounts nothing) and for any
1042        // `with_backend` kernel (`no_host_filesystem` — the embedder owns the
1043        // VFS, so the kernel controls no host mounts and any host write is a
1044        // bypass). Overrides an explicit `SpillMode::Disk`, which is nonsensical
1045        // when there is no kernel-owned host filesystem to spill to.
1046        let no_host_side_channel =
1047            no_host_filesystem || matches!(config.vfs_mode, VfsMountMode::NoLocal);
1048
1049        let KernelConfig { name, cwd, skip_validation, interactive, ignore_config, mut output_limit, allow_external_commands, latch_enabled, trash_enabled, nonce_store, initial_vars, request_timeout, kill_grace, .. } = config;
1050
1051        if no_host_side_channel {
1052            output_limit.set_spill_mode(crate::output_limit::SpillMode::Memory);
1053            jobs.set_persist_output_files(false);
1054        }
1055
1056        let mut tools = ToolRegistry::new();
1057        register_builtins(&mut tools);
1058        configure_tools(&mut tools);
1059        let tools = Arc::new(tools);
1060
1061        // Mount BuiltinFs so `ls /v/bin` lists builtins
1062        vfs.mount("/v/bin", BuiltinFs::new(tools.clone()));
1063
1064        let vfs = Arc::new(vfs);
1065
1066        let runner = PipelineRunner::new(tools.clone());
1067
1068        let (stderr_writer, stderr_receiver) = stderr_stream();
1069
1070        let mut exec_ctx = make_ctx(&vfs, &tools);
1071        exec_ctx.set_cwd(cwd);
1072        exec_ctx.set_job_manager(jobs.clone());
1073        exec_ctx.set_tool_schemas(tools.schemas());
1074        exec_ctx.set_tools(tools.clone());
1075        #[cfg(feature = "os-integration")]
1076        exec_ctx.set_trash_backend(Arc::new(crate::trash_system::SystemTrash));
1077        exec_ctx.stderr = Some(stderr_writer);
1078        exec_ctx.ignore_config = ignore_config;
1079        exec_ctx.output_limit = output_limit;
1080        exec_ctx.allow_external_commands = allow_external_commands;
1081        exec_ctx.vfs_budget = vfs_budget.clone();
1082        if let Some(store) = nonce_store {
1083            exec_ctx.nonce_store = store;
1084        }
1085
1086        Ok(Self {
1087            name,
1088            scope: RwLock::new({
1089                let mut scope = Scope::new();
1090                scope.set_pid(KERNEL_COUNTER.fetch_add(1, Ordering::Relaxed));
1091                // HOME is NOT read from the host env here — the kernel is
1092                // hermetic. Frontends (REPL, MCP) seed it via `initial_vars`
1093                // below (from `std::env::vars()`); a hermetic embedder leaves
1094                // `initial_vars` empty and gets no HOME (tilde stays literal).
1095                // Apply caller-supplied initial variables, all marked exported.
1096                // Frontends (REPL, MCP) populate this from std::env::vars()
1097                // for shell-like UX; embedders that want hermetic behavior
1098                // simply leave it empty.
1099                for (name, value) in initial_vars {
1100                    scope.set_exported(name, value);
1101                }
1102                scope.set_latch_enabled(latch_enabled);
1103                scope.set_trash_enabled(trash_enabled);
1104                scope
1105            }),
1106            tools,
1107            user_tools: RwLock::new(HashMap::new()),
1108            vfs,
1109            jobs,
1110            runner,
1111            exec_ctx: RwLock::new(exec_ctx),
1112            skip_validation,
1113            interactive,
1114            allow_external_commands,
1115            vfs_budget,
1116            request_timeout,
1117            kill_grace,
1118            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1119            cancel_token: std::sync::Mutex::new(tokio_util::sync::CancellationToken::new()),
1120            #[cfg(all(unix, feature = "subprocess"))]
1121            terminal_state: None,
1122            self_weak: std::sync::OnceLock::new(),
1123            execute_lock: tokio::sync::Mutex::new(()),
1124            bg_job_id: None,
1125            // Overlay handle is set by Kernel::new after assemble returns;
1126            // assemble itself doesn't know the handle (it's constructed in setup_vfs).
1127            // with_backend always has None (overlay=true is rejected above).
1128            #[cfg(all(feature = "localfs", feature = "overlay"))]
1129            overlay_handle: None,
1130        })
1131    }
1132
1133    /// Get the kernel name.
1134    pub fn name(&self) -> &str {
1135        &self.name
1136    }
1137
1138    /// Wrap this Kernel in an Arc and initialize its self-reference.
1139    ///
1140    /// This enables the Kernel to hand out `Arc<dyn CommandDispatcher>` references
1141    /// to child contexts, allowing builtins like `timeout` to dispatch inner
1142    /// commands through the full resolution chain (user tools → builtins →
1143    /// .kai scripts → external commands).
1144    pub fn into_arc(self) -> Arc<Self> {
1145        let arc = Arc::new(self);
1146        let _ = arc.self_weak.set(Arc::downgrade(&arc));
1147        arc
1148    }
1149
1150    /// Fork a subsidiary kernel for concurrent execution.
1151    ///
1152    /// The fork is a fully-functional `Kernel` that:
1153    /// - **Snapshots** per-session state from the parent: scope (COW — cheap),
1154    ///   user-defined tools, cwd, aliases, ignore config, etc. Mutations on
1155    ///   the fork do NOT propagate back to the parent — matching bash
1156    ///   subshell / background-job semantics.
1157    /// - **Shares** read-mostly resources with the parent via `Arc`: the tool
1158    ///   registry, the VFS router, and the job manager. A job registered by
1159    ///   the fork is visible to the parent's `jobs` builtin, and the fork
1160    ///   sees the same VFS mounts.
1161    /// - **Owns** its own `stderr_receiver`, `cancel_token`, and
1162    ///   `execute_lock`. It is never the TTY owner, so `interactive` is
1163    ///   `false` and `terminal_state` is `None`.
1164    ///
1165    /// The returned Arc has its `self_weak` populated (via `into_arc`), so
1166    /// nested dispatch through `ctx.dispatcher` (e.g. the `timeout` builtin)
1167    /// routes through the fork itself, not the parent — which is essential
1168    /// for concurrency safety.
1169    ///
1170    /// Use this for **detached** background concurrency where the fork should
1171    /// survive parent cancellation: the `&` background-job operator and any
1172    /// other "fire and forget" worker. The fork gets a fresh, independent
1173    /// cancellation token.
1174    ///
1175    /// For foreground concurrency (scatter workers, concurrent pipeline
1176    /// stages, `$(...)` cmdsubs) where parent timeout/cancel must cascade
1177    /// into the fork's external children, use [`Self::fork_attached`].
1178    pub async fn fork(&self) -> Arc<Self> {
1179        self.fork_inner(tokio_util::sync::CancellationToken::new(), self.bg_job_id)
1180            .await
1181    }
1182
1183    /// Fork attached to the parent's cancellation.
1184    ///
1185    /// Same as [`Self::fork`] but the fork's `cancel_token` is a child of
1186    /// the parent's. When the parent cancels (request timeout, embedder
1187    /// `Kernel::cancel`, etc.), the fork's token also cancels, which in
1188    /// turn kills any external children spawned in the fork via the
1189    /// `wait_or_kill` / SIGTERM-grace-SIGKILL path.
1190    pub async fn fork_attached(&self) -> Arc<Self> {
1191        let child_token = {
1192            #[allow(clippy::expect_used)]
1193            let parent = self.cancel_token.lock().expect("cancel_token poisoned");
1194            parent.child_token()
1195        };
1196        self.fork_inner(child_token, self.bg_job_id).await
1197    }
1198
1199    /// Fork for a background job, stamping the job id so external commands
1200    /// spawned anywhere beneath it record their process groups on that job
1201    /// (for `kill -<sig> %N`). The caller owns `cancel` so it can also drive
1202    /// `JobManager::cancel`.
1203    pub async fn fork_for_background(
1204        &self,
1205        cancel: tokio_util::sync::CancellationToken,
1206        job_id: crate::scheduler::JobId,
1207    ) -> Arc<Self> {
1208        self.fork_inner(cancel, Some(job_id)).await
1209    }
1210
1211    /// Shared fork implementation. Caller decides the cancellation token and
1212    /// which background job (if any) this fork runs on behalf of.
1213    async fn fork_inner(
1214        &self,
1215        cancel: tokio_util::sync::CancellationToken,
1216        bg_job_id: Option<crate::scheduler::JobId>,
1217    ) -> Arc<Self> {
1218        let scope_snapshot = self.scope.read().await.clone();
1219        let user_tools_snapshot = self.user_tools.read().await.clone();
1220
1221        // Snapshot exec_ctx by cloning the cloneable fields, then override
1222        // the ones that should not carry over (stderr channel, dispatcher,
1223        // interactive flag, terminal state, cancel — set from `cancel` arg).
1224        let mut fork_ctx = {
1225            let parent_ctx = self.exec_ctx.read().await;
1226            parent_ctx.child_for_pipeline()
1227        };
1228        let (stderr_writer, stderr_receiver) = stderr_stream();
1229        fork_ctx.stderr = Some(stderr_writer);
1230        // Clear dispatcher; dispatch_command will repopulate it to point at
1231        // the fork on the first dispatch call.
1232        fork_ctx.dispatcher = None;
1233        fork_ctx.interactive = false;
1234        fork_ctx.cancel = cancel.clone();
1235        #[cfg(all(unix, feature = "subprocess"))]
1236        {
1237            fork_ctx.terminal_state = None;
1238        }
1239
1240        let fork = Self {
1241            name: format!("{}:fork", self.name),
1242            scope: RwLock::new(scope_snapshot),
1243            tools: Arc::clone(&self.tools),
1244            user_tools: RwLock::new(user_tools_snapshot),
1245            vfs: Arc::clone(&self.vfs),
1246            jobs: Arc::clone(&self.jobs),
1247            runner: self.runner.clone(),
1248            exec_ctx: RwLock::new(fork_ctx),
1249            skip_validation: self.skip_validation,
1250            // Forks are never the TTY owner — they run in the background.
1251            interactive: false,
1252            allow_external_commands: self.allow_external_commands,
1253            // Arc-clone the budget so the fork draws from the same pool as the
1254            // parent — background jobs and scatter workers count against the same
1255            // cap as foreground writes.
1256            vfs_budget: self.vfs_budget.clone(),
1257            request_timeout: self.request_timeout,
1258            kill_grace: self.kill_grace,
1259            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1260            cancel_token: std::sync::Mutex::new(cancel),
1261            #[cfg(all(unix, feature = "subprocess"))]
1262            terminal_state: None,
1263            self_weak: std::sync::OnceLock::new(),
1264            execute_lock: tokio::sync::Mutex::new(()),
1265            bg_job_id,
1266            // Arc-clone the overlay handle so forks (background jobs, scatter
1267            // workers, pipeline stages) can reach the same overlay transaction
1268            // via `kaish-vfs status/diff/commit/reset`.
1269            #[cfg(all(feature = "localfs", feature = "overlay"))]
1270            overlay_handle: self.overlay_handle.clone(),
1271        };
1272
1273        fork.into_arc()
1274    }
1275
1276    /// Get an `Arc<dyn CommandDispatcher>` to this Kernel, if wrapped via `into_arc()`.
1277    ///
1278    /// Returns `None` if the Kernel was not wrapped, or if all strong references
1279    /// have been dropped (the `Weak` can no longer upgrade).
1280    pub fn dispatcher(&self) -> Option<Arc<dyn CommandDispatcher>> {
1281        self.self_weak
1282            .get()
1283            .and_then(|weak| weak.upgrade())
1284            .map(|arc| arc as Arc<dyn CommandDispatcher>)
1285    }
1286
1287    /// Initialize terminal state for interactive job control.
1288    ///
1289    /// Call this after kernel creation when running as an interactive REPL
1290    /// and stdin is a TTY. Sets up process groups and signal handling.
1291    #[cfg(all(unix, feature = "subprocess"))]
1292    pub fn init_terminal(&mut self) {
1293        if !self.interactive {
1294            return;
1295        }
1296        match crate::terminal::TerminalState::init() {
1297            Ok(state) => {
1298                let state = Arc::new(state);
1299                self.terminal_state = Some(state.clone());
1300                // Set on exec_ctx so builtins (fg, bg, kill) can access it
1301                self.exec_ctx.get_mut().terminal_state = Some(state);
1302                tracing::debug!("terminal job control initialized");
1303            }
1304            Err(e) => {
1305                tracing::warn!("failed to initialize terminal job control: {}", e);
1306            }
1307        }
1308    }
1309
1310    /// Cancel the current execution.
1311    ///
1312    /// This cancels the current cancellation token, causing any execution
1313    /// loop to exit at the next checkpoint with exit code 130 (SIGINT).
1314    /// A fresh token is installed for the next `execute()` call.
1315    pub fn cancel(&self) {
1316        #[allow(clippy::expect_used)]
1317        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1318        token.cancel();
1319    }
1320
1321    /// Check if the current execution has been cancelled.
1322    pub fn is_cancelled(&self) -> bool {
1323        #[allow(clippy::expect_used)]
1324        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1325        token.is_cancelled()
1326    }
1327
1328    /// Reset the cancellation token (called at the start of each execute).
1329    fn reset_cancel(&self) -> tokio_util::sync::CancellationToken {
1330        #[allow(clippy::expect_used)]
1331        let mut token = self.cancel_token.lock().expect("cancel_token poisoned");
1332        if token.is_cancelled() {
1333            *token = tokio_util::sync::CancellationToken::new();
1334        }
1335        token.clone()
1336    }
1337
1338    /// Acquire the per-Kernel execute lock, warning on contention.
1339    ///
1340    /// Tokio's Mutex is fair (FIFO) so callers queue in arrival order. When
1341    /// the lock is already held, emit a warning so the silent serialization
1342    /// is observable in logs — if you need real parallelism, fork the kernel.
1343    async fn acquire_execute_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
1344        match self.execute_lock.try_lock() {
1345            Ok(guard) => guard,
1346            Err(_) => {
1347                tracing::warn!(
1348                    target: "kaish::kernel::concurrency",
1349                    kernel = %self.name,
1350                    "execute() contended — serializing concurrent caller; \
1351                     use Kernel::fork() for parallelism instead of sharing"
1352                );
1353                self.execute_lock.lock().await
1354            }
1355        }
1356    }
1357
1358    /// Execute kaish source code with default options.
1359    ///
1360    /// Equivalent to `execute_with_options(input, ExecuteOptions::default())`.
1361    /// Returns the result of the last statement executed.
1362    pub async fn execute(&self, input: &str) -> Result<ExecResult> {
1363        self.run_inner(input, ExecuteOptions::default(), None).await
1364    }
1365
1366    /// Execute with per-call options. The primary entry point for embedders
1367    /// that don't need per-statement output streaming.
1368    ///
1369    /// `opts` carries timeout, transient vars overlay, optional cwd override,
1370    /// and optional embedder-owned cancellation token. See [`ExecuteOptions`]
1371    /// for semantics. For streaming, use [`Self::execute_with_options_streaming`].
1372    ///
1373    /// **Cancellation:** if `opts.cancel_token` is `Some`, it is *raced*
1374    /// against the kernel's internal token. Either firing cancels and kills
1375    /// external children. The embedder's token is read-only — kernel
1376    /// timeouts do NOT propagate into it. Distinguish via the returned
1377    /// `code`: 124 = timeout, 130 = cancellation.
1378    ///
1379    /// **Timeout:** `opts.timeout` overrides `KernelConfig::request_timeout`.
1380    /// `Some(Duration::ZERO)` returns 124 immediately without spawning.
1381    ///
1382    /// Concurrent callers on the same Kernel serialize on the kernel-wide
1383    /// execute lock. For true parallelism, call [`Kernel::fork`] (detached)
1384    /// or [`Kernel::fork_attached`] (cancellation cascades from this kernel).
1385    pub async fn execute_with_options(
1386        &self,
1387        input: &str,
1388        opts: ExecuteOptions,
1389    ) -> Result<ExecResult> {
1390        self.run_inner(input, opts, None).await
1391    }
1392
1393    /// Same as [`Self::execute_with_options`] but with a per-statement output
1394    /// callback. The callback fires after each top-level statement so the
1395    /// embedder (REPL, MCP streaming) can flush output incrementally.
1396    pub async fn execute_with_options_streaming(
1397        &self,
1398        input: &str,
1399        opts: ExecuteOptions,
1400        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1401    ) -> Result<ExecResult> {
1402        self.run_inner(input, opts, Some(on_output)).await
1403    }
1404
1405    /// Execute kaish source code with a transient overlay of exported variables.
1406    ///
1407    /// Deprecated thin wrapper over [`Self::execute_with_options`]. New code
1408    /// should use that method directly:
1409    /// `execute_with_options(input, ExecuteOptions::new().with_vars(vars))`.
1410    #[deprecated(note = "use Kernel::execute_with_options with ExecuteOptions::with_vars")]
1411    pub async fn execute_with_vars(
1412        &self,
1413        input: &str,
1414        vars: HashMap<String, Value>,
1415    ) -> Result<ExecResult> {
1416        self.run_inner(input, ExecuteOptions::new().with_vars(vars), None).await
1417    }
1418
1419    /// Execute kaish source code with a per-statement callback.
1420    ///
1421    /// Deprecated thin wrapper. New code should use
1422    /// [`Self::execute_with_options_streaming`].
1423    #[deprecated(note = "use Kernel::execute_with_options_streaming")]
1424    pub async fn execute_streaming(
1425        &self,
1426        input: &str,
1427        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1428    ) -> Result<ExecResult> {
1429        self.run_inner(input, ExecuteOptions::default(), Some(on_output)).await
1430    }
1431
1432    /// Link embedder trace context, then run [`Self::execute_with_options_inner`].
1433    ///
1434    /// The `#[instrument]` execution span resolves its parent from the *current*
1435    /// OpenTelemetry context (see `tracing-opentelemetry`'s `parent_context`),
1436    /// captured when the span is first entered — not when the future is
1437    /// constructed. So a thread-local `attach()` scoped to construction is too
1438    /// early to be seen (the integration test confirms this). `with_context`
1439    /// re-attaches the embedder's context on *every* poll of the inner future,
1440    /// so the context is current at first-enter and survives runtime thread
1441    /// hops. With no embedder trace context, the future runs unwrapped.
1442    async fn run_inner(
1443        &self,
1444        input: &str,
1445        opts: ExecuteOptions,
1446        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1447    ) -> Result<ExecResult> {
1448        use opentelemetry::context::FutureExt;
1449
1450        // Capture the embedder's baggage before `opts` is consumed so it can be
1451        // echoed back onto the result on egress (see `merge_egress_baggage`).
1452        let embedder_baggage = opts.baggage.clone();
1453
1454        let result = match crate::telemetry::extract_parent(&opts) {
1455            Some(parent) => self
1456                .execute_with_options_inner(input, opts, on_output)
1457                .with_context(parent)
1458                .await,
1459            None => self.execute_with_options_inner(input, opts, on_output).await,
1460        };
1461
1462        result.map(|mut r| {
1463            crate::telemetry::merge_egress_baggage(&mut r, embedder_baggage);
1464            r
1465        })
1466    }
1467
1468    /// Shared body for `execute`, `execute_with_options(_streaming)`, and
1469    /// the deprecated wrappers. Owns the per-call cancel token, vars overlay,
1470    /// cwd override, and timeout race.
1471    #[tracing::instrument(level = "info", skip(self, opts, on_output), fields(input_len = input.len()))]
1472    async fn execute_with_options_inner(
1473        &self,
1474        input: &str,
1475        opts: ExecuteOptions,
1476        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1477    ) -> Result<ExecResult> {
1478        let _guard = self.acquire_execute_lock().await;
1479
1480        // Always reset to a fresh internal token; this is the kernel's own
1481        // cancel surface for embedders calling `Kernel::cancel()`. The
1482        // embedder-supplied `opts.cancel_token` is a *read-only input* — it
1483        // is NOT written into `self.cancel_token`, because doing so would
1484        // (a) leak the embedder's token past this call's lifetime,
1485        // (b) re-route a later `Kernel::cancel()` into the embedder's token,
1486        // (c) extend the token's lifetime via the kernel's strong clone.
1487        let internal = self.reset_cancel();
1488        // Race the embedder token against the kernel's internal token via a
1489        // tracked watcher task. We hold the JoinHandle so we can abort the
1490        // task at function exit — otherwise it would wait forever for either
1491        // token to fire and leak per call.
1492        let (effective_cancel, watcher_handle): (
1493            tokio_util::sync::CancellationToken,
1494            Option<tokio::task::JoinHandle<()>>,
1495        ) = if let Some(ext) = opts.cancel_token {
1496            let combined = tokio_util::sync::CancellationToken::new();
1497            let combined_writer = combined.clone();
1498            let i = internal.clone();
1499            let handle = tokio::spawn(async move {
1500                tokio::select! {
1501                    _ = i.cancelled() => combined_writer.cancel(),
1502                    _ = ext.cancelled() => combined_writer.cancel(),
1503                }
1504            });
1505            (combined, Some(handle))
1506        } else {
1507            (internal, None)
1508        };
1509
1510        // Effective timeout: per-call wins over kernel-config default.
1511        let timeout = opts.timeout.or(self.request_timeout);
1512
1513        // ZERO timeout: return 124 immediately without spawning anything.
1514        if timeout == Some(Duration::ZERO) {
1515            if let Some(h) = watcher_handle {
1516                h.abort();
1517            }
1518            return Ok(ExecResult::failure(124, "timeout: timed out after 0s".to_string()));
1519        }
1520
1521        // Apply per-call vars overlay (push frame + set_exported), wrapped in
1522        // an RAII guard so a panic inside `execute_streaming_inner` still
1523        // pops the frame and unexports the temporarily-exported names.
1524        struct VarsFrameGuard<'a> {
1525            kernel: &'a Kernel,
1526            newly_exported: Vec<String>,
1527        }
1528        impl Drop for VarsFrameGuard<'_> {
1529            fn drop(&mut self) {
1530                // Best-effort cleanup using try_write. The execute_lock held
1531                // throughout execute_with_options means there is no concurrent
1532                // foreground caller; forks have their own scope and won't
1533                // block this. blocking_write would deadlock the runtime when
1534                // called from a tokio worker thread, so we explicitly do NOT
1535                // fall back to it — if try_write fails (which we've never
1536                // seen in practice), log loudly and accept the leak rather
1537                // than deadlock the entire kernel.
1538                let Ok(mut scope) = self.kernel.scope.try_write() else {
1539                    tracing::error!(
1540                        "vars frame guard: scope lock unexpectedly busy; \
1541                         skipping pop_frame to avoid runtime deadlock — \
1542                         transient vars may leak"
1543                    );
1544                    return;
1545                };
1546                scope.pop_frame();
1547                for name in self.newly_exported.drain(..) {
1548                    scope.unexport(&name);
1549                }
1550            }
1551        }
1552
1553        // Per-call cwd override: save current cwd, set the new one, restore
1554        // on Drop so the kernel's persistent cwd doesn't leak between calls.
1555        // Same RAII pattern as VarsFrameGuard, same blocking_write trade-off.
1556        struct CwdGuard<'a> {
1557            kernel: &'a Kernel,
1558            saved: PathBuf,
1559        }
1560        impl Drop for CwdGuard<'_> {
1561            fn drop(&mut self) {
1562                let Ok(mut ec) = self.kernel.exec_ctx.try_write() else {
1563                    tracing::error!(
1564                        "cwd guard: exec_ctx lock unexpectedly busy; \
1565                         skipping cwd restore — kernel cwd may be wrong for next call"
1566                    );
1567                    return;
1568                };
1569                ec.cwd = std::mem::take(&mut self.saved);
1570            }
1571        }
1572        let _cwd_guard: Option<CwdGuard<'_>> = if let Some(new_cwd) = opts.cwd {
1573            let mut ec = self.exec_ctx.write().await;
1574            let saved = std::mem::replace(&mut ec.cwd, new_cwd);
1575            drop(ec);
1576            Some(CwdGuard { kernel: self, saved })
1577        } else {
1578            None
1579        };
1580
1581        let _vars_guard: Option<VarsFrameGuard<'_>> = if !opts.vars.is_empty() {
1582            let mut scope = self.scope.write().await;
1583            scope.push_frame();
1584            let mut newly = Vec::with_capacity(opts.vars.len());
1585            for (name, value) in opts.vars {
1586                if !scope.is_exported(&name) {
1587                    newly.push(name.clone());
1588                }
1589                scope.set_exported(name, value);
1590            }
1591            drop(scope);
1592            Some(VarsFrameGuard { kernel: self, newly_exported: newly })
1593        } else {
1594            None
1595        };
1596
1597        // Sync the effective cancel into self.exec_ctx so try_execute_external
1598        // (which reads via self.cancel_token) sees cancellation. We also need
1599        // builtins to see it via ctx.cancel — handled in execute_command.
1600        // For simplicity here we mirror effective_cancel into self.cancel_token
1601        // for the duration of this call, then restore the internal token at
1602        // the end (so a later Kernel::cancel still hits our internal surface).
1603        {
1604            #[allow(clippy::expect_used)]
1605            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1606            *cur = effective_cancel.clone();
1607        }
1608
1609        // The movable-deadline watchdog for this call (None without a timeout),
1610        // mirrored into exec_ctx — like the cancel token — so builtins can
1611        // suspend the script clock via `ctx.patient`. Assigned unconditionally
1612        // (clearing any stale handle) and reset to None in the restore block.
1613        let watchdog = timeout.map(|d| Arc::new(crate::watchdog::Watchdog::new(d)));
1614        {
1615            let mut ec = self.exec_ctx.write().await;
1616            ec.watchdog = watchdog.clone();
1617        }
1618
1619        // Run inner with optional timeout. The watchdog task cancels our token
1620        // on elapsed; the cascade fires SIGTERM/SIGKILL on any external
1621        // children via the wait_or_kill discipline in try_execute_external.
1622        let mut noop_cb: Box<dyn FnMut(&ExecResult) + Send> = Box::new(|_| {});
1623        let cb_ref: &mut (dyn FnMut(&ExecResult) + Send) = match on_output {
1624            Some(cb) => cb,
1625            None => &mut *noop_cb,
1626        };
1627
1628        let result = if let Some(d) = timeout {
1629            #[allow(clippy::expect_used)]
1630            let watchdog = watchdog.clone().expect("watchdog constructed when timeout is set");
1631            let elapsed = Arc::new(std::sync::atomic::AtomicBool::new(false));
1632            let timer = tokio::spawn(watchdog.run(elapsed.clone(), effective_cancel.clone()));
1633            let r = self.execute_streaming_inner(input, cb_ref).await;
1634            timer.abort();
1635            match r {
1636                Ok(mut res) => {
1637                    if elapsed.load(std::sync::atomic::Ordering::SeqCst) {
1638                        res.code = 124;
1639                        if res.err.is_empty() {
1640                            res.err = format!("timeout: timed out after {:?}", d);
1641                        }
1642                    }
1643                    Ok(res)
1644                }
1645                Err(e) => Err(e),
1646            }
1647        } else {
1648            self.execute_streaming_inner(input, cb_ref).await
1649        };
1650
1651        // Restore self.cancel_token to a fresh, uncancelled token so the
1652        // embedder's view of `Kernel::cancel()` stays predictable on the
1653        // next call (it cancels the kernel's own token, not whatever was
1654        // left over from this call's combined token).
1655        {
1656            #[allow(clippy::expect_used)]
1657            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1658            *cur = tokio_util::sync::CancellationToken::new();
1659        }
1660
1661        // Drop the watchdog handle from exec_ctx — its timer task is gone
1662        // (fired or aborted above); a patient hold acquired against a stale
1663        // handle would silently suspend nothing.
1664        {
1665            let mut ec = self.exec_ctx.write().await;
1666            ec.watchdog = None;
1667        }
1668
1669        // Tear down the embedder-token race watcher (if any). Leaving it
1670        // alive would idle forever waiting for tokens that may never fire.
1671        if let Some(h) = watcher_handle {
1672            h.abort();
1673        }
1674
1675        // VarsFrameGuard drops here on the success path and on early-return
1676        // paths above (error path included). Panic safety preserved.
1677        result
1678    }
1679
1680    /// The actual body of `execute_streaming`, run while holding the execute lock.
1681    ///
1682    /// Split out so internal kernel paths that are already under the lock can
1683    /// call this without deadlocking on re-entry. External callers must go
1684    /// through [`Self::execute_streaming`] so they acquire the lock.
1685    async fn execute_streaming_inner(
1686        &self,
1687        input: &str,
1688        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1689    ) -> Result<ExecResult> {
1690        let program = parse(input).map_err(|errors| {
1691            let msg = errors
1692                .iter()
1693                .map(|e| e.format(input))
1694                .collect::<Vec<_>>()
1695                .join("\n");
1696            anyhow::anyhow!("parse error:\n{}", msg)
1697        })?;
1698
1699        // AST display mode: show AST instead of executing
1700        {
1701            let scope = self.scope.read().await;
1702            if scope.show_ast() {
1703                let output = format!("{:#?}\n", program);
1704                return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(output)));
1705            }
1706        }
1707
1708        // Pre-execution validation
1709        if !self.skip_validation {
1710            let user_tools = self.user_tools.read().await;
1711            let validator = Validator::new(&self.tools, &user_tools);
1712            let issues = validator.validate(&program);
1713
1714            // Collect errors (warnings are logged but don't prevent execution)
1715            let errors: Vec<_> = issues
1716                .iter()
1717                .filter(|i| i.severity == Severity::Error)
1718                .collect();
1719
1720            if !errors.is_empty() {
1721                let error_msg = errors
1722                    .iter()
1723                    .map(|e| e.format(input))
1724                    .collect::<Vec<_>>()
1725                    .join("\n");
1726                return Err(anyhow::anyhow!("validation failed:\n{}", error_msg));
1727            }
1728
1729            // Log warnings via tracing (trace level to avoid noise)
1730            for warning in issues.iter().filter(|i| i.severity == Severity::Warning) {
1731                tracing::trace!("validation: {}", warning.format(input));
1732            }
1733        }
1734
1735        let mut result = ExecResult::success("");
1736
1737        // Reset cancellation token for this execution.
1738        let cancel = self.reset_cancel();
1739
1740        for stmt in program.statements {
1741            if matches!(stmt, Stmt::Empty) {
1742                continue;
1743            }
1744
1745            // Cancellation checkpoint
1746            if cancel.is_cancelled() {
1747                result.code = 130;
1748                return Ok(result);
1749            }
1750
1751            let flow = self.execute_stmt_flow(&stmt).await?;
1752
1753            // Drain any stderr written by pipeline stages during this statement.
1754            // This captures stderr from intermediate pipeline stages that would
1755            // otherwise be lost (only the last stage's result is returned).
1756            let drained_stderr = {
1757                let mut receiver = self.stderr_receiver.lock().await;
1758                receiver.drain_lossy()
1759            };
1760
1761            match flow {
1762                ControlFlow::Normal(mut r) => {
1763                    if !drained_stderr.is_empty() {
1764                        if !r.err.is_empty() && !r.err.ends_with('\n') {
1765                            r.err.push('\n');
1766                        }
1767                        // Prepend pipeline stderr before the last stage's stderr
1768                        let combined = format!("{}{}", drained_stderr, r.err);
1769                        r.err = combined;
1770                    }
1771                    on_output(&r);
1772                    // Carry the last statement's structured output for MCP TOON encoding.
1773                    // Must be done here (not in accumulate_result) because accumulate_result
1774                    // is also used in loops where per-iteration output would be wrong.
1775                    let last_output = r.output().cloned();
1776                    accumulate_result(&mut result, &r);
1777                    result.set_output(last_output);
1778                }
1779                ControlFlow::Exit { code } => {
1780                    if !drained_stderr.is_empty() {
1781                        result.err.push_str(&drained_stderr);
1782                    }
1783                    result.code = code;
1784                    return Ok(result);
1785                }
1786                ControlFlow::Return { mut value } => {
1787                    if !drained_stderr.is_empty() {
1788                        value.err = format!("{}{}", drained_stderr, value.err);
1789                    }
1790                    on_output(&value);
1791                    result = value;
1792                }
1793                ControlFlow::Break { result: mut r, .. } | ControlFlow::Continue { result: mut r, .. } => {
1794                    if !drained_stderr.is_empty() {
1795                        r.err = format!("{}{}", drained_stderr, r.err);
1796                    }
1797                    on_output(&r);
1798                    result = r;
1799                }
1800            }
1801        }
1802
1803        Ok(result)
1804    }
1805
1806    /// Execute a single statement, returning control flow information.
1807    fn execute_stmt_flow<'a>(
1808        &'a self,
1809        stmt: &'a Stmt,
1810    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ControlFlow>> + Send + 'a>> {
1811        use tracing::Instrument;
1812        let span = tracing::debug_span!("execute_stmt_flow", stmt_type = %stmt.kind_name());
1813        Box::pin(async move {
1814        match stmt {
1815            Stmt::Assignment(assign) => {
1816                // Use async evaluator to support command substitution
1817                let value = self.eval_expr_async(&assign.value).await
1818                    .context("failed to evaluate assignment")?;
1819                let mut scope = self.scope.write().await;
1820                if assign.local {
1821                    // local: set in innermost (current function) frame
1822                    scope.set(&assign.name, value.clone());
1823                } else {
1824                    // non-local: update existing or create in root frame
1825                    scope.set_global(&assign.name, value.clone());
1826                }
1827                drop(scope);
1828
1829                // Assignments don't produce output (like sh)
1830                Ok(ControlFlow::ok(ExecResult::success("")))
1831            }
1832            Stmt::Command(cmd) => {
1833                // Route single commands through execute_pipeline for a unified path.
1834                // This ensures all commands go through the dispatcher chain.
1835                let pipeline = crate::ast::Pipeline {
1836                    commands: vec![cmd.clone()],
1837                    background: false,
1838                };
1839                let result = self.execute_pipeline(&pipeline).await?;
1840                self.update_last_result(&result).await;
1841
1842                // Check for error exit mode (set -e)
1843                if !result.ok() {
1844                    let scope = self.scope.read().await;
1845                    if scope.error_exit_enabled() {
1846                        return Ok(ControlFlow::exit_code(result.code));
1847                    }
1848                }
1849
1850                Ok(ControlFlow::ok(result))
1851            }
1852            Stmt::Pipeline(pipeline) => {
1853                let result = self.execute_pipeline(pipeline).await?;
1854                self.update_last_result(&result).await;
1855
1856                // Check for error exit mode (set -e)
1857                if !result.ok() {
1858                    let scope = self.scope.read().await;
1859                    if scope.error_exit_enabled() {
1860                        return Ok(ControlFlow::exit_code(result.code));
1861                    }
1862                }
1863
1864                Ok(ControlFlow::ok(result))
1865            }
1866            Stmt::If(if_stmt) => {
1867                // Use async evaluator to support command substitution in conditions
1868                let cond_value = self.eval_expr_async(&if_stmt.condition).await?;
1869
1870                let branch = if is_truthy(&cond_value) {
1871                    &if_stmt.then_branch
1872                } else {
1873                    if_stmt.else_branch.as_deref().unwrap_or(&[])
1874                };
1875
1876                let mut result = ExecResult::success("");
1877                for stmt in branch {
1878                    let flow = self.execute_stmt_flow(stmt).await?;
1879                    match flow {
1880                        ControlFlow::Normal(r) => {
1881                            accumulate_result(&mut result, &r);
1882                            self.drain_stderr_into(&mut result).await;
1883                        }
1884                        other => {
1885                            self.drain_stderr_into(&mut result).await;
1886                            return Ok(other);
1887                        }
1888                    }
1889                }
1890                Ok(ControlFlow::ok(result))
1891            }
1892            Stmt::For(for_loop) => {
1893                // Evaluate all items and collect values for iteration
1894                // Use async evaluator to support command substitution like $(seq 1 5)
1895                let mut items: Vec<Value> = Vec::new();
1896                for item_expr in &for_loop.items {
1897                    // Glob expansion in for-loop items: `for f in *.txt`
1898                    if let Expr::GlobPattern(pattern) = item_expr {
1899                        let glob_enabled = {
1900                            let scope = self.scope.read().await;
1901                            scope.glob_enabled()
1902                        };
1903                        if glob_enabled {
1904                            let (paths, cwd) = {
1905                                let ctx = self.exec_ctx.read().await;
1906                                let paths = ctx.expand_glob(pattern).await
1907                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
1908                                let cwd = ctx.resolve_path(".");
1909                                (paths, cwd)
1910                            };
1911                            if paths.is_empty() {
1912                                return Err(anyhow::anyhow!("no matches: {}", pattern));
1913                            }
1914                            for path in paths {
1915                                let display = if !pattern.starts_with('/') {
1916                                    path.strip_prefix(&cwd)
1917                                        .unwrap_or(&path)
1918                                        .to_string_lossy().into_owned()
1919                                } else {
1920                                    path.to_string_lossy().into_owned()
1921                                };
1922                                items.push(Value::String(display));
1923                            }
1924                            continue;
1925                        }
1926                    }
1927                    // Track whether this item came from $(cmd); that's the
1928                    // only position where multi-line stdout auto-splits per
1929                    // line. Arrays still spread element-by-element; bare
1930                    // $VAR is rejected upstream by validator E012. See
1931                    // docs/plan-for-loop-newline-split.md.
1932                    let from_command_subst = matches!(item_expr, Expr::CommandSubst(_));
1933                    let item = self.eval_expr_async(item_expr).await?;
1934                    match item {
1935                        // JSON arrays iterate over elements (preferred path
1936                        // when builtins emit .data — seq, jq, cut, find, …)
1937                        Value::Json(serde_json::Value::Array(arr)) => {
1938                            for elem in arr {
1939                                items.push(json_to_value(elem));
1940                            }
1941                        }
1942                        // Strings from $(cmd): empty → 0 iterations,
1943                        // multi-line → split per line (trimming trailing
1944                        // newlines and per-line trailing \r), single-line
1945                        // → one iteration. Whitespace within a line is
1946                        // NOT split — the "$VAR with spaces just works"
1947                        // promise is preserved because this only fires
1948                        // in CommandSubst position.
1949                        Value::String(s) if from_command_subst => {
1950                            let trimmed = s.trim_end_matches(['\n', '\r']);
1951                            if trimmed.is_empty() {
1952                                continue;
1953                            }
1954                            if trimmed.contains('\n') {
1955                                for line in trimmed.split('\n') {
1956                                    let line = line.trim_end_matches('\r');
1957                                    items.push(Value::String(line.to_string()));
1958                                }
1959                            } else {
1960                                items.push(Value::String(trimmed.to_string()));
1961                            }
1962                        }
1963                        // Strings not from $(cmd) stay as one value.
1964                        other => items.push(other),
1965                    }
1966                }
1967
1968                let mut result = ExecResult::success("");
1969                {
1970                    let mut scope = self.scope.write().await;
1971                    scope.push_frame();
1972                }
1973
1974                'outer: for item in items {
1975                    // Cancellation checkpoint per iteration
1976                    if self.is_cancelled() {
1977                        let mut scope = self.scope.write().await;
1978                        scope.pop_frame();
1979                        result.code = 130;
1980                        return Ok(ControlFlow::ok(result));
1981                    }
1982                    {
1983                        let mut scope = self.scope.write().await;
1984                        scope.set(&for_loop.variable, item);
1985                    }
1986                    for stmt in &for_loop.body {
1987                        let mut flow = match self.execute_stmt_flow(stmt).await {
1988                            Ok(f) => f,
1989                            Err(e) => {
1990                                let mut scope = self.scope.write().await;
1991                                scope.pop_frame();
1992                                return Err(e);
1993                            }
1994                        };
1995                        self.drain_stderr_into(&mut result).await;
1996                        match &mut flow {
1997                            ControlFlow::Normal(r) => {
1998                                accumulate_result(&mut result, r);
1999                                if !r.ok() {
2000                                    let scope = self.scope.read().await;
2001                                    if scope.error_exit_enabled() {
2002                                        drop(scope);
2003                                        let mut scope = self.scope.write().await;
2004                                        scope.pop_frame();
2005                                        return Ok(ControlFlow::exit_code(r.code));
2006                                    }
2007                                }
2008                            }
2009                            ControlFlow::Break { .. } => {
2010                                if flow.decrement_level() {
2011                                    accumulate_flow_output(&mut result, &flow);
2012                                    break 'outer;
2013                                }
2014                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2015                                let mut scope = self.scope.write().await;
2016                                scope.pop_frame();
2017                                return Ok(flow);
2018                            }
2019                            ControlFlow::Continue { .. } => {
2020                                if flow.decrement_level() {
2021                                    accumulate_flow_output(&mut result, &flow);
2022                                    continue 'outer;
2023                                }
2024                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2025                                let mut scope = self.scope.write().await;
2026                                scope.pop_frame();
2027                                return Ok(flow);
2028                            }
2029                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2030                                let mut scope = self.scope.write().await;
2031                                scope.pop_frame();
2032                                return Ok(flow);
2033                            }
2034                        }
2035                    }
2036                }
2037
2038                {
2039                    let mut scope = self.scope.write().await;
2040                    scope.pop_frame();
2041                }
2042                Ok(ControlFlow::ok(result))
2043            }
2044            Stmt::While(while_loop) => {
2045                let mut result = ExecResult::success("");
2046
2047                'outer: loop {
2048                    // Evaluate condition - use async to support command substitution
2049                    // Cancellation checkpoint per iteration
2050                    if self.is_cancelled() {
2051                        result.code = 130;
2052                        return Ok(ControlFlow::ok(result));
2053                    }
2054
2055                    let cond_value = self.eval_expr_async(&while_loop.condition).await?;
2056
2057                    if !is_truthy(&cond_value) {
2058                        break;
2059                    }
2060
2061                    // Execute body
2062                    for stmt in &while_loop.body {
2063                        let mut flow = self.execute_stmt_flow(stmt).await?;
2064                        self.drain_stderr_into(&mut result).await;
2065                        match &mut flow {
2066                            ControlFlow::Normal(r) => {
2067                                accumulate_result(&mut result, r);
2068                                if !r.ok() {
2069                                    let scope = self.scope.read().await;
2070                                    if scope.error_exit_enabled() {
2071                                        return Ok(ControlFlow::exit_code(r.code));
2072                                    }
2073                                }
2074                            }
2075                            ControlFlow::Break { .. } => {
2076                                if flow.decrement_level() {
2077                                    accumulate_flow_output(&mut result, &flow);
2078                                    break 'outer;
2079                                }
2080                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2081                                return Ok(flow);
2082                            }
2083                            ControlFlow::Continue { .. } => {
2084                                if flow.decrement_level() {
2085                                    accumulate_flow_output(&mut result, &flow);
2086                                    continue 'outer;
2087                                }
2088                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2089                                return Ok(flow);
2090                            }
2091                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2092                                return Ok(flow);
2093                            }
2094                        }
2095                    }
2096                }
2097
2098                Ok(ControlFlow::ok(result))
2099            }
2100            Stmt::Case(case_stmt) => {
2101                // Evaluate the expression to match against
2102                let match_value = {
2103                    let value = self.eval_expr_async(&case_stmt.expr).await?;
2104                    value_to_string(&value)
2105                };
2106
2107                // Try each branch until we find a match
2108                for branch in &case_stmt.branches {
2109                    let matched = branch.patterns.iter().any(|pattern| {
2110                        glob_match(pattern, &match_value)
2111                    });
2112
2113                    if matched {
2114                        // Execute the branch body
2115                        let mut result = ExecResult::success("");
2116                        for stmt in &branch.body {
2117                            let flow = self.execute_stmt_flow(stmt).await?;
2118                            match flow {
2119                                ControlFlow::Normal(r) => {
2120                                    accumulate_result(&mut result, &r);
2121                                    self.drain_stderr_into(&mut result).await;
2122                                }
2123                                other => {
2124                                    self.drain_stderr_into(&mut result).await;
2125                                    return Ok(other);
2126                                }
2127                            }
2128                        }
2129                        return Ok(ControlFlow::ok(result));
2130                    }
2131                }
2132
2133                // No match - return success with empty output (like sh)
2134                Ok(ControlFlow::ok(ExecResult::success("")))
2135            }
2136            Stmt::Break(levels) => {
2137                Ok(ControlFlow::break_n(levels.unwrap_or(1)))
2138            }
2139            Stmt::Continue(levels) => {
2140                Ok(ControlFlow::continue_n(levels.unwrap_or(1)))
2141            }
2142            Stmt::Return(expr) => {
2143                // return [N] - N becomes the exit code, NOT stdout
2144                // Shell semantics: return sets exit code, doesn't produce output
2145                let result = if let Some(e) = expr {
2146                    let val = self.eval_expr_async(e).await?;
2147                    let code = crate::interpreter::value_to_exit_code(&val)
2148                        .map_err(|e| anyhow::anyhow!("return: {}", e))?;
2149                    ExecResult::from_parts(code, String::new(), String::new(), None)
2150                } else {
2151                    ExecResult::success("")
2152                };
2153                Ok(ControlFlow::return_value(result))
2154            }
2155            Stmt::Exit(expr) => {
2156                let code = if let Some(e) = expr {
2157                    let val = self.eval_expr_async(e).await?;
2158                    crate::interpreter::value_to_exit_code(&val)
2159                        .map_err(|e| anyhow::anyhow!("exit: {}", e))?
2160                } else {
2161                    0
2162                };
2163                Ok(ControlFlow::exit_code(code))
2164            }
2165            Stmt::ToolDef(tool_def) => {
2166                let mut user_tools = self.user_tools.write().await;
2167                user_tools.insert(tool_def.name.clone(), tool_def.clone());
2168                Ok(ControlFlow::ok(ExecResult::success("")))
2169            }
2170            Stmt::AndChain { left, right } => {
2171                // cmd1 && cmd2 - run cmd2 only if cmd1 succeeds (exit code 0)
2172                // Suppress errexit for the left side — && handles failure itself.
2173                {
2174                    let mut scope = self.scope.write().await;
2175                    scope.suppress_errexit();
2176                }
2177                let left_flow = match self.execute_stmt_flow(left).await {
2178                    Ok(f) => f,
2179                    Err(e) => {
2180                        let mut scope = self.scope.write().await;
2181                        scope.unsuppress_errexit();
2182                        return Err(e);
2183                    }
2184                };
2185                {
2186                    let mut scope = self.scope.write().await;
2187                    scope.unsuppress_errexit();
2188                }
2189                match left_flow {
2190                    ControlFlow::Normal(mut left_result) => {
2191                        self.drain_stderr_into(&mut left_result).await;
2192                        self.update_last_result(&left_result).await;
2193                        if left_result.ok() {
2194                            let right_flow = self.execute_stmt_flow(right).await?;
2195                            match right_flow {
2196                                ControlFlow::Normal(mut right_result) => {
2197                                    self.drain_stderr_into(&mut right_result).await;
2198                                    self.update_last_result(&right_result).await;
2199                                    let mut combined = left_result;
2200                                    accumulate_result(&mut combined, &right_result);
2201                                    Ok(ControlFlow::ok(combined))
2202                                }
2203                                other => Ok(other),
2204                            }
2205                        } else {
2206                            Ok(ControlFlow::ok(left_result))
2207                        }
2208                    }
2209                    _ => Ok(left_flow),
2210                }
2211            }
2212            Stmt::OrChain { left, right } => {
2213                // cmd1 || cmd2 - run cmd2 only if cmd1 fails (non-zero exit code)
2214                // Suppress errexit for the left side — || handles failure itself.
2215                {
2216                    let mut scope = self.scope.write().await;
2217                    scope.suppress_errexit();
2218                }
2219                let left_flow = match self.execute_stmt_flow(left).await {
2220                    Ok(f) => f,
2221                    Err(e) => {
2222                        let mut scope = self.scope.write().await;
2223                        scope.unsuppress_errexit();
2224                        return Err(e);
2225                    }
2226                };
2227                {
2228                    let mut scope = self.scope.write().await;
2229                    scope.unsuppress_errexit();
2230                }
2231                match left_flow {
2232                    ControlFlow::Normal(mut left_result) => {
2233                        self.drain_stderr_into(&mut left_result).await;
2234                        self.update_last_result(&left_result).await;
2235                        if !left_result.ok() {
2236                            let right_flow = self.execute_stmt_flow(right).await?;
2237                            match right_flow {
2238                                ControlFlow::Normal(mut right_result) => {
2239                                    self.drain_stderr_into(&mut right_result).await;
2240                                    self.update_last_result(&right_result).await;
2241                                    let mut combined = left_result;
2242                                    accumulate_result(&mut combined, &right_result);
2243                                    Ok(ControlFlow::ok(combined))
2244                                }
2245                                other => Ok(other),
2246                            }
2247                        } else {
2248                            Ok(ControlFlow::ok(left_result))
2249                        }
2250                    }
2251                    _ => Ok(left_flow), // Propagate non-normal flow
2252                }
2253            }
2254            Stmt::Test(test_expr) => {
2255                let is_true = self.eval_test_async(test_expr).await?;
2256                if is_true {
2257                    Ok(ControlFlow::ok(ExecResult::success("")))
2258                } else {
2259                    Ok(ControlFlow::ok(ExecResult::failure(1, "")))
2260                }
2261            }
2262            Stmt::Empty => Ok(ControlFlow::ok(ExecResult::success(""))),
2263        }
2264        }.instrument(span))
2265    }
2266
2267    /// Execute a pipeline.
2268    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(background = pipeline.background, command_count = pipeline.commands.len()))]
2269    async fn execute_pipeline(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2270        if pipeline.commands.is_empty() {
2271            return Ok(ExecResult::success(""));
2272        }
2273
2274        // Handle background execution (`&` operator)
2275        if pipeline.background {
2276            return self.execute_background(pipeline).await;
2277        }
2278
2279        // All commands go through the runner with the Kernel as dispatcher.
2280        // This is the single execution path — no fast path for single commands.
2281        //
2282        // IMPORTANT: We snapshot exec_ctx into a local context and release the
2283        // lock before running. This prevents deadlocks when dispatch_command
2284        // is called from within the pipeline and recursively triggers another
2285        // pipeline (e.g., via user-defined tools).
2286        let mut ctx = {
2287            let ec = self.exec_ctx.read().await;
2288            let scope = self.scope.read().await;
2289            ExecContext {
2290                backend: ec.backend.clone(),
2291                scope: scope.clone(),
2292                cwd: ec.cwd.clone(),
2293                prev_cwd: ec.prev_cwd.clone(),
2294                stdin: None,
2295                stdin_data: None,
2296                pipe_stdin: None,
2297                pipe_stdout: None,
2298                stderr: ec.stderr.clone(),
2299                tool_schemas: ec.tool_schemas.clone(),
2300                tools: ec.tools.clone(),
2301                job_manager: ec.job_manager.clone(),
2302                pipeline_position: PipelinePosition::Only,
2303                interactive: self.interactive,
2304                aliases: ec.aliases.clone(),
2305                ignore_config: ec.ignore_config.clone(),
2306                output_limit: ec.output_limit.clone(),
2307                allow_external_commands: self.allow_external_commands,
2308                nonce_store: ec.nonce_store.clone(),
2309                trash_backend: ec.trash_backend.clone(),
2310                #[cfg(all(unix, feature = "subprocess"))]
2311                terminal_state: ec.terminal_state.clone(),
2312                dispatcher: self.dispatcher(),
2313                cancel: {
2314                    #[allow(clippy::expect_used)]
2315                    let token = self.cancel_token.lock().expect("cancel_token poisoned");
2316                    token.clone()
2317                },
2318                output_format: None,
2319                vfs_budget: self.vfs_budget.clone(),
2320                watchdog: ec.watchdog.clone(),
2321                #[cfg(all(feature = "localfs", feature = "overlay"))]
2322                overlay_handle: self.overlay_handle.clone(),
2323            }
2324        }; // locks released
2325
2326        let mut result = self.runner.run(&pipeline.commands, &mut ctx, self).await;
2327
2328        // Post-hoc spill check (catches builtins and fast external commands)
2329        if ctx.output_limit.is_enabled() {
2330            let _ = crate::output_limit::spill_if_needed(&mut result, &ctx.output_limit).await;
2331        }
2332
2333        // Signal spill with exit 3; agent reads the spill file directly
2334        // (use `set +o output-limit` before cat/head/tail to bypass the limit)
2335        if result.did_spill {
2336            result.original_code = Some(result.code);
2337            result.code = 3;
2338        }
2339
2340        // Sync changes back from context
2341        {
2342            let mut ec = self.exec_ctx.write().await;
2343            ec.cwd = ctx.cwd.clone();
2344            ec.prev_cwd = ctx.prev_cwd.clone();
2345            ec.aliases = ctx.aliases.clone();
2346            ec.ignore_config = ctx.ignore_config.clone();
2347            ec.output_limit = ctx.output_limit.clone();
2348        }
2349        {
2350            let mut scope = self.scope.write().await;
2351            *scope = ctx.scope.clone();
2352        }
2353
2354        Ok(result)
2355    }
2356
2357    /// Execute a pipeline in the background.
2358    ///
2359    /// The command is spawned as a tokio task, registered with the JobManager,
2360    /// and its output is captured via BoundedStreams. The job is observable via
2361    /// `/v/jobs/{id}/stdout`, `/v/jobs/{id}/stderr`, and `/v/jobs/{id}/status`.
2362    ///
2363    /// Returns immediately with a job ID like "[1]".
2364    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(command_count = pipeline.commands.len()))]
2365    async fn execute_background(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2366        use tokio::sync::oneshot;
2367
2368        // Format the command for display in /v/jobs/{id}/command
2369        let command_str = self.format_pipeline(pipeline);
2370
2371        // Create bounded streams for output capture
2372        let stdout = Arc::new(BoundedStream::default_size());
2373        let stderr = Arc::new(BoundedStream::default_size());
2374
2375        // Create channel for result notification
2376        let (tx, rx) = oneshot::channel();
2377
2378        // Register with JobManager to get job ID and create VFS entries
2379        let job_id = self.jobs.register_with_streams(
2380            command_str.clone(),
2381            rx,
2382            stdout.clone(),
2383            stderr.clone(),
2384        ).await;
2385
2386        // Fork the kernel for this background job. The fork snapshots the
2387        // parent's scope/cwd/aliases/user_tools so mutations stay isolated,
2388        // while sharing the job manager, VFS, and tool registry. The fork's
2389        // full dispatch chain (user tools, .kai scripts, `$(...)` in args)
2390        // is available here — something BackendDispatcher couldn't provide.
2391        //
2392        // The fork gets its own cancellation token (recorded on the job so
2393        // `kill %N` can stop the job — including a pure-builtin job with no OS
2394        // process group) and is stamped with the job id so any external
2395        // command it spawns records its process group for `kill -<sig> %N`.
2396        let cancel = tokio_util::sync::CancellationToken::new();
2397        self.jobs.set_cancel_token(job_id, cancel.clone()).await;
2398        let fork = self.fork_for_background(cancel, job_id).await;
2399        let runner = self.runner.clone();
2400        let commands = pipeline.commands.clone();
2401
2402        // Snapshot the fork's exec_ctx for the spawned task. We have to do
2403        // this before tokio::spawn because the fork's exec_ctx is behind a
2404        // tokio RwLock and we want the spawned task to own its ctx.
2405        let mut bg_ctx = {
2406            let ec = fork.exec_ctx.read().await;
2407            ec.child_for_pipeline()
2408        };
2409        bg_ctx.scope = fork.scope.read().await.clone();
2410        // The fork's dispatcher points at the fork itself; set it here so
2411        // builtins inside the background task (e.g. timeout) re-dispatch
2412        // through the fork, not the parent.
2413        bg_ctx.dispatcher = fork.dispatcher();
2414
2415        // Spawn the background task. Propagate the embedder's trace context
2416        // across the spawn boundary so the job's spans stay in the same trace.
2417        tokio::spawn(crate::telemetry::bind_current_context(async move {
2418            // runner.run needs a &dyn CommandDispatcher; fork.as_ref()
2419            // gives us that (Kernel implements CommandDispatcher).
2420            let result = runner.run(&commands, &mut bg_ctx, fork.as_ref()).await;
2421
2422            // Write output to streams
2423            let text = result.text_out();
2424            if !text.is_empty() {
2425                stdout.write(text.as_bytes()).await;
2426            }
2427            if !result.err.is_empty() {
2428                stderr.write(result.err.as_bytes()).await;
2429            }
2430
2431            // Close streams
2432            stdout.close().await;
2433            stderr.close().await;
2434
2435            // Send result to JobManager (ignore error if receiver dropped)
2436            let _ = tx.send(result);
2437        }));
2438
2439        Ok(ExecResult::success(format!("[{}]", job_id)))
2440    }
2441
2442    /// Format a pipeline as a command string for display.
2443    fn format_pipeline(&self, pipeline: &crate::ast::Pipeline) -> String {
2444        pipeline.commands
2445            .iter()
2446            .map(|cmd| {
2447                let mut parts = vec![cmd.name.clone()];
2448                for arg in &cmd.args {
2449                    match arg {
2450                        Arg::Positional(expr) => {
2451                            parts.push(self.format_expr(expr));
2452                        }
2453                        Arg::Named { key, value } => {
2454                            parts.push(format!("--{}={}", key, self.format_expr(value)));
2455                        }
2456                        Arg::WordAssign { key, value } => {
2457                            parts.push(format!("{}={}", key, self.format_expr(value)));
2458                        }
2459                        Arg::ShortFlag(name) => {
2460                            parts.push(format!("-{}", name));
2461                        }
2462                        Arg::LongFlag(name) => {
2463                            parts.push(format!("--{}", name));
2464                        }
2465                        Arg::DoubleDash => {
2466                            parts.push("--".to_string());
2467                        }
2468                    }
2469                }
2470                parts.join(" ")
2471            })
2472            .collect::<Vec<_>>()
2473            .join(" | ")
2474    }
2475
2476    /// Format an expression as a string for display.
2477    fn format_expr(&self, expr: &Expr) -> String {
2478        match expr {
2479            Expr::Literal(Value::String(s)) => {
2480                if s.contains(' ') || s.contains('"') {
2481                    format!("'{}'", s.replace('\'', "\\'"))
2482                } else {
2483                    s.clone()
2484                }
2485            }
2486            Expr::Literal(Value::Int(i)) => i.to_string(),
2487            Expr::Literal(Value::Float(f)) => f.to_string(),
2488            Expr::Literal(Value::Bool(b)) => b.to_string(),
2489            Expr::Literal(Value::Null) => "null".to_string(),
2490            Expr::VarRef(path) => {
2491                let name = path.segments.iter()
2492                    .map(|seg| match seg {
2493                        crate::ast::VarSegment::Field(f) => f.clone(),
2494                    })
2495                    .collect::<Vec<_>>()
2496                    .join(".");
2497                format!("${{{}}}", name)
2498            }
2499            Expr::Interpolated(_) => "\"...\"".to_string(),
2500            Expr::HereDocBody { .. } => "<<heredoc".to_string(),
2501            _ => "...".to_string(),
2502        }
2503    }
2504
2505    /// Execute a single command.
2506    async fn execute_command(&self, name: &str, args: &[Arg]) -> Result<ExecResult> {
2507        self.execute_command_depth(name, args, 0).await
2508    }
2509
2510    #[tracing::instrument(level = "info", skip(self, args, alias_depth), fields(command = %name), err)]
2511    async fn execute_command_depth(&self, name: &str, args: &[Arg], alias_depth: u8) -> Result<ExecResult> {
2512        // Special built-ins
2513        match name {
2514            "true" => return Ok(ExecResult::success("")),
2515            "false" => return Ok(ExecResult::failure(1, "")),
2516            "source" | "." => return self.execute_source(args).await,
2517            _ => {}
2518        }
2519
2520        // Alias expansion (with recursion limit)
2521        if alias_depth < 10 {
2522            let alias_value = {
2523                let ctx = self.exec_ctx.read().await;
2524                ctx.aliases.get(name).cloned()
2525            };
2526            if let Some(alias_val) = alias_value {
2527                // Split alias value into command + args
2528                let parts: Vec<&str> = alias_val.split_whitespace().collect();
2529                if let Some((alias_cmd, alias_args)) = parts.split_first() {
2530                    let mut new_args: Vec<Arg> = alias_args
2531                        .iter()
2532                        .map(|a| Arg::Positional(Expr::Literal(Value::String(a.to_string()))))
2533                        .collect();
2534                    new_args.extend_from_slice(args);
2535                    return Box::pin(self.execute_command_depth(alias_cmd, &new_args, alias_depth + 1)).await;
2536                }
2537            }
2538        }
2539
2540        // Handle /v/bin/ prefix — dispatch to builtins via virtual path
2541        if let Some(builtin_name) = name.strip_prefix("/v/bin/") {
2542            return match self.tools.get(builtin_name) {
2543                Some(_) => Box::pin(self.execute_command_depth(builtin_name, args, alias_depth)).await,
2544                None => Ok(ExecResult::failure(127, format!("command not found: {}", name))),
2545            };
2546        }
2547
2548        // Check user-defined tools first
2549        {
2550            let user_tools = self.user_tools.read().await;
2551            if let Some(tool_def) = user_tools.get(name) {
2552                let tool_def = tool_def.clone();
2553                drop(user_tools);
2554                return self.execute_user_tool(tool_def, args).await;
2555            }
2556        }
2557
2558        // Look up builtin tool
2559        let tool = match self.tools.get(name) {
2560            Some(t) => t,
2561            None => {
2562                // Try executing as .kai script from PATH
2563                if let Some(result) = self.try_execute_script(name, args).await? {
2564                    return Ok(result);
2565                }
2566                // Try executing as external command from PATH
2567                if let Some(result) = self.try_execute_external(name, args).await? {
2568                    return Ok(result);
2569                }
2570
2571                // Try backend-registered tools (embedder engines, etc.)
2572                // Look up tool schema for positional→named mapping.
2573                // Clone backend and drop read lock before awaiting (may involve network I/O).
2574                // Backend tools expect named JSON params, so enable positional mapping.
2575                let backend = self.exec_ctx.read().await.backend.clone();
2576                let tool_schema = backend.get_tool(name).await.ok().flatten().map(|t| {
2577                    let mut s = t.schema;
2578                    // Flat backend/MCP tools expect named JSON params, so map
2579                    // bare positionals onto named params. Subcommand-aware tools
2580                    // route positionals through the subcommand path and declare
2581                    // map_positionals per leaf (kj keeps it false so it re-parses
2582                    // the argv with its own clap) — don't blanket-override them.
2583                    if s.subcommands.is_empty() {
2584                        s.map_positionals = true;
2585                    }
2586                    s
2587                });
2588                let tool_args = self.build_args_async(args, tool_schema.as_ref()).await?;
2589                let mut ctx = self.exec_ctx.write().await;
2590                {
2591                    let scope = self.scope.read().await;
2592                    ctx.scope = scope.clone();
2593                }
2594                let backend = ctx.backend.clone();
2595                match backend.call_tool(name, tool_args, &mut *ctx).await {
2596                    Ok(tool_result) => {
2597                        let mut scope = self.scope.write().await;
2598                        *scope = ctx.scope.clone();
2599                        let mut exec = ExecResult::from_output(
2600                            tool_result.code as i64, tool_result.stdout, tool_result.stderr,
2601                        );
2602                        exec.set_output(tool_result.output);
2603                        return Ok(exec);
2604                    }
2605                    Err(BackendError::ToolNotFound(_)) => {
2606                        // Fall through to "command not found"
2607                    }
2608                    Err(e) => {
2609                        // Backend dispatch is last-resort lookup — if it fails
2610                        // for any reason, the command simply doesn't exist.
2611                        tracing::debug!("backend error for {name}: {e}");
2612                    }
2613                }
2614
2615                return Ok(ExecResult::failure(127, format!("command not found: {}", name)));
2616            }
2617        };
2618
2619        // Build arguments (async to support command substitution, schema-aware for flag values)
2620        let schema = tool.schema();
2621        let tool_args = self.build_args_async(args, Some(&schema)).await?;
2622
2623        // --help / -h: show help unless the tool's schema claims that flag
2624        let schema_claims = |flag: &str| -> bool {
2625            let bare = flag.trim_start_matches('-');
2626            schema.params.iter().any(|p| p.matches_flag(flag) || p.matches_flag(bare))
2627        };
2628        let wants_help =
2629            (tool_args.flags.contains("help") && !schema_claims("help"))
2630            || (tool_args.flags.contains("h") && !schema_claims("-h"));
2631        if wants_help {
2632            let help_topic = crate::help::HelpTopic::Tool(name.to_string());
2633            let ctx = self.exec_ctx.read().await;
2634            let content = crate::help::get_help(&help_topic, &ctx.tool_schemas);
2635            return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(content)));
2636        }
2637
2638        // Snapshot exec_ctx into a local context and release the write lock
2639        // before calling tool.execute. Holding the write across tool execution
2640        // would deadlock any builtin that re-dispatches through ctx.dispatcher
2641        // (timeout, scatter) — the inner dispatch_command needs its own
2642        // exec_ctx.write() and would block forever.
2643        let mut ctx = {
2644            let ec = self.exec_ctx.write().await;
2645            let scope = self.scope.read().await;
2646            ExecContext {
2647                backend: ec.backend.clone(),
2648                scope: scope.clone(),
2649                cwd: ec.cwd.clone(),
2650                prev_cwd: ec.prev_cwd.clone(),
2651                stdin: ec.stdin.clone(),
2652                stdin_data: ec.stdin_data.clone(),
2653                pipe_stdin: None, // streaming pipes are per-pipeline; not snapshotted
2654                pipe_stdout: None,
2655                stderr: ec.stderr.clone(),
2656                tool_schemas: ec.tool_schemas.clone(),
2657                tools: ec.tools.clone(),
2658                job_manager: ec.job_manager.clone(),
2659                pipeline_position: ec.pipeline_position,
2660                interactive: self.interactive,
2661                aliases: ec.aliases.clone(),
2662                ignore_config: ec.ignore_config.clone(),
2663                output_limit: ec.output_limit.clone(),
2664                allow_external_commands: self.allow_external_commands,
2665                nonce_store: ec.nonce_store.clone(),
2666                trash_backend: ec.trash_backend.clone(),
2667                #[cfg(all(unix, feature = "subprocess"))]
2668                terminal_state: ec.terminal_state.clone(),
2669                dispatcher: self.dispatcher(),
2670                // Use ec.cancel (set by dispatch_command from the runner's
2671                // ctx.cancel) so any builtin-swapped child token (e.g. timeout's
2672                // child token) reaches the spawned external via wait_or_kill.
2673                // Falls back to the kernel's own token when ec.cancel is the
2674                // default fresh token from a non-dispatch path.
2675                cancel: ec.cancel.clone(),
2676                output_format: None,
2677                vfs_budget: self.vfs_budget.clone(),
2678                watchdog: ec.watchdog.clone(),
2679                #[cfg(all(feature = "localfs", feature = "overlay"))]
2680                overlay_handle: self.overlay_handle.clone(),
2681            }
2682        }; // both locks released — tool.execute can re-dispatch safely
2683
2684        // Move stdin out of self.exec_ctx into the snapshot (consumed-by-tool
2685        // semantics): take() so a later dispatch doesn't see stale stdin.
2686        // Done after the snapshot above so we hold the write briefly.
2687        {
2688            let mut ec = self.exec_ctx.write().await;
2689            ctx.stdin = ec.stdin.take();
2690            ctx.stdin_data = ec.stdin_data.take();
2691            ctx.pipe_stdin = ec.pipe_stdin.take();
2692            ctx.pipe_stdout = ec.pipe_stdout.take();
2693        }
2694
2695        // Honor --json before the builtin runs so its setting survives a clap
2696        // parse failure (e.g. `cmd --json --bogus-flag` would otherwise drop
2697        // --json on the floor when `try_parse_from` returns Err early).
2698        // The builtin's own `parsed.global.apply(ctx)` becomes idempotent.
2699        GlobalFlags::apply_from_args(&tool_args, &mut ctx);
2700
2701        let result = tool.execute(tool_args, &mut ctx).await;
2702
2703        // Sync mutations back. Tools may have changed scope (set/cd),
2704        // cwd/prev_cwd (cd), and aliases (alias). Also return any unused pipe
2705        // endpoints to self.exec_ctx so dispatch_command's post-execute sync
2706        // hands them back to the pipeline runner — the runner uses
2707        // stage_ctx.pipe_stdout to write the result to the next stage when
2708        // the tool itself didn't take and write to it.
2709        {
2710            let mut scope = self.scope.write().await;
2711            *scope = ctx.scope.clone();
2712        }
2713        {
2714            let mut ec = self.exec_ctx.write().await;
2715            ec.cwd = ctx.cwd;
2716            ec.prev_cwd = ctx.prev_cwd;
2717            ec.aliases = ctx.aliases;
2718            // A builtin (`set -o output-limit`, `kaish-output-limit set`) can
2719            // mutate the runtime output limit; without this sync the change is
2720            // dropped here and never reaches dispatch_command's read-back, so
2721            // it would not survive past the current statement.
2722            ec.output_limit = ctx.output_limit.clone();
2723            ec.pipe_stdin = ctx.pipe_stdin.take();
2724            ec.pipe_stdout = ctx.pipe_stdout.take();
2725        }
2726
2727        // Builtins parse --json via the GlobalFlags flatten in their clap
2728        // struct and write ctx.output_format. The kernel applies it — unless the
2729        // tool owns its own output (renders --json itself), in which case we
2730        // leave its bytes untouched.
2731        let result = finalize_output(result, ctx.output_format, schema.owns_output);
2732
2733        Ok(result)
2734    }
2735
2736    /// The session `HOME` from the kernel scope, if set. Tilde expansion reads
2737    /// this rather than `std::env::var("HOME")` so the kernel stays hermetic —
2738    /// a hermetic embedder (empty `initial_vars`) gets `None`, and `~` is left
2739    /// unexpanded rather than leaking the host home directory.
2740    async fn scope_home(&self) -> Option<String> {
2741        match self.scope.read().await.get("HOME") {
2742            Some(Value::String(s)) => Some(s.clone()),
2743            _ => None,
2744        }
2745    }
2746
2747    /// Pull `consumes` positional args after a non-bool flag and stash them
2748    /// on `tool_args.named` under the canonical param name.
2749    ///
2750    /// - `consumes == 1` keeps the historical contract: a single scalar value.
2751    /// - `consumes > 1` accumulates each occurrence as an inner
2752    ///   `serde_json::Value::Array` inside `named[canonical] =
2753    ///   Value::Json(Array(...))`, preserving invocation order. This is the
2754    ///   shape jq's `--arg NAME VAL` / `--argjson NAME VAL` land in.
2755    ///
2756    /// Errors loudly if the flag is missing required positionals — matches
2757    /// kaish's "no silent fallback" posture and mirrors real jq, which
2758    /// errors on `--arg NAME` with no value.
2759    #[allow(clippy::too_many_arguments)]
2760    async fn consume_flag_positionals(
2761        &self,
2762        args: &[Arg],
2763        flag_name: &str,
2764        canonical: &str,
2765        consumes: usize,
2766        positional_indices: &[usize],
2767        consumed: &mut std::collections::HashSet<usize>,
2768        current_idx: usize,
2769        tool_args: &mut ToolArgs,
2770    ) -> Result<()> {
2771        let home = self.scope_home().await;
2772        let mut collected: Vec<Value> = Vec::with_capacity(consumes.max(1));
2773        for _ in 0..consumes.max(1) {
2774            let next_pos = positional_indices
2775                .iter()
2776                .find(|idx| **idx > current_idx && !consumed.contains(idx))
2777                .copied();
2778            match next_pos {
2779                Some(pos_idx) => {
2780                    if let Arg::Positional(expr) = &args[pos_idx] {
2781                        let value = self.eval_expr_async(expr).await?;
2782                        let value = apply_tilde_expansion(value, home.as_deref());
2783                        collected.push(value);
2784                        consumed.insert(pos_idx);
2785                    }
2786                }
2787                None => {
2788                    if consumes <= 1 && collected.is_empty() {
2789                        // Back-compat: a flag with no follow-up positional
2790                        // becomes a bare flag. `--path` with nothing after
2791                        // lands in `flags`, same as before this refactor.
2792                        tool_args.flags.insert(flag_name.to_string());
2793                        return Ok(());
2794                    }
2795                    anyhow::bail!(
2796                        "--{flag_name} requires {consumes} argument{}, got {}",
2797                        if consumes == 1 { "" } else { "s" },
2798                        collected.len()
2799                    );
2800                }
2801            }
2802        }
2803
2804        if consumes <= 1 {
2805            if let Some(v) = collected.pop() {
2806                tool_args.named.insert(canonical.to_string(), v);
2807            }
2808            return Ok(());
2809        }
2810
2811        // Multi-consume: accumulate under named[canonical] as array-of-arrays.
2812        let occ: Vec<serde_json::Value> = collected
2813            .into_iter()
2814            .map(|v| crate::interpreter::value_to_json(&v))
2815            .collect();
2816        let entry = tool_args
2817            .named
2818            .entry(canonical.to_string())
2819            .or_insert_with(|| Value::Json(serde_json::Value::Array(Vec::new())));
2820        if let Value::Json(serde_json::Value::Array(outer)) = entry {
2821            outer.push(serde_json::Value::Array(occ));
2822        } else {
2823            anyhow::bail!(
2824                "--{flag_name}: named[{canonical}] already holds a non-array value"
2825            );
2826        }
2827        Ok(())
2828    }
2829
2830    /// Build tool arguments from AST args.
2831    ///
2832    /// Uses async evaluation to support command substitution in arguments.
2833    async fn build_args_async(&self, args: &[Arg], schema: Option<&crate::tools::ToolSchema>) -> Result<ToolArgs> {
2834        let mut tool_args = ToolArgs::new();
2835        let home = self.scope_home().await;
2836        // Subcommand-aware tools (e.g. `kj context list`) expose a tree of
2837        // schemas; pick the leaf the leading positionals route to and bind
2838        // flags against *its* params. Flat tools return the root. select_leaf
2839        // errors (fail loud) if a computed positional sits where a subcommand
2840        // selector is required.
2841        let leaf = match schema {
2842            Some(s) => Some(select_leaf(s, args)?),
2843            None => None,
2844        };
2845        // Bind against the leaf's params, but MERGE the root schema's params on
2846        // top as "global" flags: a value-flag declared at the tool's top level
2847        // (e.g. kj's `--confirm <nonce>`) must bind at every leaf, including when
2848        // it trails the subcommand path (`kj context retag a b --confirm <n>`).
2849        // The leaf wins on name conflicts. For a flat tool, leaf == root, so the
2850        // merge is a harmless no-op.
2851        let mut param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
2852        if let Some(l) = leaf {
2853            param_lookup.extend(schema_param_lookup(l));
2854        }
2855        // accepts_word_assign keys off the root tool name (the WORD_ASSIGN list),
2856        // not the leaf — it's a property of the command, not the subcommand.
2857        let accepts_word_assign = schema
2858            .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
2859            .unwrap_or(false);
2860
2861        // Track which positional indices have been consumed as flag values
2862        let mut consumed: std::collections::HashSet<usize> = std::collections::HashSet::new();
2863        let mut past_double_dash = false;
2864
2865        // Find positional arg indices for flag value consumption
2866        let positional_indices: Vec<usize> = args.iter().enumerate()
2867            .filter_map(|(i, a)| matches!(a, Arg::Positional(_)).then_some(i))
2868            .collect();
2869
2870        let mut i = 0;
2871        while i < args.len() {
2872            match &args[i] {
2873                Arg::DoubleDash => {
2874                    past_double_dash = true;
2875                }
2876                Arg::Positional(expr) => {
2877                    if !consumed.contains(&i) {
2878                        // Glob expansion: bare glob patterns expand to matching files
2879                        if let Expr::GlobPattern(pattern) = expr {
2880                            let glob_enabled = {
2881                                let scope = self.scope.read().await;
2882                                scope.glob_enabled()
2883                            };
2884                            if glob_enabled {
2885                                let (paths, cwd) = {
2886                                    let ctx = self.exec_ctx.read().await;
2887                                    let paths = ctx.expand_glob(pattern).await
2888                                        .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2889                                    let cwd = ctx.resolve_path(".");
2890                                    (paths, cwd)
2891                                };
2892                                if paths.is_empty() {
2893                                    return Err(anyhow::anyhow!("no matches: {}", pattern));
2894                                }
2895                                for path in paths {
2896                                    let display = if !pattern.starts_with('/') {
2897                                        path.strip_prefix(&cwd)
2898                                            .unwrap_or(&path)
2899                                            .to_string_lossy().into_owned()
2900                                    } else {
2901                                        path.to_string_lossy().into_owned()
2902                                    };
2903                                    tool_args.positional.push(Value::String(display));
2904                                }
2905                                i += 1;
2906                                continue;
2907                            }
2908                        }
2909                        let value = self.eval_expr_async(expr).await?;
2910                        let value = apply_tilde_expansion(value, home.as_deref());
2911                        tool_args.positional.push(value);
2912                    }
2913                }
2914                Arg::Named { key, value } => {
2915                    let val = self.eval_expr_async(value).await?;
2916                    let val = apply_tilde_expansion(val, home.as_deref());
2917                    tool_args.named.insert(key.clone(), val);
2918                }
2919                Arg::WordAssign { key, value } => {
2920                    let val = self.eval_expr_async(value).await?;
2921                    let val = apply_tilde_expansion(val, home.as_deref());
2922                    if accepts_word_assign {
2923                        tool_args.named.insert(key.clone(), val);
2924                    } else {
2925                        // Stringify "key=value" and pass as a positional.
2926                        // Matches bash: `cat foo=bar` opens a file named `foo=bar`.
2927                        let val_str = crate::interpreter::value_to_string(&val);
2928                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
2929                    }
2930                }
2931                Arg::ShortFlag(name) => {
2932                    if past_double_dash {
2933                        tool_args.positional.push(Value::String(format!("-{name}")));
2934                    } else if name.len() == 1 {
2935                        let flag_name = name.as_str();
2936                        let lookup = param_lookup.get(flag_name);
2937                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
2938
2939                        if is_bool {
2940                            tool_args.flags.insert(flag_name.to_string());
2941                        } else {
2942                            // Non-bool: consume `consumes` positionals as value(s)
2943                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
2944                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
2945                            self.consume_flag_positionals(
2946                                args,
2947                                name,
2948                                canonical,
2949                                consumes,
2950                                &positional_indices,
2951                                &mut consumed,
2952                                i,
2953                                &mut tool_args,
2954                            )
2955                            .await?;
2956                        }
2957                    } else if let Some(&(canonical, typ, consumes)) = param_lookup.get(name.as_str()) {
2958                        // Multi-char short flag matches a schema param (POSIX style: -name value)
2959                        if is_bool_type(typ) {
2960                            tool_args.flags.insert(canonical.to_string());
2961                        } else {
2962                            self.consume_flag_positionals(
2963                                args,
2964                                name,
2965                                canonical,
2966                                consumes,
2967                                &positional_indices,
2968                                &mut consumed,
2969                                i,
2970                                &mut tool_args,
2971                            )
2972                            .await?;
2973                        }
2974                    } else {
2975                        // Multi-char combined flags like -la: always boolean
2976                        for c in name.chars() {
2977                            tool_args.flags.insert(c.to_string());
2978                        }
2979                    }
2980                }
2981                Arg::LongFlag(name) => {
2982                    if past_double_dash {
2983                        tool_args.positional.push(Value::String(format!("--{name}")));
2984                    } else {
2985                        let lookup = param_lookup.get(name.as_str());
2986                        // An *undeclared* long flag under a `map_positionals`
2987                        // (backend/MCP) schema that is immediately followed by an
2988                        // unconsumed positional is ambiguous: kaish can't tell the
2989                        // space-form value (`--type explorer`) from a bool flag
2990                        // before a real positional (`--force file.txt`). Defaulting
2991                        // to bool here silently divorces the value and misroutes it
2992                        // — a privilege-escalation-by-typo against deny-by-default
2993                        // embedders (docs/issues.md). Fail loud instead of guessing.
2994                        let ambiguous_value = (lookup.is_none()
2995                            && leaf.is_some_and(|s| s.map_positionals)
2996                            && !consumed.contains(&(i + 1)))
2997                            .then(|| match args.get(i + 1) {
2998                                // Echo a concrete value for a copy-pasteable fix
2999                                // when it's a plain literal; fall back to VALUE.
3000                                Some(Arg::Positional(Expr::Literal(Value::String(s)))) => {
3001                                    Some(s.clone())
3002                                }
3003                                Some(Arg::Positional(_)) => Some("VALUE".to_string()),
3004                                _ => None,
3005                            })
3006                            .flatten();
3007                        if let Some(val) = ambiguous_value {
3008                            let tool = leaf.map(|s| s.name.as_str()).unwrap_or("command");
3009                            anyhow::bail!(
3010                                "{tool}: --{name} is not a declared flag, so the \
3011                                 space-separated value would be silently dropped. \
3012                                 Use --{name}={val}, or have {tool} declare --{name} \
3013                                 in its schema."
3014                            );
3015                        }
3016                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3017
3018                        if is_bool {
3019                            tool_args.flags.insert(name.clone());
3020                        } else {
3021                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
3022                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3023                            self.consume_flag_positionals(
3024                                args,
3025                                name,
3026                                canonical,
3027                                consumes,
3028                                &positional_indices,
3029                                &mut consumed,
3030                                i,
3031                                &mut tool_args,
3032                            )
3033                            .await?;
3034                        }
3035                    }
3036                }
3037            }
3038            i += 1;
3039        }
3040
3041        // Map remaining positionals to unfilled non-bool schema params (in order).
3042        // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
3043        // Positionals that appeared after `--` are never mapped (they're raw data).
3044        // Only for backend/external tools (map_positionals=true). Builtins handle their own positionals.
3045        // Keyed off the routed leaf so a subcommand tool maps against the active
3046        // leaf's params (kj leaves keep map_positionals=false → block skipped).
3047        if let Some(schema) = leaf.filter(|s| s.map_positionals) {
3048            let pre_dash_count = if past_double_dash {
3049                let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
3050                positional_indices.iter()
3051                    .filter(|idx| **idx < dash_pos && !consumed.contains(idx))
3052                    .count()
3053            } else {
3054                tool_args.positional.len()
3055            };
3056
3057            let mut remaining = Vec::new();
3058            let mut positional_iter = tool_args.positional.drain(..).enumerate();
3059
3060            for param in &schema.params {
3061                if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
3062                    continue;
3063                }
3064                if is_bool_type(&param.param_type) {
3065                    continue;
3066                }
3067                loop {
3068                    match positional_iter.next() {
3069                        Some((idx, val)) if idx < pre_dash_count => {
3070                            tool_args.named.insert(param.name.clone(), val);
3071                            break;
3072                        }
3073                        Some((_, val)) => {
3074                            remaining.push(val);
3075                        }
3076                        None => break,
3077                    }
3078                }
3079            }
3080
3081            remaining.extend(positional_iter.map(|(_, v)| v));
3082            tool_args.positional = remaining;
3083        }
3084
3085        Ok(tool_args)
3086    }
3087
3088    /// Build arguments as flat string list for external commands.
3089    ///
3090    /// Unlike `build_args_async` which separates flags into a HashSet (for schema-aware builtins),
3091    /// this preserves the original flag format as strings for external commands:
3092    /// - `-l` stays as `-l`
3093    /// - `--verbose` stays as `--verbose`
3094    /// - `key=value` stays as `key=value`
3095    ///
3096    /// This is what external commands expect in their argv.
3097    #[cfg(feature = "subprocess")]
3098    async fn build_args_flat(&self, args: &[Arg]) -> Result<Vec<String>> {
3099        let mut argv = Vec::new();
3100        let home = self.scope_home().await;
3101        for arg in args {
3102            match arg {
3103                Arg::Positional(expr) => {
3104                    // Glob expansion for external commands
3105                    if let Expr::GlobPattern(pattern) = expr {
3106                        let glob_enabled = {
3107                            let scope = self.scope.read().await;
3108                            scope.glob_enabled()
3109                        };
3110                        if glob_enabled {
3111                            let (paths, cwd) = {
3112                                let ctx = self.exec_ctx.read().await;
3113                                let paths = ctx.expand_glob(pattern).await
3114                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
3115                                let cwd = ctx.resolve_path(".");
3116                                (paths, cwd)
3117                            };
3118                            if paths.is_empty() {
3119                                return Err(anyhow::anyhow!("no matches: {}", pattern));
3120                            }
3121                            for path in paths {
3122                                let display = if !pattern.starts_with('/') {
3123                                    path.strip_prefix(&cwd)
3124                                        .unwrap_or(&path)
3125                                        .to_string_lossy().into_owned()
3126                                } else {
3127                                    path.to_string_lossy().into_owned()
3128                                };
3129                                argv.push(display);
3130                            }
3131                            continue;
3132                        }
3133                    }
3134                    let value = self.eval_expr_async(expr).await?;
3135                    let value = apply_tilde_expansion(value, home.as_deref());
3136                    argv.push(value_to_string(&value));
3137                }
3138                Arg::Named { key, value } => {
3139                    let val = self.eval_expr_async(value).await?;
3140                    let val = apply_tilde_expansion(val, home.as_deref());
3141                    argv.push(format!("--{}={}", key, value_to_string(&val)));
3142                }
3143                Arg::WordAssign { key, value } => {
3144                    let val = self.eval_expr_async(value).await?;
3145                    let val = apply_tilde_expansion(val, home.as_deref());
3146                    argv.push(format!("{}={}", key, value_to_string(&val)));
3147                }
3148                Arg::ShortFlag(name) => {
3149                    // Preserve original format: -l, -la (combined flags)
3150                    argv.push(format!("-{}", name));
3151                }
3152                Arg::LongFlag(name) => {
3153                    // Preserve original format: --verbose
3154                    argv.push(format!("--{}", name));
3155                }
3156                Arg::DoubleDash => {
3157                    // Preserve the -- marker
3158                    argv.push("--".to_string());
3159                }
3160            }
3161        }
3162        Ok(argv)
3163    }
3164
3165    /// Async expression evaluator that supports command substitution.
3166    ///
3167    /// This is used for contexts where expressions may contain `$(...)` command
3168    /// substitution. Unlike the sync `eval_expr`, this can execute pipelines.
3169    fn eval_expr_async<'a>(&'a self, expr: &'a Expr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + 'a>> {
3170        Box::pin(async move {
3171        match expr {
3172            Expr::Literal(value) => Ok(value.clone()),
3173            Expr::VarRef(path) => {
3174                let scope = self.scope.read().await;
3175                scope.resolve_path(path)
3176                    .ok_or_else(|| anyhow::anyhow!("undefined variable"))
3177            }
3178            Expr::Interpolated(parts) => {
3179                let mut result = String::new();
3180                for part in parts {
3181                    result.push_str(&self.eval_string_part_async(part).await?);
3182                }
3183                Ok(Value::String(result))
3184            }
3185            Expr::HereDocBody { parts, strip_tabs } => {
3186                let mut result = String::new();
3187                for sp in parts {
3188                    result.push_str(&self.eval_string_part_async(&sp.part).await?);
3189                }
3190                if *strip_tabs {
3191                    Ok(Value::String(crate::interpreter::strip_leading_tabs(&result)))
3192                } else {
3193                    Ok(Value::String(result))
3194                }
3195            }
3196            Expr::BinaryOp { left, op, right } => match op {
3197                BinaryOp::And => {
3198                    let left_val = self.eval_expr_async(left).await?;
3199                    if !is_truthy(&left_val) {
3200                        return Ok(left_val);
3201                    }
3202                    self.eval_expr_async(right).await
3203                }
3204                BinaryOp::Or => {
3205                    let left_val = self.eval_expr_async(left).await?;
3206                    if is_truthy(&left_val) {
3207                        return Ok(left_val);
3208                    }
3209                    self.eval_expr_async(right).await
3210                }
3211            },
3212            Expr::CommandSubst(pipeline) => {
3213                // Snapshot scope+cwd before running — only output escapes,
3214                // not side effects like `cd` or variable assignments.
3215                let saved_scope = { self.scope.read().await.clone() };
3216                let saved_cwd = {
3217                    let ec = self.exec_ctx.read().await;
3218                    (ec.cwd.clone(), ec.prev_cwd.clone())
3219                };
3220
3221                // Capture result without `?` — restore state unconditionally
3222                let run_result = self.execute_pipeline(pipeline).await;
3223
3224                // Restore scope and cwd regardless of success/failure
3225                {
3226                    let mut scope = self.scope.write().await;
3227                    *scope = saved_scope;
3228                    if let Ok(ref r) = run_result {
3229                        scope.set_last_result(r.clone());
3230                    }
3231                }
3232                {
3233                    let mut ec = self.exec_ctx.write().await;
3234                    ec.cwd = saved_cwd.0;
3235                    ec.prev_cwd = saved_cwd.1;
3236                }
3237
3238                // Now propagate the error
3239                let result = run_result?;
3240
3241                // Prefer structured data (enables `for i in $(cmd)` iteration)
3242                if let Some(data) = &result.data {
3243                    Ok(data.clone())
3244                } else if let Some(output) = result.output() {
3245                    // Flat non-text node lists (glob, ls, tree) → iterable array
3246                    if output.is_flat() && !output.is_simple_text() && !output.root.is_empty() {
3247                        let items: Vec<serde_json::Value> = output.root.iter()
3248                            .map(|n| serde_json::Value::String(n.display_name().to_string()))
3249                            .collect();
3250                        Ok(Value::Json(serde_json::Value::Array(items)))
3251                    } else {
3252                        Ok(Value::String(result.text_out().trim_end().to_string()))
3253                    }
3254                } else {
3255                    // Otherwise return stdout as single string (NO implicit splitting)
3256                    Ok(Value::String(result.text_out().trim_end().to_string()))
3257                }
3258            }
3259            Expr::Test(test_expr) => {
3260                Ok(Value::Bool(self.eval_test_async(test_expr).await?))
3261            }
3262            Expr::Positional(n) => {
3263                let scope = self.scope.read().await;
3264                match scope.get_positional(*n) {
3265                    Some(s) => Ok(Value::String(s.to_string())),
3266                    None => Ok(Value::String(String::new())),
3267                }
3268            }
3269            Expr::AllArgs => {
3270                let scope = self.scope.read().await;
3271                Ok(Value::String(scope.all_args().join(" ")))
3272            }
3273            Expr::ArgCount => {
3274                let scope = self.scope.read().await;
3275                Ok(Value::Int(scope.arg_count() as i64))
3276            }
3277            Expr::VarLength(name) => {
3278                let scope = self.scope.read().await;
3279                match scope.get(name) {
3280                    Some(value) => Ok(Value::Int(value_to_string(value).len() as i64)),
3281                    None => Ok(Value::Int(0)),
3282                }
3283            }
3284            Expr::VarWithDefault { name, default } => {
3285                let scope = self.scope.read().await;
3286                let use_default = match scope.get(name) {
3287                    Some(value) => value_to_string(value).is_empty(),
3288                    None => true,
3289                };
3290                drop(scope); // Release the lock before recursive evaluation
3291                if use_default {
3292                    // Evaluate the default parts (supports nested expansions)
3293                    self.eval_string_parts_async(default).await.map(Value::String)
3294                } else {
3295                    let scope = self.scope.read().await;
3296                    scope.get(name).cloned().ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))
3297                }
3298            }
3299            Expr::Arithmetic(expr_str) => {
3300                let scope = self.scope.read().await;
3301                crate::arithmetic::eval_arithmetic(expr_str, &scope)
3302                    .map(Value::Int)
3303                    .map_err(|e| anyhow::anyhow!("arithmetic error: {}", e))
3304            }
3305            Expr::Command(cmd) => {
3306                // Execute command and return boolean based on exit code
3307                let result = self.execute_command(&cmd.name, &cmd.args).await?;
3308                Ok(Value::Bool(result.code == 0))
3309            }
3310            Expr::LastExitCode => {
3311                let scope = self.scope.read().await;
3312                Ok(Value::Int(scope.last_result().code))
3313            }
3314            Expr::CurrentPid => {
3315                let scope = self.scope.read().await;
3316                Ok(Value::Int(scope.pid() as i64))
3317            }
3318            Expr::GlobPattern(s) => Ok(Value::String(s.clone())),
3319        }
3320        })
3321    }
3322
3323    /// Async helper to evaluate multiple StringParts into a single string.
3324    fn eval_string_parts_async<'a>(&'a self, parts: &'a [StringPart]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3325        Box::pin(async move {
3326            let mut result = String::new();
3327            for part in parts {
3328                result.push_str(&self.eval_string_part_async(part).await?);
3329            }
3330            Ok(result)
3331        })
3332    }
3333
3334    /// Async helper to evaluate a StringPart.
3335    /// Evaluate a `[[ ]]` test expression asynchronously, routing file tests
3336    /// through the VFS backend instead of using raw `std::path`.
3337    fn eval_test_async<'a>(&'a self, test_expr: &'a TestExpr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
3338        Box::pin(async move {
3339            match test_expr {
3340                TestExpr::FileTest { op, path } => {
3341                    let path_value = self.eval_expr_async(path).await?;
3342                    let path_str = value_to_string(&path_value);
3343                    let backend = self.exec_ctx.read().await.backend.clone();
3344                    let entry = backend.stat(std::path::Path::new(&path_str)).await.ok();
3345                    Ok(match op {
3346                        FileTestOp::Exists => entry.is_some(),
3347                        FileTestOp::IsFile => entry.as_ref().is_some_and(|e| e.is_file()),
3348                        FileTestOp::IsDir => entry.as_ref().is_some_and(|e| e.is_dir()),
3349                        FileTestOp::Readable => entry.is_some(),
3350                        FileTestOp::Writable => entry.as_ref().is_some_and(|e| {
3351                            e.permissions.is_none_or(|p| p & 0o222 != 0)
3352                        }),
3353                        FileTestOp::Executable => entry.as_ref().is_some_and(|e| {
3354                            e.permissions.is_some_and(|p| p & 0o111 != 0)
3355                        }),
3356                    })
3357                }
3358                TestExpr::StringTest { op, value } => {
3359                    let val = self.eval_expr_async(value).await?;
3360                    let s = value_to_string(&val);
3361                    Ok(match op {
3362                        crate::ast::StringTestOp::IsEmpty => s.is_empty(),
3363                        crate::ast::StringTestOp::IsNonEmpty => !s.is_empty(),
3364                    })
3365                }
3366                TestExpr::Comparison { left, op, right } => {
3367                    // Evaluate operands async (handles $(cmd)), then compare sync
3368                    let left_val = self.eval_expr_async(left).await?;
3369                    let right_val = self.eval_expr_async(right).await?;
3370                    let resolved = TestExpr::Comparison {
3371                        left: Box::new(Expr::Literal(left_val)),
3372                        op: *op,
3373                        right: Box::new(Expr::Literal(right_val)),
3374                    };
3375                    let expr = Expr::Test(Box::new(resolved));
3376                    let mut scope = self.scope.write().await;
3377                    let value = eval_expr(&expr, &mut scope)
3378                        .map_err(|e| anyhow::anyhow!("{}", e))?;
3379                    Ok(value_to_bool(&value))
3380                }
3381                TestExpr::And { left, right } => {
3382                    if !self.eval_test_async(left).await? {
3383                        Ok(false)
3384                    } else {
3385                        self.eval_test_async(right).await
3386                    }
3387                }
3388                TestExpr::Or { left, right } => {
3389                    if self.eval_test_async(left).await? {
3390                        Ok(true)
3391                    } else {
3392                        self.eval_test_async(right).await
3393                    }
3394                }
3395                TestExpr::Not { expr } => {
3396                    Ok(!self.eval_test_async(expr).await?)
3397                }
3398            }
3399        })
3400    }
3401
3402    fn eval_string_part_async<'a>(&'a self, part: &'a StringPart) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3403        Box::pin(async move {
3404            match part {
3405                StringPart::Literal(s) => Ok(s.clone()),
3406                StringPart::Var(path) => {
3407                    let scope = self.scope.read().await;
3408                    match scope.resolve_path(path) {
3409                        Some(value) => Ok(value_to_string(&value)),
3410                        None => Ok(String::new()), // Unset vars expand to empty
3411                    }
3412                }
3413                StringPart::VarWithDefault { name, default } => {
3414                    let scope = self.scope.read().await;
3415                    let use_default = match scope.get(name) {
3416                        Some(value) => value_to_string(value).is_empty(),
3417                        None => true,
3418                    };
3419                    drop(scope); // Release lock before recursive evaluation
3420                    if use_default {
3421                        // Evaluate the default parts (supports nested expansions)
3422                        self.eval_string_parts_async(default).await
3423                    } else {
3424                        let scope = self.scope.read().await;
3425                        Ok(value_to_string(scope.get(name).ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))?))
3426                    }
3427                }
3428            StringPart::VarLength(name) => {
3429                let scope = self.scope.read().await;
3430                match scope.get(name) {
3431                    Some(value) => Ok(value_to_string(value).len().to_string()),
3432                    None => Ok("0".to_string()),
3433                }
3434            }
3435            StringPart::Positional(n) => {
3436                let scope = self.scope.read().await;
3437                match scope.get_positional(*n) {
3438                    Some(s) => Ok(s.to_string()),
3439                    None => Ok(String::new()),
3440                }
3441            }
3442            StringPart::AllArgs => {
3443                let scope = self.scope.read().await;
3444                Ok(scope.all_args().join(" "))
3445            }
3446            StringPart::ArgCount => {
3447                let scope = self.scope.read().await;
3448                Ok(scope.arg_count().to_string())
3449            }
3450            StringPart::Arithmetic(expr) => {
3451                let scope = self.scope.read().await;
3452                match crate::arithmetic::eval_arithmetic(expr, &scope) {
3453                    Ok(value) => Ok(value.to_string()),
3454                    Err(_) => Ok(String::new()),
3455                }
3456            }
3457            StringPart::CommandSubst(pipeline) => {
3458                // Snapshot scope+cwd — command substitution in strings must
3459                // not leak side effects (e.g., `"dir: $(cd /; pwd)"` must not change cwd).
3460                let saved_scope = { self.scope.read().await.clone() };
3461                let saved_cwd = {
3462                    let ec = self.exec_ctx.read().await;
3463                    (ec.cwd.clone(), ec.prev_cwd.clone())
3464                };
3465
3466                // Capture result without `?` — restore state unconditionally
3467                let run_result = self.execute_pipeline(pipeline).await;
3468
3469                // Restore scope and cwd regardless of success/failure
3470                {
3471                    let mut scope = self.scope.write().await;
3472                    *scope = saved_scope;
3473                    if let Ok(ref r) = run_result {
3474                        scope.set_last_result(r.clone());
3475                    }
3476                }
3477                {
3478                    let mut ec = self.exec_ctx.write().await;
3479                    ec.cwd = saved_cwd.0;
3480                    ec.prev_cwd = saved_cwd.1;
3481                }
3482
3483                // Now propagate the error
3484                let result = run_result?;
3485
3486                Ok(result.text_out().trim_end_matches('\n').to_string())
3487            }
3488            StringPart::LastExitCode => {
3489                let scope = self.scope.read().await;
3490                Ok(scope.last_result().code.to_string())
3491            }
3492            StringPart::CurrentPid => {
3493                let scope = self.scope.read().await;
3494                Ok(scope.pid().to_string())
3495            }
3496        }
3497        })
3498    }
3499
3500    /// Update the last result in scope.
3501    async fn update_last_result(&self, result: &ExecResult) {
3502        let mut scope = self.scope.write().await;
3503        scope.set_last_result(result.clone());
3504    }
3505
3506    /// Drain accumulated pipeline stderr into a result.
3507    ///
3508    /// Called after each sub-statement inside control structures (`if`, `for`,
3509    /// `while`, `case`, `&&`, `||`) so that stderr appears incrementally rather
3510    /// than batching until the entire structure finishes.
3511    async fn drain_stderr_into(&self, result: &mut ExecResult) {
3512        let drained = {
3513            let mut receiver = self.stderr_receiver.lock().await;
3514            receiver.drain_lossy()
3515        };
3516        if !drained.is_empty() {
3517            if !result.err.is_empty() && !result.err.ends_with('\n') {
3518                result.err.push('\n');
3519            }
3520            result.err.push_str(&drained);
3521        }
3522    }
3523
3524    /// Execute a user-defined function with local variable scoping.
3525    ///
3526    /// Functions push a new scope frame for local variables. Variables declared
3527    /// with `local` are scoped to the function; other assignments modify outer
3528    /// scopes (or create in root if new).
3529    async fn execute_user_tool(&self, def: ToolDef, args: &[Arg]) -> Result<ExecResult> {
3530        // 1. Build function args from AST args (async to support command substitution)
3531        let tool_args = self.build_args_async(args, None).await?;
3532
3533        // 2. Push a new scope frame for local variables
3534        {
3535            let mut scope = self.scope.write().await;
3536            scope.push_frame();
3537        }
3538
3539        // 3. Save current positional parameters and set new ones for this function
3540        let saved_positional = {
3541            let mut scope = self.scope.write().await;
3542            let saved = scope.save_positional();
3543
3544            // Set up new positional parameters ($0 = function name, $1, $2, ... = args)
3545            let positional_args: Vec<String> = tool_args.positional
3546                .iter()
3547                .map(value_to_string)
3548                .collect();
3549            scope.set_positional(&def.name, positional_args);
3550
3551            saved
3552        };
3553
3554        // 3. Execute body statements with control flow handling
3555        // Accumulate output across statements (like sh)
3556        let mut accumulated_out = String::new();
3557        let mut accumulated_err = String::new();
3558        let mut last_code = 0i64;
3559        let mut last_data: Option<Value> = None;
3560
3561        // Track execution error for propagation after cleanup
3562        let mut exec_error: Option<anyhow::Error> = None;
3563        let mut exit_code: Option<i64> = None;
3564
3565        for stmt in &def.body {
3566            match self.execute_stmt_flow(stmt).await {
3567                Ok(flow) => {
3568                    // Drain pipeline stderr after each sub-statement.
3569                    let drained = {
3570                        let mut receiver = self.stderr_receiver.lock().await;
3571                        receiver.drain_lossy()
3572                    };
3573                    if !drained.is_empty() {
3574                        accumulated_err.push_str(&drained);
3575                    }
3576
3577                    match flow {
3578                        ControlFlow::Normal(r) => {
3579                            accumulated_out.push_str(&r.text_out());
3580                            accumulated_err.push_str(&r.err);
3581                            last_code = r.code;
3582                            last_data = r.data;
3583                        }
3584                        ControlFlow::Return { value } => {
3585                            accumulated_out.push_str(&value.text_out());
3586                            accumulated_err.push_str(&value.err);
3587                            last_code = value.code;
3588                            last_data = value.data;
3589                            break;
3590                        }
3591                        ControlFlow::Exit { code } => {
3592                            exit_code = Some(code);
3593                            break;
3594                        }
3595                        ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3596                            accumulated_out.push_str(&r.text_out());
3597                            accumulated_err.push_str(&r.err);
3598                            last_code = r.code;
3599                            last_data = r.data;
3600                        }
3601                    }
3602                }
3603                Err(e) => {
3604                    exec_error = Some(e);
3605                    break;
3606                }
3607            }
3608        }
3609
3610        // 4. Pop scope frame and restore original positional parameters (unconditionally)
3611        {
3612            let mut scope = self.scope.write().await;
3613            scope.pop_frame();
3614            scope.set_positional(saved_positional.0, saved_positional.1);
3615        }
3616
3617        // 5. Propagate error or exit after cleanup
3618        if let Some(e) = exec_error {
3619            return Err(e);
3620        }
3621        if let Some(code) = exit_code {
3622            return Ok(ExecResult::from_parts(code, accumulated_out, accumulated_err, last_data));
3623        }
3624
3625        Ok(ExecResult::from_parts(last_code, accumulated_out, accumulated_err, last_data))
3626    }
3627
3628    /// Execute the `source` / `.` command to include and run a script.
3629    ///
3630    /// Unlike regular tool execution, `source` executes in the CURRENT scope,
3631    /// allowing the sourced script to set variables and modify shell state.
3632    async fn execute_source(&self, args: &[Arg]) -> Result<ExecResult> {
3633        // Get the file path from the first positional argument
3634        let tool_args = self.build_args_async(args, None).await?;
3635        let path = match tool_args.positional.first() {
3636            Some(Value::String(s)) => s.clone(),
3637            Some(v) => value_to_string(v),
3638            None => {
3639                return Ok(ExecResult::failure(1, "source: missing filename"));
3640            }
3641        };
3642
3643        // Resolve path relative to cwd
3644        let full_path = {
3645            let ctx = self.exec_ctx.read().await;
3646            if path.starts_with('/') {
3647                std::path::PathBuf::from(&path)
3648            } else {
3649                ctx.cwd.join(&path)
3650            }
3651        };
3652
3653        // Read file content via backend
3654        let content = {
3655            let ctx = self.exec_ctx.read().await;
3656            match ctx.backend.read(&full_path, None).await {
3657                Ok(bytes) => {
3658                    String::from_utf8(bytes).map_err(|e| {
3659                        anyhow::anyhow!("source: {}: invalid UTF-8: {}", path, e)
3660                    })?
3661                }
3662                Err(e) => {
3663                    return Ok(ExecResult::failure(
3664                        1,
3665                        format!("source: {}: {}", path, e),
3666                    ));
3667                }
3668            }
3669        };
3670
3671        // Parse the content
3672        let program = match crate::parser::parse(&content) {
3673            Ok(p) => p,
3674            Err(errors) => {
3675                let msg = errors
3676                    .iter()
3677                    .map(|e| format!("{}:{}: {}", path, e.span.start, e.message))
3678                    .collect::<Vec<_>>()
3679                    .join("\n");
3680                return Ok(ExecResult::failure(1, format!("source: {}", msg)));
3681            }
3682        };
3683
3684        // Execute each statement in the CURRENT scope (not isolated)
3685        let mut result = ExecResult::success("");
3686        for stmt in program.statements {
3687            if matches!(stmt, crate::ast::Stmt::Empty) {
3688                continue;
3689            }
3690
3691            match self.execute_stmt_flow(&stmt).await {
3692                Ok(flow) => {
3693                    self.drain_stderr_into(&mut result).await;
3694                    match flow {
3695                        ControlFlow::Normal(r) => {
3696                            result = r.clone();
3697                            self.update_last_result(&r).await;
3698                        }
3699                        ControlFlow::Break { .. } | ControlFlow::Continue { .. } => {
3700                            return Err(anyhow::anyhow!(
3701                                "source: {}: unexpected break/continue outside loop",
3702                                path
3703                            ));
3704                        }
3705                        ControlFlow::Return { value } => {
3706                            return Ok(value);
3707                        }
3708                        ControlFlow::Exit { code } => {
3709                            result.code = code;
3710                            return Ok(result);
3711                        }
3712                    }
3713                }
3714                Err(e) => {
3715                    return Err(e.context(format!("source: {}", path)));
3716                }
3717            }
3718        }
3719
3720        Ok(result)
3721    }
3722
3723    /// Try to execute a script from PATH directories.
3724    ///
3725    /// Searches PATH for `{name}.kai` files and executes them in isolated scope
3726    /// (like user-defined tools). Returns None if no script is found.
3727    async fn try_execute_script(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3728        // Get PATH from scope (default to "/bin")
3729        let path_value = {
3730            let scope = self.scope.read().await;
3731            scope
3732                .get("PATH")
3733                .map(value_to_string)
3734                .unwrap_or_else(|| "/bin".to_string())
3735        };
3736
3737        // Search PATH directories for script
3738        for dir in path_value.split(':') {
3739            if dir.is_empty() {
3740                continue;
3741            }
3742
3743            // Build script path: {dir}/{name}.kai
3744            let script_path = PathBuf::from(dir).join(format!("{}.kai", name));
3745
3746            // Check if script exists
3747            let exists = {
3748                let ctx = self.exec_ctx.read().await;
3749                ctx.backend.exists(&script_path).await
3750            };
3751
3752            if !exists {
3753                continue;
3754            }
3755
3756            // Read script content
3757            let content = {
3758                let ctx = self.exec_ctx.read().await;
3759                match ctx.backend.read(&script_path, None).await {
3760                    Ok(bytes) => match String::from_utf8(bytes) {
3761                        Ok(s) => s,
3762                        Err(e) => {
3763                            return Ok(Some(ExecResult::failure(
3764                                1,
3765                                format!("{}: invalid UTF-8: {}", script_path.display(), e),
3766                            )));
3767                        }
3768                    },
3769                    Err(e) => {
3770                        return Ok(Some(ExecResult::failure(
3771                            1,
3772                            format!("{}: {}", script_path.display(), e),
3773                        )));
3774                    }
3775                }
3776            };
3777
3778            // Parse the script
3779            let program = match crate::parser::parse(&content) {
3780                Ok(p) => p,
3781                Err(errors) => {
3782                    let msg = errors
3783                        .iter()
3784                        .map(|e| format!("{}:{}: {}", script_path.display(), e.span.start, e.message))
3785                        .collect::<Vec<_>>()
3786                        .join("\n");
3787                    return Ok(Some(ExecResult::failure(1, msg)));
3788                }
3789            };
3790
3791            // Build tool_args from args (async for command substitution support)
3792            let tool_args = self.build_args_async(args, None).await?;
3793
3794            // Create isolated scope (like user tools)
3795            let mut isolated_scope = Scope::new();
3796
3797            // Set up positional parameters ($0 = script name, $1, $2, ... = args)
3798            let positional_args: Vec<String> = tool_args.positional
3799                .iter()
3800                .map(value_to_string)
3801                .collect();
3802            isolated_scope.set_positional(name, positional_args);
3803
3804            // Save current scope and swap with isolated scope
3805            let original_scope = {
3806                let mut scope = self.scope.write().await;
3807                std::mem::replace(&mut *scope, isolated_scope)
3808            };
3809
3810            // Execute script statements — track outcome for cleanup
3811            let mut result = ExecResult::success("");
3812            let mut exec_error: Option<anyhow::Error> = None;
3813            let mut exit_code: Option<i64> = None;
3814
3815            for stmt in program.statements {
3816                if matches!(stmt, crate::ast::Stmt::Empty) {
3817                    continue;
3818                }
3819
3820                match self.execute_stmt_flow(&stmt).await {
3821                    Ok(flow) => {
3822                        match flow {
3823                            ControlFlow::Normal(r) => result = r,
3824                            ControlFlow::Return { value } => {
3825                                result = value;
3826                                break;
3827                            }
3828                            ControlFlow::Exit { code } => {
3829                                exit_code = Some(code);
3830                                break;
3831                            }
3832                            ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3833                                result = r;
3834                            }
3835                        }
3836                    }
3837                    Err(e) => {
3838                        exec_error = Some(e);
3839                        break;
3840                    }
3841                }
3842            }
3843
3844            // Restore original scope unconditionally
3845            {
3846                let mut scope = self.scope.write().await;
3847                *scope = original_scope;
3848            }
3849
3850            // Propagate error or exit after cleanup
3851            if let Some(e) = exec_error {
3852                return Err(e.context(format!("script: {}", script_path.display())));
3853            }
3854            if let Some(code) = exit_code {
3855                result.code = code;
3856                return Ok(Some(result));
3857            }
3858
3859            return Ok(Some(result));
3860        }
3861
3862        // No script found
3863        Ok(None)
3864    }
3865
3866    /// Try to execute an external command from PATH.
3867    ///
3868    /// This is the fallback when no builtin or user-defined tool matches.
3869    /// External commands receive a clean argv (flags preserved in their original format).
3870    ///
3871    /// # Requirements
3872    /// - Command must be found in PATH
3873    /// - Current working directory must be on a real filesystem (not virtual like /v)
3874    ///
3875    /// # Returns
3876    /// - `Ok(Some(result))` if command was found and executed
3877    /// - `Ok(None)` if command was not found in PATH
3878    /// - `Err` on execution errors
3879    #[cfg(not(feature = "subprocess"))]
3880    async fn try_execute_external(&self, _name: &str, _args: &[Arg]) -> Result<Option<ExecResult>> {
3881        Ok(None)
3882    }
3883
3884    /// Try to execute an external command from PATH.
3885    #[cfg(feature = "subprocess")]
3886    #[tracing::instrument(level = "debug", skip(self, args), fields(command = %name))]
3887    async fn try_execute_external(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3888        // Read the cancel token from `self.exec_ctx`, which `dispatch_command`
3889        // populates from the inbound ctx.cancel on every dispatch. This is
3890        // what makes the `timeout` builtin's swapped child token reach the
3891        // wait_or_kill discipline below — reading `self.cancel_token` would
3892        // give the kernel-wide token and miss the timeout's child cascade.
3893        let cancel = {
3894            let ec = self.exec_ctx.read().await;
3895            ec.cancel.clone()
3896        };
3897        let kill_grace = self.kill_grace;
3898        if !self.allow_external_commands {
3899            return Ok(None);
3900        }
3901
3902        // Get real working directory for relative path resolution and child cwd.
3903        // If the CWD is virtual (no real filesystem path), skip external command
3904        // execution entirely — return None so the dispatch can fall through to
3905        // backend-registered tools.
3906        let real_cwd = {
3907            let ctx = self.exec_ctx.read().await;
3908            match ctx.backend.resolve_real_path(&ctx.cwd) {
3909                Some(p) => p,
3910                None => return Ok(None),
3911            }
3912        };
3913
3914        let executable = if name.contains('/') {
3915            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
3916            let resolved = if std::path::Path::new(name).is_absolute() {
3917                std::path::PathBuf::from(name)
3918            } else {
3919                real_cwd.join(name)
3920            };
3921            if !resolved.exists() {
3922                return Ok(Some(ExecResult::failure(
3923                    127,
3924                    format!("{}: No such file or directory", name),
3925                )));
3926            }
3927            if !resolved.is_file() {
3928                return Ok(Some(ExecResult::failure(
3929                    126,
3930                    format!("{}: Is a directory", name),
3931                )));
3932            }
3933            #[cfg(unix)]
3934            {
3935                use std::os::unix::fs::PermissionsExt;
3936                let mode = std::fs::metadata(&resolved)
3937                    .map(|m| m.permissions().mode())
3938                    .unwrap_or(0);
3939                if mode & 0o111 == 0 {
3940                    return Ok(Some(ExecResult::failure(
3941                        126,
3942                        format!("{}: Permission denied", name),
3943                    )));
3944                }
3945            }
3946            resolved.to_string_lossy().into_owned()
3947        } else {
3948            // Get PATH from scope or environment
3949            let path_var = {
3950                let scope = self.scope.read().await;
3951                scope
3952                    .get("PATH")
3953                    .map(value_to_string)
3954                    .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default())
3955            };
3956
3957            // Resolve command in PATH
3958            match resolve_in_path(name, &path_var) {
3959                Some(path) => path,
3960                None => return Ok(None), // Not found - let caller handle error
3961            }
3962        };
3963
3964        tracing::debug!(executable = %executable, "resolved external command");
3965
3966        // Build flat argv (preserves flag format)
3967        let argv = self.build_args_flat(args).await?;
3968
3969        // Get stdin if available
3970        let stdin_data = {
3971            let mut ctx = self.exec_ctx.write().await;
3972            ctx.take_stdin()
3973        };
3974
3975        // Build and spawn the command
3976        use tokio::process::Command;
3977
3978        let mut cmd = Command::new(&executable);
3979        cmd.args(&argv);
3980        cmd.current_dir(&real_cwd);
3981
3982        // Hermetic env: child sees only kaish's exported vars, not the kaish
3983        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
3984        // populate it via KernelConfig::initial_vars at construction.
3985        cmd.env_clear();
3986        {
3987            let scope = self.scope.read().await;
3988            for (var_name, value) in scope.exported_vars() {
3989                cmd.env(var_name, value_to_string(&value));
3990            }
3991        }
3992
3993        // Handle stdin
3994        cmd.stdin(if stdin_data.is_some() {
3995            std::process::Stdio::piped()
3996        } else if self.interactive {
3997            std::process::Stdio::inherit()
3998        } else {
3999            std::process::Stdio::null()
4000        });
4001
4002        // In interactive mode, standalone or last-in-pipeline commands inherit
4003        // the terminal's stdout/stderr so output streams in real-time.
4004        // First/middle commands must capture stdout for the pipe — same as bash.
4005        let pipeline_position = {
4006            let ctx = self.exec_ctx.read().await;
4007            ctx.pipeline_position
4008        };
4009        let inherit_output = self.interactive
4010            && matches!(pipeline_position, PipelinePosition::Only | PipelinePosition::Last);
4011
4012        if inherit_output {
4013            cmd.stdout(std::process::Stdio::inherit());
4014            cmd.stderr(std::process::Stdio::inherit());
4015        } else {
4016            cmd.stdout(std::process::Stdio::piped());
4017            cmd.stderr(std::process::Stdio::piped());
4018        }
4019
4020        // On Unix, always put the child in its own process group so cancellation
4021        // can `killpg` the whole tree (the child plus any grandchildren).
4022        // Restoring default tty-related signal handlers stays gated on
4023        // job-control mode — those only matter when the child has a controlling
4024        // terminal.
4025        #[cfg(unix)]
4026        {
4027            let restore_jc_signals = self.terminal_state.is_some() && inherit_output;
4028            // SAFETY: setpgid and sigaction(SIG_DFL) are async-signal-safe per POSIX
4029            #[allow(unsafe_code)]
4030            unsafe {
4031                cmd.pre_exec(move || {
4032                    // Own process group — for kill scope.
4033                    nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
4034                        .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
4035                    if restore_jc_signals {
4036                        use nix::libc::{sigaction, SIGTSTP, SIGTTOU, SIGTTIN, SIGINT, SIG_DFL};
4037                        let mut sa: nix::libc::sigaction = std::mem::zeroed();
4038                        sa.sa_sigaction = SIG_DFL;
4039                        if sigaction(SIGTSTP, &sa, std::ptr::null_mut()) != 0 {
4040                            return Err(std::io::Error::last_os_error());
4041                        }
4042                        if sigaction(SIGTTOU, &sa, std::ptr::null_mut()) != 0 {
4043                            return Err(std::io::Error::last_os_error());
4044                        }
4045                        if sigaction(SIGTTIN, &sa, std::ptr::null_mut()) != 0 {
4046                            return Err(std::io::Error::last_os_error());
4047                        }
4048                        if sigaction(SIGINT, &sa, std::ptr::null_mut()) != 0 {
4049                            return Err(std::io::Error::last_os_error());
4050                        }
4051                    }
4052                    Ok(())
4053                });
4054            }
4055        }
4056
4057        // Backstop for kill on drop in case our explicit kill path is bypassed
4058        // (panic, early return, etc) on the **capture** wait path. We do NOT
4059        // set this on the JC inherit path: that uses sync `waitpid` outside
4060        // tokio's view of the child, so on drop tokio would try to kill an
4061        // already-reaped (possibly-reused) PID. The JC path has its own
4062        // cancel handling via the side-task watcher.
4063        let in_jc_inherit_path = inherit_output && self.terminal_state.is_some();
4064        if !in_jc_inherit_path {
4065            cmd.kill_on_drop(true);
4066        }
4067
4068        // Spawn the process. Capture a `KillTarget` immediately so cancel/
4069        // timeout paths can deliver signals via pidfd (Linux ≥ 5.3) — bound
4070        // to this process's generation, immune to PID reuse if the OS reaps
4071        // the child before our kill syscalls fire.
4072        let mut child = match cmd.spawn() {
4073            Ok(child) => child,
4074            Err(e) => {
4075                return Ok(Some(ExecResult::failure(
4076                    127,
4077                    format!("{}: {}", name, e),
4078                )));
4079            }
4080        };
4081        let kill_target = crate::pidfd::KillTarget::from_child(&child);
4082
4083        // If this external runs on behalf of a background job, record its
4084        // process group on the job so `kill -<sig> %N` can signal the real
4085        // process directly (STOP/CONT/USR1/…, not just terminate). The child
4086        // did `setpgid(0, 0)` in pre_exec, so its PGID equals its PID.
4087        if let Some(job_id) = self.bg_job_id
4088            && let Some(pid) = child.id()
4089        {
4090            self.jobs.add_pgid(job_id, pid).await;
4091        }
4092
4093        // Write stdin if present
4094        if let Some(data) = stdin_data
4095            && let Some(mut stdin) = child.stdin.take()
4096        {
4097            use tokio::io::AsyncWriteExt;
4098            if let Err(e) = stdin.write_all(data.as_bytes()).await {
4099                return Ok(Some(ExecResult::failure(
4100                    1,
4101                    format!("{}: failed to write stdin: {}", name, e),
4102                )));
4103            }
4104            // Drop stdin to signal EOF
4105        }
4106
4107        if inherit_output {
4108            // Job control path: use waitpid with WUNTRACED for Ctrl-Z support
4109            #[cfg(unix)]
4110            if let Some(ref term) = self.terminal_state {
4111                let child_id = child.id().unwrap_or(0);
4112                let pid = nix::unistd::Pid::from_raw(child_id as i32);
4113                let pgid = pid; // child is its own pgid leader
4114
4115                // Give the terminal to the child's process group
4116                if let Err(e) = term.give_terminal_to(pgid) {
4117                    tracing::warn!("failed to give terminal to child: {}", e);
4118                }
4119
4120                let term_clone = term.clone();
4121                let cmd_name = name.to_string();
4122                let cmd_display = format!("{} {}", name, argv.join(" "));
4123                let jobs = self.jobs.clone();
4124
4125                // Side task that watches for cancellation while the blocking
4126                // waitpid runs. On cancel, it SIGTERMs the process group, waits
4127                // the grace period, then SIGKILLs. The blocking waitpid returns
4128                // when the child dies. AbortOnDrop guard cancels the watcher
4129                // on the success path so it doesn't keep running after wait
4130                // returns naturally.
4131                //
4132                // `wait_complete` shrinks the PID-reuse race: the watcher
4133                // checks it before each kill syscall and bails out if
4134                // wait_for_foreground has already reaped the child. This
4135                // doesn't fully eliminate the race (atomic load + kill is
4136                // not atomic with the OS reap+reuse), but narrows the window
4137                // to nanoseconds — enough to be ignorable in practice.
4138                let wait_complete = std::sync::Arc::new(
4139                    std::sync::atomic::AtomicBool::new(false)
4140                );
4141                let cancel_watcher = {
4142                    let cancel = cancel.clone();
4143                    let wc = wait_complete.clone();
4144                    // Ownership transfer: the JC path's sync wait inside
4145                    // block_in_place owns the child's reaping, so the
4146                    // cancel_watcher drives the kill side via KillTarget
4147                    // (pidfd-bound on Linux). When kill_target is None
4148                    // (older kernel + open failure, or non-Linux), falls
4149                    // through to the older PID-based path the closure
4150                    // captures from `pid`.
4151                    let target = kill_target.as_ref().map(|t| {
4152                        // Re-borrow the components we need into Owned-ish form
4153                        // so the spawned task is 'static. We can't move
4154                        // KillTarget directly because try_execute_external
4155                        // still uses it after the spawn — but on the JC path
4156                        // there is no further use after the watcher spawn,
4157                        // so a clone-of-pid + owned None pidfd is safe.
4158                        // Simpler: signal via the existing target by cloning
4159                        // a fresh pidfd; the original keeps its handle.
4160                        // Pidfd is just an OwnedFd — not Clone — so do it
4161                        // by re-opening from the pid. Fall back if reopen
4162                        // fails (race already reaped → best-effort kill).
4163                        crate::pidfd::KillTarget::from_pid(t.pid())
4164                    });
4165                    tokio::spawn(async move {
4166                        cancel.cancelled().await;
4167                        if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4168                        use nix::sys::signal::Signal;
4169                        if let Some(t) = &target {
4170                            t.signal(Signal::SIGTERM);
4171                            t.signal_pg(Signal::SIGTERM);
4172                        } else {
4173                            let _ = nix::sys::signal::kill(pid, Signal::SIGTERM);
4174                            let _ = nix::sys::signal::killpg(pid, Signal::SIGTERM);
4175                        }
4176                        if kill_grace > Duration::ZERO {
4177                            tokio::time::sleep(kill_grace).await;
4178                            if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4179                        }
4180                        if let Some(t) = &target {
4181                            t.signal(Signal::SIGKILL);
4182                            t.signal_pg(Signal::SIGKILL);
4183                        } else {
4184                            let _ = nix::sys::signal::kill(pid, Signal::SIGKILL);
4185                            let _ = nix::sys::signal::killpg(pid, Signal::SIGKILL);
4186                        }
4187                    })
4188                };
4189                struct AbortOnDrop(tokio::task::JoinHandle<()>);
4190                impl Drop for AbortOnDrop {
4191                    fn drop(&mut self) {
4192                        self.0.abort();
4193                    }
4194                }
4195                let _watcher_guard = AbortOnDrop(cancel_watcher);
4196
4197                let wait_complete_setter = wait_complete.clone();
4198                let code = tokio::task::block_in_place(move || {
4199                    let result = term_clone.wait_for_foreground(pid);
4200                    // Mark wait done before the watcher might fire.
4201                    wait_complete_setter.store(true, std::sync::atomic::Ordering::SeqCst);
4202
4203                    // Always reclaim the terminal
4204                    if let Err(e) = term_clone.reclaim_terminal() {
4205                        tracing::warn!("failed to reclaim terminal: {}", e);
4206                    }
4207
4208                    match result {
4209                        crate::terminal::WaitResult::Exited(code) => code as i64,
4210                        crate::terminal::WaitResult::Signaled(sig) => 128 + sig as i64,
4211                        crate::terminal::WaitResult::Stopped(_sig) => {
4212                            // Register as a stopped job
4213                            let rt = tokio::runtime::Handle::current();
4214                            let job_id = rt.block_on(jobs.register_stopped(
4215                                cmd_display,
4216                                child_id,
4217                                child_id, // pgid = pid for group leader
4218                            ));
4219                            eprintln!("\n[{}]+ Stopped\t{}", job_id, cmd_name);
4220                            148 // 128 + SIGTSTP(20) on most systems, but we use a fixed value
4221                        }
4222                    }
4223                });
4224
4225                return Ok(Some(ExecResult::from_output(code, String::new(), String::new())));
4226            }
4227
4228            // Non-job-control path with inherited stdio.
4229            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4230                Ok(s) => s,
4231                Err(e) => {
4232                    return Ok(Some(ExecResult::failure(
4233                        1,
4234                        format!("{}: failed to wait: {}", name, e),
4235                    )));
4236                }
4237            };
4238
4239            let code = status.code().unwrap_or_else(|| {
4240                #[cfg(unix)]
4241                {
4242                    use std::os::unix::process::ExitStatusExt;
4243                    128 + status.signal().unwrap_or(0)
4244                }
4245                #[cfg(not(unix))]
4246                {
4247                    -1
4248                }
4249            }) as i64;
4250
4251            // stdout/stderr already went to the terminal
4252            Ok(Some(ExecResult::from_output(code, String::new(), String::new())))
4253        } else {
4254            // Capture output via bounded streams
4255            let stdout_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4256            let stderr_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4257
4258            let stdout_pipe = child.stdout.take();
4259            let stderr_pipe = child.stderr.take();
4260
4261            let stdout_clone = stdout_stream.clone();
4262            let stderr_clone = stderr_stream.clone();
4263
4264            let stdout_task = stdout_pipe.map(|pipe| {
4265                tokio::spawn(async move {
4266                    drain_to_stream(pipe, stdout_clone).await;
4267                })
4268            });
4269
4270            let stderr_task = stderr_pipe.map(|pipe| {
4271                tokio::spawn(async move {
4272                    drain_to_stream(pipe, stderr_clone).await;
4273                })
4274            });
4275
4276            let cancelled_before_wait = cancel.is_cancelled();
4277            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4278                Ok(s) => s,
4279                Err(e) => {
4280                    if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4281                    if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4282                    return Ok(Some(ExecResult::failure(
4283                        1,
4284                        format!("{}: failed to wait: {}", name, e),
4285                    )));
4286                }
4287            };
4288
4289            // On cancel, abort the drain tasks (the child's pipes are gone;
4290            // late output is lost but predictable death beats partial capture).
4291            // On normal exit, await drains so we don't lose buffered output.
4292            if cancelled_before_wait || cancel.is_cancelled() {
4293                if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4294                if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4295            } else {
4296                if let Some(task) = stdout_task {
4297                    // Ignore join error — the drain task logs its own errors
4298                    let _ = task.await;
4299                }
4300                if let Some(task) = stderr_task {
4301                    let _ = task.await;
4302                }
4303            }
4304
4305            let code = status.code().unwrap_or_else(|| {
4306                #[cfg(unix)]
4307                {
4308                    use std::os::unix::process::ExitStatusExt;
4309                    128 + status.signal().unwrap_or(0)
4310                }
4311                #[cfg(not(unix))]
4312                {
4313                    -1
4314                }
4315            }) as i64;
4316
4317            let stdout = stdout_stream.read_string().await;
4318            let stderr = stderr_stream.read_string().await;
4319
4320            Ok(Some(ExecResult::from_output(code, stdout, stderr)))
4321        }
4322    }
4323
4324    // --- Variable Access ---
4325
4326    /// Get a variable value.
4327    pub async fn get_var(&self, name: &str) -> Option<Value> {
4328        let scope = self.scope.read().await;
4329        scope.get(name).cloned()
4330    }
4331
4332    /// Check if error-exit mode is enabled (for testing).
4333    #[cfg(test)]
4334    pub async fn error_exit_enabled(&self) -> bool {
4335        let scope = self.scope.read().await;
4336        scope.error_exit_enabled()
4337    }
4338
4339    /// Set a variable value.
4340    pub async fn set_var(&self, name: &str, value: Value) {
4341        let mut scope = self.scope.write().await;
4342        scope.set(name.to_string(), value);
4343    }
4344
4345    /// Set positional parameters ($0 script name and $1-$9 args).
4346    pub async fn set_positional(&self, script_name: impl Into<String>, args: Vec<String>) {
4347        let mut scope = self.scope.write().await;
4348        scope.set_positional(script_name, args);
4349    }
4350
4351    /// List all variables.
4352    pub async fn list_vars(&self) -> Vec<(String, Value)> {
4353        let scope = self.scope.read().await;
4354        scope.all()
4355    }
4356
4357    /// List exported variables (name, value), sorted by name. These are the
4358    /// vars a child process would see (see `dispatch`'s hermetic env build).
4359    pub async fn exported_vars(&self) -> Vec<(String, Value)> {
4360        let scope = self.scope.read().await;
4361        scope.exported_vars()
4362    }
4363
4364    // --- CWD ---
4365
4366    /// Get current working directory.
4367    pub async fn cwd(&self) -> PathBuf {
4368        self.exec_ctx.read().await.cwd.clone()
4369    }
4370
4371    /// Set current working directory.
4372    pub async fn set_cwd(&self, path: PathBuf) {
4373        let mut ctx = self.exec_ctx.write().await;
4374        ctx.set_cwd(path);
4375    }
4376
4377    /// Set the working directory only if `path` resolves to a directory in the
4378    /// kernel's backend — the same namespace `cd` validates against. Unlike a
4379    /// raw host-FS `is_dir()` check, this correctly accepts virtual mounts
4380    /// (`/v/docs`, in-memory scratch, …) and rejects real paths that have since
4381    /// disappeared. Returns whether the cwd was changed.
4382    pub async fn try_set_cwd(&self, path: PathBuf) -> bool {
4383        // Clone the backend Arc out before the stat so we never hold the
4384        // exec_ctx lock across the await.
4385        let backend = self.exec_ctx.read().await.backend.clone();
4386        let is_dir = matches!(backend.stat(&path).await, Ok(entry) if entry.is_dir());
4387        if is_dir {
4388            self.exec_ctx.write().await.set_cwd(path);
4389        }
4390        is_dir
4391    }
4392
4393    // --- Last Result ---
4394
4395    /// Get the last result ($?).
4396    pub async fn last_result(&self) -> ExecResult {
4397        let scope = self.scope.read().await;
4398        scope.last_result().clone()
4399    }
4400
4401    // --- Tools ---
4402
4403    /// Check if a user-defined function exists.
4404    pub async fn has_function(&self, name: &str) -> bool {
4405        self.user_tools.read().await.contains_key(name)
4406    }
4407
4408    /// Get available tool schemas.
4409    pub fn tool_schemas(&self) -> Vec<crate::tools::ToolSchema> {
4410        self.tools.schemas()
4411    }
4412
4413    // --- Jobs ---
4414
4415    /// Get job manager.
4416    pub fn jobs(&self) -> Arc<JobManager> {
4417        self.jobs.clone()
4418    }
4419
4420    // --- VFS ---
4421
4422    /// Get VFS router.
4423    pub fn vfs(&self) -> Arc<VfsRouter> {
4424        self.vfs.clone()
4425    }
4426
4427    // --- State ---
4428
4429    /// Reset kernel to initial state.
4430    ///
4431    /// Clears in-memory variables and resets cwd to root.
4432    /// History is not cleared (it persists across resets).
4433    pub async fn reset(&self) -> Result<()> {
4434        {
4435            let mut scope = self.scope.write().await;
4436            *scope = Scope::new();
4437        }
4438        {
4439            let mut ctx = self.exec_ctx.write().await;
4440            ctx.cwd = PathBuf::from("/");
4441        }
4442        Ok(())
4443    }
4444
4445    /// Shutdown the kernel.
4446    pub async fn shutdown(self) -> Result<()> {
4447        // Wait for all background jobs
4448        self.jobs.wait_all().await;
4449        Ok(())
4450    }
4451
4452    /// Dispatch a single command using the full resolution chain.
4453    ///
4454    /// This is the core of `CommandDispatcher` — it syncs state between the
4455    /// passed-in `ExecContext` and kernel-internal state (scope, exec_ctx),
4456    /// then delegates to `execute_command` for the actual dispatch.
4457    ///
4458    /// State flow:
4459    /// 1. ctx → self: sync scope, cwd, stdin so internal methods see current state
4460    /// 2. execute_command: full dispatch chain (user tools, builtins, scripts, external, backend)
4461    /// 3. self → ctx: sync scope, cwd changes back so the pipeline runner sees them
4462    async fn dispatch_command(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4463        // Ensure nested dispatch (e.g. the `timeout` builtin re-dispatching
4464        // its inner command via ctx.dispatcher) routes through THIS kernel,
4465        // not a stale parent. Critical for forks: the fork's builtins must
4466        // use the fork's dispatcher, not the parent's.
4467        if let Some(d) = self.dispatcher() {
4468            ctx.dispatcher = Some(d);
4469        }
4470
4471        // 1. Sync ctx → self internals
4472        {
4473            let mut scope = self.scope.write().await;
4474            *scope = ctx.scope.clone();
4475        }
4476        {
4477            let mut ec = self.exec_ctx.write().await;
4478            ec.cwd = ctx.cwd.clone();
4479            ec.prev_cwd = ctx.prev_cwd.clone();
4480            ec.stdin = ctx.stdin.take();
4481            ec.stdin_data = ctx.stdin_data.take();
4482            // Streaming pipe endpoints and kernel stderr must flow to the
4483            // tool via self.exec_ctx — execute_command reads that, not the
4484            // passed-in ctx. Without moving these, concurrent pipeline
4485            // stages dispatched via a fork get pipe_stdin = None and
4486            // silently read nothing.
4487            ec.pipe_stdin = ctx.pipe_stdin.take();
4488            ec.pipe_stdout = ctx.pipe_stdout.take();
4489            if let Some(stderr) = ctx.stderr.clone() {
4490                ec.stderr = Some(stderr);
4491            }
4492            ec.aliases = ctx.aliases.clone();
4493            ec.ignore_config = ctx.ignore_config.clone();
4494            ec.output_limit = ctx.output_limit.clone();
4495            ec.pipeline_position = ctx.pipeline_position;
4496            // Sync the cancel token from ctx → ec. Builtins like `timeout`
4497            // swap ctx.cancel to a derived child token before re-dispatching;
4498            // execute_command's snapshot reads ec.cancel (kept aligned by
4499            // this sync), so try_execute_external sees the right token.
4500            ec.cancel = ctx.cancel.clone();
4501            // Same alignment for the watchdog: a fork dispatching through its
4502            // own kernel must hand the shared script clock to the snapshot so
4503            // patient holds in forked stages suspend the right timer.
4504            ec.watchdog = ctx.watchdog.clone();
4505        }
4506
4507        // 2. Execute via the full dispatch chain
4508        let result = self.execute_command(&cmd.name, &cmd.args).await?;
4509
4510        // 3. Sync self → ctx
4511        {
4512            let scope = self.scope.read().await;
4513            ctx.scope = scope.clone();
4514        }
4515        {
4516            let mut ec = self.exec_ctx.write().await;
4517            ctx.cwd = ec.cwd.clone();
4518            ctx.prev_cwd = ec.prev_cwd.clone();
4519            ctx.aliases = ec.aliases.clone();
4520            ctx.ignore_config = ec.ignore_config.clone();
4521            ctx.output_limit = ec.output_limit.clone();
4522            // Return any pipe endpoints that the tool didn't consume.
4523            // `take()` here keeps the fork's exec_ctx in a clean state for
4524            // the next dispatch — these are per-command and shouldn't leak
4525            // between calls.
4526            ctx.pipe_stdin = ec.pipe_stdin.take();
4527            ctx.pipe_stdout = ec.pipe_stdout.take();
4528        }
4529
4530        Ok(result)
4531    }
4532}
4533
4534#[async_trait]
4535impl CommandDispatcher for Kernel {
4536    /// Dispatch a command through the Kernel's full resolution chain.
4537    ///
4538    /// This is the single path for all command execution when called from
4539    /// the pipeline runner. It provides the full dispatch chain:
4540    /// user tools → builtins → .kai scripts → external commands → backend tools.
4541    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4542        self.dispatch_command(cmd, ctx).await
4543    }
4544
4545    /// Evaluate an expression through the kernel's async chain, including
4546    /// command substitution. Delegates to `eval_expr_async`, which snapshots
4547    /// the kernel's scope/cwd and restores them after any `$(...)` runs, so
4548    /// only command output escapes. The `ctx` is unused here because the
4549    /// kernel evaluates against its own session state (a fork carries the
4550    /// pipeline stage's snapshot); var refs resolve against that scope.
4551    async fn eval_expr(&self, expr: &Expr, _ctx: &ExecContext) -> Result<Value> {
4552        self.eval_expr_async(expr).await
4553    }
4554
4555    /// Produce a forked dispatcher with independent mutable state (detached).
4556    ///
4557    /// Calls the inherent `Kernel::fork` method (note the UFCS to avoid
4558    /// recursing into the trait method we're defining) and coerces the
4559    /// returned `Arc<Kernel>` to `Arc<dyn CommandDispatcher>`.
4560    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
4561        let fork: Arc<Kernel> = Kernel::fork(self).await;
4562        fork
4563    }
4564
4565    /// Produce a forked dispatcher with cancellation cascading from this kernel.
4566    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
4567        let fork: Arc<Kernel> = Kernel::fork_attached(self).await;
4568        fork
4569    }
4570}
4571
4572/// Apply the requested output format to a builtin's result, unless the tool
4573/// owns its own output.
4574///
4575/// `format` is `ctx.output_format` (set from `--json`). When `owns_output` is
4576/// true the tool already rendered its bytes (bespoke JSON envelope), so the
4577/// kernel leaves the result untouched rather than re-formatting its
4578/// `OutputData`. Otherwise the kernel renders the typed `OutputData` uniformly.
4579fn finalize_output(
4580    result: ExecResult,
4581    format: Option<crate::interpreter::OutputFormat>,
4582    owns_output: bool,
4583) -> ExecResult {
4584    match format {
4585        Some(_) if owns_output => result,
4586        Some(format) => apply_output_format(result, format),
4587        None => result,
4588    }
4589}
4590
4591/// Accumulate output from one result into another.
4592///
4593/// Appends stdout and stderr verbatim and updates the exit code to match the
4594/// new result. Used to preserve output from multiple statements, loop
4595/// iterations, and command chains. No separator is inserted between outputs —
4596/// each command's output concatenates raw, matching bash (`printf a; printf b`
4597/// and `printf a && printf b` both yield `ab`; a trailing newline only appears
4598/// when a command emits its own, as `echo` does).
4599fn accumulate_result(accumulated: &mut ExecResult, new: &ExecResult) {
4600    // Materialize lazy OutputData into .out before accumulating.
4601    // Without this, the first command's output stays in .output while
4602    // the second's text gets appended to .out, losing the first.
4603    accumulated.materialize();
4604    accumulated.push_out(&new.text_out());
4605    accumulated.err.push_str(&new.err);
4606    accumulated.code = new.code;
4607    accumulated.data = new.data.clone();
4608    accumulated.did_spill = new.did_spill;
4609    accumulated.original_code = new.original_code;
4610    accumulated.content_type = new.content_type.clone();
4611    accumulated.baggage.clone_from(&new.baggage);
4612}
4613
4614/// Fold a loop's accumulated output into a break/continue signal that is
4615/// propagating to an *outer* loop. Output printed before `break N`/`continue N`
4616/// (with `N > 1`) would otherwise be discarded when the signal replaces the
4617/// loop's result on its way up. The loop's output comes first (it ran before
4618/// the signal was raised), then the signal's already-carried output.
4619fn fold_loop_output_into_flow(loop_output: ExecResult, flow: &mut ControlFlow) {
4620    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4621        let mut merged = loop_output;
4622        accumulate_result(&mut merged, result);
4623        *result = merged;
4624    }
4625}
4626
4627/// Accumulate the output a break/continue signal carried (from inner loops it
4628/// propagated through) into the loop that finally handles it, so it survives
4629/// into that loop's result.
4630fn accumulate_flow_output(accumulated: &mut ExecResult, flow: &ControlFlow) {
4631    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4632        accumulate_result(accumulated, result);
4633    }
4634}
4635
4636/// Check if a value is truthy.
4637fn is_truthy(value: &Value) -> bool {
4638    match value {
4639        Value::Null => false,
4640        Value::Bool(b) => *b,
4641        Value::Int(i) => *i != 0,
4642        Value::Float(f) => *f != 0.0,
4643        Value::String(s) => !s.is_empty(),
4644        Value::Json(json) => match json {
4645            serde_json::Value::Null => false,
4646            serde_json::Value::Array(arr) => !arr.is_empty(),
4647            serde_json::Value::Object(obj) => !obj.is_empty(),
4648            serde_json::Value::Bool(b) => *b,
4649            serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
4650            serde_json::Value::String(s) => !s.is_empty(),
4651        },
4652        Value::Blob(_) => true, // Blob references are always truthy
4653    }
4654}
4655
4656/// Apply tilde expansion to a value.
4657///
4658/// Only string values starting with `~` are expanded. `home` is the session
4659/// `HOME` from the kernel scope (the kernel is hermetic and never reads the
4660/// host env); `None` leaves `~`/`~/path` unexpanded. See [`expand_tilde`].
4661fn apply_tilde_expansion(value: Value, home: Option<&str>) -> Value {
4662    match value {
4663        Value::String(s) if s.starts_with('~') => Value::String(expand_tilde(&s, home)),
4664        _ => value,
4665    }
4666}
4667
/// Wait for a child to exit, killing it if `cancel` fires first.
///
/// `target` carries a Linux pidfd (when available) for race-free direct-child
/// kill; fall-through to PID-based kill otherwise. On non-unix targets the
/// parameter is ignored and we use tokio's cross-platform `start_kill`.
///
/// `grace` is forwarded to [`kill_with_grace`] and bounds how long the child
/// is given between SIGTERM and SIGKILL.
///
/// Returns the child's exit status, or the I/O error from waiting on it.
#[cfg(all(unix, feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    target: Option<&crate::pidfd::KillTarget>,
    cancel: &tokio_util::sync::CancellationToken,
    grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // `biased` polls the branches in source order, so a child that has
        // already exited is reported as such even if `cancel` is also ready.
        biased;
        status = child.wait() => status,
        _ = cancel.cancelled() => kill_with_grace(child, target, grace).await,
    }
}
4686
/// Non-unix counterpart of `wait_or_kill`: wait for a child to exit, killing
/// it if `cancel` fires first.
///
/// `_target` and `_grace` exist only to keep the signature parallel with the
/// unix version; both are ignored. On cancellation the child is killed via
/// tokio's `start_kill` and then awaited.
///
/// Returns the child's exit status, or the I/O error from waiting on it.
#[cfg(all(not(unix), feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    _target: Option<&()>,
    cancel: &tokio_util::sync::CancellationToken,
    _grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // `biased` polls the branches in source order, so a child that has
        // already exited is reported as such even if `cancel` is also ready.
        biased;
        status = child.wait() => status,
        _ = cancel.cancelled() => {
            // Best effort: a failure to deliver the kill is ignored and we
            // still wait for the child.
            let _ = child.start_kill();
            child.wait().await
        }
    }
}
4703
4704/// Send SIGTERM to the child and its process group; wait `grace`; then SIGKILL.
4705///
4706/// Direct-child kill goes through `target.signal()`, which on Linux uses a
4707/// pidfd (immune to PID reuse). Process-group kill uses `killpg` — there is
4708/// no PGID-equivalent of pidfd, so grandchildren retain a small reuse window.
4709#[cfg(all(unix, feature = "subprocess"))]
4710pub(crate) async fn kill_with_grace(
4711    child: &mut tokio::process::Child,
4712    target: Option<&crate::pidfd::KillTarget>,
4713    grace: Duration,
4714) -> std::io::Result<std::process::ExitStatus> {
4715    use nix::sys::signal::Signal;
4716
4717    if let Some(t) = target {
4718        t.signal(Signal::SIGTERM);
4719        t.signal_pg(Signal::SIGTERM);
4720        if grace > Duration::ZERO
4721            && let Ok(status) = tokio::time::timeout(grace, child.wait()).await
4722        {
4723            return status;
4724        }
4725        t.signal(Signal::SIGKILL);
4726        t.signal_pg(Signal::SIGKILL);
4727    }
4728    child.wait().await
4729}
4730
4731#[cfg(all(test, feature = "subprocess"))]
4732mod tests {
4733    use super::*;
4734
4735    #[tokio::test]
4736    async fn test_kernel_transient() {
4737        let kernel = Kernel::transient().expect("failed to create kernel");
4738        assert_eq!(kernel.name(), "transient");
4739    }
4740
4741    #[tokio::test]
4742    async fn test_kernel_execute_echo() {
4743        let kernel = Kernel::transient().expect("failed to create kernel");
4744        let result = kernel.execute("echo hello").await.expect("execution failed");
4745        assert!(result.ok());
4746        assert_eq!(result.text_out().trim(), "hello");
4747    }
4748
4749    #[tokio::test]
4750    async fn test_multiple_statements_accumulate_output() {
4751        let kernel = Kernel::transient().expect("failed to create kernel");
4752        let result = kernel
4753            .execute("echo one\necho two\necho three")
4754            .await
4755            .expect("execution failed");
4756        assert!(result.ok());
4757        // Should have all three outputs separated by newlines
4758        assert!(result.text_out().contains("one"), "missing 'one': {}", result.text_out());
4759        assert!(result.text_out().contains("two"), "missing 'two': {}", result.text_out());
4760        assert!(result.text_out().contains("three"), "missing 'three': {}", result.text_out());
4761    }
4762
4763    #[tokio::test]
4764    async fn test_and_chain_accumulates_output() {
4765        let kernel = Kernel::transient().expect("failed to create kernel");
4766        let result = kernel
4767            .execute("echo first && echo second")
4768            .await
4769            .expect("execution failed");
4770        assert!(result.ok());
4771        assert!(result.text_out().contains("first"), "missing 'first': {}", result.text_out());
4772        assert!(result.text_out().contains("second"), "missing 'second': {}", result.text_out());
4773    }
4774
4775    #[tokio::test]
4776    async fn test_for_loop_accumulates_output() {
4777        let kernel = Kernel::transient().expect("failed to create kernel");
4778        let result = kernel
4779            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
4780            .await
4781            .expect("execution failed");
4782        assert!(result.ok());
4783        assert!(result.text_out().contains("item: a"), "missing 'item: a': {}", result.text_out());
4784        assert!(result.text_out().contains("item: b"), "missing 'item: b': {}", result.text_out());
4785        assert!(result.text_out().contains("item: c"), "missing 'item: c': {}", result.text_out());
4786    }
4787
4788    #[tokio::test]
4789    async fn test_while_loop_accumulates_output() {
4790        let kernel = Kernel::transient().expect("failed to create kernel");
4791        let result = kernel
4792            .execute(r#"
4793                N=3
4794                while [[ ${N} -gt 0 ]]; do
4795                    echo "N=${N}"
4796                    N=$((N - 1))
4797                done
4798            "#)
4799            .await
4800            .expect("execution failed");
4801        assert!(result.ok());
4802        assert!(result.text_out().contains("N=3"), "missing 'N=3': {}", result.text_out());
4803        assert!(result.text_out().contains("N=2"), "missing 'N=2': {}", result.text_out());
4804        assert!(result.text_out().contains("N=1"), "missing 'N=1': {}", result.text_out());
4805    }
4806
4807    #[tokio::test]
4808    async fn test_kernel_set_var() {
4809        let kernel = Kernel::transient().expect("failed to create kernel");
4810
4811        kernel.execute("X=42").await.expect("set failed");
4812
4813        let value = kernel.get_var("X").await;
4814        assert_eq!(value, Some(Value::Int(42)));
4815    }
4816
4817    #[tokio::test]
4818    async fn test_kernel_var_expansion() {
4819        let kernel = Kernel::transient().expect("failed to create kernel");
4820
4821        kernel.execute("NAME=\"world\"").await.expect("set failed");
4822        let result = kernel.execute("echo \"hello ${NAME}\"").await.expect("echo failed");
4823
4824        assert!(result.ok());
4825        assert_eq!(result.text_out().trim(), "hello world");
4826    }
4827
4828    #[tokio::test]
4829    async fn test_kernel_last_result() {
4830        let kernel = Kernel::transient().expect("failed to create kernel");
4831
4832        kernel.execute("echo test").await.expect("echo failed");
4833
4834        let last = kernel.last_result().await;
4835        assert!(last.ok());
4836        assert_eq!(last.text_out().trim(), "test");
4837    }
4838
4839    #[tokio::test]
4840    async fn test_kernel_tool_not_found() {
4841        let kernel = Kernel::transient().expect("failed to create kernel");
4842
4843        let result = kernel.execute("nonexistent_tool").await.expect("execution failed");
4844        assert!(!result.ok());
4845        assert_eq!(result.code, 127);
4846        assert!(result.err.contains("command not found"));
4847    }
4848
4849    #[tokio::test]
4850    async fn test_external_command_true() {
4851        // Use REPL config for passthrough filesystem access
4852        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4853
4854        // /bin/true should be available on any Unix system
4855        let result = kernel.execute("true").await.expect("execution failed");
4856        // This should use the builtin true, which returns 0
4857        assert!(result.ok(), "true should succeed: {:?}", result);
4858    }
4859
4860    #[tokio::test]
4861    async fn test_external_command_basic() {
4862        // Use REPL config for passthrough filesystem access
4863        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4864
4865        // Test with /bin/echo which is external
4866        // Note: kaish has a builtin echo, so this will use the builtin
4867        // Let's test with a command that's not a builtin
4868        // Actually, let's just test that PATH resolution works by checking the PATH var
4869        let path_var = std::env::var("PATH").unwrap_or_default();
4870        eprintln!("System PATH: {}", path_var);
4871
4872        // Set PATH in kernel to ensure it's available
4873        kernel.execute(&format!(r#"PATH="{}""#, path_var)).await.expect("set PATH failed");
4874
4875        // Now try an external command like /usr/bin/env
4876        // But env is also a builtin... let's try uname
4877        let result = kernel.execute("uname").await.expect("execution failed");
4878        eprintln!("uname result: {:?}", result);
4879        // uname should succeed if external commands work
4880        assert!(result.ok() || result.code == 127, "uname: {:?}", result);
4881    }
4882
4883    #[tokio::test]
4884    async fn test_kernel_reset() {
4885        let kernel = Kernel::transient().expect("failed to create kernel");
4886
4887        kernel.execute("X=1").await.expect("set failed");
4888        assert!(kernel.get_var("X").await.is_some());
4889
4890        kernel.reset().await.expect("reset failed");
4891        assert!(kernel.get_var("X").await.is_none());
4892    }
4893
4894    #[tokio::test]
4895    async fn test_kernel_cwd() {
4896        let kernel = Kernel::transient().expect("failed to create kernel");
4897
4898        // Transient kernel uses sandboxed mode with cwd=$HOME
4899        let cwd = kernel.cwd().await;
4900        let home = std::env::var("HOME")
4901            .map(PathBuf::from)
4902            .unwrap_or_else(|_| PathBuf::from("/"));
4903        assert_eq!(cwd, home);
4904
4905        kernel.set_cwd(PathBuf::from("/tmp")).await;
4906        assert_eq!(kernel.cwd().await, PathBuf::from("/tmp"));
4907    }
4908
4909    #[tokio::test]
4910    async fn test_kernel_list_vars() {
4911        let kernel = Kernel::transient().expect("failed to create kernel");
4912
4913        kernel.execute("A=1").await.ok();
4914        kernel.execute("B=2").await.ok();
4915
4916        let vars = kernel.list_vars().await;
4917        assert!(vars.iter().any(|(n, v)| n == "A" && *v == Value::Int(1)));
4918        assert!(vars.iter().any(|(n, v)| n == "B" && *v == Value::Int(2)));
4919    }
4920
4921    #[tokio::test]
4922    async fn test_is_truthy() {
4923        assert!(!is_truthy(&Value::Null));
4924        assert!(!is_truthy(&Value::Bool(false)));
4925        assert!(is_truthy(&Value::Bool(true)));
4926        assert!(!is_truthy(&Value::Int(0)));
4927        assert!(is_truthy(&Value::Int(1)));
4928        assert!(!is_truthy(&Value::String("".into())));
4929        assert!(is_truthy(&Value::String("x".into())));
4930    }
4931
4932    #[tokio::test]
4933    async fn test_jq_in_pipeline() {
4934        let kernel = Kernel::transient().expect("failed to create kernel");
4935        // kaish uses double quotes only; escape inner quotes
4936        let result = kernel
4937            .execute(r#"echo "{\"name\": \"Alice\"}" | jq ".name" -r"#)
4938            .await
4939            .expect("execution failed");
4940        assert!(result.ok(), "jq pipeline failed: {}", result.err);
4941        assert_eq!(result.text_out().trim(), "Alice");
4942    }
4943
4944    #[tokio::test]
4945    async fn test_user_defined_tool() {
4946        let kernel = Kernel::transient().expect("failed to create kernel");
4947
4948        // Define a function
4949        kernel
4950            .execute(r#"greet() { echo "Hello, $1!" }"#)
4951            .await
4952            .expect("function definition failed");
4953
4954        // Call the function
4955        let result = kernel
4956            .execute(r#"greet "World""#)
4957            .await
4958            .expect("function call failed");
4959
4960        assert!(result.ok(), "greet failed: {}", result.err);
4961        assert_eq!(result.text_out().trim(), "Hello, World!");
4962    }
4963
4964    #[tokio::test]
4965    async fn test_user_tool_positional_args() {
4966        let kernel = Kernel::transient().expect("failed to create kernel");
4967
4968        // Define a function with positional param
4969        kernel
4970            .execute(r#"greet() { echo "Hi $1" }"#)
4971            .await
4972            .expect("function definition failed");
4973
4974        // Call with positional argument
4975        let result = kernel
4976            .execute(r#"greet "Amy""#)
4977            .await
4978            .expect("function call failed");
4979
4980        assert!(result.ok(), "greet failed: {}", result.err);
4981        assert_eq!(result.text_out().trim(), "Hi Amy");
4982    }
4983
4984    #[tokio::test]
4985    async fn test_function_shared_scope() {
4986        let kernel = Kernel::transient().expect("failed to create kernel");
4987
4988        // Set a variable in parent scope
4989        kernel
4990            .execute(r#"SECRET="hidden""#)
4991            .await
4992            .expect("set failed");
4993
4994        // Define a function that accesses and modifies parent variable
4995        kernel
4996            .execute(r#"access_parent() {
4997                echo "${SECRET}"
4998                SECRET="modified"
4999            }"#)
5000            .await
5001            .expect("function definition failed");
5002
5003        // Call the function - it SHOULD see SECRET (shared scope like sh)
5004        let result = kernel.execute("access_parent").await.expect("function call failed");
5005
5006        // Function should have access to parent scope
5007        assert!(
5008            result.text_out().contains("hidden"),
5009            "Function should access parent scope, got: {}",
5010            result.text_out()
5011        );
5012
5013        // Function should have modified the parent variable
5014        let secret = kernel.get_var("SECRET").await;
5015        assert_eq!(
5016            secret,
5017            Some(Value::String("modified".into())),
5018            "Function should modify parent scope"
5019        );
5020    }
5021
5022    #[tokio::test]
5023    #[ignore = "exec replaces the test binary via CommandExt::exec, hangs libtest; cannot be run under cargo test"]
5024    async fn test_exec_builtin() {
5025        let kernel = Kernel::transient().expect("failed to create kernel");
5026        // argv is now a space-separated string or JSON array string
5027        let result = kernel
5028            .execute(r#"exec command="/bin/echo" argv="hello world""#)
5029            .await
5030            .expect("exec failed");
5031
5032        assert!(result.ok(), "exec failed: {}", result.err);
5033        assert_eq!(result.text_out().trim(), "hello world");
5034    }
5035
5036    #[tokio::test]
5037    async fn test_while_false_never_runs() {
5038        let kernel = Kernel::transient().expect("failed to create kernel");
5039
5040        // A while loop with false condition should never run
5041        let result = kernel
5042            .execute(r#"
5043                while false; do
5044                    echo "should not run"
5045                done
5046            "#)
5047            .await
5048            .expect("while false failed");
5049
5050        assert!(result.ok());
5051        assert!(result.text_out().is_empty(), "while false should not execute body: {}", result.text_out());
5052    }
5053
5054    #[tokio::test]
5055    async fn test_while_string_comparison() {
5056        let kernel = Kernel::transient().expect("failed to create kernel");
5057
5058        // Set a flag
5059        kernel.execute(r#"FLAG="go""#).await.expect("set failed");
5060
5061        // Use string comparison as condition (shell-compatible [[ ]] syntax)
5062        // Note: Put echo last so we can check the output
5063        let result = kernel
5064            .execute(r#"
5065                while [[ ${FLAG} == "go" ]]; do
5066                    FLAG="stop"
5067                    echo "running"
5068                done
5069            "#)
5070            .await
5071            .expect("while with string cmp failed");
5072
5073        assert!(result.ok());
5074        assert!(result.text_out().contains("running"), "should have run once: {}", result.text_out());
5075
5076        // Verify flag was changed
5077        let flag = kernel.get_var("FLAG").await;
5078        assert_eq!(flag, Some(Value::String("stop".into())));
5079    }
5080
5081    #[tokio::test]
5082    async fn test_while_numeric_comparison() {
5083        let kernel = Kernel::transient().expect("failed to create kernel");
5084
5085        // Test > comparison (shell-compatible [[ ]] with -gt)
5086        kernel.execute("N=5").await.expect("set failed");
5087
5088        // Note: Put echo last so we can check the output
5089        let result = kernel
5090            .execute(r#"
5091                while [[ ${N} -gt 3 ]]; do
5092                    N=3
5093                    echo "N was greater"
5094                done
5095            "#)
5096            .await
5097            .expect("while with > failed");
5098
5099        assert!(result.ok());
5100        assert!(result.text_out().contains("N was greater"), "should have run once: {}", result.text_out());
5101    }
5102
5103    #[tokio::test]
5104    async fn test_break_in_while_loop() {
5105        let kernel = Kernel::transient().expect("failed to create kernel");
5106
5107        let result = kernel
5108            .execute(r#"
5109                I=0
5110                while true; do
5111                    I=1
5112                    echo "before break"
5113                    break
5114                    echo "after break"
5115                done
5116            "#)
5117            .await
5118            .expect("while with break failed");
5119
5120        assert!(result.ok());
5121        assert!(result.text_out().contains("before break"), "should see before break: {}", result.text_out());
5122        assert!(!result.text_out().contains("after break"), "should not see after break: {}", result.text_out());
5123
5124        // Verify we exited the loop
5125        let i = kernel.get_var("I").await;
5126        assert_eq!(i, Some(Value::Int(1)));
5127    }
5128
5129    #[tokio::test]
5130    async fn test_continue_in_while_loop() {
5131        let kernel = Kernel::transient().expect("failed to create kernel");
5132
5133        // Test continue in a while loop where variables persist
5134        // We use string state transition: "start" -> "middle" -> "end"
5135        // continue on "middle" should skip to next iteration
5136        // Shell-compatible: use [[ ]] for comparisons
5137        let result = kernel
5138            .execute(r#"
5139                STATE="start"
5140                AFTER_CONTINUE="no"
5141                while [[ ${STATE} != "done" ]]; do
5142                    if [[ ${STATE} == "start" ]]; then
5143                        STATE="middle"
5144                        continue
5145                        AFTER_CONTINUE="yes"
5146                    fi
5147                    if [[ ${STATE} == "middle" ]]; then
5148                        STATE="done"
5149                    fi
5150                done
5151            "#)
5152            .await
5153            .expect("while with continue failed");
5154
5155        assert!(result.ok());
5156
5157        // STATE should be "done" (we completed the loop)
5158        let state = kernel.get_var("STATE").await;
5159        assert_eq!(state, Some(Value::String("done".into())));
5160
5161        // AFTER_CONTINUE should still be "no" (continue skipped the assignment)
5162        let after = kernel.get_var("AFTER_CONTINUE").await;
5163        assert_eq!(after, Some(Value::String("no".into())));
5164    }
5165
5166    #[tokio::test]
5167    async fn test_break_with_level() {
5168        let kernel = Kernel::transient().expect("failed to create kernel");
5169
5170        // Nested loop with break 2 to exit both loops
5171        // We verify by checking OUTER value:
5172        // - If break 2 works, OUTER stays at 1 (set before for loop)
5173        // - If break 2 fails, OUTER becomes 2 (set after for loop)
5174        let result = kernel
5175            .execute(r#"
5176                OUTER=0
5177                while true; do
5178                    OUTER=1
5179                    for X in "1 2"; do
5180                        break 2
5181                    done
5182                    OUTER=2
5183                done
5184            "#)
5185            .await
5186            .expect("nested break failed");
5187
5188        assert!(result.ok());
5189
5190        // OUTER should be 1 (set before for loop), not 2 (would be set after for loop)
5191        let outer = kernel.get_var("OUTER").await;
5192        assert_eq!(outer, Some(Value::Int(1)), "break 2 should have skipped OUTER=2");
5193    }
5194
5195    #[tokio::test]
5196    async fn test_return_from_tool() {
5197        let kernel = Kernel::transient().expect("failed to create kernel");
5198
5199        // Define a function that returns early
5200        kernel
5201            .execute(r#"early_return() {
5202                if [[ $1 == 1 ]]; then
5203                    return 42
5204                fi
5205                echo "not returned"
5206            }"#)
5207            .await
5208            .expect("function definition failed");
5209
5210        // Call with arg=1 should return with exit code 42
5211        // (POSIX shell behavior: return N sets exit code, doesn't output N)
5212        let result = kernel
5213            .execute("early_return 1")
5214            .await
5215            .expect("function call failed");
5216
5217        // Exit code should be 42 (non-zero, so not ok())
5218        assert_eq!(result.code, 42);
5219        // Output should be empty (we returned before echo)
5220        assert!(result.text_out().is_empty());
5221    }
5222
5223    #[tokio::test]
5224    async fn test_return_without_value() {
5225        let kernel = Kernel::transient().expect("failed to create kernel");
5226
5227        // Define a function that returns without a value
5228        kernel
5229            .execute(r#"early_exit() {
5230                if [[ $1 == "stop" ]]; then
5231                    return
5232                fi
5233                echo "continued"
5234            }"#)
5235            .await
5236            .expect("function definition failed");
5237
5238        // Call with arg="stop" should return early
5239        let result = kernel
5240            .execute(r#"early_exit "stop""#)
5241            .await
5242            .expect("function call failed");
5243
5244        assert!(result.ok());
5245        assert!(result.text_out().is_empty() || result.text_out().trim().is_empty());
5246    }
5247
5248    #[tokio::test]
5249    async fn test_exit_stops_execution() {
5250        let kernel = Kernel::transient().expect("failed to create kernel");
5251
5252        // exit should stop further execution
5253        kernel
5254            .execute(r#"
5255                BEFORE="yes"
5256                exit 0
5257                AFTER="yes"
5258            "#)
5259            .await
5260            .expect("execution failed");
5261
5262        // BEFORE should be set, AFTER should not
5263        let before = kernel.get_var("BEFORE").await;
5264        assert_eq!(before, Some(Value::String("yes".into())));
5265
5266        let after = kernel.get_var("AFTER").await;
5267        assert!(after.is_none(), "AFTER should not be set after exit");
5268    }
5269
5270    #[tokio::test]
5271    async fn test_exit_with_code() {
5272        let kernel = Kernel::transient().expect("failed to create kernel");
5273
5274        // exit with code should propagate the exit code
5275        let result = kernel
5276            .execute("exit 42")
5277            .await
5278            .expect("exit failed");
5279
5280        assert_eq!(result.code, 42);
5281        assert!(result.text_out().is_empty(), "exit should not produce stdout");
5282    }
5283
5284    #[tokio::test]
5285    async fn test_set_e_stops_on_failure() {
5286        let kernel = Kernel::transient().expect("failed to create kernel");
5287
5288        // Enable error-exit mode
5289        kernel.execute("set -e").await.expect("set -e failed");
5290
5291        // Run a sequence where the middle command fails
5292        kernel
5293            .execute(r#"
5294                STEP1="done"
5295                false
5296                STEP2="done"
5297            "#)
5298            .await
5299            .expect("execution failed");
5300
5301        // STEP1 should be set, but STEP2 should NOT be set (exit on false)
5302        let step1 = kernel.get_var("STEP1").await;
5303        assert_eq!(step1, Some(Value::String("done".into())));
5304
5305        let step2 = kernel.get_var("STEP2").await;
5306        assert!(step2.is_none(), "STEP2 should not be set after false with set -e");
5307    }
5308
5309    #[tokio::test]
5310    async fn test_set_plus_e_disables_error_exit() {
5311        let kernel = Kernel::transient().expect("failed to create kernel");
5312
5313        // Enable then disable error-exit mode
5314        kernel.execute("set -e").await.expect("set -e failed");
5315        kernel.execute("set +e").await.expect("set +e failed");
5316
5317        // Now failure should NOT stop execution
5318        kernel
5319            .execute(r#"
5320                STEP1="done"
5321                false
5322                STEP2="done"
5323            "#)
5324            .await
5325            .expect("execution failed");
5326
5327        // Both should be set since +e disables error exit
5328        let step1 = kernel.get_var("STEP1").await;
5329        assert_eq!(step1, Some(Value::String("done".into())));
5330
5331        let step2 = kernel.get_var("STEP2").await;
5332        assert_eq!(step2, Some(Value::String("done".into())));
5333    }
5334
5335    #[tokio::test]
5336    async fn test_set_ignores_unknown_options() {
5337        let kernel = Kernel::transient().expect("failed to create kernel");
5338
5339        // Bash idiom: set -euo pipefail (we support -e, ignore the rest)
5340        let result = kernel
5341            .execute("set -e -u -o pipefail")
5342            .await
5343            .expect("set with unknown options failed");
5344
5345        assert!(result.ok(), "set should succeed with unknown options");
5346
5347        // -e should still be enabled
5348        kernel
5349            .execute(r#"
5350                BEFORE="yes"
5351                false
5352                AFTER="yes"
5353            "#)
5354            .await
5355            .ok();
5356
5357        let after = kernel.get_var("AFTER").await;
5358        assert!(after.is_none(), "-e should be enabled despite unknown options");
5359    }
5360
5361    #[tokio::test]
5362    async fn test_set_no_args_shows_settings() {
5363        let kernel = Kernel::transient().expect("failed to create kernel");
5364
5365        // Enable -e
5366        kernel.execute("set -e").await.expect("set -e failed");
5367
5368        // Call set with no args to see settings
5369        let result = kernel.execute("set").await.expect("set failed");
5370
5371        assert!(result.ok());
5372        assert!(result.text_out().contains("set -e"), "should show -e is enabled: {}", result.text_out());
5373    }
5374
5375    #[tokio::test]
5376    async fn test_set_e_in_pipeline() {
5377        let kernel = Kernel::transient().expect("failed to create kernel");
5378
5379        kernel.execute("set -e").await.expect("set -e failed");
5380
5381        // Pipeline failure should trigger exit
5382        kernel
5383            .execute(r#"
5384                BEFORE="yes"
5385                false | cat
5386                AFTER="yes"
5387            "#)
5388            .await
5389            .ok();
5390
5391        let before = kernel.get_var("BEFORE").await;
5392        assert_eq!(before, Some(Value::String("yes".into())));
5393
5394        // AFTER should not be set if pipeline failure triggers exit
5395        // Note: The exit code of a pipeline is the exit code of the last command
5396        // So `false | cat` returns 0 (cat succeeds). This is bash-compatible behavior.
5397        // To test pipeline failure, we need the last command to fail.
5398    }
5399
5400    #[tokio::test]
5401    async fn test_set_e_with_and_chain() {
5402        let kernel = Kernel::transient().expect("failed to create kernel");
5403
5404        kernel.execute("set -e").await.expect("set -e failed");
5405
5406        // Commands in && chain should not trigger -e on the first failure
5407        // because && explicitly handles the error
5408        kernel
5409            .execute(r#"
5410                RESULT="initial"
5411                false && RESULT="chained"
5412                RESULT="continued"
5413            "#)
5414            .await
5415            .ok();
5416
5417        // In bash, commands in && don't trigger -e. The chain handles the failure.
5418        // Our implementation may differ - let's verify current behavior.
5419        let result = kernel.get_var("RESULT").await;
5420        // If we follow bash semantics, RESULT should be "continued"
5421        // If we trigger -e on the false, RESULT stays "initial"
5422        assert!(result.is_some(), "RESULT should be set");
5423    }
5424
5425    #[tokio::test]
5426    async fn test_set_e_exits_in_for_loop() {
5427        let kernel = Kernel::transient().expect("failed to create kernel");
5428
5429        kernel.execute("set -e").await.expect("set -e failed");
5430
5431        kernel
5432            .execute(r#"
5433                REACHED="no"
5434                for x in 1 2 3; do
5435                    false
5436                    REACHED="yes"
5437                done
5438            "#)
5439            .await
5440            .ok();
5441
5442        // With set -e, false should trigger exit; REACHED should remain "no"
5443        let reached = kernel.get_var("REACHED").await;
5444        assert_eq!(reached, Some(Value::String("no".into())),
5445            "set -e should exit on failure in for loop body");
5446    }
5447
5448    #[tokio::test]
5449    async fn test_for_loop_continues_without_set_e() {
5450        let kernel = Kernel::transient().expect("failed to create kernel");
5451
5452        // Without set -e, for loop should continue normally
5453        kernel
5454            .execute(r#"
5455                COUNT=0
5456                for x in 1 2 3; do
5457                    false
5458                    COUNT=$((COUNT + 1))
5459                done
5460            "#)
5461            .await
5462            .ok();
5463
5464        let count = kernel.get_var("COUNT").await;
5465        // Arithmetic produces Int values; accept either Int or String representation
5466        let count_val = match &count {
5467            Some(Value::Int(n)) => *n,
5468            Some(Value::String(s)) => s.parse().unwrap_or(-1),
5469            _ => -1,
5470        };
5471        assert_eq!(count_val, 3,
5472            "without set -e, loop should complete all iterations (got {:?})", count);
5473    }
5474
5475    // ═══════════════════════════════════════════════════════════════════════════
5476    // Source Tests
5477    // ═══════════════════════════════════════════════════════════════════════════
5478
5479    #[tokio::test]
5480    async fn test_source_sets_variables() {
5481        let kernel = Kernel::transient().expect("failed to create kernel");
5482
5483        // Write a script to the VFS
5484        kernel
5485            .execute(r#"write "/test.kai" 'FOO="bar"'"#)
5486            .await
5487            .expect("write failed");
5488
5489        // Source the script
5490        let result = kernel
5491            .execute(r#"source "/test.kai""#)
5492            .await
5493            .expect("source failed");
5494
5495        assert!(result.ok(), "source should succeed");
5496
5497        // Variable should be set in current scope
5498        let foo = kernel.get_var("FOO").await;
5499        assert_eq!(foo, Some(Value::String("bar".into())));
5500    }
5501
5502    #[tokio::test]
5503    async fn test_source_with_dot_alias() {
5504        let kernel = Kernel::transient().expect("failed to create kernel");
5505
5506        // Write a script to the VFS
5507        kernel
5508            .execute(r#"write "/vars.kai" 'X=42'"#)
5509            .await
5510            .expect("write failed");
5511
5512        // Source using . alias
5513        let result = kernel
5514            .execute(r#". "/vars.kai""#)
5515            .await
5516            .expect(". failed");
5517
5518        assert!(result.ok(), ". should succeed");
5519
5520        // Variable should be set in current scope
5521        let x = kernel.get_var("X").await;
5522        assert_eq!(x, Some(Value::Int(42)));
5523    }
5524
5525    #[tokio::test]
5526    async fn test_source_not_found() {
5527        let kernel = Kernel::transient().expect("failed to create kernel");
5528
5529        // Try to source a non-existent file
5530        let result = kernel
5531            .execute(r#"source "/nonexistent.kai""#)
5532            .await
5533            .expect("source should not fail with error");
5534
5535        assert!(!result.ok(), "source of non-existent file should fail");
5536        assert!(result.err.contains("nonexistent.kai"), "error should mention filename");
5537    }
5538
5539    #[tokio::test]
5540    async fn test_source_missing_filename() {
5541        let kernel = Kernel::transient().expect("failed to create kernel");
5542
5543        // Call source with no arguments
5544        let result = kernel
5545            .execute("source")
5546            .await
5547            .expect("source should not fail with error");
5548
5549        assert!(!result.ok(), "source without filename should fail");
5550        assert!(result.err.contains("missing filename"), "error should mention missing filename");
5551    }
5552
5553    #[tokio::test]
5554    async fn test_source_executes_multiple_statements() {
5555        let kernel = Kernel::transient().expect("failed to create kernel");
5556
5557        // Write a script with multiple statements
5558        kernel
5559            .execute(r#"write "/multi.kai" 'A=1
5560B=2
5561C=3'"#)
5562            .await
5563            .expect("write failed");
5564
5565        // Source it
5566        kernel
5567            .execute(r#"source "/multi.kai""#)
5568            .await
5569            .expect("source failed");
5570
5571        // All variables should be set
5572        assert_eq!(kernel.get_var("A").await, Some(Value::Int(1)));
5573        assert_eq!(kernel.get_var("B").await, Some(Value::Int(2)));
5574        assert_eq!(kernel.get_var("C").await, Some(Value::Int(3)));
5575    }
5576
5577    #[tokio::test]
5578    async fn test_source_can_define_functions() {
5579        let kernel = Kernel::transient().expect("failed to create kernel");
5580
5581        // Write a script that defines a function
5582        kernel
5583            .execute(r#"write "/functions.kai" 'greet() {
5584    echo "Hello, $1!"
5585}'"#)
5586            .await
5587            .expect("write failed");
5588
5589        // Source it
5590        kernel
5591            .execute(r#"source "/functions.kai""#)
5592            .await
5593            .expect("source failed");
5594
5595        // Use the defined function
5596        let result = kernel
5597            .execute(r#"greet "World""#)
5598            .await
5599            .expect("greet failed");
5600
5601        assert!(result.ok());
5602        assert!(result.text_out().contains("Hello, World!"));
5603    }
5604
5605    #[tokio::test]
5606    async fn test_source_inherits_error_exit() {
5607        let kernel = Kernel::transient().expect("failed to create kernel");
5608
5609        // Enable error exit
5610        kernel.execute("set -e").await.expect("set -e failed");
5611
5612        // Write a script that has a failure
5613        kernel
5614            .execute(r#"write "/fail.kai" 'BEFORE="yes"
5615false
5616AFTER="yes"'"#)
5617            .await
5618            .expect("write failed");
5619
5620        // Source it (should exit on false due to set -e)
5621        kernel
5622            .execute(r#"source "/fail.kai""#)
5623            .await
5624            .ok();
5625
5626        // BEFORE should be set, AFTER should NOT be set due to error exit
5627        let before = kernel.get_var("BEFORE").await;
5628        assert_eq!(before, Some(Value::String("yes".into())));
5629
5630        // Note: This test depends on whether error exit is checked within source
5631        // Currently our implementation checks per-statement in the main kernel
5632    }
5633
5634    // ═══════════════════════════════════════════════════════════════════════════
5635    // set -e with && / || chains
5636    // ═══════════════════════════════════════════════════════════════════════════
5637
5638    #[tokio::test]
5639    async fn test_set_e_and_chain_left_fails() {
5640        // set -e; false && echo hi; REACHED=1 → REACHED should be set
5641        let kernel = Kernel::transient().expect("failed to create kernel");
5642        kernel.execute("set -e").await.expect("set -e failed");
5643
5644        kernel
5645            .execute("false && echo hi; REACHED=1")
5646            .await
5647            .expect("execution failed");
5648
5649        let reached = kernel.get_var("REACHED").await;
5650        assert_eq!(
5651            reached,
5652            Some(Value::Int(1)),
5653            "set -e should not trigger on left side of &&"
5654        );
5655    }
5656
5657    #[tokio::test]
5658    async fn test_set_e_and_chain_right_fails() {
5659        // set -e; true && false; REACHED=1 → REACHED should NOT be set
5660        let kernel = Kernel::transient().expect("failed to create kernel");
5661        kernel.execute("set -e").await.expect("set -e failed");
5662
5663        kernel
5664            .execute("true && false; REACHED=1")
5665            .await
5666            .expect("execution failed");
5667
5668        let reached = kernel.get_var("REACHED").await;
5669        assert!(
5670            reached.is_none(),
5671            "set -e should trigger when right side of && fails"
5672        );
5673    }
5674
5675    #[tokio::test]
5676    async fn test_set_e_or_chain_recovers() {
5677        // set -e; false || echo recovered; REACHED=1 → REACHED should be set
5678        let kernel = Kernel::transient().expect("failed to create kernel");
5679        kernel.execute("set -e").await.expect("set -e failed");
5680
5681        kernel
5682            .execute("false || echo recovered; REACHED=1")
5683            .await
5684            .expect("execution failed");
5685
5686        let reached = kernel.get_var("REACHED").await;
5687        assert_eq!(
5688            reached,
5689            Some(Value::Int(1)),
5690            "set -e should not trigger when || recovers the failure"
5691        );
5692    }
5693
5694    #[tokio::test]
5695    async fn test_set_e_or_chain_both_fail() {
5696        // set -e; false || false; REACHED=1 → REACHED should NOT be set
5697        let kernel = Kernel::transient().expect("failed to create kernel");
5698        kernel.execute("set -e").await.expect("set -e failed");
5699
5700        kernel
5701            .execute("false || false; REACHED=1")
5702            .await
5703            .expect("execution failed");
5704
5705        let reached = kernel.get_var("REACHED").await;
5706        assert!(
5707            reached.is_none(),
5708            "set -e should trigger when || chain ultimately fails"
5709        );
5710    }
5711
5712    // ═══════════════════════════════════════════════════════════════════════════
5713    // Cancellation Tests
5714    // ═══════════════════════════════════════════════════════════════════════════
5715
5716    /// Helper: schedule a cancel after a delay from a background thread.
5717    /// Uses std::thread because cancel() is sync and Kernel is not Send.
5718    fn schedule_cancel(kernel: &Arc<Kernel>, delay: std::time::Duration) {
5719        let k = Arc::clone(kernel);
5720        std::thread::spawn(move || {
5721            std::thread::sleep(delay);
5722            k.cancel();
5723        });
5724    }
5725
5726    #[tokio::test]
5727    async fn test_cancel_interrupts_for_loop() {
5728        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5729
5730        // Schedule cancel after a short delay from a background OS thread
5731        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5732
5733        let result = kernel
5734            .execute("for i in $(seq 1 100000); do X=$i; done")
5735            .await
5736            .expect("execute failed");
5737
5738        assert_eq!(result.code, 130, "cancelled execution should exit with code 130");
5739
5740        // The loop variable should be set to something < 100000
5741        let x = kernel.get_var("X").await;
5742        if let Some(Value::Int(n)) = x {
5743            assert!(n < 100000, "loop should have been interrupted before finishing, got X={n}");
5744        }
5745    }
5746
5747    #[tokio::test]
5748    async fn test_cancel_interrupts_while_loop() {
5749        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5750        kernel.execute("COUNT=0").await.expect("init failed");
5751
5752        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5753
5754        let result = kernel
5755            .execute("while true; do COUNT=$((COUNT + 1)); done")
5756            .await
5757            .expect("execute failed");
5758
5759        assert_eq!(result.code, 130);
5760
5761        let count = kernel.get_var("COUNT").await;
5762        if let Some(Value::Int(n)) = count {
5763            assert!(n > 0, "loop should have run at least once");
5764        }
5765    }
5766
5767    #[tokio::test]
5768    async fn test_reset_after_cancel() {
5769        // After cancellation, the next execute() should work normally
5770        let kernel = Kernel::transient().expect("failed to create kernel");
5771        kernel.cancel(); // cancel with nothing running
5772
5773        let result = kernel.execute("echo hello").await.expect("execute failed");
5774        assert!(result.ok(), "execute after cancel should succeed");
5775        assert_eq!(result.text_out().trim(), "hello");
5776    }
5777
5778    #[tokio::test]
5779    async fn test_cancel_interrupts_statement_sequence() {
5780        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5781
5782        // Schedule cancel after the first statement runs but before sleep finishes
5783        schedule_cancel(&kernel, std::time::Duration::from_millis(50));
5784
5785        let result = kernel
5786            .execute("STEP=1; sleep 5; STEP=2; sleep 5; STEP=3")
5787            .await
5788            .expect("execute failed");
5789
5790        assert_eq!(result.code, 130);
5791
5792        // STEP should be 1 (set before sleep), not 2 or 3
5793        let step = kernel.get_var("STEP").await;
5794        assert_eq!(step, Some(Value::Int(1)), "cancel should stop before STEP=2");
5795    }
5796
5797    // ═══════════════════════════════════════════════════════════════════════════
5798    // Case Statement Tests
5799    // ═══════════════════════════════════════════════════════════════════════════
5800
5801    #[tokio::test]
5802    async fn test_case_simple_match() {
5803        let kernel = Kernel::transient().expect("failed to create kernel");
5804
5805        let result = kernel
5806            .execute(r#"
5807                case "hello" in
5808                    hello) echo "matched hello" ;;
5809                    world) echo "matched world" ;;
5810                esac
5811            "#)
5812            .await
5813            .expect("case failed");
5814
5815        assert!(result.ok());
5816        assert_eq!(result.text_out().trim(), "matched hello");
5817    }
5818
5819    #[tokio::test]
5820    async fn test_case_wildcard_match() {
5821        let kernel = Kernel::transient().expect("failed to create kernel");
5822
5823        let result = kernel
5824            .execute(r#"
5825                case "main.rs" in
5826                    *.py) echo "Python" ;;
5827                    *.rs) echo "Rust" ;;
5828                    *) echo "Unknown" ;;
5829                esac
5830            "#)
5831            .await
5832            .expect("case failed");
5833
5834        assert!(result.ok());
5835        assert_eq!(result.text_out().trim(), "Rust");
5836    }
5837
5838    #[tokio::test]
5839    async fn test_case_default_match() {
5840        let kernel = Kernel::transient().expect("failed to create kernel");
5841
5842        let result = kernel
5843            .execute(r#"
5844                case "unknown.xyz" in
5845                    *.py) echo "Python" ;;
5846                    *.rs) echo "Rust" ;;
5847                    *) echo "Default" ;;
5848                esac
5849            "#)
5850            .await
5851            .expect("case failed");
5852
5853        assert!(result.ok());
5854        assert_eq!(result.text_out().trim(), "Default");
5855    }
5856
5857    #[tokio::test]
5858    async fn test_case_no_match() {
5859        let kernel = Kernel::transient().expect("failed to create kernel");
5860
5861        // Case with no default branch and no match
5862        let result = kernel
5863            .execute(r#"
5864                case "nope" in
5865                    "yes") echo "yes" ;;
5866                    "no") echo "no" ;;
5867                esac
5868            "#)
5869            .await
5870            .expect("case failed");
5871
5872        assert!(result.ok());
5873        assert!(result.text_out().is_empty(), "no match should produce empty output");
5874    }
5875
5876    #[tokio::test]
5877    async fn test_case_with_variable() {
5878        let kernel = Kernel::transient().expect("failed to create kernel");
5879
5880        kernel.execute(r#"LANG="rust""#).await.expect("set failed");
5881
5882        let result = kernel
5883            .execute(r#"
5884                case ${LANG} in
5885                    python) echo "snake" ;;
5886                    rust) echo "crab" ;;
5887                    go) echo "gopher" ;;
5888                esac
5889            "#)
5890            .await
5891            .expect("case failed");
5892
5893        assert!(result.ok());
5894        assert_eq!(result.text_out().trim(), "crab");
5895    }
5896
5897    #[tokio::test]
5898    async fn test_case_multiple_patterns() {
5899        let kernel = Kernel::transient().expect("failed to create kernel");
5900
5901        let result = kernel
5902            .execute(r#"
5903                case "yes" in
5904                    "y"|"yes"|"Y"|"YES") echo "affirmative" ;;
5905                    "n"|"no"|"N"|"NO") echo "negative" ;;
5906                esac
5907            "#)
5908            .await
5909            .expect("case failed");
5910
5911        assert!(result.ok());
5912        assert_eq!(result.text_out().trim(), "affirmative");
5913    }
5914
5915    #[tokio::test]
5916    async fn test_case_glob_question_mark() {
5917        let kernel = Kernel::transient().expect("failed to create kernel");
5918
5919        let result = kernel
5920            .execute(r#"
5921                case "test1" in
5922                    test?) echo "matched test?" ;;
5923                    *) echo "default" ;;
5924                esac
5925            "#)
5926            .await
5927            .expect("case failed");
5928
5929        assert!(result.ok());
5930        assert_eq!(result.text_out().trim(), "matched test?");
5931    }
5932
5933    #[tokio::test]
5934    async fn test_case_char_class() {
5935        let kernel = Kernel::transient().expect("failed to create kernel");
5936
5937        let result = kernel
5938            .execute(r#"
5939                case "Yes" in
5940                    [Yy]*) echo "yes-like" ;;
5941                    [Nn]*) echo "no-like" ;;
5942                esac
5943            "#)
5944            .await
5945            .expect("case failed");
5946
5947        assert!(result.ok());
5948        assert_eq!(result.text_out().trim(), "yes-like");
5949    }
5950
5951    // ═══════════════════════════════════════════════════════════════════════════
5952    // Cat Stdin Tests
5953    // ═══════════════════════════════════════════════════════════════════════════
5954
5955    #[tokio::test]
5956    async fn test_cat_from_pipeline() {
5957        let kernel = Kernel::transient().expect("failed to create kernel");
5958
5959        let result = kernel
5960            .execute(r#"echo "piped text" | cat"#)
5961            .await
5962            .expect("cat pipeline failed");
5963
5964        assert!(result.ok(), "cat failed: {}", result.err);
5965        assert_eq!(result.text_out().trim(), "piped text");
5966    }
5967
5968    #[tokio::test]
5969    async fn test_cat_from_pipeline_multiline() {
5970        let kernel = Kernel::transient().expect("failed to create kernel");
5971
5972        let result = kernel
5973            .execute(r#"echo "line1\nline2" | cat -n"#)
5974            .await
5975            .expect("cat pipeline failed");
5976
5977        assert!(result.ok(), "cat failed: {}", result.err);
5978        assert!(result.text_out().contains("1\t"), "output: {}", result.text_out());
5979    }
5980
5981    // ═══════════════════════════════════════════════════════════════════════════
5982    // Heredoc Tests
5983    // ═══════════════════════════════════════════════════════════════════════════
5984
5985    #[tokio::test]
5986    async fn test_heredoc_basic() {
5987        let kernel = Kernel::transient().expect("failed to create kernel");
5988
5989        let result = kernel
5990            .execute("cat <<EOF\nhello\nEOF")
5991            .await
5992            .expect("heredoc failed");
5993
5994        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
5995        assert_eq!(result.text_out().trim(), "hello");
5996    }
5997
5998    #[tokio::test]
5999    async fn test_arithmetic_in_string() {
6000        let kernel = Kernel::transient().expect("failed to create kernel");
6001
6002        let result = kernel
6003            .execute(r#"echo "result: $((1 + 2))""#)
6004            .await
6005            .expect("arithmetic in string failed");
6006
6007        assert!(result.ok(), "echo failed: {}", result.err);
6008        assert_eq!(result.text_out().trim(), "result: 3");
6009    }
6010
6011    #[tokio::test]
6012    async fn test_heredoc_multiline() {
6013        let kernel = Kernel::transient().expect("failed to create kernel");
6014
6015        let result = kernel
6016            .execute("cat <<EOF\nline1\nline2\nline3\nEOF")
6017            .await
6018            .expect("heredoc failed");
6019
6020        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6021        assert!(result.text_out().contains("line1"), "output: {}", result.text_out());
6022        assert!(result.text_out().contains("line2"), "output: {}", result.text_out());
6023        assert!(result.text_out().contains("line3"), "output: {}", result.text_out());
6024    }
6025
6026    #[tokio::test]
6027    async fn test_heredoc_variable_expansion() {
6028        // Bug N: unquoted heredoc should expand variables
6029        let kernel = Kernel::transient().expect("failed to create kernel");
6030
6031        kernel.execute("GREETING=hello").await.expect("set var");
6032
6033        let result = kernel
6034            .execute("cat <<EOF\n$GREETING world\nEOF")
6035            .await
6036            .expect("heredoc expansion failed");
6037
6038        assert!(result.ok(), "heredoc expansion failed: {}", result.err);
6039        assert_eq!(result.text_out().trim(), "hello world");
6040    }
6041
6042    #[tokio::test]
6043    async fn test_heredoc_quoted_no_expansion() {
6044        // Bug N: quoted heredoc (<<'EOF') should NOT expand variables
6045        let kernel = Kernel::transient().expect("failed to create kernel");
6046
6047        kernel.execute("GREETING=hello").await.expect("set var");
6048
6049        let result = kernel
6050            .execute("cat <<'EOF'\n$GREETING world\nEOF")
6051            .await
6052            .expect("quoted heredoc failed");
6053
6054        assert!(result.ok(), "quoted heredoc failed: {}", result.err);
6055        assert_eq!(result.text_out().trim(), "$GREETING world");
6056    }
6057
6058    #[tokio::test]
6059    async fn test_heredoc_default_value_expansion() {
6060        // Bug N: ${VAR:-default} should expand in unquoted heredocs
6061        let kernel = Kernel::transient().expect("failed to create kernel");
6062
6063        let result = kernel
6064            .execute("cat <<EOF\n${UNSET:-fallback}\nEOF")
6065            .await
6066            .expect("heredoc default expansion failed");
6067
6068        assert!(result.ok(), "heredoc default expansion failed: {}", result.err);
6069        assert_eq!(result.text_out().trim(), "fallback");
6070    }
6071
6072    // ═══════════════════════════════════════════════════════════════════════════
6073    // Read Builtin Tests
6074    // ═══════════════════════════════════════════════════════════════════════════
6075
6076    #[tokio::test]
6077    async fn test_read_from_pipeline() {
6078        let kernel = Kernel::transient().expect("failed to create kernel");
6079
6080        // Pipe input to read
6081        let result = kernel
6082            .execute(r#"echo "Alice" | read NAME; echo "Hello, ${NAME}""#)
6083            .await
6084            .expect("read pipeline failed");
6085
6086        assert!(result.ok(), "read failed: {}", result.err);
6087        assert!(result.text_out().contains("Hello, Alice"), "output: {}", result.text_out());
6088    }
6089
6090    #[tokio::test]
6091    async fn test_read_multiple_vars_from_pipeline() {
6092        let kernel = Kernel::transient().expect("failed to create kernel");
6093
6094        let result = kernel
6095            .execute(r#"echo "John Doe 42" | read FIRST LAST AGE; echo "${FIRST} is ${AGE}""#)
6096            .await
6097            .expect("read pipeline failed");
6098
6099        assert!(result.ok(), "read failed: {}", result.err);
6100        assert!(result.text_out().contains("John is 42"), "output: {}", result.text_out());
6101    }
6102
6103    // ═══════════════════════════════════════════════════════════════════════════
6104    // Shell-Style Function Tests
6105    // ═══════════════════════════════════════════════════════════════════════════
6106
6107    #[tokio::test]
6108    async fn test_posix_function_with_positional_params() {
6109        let kernel = Kernel::transient().expect("failed to create kernel");
6110
6111        // Define POSIX-style function
6112        kernel
6113            .execute(r#"greet() { echo "Hello, $1!" }"#)
6114            .await
6115            .expect("function definition failed");
6116
6117        // Call the function
6118        let result = kernel
6119            .execute(r#"greet "Amy""#)
6120            .await
6121            .expect("function call failed");
6122
6123        assert!(result.ok(), "greet failed: {}", result.err);
6124        assert_eq!(result.text_out().trim(), "Hello, Amy!");
6125    }
6126
6127    #[tokio::test]
6128    async fn test_posix_function_multiple_args() {
6129        let kernel = Kernel::transient().expect("failed to create kernel");
6130
6131        // Define function using $1 and $2
6132        kernel
6133            .execute(r#"add_greeting() { echo "$1 $2!" }"#)
6134            .await
6135            .expect("function definition failed");
6136
6137        // Call the function
6138        let result = kernel
6139            .execute(r#"add_greeting "Hello" "World""#)
6140            .await
6141            .expect("function call failed");
6142
6143        assert!(result.ok(), "function failed: {}", result.err);
6144        assert_eq!(result.text_out().trim(), "Hello World!");
6145    }
6146
6147    #[tokio::test]
6148    async fn test_bash_function_with_positional_params() {
6149        let kernel = Kernel::transient().expect("failed to create kernel");
6150
6151        // Define bash-style function (function keyword, no parens)
6152        kernel
6153            .execute(r#"function greet { echo "Hi $1" }"#)
6154            .await
6155            .expect("function definition failed");
6156
6157        // Call the function
6158        let result = kernel
6159            .execute(r#"greet "Bob""#)
6160            .await
6161            .expect("function call failed");
6162
6163        assert!(result.ok(), "greet failed: {}", result.err);
6164        assert_eq!(result.text_out().trim(), "Hi Bob");
6165    }
6166
6167    #[tokio::test]
6168    async fn test_shell_function_with_all_args() {
6169        let kernel = Kernel::transient().expect("failed to create kernel");
6170
6171        // Define function using $@ (all args)
6172        kernel
6173            .execute(r#"echo_all() { echo "args: $@" }"#)
6174            .await
6175            .expect("function definition failed");
6176
6177        // Call with multiple args
6178        let result = kernel
6179            .execute(r#"echo_all "a" "b" "c""#)
6180            .await
6181            .expect("function call failed");
6182
6183        assert!(result.ok(), "function failed: {}", result.err);
6184        assert_eq!(result.text_out().trim(), "args: a b c");
6185    }
6186
6187    #[tokio::test]
6188    async fn test_shell_function_with_arg_count() {
6189        let kernel = Kernel::transient().expect("failed to create kernel");
6190
6191        // Define function using $# (arg count)
6192        kernel
6193            .execute(r#"count_args() { echo "count: $#" }"#)
6194            .await
6195            .expect("function definition failed");
6196
6197        // Call with three args
6198        let result = kernel
6199            .execute(r#"count_args "x" "y" "z""#)
6200            .await
6201            .expect("function call failed");
6202
6203        assert!(result.ok(), "function failed: {}", result.err);
6204        assert_eq!(result.text_out().trim(), "count: 3");
6205    }
6206
6207    #[tokio::test]
6208    async fn test_shell_function_shared_scope() {
6209        let kernel = Kernel::transient().expect("failed to create kernel");
6210
6211        // Set a variable in parent scope
6212        kernel
6213            .execute(r#"PARENT_VAR="visible""#)
6214            .await
6215            .expect("set failed");
6216
6217        // Define shell function that reads and writes parent variable
6218        kernel
6219            .execute(r#"modify_parent() {
6220                echo "saw: ${PARENT_VAR}"
6221                PARENT_VAR="changed by function"
6222            }"#)
6223            .await
6224            .expect("function definition failed");
6225
6226        // Call the function - it SHOULD see PARENT_VAR (bash-compatible shared scope)
6227        let result = kernel.execute("modify_parent").await.expect("function failed");
6228
6229        assert!(
6230            result.text_out().contains("visible"),
6231            "Shell function should access parent scope, got: {}",
6232            result.text_out()
6233        );
6234
6235        // Parent variable should be modified
6236        let var = kernel.get_var("PARENT_VAR").await;
6237        assert_eq!(
6238            var,
6239            Some(Value::String("changed by function".into())),
6240            "Shell function should modify parent scope"
6241        );
6242    }
6243
6244    // ═══════════════════════════════════════════════════════════════════════════
6245    // Script Execution via PATH Tests
6246    // ═══════════════════════════════════════════════════════════════════════════
6247
6248    #[tokio::test]
6249    async fn test_script_execution_from_path() {
6250        let kernel = Kernel::transient().expect("failed to create kernel");
6251
6252        // Create /bin directory and script
6253        kernel.execute(r#"mkdir "/bin""#).await.ok();
6254        kernel
6255            .execute(r#"write "/bin/hello.kai" 'echo "Hello from script!"'"#)
6256            .await
6257            .expect("write script failed");
6258
6259        // Set PATH to /bin
6260        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6261
6262        // Call script by name (without .kai extension)
6263        let result = kernel
6264            .execute("hello")
6265            .await
6266            .expect("script execution failed");
6267
6268        assert!(result.ok(), "script failed: {}", result.err);
6269        assert_eq!(result.text_out().trim(), "Hello from script!");
6270    }
6271
6272    #[tokio::test]
6273    async fn test_script_with_args() {
6274        let kernel = Kernel::transient().expect("failed to create kernel");
6275
6276        // Create script that uses positional params
6277        kernel.execute(r#"mkdir "/bin""#).await.ok();
6278        kernel
6279            .execute(r#"write "/bin/greet.kai" 'echo "Hello, $1!"'"#)
6280            .await
6281            .expect("write script failed");
6282
6283        // Set PATH
6284        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6285
6286        // Call script with arg
6287        let result = kernel
6288            .execute(r#"greet "World""#)
6289            .await
6290            .expect("script execution failed");
6291
6292        assert!(result.ok(), "script failed: {}", result.err);
6293        assert_eq!(result.text_out().trim(), "Hello, World!");
6294    }
6295
6296    #[tokio::test]
6297    async fn test_script_not_found() {
6298        let kernel = Kernel::transient().expect("failed to create kernel");
6299
6300        // Set empty PATH
6301        kernel.execute(r#"PATH="/nonexistent""#).await.expect("set PATH failed");
6302
6303        // Call non-existent script
6304        let result = kernel
6305            .execute("noscript")
6306            .await
6307            .expect("execution failed");
6308
6309        assert!(!result.ok(), "should fail with command not found");
6310        assert_eq!(result.code, 127);
6311        assert!(result.err.contains("command not found"));
6312    }
6313
6314    #[tokio::test]
6315    async fn test_script_path_search_order() {
6316        let kernel = Kernel::transient().expect("failed to create kernel");
6317
6318        // Create two directories with same-named script
6319        // Note: using "myscript" not "test" to avoid conflict with test builtin
6320        kernel.execute(r#"mkdir "/first""#).await.ok();
6321        kernel.execute(r#"mkdir "/second""#).await.ok();
6322        kernel
6323            .execute(r#"write "/first/myscript.kai" 'echo "from first"'"#)
6324            .await
6325            .expect("write failed");
6326        kernel
6327            .execute(r#"write "/second/myscript.kai" 'echo "from second"'"#)
6328            .await
6329            .expect("write failed");
6330
6331        // Set PATH with first before second
6332        kernel.execute(r#"PATH="/first:/second""#).await.expect("set PATH failed");
6333
6334        // Should find first one
6335        let result = kernel
6336            .execute("myscript")
6337            .await
6338            .expect("script execution failed");
6339
6340        assert!(result.ok(), "script failed: {}", result.err);
6341        assert_eq!(result.text_out().trim(), "from first");
6342    }
6343
6344    // ═══════════════════════════════════════════════════════════════════════════
6345    // Special Variable Tests ($?, $$, unset vars)
6346    // ═══════════════════════════════════════════════════════════════════════════
6347
6348    #[tokio::test]
6349    async fn test_last_exit_code_success() {
6350        let kernel = Kernel::transient().expect("failed to create kernel");
6351
6352        // true exits with 0
6353        let result = kernel.execute("true; echo $?").await.expect("execution failed");
6354        assert!(result.text_out().contains("0"), "expected 0, got: {}", result.text_out());
6355    }
6356
6357    #[tokio::test]
6358    async fn test_last_exit_code_failure() {
6359        let kernel = Kernel::transient().expect("failed to create kernel");
6360
6361        // false exits with 1
6362        let result = kernel.execute("false; echo $?").await.expect("execution failed");
6363        assert!(result.text_out().contains("1"), "expected 1, got: {}", result.text_out());
6364    }
6365
6366    #[tokio::test]
6367    async fn test_current_pid() {
6368        let kernel = Kernel::transient().expect("failed to create kernel");
6369
6370        let result = kernel.execute("echo $$").await.expect("execution failed");
6371        // PID should be a positive number
6372        let pid: u32 = result.text_out().trim().parse().expect("PID should be a number");
6373        assert!(pid > 0, "PID should be positive");
6374    }
6375
6376    #[tokio::test]
6377    async fn test_unset_variable_expands_to_empty() {
6378        let kernel = Kernel::transient().expect("failed to create kernel");
6379
6380        // Unset variable in interpolation should be empty
6381        let result = kernel.execute(r#"echo "prefix:${UNSET_VAR}:suffix""#).await.expect("execution failed");
6382        assert_eq!(result.text_out().trim(), "prefix::suffix");
6383    }
6384
6385    #[tokio::test]
6386    async fn test_eq_ne_operators() {
6387        let kernel = Kernel::transient().expect("failed to create kernel");
6388
6389        // Test -eq operator
6390        let result = kernel.execute(r#"if [[ 5 -eq 5 ]]; then echo "eq works"; fi"#).await.expect("execution failed");
6391        assert_eq!(result.text_out().trim(), "eq works");
6392
6393        // Test -ne operator
6394        let result = kernel.execute(r#"if [[ 5 -ne 3 ]]; then echo "ne works"; fi"#).await.expect("execution failed");
6395        assert_eq!(result.text_out().trim(), "ne works");
6396
6397        // Test -eq with different values
6398        let result = kernel.execute(r#"if [[ 5 -eq 3 ]]; then echo "wrong"; else echo "correct"; fi"#).await.expect("execution failed");
6399        assert_eq!(result.text_out().trim(), "correct");
6400    }
6401
6402    #[tokio::test]
6403    async fn test_escaped_dollar_in_string() {
6404        let kernel = Kernel::transient().expect("failed to create kernel");
6405
6406        // \$ should produce literal $
6407        let result = kernel.execute(r#"echo "\$100""#).await.expect("execution failed");
6408        assert_eq!(result.text_out().trim(), "$100");
6409    }
6410
6411    #[tokio::test]
6412    async fn test_special_vars_in_interpolation() {
6413        let kernel = Kernel::transient().expect("failed to create kernel");
6414
6415        // Test $? in string interpolation
6416        let result = kernel.execute(r#"true; echo "exit: $?""#).await.expect("execution failed");
6417        assert_eq!(result.text_out().trim(), "exit: 0");
6418
6419        // Test $$ in string interpolation
6420        let result = kernel.execute(r#"echo "pid: $$""#).await.expect("execution failed");
6421        assert!(result.text_out().starts_with("pid: "), "unexpected output: {}", result.text_out());
6422        let text = result.text_out();
6423        let pid_part = text.trim().strip_prefix("pid: ").unwrap();
6424        let _pid: u32 = pid_part.parse().expect("PID in string should be a number");
6425    }
6426
6427    // ═══════════════════════════════════════════════════════════════════════════
6428    // Command Substitution Tests
6429    // ═══════════════════════════════════════════════════════════════════════════
6430
6431    #[tokio::test]
6432    async fn test_command_subst_assignment() {
6433        let kernel = Kernel::transient().expect("failed to create kernel");
6434
6435        // Command substitution in assignment
6436        let result = kernel.execute(r#"X=$(echo hello); echo "$X""#).await.expect("execution failed");
6437        assert_eq!(result.text_out().trim(), "hello");
6438    }
6439
6440    #[tokio::test]
6441    async fn test_command_subst_with_args() {
6442        let kernel = Kernel::transient().expect("failed to create kernel");
6443
6444        // Command substitution with string argument
6445        let result = kernel.execute(r#"X=$(echo "a b c"); echo "$X""#).await.expect("execution failed");
6446        assert_eq!(result.text_out().trim(), "a b c");
6447    }
6448
6449    #[tokio::test]
6450    async fn test_command_subst_nested_vars() {
6451        let kernel = Kernel::transient().expect("failed to create kernel");
6452
6453        // Variables inside command substitution
6454        let result = kernel.execute(r#"Y=world; X=$(echo "hello $Y"); echo "$X""#).await.expect("execution failed");
6455        assert_eq!(result.text_out().trim(), "hello world");
6456    }
6457
6458    #[tokio::test]
6459    async fn test_background_job_basic() {
6460        use std::time::Duration;
6461
6462        let kernel = Kernel::new(KernelConfig::isolated()).expect("failed to create kernel");
6463
6464        // Run a simple background command
6465        let result = kernel.execute("echo hello &").await.expect("execution failed");
6466        assert!(result.ok(), "background command should succeed: {}", result.err);
6467        assert!(result.text_out().contains("[1]"), "should return job ID: {}", result.text_out());
6468
6469        // Give the job time to complete
6470        tokio::time::sleep(Duration::from_millis(100)).await;
6471
6472        // Check job status
6473        let status = kernel.execute("cat /v/jobs/1/status").await.expect("status check failed");
6474        assert!(status.ok(), "status should succeed: {}", status.err);
6475        assert!(
6476            status.text_out().contains("done:") || status.text_out().contains("running"),
6477            "should have valid status: {}",
6478            status.text_out()
6479        );
6480
6481        // Check stdout
6482        let stdout = kernel.execute("cat /v/jobs/1/stdout").await.expect("stdout check failed");
6483        assert!(stdout.ok());
6484        assert!(stdout.text_out().contains("hello"));
6485    }
6486
6487    #[tokio::test]
6488    async fn test_heredoc_piped_to_command() {
6489        // Bug 4: heredoc content should pipe through to next command
6490        let kernel = Kernel::transient().expect("kernel");
6491        let result = kernel.execute("cat <<EOF | cat\nhello world\nEOF").await.expect("exec");
6492        assert!(result.ok(), "heredoc | cat failed: {}", result.err);
6493        assert_eq!(result.text_out().trim(), "hello world");
6494    }
6495
6496    #[tokio::test]
6497    async fn test_for_loop_glob_iterates() {
6498        // Bug 1: for F in $(glob ...) should iterate per file, not once
6499        let kernel = Kernel::transient().expect("kernel");
6500        let dir = format!("/tmp/kaish_test_glob_{}", std::process::id());
6501        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6502        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6503        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6504        let result = kernel.execute(&format!(r#"
6505            N=0
6506            for F in $(glob "{dir}/*.txt"); do
6507                N=$((N + 1))
6508            done
6509            echo $N
6510        "#)).await.unwrap();
6511        assert!(result.ok(), "for glob failed: {}", result.err);
6512        assert_eq!(result.text_out().trim(), "2", "Should iterate 2 files, got: {}", result.text_out());
6513        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6514        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
6515    }
6516
6517    #[tokio::test]
6518    async fn test_bare_glob_expansion_echo() {
6519        let kernel = Kernel::transient().expect("kernel");
6520        let dir = format!("/tmp/kaish_test_bareglob_{}", std::process::id());
6521        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6522        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6523        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6524        kernel.execute(&format!("echo c > {dir}/c.rs")).await.unwrap();
6525        kernel.execute(&format!("cd {dir}")).await.unwrap();
6526        let result = kernel.execute("echo *.txt").await.unwrap();
6527        assert!(result.ok(), "echo *.txt failed: {}", result.err);
6528        let out = result.text_out();
6529        let out = out.trim();
6530        // Should contain both .txt files (order may vary)
6531        assert!(out.contains("a.txt"), "missing a.txt in: {}", out);
6532        assert!(out.contains("b.txt"), "missing b.txt in: {}", out);
6533        assert!(!out.contains("c.rs"), "should not contain c.rs in: {}", out);
6534        // cleanup
6535        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6536        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
6537        kernel.execute(&format!("rm {dir}/c.rs")).await.unwrap();
6538    }
6539
6540    #[tokio::test]
6541    async fn test_bare_glob_no_matches_errors() {
6542        let kernel = Kernel::transient().expect("kernel");
6543        let dir = format!("/tmp/kaish_test_bareglob_nomatch_{}", std::process::id());
6544        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6545        kernel.execute(&format!("cd {dir}")).await.unwrap();
6546        let result = kernel.execute("echo *.nonexistent").await;
6547        match &result {
6548            Ok(exec) => {
6549                // No-match glob should produce a non-zero exit code
6550                assert!(!exec.ok(), "expected failure, got success: out={}, err={}", exec.text_out(), exec.err);
6551                assert!(exec.err.contains("no matches"), "error should say no matches: {}", exec.err);
6552            }
6553            Err(e) => {
6554                assert!(e.to_string().contains("no matches"), "error should say no matches: {}", e);
6555            }
6556        }
6557    }
6558
6559    #[tokio::test]
6560    async fn test_bare_glob_disabled_with_set() {
6561        let kernel = Kernel::transient().expect("kernel");
6562        let dir = format!("/tmp/kaish_test_bareglob_noglob_{}", std::process::id());
6563        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6564        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6565        kernel.execute(&format!("cd {dir}")).await.unwrap();
6566        // Disable glob expansion
6567        kernel.execute("set +o glob").await.unwrap();
6568        let result = kernel.execute("echo *.txt").await.unwrap();
6569        // With glob disabled, *.txt should be passed as literal string
6570        assert!(result.ok(), "echo should succeed: {}", result.err);
6571        assert_eq!(result.text_out().trim(), "*.txt", "should be literal: {}", result.text_out());
6572        // cleanup
6573        kernel.execute("set -o glob").await.unwrap();
6574        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6575    }
6576
6577    #[tokio::test]
6578    async fn test_bare_glob_quoted_not_expanded() {
6579        let kernel = Kernel::transient().expect("kernel");
6580        let dir = format!("/tmp/kaish_test_bareglob_quoted_{}", std::process::id());
6581        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6582        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6583        kernel.execute(&format!("cd {dir}")).await.unwrap();
6584        // Quoted globs should NOT expand
6585        let result = kernel.execute("echo \"*.txt\"").await.unwrap();
6586        assert!(result.ok(), "echo should succeed: {}", result.err);
6587        assert_eq!(result.text_out().trim(), "*.txt", "quoted should be literal: {}", result.text_out());
6588        // cleanup
6589        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6590    }
6591
6592    #[tokio::test]
6593    async fn test_bare_glob_for_loop() {
6594        let kernel = Kernel::transient().expect("kernel");
6595        let dir = format!("/tmp/kaish_test_bareglob_forloop_{}", std::process::id());
6596        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6597        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6598        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6599        kernel.execute(&format!("cd {dir}")).await.unwrap();
6600        let result = kernel.execute(r#"
6601            N=0
6602            for f in *.txt; do
6603                N=$((N + 1))
6604            done
6605            echo $N
6606        "#).await.unwrap();
6607        assert!(result.ok(), "for loop failed: {}", result.err);
6608        assert_eq!(result.text_out().trim(), "2", "should iterate 2 files: {}", result.text_out());
6609        // cleanup
6610        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6611        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
6612    }
6613
6614    #[tokio::test]
6615    async fn test_glob_in_assignment_is_literal() {
6616        let kernel = Kernel::transient().expect("kernel");
6617        let result = kernel.execute("X=*.txt; echo $X").await.unwrap();
6618        assert!(result.ok());
6619        assert_eq!(result.text_out().trim(), "*.txt", "glob in assignment should be literal");
6620    }
6621
6622    #[tokio::test]
6623    async fn test_glob_in_test_expr_is_literal() {
6624        let kernel = Kernel::transient().expect("kernel");
6625        let result = kernel.execute(r#"
6626            if [[ *.txt == "*.txt" ]]; then
6627                echo "match"
6628            else
6629                echo "no"
6630            fi
6631        "#).await.unwrap();
6632        assert!(result.ok());
6633        assert_eq!(result.text_out().trim(), "match", "glob in test expr should be literal");
6634    }
6635
6636    #[tokio::test]
6637    async fn test_command_subst_echo_not_iterable() {
6638        // Regression guard: $(echo "a b c") must remain a single string
6639        let kernel = Kernel::transient().expect("kernel");
6640        let result = kernel.execute(r#"
6641            N=0
6642            for X in $(echo "a b c"); do N=$((N + 1)); done
6643            echo $N
6644        "#).await.unwrap();
6645        assert!(result.ok());
6646        assert_eq!(result.text_out().trim(), "1", "echo should be one item: {}", result.text_out());
6647    }
6648
6649    // -- accumulate_result / newline tests --
6650
6651    #[test]
6652    fn test_accumulate_preserves_own_newlines() {
6653        // Outputs concatenate verbatim — a command's own trailing newline is
6654        // kept, none is invented.
6655        let mut acc = ExecResult::success("line1\n");
6656        let new = ExecResult::success("line2\n");
6657        accumulate_result(&mut acc, &new);
6658        assert_eq!(&*acc.text_out(), "line1\nline2\n");
6659        assert!(!acc.text_out().contains("\n\n"), "should not have double newlines: {:?}", acc.text_out());
6660    }
6661
6662    #[test]
6663    fn test_accumulate_inserts_no_separator() {
6664        // No artificial separator: `printf a; printf b` style concatenates to
6665        // `ab`, matching bash (regression for the 2026-06-09 finding).
6666        let mut acc = ExecResult::success("line1");
6667        let new = ExecResult::success("line2");
6668        accumulate_result(&mut acc, &new);
6669        assert_eq!(&*acc.text_out(), "line1line2");
6670    }
6671
6672    #[test]
6673    fn test_accumulate_empty_into_nonempty() {
6674        let mut acc = ExecResult::success("");
6675        let new = ExecResult::success("hello\n");
6676        accumulate_result(&mut acc, &new);
6677        assert_eq!(&*acc.text_out(), "hello\n");
6678    }
6679
6680    #[test]
6681    fn test_accumulate_nonempty_into_empty() {
6682        let mut acc = ExecResult::success("hello\n");
6683        let new = ExecResult::success("");
6684        accumulate_result(&mut acc, &new);
6685        assert_eq!(&*acc.text_out(), "hello\n");
6686    }
6687
6688    #[test]
6689    fn test_accumulate_stderr_no_double_newlines() {
6690        let mut acc = ExecResult::failure(1, "err1\n");
6691        let new = ExecResult::failure(1, "err2\n");
6692        accumulate_result(&mut acc, &new);
6693        assert!(!acc.err.contains("\n\n"), "stderr should not have double newlines: {:?}", acc.err);
6694    }
6695
6696    #[tokio::test]
6697    async fn test_multiple_echo_no_blank_lines() {
6698        let kernel = Kernel::transient().expect("kernel");
6699        let result = kernel
6700            .execute("echo one\necho two\necho three")
6701            .await
6702            .expect("execution failed");
6703        assert!(result.ok());
6704        assert_eq!(&*result.text_out(), "one\ntwo\nthree\n");
6705    }
6706
6707    #[tokio::test]
6708    async fn test_for_loop_no_blank_lines() {
6709        let kernel = Kernel::transient().expect("kernel");
6710        let result = kernel
6711            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
6712            .await
6713            .expect("execution failed");
6714        assert!(result.ok());
6715        assert_eq!(&*result.text_out(), "item: a\nitem: b\nitem: c\n");
6716    }
6717
6718    #[tokio::test]
6719    async fn test_for_command_subst_no_blank_lines() {
6720        let kernel = Kernel::transient().expect("kernel");
6721        let result = kernel
6722            .execute(r#"for N in $(seq 1 3); do echo "n=${N}"; done"#)
6723            .await
6724            .expect("execution failed");
6725        assert!(result.ok());
6726        assert_eq!(&*result.text_out(), "n=1\nn=2\nn=3\n");
6727    }
6728
6729    // ------------------------------------------------------------------
6730    // build_args_async: multi-consume flags (jq --arg NAME VALUE pattern)
6731    // ------------------------------------------------------------------
6732
6733    /// Helper: a throwaway schema with one `--pair` param declared as
6734    /// consuming two positionals per occurrence. Modelled after what
6735    /// jq_native will declare for `--arg` / `--argjson`.
6736    fn multi_consume_schema() -> crate::tools::ToolSchema {
6737        use crate::tools::{ParamSchema, ToolSchema};
6738        ToolSchema::new("test", "multi-consume smoke")
6739            .param(
6740                ParamSchema::optional("pair", "array", Value::Null, "name+value pair")
6741                    .consumes(2),
6742            )
6743    }
6744
6745    fn pos(s: &str) -> Arg {
6746        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
6747    }
6748
6749    #[tokio::test]
6750    async fn build_args_multi_consume_single_occurrence() {
6751        let kernel = Kernel::transient().expect("kernel");
6752        let schema = multi_consume_schema();
6753        // Simulates:  test --pair NAME VALUE filter
6754        let args = vec![
6755            Arg::LongFlag("pair".into()),
6756            pos("NAME"),
6757            pos("VALUE"),
6758            pos("filter"),
6759        ];
6760        let built = kernel
6761            .build_args_async(&args, Some(&schema))
6762            .await
6763            .expect("build_args should succeed");
6764
6765        // `--pair` + its two positionals are consumed into named["pair"],
6766        // which becomes an outer array of one inner 2-element array.
6767        let pair = built.named.get("pair").expect("named[pair] missing");
6768        match pair {
6769            Value::Json(serde_json::Value::Array(occurrences)) => {
6770                assert_eq!(occurrences.len(), 1, "expected one occurrence");
6771                match &occurrences[0] {
6772                    serde_json::Value::Array(values) => {
6773                        assert_eq!(values.len(), 2, "pair must have 2 values");
6774                        assert_eq!(values[0], serde_json::Value::String("NAME".into()));
6775                        assert_eq!(values[1], serde_json::Value::String("VALUE".into()));
6776                    }
6777                    other => panic!("expected inner array, got {other:?}"),
6778                }
6779            }
6780            other => panic!("expected Json(Array(...)) for named[pair], got {other:?}"),
6781        }
6782
6783        // The un-consumed positional ("filter") remains in `positional`.
6784        assert_eq!(built.positional.len(), 1);
6785        assert_eq!(built.positional[0], Value::String("filter".into()));
6786    }
6787    #[tokio::test]
6788    async fn build_args_multi_consume_two_occurrences_accumulate() {
6789        let kernel = Kernel::transient().expect("kernel");
6790        let schema = multi_consume_schema();
6791        // Simulates:  test --pair A 1 --pair B 2 filter
6792        let args = vec![
6793            Arg::LongFlag("pair".into()),
6794            pos("A"),
6795            pos("1"),
6796            Arg::LongFlag("pair".into()),
6797            pos("B"),
6798            pos("2"),
6799            pos("filter"),
6800        ];
6801        let built = kernel
6802            .build_args_async(&args, Some(&schema))
6803            .await
6804            .expect("build_args should succeed");
6805
6806        let pair = built.named.get("pair").expect("named[pair] missing");
6807        match pair {
6808            Value::Json(serde_json::Value::Array(occurrences)) => {
6809                assert_eq!(occurrences.len(), 2, "expected two occurrences");
6810                // Preserved in invocation order.
6811                match &occurrences[0] {
6812                    serde_json::Value::Array(values) => {
6813                        assert_eq!(values[0], serde_json::Value::String("A".into()));
6814                        assert_eq!(values[1], serde_json::Value::String("1".into()));
6815                    }
6816                    other => panic!("expected inner array, got {other:?}"),
6817                }
6818                match &occurrences[1] {
6819                    serde_json::Value::Array(values) => {
6820                        assert_eq!(values[0], serde_json::Value::String("B".into()));
6821                        assert_eq!(values[1], serde_json::Value::String("2".into()));
6822                    }
6823                    other => panic!("expected inner array, got {other:?}"),
6824                }
6825            }
6826            other => panic!("expected Json(Array(...)), got {other:?}"),
6827        }
6828    }
6829
6830    // ── undeclared space-form flag under map_positionals (kj --type val) ──
6831    //
6832    // A backend/MCP tool whose schema does NOT declare a flag must not let
6833    // `--flag value` (space form) silently divorce the value: that was a
6834    // privilege-escalation-by-typo against kaijutsu (see docs/issues.md).
6835    // kaish fails loud rather than guessing.
6836
6837    use crate::tools::{ParamSchema, ToolSchema};
6838
6839    /// Backend-style schema (map_positionals) declaring only a `name`
6840    /// positional — `--type` is intentionally undeclared.
6841    fn kj_like_schema() -> ToolSchema {
6842        ToolSchema::new("kj", "incomplete backend schema")
6843            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
6844            .with_positional_mapping()
6845    }
6846
6847    #[tokio::test]
6848    async fn build_args_undeclared_space_flag_errors_under_map_positionals() {
6849        let kernel = Kernel::transient().expect("kernel");
6850        let schema = kj_like_schema();
6851        // kj context create exp --type explorer
6852        let args = vec![
6853            pos("context"),
6854            pos("create"),
6855            pos("exp"),
6856            Arg::LongFlag("type".into()),
6857            pos("explorer"),
6858        ];
6859        let err = kernel
6860            .build_args_async(&args, Some(&schema))
6861            .await
6862            .expect_err("undeclared --type with a space value must fail loud");
6863        let msg = err.to_string();
6864        assert!(msg.contains("--type"), "message should name the flag: {msg}");
6865        assert!(msg.contains("--type=explorer"), "message should suggest the = form: {msg}");
6866        assert!(msg.contains("kj"), "message should name the tool: {msg}");
6867    }
6868
6869    #[tokio::test]
6870    async fn build_args_declared_space_flag_still_binds() {
6871        let kernel = Kernel::transient().expect("kernel");
6872        // Same tool, but now the schema DECLARES --type as a string param.
6873        let schema = ToolSchema::new("kj", "complete schema")
6874            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
6875            .param(ParamSchema::optional("type", "string", Value::Null, "role type"))
6876            .with_positional_mapping();
6877        let args = vec![
6878            pos("exp"),
6879            Arg::LongFlag("type".into()),
6880            pos("explorer"),
6881        ];
6882        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6883        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6884    }
6885
6886    #[tokio::test]
6887    async fn build_args_equals_form_binds_for_undeclared_flag() {
6888        let kernel = Kernel::transient().expect("kernel");
6889        let schema = kj_like_schema();
6890        // The unambiguous `=` form must keep working even when undeclared.
6891        let args = vec![
6892            pos("exp"),
6893            Arg::Named { key: "type".into(), value: Expr::Literal(Value::String("explorer".into())) },
6894        ];
6895        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6896        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6897    }
6898
6899    #[tokio::test]
6900    async fn build_args_undeclared_bool_flag_at_end_is_ok() {
6901        let kernel = Kernel::transient().expect("kernel");
6902        let schema = kj_like_schema();
6903        // No positional follows --force → unambiguously a bare flag.
6904        let args = vec![pos("exp"), Arg::LongFlag("force".into())];
6905        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6906        assert!(built.flags.contains("force"));
6907    }
6908
6909    #[tokio::test]
6910    async fn build_args_undeclared_flag_before_another_flag_is_ok() {
6911        let kernel = Kernel::transient().expect("kernel");
6912        let schema = kj_like_schema();
6913        // --verbose is followed by a flag, not a positional → not ambiguous.
6914        let args = vec![
6915            Arg::LongFlag("verbose".into()),
6916            Arg::Named { key: "name".into(), value: Expr::Literal(Value::String("x".into())) },
6917        ];
6918        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6919        assert!(built.flags.contains("verbose"));
6920    }
6921
6922    #[tokio::test]
6923    async fn build_args_undeclared_space_flag_ok_for_builtin_schema() {
6924        let kernel = Kernel::transient().expect("kernel");
6925        // Builtins set map_positionals=false; the ambiguity guard must not
6926        // fire there (clap validates their flags separately).
6927        let schema = ToolSchema::new("frobnicate", "builtin-style")
6928            .param(ParamSchema::optional("name", "string", Value::Null, "name"));
6929        let args = vec![Arg::LongFlag("frob".into()), pos("value")];
6930        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6931        assert!(built.flags.contains("frob"));
6932    }
6933
6934    // ── subcommand-aware binding (select_leaf wired into build_args_async) ──
6935    //
6936    // A tool exposing a subcommand tree binds flags against the *routed leaf's*
6937    // params, not the root's. The subcommand-path positionals stay positional
6938    // (kj re-parses them with its own clap), and a value flag declared only on
6939    // a deep leaf still binds in space form.
6940
6941    /// kj → context (alias ctx) → create{--type value, --force bool}.
6942    /// map_positionals defaults false on every node (builtin/kj style).
6943    fn kj_tree_schema() -> ToolSchema {
6944        ToolSchema::new("kj", "subcommand tool").subcommand(
6945            ToolSchema::new("context", "context ops")
6946                .with_command_aliases(["ctx"])
6947                .subcommand(
6948                    ToolSchema::new("create", "create context")
6949                        .param(ParamSchema::new("type", "string").with_aliases(["t"]))
6950                        .param(ParamSchema::new("force", "bool")),
6951                ),
6952        )
6953    }
6954
6955    #[tokio::test]
6956    async fn build_args_binds_deep_leaf_value_flag_space_form() {
6957        let kernel = Kernel::transient().expect("kernel");
6958        let schema = kj_tree_schema();
6959        // kj context create --type explorer
6960        let args = vec![
6961            pos("context"),
6962            pos("create"),
6963            Arg::LongFlag("type".into()),
6964            pos("explorer"),
6965        ];
6966        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
6967        // --type (declared only on the create leaf) binds in space form.
6968        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6969        // The subcommand path survives as positionals for kj to re-parse.
6970        let positionals: Vec<&str> = built
6971            .positional
6972            .iter()
6973            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
6974            .collect();
6975        assert_eq!(positionals, vec!["context", "create"]);
6976    }
6977
6978    #[tokio::test]
6979    async fn build_args_leaf_bool_flag_does_not_swallow_positional() {
6980        let kernel = Kernel::transient().expect("kernel");
6981        let schema = kj_tree_schema();
6982        // kj context create --force somearg  → --force is a leaf bool flag,
6983        // it must NOT consume `somearg`.
6984        let args = vec![
6985            pos("context"),
6986            pos("create"),
6987            Arg::LongFlag("force".into()),
6988            pos("somearg"),
6989        ];
6990        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
6991        assert!(built.flags.contains("force"), "force should be a bare flag");
6992        let positionals: Vec<&str> = built
6993            .positional
6994            .iter()
6995            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
6996            .collect();
6997        assert_eq!(positionals, vec!["context", "create", "somearg"]);
6998    }
6999
7000    #[tokio::test]
7001    async fn build_args_alias_routed_leaf_binds_value_flag() {
7002        let kernel = Kernel::transient().expect("kernel");
7003        let schema = kj_tree_schema();
7004        // kj ctx create -t explorer  → command alias + short flag alias.
7005        let args = vec![
7006            pos("ctx"),
7007            pos("create"),
7008            Arg::ShortFlag("t".into()),
7009            pos("explorer"),
7010        ];
7011        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7012        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7013    }
7014
7015    #[tokio::test]
7016    async fn build_args_computed_subcommand_selector_fails_loud() {
7017        let kernel = Kernel::transient().expect("kernel");
7018        let schema = kj_tree_schema();
7019        // kj $(echo context) — routing can't see the value; fail loud.
7020        let args = vec![Arg::Positional(Expr::CommandSubst(Box::new(
7021            crate::ast::Pipeline { commands: vec![], background: false },
7022        )))];
7023        let err = kernel
7024            .build_args_async(&args, Some(&schema))
7025            .await
7026            .expect_err("computed subcommand selector must error");
7027        assert!(
7028            err.to_string().contains("subcommand name is required"),
7029            "got: {err}"
7030        );
7031    }
7032
7033    // ── finalize_output: --json rendering vs. owns_output opt-out ───────────
7034
7035    #[test]
7036    fn finalize_output_renders_when_kernel_owns_it() {
7037        use crate::interpreter::{OutputData, OutputFormat};
7038        let r = ExecResult::with_output(OutputData::text("RAW"));
7039        let out = finalize_output(r, Some(OutputFormat::Json), false);
7040        // Kernel renders the typed OutputData → JSON; text is no longer bare.
7041        assert_ne!(out.text_out(), "RAW", "kernel should reformat to JSON");
7042    }
7043
7044    #[test]
7045    fn finalize_output_skips_when_tool_owns_output() {
7046        use crate::interpreter::{OutputData, OutputFormat};
7047        let r = ExecResult::with_output(OutputData::text("RAW"));
7048        let out = finalize_output(r, Some(OutputFormat::Json), true);
7049        // owns_output: the tool already rendered; kernel leaves bytes untouched.
7050        assert_eq!(out.text_out(), "RAW", "owned output must be left as-is");
7051    }
7052
7053    #[test]
7054    fn finalize_output_no_format_is_noop() {
7055        use crate::interpreter::OutputData;
7056        let r = ExecResult::with_output(OutputData::text("RAW"));
7057        let out = finalize_output(r, None, false);
7058        assert_eq!(out.text_out(), "RAW");
7059    }
7060
7061    // ── initial_vars + execute_with_vars + hermetic env ───────────────────
7062
7063    #[tokio::test]
7064    async fn test_initial_vars_set_and_exported() {
7065        let config = KernelConfig::transient()
7066            .with_var("INIT_FOO", Value::String("bar".into()));
7067        let kernel = Kernel::new(config).expect("failed to create kernel");
7068
7069        assert_eq!(
7070            kernel.get_var("INIT_FOO").await,
7071            Some(Value::String("bar".into()))
7072        );
7073        assert!(
7074            kernel.scope.read().await.is_exported("INIT_FOO"),
7075            "initial_vars entries must be marked exported"
7076        );
7077    }
7078
7079    #[tokio::test]
7080    async fn test_execute_with_vars_overlay_visible() {
7081        let kernel = Kernel::transient().expect("failed to create kernel");
7082        let mut overlay = HashMap::new();
7083        overlay.insert("OVERLAY_X".to_string(), Value::String("yes".into()));
7084
7085        let result = kernel
7086            .execute_with_options(r#"echo "${OVERLAY_X}""#, ExecuteOptions::new().with_vars(overlay))
7087            .await
7088            .expect("execute failed");
7089
7090        assert!(result.ok());
7091        assert_eq!(result.text_out().trim(), "yes");
7092    }
7093
7094    #[tokio::test]
7095    async fn test_execute_with_vars_overlay_cleanup() {
7096        let kernel = Kernel::transient().expect("failed to create kernel");
7097        let mut overlay = HashMap::new();
7098        overlay.insert("EPHEMERAL".to_string(), Value::String("transient".into()));
7099
7100        kernel
7101            .execute_with_options("echo ignored", ExecuteOptions::new().with_vars(overlay))
7102            .await
7103            .expect("execute failed");
7104
7105        assert_eq!(kernel.get_var("EPHEMERAL").await, None);
7106        assert!(
7107            !kernel.scope.read().await.is_exported("EPHEMERAL"),
7108            "overlay-only export must be cleared on return"
7109        );
7110    }
7111
7112    #[tokio::test]
7113    async fn test_execute_with_vars_does_not_clobber_existing_export() {
7114        let kernel = Kernel::transient().expect("failed to create kernel");
7115        kernel
7116            .execute("export OUTER=outer")
7117            .await
7118            .expect("export failed");
7119
7120        let mut overlay = HashMap::new();
7121        overlay.insert("OUTER".to_string(), Value::String("inner".into()));
7122        let result = kernel
7123            .execute_with_options(r#"echo "${OUTER}""#, ExecuteOptions::new().with_vars(overlay))
7124            .await
7125            .expect("execute failed");
7126        assert_eq!(result.text_out().trim(), "inner");
7127
7128        assert_eq!(
7129            kernel.get_var("OUTER").await,
7130            Some(Value::String("outer".into())),
7131            "outer value must reappear after pop"
7132        );
7133        assert!(
7134            kernel.scope.read().await.is_exported("OUTER"),
7135            "outer export must survive overlay"
7136        );
7137    }
7138
7139    #[tokio::test]
7140    async fn test_execute_with_vars_inner_assignment_is_local() {
7141        let kernel = Kernel::transient().expect("failed to create kernel");
7142        let mut overlay = HashMap::new();
7143        overlay.insert("LOCAL_FOO".to_string(), Value::String("from-overlay".into()));
7144
7145        // Variable assignment inside a single statement uses set() (innermost
7146        // frame), not set_global() — this matches bash function-local semantics.
7147        // We explicitly use `local FOO=...` style by relying on the pushed
7148        // frame; the assignment in the script body modifies the same frame.
7149        let result = kernel
7150            .execute_with_options(
7151                r#"LOCAL_FOO="reassigned"; echo "${LOCAL_FOO}""#,
7152                ExecuteOptions::new().with_vars(overlay),
7153            )
7154            .await
7155            .expect("execute failed");
7156        assert!(result.ok());
7157
7158        // After the call the frame is popped, so LOCAL_FOO is gone regardless
7159        // of how the script reassigned it.
7160        assert_eq!(kernel.get_var("LOCAL_FOO").await, None);
7161    }
7162
7163    #[tokio::test]
7164    async fn test_external_command_sees_exported_var() {
7165        let kernel = Kernel::transient().expect("failed to create kernel");
7166        let result = kernel
7167            .execute("export EXT_FOO=bar; printenv EXT_FOO")
7168            .await
7169            .expect("execute failed");
7170
7171        assert!(result.ok(), "printenv should succeed: stderr={}", result.err);
7172        assert_eq!(result.text_out().trim(), "bar");
7173    }
7174
7175    #[tokio::test]
7176    async fn test_external_command_does_not_see_unexported_var() {
7177        let kernel = Kernel::transient().expect("failed to create kernel");
7178
7179        // Set without exporting; printenv must not see it (exit code != 0,
7180        // empty stdout per printenv semantics).
7181        let result = kernel
7182            .execute("EXT_BAR=hidden; printenv EXT_BAR")
7183            .await
7184            .expect("execute failed");
7185
7186        assert!(!result.ok(), "printenv should fail when var is unexported");
7187        assert!(
7188            result.text_out().trim().is_empty(),
7189            "no stdout when var is missing, got: {}",
7190            result.text_out()
7191        );
7192    }
7193
7194    #[tokio::test]
7195    async fn test_external_command_does_not_see_os_env() {
7196        // The kernel is hermetic: it never reads std::env::vars() and only
7197        // exports what it has been told to export. Cargo always sets PATH for
7198        // tests, so PATH is reliably present in the OS env — but a transient
7199        // kernel doesn't seed it into initial_vars, so `printenv PATH` from
7200        // inside the kernel must fail.
7201        assert!(
7202            std::env::var_os("PATH").is_some(),
7203            "test precondition: cargo should set PATH"
7204        );
7205
7206        let kernel = Kernel::transient().expect("failed to create kernel");
7207        let result = kernel
7208            .execute("printenv PATH")
7209            .await
7210            .expect("execute failed");
7211
7212        assert!(
7213            !result.ok(),
7214            "printenv PATH must fail in hermetic kernel, got stdout={:?}",
7215            result.text_out()
7216        );
7217        assert!(
7218            result.text_out().trim().is_empty(),
7219            "no PATH in subprocess env, got stdout={:?}",
7220            result.text_out()
7221        );
7222    }
7223
7224    #[tokio::test]
7225    async fn test_execute_with_vars_overlay_reaches_subprocess() {
7226        let kernel = Kernel::transient().expect("failed to create kernel");
7227        let mut overlay = HashMap::new();
7228        overlay.insert("SUB_FOO".to_string(), Value::String("subproc".into()));
7229
7230        let result = kernel
7231            .execute_with_options("printenv SUB_FOO", ExecuteOptions::new().with_vars(overlay))
7232            .await
7233            .expect("execute failed");
7234
7235        assert!(
7236            result.ok(),
7237            "printenv should succeed: code={} stdout={:?} stderr={:?}",
7238            result.code,
7239            result.text_out(),
7240            result.err
7241        );
7242        assert_eq!(result.text_out().trim(), "subproc");
7243    }
7244}