Skip to main content

kaish_kernel/
kernel.rs

1//! The Kernel (核) — the heart of kaish.
2//!
3//! The Kernel owns and coordinates all core components:
4//! - Interpreter state (scope, $?)
5//! - Tool registry (builtins, user tools)
6//! - VFS router (mount points)
7//! - Job manager (background jobs)
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────┐
13//! │                         Kernel (核)                         │
14//! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
15//! │  │   Scope      │  │ ToolRegistry │  │  VfsRouter       │  │
16//! │  │  (variables) │  │  (builtins,  │  │  (mount points)  │  │
17//! │  │              │  │   user tools)│  │                  │  │
18//! │  └──────────────┘  └──────────────┘  └──────────────────┘  │
19//! │  ┌──────────────────────────────┐  ┌──────────────────┐    │
20//! │  │  JobManager (background)     │  │  ExecResult ($?) │    │
21//! │  └──────────────────────────────┘  └──────────────────┘    │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::Duration;
30
31use anyhow::{Context, Result};
32use tokio::sync::RwLock;
33
/// Source of the per-kernel identifier that scripts read as `$$` / `${$}`.
///
/// The counter is monotonic and begins at 1; every newly constructed Kernel
/// takes the next value. A kernel produced by `Kernel::fork()` reports its
/// parent's value instead: the fork clones the parent's Scope and never calls
/// `set_pid` again. This mirrors bash, where a subshell keeps the parent's
/// `$$`.
///
/// This is intentionally *not* the OS PID. kaish runs as a long-lived MCP
/// server or embedded inside other binaries (kaijutsu), so the host PID
/// carries no meaning for a script. The design rationale is recorded in
/// `~/.claude/projects/-home-atobey-src-kaish/memory/lang_dollar_dollar_identifier.md`.
static KERNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
46
47use async_trait::async_trait;
48
49use crate::ast::{Arg, Command, Expr, FileTestOp, Stmt, StringPart, TestExpr, ToolDef, Value, BinaryOp};
50pub use kaish_types::ExecuteOptions;
51use crate::backend::{BackendError, KernelBackend};
52use kaish_glob::glob_match;
53use crate::dispatch::{CommandDispatcher, PipelinePosition};
54use crate::interpreter::{apply_output_format, eval_expr, expand_tilde, json_to_value, value_to_bool, value_to_string, ControlFlow, ExecResult, Scope};
55use crate::parser::parse;
56use crate::scheduler::{is_bool_type, schema_param_lookup, select_leaf, stderr_stream, BoundedStream, JobManager, PipelineRunner, StderrReceiver};
57#[cfg(feature = "subprocess")]
58use crate::scheduler::{drain_to_stream, DEFAULT_STREAM_MAX_SIZE};
59use crate::tools::{register_builtins, ExecContext, GlobalFlags, ToolArgs, ToolRegistry};
60#[cfg(feature = "subprocess")]
61use crate::tools::resolve_in_path;
62use crate::validator::{Severity, Validator};
63#[cfg(feature = "localfs")]
64use crate::vfs::LocalFs;
65use crate::vfs::{BuiltinFs, DevFs, JobFs, MemoryFs, VfsRouter};
66use kaish_vfs::ByteBudget;
67#[cfg(all(feature = "localfs", feature = "overlay"))]
68use kaish_vfs::OverlayFs;
69
/// How the host's local filesystem is surfaced through the VFS.
///
/// Each mode is a different point on the convenience/security scale:
/// - `Passthrough`: native paths everywhere (best for a human at the REPL)
/// - `Sandboxed`: only a subtree is reachable (safer for agents)
/// - `NoLocal`: nothing from the host at all (tests, pure in-memory runs)
#[derive(Debug, Clone)]
pub enum VfsMountMode {
    /// The whole host filesystem, mounted as LocalFs at "/".
    ///
    /// Native paths such as `/home/user/project` work unchanged. Intended for
    /// REPL sessions operated by a human, where full filesystem access is
    /// expected.
    ///
    /// Mounts:
    /// - `/` → LocalFs("/")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Passthrough,

    /// A transparent sandbox: paths keep their native spelling, but only the
    /// sandbox subtree is reachable.
    ///
    /// The local filesystem is mounted at its real location (e.g.
    /// `/home/user`), so `/home/user/src/project` resolves as usual. Paths
    /// outside the sandbox root are not accessible.
    ///
    /// **Note:** the restriction covers VFS (builtin) operations only.
    /// External commands bypass the sandbox entirely — see
    /// [`KernelConfig::allow_external_commands`].
    ///
    /// Mounts:
    /// - `/` → MemoryFs (catches paths outside sandbox)
    /// - `{root}` → LocalFs(root)  (e.g., `/home/user` → LocalFs)
    /// - `/tmp` → LocalFs("/tmp")
    /// - `/dev` → DevFs (synthetic /dev/null, /dev/zero)
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Sandboxed {
        /// Root of the local filesystem mount. `None` means `$HOME`; a
        /// narrower root such as `~/src` restricts access further.
        root: Option<PathBuf>,
    },

    /// Memory only: the host filesystem is not mounted anywhere.
    ///
    /// Gives complete isolation, which suits tests and pure sandboxed
    /// execution.
    ///
    /// At kernel construction this mode forces output spill to
    /// [`SpillMode::Memory`](crate::output_limit::SpillMode::Memory). With no
    /// host filesystem mounted, large output must not land in a host spill
    /// file (`paths::spill_dir()` bypasses the VFS). An explicit
    /// `SpillMode::Disk` is overridden.
    ///
    /// Mounts:
    /// - `/` → MemoryFs
    /// - `/tmp` → MemoryFs
    /// - `/v` → MemoryFs
    /// - `/dev` → DevFs (synthetic /dev/null, /dev/zero)
    NoLocal,
}
128
129#[allow(clippy::derivable_impls)] // native has multiple variants; not derivable cross-feature
130impl Default for VfsMountMode {
131    fn default() -> Self {
132        #[cfg(feature = "localfs")]
133        { VfsMountMode::Sandboxed { root: None } }
134        #[cfg(not(feature = "localfs"))]
135        { VfsMountMode::NoLocal }
136    }
137}
138
139/// Configuration for kernel initialization.
140#[derive(Debug, Clone)]
141pub struct KernelConfig {
142    /// Name of this kernel (for identification).
143    pub name: String,
144
145    /// VFS mount mode — controls how local filesystem is exposed.
146    pub vfs_mode: VfsMountMode,
147
148    /// Initial working directory (VFS path).
149    pub cwd: PathBuf,
150
151    /// Whether to skip pre-execution validation.
152    ///
153    /// When false (default), scripts are validated before execution to catch
154    /// errors early. Set to true to skip validation for performance or to
155    /// allow dynamic/external commands.
156    pub skip_validation: bool,
157
158    /// When true, standalone external commands inherit stdio for real-time output.
159    ///
160    /// Set by script runner and REPL for human-visible output.
161    /// Not set by MCP server (output must be captured for structured responses).
162    pub interactive: bool,
163
164    /// Ignore file configuration for file-walking tools.
165    pub ignore_config: crate::ignore_config::IgnoreConfig,
166
167    /// Output size limit configuration for agent safety.
168    pub output_limit: crate::output_limit::OutputLimitConfig,
169
170    /// Whether external command execution (PATH lookup, `exec`, `spawn`) is allowed.
171    ///
172    /// When `true` (default), commands not found as builtins are resolved via PATH
173    /// and executed as child processes. When `false`, only kaish builtins and
174    /// backend-registered tools are available.
175    ///
176    /// **Security:** External commands bypass the VFS sandbox entirely — they see
177    /// the real filesystem, network, and environment. Set to `false` when running
178    /// untrusted input.
179    pub allow_external_commands: bool,
180
181    /// Enable confirmation latch for dangerous operations (set -o latch).
182    ///
183    /// When enabled, destructive operations like `rm` require nonce confirmation.
184    /// Can also be enabled at runtime with `set -o latch` or via `KAISH_LATCH=1`.
185    pub latch_enabled: bool,
186
187    /// Enable trash-on-delete for rm (set -o trash).
188    ///
189    /// When enabled, small files are moved to freedesktop.org Trash instead of
190    /// being permanently deleted. Can also be enabled at runtime with `set -o trash`
191    /// or via `KAISH_TRASH=1`.
192    pub trash_enabled: bool,
193
194    /// Shared nonce store for cross-request confirmation latch.
195    ///
196    /// When `Some`, the kernel uses this store instead of creating a fresh one.
197    /// This allows nonces issued in one MCP `execute()` call to be validated
198    /// in a subsequent call. When `None` (default), a fresh store is created.
199    pub nonce_store: Option<crate::nonce::NonceStore>,
200
201    /// Variables to populate the root scope with at construction, all marked
202    /// for export to child processes.
203    ///
204    /// The kernel itself is hermetic — it never reads `std::env::vars()` —
205    /// so frontends that want OS-env passthrough (REPL, MCP) populate this
206    /// from `std::env::vars()`. Embedders that want isolation pass nothing
207    /// (or only the keys they curate).
208    pub initial_vars: HashMap<String, Value>,
209
210    /// Default per-request timeout. When `Some`, every `execute_with_options`
211    /// call without an explicit `ExecuteOptions::timeout` uses this duration.
212    /// When elapsed, the kernel cancels the request, kills any external
213    /// children with the configured grace, and returns exit code 124.
214    ///
215    /// `None` means no default timeout — only explicit per-call timeouts apply.
216    pub request_timeout: Option<Duration>,
217
218    /// Grace period between SIGTERM and SIGKILL when killing an external
219    /// child on cancellation or timeout.
220    ///
221    /// Defaults to 2 seconds. Set to `Duration::ZERO` to escalate immediately
222    /// to SIGKILL. Long-shutdown processes (databases, etc.) may need more.
223    pub kill_grace: Duration,
224
225    /// Cap on memory-resident bytes across all kernel-owned `MemoryFs` mounts.
226    ///
227    /// One shared `ByteBudget` (labeled `"vfs-memory"`) is created at kernel
228    /// construction and handed to every `MemoryFs` the kernel builds in
229    /// `setup_vfs` (Passthrough `/v`; Sandboxed `/` and `/v`; NoLocal `/`,
230    /// `/tmp`, `/v`). Writes that would exceed the cap fail loudly with
231    /// `StorageFull` — an in-band error a model reads and adapts to; fail
232    /// loud over quietly eating RAM.
233    ///
234    /// **Why MCP is bounded by default:** each `execute()` call creates a fresh
235    /// kernel (see `server/execute.rs`), so the 64 MiB cap is per-call, not
236    /// per-session. Embedders that know their workload needs more opt out with
237    /// `without_vfs_budget()` or raise the cap with `with_vfs_budget(bytes)` —
238    /// protection on by default, opt out knowingly. All other profiles default
239    /// to `None` (unbounded).
240    ///
241    /// Follows the same pattern as `OutputLimitConfig`: MCP bounded, rest unbounded.
242    pub vfs_budget_bytes: Option<u64>,
243
244    /// Enable copy-on-write overlay mode (opt-in).
245    ///
246    /// When `true`, the primary local filesystem mount is wrapped in an
247    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
248    /// Use `kaish-vfs status/diff/commit/reset` to inspect and manage the
249    /// overlay transaction.
250    ///
251    /// **Passthrough:** `/` becomes `OverlayFs over LocalFs::read_only("/")`.
252    /// **Sandboxed{root}:** the `{root}` mount becomes
253    /// `OverlayFs over LocalFs::read_only(root)`; the `/tmp` and XDG runtime
254    /// mounts stay as real `LocalFs` (real writes escape the transaction —
255    /// see `docs/kaish-overlayfs.md` for the escape-hatch inventory).
256    /// **NoLocal:** incompatible — construction fails loudly (everything is
257    /// already virtual; an overlay adds no value and no lower layer to wrap).
258    /// **with_backend:** incompatible — the embedder controls the VFS; the
259    /// kernel cannot wrap it without bypassing the embedder's semantics.
260    ///
261    /// **Not default-on for MCP:** each `execute()` call gets a fresh kernel,
262    /// making the overlay a per-call transaction — `kaish-vfs commit` must run
263    /// in the same call as the writes, or the transaction is discarded on drop.
264    /// Frontends (REPL, MCP) expose `--overlay` as an explicit opt-in flag.
265    pub overlay: bool,
266}
267
268/// Get the default sandbox root ($HOME).
269#[cfg(feature = "localfs")]
fn default_sandbox_root() -> PathBuf {
    // Read HOME with `var_os`, not `var`. A HOME containing non-UTF-8 bytes is
    // a legal Unix path, but `var` reports it as an error, which would
    // silently widen the default sandbox root from the home directory to "/".
    std::env::var_os("HOME")
        .map(PathBuf::from)
        // HOME is unset: fall back to the filesystem root.
        .unwrap_or_else(|| PathBuf::from("/"))
}
275
276impl Default for KernelConfig {
277    fn default() -> Self {
278        #[cfg(feature = "localfs")]
279        {
280            let home = default_sandbox_root();
281            Self {
282                name: "default".to_string(),
283                vfs_mode: VfsMountMode::Sandboxed { root: None },
284                cwd: home,
285                skip_validation: false,
286                interactive: false,
287                ignore_config: crate::ignore_config::IgnoreConfig::none(),
288                output_limit: crate::output_limit::OutputLimitConfig::none(),
289                allow_external_commands: cfg!(feature = "subprocess"),
290                latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
291                trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
292                nonce_store: None,
293                initial_vars: HashMap::new(),
294                request_timeout: None,
295                kill_grace: Duration::from_secs(2),
296                vfs_budget_bytes: None,
297                overlay: false,
298            }
299        }
300        #[cfg(not(feature = "localfs"))]
301        {
302            Self {
303                name: "default".to_string(),
304                vfs_mode: VfsMountMode::NoLocal,
305                cwd: PathBuf::from("/"),
306                skip_validation: false,
307                interactive: false,
308                ignore_config: crate::ignore_config::IgnoreConfig::none(),
309                output_limit: crate::output_limit::OutputLimitConfig::none(),
310                allow_external_commands: false,
311                latch_enabled: false,
312                trash_enabled: false,
313                nonce_store: None,
314                initial_vars: HashMap::new(),
315                request_timeout: None,
316                kill_grace: Duration::from_secs(2),
317                vfs_budget_bytes: None,
318                overlay: false,
319            }
320        }
321    }
322}
323
324impl KernelConfig {
325    /// Create a transient kernel config (sandboxed, for temporary use).
326    #[cfg(feature = "localfs")]
327    pub fn transient() -> Self {
328        let home = default_sandbox_root();
329        Self {
330            name: "transient".to_string(),
331            vfs_mode: VfsMountMode::Sandboxed { root: None },
332            cwd: home,
333            skip_validation: false,
334            interactive: false,
335            ignore_config: crate::ignore_config::IgnoreConfig::none(),
336            output_limit: crate::output_limit::OutputLimitConfig::none(),
337            allow_external_commands: cfg!(feature = "subprocess"),
338            latch_enabled: false,
339            trash_enabled: false,
340            nonce_store: None,
341            initial_vars: HashMap::new(),
342            request_timeout: None,
343            kill_grace: Duration::from_secs(2),
344            vfs_budget_bytes: None,
345            overlay: false,
346        }
347    }
348
349    /// Create a transient kernel config (isolated, no-default-features).
350    #[cfg(not(feature = "localfs"))]
351    pub fn transient() -> Self {
352        Self::isolated()
353    }
354
355    /// Create a kernel config with the given name (sandboxed by default).
356    #[cfg(feature = "localfs")]
357    pub fn named(name: &str) -> Self {
358        let home = default_sandbox_root();
359        Self {
360            name: name.to_string(),
361            vfs_mode: VfsMountMode::Sandboxed { root: None },
362            cwd: home,
363            skip_validation: false,
364            interactive: false,
365            ignore_config: crate::ignore_config::IgnoreConfig::none(),
366            output_limit: crate::output_limit::OutputLimitConfig::none(),
367            allow_external_commands: cfg!(feature = "subprocess"),
368            latch_enabled: false,
369            trash_enabled: false,
370            nonce_store: None,
371            initial_vars: HashMap::new(),
372            request_timeout: None,
373            kill_grace: Duration::from_secs(2),
374            vfs_budget_bytes: None,
375            overlay: false,
376        }
377    }
378
379    /// Create a kernel config with the given name (isolated, no-default-features).
380    #[cfg(not(feature = "localfs"))]
381    pub fn named(name: &str) -> Self {
382        Self {
383            name: name.to_string(),
384            ..Self::isolated()
385        }
386    }
387
388    /// Create a REPL config with passthrough filesystem access.
389    ///
390    /// Native paths like `/home/user/project` work directly.
391    /// The cwd is set to the actual current working directory.
392    #[cfg(feature = "localfs")]
393    pub fn repl() -> Self {
394        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
395        Self {
396            name: "repl".to_string(),
397            vfs_mode: VfsMountMode::Passthrough,
398            cwd,
399            skip_validation: false,
400            interactive: false,
401            ignore_config: crate::ignore_config::IgnoreConfig::none(),
402            output_limit: crate::output_limit::OutputLimitConfig::none(),
403            allow_external_commands: cfg!(feature = "subprocess"),
404            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
405            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
406            nonce_store: None,
407            initial_vars: HashMap::new(),
408            request_timeout: None,
409            kill_grace: Duration::from_secs(2),
410            vfs_budget_bytes: None,
411            overlay: false,
412        }
413    }
414
415    /// Create an MCP server config with sandboxed filesystem access.
416    ///
417    /// Local filesystem is accessible at its real path (e.g., `/home/user`),
418    /// but sandboxed to `$HOME`. Paths outside the sandbox are not accessible
419    /// through builtins. External commands still access the real filesystem —
420    /// use `.with_allow_external_commands(false)` to block them.
421    ///
422    /// VFS memory is bounded at 64 MiB per `execute()` call by default
423    /// (MCP creates a fresh kernel per call). Raise or remove with
424    /// `with_vfs_budget` / `without_vfs_budget`.
425    #[cfg(feature = "localfs")]
426    pub fn mcp() -> Self {
427        let home = default_sandbox_root();
428        Self {
429            name: "mcp".to_string(),
430            vfs_mode: VfsMountMode::Sandboxed { root: None },
431            cwd: home,
432            skip_validation: false,
433            interactive: false,
434            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
435            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
436            allow_external_commands: cfg!(feature = "subprocess"),
437            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
438            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
439            nonce_store: None,
440            initial_vars: HashMap::new(),
441            request_timeout: None,
442            kill_grace: Duration::from_secs(2),
443            vfs_budget_bytes: Some(64 * 1024 * 1024),
444            overlay: false,
445        }
446    }
447
448    /// Create an MCP server config with a custom sandbox root.
449    ///
450    /// Use this to restrict access to a subdirectory like `~/src`.
451    ///
452    /// VFS memory is bounded at 64 MiB per `execute()` call by default.
453    /// Raise or remove with `with_vfs_budget` / `without_vfs_budget`.
454    #[cfg(feature = "localfs")]
455    pub fn mcp_with_root(root: PathBuf) -> Self {
456        Self {
457            name: "mcp".to_string(),
458            vfs_mode: VfsMountMode::Sandboxed { root: Some(root.clone()) },
459            cwd: root,
460            skip_validation: false,
461            interactive: false,
462            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
463            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
464            allow_external_commands: cfg!(feature = "subprocess"),
465            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
466            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
467            nonce_store: None,
468            initial_vars: HashMap::new(),
469            request_timeout: None,
470            kill_grace: Duration::from_secs(2),
471            vfs_budget_bytes: Some(64 * 1024 * 1024),
472            overlay: false,
473        }
474    }
475
476    /// Create a config with no local filesystem (memory only).
477    ///
478    /// Complete isolation: no local filesystem and external commands are disabled.
479    /// Useful for tests or pure sandboxed execution.
480    pub fn isolated() -> Self {
481        Self {
482            name: "isolated".to_string(),
483            vfs_mode: VfsMountMode::NoLocal,
484            cwd: PathBuf::from("/"),
485            skip_validation: false,
486            interactive: false,
487            ignore_config: crate::ignore_config::IgnoreConfig::none(),
488            output_limit: crate::output_limit::OutputLimitConfig::none(),
489            allow_external_commands: false,
490            latch_enabled: false,
491            trash_enabled: false,
492            nonce_store: None,
493            initial_vars: HashMap::new(),
494            request_timeout: None,
495            kill_grace: Duration::from_secs(2),
496            vfs_budget_bytes: None,
497            overlay: false,
498        }
499    }
500
501    /// Set the VFS mount mode.
502    pub fn with_vfs_mode(mut self, mode: VfsMountMode) -> Self {
503        self.vfs_mode = mode;
504        self
505    }
506
507    /// Set the initial working directory.
508    pub fn with_cwd(mut self, cwd: PathBuf) -> Self {
509        self.cwd = cwd;
510        self
511    }
512
513    /// Skip pre-execution validation.
514    pub fn with_skip_validation(mut self, skip: bool) -> Self {
515        self.skip_validation = skip;
516        self
517    }
518
519    /// Enable interactive mode (external commands inherit stdio).
520    pub fn with_interactive(mut self, interactive: bool) -> Self {
521        self.interactive = interactive;
522        self
523    }
524
525    /// Set the ignore file configuration.
526    pub fn with_ignore_config(mut self, config: crate::ignore_config::IgnoreConfig) -> Self {
527        self.ignore_config = config;
528        self
529    }
530
531    /// Set the output limit configuration.
532    pub fn with_output_limit(mut self, config: crate::output_limit::OutputLimitConfig) -> Self {
533        self.output_limit = config;
534        self
535    }
536
537    /// Set whether external command execution is allowed.
538    ///
539    /// When `false`, commands not found as builtins produce "command not found"
540    /// instead of searching PATH. The `exec` and `spawn` builtins also return
541    /// errors. Use this to prevent VFS sandbox bypass via external binaries.
542    pub fn with_allow_external_commands(mut self, allow: bool) -> Self {
543        self.allow_external_commands = allow;
544        self
545    }
546
547    /// Enable or disable confirmation latch at startup.
548    pub fn with_latch(mut self, enabled: bool) -> Self {
549        self.latch_enabled = enabled;
550        self
551    }
552
553    /// Enable or disable trash-on-delete at startup.
554    pub fn with_trash(mut self, enabled: bool) -> Self {
555        self.trash_enabled = enabled;
556        self
557    }
558
559    /// Use a shared nonce store for cross-request confirmation latch.
560    ///
561    /// Pass a `NonceStore` that outlives individual kernel instances so nonces
562    /// issued in one MCP `execute()` call can be validated in subsequent calls.
563    pub fn with_nonce_store(mut self, store: crate::nonce::NonceStore) -> Self {
564        self.nonce_store = Some(store);
565        self
566    }
567
568    /// Add a single initial variable; marked exported when the kernel boots.
569    ///
570    /// Repeated calls add (last write wins on key collision).
571    pub fn with_var(mut self, name: impl Into<String>, value: Value) -> Self {
572        self.initial_vars.insert(name.into(), value);
573        self
574    }
575
576    /// Replace the entire initial-vars map. All entries are marked exported.
577    pub fn with_initial_vars(mut self, vars: HashMap<String, Value>) -> Self {
578        self.initial_vars = vars;
579        self
580    }
581
582    /// Extend the initial-vars map with the given entries (last write wins).
583    pub fn with_vars(mut self, vars: HashMap<String, Value>) -> Self {
584        self.initial_vars.extend(vars);
585        self
586    }
587
588    /// Set the default per-request timeout (kernel-wide).
589    ///
590    /// Each `execute_with_options` call without an explicit timeout uses
591    /// this. On elapsed, the kernel cancels and returns exit code 124.
592    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
593        self.request_timeout = Some(timeout);
594        self
595    }
596
597    /// Set the SIGTERM-to-SIGKILL grace period for child kills.
598    pub fn with_kill_grace(mut self, grace: Duration) -> Self {
599        self.kill_grace = grace;
600        self
601    }
602
603    /// Cap VFS memory-resident bytes at `bytes` across all kernel-owned
604    /// `MemoryFs` mounts. A shared `ByteBudget` labeled `"vfs-memory"` is
605    /// created at kernel construction and passed to every `MemoryFs` the
606    /// kernel builds (see `setup_vfs` and `with_backend`).
607    ///
608    /// Writes that would exceed the cap fail loudly with `StorageFull` — an
609    /// in-band error a model reads and adapts to; fail loud over quietly eating
610    /// RAM. Use `without_vfs_budget` to remove the cap entirely.
611    pub fn with_vfs_budget(mut self, bytes: u64) -> Self {
612        self.vfs_budget_bytes = Some(bytes);
613        self
614    }
615
616    /// Remove the VFS memory budget — all `MemoryFs` mounts are unbounded.
617    ///
618    /// Use when the caller knows the workload and the default 64 MiB cap
619    /// (set by `KernelConfig::mcp`) is too conservative.
620    pub fn without_vfs_budget(mut self) -> Self {
621        self.vfs_budget_bytes = None;
622        self
623    }
624
625    /// Enable or disable copy-on-write overlay mode.
626    ///
627    /// When `true`, the primary local filesystem mount is wrapped in an
628    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
629    /// Incompatible with `VfsMountMode::NoLocal` (fails loudly at construction)
630    /// and `with_backend` kernels (same — the embedder controls the VFS).
631    pub fn with_overlay(mut self, overlay: bool) -> Self {
632        self.overlay = overlay;
633        self
634    }
635}
636
/// An active overlay session.
///
/// The kernel keeps this handle and shares it with `ExecContext`, which is
/// how the `kaish-vfs` builtin reaches the `OverlayFs`.
///
/// `mount_path` is the VFS prefix the overlay was mounted under (e.g.
/// `/home/user`). `commit_root` is the real filesystem path backing the
/// overlay's lower layer, and is the target of `kaish-vfs commit`.
#[cfg(all(feature = "localfs", feature = "overlay"))]
#[derive(Clone)]
pub struct OverlayHandle {
    /// The mounted `OverlayFs`. It is Arc-shared so the builtin can call
    /// inspection methods without holding a VfsRouter lock.
    pub fs: Arc<OverlayFs>,
    /// VFS path the overlay is mounted at (e.g. `/home/user`).
    pub mount_path: PathBuf,
    /// Real filesystem root that commits are written into; identical to the
    /// lower layer's root.
    pub commit_root: PathBuf,
}
654
655/// The Kernel (核) — executes kaish code.
656///
657/// This is the primary interface for running kaish commands. It owns all
658/// the runtime state: variables, tools, VFS, jobs, and persistence.
659pub struct Kernel {
660    /// Kernel name.
661    name: String,
662    /// Variable scope.
663    scope: RwLock<Scope>,
664    /// Tool registry.
665    tools: Arc<ToolRegistry>,
666    /// User-defined tools (from `tool name { body }` statements).
667    user_tools: RwLock<HashMap<String, ToolDef>>,
668    /// Virtual filesystem router.
669    vfs: Arc<VfsRouter>,
670    /// Background job manager.
671    jobs: Arc<JobManager>,
672    /// Pipeline runner.
673    runner: PipelineRunner,
674    /// Execution context (cwd, stdin, etc.).
675    exec_ctx: RwLock<ExecContext>,
676    /// Whether to skip pre-execution validation.
677    skip_validation: bool,
678    /// When true, standalone external commands inherit stdio for real-time output.
679    interactive: bool,
680    /// Whether external command execution is allowed.
681    allow_external_commands: bool,
682    /// Shared memory budget for all kernel-owned `MemoryFs` mounts.
683    ///
684    /// `None` when `KernelConfig::vfs_budget_bytes` was `None` (unbounded).
685    /// `Some` is Arc-cloned into forks so all concurrent execution draws from
686    /// the same pool — a background job's writes reduce the same cap as
687    /// foreground writes, which is the correct behaviour.
688    vfs_budget: Option<Arc<kaish_vfs::ByteBudget>>,
689    /// Active overlay session handle, if this kernel was constructed with
690    /// `overlay: true`. Arc-shared so `ExecContext` (and thus the
691    /// `kaish-vfs` builtin) can inspect and mutate the overlay without
692    /// holding a kernel write lock. Propagated to forks via `fork_inner`
693    /// and `child_for_pipeline` so `kaish-vfs` works inside background
694    /// jobs, scatter workers, and pipeline stages.
695    #[cfg(all(feature = "localfs", feature = "overlay"))]
696    overlay_handle: Option<Arc<OverlayHandle>>,
697    /// Default per-request timeout (None = no default).
698    request_timeout: Option<Duration>,
699    /// SIGTERM-to-SIGKILL grace period for child kills.
700    kill_grace: Duration,
701    /// Receiver for the kernel stderr stream.
702    ///
703    /// Pipeline stages write to the corresponding `StderrStream` (set on ExecContext).
704    /// The kernel drains this after each statement in `execute_streaming`.
705    stderr_receiver: tokio::sync::Mutex<StderrReceiver>,
706    /// Cancellation token for interrupting execution (Ctrl-C).
707    ///
708    /// Protected by `std::sync::Mutex` (not tokio) because the SIGINT handler
709    /// needs sync access. Each `execute()` call gets a fresh child token;
710    /// `cancel()` cancels the current token and replaces it.
711    cancel_token: std::sync::Mutex<tokio_util::sync::CancellationToken>,
712    /// Terminal state for job control (interactive mode only, Unix only).
713    #[cfg(all(unix, feature = "subprocess"))]
714    terminal_state: Option<Arc<crate::terminal::TerminalState>>,
715    /// Weak self-reference for handing out `Arc<dyn CommandDispatcher>`.
716    ///
717    /// Set by `into_arc()`. Allows builtins to re-dispatch inner commands
718    /// through the full Kernel resolution chain.
719    self_weak: std::sync::OnceLock<std::sync::Weak<Self>>,
720    /// Background job this kernel (a fork) is executing on behalf of, if any.
721    /// Set on the fork created by `execute_background` and inherited by all its
722    /// sub-forks (pipeline stages, scatter workers), so an external command
723    /// spawned anywhere under a background job can record its process group on
724    /// that job for `kill -<sig> %N`. `None` for foreground execution.
725    bg_job_id: Option<crate::scheduler::JobId>,
726    /// Serializes concurrent `execute()` / `execute_streaming()` callers on
727    /// this Kernel instance. Tokio's Mutex is fair (FIFO) and acts as the
728    /// queue. Background jobs, scatter workers, and concurrent pipeline
729    /// stages do NOT take this lock — they run against a *forked* Kernel
730    /// (see [`Kernel::fork`]) so they never contend with the foreground.
731    execute_lock: tokio::sync::Mutex<()>,
732}
733
/// Internal result of [`Kernel::setup_vfs`].
///
/// Groups the values `setup_vfs` produces so that `Kernel::new` can take them
/// apart: the router and budget are passed on to `Kernel::assemble`, and the
/// overlay handle (when compiled in) is stored on the finished kernel.
struct VfsSetupResult {
    /// Router populated with the mounts for the configured `VfsMountMode`.
    vfs: VfsRouter,
    /// Byte budget shared by the memory mounts created in `setup_vfs`;
    /// `None` when `config.vfs_budget_bytes` is unset.
    budget: Option<Arc<ByteBudget>>,
    /// Handle to the mounted `OverlayFs`; `Some` only when `setup_vfs`
    /// wrapped a mount in an overlay (`config.overlay` was true).
    #[cfg(all(feature = "localfs", feature = "overlay"))]
    overlay_handle: Option<Arc<OverlayHandle>>,
}
741
742impl Kernel {
743    /// Create a new kernel with the given configuration.
744    pub fn new(config: KernelConfig) -> Result<Self> {
745        let mut setup = Self::setup_vfs(&config)?;
746        let jobs = Arc::new(JobManager::new());
747
748        // Mount JobFs for job observability at /v/jobs
749        setup.vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
750
751        #[cfg(all(feature = "localfs", feature = "overlay"))]
752        let overlay_handle = setup.overlay_handle.take();
753
754        // Mode-based construction: the kernel owns its host mounts, so whether
755        // host side channels are allowed is decided by the VFS mode inside
756        // `assemble` (NoLocal forbids them).
757        let kernel = Self::assemble(config, setup.vfs, jobs, false, setup.budget, |_| {}, |vfs_ref, tools| {
758            ExecContext::with_vfs_and_tools(vfs_ref.clone(), tools.clone())
759        })?;
760
761        #[cfg(all(feature = "localfs", feature = "overlay"))]
762        {
763            let mut kernel = kernel;
764            kernel.overlay_handle = overlay_handle;
765            // Also set it on the ExecContext so builtins can access it.
766            if let Some(ref handle) = kernel.overlay_handle {
767                kernel.exec_ctx.get_mut().overlay_handle = Some(Arc::clone(handle));
768            }
769            return Ok(kernel);
770        }
771
772        #[allow(unreachable_code)]
773        Ok(kernel)
774    }
775
    /// Set up VFS based on mount mode.
    ///
    /// Returns the router, the budget handle (if bounded), and an optional
    /// overlay handle when `config.overlay` is true. The budget is Arc-shared:
    /// every `MemoryFs` the kernel creates here holds a clone of the same
    /// `Arc<ByteBudget>`, so the total charged against it is the sum of all
    /// in-memory content across all kernel-owned memory mounts. When an
    /// overlay is built and a budget exists, the `OverlayFs` is given a clone
    /// of that same budget.
    ///
    /// Mount layout per mode (as built below):
    /// - `Passthrough`: `/` is `LocalFs` (or an `OverlayFs` over a read-only
    ///   `LocalFs` when `config.overlay`), `/v` is memory.
    /// - `Sandboxed`: `/` and `/v` are memory, `/dev` is `DevFs`, `/tmp` is
    ///   `LocalFs`, the XDG runtime dir is `LocalFs` if it exists, and the
    ///   sandbox root is mounted at its own path (`LocalFs` or overlay).
    /// - `NoLocal`: `/`, `/tmp`, `/v` are memory and `/dev` is `DevFs`.
    ///
    /// # Errors
    /// Returns `Err` if `config.overlay` is true and either:
    /// - the mode is `NoLocal` (overlay is meaningless when everything is
    ///   already virtual — there is no real lower layer to wrap), or
    /// - the mode is `Passthrough`/`Sandboxed` but this build was compiled
    ///   without the `overlay` feature.
    ///
    /// The caller (`Kernel::new`) propagates this as an `anyhow::Error`.
    fn setup_vfs(config: &KernelConfig) -> Result<VfsSetupResult> {
        let mut vfs = VfsRouter::new();

        // One budget for all memory mounts this kernel owns — labeled so the
        // error message tells the user exactly which knob to raise.
        let budget: Option<Arc<ByteBudget>> = config
            .vfs_budget_bytes
            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));

        /// Helper: construct a `MemoryFs` wired to `budget` if present.
        fn mem(budget: &Option<Arc<ByteBudget>>) -> MemoryFs {
            match budget {
                Some(b) => MemoryFs::with_budget(Arc::clone(b)),
                None => MemoryFs::new(),
            }
        }

        // Overlay handle — populated below if config.overlay is true.
        #[cfg(all(feature = "localfs", feature = "overlay"))]
        let mut overlay_handle: Option<Arc<OverlayHandle>> = None;

        match &config.vfs_mode {
            #[cfg(feature = "localfs")]
            VfsMountMode::Passthrough => {
                // With the `overlay` feature compiled in, choose between an
                // overlay-wrapped root and a plain LocalFs root at runtime.
                #[cfg(feature = "overlay")]
                if config.overlay {
                    // Wrap "/" in an OverlayFs so writes are virtual.
                    let lower = Arc::new(LocalFs::read_only(PathBuf::from("/")));
                    let overlay_fs = Arc::new(match &budget {
                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
                        None => OverlayFs::over(lower),
                    });
                    // The handle shares the same OverlayFs instance that is
                    // mounted, so it observes the mounted overlay's state.
                    let handle = Arc::new(OverlayHandle {
                        fs: Arc::clone(&overlay_fs),
                        mount_path: PathBuf::from("/"),
                        commit_root: PathBuf::from("/"),
                    });
                    vfs.mount_arc("/", overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
                    overlay_handle = Some(handle);
                } else {
                    // LocalFs at "/" — native paths work directly
                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
                }
                // Without the `overlay` feature, asking for an overlay is an
                // error rather than a silent downgrade to a writable root.
                #[cfg(not(feature = "overlay"))]
                {
                    if config.overlay {
                        return Err(anyhow::anyhow!(
                            "overlay=true requires the `overlay` feature, but this build \
                             was compiled without it. Recompile with --features overlay \
                             (or the default feature set) to enable overlay mode."
                        ));
                    }
                    // LocalFs at "/" — native paths work directly
                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
                }
                // Memory for blobs
                vfs.mount("/v", mem(&budget));
            }
            #[cfg(feature = "localfs")]
            VfsMountMode::Sandboxed { root } => {
                // Memory at root for safety (catches paths outside sandbox).
                // Note: /tmp and the XDG runtime dir are LocalFs — writes
                // there escape the VFS budget and are NOT virtual. This is
                // intentional: /tmp interop with other processes matters more
                // than accounting for scratch files there.
                vfs.mount("/", mem(&budget));
                vfs.mount("/v", mem(&budget));

                // Synthetic /dev: the host's real /dev isn't reachable here, so
                // /dev/null and /dev/zero are software-backed (see DevFs).
                vfs.mount("/dev", DevFs::new());

                // Real /tmp for interop with other processes
                vfs.mount("/tmp", LocalFs::new(PathBuf::from("/tmp")));

                // Mount XDG runtime dir for spill files and socket access.
                // Skipped entirely when the directory does not exist.
                let runtime = crate::paths::xdg_runtime_dir();
                if runtime.exists() {
                    let runtime_str = runtime.to_string_lossy().to_string();
                    vfs.mount(&runtime_str, LocalFs::new(runtime));
                }

                // Resolve the sandbox root (defaults to $HOME).
                // NOTE(review): when no root is given and HOME is unset the
                // root falls back to "/", which presumably exposes the whole
                // host filesystem through the sandbox mount — confirm this
                // fallback is intended.
                let local_root = root.clone().unwrap_or_else(|| {
                    std::env::var("HOME")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| PathBuf::from("/"))
                });

                // The mount point is the root's own path (lossily converted
                // to UTF-8), so VFS paths match host paths under the root.
                let mount_point = local_root.to_string_lossy().to_string();

                #[cfg(feature = "overlay")]
                if config.overlay {
                    // Wrap the sandbox root in an OverlayFs.
                    let lower = Arc::new(LocalFs::read_only(local_root.clone()));
                    let overlay_fs = Arc::new(match &budget {
                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
                        None => OverlayFs::over(lower),
                    });
                    let handle = Arc::new(OverlayHandle {
                        fs: Arc::clone(&overlay_fs),
                        mount_path: PathBuf::from(&mount_point),
                        commit_root: local_root,
                    });
                    vfs.mount_arc(&mount_point, overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
                    overlay_handle = Some(handle);
                } else {
                    // Mount at the real path for transparent access
                    // e.g., /home/atobey → LocalFs("/home/atobey")
                    // so /home/atobey/src/kaish just works
                    vfs.mount(&mount_point, LocalFs::new(local_root));
                }
                // Same policy as Passthrough: overlay requested without the
                // feature compiled in is a hard error.
                #[cfg(not(feature = "overlay"))]
                {
                    if config.overlay {
                        return Err(anyhow::anyhow!(
                            "overlay=true requires the `overlay` feature, but this build \
                             was compiled without it. Recompile with --features overlay \
                             (or the default feature set) to enable overlay mode."
                        ));
                    }
                    // Mount at the real path for transparent access
                    vfs.mount(&mount_point, LocalFs::new(local_root));
                }
            }
            VfsMountMode::NoLocal => {
                // Nothing real to layer over, so overlay is rejected up front.
                if config.overlay {
                    return Err(anyhow::anyhow!(
                        "overlay=true is incompatible with VfsMountMode::NoLocal: \
                         everything is already virtual, there is no real lower layer \
                         to wrap. Use with_overlay(false) or switch to a Passthrough \
                         or Sandboxed VFS mode."
                    ));
                }
                // Pure memory mode — no local filesystem
                vfs.mount("/", mem(&budget));
                vfs.mount("/tmp", mem(&budget));
                vfs.mount("/v", mem(&budget));
                // Synthetic /dev so /dev/null and /dev/zero work hermetically.
                vfs.mount("/dev", DevFs::new());
            }
        }

        Ok(VfsSetupResult {
            vfs,
            budget,
            #[cfg(all(feature = "localfs", feature = "overlay"))]
            overlay_handle,
        })
    }
939
940    /// Create a transient kernel (no persistence).
941    pub fn transient() -> Result<Self> {
942        Self::new(KernelConfig::transient())
943    }
944
945    /// Create a kernel with a custom backend and `/v/*` virtual path support.
946    ///
947    /// This is the constructor for embedding kaish in other systems that provide
948    /// their own storage backend (e.g., CRDT-backed storage in kaijutsu).
949    ///
950    /// A `VirtualOverlayBackend` routes paths automatically:
951    /// - `/v/*` → Internal VFS (JobFs at `/v/jobs`, MemoryFs at `/v/blobs`)
952    /// - Everything else → Your custom backend
953    ///
954    /// The optional `configure_vfs` closure lets you add additional virtual mounts
955    /// (e.g., `/v/docs` for CRDT blocks) after the built-in mounts are set up.
956    ///
957    /// **Note:** The config's `vfs_mode` is ignored — all non-`/v/*` path routing
958    /// is handled by your custom backend. The config is only used for `name`, `cwd`,
959    /// `skip_validation`, and `interactive`.
960    ///
961    /// # Example
962    ///
963    /// ```ignore
964    /// // Simple: default /v/* mounts only
965    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |_| {})?;
966    ///
967    /// // With custom mounts
968    /// let kernel = Kernel::with_backend(backend, config, |vfs| {
969    ///     vfs.mount_arc("/v/docs", docs_fs);
970    ///     vfs.mount_arc("/v/g", git_fs);
971    /// }, |_| {})?;
972    ///
973    /// // With custom tools
974    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |tools| {
975    ///     tools.register(MyCustomTool::new());
976    /// })?;
977    /// ```
978    pub fn with_backend(
979        backend: Arc<dyn KernelBackend>,
980        config: KernelConfig,
981        configure_vfs: impl FnOnce(&mut VfsRouter),
982        configure_tools: impl FnOnce(&mut ToolRegistry),
983    ) -> Result<Self> {
984        use crate::backend::VirtualOverlayBackend;
985
986        // overlay=true is incompatible with with_backend: the embedder controls
987        // the VFS and the kernel cannot wrap it without bypassing the embedder's
988        // semantics. Fail loudly rather than silently ignoring the flag.
989        if config.overlay {
990            return Err(anyhow::anyhow!(
991                "overlay=true is incompatible with Kernel::with_backend: the embedder \
992                 controls the VFS; the kernel cannot wrap it with an OverlayFs without \
993                 bypassing the embedder's storage semantics. Use KernelConfig::with_overlay(false)."
994            ));
995        }
996
997        let mut vfs = VfsRouter::new();
998        let jobs = Arc::new(JobManager::new());
999
1000        // Create the budget from config so `with_vfs_budget` / `without_vfs_budget`
1001        // work for `with_backend` callers too. The /v/blobs MemoryFs is the only
1002        // kernel-owned memory mount here — embedders own the rest of the VFS.
1003        let vfs_budget: Option<Arc<ByteBudget>> = config
1004            .vfs_budget_bytes
1005            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));
1006
1007        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
1008        let blobs_fs = match &vfs_budget {
1009            Some(b) => MemoryFs::with_budget(Arc::clone(b)),
1010            None => MemoryFs::new(),
1011        };
1012        vfs.mount("/v/blobs", blobs_fs);
1013
1014        // Let caller add custom mounts (e.g., /v/docs, /v/g)
1015        configure_vfs(&mut vfs);
1016
1017        // A custom-backend kernel owns no host mounts — the embedder supplies
1018        // the entire VFS — so any kernel write to a host filesystem via
1019        // `std::fs` (output spill, job output files) bypasses that VFS and its
1020        // read-only guarantees. Forbid host side channels unconditionally.
1021        Self::assemble(config, vfs, jobs, true, vfs_budget, configure_tools, |vfs_arc: &Arc<VfsRouter>, _: &Arc<ToolRegistry>| {
1022            let overlay: Arc<dyn KernelBackend> =
1023                Arc::new(VirtualOverlayBackend::new(backend, vfs_arc.clone()));
1024            ExecContext::with_backend(overlay)
1025        })
1026    }
1027
1028    /// Shared assembly: wires up tools, runner, scope, and ExecContext.
1029    ///
1030    /// The `make_ctx` closure receives the VFS and tools so backends that need
1031    /// them (like `LocalBackend::with_tools`) can capture them. Custom backends
1032    /// that already have their own storage can ignore these parameters.
1033    fn assemble(
1034        config: KernelConfig,
1035        mut vfs: VfsRouter,
1036        jobs: Arc<JobManager>,
1037        no_host_filesystem: bool,
1038        vfs_budget: Option<Arc<ByteBudget>>,
1039        configure_tools: impl FnOnce(&mut ToolRegistry),
1040        make_ctx: impl FnOnce(&Arc<VfsRouter>, &Arc<ToolRegistry>) -> ExecContext,
1041    ) -> Result<Self> {
1042        // A kernel with no host filesystem of its own must never write to one
1043        // through a side channel. Two paths bypass the VFS by going straight to
1044        // `std::fs`: output spill (`paths::spill_dir()` → host temp/cache) and
1045        // background-job output files (`Job::write_output_file` → host temp).
1046        // Both would punch through the isolation, so force them off:
1047        // in-memory truncation for spill, no host file for job output.
1048        //
1049        // This is true for a `NoLocal` kernel (mounts nothing) and for any
1050        // `with_backend` kernel (`no_host_filesystem` — the embedder owns the
1051        // VFS, so the kernel controls no host mounts and any host write is a
1052        // bypass). Overrides an explicit `SpillMode::Disk`, which is nonsensical
1053        // when there is no kernel-owned host filesystem to spill to.
1054        let no_host_side_channel =
1055            no_host_filesystem || matches!(config.vfs_mode, VfsMountMode::NoLocal);
1056
1057        let KernelConfig { name, cwd, skip_validation, interactive, ignore_config, mut output_limit, allow_external_commands, latch_enabled, trash_enabled, nonce_store, initial_vars, request_timeout, kill_grace, .. } = config;
1058
1059        if no_host_side_channel {
1060            output_limit.set_spill_mode(crate::output_limit::SpillMode::Memory);
1061            jobs.set_persist_output_files(false);
1062        }
1063
1064        let mut tools = ToolRegistry::new();
1065        register_builtins(&mut tools);
1066        configure_tools(&mut tools);
1067        let tools = Arc::new(tools);
1068
1069        // Mount BuiltinFs so `ls /v/bin` lists builtins
1070        vfs.mount("/v/bin", BuiltinFs::new(tools.clone()));
1071
1072        let vfs = Arc::new(vfs);
1073
1074        let runner = PipelineRunner::new(tools.clone());
1075
1076        let (stderr_writer, stderr_receiver) = stderr_stream();
1077
1078        let mut exec_ctx = make_ctx(&vfs, &tools);
1079        exec_ctx.set_cwd(cwd);
1080        exec_ctx.set_job_manager(jobs.clone());
1081        exec_ctx.set_tool_schemas(tools.schemas());
1082        exec_ctx.set_tools(tools.clone());
1083        #[cfg(feature = "os-integration")]
1084        exec_ctx.set_trash_backend(Arc::new(crate::trash_system::SystemTrash));
1085        exec_ctx.stderr = Some(stderr_writer);
1086        exec_ctx.ignore_config = ignore_config;
1087        exec_ctx.output_limit = output_limit;
1088        exec_ctx.allow_external_commands = allow_external_commands;
1089        exec_ctx.vfs_budget = vfs_budget.clone();
1090        if let Some(store) = nonce_store {
1091            exec_ctx.nonce_store = store;
1092        }
1093
1094        Ok(Self {
1095            name,
1096            scope: RwLock::new({
1097                let mut scope = Scope::new();
1098                scope.set_pid(KERNEL_COUNTER.fetch_add(1, Ordering::Relaxed));
1099                // HOME is NOT read from the host env here — the kernel is
1100                // hermetic. Frontends (REPL, MCP) seed it via `initial_vars`
1101                // below (from `std::env::vars()`); a hermetic embedder leaves
1102                // `initial_vars` empty and gets no HOME (tilde stays literal).
1103                // Apply caller-supplied initial variables, all marked exported.
1104                // Frontends (REPL, MCP) populate this from std::env::vars()
1105                // for shell-like UX; embedders that want hermetic behavior
1106                // simply leave it empty.
1107                for (name, value) in initial_vars {
1108                    scope.set_exported(name, value);
1109                }
1110                scope.set_latch_enabled(latch_enabled);
1111                scope.set_trash_enabled(trash_enabled);
1112                scope
1113            }),
1114            tools,
1115            user_tools: RwLock::new(HashMap::new()),
1116            vfs,
1117            jobs,
1118            runner,
1119            exec_ctx: RwLock::new(exec_ctx),
1120            skip_validation,
1121            interactive,
1122            allow_external_commands,
1123            vfs_budget,
1124            request_timeout,
1125            kill_grace,
1126            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1127            cancel_token: std::sync::Mutex::new(tokio_util::sync::CancellationToken::new()),
1128            #[cfg(all(unix, feature = "subprocess"))]
1129            terminal_state: None,
1130            self_weak: std::sync::OnceLock::new(),
1131            execute_lock: tokio::sync::Mutex::new(()),
1132            bg_job_id: None,
1133            // Overlay handle is set by Kernel::new after assemble returns;
1134            // assemble itself doesn't know the handle (it's constructed in setup_vfs).
1135            // with_backend always has None (overlay=true is rejected above).
1136            #[cfg(all(feature = "localfs", feature = "overlay"))]
1137            overlay_handle: None,
1138        })
1139    }
1140
1141    /// Get the kernel name.
1142    pub fn name(&self) -> &str {
1143        &self.name
1144    }
1145
1146    /// Wrap this Kernel in an Arc and initialize its self-reference.
1147    ///
1148    /// This enables the Kernel to hand out `Arc<dyn CommandDispatcher>` references
1149    /// to child contexts, allowing builtins like `timeout` to dispatch inner
1150    /// commands through the full resolution chain (user tools → builtins →
1151    /// .kai scripts → external commands).
1152    pub fn into_arc(self) -> Arc<Self> {
1153        let arc = Arc::new(self);
1154        let _ = arc.self_weak.set(Arc::downgrade(&arc));
1155        arc
1156    }
1157
1158    /// Fork a subsidiary kernel for concurrent execution.
1159    ///
1160    /// The fork is a fully-functional `Kernel` that:
1161    /// - **Snapshots** per-session state from the parent: scope (COW — cheap),
1162    ///   user-defined tools, cwd, aliases, ignore config, etc. Mutations on
1163    ///   the fork do NOT propagate back to the parent — matching bash
1164    ///   subshell / background-job semantics.
1165    /// - **Shares** read-mostly resources with the parent via `Arc`: the tool
1166    ///   registry, the VFS router, and the job manager. A job registered by
1167    ///   the fork is visible to the parent's `jobs` builtin, and the fork
1168    ///   sees the same VFS mounts.
1169    /// - **Owns** its own `stderr_receiver`, `cancel_token`, and
1170    ///   `execute_lock`. It is never the TTY owner, so `interactive` is
1171    ///   `false` and `terminal_state` is `None`.
1172    ///
1173    /// The returned Arc has its `self_weak` populated (via `into_arc`), so
1174    /// nested dispatch through `ctx.dispatcher` (e.g. the `timeout` builtin)
1175    /// routes through the fork itself, not the parent — which is essential
1176    /// for concurrency safety.
1177    ///
1178    /// Use this for **detached** background concurrency where the fork should
1179    /// survive parent cancellation: the `&` background-job operator and any
1180    /// other "fire and forget" worker. The fork gets a fresh, independent
1181    /// cancellation token.
1182    ///
1183    /// For foreground concurrency (scatter workers, concurrent pipeline
1184    /// stages, `$(...)` cmdsubs) where parent timeout/cancel must cascade
1185    /// into the fork's external children, use [`Self::fork_attached`].
1186    pub async fn fork(&self) -> Arc<Self> {
1187        self.fork_inner(tokio_util::sync::CancellationToken::new(), self.bg_job_id)
1188            .await
1189    }
1190
1191    /// Fork attached to the parent's cancellation.
1192    ///
1193    /// Same as [`Self::fork`] but the fork's `cancel_token` is a child of
1194    /// the parent's. When the parent cancels (request timeout, embedder
1195    /// `Kernel::cancel`, etc.), the fork's token also cancels, which in
1196    /// turn kills any external children spawned in the fork via the
1197    /// `wait_or_kill` / SIGTERM-grace-SIGKILL path.
1198    pub async fn fork_attached(&self) -> Arc<Self> {
1199        let child_token = {
1200            #[allow(clippy::expect_used)]
1201            let parent = self.cancel_token.lock().expect("cancel_token poisoned");
1202            parent.child_token()
1203        };
1204        self.fork_inner(child_token, self.bg_job_id).await
1205    }
1206
1207    /// Fork for a background job, stamping the job id so external commands
1208    /// spawned anywhere beneath it record their process groups on that job
1209    /// (for `kill -<sig> %N`). The caller owns `cancel` so it can also drive
1210    /// `JobManager::cancel`.
1211    pub async fn fork_for_background(
1212        &self,
1213        cancel: tokio_util::sync::CancellationToken,
1214        job_id: crate::scheduler::JobId,
1215    ) -> Arc<Self> {
1216        self.fork_inner(cancel, Some(job_id)).await
1217    }
1218
1219    /// Shared fork implementation. Caller decides the cancellation token and
1220    /// which background job (if any) this fork runs on behalf of.
1221    async fn fork_inner(
1222        &self,
1223        cancel: tokio_util::sync::CancellationToken,
1224        bg_job_id: Option<crate::scheduler::JobId>,
1225    ) -> Arc<Self> {
1226        let scope_snapshot = self.scope.read().await.clone();
1227        let user_tools_snapshot = self.user_tools.read().await.clone();
1228
1229        // Snapshot exec_ctx by cloning the cloneable fields, then override
1230        // the ones that should not carry over (stderr channel, dispatcher,
1231        // interactive flag, terminal state, cancel — set from `cancel` arg).
1232        let mut fork_ctx = {
1233            let parent_ctx = self.exec_ctx.read().await;
1234            parent_ctx.child_for_pipeline()
1235        };
1236        let (stderr_writer, stderr_receiver) = stderr_stream();
1237        fork_ctx.stderr = Some(stderr_writer);
1238        // Clear dispatcher; dispatch_command will repopulate it to point at
1239        // the fork on the first dispatch call.
1240        fork_ctx.dispatcher = None;
1241        fork_ctx.interactive = false;
1242        fork_ctx.cancel = cancel.clone();
1243        #[cfg(all(unix, feature = "subprocess"))]
1244        {
1245            fork_ctx.terminal_state = None;
1246        }
1247
1248        let fork = Self {
1249            name: format!("{}:fork", self.name),
1250            scope: RwLock::new(scope_snapshot),
1251            tools: Arc::clone(&self.tools),
1252            user_tools: RwLock::new(user_tools_snapshot),
1253            vfs: Arc::clone(&self.vfs),
1254            jobs: Arc::clone(&self.jobs),
1255            runner: self.runner.clone(),
1256            exec_ctx: RwLock::new(fork_ctx),
1257            skip_validation: self.skip_validation,
1258            // Forks are never the TTY owner — they run in the background.
1259            interactive: false,
1260            allow_external_commands: self.allow_external_commands,
1261            // Arc-clone the budget so the fork draws from the same pool as the
1262            // parent — background jobs and scatter workers count against the same
1263            // cap as foreground writes.
1264            vfs_budget: self.vfs_budget.clone(),
1265            request_timeout: self.request_timeout,
1266            kill_grace: self.kill_grace,
1267            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1268            cancel_token: std::sync::Mutex::new(cancel),
1269            #[cfg(all(unix, feature = "subprocess"))]
1270            terminal_state: None,
1271            self_weak: std::sync::OnceLock::new(),
1272            execute_lock: tokio::sync::Mutex::new(()),
1273            bg_job_id,
1274            // Arc-clone the overlay handle so forks (background jobs, scatter
1275            // workers, pipeline stages) can reach the same overlay transaction
1276            // via `kaish-vfs status/diff/commit/reset`.
1277            #[cfg(all(feature = "localfs", feature = "overlay"))]
1278            overlay_handle: self.overlay_handle.clone(),
1279        };
1280
1281        fork.into_arc()
1282    }
1283
1284    /// Get an `Arc<dyn CommandDispatcher>` to this Kernel, if wrapped via `into_arc()`.
1285    ///
1286    /// Returns `None` if the Kernel was not wrapped, or if all strong references
1287    /// have been dropped (the `Weak` can no longer upgrade).
1288    pub fn dispatcher(&self) -> Option<Arc<dyn CommandDispatcher>> {
1289        self.self_weak
1290            .get()
1291            .and_then(|weak| weak.upgrade())
1292            .map(|arc| arc as Arc<dyn CommandDispatcher>)
1293    }
1294
1295    /// Initialize terminal state for interactive job control.
1296    ///
1297    /// Call this after kernel creation when running as an interactive REPL
1298    /// and stdin is a TTY. Sets up process groups and signal handling.
1299    #[cfg(all(unix, feature = "subprocess"))]
1300    pub fn init_terminal(&mut self) {
1301        if !self.interactive {
1302            return;
1303        }
1304        match crate::terminal::TerminalState::init() {
1305            Ok(state) => {
1306                let state = Arc::new(state);
1307                self.terminal_state = Some(state.clone());
1308                // Set on exec_ctx so builtins (fg, bg, kill) can access it
1309                self.exec_ctx.get_mut().terminal_state = Some(state);
1310                tracing::debug!("terminal job control initialized");
1311            }
1312            Err(e) => {
1313                tracing::warn!("failed to initialize terminal job control: {}", e);
1314            }
1315        }
1316    }
1317
1318    /// Replace or remove the trash backend used by `rm` and `kaish-trash`.
1319    ///
1320    /// The kernel installs the OS trash (`SystemTrash`) automatically when
1321    /// built with the `os-integration` feature. Embedders and tests can swap
1322    /// in a custom [`crate::trash::TrashBackend`], or pass `None` to remove
1323    /// it — with trash enabled but no backend present, `rm` fails loud
1324    /// rather than falling through to permanent delete.
1325    pub fn set_trash_backend(&mut self, backend: Option<Arc<dyn crate::trash::TrashBackend>>) {
1326        self.exec_ctx.get_mut().trash_backend = backend;
1327    }
1328
1329    /// Cancel the current execution.
1330    ///
1331    /// This cancels the current cancellation token, causing any execution
1332    /// loop to exit at the next checkpoint with exit code 130 (SIGINT).
1333    /// A fresh token is installed for the next `execute()` call.
1334    pub fn cancel(&self) {
1335        #[allow(clippy::expect_used)]
1336        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1337        token.cancel();
1338    }
1339
1340    /// Check if the current execution has been cancelled.
1341    pub fn is_cancelled(&self) -> bool {
1342        #[allow(clippy::expect_used)]
1343        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1344        token.is_cancelled()
1345    }
1346
1347    /// Reset the cancellation token (called at the start of each execute).
1348    fn reset_cancel(&self) -> tokio_util::sync::CancellationToken {
1349        #[allow(clippy::expect_used)]
1350        let mut token = self.cancel_token.lock().expect("cancel_token poisoned");
1351        if token.is_cancelled() {
1352            *token = tokio_util::sync::CancellationToken::new();
1353        }
1354        token.clone()
1355    }
1356
1357    /// Acquire the per-Kernel execute lock, warning on contention.
1358    ///
1359    /// Tokio's Mutex is fair (FIFO) so callers queue in arrival order. When
1360    /// the lock is already held, emit a warning so the silent serialization
1361    /// is observable in logs — if you need real parallelism, fork the kernel.
1362    async fn acquire_execute_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
1363        match self.execute_lock.try_lock() {
1364            Ok(guard) => guard,
1365            Err(_) => {
1366                tracing::warn!(
1367                    target: "kaish::kernel::concurrency",
1368                    kernel = %self.name,
1369                    "execute() contended — serializing concurrent caller; \
1370                     use Kernel::fork() for parallelism instead of sharing"
1371                );
1372                self.execute_lock.lock().await
1373            }
1374        }
1375    }
1376
1377    /// Execute kaish source code with default options.
1378    ///
1379    /// Equivalent to `execute_with_options(input, ExecuteOptions::default())`.
1380    /// Returns the result of the last statement executed.
1381    pub async fn execute(&self, input: &str) -> Result<ExecResult> {
1382        self.run_inner(input, ExecuteOptions::default(), None).await
1383    }
1384
1385    /// Execute with per-call options. The primary entry point for embedders
1386    /// that don't need per-statement output streaming.
1387    ///
1388    /// `opts` carries timeout, transient vars overlay, optional cwd override,
1389    /// and optional embedder-owned cancellation token. See [`ExecuteOptions`]
1390    /// for semantics. For streaming, use [`Self::execute_with_options_streaming`].
1391    ///
1392    /// **Cancellation:** if `opts.cancel_token` is `Some`, it is *raced*
1393    /// against the kernel's internal token. Either firing cancels and kills
1394    /// external children. The embedder's token is read-only — kernel
1395    /// timeouts do NOT propagate into it. Distinguish via the returned
1396    /// `code`: 124 = timeout, 130 = cancellation.
1397    ///
1398    /// **Timeout:** `opts.timeout` overrides `KernelConfig::request_timeout`.
1399    /// `Some(Duration::ZERO)` returns 124 immediately without spawning.
1400    ///
1401    /// Concurrent callers on the same Kernel serialize on the kernel-wide
1402    /// execute lock. For true parallelism, call [`Kernel::fork`] (detached)
1403    /// or [`Kernel::fork_attached`] (cancellation cascades from this kernel).
1404    pub async fn execute_with_options(
1405        &self,
1406        input: &str,
1407        opts: ExecuteOptions,
1408    ) -> Result<ExecResult> {
1409        self.run_inner(input, opts, None).await
1410    }
1411
1412    /// Same as [`Self::execute_with_options`] but with a per-statement output
1413    /// callback. The callback fires after each top-level statement so the
1414    /// embedder (REPL, MCP streaming) can flush output incrementally.
1415    pub async fn execute_with_options_streaming(
1416        &self,
1417        input: &str,
1418        opts: ExecuteOptions,
1419        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1420    ) -> Result<ExecResult> {
1421        self.run_inner(input, opts, Some(on_output)).await
1422    }
1423
1424    /// Execute kaish source code with a transient overlay of exported variables.
1425    ///
1426    /// Deprecated thin wrapper over [`Self::execute_with_options`]. New code
1427    /// should use that method directly:
1428    /// `execute_with_options(input, ExecuteOptions::new().with_vars(vars))`.
1429    #[deprecated(note = "use Kernel::execute_with_options with ExecuteOptions::with_vars")]
1430    pub async fn execute_with_vars(
1431        &self,
1432        input: &str,
1433        vars: HashMap<String, Value>,
1434    ) -> Result<ExecResult> {
1435        self.run_inner(input, ExecuteOptions::new().with_vars(vars), None).await
1436    }
1437
1438    /// Execute kaish source code with a per-statement callback.
1439    ///
1440    /// Deprecated thin wrapper. New code should use
1441    /// [`Self::execute_with_options_streaming`].
1442    #[deprecated(note = "use Kernel::execute_with_options_streaming")]
1443    pub async fn execute_streaming(
1444        &self,
1445        input: &str,
1446        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1447    ) -> Result<ExecResult> {
1448        self.run_inner(input, ExecuteOptions::default(), Some(on_output)).await
1449    }
1450
1451    /// Link embedder trace context, then run [`Self::execute_with_options_inner`].
1452    ///
1453    /// The `#[instrument]` execution span resolves its parent from the *current*
1454    /// OpenTelemetry context (see `tracing-opentelemetry`'s `parent_context`),
1455    /// captured when the span is first entered — not when the future is
1456    /// constructed. So a thread-local `attach()` scoped to construction is too
1457    /// early to be seen (the integration test confirms this). `with_context`
1458    /// re-attaches the embedder's context on *every* poll of the inner future,
1459    /// so the context is current at first-enter and survives runtime thread
1460    /// hops. With no embedder trace context, the future runs unwrapped.
1461    async fn run_inner(
1462        &self,
1463        input: &str,
1464        opts: ExecuteOptions,
1465        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1466    ) -> Result<ExecResult> {
1467        use opentelemetry::context::FutureExt;
1468
1469        // Capture the embedder's baggage before `opts` is consumed so it can be
1470        // echoed back onto the result on egress (see `merge_egress_baggage`).
1471        let embedder_baggage = opts.baggage.clone();
1472
1473        let result = match crate::telemetry::extract_parent(&opts) {
1474            Some(parent) => self
1475                .execute_with_options_inner(input, opts, on_output)
1476                .with_context(parent)
1477                .await,
1478            None => self.execute_with_options_inner(input, opts, on_output).await,
1479        };
1480
1481        result.map(|mut r| {
1482            crate::telemetry::merge_egress_baggage(&mut r, embedder_baggage);
1483            r
1484        })
1485    }
1486
1487    /// Shared body for `execute`, `execute_with_options(_streaming)`, and
1488    /// the deprecated wrappers. Owns the per-call cancel token, vars overlay,
1489    /// cwd override, and timeout race.
1490    #[tracing::instrument(level = "info", skip(self, opts, on_output), fields(input_len = input.len()))]
1491    async fn execute_with_options_inner(
1492        &self,
1493        input: &str,
1494        opts: ExecuteOptions,
1495        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1496    ) -> Result<ExecResult> {
1497        let _guard = self.acquire_execute_lock().await;
1498
1499        // Always reset to a fresh internal token; this is the kernel's own
1500        // cancel surface for embedders calling `Kernel::cancel()`. The
1501        // embedder-supplied `opts.cancel_token` is a *read-only input* — it
1502        // is NOT written into `self.cancel_token`, because doing so would
1503        // (a) leak the embedder's token past this call's lifetime,
1504        // (b) re-route a later `Kernel::cancel()` into the embedder's token,
1505        // (c) extend the token's lifetime via the kernel's strong clone.
1506        let internal = self.reset_cancel();
1507        // Race the embedder token against the kernel's internal token via a
1508        // tracked watcher task. We hold the JoinHandle so we can abort the
1509        // task at function exit — otherwise it would wait forever for either
1510        // token to fire and leak per call.
1511        let (effective_cancel, watcher_handle): (
1512            tokio_util::sync::CancellationToken,
1513            Option<tokio::task::JoinHandle<()>>,
1514        ) = if let Some(ext) = opts.cancel_token {
1515            let combined = tokio_util::sync::CancellationToken::new();
1516            let combined_writer = combined.clone();
1517            let i = internal.clone();
1518            let handle = tokio::spawn(async move {
1519                tokio::select! {
1520                    _ = i.cancelled() => combined_writer.cancel(),
1521                    _ = ext.cancelled() => combined_writer.cancel(),
1522                }
1523            });
1524            (combined, Some(handle))
1525        } else {
1526            (internal, None)
1527        };
1528
1529        // Effective timeout: per-call wins over kernel-config default.
1530        let timeout = opts.timeout.or(self.request_timeout);
1531
1532        // ZERO timeout: return 124 immediately without spawning anything.
1533        if timeout == Some(Duration::ZERO) {
1534            if let Some(h) = watcher_handle {
1535                h.abort();
1536            }
1537            return Ok(ExecResult::failure(124, "timeout: timed out after 0s".to_string()));
1538        }
1539
1540        // Apply per-call vars overlay (push frame + set_exported), wrapped in
1541        // an RAII guard so a panic inside `execute_streaming_inner` still
1542        // pops the frame and unexports the temporarily-exported names.
1543        struct VarsFrameGuard<'a> {
1544            kernel: &'a Kernel,
1545            newly_exported: Vec<String>,
1546        }
1547        impl Drop for VarsFrameGuard<'_> {
1548            fn drop(&mut self) {
1549                // Best-effort cleanup using try_write. The execute_lock held
1550                // throughout execute_with_options means there is no concurrent
1551                // foreground caller; forks have their own scope and won't
1552                // block this. blocking_write would deadlock the runtime when
1553                // called from a tokio worker thread, so we explicitly do NOT
1554                // fall back to it — if try_write fails (which we've never
1555                // seen in practice), log loudly and accept the leak rather
1556                // than deadlock the entire kernel.
1557                let Ok(mut scope) = self.kernel.scope.try_write() else {
1558                    tracing::error!(
1559                        "vars frame guard: scope lock unexpectedly busy; \
1560                         skipping pop_frame to avoid runtime deadlock — \
1561                         transient vars may leak"
1562                    );
1563                    return;
1564                };
1565                scope.pop_frame();
1566                for name in self.newly_exported.drain(..) {
1567                    scope.unexport(&name);
1568                }
1569            }
1570        }
1571
1572        // Per-call cwd override: save current cwd, set the new one, restore
1573        // on Drop so the kernel's persistent cwd doesn't leak between calls.
1574        // Same RAII pattern as VarsFrameGuard, same blocking_write trade-off.
1575        struct CwdGuard<'a> {
1576            kernel: &'a Kernel,
1577            saved: PathBuf,
1578        }
1579        impl Drop for CwdGuard<'_> {
1580            fn drop(&mut self) {
1581                let Ok(mut ec) = self.kernel.exec_ctx.try_write() else {
1582                    tracing::error!(
1583                        "cwd guard: exec_ctx lock unexpectedly busy; \
1584                         skipping cwd restore — kernel cwd may be wrong for next call"
1585                    );
1586                    return;
1587                };
1588                ec.cwd = std::mem::take(&mut self.saved);
1589            }
1590        }
1591        let _cwd_guard: Option<CwdGuard<'_>> = if let Some(new_cwd) = opts.cwd {
1592            let mut ec = self.exec_ctx.write().await;
1593            let saved = std::mem::replace(&mut ec.cwd, new_cwd);
1594            drop(ec);
1595            Some(CwdGuard { kernel: self, saved })
1596        } else {
1597            None
1598        };
1599
1600        let _vars_guard: Option<VarsFrameGuard<'_>> = if !opts.vars.is_empty() {
1601            let mut scope = self.scope.write().await;
1602            scope.push_frame();
1603            let mut newly = Vec::with_capacity(opts.vars.len());
1604            for (name, value) in opts.vars {
1605                if !scope.is_exported(&name) {
1606                    newly.push(name.clone());
1607                }
1608                scope.set_exported(name, value);
1609            }
1610            drop(scope);
1611            Some(VarsFrameGuard { kernel: self, newly_exported: newly })
1612        } else {
1613            None
1614        };
1615
1616        // Sync the effective cancel into self.exec_ctx so try_execute_external
1617        // (which reads via self.cancel_token) sees cancellation. We also need
1618        // builtins to see it via ctx.cancel — handled in execute_command.
1619        // For simplicity here we mirror effective_cancel into self.cancel_token
1620        // for the duration of this call, then restore the internal token at
1621        // the end (so a later Kernel::cancel still hits our internal surface).
1622        {
1623            #[allow(clippy::expect_used)]
1624            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1625            *cur = effective_cancel.clone();
1626        }
1627
1628        // The movable-deadline watchdog for this call (None without a timeout),
1629        // mirrored into exec_ctx — like the cancel token — so builtins can
1630        // suspend the script clock via `ctx.patient`. Assigned unconditionally
1631        // (clearing any stale handle) and reset to None in the restore block.
1632        let watchdog = timeout.map(|d| Arc::new(crate::watchdog::Watchdog::new(d)));
1633        {
1634            let mut ec = self.exec_ctx.write().await;
1635            ec.watchdog = watchdog.clone();
1636        }
1637
1638        // Run inner with optional timeout. The watchdog task cancels our token
1639        // on elapsed; the cascade fires SIGTERM/SIGKILL on any external
1640        // children via the wait_or_kill discipline in try_execute_external.
1641        let mut noop_cb: Box<dyn FnMut(&ExecResult) + Send> = Box::new(|_| {});
1642        let cb_ref: &mut (dyn FnMut(&ExecResult) + Send) = match on_output {
1643            Some(cb) => cb,
1644            None => &mut *noop_cb,
1645        };
1646
1647        let result = if let Some(d) = timeout {
1648            #[allow(clippy::expect_used)]
1649            let watchdog = watchdog.clone().expect("watchdog constructed when timeout is set");
1650            let elapsed = Arc::new(std::sync::atomic::AtomicBool::new(false));
1651            let timer = tokio::spawn(watchdog.run(elapsed.clone(), effective_cancel.clone()));
1652            let r = self.execute_streaming_inner(input, cb_ref).await;
1653            timer.abort();
1654            match r {
1655                Ok(mut res) => {
1656                    if elapsed.load(std::sync::atomic::Ordering::SeqCst) {
1657                        res.code = 124;
1658                        if res.err.is_empty() {
1659                            res.err = format!("timeout: timed out after {:?}", d);
1660                        }
1661                    }
1662                    Ok(res)
1663                }
1664                Err(e) => Err(e),
1665            }
1666        } else {
1667            self.execute_streaming_inner(input, cb_ref).await
1668        };
1669
1670        // Restore self.cancel_token to a fresh, uncancelled token so the
1671        // embedder's view of `Kernel::cancel()` stays predictable on the
1672        // next call (it cancels the kernel's own token, not whatever was
1673        // left over from this call's combined token).
1674        {
1675            #[allow(clippy::expect_used)]
1676            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1677            *cur = tokio_util::sync::CancellationToken::new();
1678        }
1679
1680        // Drop the watchdog handle from exec_ctx — its timer task is gone
1681        // (fired or aborted above); a patient hold acquired against a stale
1682        // handle would silently suspend nothing.
1683        {
1684            let mut ec = self.exec_ctx.write().await;
1685            ec.watchdog = None;
1686        }
1687
1688        // Tear down the embedder-token race watcher (if any). Leaving it
1689        // alive would idle forever waiting for tokens that may never fire.
1690        if let Some(h) = watcher_handle {
1691            h.abort();
1692        }
1693
1694        // VarsFrameGuard drops here on the success path and on early-return
1695        // paths above (error path included). Panic safety preserved.
1696        result
1697    }
1698
1699    /// The actual body of `execute_streaming`, run while holding the execute lock.
1700    ///
1701    /// Split out so internal kernel paths that are already under the lock can
1702    /// call this without deadlocking on re-entry. External callers must go
1703    /// through [`Self::execute_streaming`] so they acquire the lock.
1704    async fn execute_streaming_inner(
1705        &self,
1706        input: &str,
1707        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1708    ) -> Result<ExecResult> {
1709        let program = parse(input).map_err(|errors| {
1710            let msg = errors
1711                .iter()
1712                .map(|e| e.format(input))
1713                .collect::<Vec<_>>()
1714                .join("\n");
1715            anyhow::anyhow!("parse error:\n{}", msg)
1716        })?;
1717
1718        // AST display mode: show AST instead of executing
1719        {
1720            let scope = self.scope.read().await;
1721            if scope.show_ast() {
1722                let output = format!("{:#?}\n", program);
1723                return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(output)));
1724            }
1725        }
1726
1727        // Pre-execution validation
1728        if !self.skip_validation {
1729            let user_tools = self.user_tools.read().await;
1730            let validator = Validator::new(&self.tools, &user_tools);
1731            let issues = validator.validate(&program);
1732
1733            // Collect errors (warnings are logged but don't prevent execution)
1734            let errors: Vec<_> = issues
1735                .iter()
1736                .filter(|i| i.severity == Severity::Error)
1737                .collect();
1738
1739            if !errors.is_empty() {
1740                let error_msg = errors
1741                    .iter()
1742                    .map(|e| e.format(input))
1743                    .collect::<Vec<_>>()
1744                    .join("\n");
1745                return Err(anyhow::anyhow!("validation failed:\n{}", error_msg));
1746            }
1747
1748            // Log warnings via tracing (trace level to avoid noise)
1749            for warning in issues.iter().filter(|i| i.severity == Severity::Warning) {
1750                tracing::trace!("validation: {}", warning.format(input));
1751            }
1752        }
1753
1754        let mut result = ExecResult::success("");
1755
1756        // Reset cancellation token for this execution.
1757        let cancel = self.reset_cancel();
1758
1759        for stmt in program.statements {
1760            if matches!(stmt, Stmt::Empty) {
1761                continue;
1762            }
1763
1764            // Cancellation checkpoint
1765            if cancel.is_cancelled() {
1766                result.code = 130;
1767                return Ok(result);
1768            }
1769
1770            let flow = self.execute_stmt_flow(&stmt).await?;
1771
1772            // Drain any stderr written by pipeline stages during this statement.
1773            // This captures stderr from intermediate pipeline stages that would
1774            // otherwise be lost (only the last stage's result is returned).
1775            let drained_stderr = {
1776                let mut receiver = self.stderr_receiver.lock().await;
1777                receiver.drain_lossy()
1778            };
1779
1780            match flow {
1781                ControlFlow::Normal(mut r) => {
1782                    if !drained_stderr.is_empty() {
1783                        if !r.err.is_empty() && !r.err.ends_with('\n') {
1784                            r.err.push('\n');
1785                        }
1786                        // Prepend pipeline stderr before the last stage's stderr
1787                        let combined = format!("{}{}", drained_stderr, r.err);
1788                        r.err = combined;
1789                    }
1790                    on_output(&r);
1791                    // Carry the last statement's structured output for MCP TOON encoding.
1792                    // Must be done here (not in accumulate_result) because accumulate_result
1793                    // is also used in loops where per-iteration output would be wrong.
1794                    let last_output = r.output().cloned();
1795                    accumulate_result(&mut result, &r);
1796                    result.set_output(last_output);
1797                }
1798                ControlFlow::Exit { code } => {
1799                    if !drained_stderr.is_empty() {
1800                        result.err.push_str(&drained_stderr);
1801                    }
1802                    result.code = code;
1803                    return Ok(result);
1804                }
1805                ControlFlow::Return { mut value } => {
1806                    if !drained_stderr.is_empty() {
1807                        value.err = format!("{}{}", drained_stderr, value.err);
1808                    }
1809                    on_output(&value);
1810                    result = value;
1811                }
1812                ControlFlow::Break { result: mut r, .. } | ControlFlow::Continue { result: mut r, .. } => {
1813                    if !drained_stderr.is_empty() {
1814                        r.err = format!("{}{}", drained_stderr, r.err);
1815                    }
1816                    on_output(&r);
1817                    result = r;
1818                }
1819            }
1820        }
1821
1822        Ok(result)
1823    }
1824
1825    /// Execute a single statement, returning control flow information.
1826    fn execute_stmt_flow<'a>(
1827        &'a self,
1828        stmt: &'a Stmt,
1829    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ControlFlow>> + Send + 'a>> {
1830        use tracing::Instrument;
1831        let span = tracing::debug_span!("execute_stmt_flow", stmt_type = %stmt.kind_name());
1832        Box::pin(async move {
1833        match stmt {
1834            Stmt::Assignment(assign) => {
1835                // Use async evaluator to support command substitution
1836                let value = self.eval_expr_async(&assign.value).await
1837                    .context("failed to evaluate assignment")?;
1838                let mut scope = self.scope.write().await;
1839                if assign.local {
1840                    // local: set in innermost (current function) frame
1841                    scope.set(&assign.name, value.clone());
1842                } else {
1843                    // non-local: update existing or create in root frame
1844                    scope.set_global(&assign.name, value.clone());
1845                }
1846                drop(scope);
1847
1848                // Assignments don't produce output (like sh)
1849                Ok(ControlFlow::ok(ExecResult::success("")))
1850            }
1851            Stmt::Command(cmd) => {
1852                // Route single commands through execute_pipeline for a unified path.
1853                // This ensures all commands go through the dispatcher chain.
1854                let pipeline = crate::ast::Pipeline {
1855                    commands: vec![cmd.clone()],
1856                    background: false,
1857                };
1858                let result = self.execute_pipeline(&pipeline).await?;
1859                self.update_last_result(&result).await;
1860
1861                // Check for error exit mode (set -e)
1862                if !result.ok() {
1863                    let scope = self.scope.read().await;
1864                    if scope.error_exit_enabled() {
1865                        return Ok(ControlFlow::exit_code(result.code));
1866                    }
1867                }
1868
1869                Ok(ControlFlow::ok(result))
1870            }
1871            Stmt::Pipeline(pipeline) => {
1872                let result = self.execute_pipeline(pipeline).await?;
1873                self.update_last_result(&result).await;
1874
1875                // Check for error exit mode (set -e)
1876                if !result.ok() {
1877                    let scope = self.scope.read().await;
1878                    if scope.error_exit_enabled() {
1879                        return Ok(ControlFlow::exit_code(result.code));
1880                    }
1881                }
1882
1883                Ok(ControlFlow::ok(result))
1884            }
1885            Stmt::If(if_stmt) => {
1886                // Use async evaluator to support command substitution in conditions
1887                let cond_value = self.eval_expr_async(&if_stmt.condition).await?;
1888
1889                let branch = if is_truthy(&cond_value) {
1890                    &if_stmt.then_branch
1891                } else {
1892                    if_stmt.else_branch.as_deref().unwrap_or(&[])
1893                };
1894
1895                let mut result = ExecResult::success("");
1896                for stmt in branch {
1897                    let flow = self.execute_stmt_flow(stmt).await?;
1898                    match flow {
1899                        ControlFlow::Normal(r) => {
1900                            accumulate_result(&mut result, &r);
1901                            self.drain_stderr_into(&mut result).await;
1902                        }
1903                        other => {
1904                            self.drain_stderr_into(&mut result).await;
1905                            return Ok(other);
1906                        }
1907                    }
1908                }
1909                Ok(ControlFlow::ok(result))
1910            }
1911            Stmt::For(for_loop) => {
1912                // Evaluate all items and collect values for iteration
1913                // Use async evaluator to support command substitution like $(seq 1 5)
1914                let mut items: Vec<Value> = Vec::new();
1915                for item_expr in &for_loop.items {
1916                    // Glob expansion in for-loop items: `for f in *.txt`
1917                    if let Expr::GlobPattern(pattern) = item_expr {
1918                        let glob_enabled = {
1919                            let scope = self.scope.read().await;
1920                            scope.glob_enabled()
1921                        };
1922                        if glob_enabled {
1923                            let (paths, cwd) = {
1924                                let ctx = self.exec_ctx.read().await;
1925                                let paths = ctx.expand_glob(pattern).await
1926                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
1927                                let cwd = ctx.resolve_path(".");
1928                                (paths, cwd)
1929                            };
1930                            if paths.is_empty() {
1931                                return Err(anyhow::anyhow!("no matches: {}", pattern));
1932                            }
1933                            for path in paths {
1934                                let display = if !pattern.starts_with('/') {
1935                                    path.strip_prefix(&cwd)
1936                                        .unwrap_or(&path)
1937                                        .to_string_lossy().into_owned()
1938                                } else {
1939                                    path.to_string_lossy().into_owned()
1940                                };
1941                                items.push(Value::String(display));
1942                            }
1943                            continue;
1944                        }
1945                    }
1946                    // Track whether this item came from $(cmd); that's the
1947                    // only position where multi-line stdout auto-splits per
1948                    // line. Arrays still spread element-by-element; bare
1949                    // $VAR is rejected upstream by validator E012. See
1950                    // docs/plan-for-loop-newline-split.md.
1951                    let from_command_subst = matches!(item_expr, Expr::CommandSubst(_));
1952                    let item = self.eval_expr_async(item_expr).await?;
1953                    match item {
1954                        // JSON arrays iterate over elements (preferred path
1955                        // when builtins emit .data — seq, jq, cut, find, …)
1956                        Value::Json(serde_json::Value::Array(arr)) => {
1957                            for elem in arr {
1958                                items.push(json_to_value(elem));
1959                            }
1960                        }
1961                        // Strings from $(cmd): empty → 0 iterations,
1962                        // multi-line → split per line (trimming trailing
1963                        // newlines and per-line trailing \r), single-line
1964                        // → one iteration. Whitespace within a line is
1965                        // NOT split — the "$VAR with spaces just works"
1966                        // promise is preserved because this only fires
1967                        // in CommandSubst position.
1968                        Value::String(s) if from_command_subst => {
1969                            let trimmed = s.trim_end_matches(['\n', '\r']);
1970                            if trimmed.is_empty() {
1971                                continue;
1972                            }
1973                            if trimmed.contains('\n') {
1974                                for line in trimmed.split('\n') {
1975                                    let line = line.trim_end_matches('\r');
1976                                    items.push(Value::String(line.to_string()));
1977                                }
1978                            } else {
1979                                items.push(Value::String(trimmed.to_string()));
1980                            }
1981                        }
1982                        // Binary isn't iterable — fail loud rather than loop
1983                        // once over an opaque byte blob.
1984                        Value::Bytes(_) => {
1985                            anyhow::bail!(
1986                                "for: cannot iterate over binary data — decode it \
1987                                 (base64/xxd) first"
1988                            );
1989                        }
1990                        // Strings not from $(cmd) stay as one value.
1991                        other => items.push(other),
1992                    }
1993                }
1994
1995                let mut result = ExecResult::success("");
1996                {
1997                    let mut scope = self.scope.write().await;
1998                    scope.push_frame();
1999                }
2000
2001                'outer: for item in items {
2002                    // Cancellation checkpoint per iteration
2003                    if self.is_cancelled() {
2004                        let mut scope = self.scope.write().await;
2005                        scope.pop_frame();
2006                        result.code = 130;
2007                        return Ok(ControlFlow::ok(result));
2008                    }
2009                    {
2010                        let mut scope = self.scope.write().await;
2011                        scope.set(&for_loop.variable, item);
2012                    }
2013                    for stmt in &for_loop.body {
2014                        let mut flow = match self.execute_stmt_flow(stmt).await {
2015                            Ok(f) => f,
2016                            Err(e) => {
2017                                let mut scope = self.scope.write().await;
2018                                scope.pop_frame();
2019                                return Err(e);
2020                            }
2021                        };
2022                        self.drain_stderr_into(&mut result).await;
2023                        match &mut flow {
2024                            ControlFlow::Normal(r) => {
2025                                accumulate_result(&mut result, r);
2026                                if !r.ok() {
2027                                    let scope = self.scope.read().await;
2028                                    if scope.error_exit_enabled() {
2029                                        drop(scope);
2030                                        let mut scope = self.scope.write().await;
2031                                        scope.pop_frame();
2032                                        return Ok(ControlFlow::exit_code(r.code));
2033                                    }
2034                                }
2035                            }
2036                            ControlFlow::Break { .. } => {
2037                                if flow.decrement_level() {
2038                                    accumulate_flow_output(&mut result, &flow);
2039                                    break 'outer;
2040                                }
2041                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2042                                let mut scope = self.scope.write().await;
2043                                scope.pop_frame();
2044                                return Ok(flow);
2045                            }
2046                            ControlFlow::Continue { .. } => {
2047                                if flow.decrement_level() {
2048                                    accumulate_flow_output(&mut result, &flow);
2049                                    continue 'outer;
2050                                }
2051                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2052                                let mut scope = self.scope.write().await;
2053                                scope.pop_frame();
2054                                return Ok(flow);
2055                            }
2056                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2057                                let mut scope = self.scope.write().await;
2058                                scope.pop_frame();
2059                                return Ok(flow);
2060                            }
2061                        }
2062                    }
2063                }
2064
2065                {
2066                    let mut scope = self.scope.write().await;
2067                    scope.pop_frame();
2068                }
2069                Ok(ControlFlow::ok(result))
2070            }
2071            Stmt::While(while_loop) => {
2072                let mut result = ExecResult::success("");
2073
2074                'outer: loop {
2075                    // Evaluate condition - use async to support command substitution
2076                    // Cancellation checkpoint per iteration
2077                    if self.is_cancelled() {
2078                        result.code = 130;
2079                        return Ok(ControlFlow::ok(result));
2080                    }
2081
2082                    let cond_value = self.eval_expr_async(&while_loop.condition).await?;
2083
2084                    if !is_truthy(&cond_value) {
2085                        break;
2086                    }
2087
2088                    // Execute body
2089                    for stmt in &while_loop.body {
2090                        let mut flow = self.execute_stmt_flow(stmt).await?;
2091                        self.drain_stderr_into(&mut result).await;
2092                        match &mut flow {
2093                            ControlFlow::Normal(r) => {
2094                                accumulate_result(&mut result, r);
2095                                if !r.ok() {
2096                                    let scope = self.scope.read().await;
2097                                    if scope.error_exit_enabled() {
2098                                        return Ok(ControlFlow::exit_code(r.code));
2099                                    }
2100                                }
2101                            }
2102                            ControlFlow::Break { .. } => {
2103                                if flow.decrement_level() {
2104                                    accumulate_flow_output(&mut result, &flow);
2105                                    break 'outer;
2106                                }
2107                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2108                                return Ok(flow);
2109                            }
2110                            ControlFlow::Continue { .. } => {
2111                                if flow.decrement_level() {
2112                                    accumulate_flow_output(&mut result, &flow);
2113                                    continue 'outer;
2114                                }
2115                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2116                                return Ok(flow);
2117                            }
2118                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2119                                return Ok(flow);
2120                            }
2121                        }
2122                    }
2123                }
2124
2125                Ok(ControlFlow::ok(result))
2126            }
2127            Stmt::Case(case_stmt) => {
2128                // Evaluate the expression to match against
2129                let match_value = {
2130                    let value = self.eval_expr_async(&case_stmt.expr).await?;
2131                    value_to_string(&value)
2132                };
2133
2134                // Try each branch until we find a match
2135                for branch in &case_stmt.branches {
2136                    let matched = branch.patterns.iter().any(|pattern| {
2137                        glob_match(pattern, &match_value)
2138                    });
2139
2140                    if matched {
2141                        // Execute the branch body
2142                        let mut result = ExecResult::success("");
2143                        for stmt in &branch.body {
2144                            let flow = self.execute_stmt_flow(stmt).await?;
2145                            match flow {
2146                                ControlFlow::Normal(r) => {
2147                                    accumulate_result(&mut result, &r);
2148                                    self.drain_stderr_into(&mut result).await;
2149                                }
2150                                other => {
2151                                    self.drain_stderr_into(&mut result).await;
2152                                    return Ok(other);
2153                                }
2154                            }
2155                        }
2156                        return Ok(ControlFlow::ok(result));
2157                    }
2158                }
2159
2160                // No match - return success with empty output (like sh)
2161                Ok(ControlFlow::ok(ExecResult::success("")))
2162            }
2163            Stmt::Break(levels) => {
2164                Ok(ControlFlow::break_n(levels.unwrap_or(1)))
2165            }
2166            Stmt::Continue(levels) => {
2167                Ok(ControlFlow::continue_n(levels.unwrap_or(1)))
2168            }
2169            Stmt::Return(expr) => {
2170                // return [N] - N becomes the exit code, NOT stdout
2171                // Shell semantics: return sets exit code, doesn't produce output
2172                let result = if let Some(e) = expr {
2173                    let val = self.eval_expr_async(e).await?;
2174                    let code = crate::interpreter::value_to_exit_code(&val)
2175                        .map_err(|e| anyhow::anyhow!("return: {}", e))?;
2176                    ExecResult::from_parts(code, String::new(), String::new(), None)
2177                } else {
2178                    ExecResult::success("")
2179                };
2180                Ok(ControlFlow::return_value(result))
2181            }
2182            Stmt::Exit(expr) => {
2183                let code = if let Some(e) = expr {
2184                    let val = self.eval_expr_async(e).await?;
2185                    crate::interpreter::value_to_exit_code(&val)
2186                        .map_err(|e| anyhow::anyhow!("exit: {}", e))?
2187                } else {
2188                    0
2189                };
2190                Ok(ControlFlow::exit_code(code))
2191            }
2192            Stmt::ToolDef(tool_def) => {
2193                let mut user_tools = self.user_tools.write().await;
2194                user_tools.insert(tool_def.name.clone(), tool_def.clone());
2195                Ok(ControlFlow::ok(ExecResult::success("")))
2196            }
2197            Stmt::AndChain { left, right } => {
2198                // cmd1 && cmd2 - run cmd2 only if cmd1 succeeds (exit code 0)
2199                // Suppress errexit for the left side — && handles failure itself.
2200                {
2201                    let mut scope = self.scope.write().await;
2202                    scope.suppress_errexit();
2203                }
2204                let left_flow = match self.execute_stmt_flow(left).await {
2205                    Ok(f) => f,
2206                    Err(e) => {
2207                        let mut scope = self.scope.write().await;
2208                        scope.unsuppress_errexit();
2209                        return Err(e);
2210                    }
2211                };
2212                {
2213                    let mut scope = self.scope.write().await;
2214                    scope.unsuppress_errexit();
2215                }
2216                match left_flow {
2217                    ControlFlow::Normal(mut left_result) => {
2218                        self.drain_stderr_into(&mut left_result).await;
2219                        self.update_last_result(&left_result).await;
2220                        if left_result.ok() {
2221                            let right_flow = self.execute_stmt_flow(right).await?;
2222                            match right_flow {
2223                                ControlFlow::Normal(mut right_result) => {
2224                                    self.drain_stderr_into(&mut right_result).await;
2225                                    self.update_last_result(&right_result).await;
2226                                    let mut combined = left_result;
2227                                    accumulate_result(&mut combined, &right_result);
2228                                    Ok(ControlFlow::ok(combined))
2229                                }
2230                                other => Ok(other),
2231                            }
2232                        } else {
2233                            Ok(ControlFlow::ok(left_result))
2234                        }
2235                    }
2236                    _ => Ok(left_flow),
2237                }
2238            }
2239            Stmt::OrChain { left, right } => {
2240                // cmd1 || cmd2 - run cmd2 only if cmd1 fails (non-zero exit code)
2241                // Suppress errexit for the left side — || handles failure itself.
2242                {
2243                    let mut scope = self.scope.write().await;
2244                    scope.suppress_errexit();
2245                }
2246                let left_flow = match self.execute_stmt_flow(left).await {
2247                    Ok(f) => f,
2248                    Err(e) => {
2249                        let mut scope = self.scope.write().await;
2250                        scope.unsuppress_errexit();
2251                        return Err(e);
2252                    }
2253                };
2254                {
2255                    let mut scope = self.scope.write().await;
2256                    scope.unsuppress_errexit();
2257                }
2258                match left_flow {
2259                    ControlFlow::Normal(mut left_result) => {
2260                        self.drain_stderr_into(&mut left_result).await;
2261                        self.update_last_result(&left_result).await;
2262                        if !left_result.ok() {
2263                            let right_flow = self.execute_stmt_flow(right).await?;
2264                            match right_flow {
2265                                ControlFlow::Normal(mut right_result) => {
2266                                    self.drain_stderr_into(&mut right_result).await;
2267                                    self.update_last_result(&right_result).await;
2268                                    let mut combined = left_result;
2269                                    accumulate_result(&mut combined, &right_result);
2270                                    Ok(ControlFlow::ok(combined))
2271                                }
2272                                other => Ok(other),
2273                            }
2274                        } else {
2275                            Ok(ControlFlow::ok(left_result))
2276                        }
2277                    }
2278                    _ => Ok(left_flow), // Propagate non-normal flow
2279                }
2280            }
2281            Stmt::Test(test_expr) => {
2282                let is_true = self.eval_test_async(test_expr).await?;
2283                if is_true {
2284                    Ok(ControlFlow::ok(ExecResult::success("")))
2285                } else {
2286                    Ok(ControlFlow::ok(ExecResult::failure(1, "")))
2287                }
2288            }
2289            Stmt::EnvScoped { assignments, body } => {
2290                // Inline env prefix (`NAME=value ... command`): apply the
2291                // assignments as EXPORTED vars in a fresh frame so the command
2292                // — and its subprocess environment — sees them, then unwind so
2293                // they do NOT persist (bash-style command-scoped env). Values
2294                // evaluate left-to-right with earlier ones already in scope, so
2295                // `A=1 B=$A cmd` works.
2296                {
2297                    let mut scope = self.scope.write().await;
2298                    scope.push_frame();
2299                }
2300                let mut prior_export: Vec<(String, bool)> =
2301                    Vec::with_capacity(assignments.len());
2302                let mut setup_err: Option<anyhow::Error> = None;
2303                for assign in assignments {
2304                    match self.eval_expr_async(&assign.value).await {
2305                        Ok(value) => {
2306                            let mut scope = self.scope.write().await;
2307                            prior_export
2308                                .push((assign.name.clone(), scope.is_exported(&assign.name)));
2309                            scope.set_exported(&assign.name, value);
2310                        }
2311                        Err(e) => {
2312                            setup_err = Some(e);
2313                            break;
2314                        }
2315                    }
2316                }
2317
2318                let flow = if setup_err.is_none() {
2319                    self.execute_stmt_flow(body).await
2320                } else {
2321                    Ok(ControlFlow::ok(ExecResult::success("")))
2322                };
2323
2324                // Unwind the env frame and restore export marks unconditionally
2325                // (names that were not exported before must not stay exported).
2326                {
2327                    let mut scope = self.scope.write().await;
2328                    scope.pop_frame();
2329                    for (name, was_exported) in &prior_export {
2330                        if !*was_exported {
2331                            scope.unexport(name);
2332                        }
2333                    }
2334                }
2335
2336                match setup_err {
2337                    Some(e) => Err(e),
2338                    None => flow,
2339                }
2340            }
2341            Stmt::Empty => Ok(ControlFlow::ok(ExecResult::success(""))),
2342        }
2343        }.instrument(span))
2344    }
2345
2346    /// Execute a pipeline.
2347    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(background = pipeline.background, command_count = pipeline.commands.len()))]
2348    async fn execute_pipeline(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2349        if pipeline.commands.is_empty() {
2350            return Ok(ExecResult::success(""));
2351        }
2352
2353        // Handle background execution (`&` operator)
2354        if pipeline.background {
2355            return self.execute_background(pipeline).await;
2356        }
2357
2358        // All commands go through the runner with the Kernel as dispatcher.
2359        // This is the single execution path — no fast path for single commands.
2360        //
2361        // IMPORTANT: We snapshot exec_ctx into a local context and release the
2362        // lock before running. This prevents deadlocks when dispatch_command
2363        // is called from within the pipeline and recursively triggers another
2364        // pipeline (e.g., via user-defined tools).
2365        let mut ctx = {
2366            let ec = self.exec_ctx.read().await;
2367            let scope = self.scope.read().await;
2368            ExecContext {
2369                backend: ec.backend.clone(),
2370                scope: scope.clone(),
2371                cwd: ec.cwd.clone(),
2372                prev_cwd: ec.prev_cwd.clone(),
2373                stdin: None,
2374                stdin_data: None,
2375                pipe_stdin: None,
2376                pipe_stdout: None,
2377                stderr: ec.stderr.clone(),
2378                tool_schemas: ec.tool_schemas.clone(),
2379                tools: ec.tools.clone(),
2380                job_manager: ec.job_manager.clone(),
2381                pipeline_position: PipelinePosition::Only,
2382                interactive: self.interactive,
2383                aliases: ec.aliases.clone(),
2384                ignore_config: ec.ignore_config.clone(),
2385                output_limit: ec.output_limit.clone(),
2386                allow_external_commands: self.allow_external_commands,
2387                nonce_store: ec.nonce_store.clone(),
2388                trash_backend: ec.trash_backend.clone(),
2389                #[cfg(all(unix, feature = "subprocess"))]
2390                terminal_state: ec.terminal_state.clone(),
2391                dispatcher: self.dispatcher(),
2392                cancel: {
2393                    #[allow(clippy::expect_used)]
2394                    let token = self.cancel_token.lock().expect("cancel_token poisoned");
2395                    token.clone()
2396                },
2397                output_format: None,
2398                vfs_budget: self.vfs_budget.clone(),
2399                watchdog: ec.watchdog.clone(),
2400                #[cfg(all(feature = "localfs", feature = "overlay"))]
2401                overlay_handle: self.overlay_handle.clone(),
2402            }
2403        }; // locks released
2404
2405        let mut result = self.runner.run(&pipeline.commands, &mut ctx, self).await;
2406
2407        // Post-hoc spill check (catches builtins and fast external commands)
2408        if ctx.output_limit.is_enabled() {
2409            let _ = crate::output_limit::spill_if_needed(&mut result, &ctx.output_limit).await;
2410        }
2411
2412        // Signal spill with exit 3; agent reads the spill file directly
2413        // (use `set +o output-limit` before cat/head/tail to bypass the limit)
2414        if result.did_spill {
2415            result.original_code = Some(result.code);
2416            result.code = 3;
2417        }
2418
2419        // Sync changes back from context
2420        {
2421            let mut ec = self.exec_ctx.write().await;
2422            ec.cwd = ctx.cwd.clone();
2423            ec.prev_cwd = ctx.prev_cwd.clone();
2424            ec.aliases = ctx.aliases.clone();
2425            ec.ignore_config = ctx.ignore_config.clone();
2426            ec.output_limit = ctx.output_limit.clone();
2427        }
2428        {
2429            let mut scope = self.scope.write().await;
2430            *scope = ctx.scope.clone();
2431        }
2432
2433        Ok(result)
2434    }
2435
2436    /// Execute a pipeline in the background.
2437    ///
2438    /// The command is spawned as a tokio task, registered with the JobManager,
2439    /// and its output is captured via BoundedStreams. The job is observable via
2440    /// `/v/jobs/{id}/stdout`, `/v/jobs/{id}/stderr`, and `/v/jobs/{id}/status`.
2441    ///
2442    /// Returns immediately with a job ID like "[1]".
2443    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(command_count = pipeline.commands.len()))]
2444    async fn execute_background(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2445        use tokio::sync::oneshot;
2446
2447        // Format the command for display in /v/jobs/{id}/command
2448        let command_str = self.format_pipeline(pipeline);
2449
2450        // Create bounded streams for output capture
2451        let stdout = Arc::new(BoundedStream::default_size());
2452        let stderr = Arc::new(BoundedStream::default_size());
2453
2454        // Create channel for result notification
2455        let (tx, rx) = oneshot::channel();
2456
2457        // Register with JobManager to get job ID and create VFS entries
2458        let job_id = self.jobs.register_with_streams(
2459            command_str.clone(),
2460            rx,
2461            stdout.clone(),
2462            stderr.clone(),
2463        ).await;
2464
2465        // Fork the kernel for this background job. The fork snapshots the
2466        // parent's scope/cwd/aliases/user_tools so mutations stay isolated,
2467        // while sharing the job manager, VFS, and tool registry. The fork's
2468        // full dispatch chain (user tools, .kai scripts, `$(...)` in args)
2469        // is available here — something BackendDispatcher couldn't provide.
2470        //
2471        // The fork gets its own cancellation token (recorded on the job so
2472        // `kill %N` can stop the job — including a pure-builtin job with no OS
2473        // process group) and is stamped with the job id so any external
2474        // command it spawns records its process group for `kill -<sig> %N`.
2475        let cancel = tokio_util::sync::CancellationToken::new();
2476        self.jobs.set_cancel_token(job_id, cancel.clone()).await;
2477        let fork = self.fork_for_background(cancel, job_id).await;
2478        let runner = self.runner.clone();
2479        let commands = pipeline.commands.clone();
2480
2481        // Snapshot the fork's exec_ctx for the spawned task. We have to do
2482        // this before tokio::spawn because the fork's exec_ctx is behind a
2483        // tokio RwLock and we want the spawned task to own its ctx.
2484        let mut bg_ctx = {
2485            let ec = fork.exec_ctx.read().await;
2486            ec.child_for_pipeline()
2487        };
2488        bg_ctx.scope = fork.scope.read().await.clone();
2489        // The fork's dispatcher points at the fork itself; set it here so
2490        // builtins inside the background task (e.g. timeout) re-dispatch
2491        // through the fork, not the parent.
2492        bg_ctx.dispatcher = fork.dispatcher();
2493
2494        // Spawn the background task. Propagate the embedder's trace context
2495        // across the spawn boundary so the job's spans stay in the same trace.
2496        tokio::spawn(crate::telemetry::bind_current_context(async move {
2497            // runner.run needs a &dyn CommandDispatcher; fork.as_ref()
2498            // gives us that (Kernel implements CommandDispatcher).
2499            let result = runner.run(&commands, &mut bg_ctx, fork.as_ref()).await;
2500
2501            // Write output to streams
2502            let text = result.text_out();
2503            if !text.is_empty() {
2504                stdout.write(text.as_bytes()).await;
2505            }
2506            if !result.err.is_empty() {
2507                stderr.write(result.err.as_bytes()).await;
2508            }
2509
2510            // Close streams
2511            stdout.close().await;
2512            stderr.close().await;
2513
2514            // Send result to JobManager (ignore error if receiver dropped)
2515            let _ = tx.send(result);
2516        }));
2517
2518        Ok(ExecResult::success(format!("[{}]", job_id)))
2519    }
2520
2521    /// Format a pipeline as a command string for display.
2522    fn format_pipeline(&self, pipeline: &crate::ast::Pipeline) -> String {
2523        pipeline.commands
2524            .iter()
2525            .map(|cmd| {
2526                let mut parts = vec![cmd.name.clone()];
2527                for arg in &cmd.args {
2528                    match arg {
2529                        Arg::Positional(expr) => {
2530                            parts.push(self.format_expr(expr));
2531                        }
2532                        Arg::Named { key, value } => {
2533                            parts.push(format!("--{}={}", key, self.format_expr(value)));
2534                        }
2535                        Arg::WordAssign { key, value } => {
2536                            parts.push(format!("{}={}", key, self.format_expr(value)));
2537                        }
2538                        Arg::ShortFlag(name) => {
2539                            parts.push(format!("-{}", name));
2540                        }
2541                        Arg::LongFlag(name) => {
2542                            parts.push(format!("--{}", name));
2543                        }
2544                        Arg::DoubleDash => {
2545                            parts.push("--".to_string());
2546                        }
2547                    }
2548                }
2549                parts.join(" ")
2550            })
2551            .collect::<Vec<_>>()
2552            .join(" | ")
2553    }
2554
2555    /// Format an expression as a string for display.
2556    fn format_expr(&self, expr: &Expr) -> String {
2557        match expr {
2558            Expr::Literal(Value::String(s)) => {
2559                if s.contains(' ') || s.contains('"') {
2560                    format!("'{}'", s.replace('\'', "\\'"))
2561                } else {
2562                    s.clone()
2563                }
2564            }
2565            Expr::Literal(Value::Int(i)) => i.to_string(),
2566            Expr::Literal(Value::Float(f)) => f.to_string(),
2567            Expr::Literal(Value::Bool(b)) => b.to_string(),
2568            Expr::Literal(Value::Null) => "null".to_string(),
2569            Expr::VarRef(path) => {
2570                let name = path.segments.iter()
2571                    .map(|seg| match seg {
2572                        crate::ast::VarSegment::Field(f) => f.clone(),
2573                    })
2574                    .collect::<Vec<_>>()
2575                    .join(".");
2576                format!("${{{}}}", name)
2577            }
2578            Expr::Interpolated(_) => "\"...\"".to_string(),
2579            Expr::HereDocBody { .. } => "<<heredoc".to_string(),
2580            _ => "...".to_string(),
2581        }
2582    }
2583
2584    /// Execute a single command.
2585    async fn execute_command(&self, name: &str, args: &[Arg]) -> Result<ExecResult> {
2586        self.execute_command_depth(name, args, 0).await
2587    }
2588
2589    #[tracing::instrument(level = "info", skip(self, args, alias_depth), fields(command = %name), err)]
2590    async fn execute_command_depth(&self, name: &str, args: &[Arg], alias_depth: u8) -> Result<ExecResult> {
2591        // Special built-ins
2592        match name {
2593            "true" => return Ok(ExecResult::success("")),
2594            "false" => return Ok(ExecResult::failure(1, "")),
2595            "source" | "." => return self.execute_source(args).await,
2596            _ => {}
2597        }
2598
2599        // Alias expansion (with recursion limit)
2600        if alias_depth < 10 {
2601            let alias_value = {
2602                let ctx = self.exec_ctx.read().await;
2603                ctx.aliases.get(name).cloned()
2604            };
2605            if let Some(alias_val) = alias_value {
2606                // Split alias value into command + args
2607                let parts: Vec<&str> = alias_val.split_whitespace().collect();
2608                if let Some((alias_cmd, alias_args)) = parts.split_first() {
2609                    let mut new_args: Vec<Arg> = alias_args
2610                        .iter()
2611                        .map(|a| Arg::Positional(Expr::Literal(Value::String(a.to_string()))))
2612                        .collect();
2613                    new_args.extend_from_slice(args);
2614                    return Box::pin(self.execute_command_depth(alias_cmd, &new_args, alias_depth + 1)).await;
2615                }
2616            }
2617        }
2618
2619        // Handle /v/bin/ prefix — dispatch to builtins via virtual path
2620        if let Some(builtin_name) = name.strip_prefix("/v/bin/") {
2621            return match self.tools.get(builtin_name) {
2622                Some(_) => Box::pin(self.execute_command_depth(builtin_name, args, alias_depth)).await,
2623                None => Ok(ExecResult::failure(127, format!("command not found: {}", name))),
2624            };
2625        }
2626
2627        // Check user-defined tools first
2628        {
2629            let user_tools = self.user_tools.read().await;
2630            if let Some(tool_def) = user_tools.get(name) {
2631                let tool_def = tool_def.clone();
2632                drop(user_tools);
2633                return self.execute_user_tool(tool_def, args).await;
2634            }
2635        }
2636
2637        // Look up builtin tool
2638        let tool = match self.tools.get(name) {
2639            Some(t) => t,
2640            None => {
2641                // Try executing as .kai script from PATH
2642                if let Some(result) = self.try_execute_script(name, args).await? {
2643                    return Ok(result);
2644                }
2645                // Try executing as external command from PATH
2646                if let Some(result) = self.try_execute_external(name, args).await? {
2647                    return Ok(result);
2648                }
2649
2650                // Try backend-registered tools (embedder engines, etc.)
2651                // Look up tool schema for positional→named mapping.
2652                // Clone backend and drop read lock before awaiting (may involve network I/O).
2653                // Backend tools expect named JSON params, so enable positional mapping.
2654                let backend = self.exec_ctx.read().await.backend.clone();
2655                let tool_schema = backend.get_tool(name).await.ok().flatten().map(|t| {
2656                    let mut s = t.schema;
2657                    // Flat backend/MCP tools expect named JSON params, so map
2658                    // bare positionals onto named params. Subcommand-aware tools
2659                    // route positionals through the subcommand path and declare
2660                    // map_positionals per leaf (kj keeps it false so it re-parses
2661                    // the argv with its own clap) — don't blanket-override them.
2662                    if s.subcommands.is_empty() {
2663                        s.map_positionals = true;
2664                    }
2665                    s
2666                });
2667                let tool_args = self.build_args_async(args, tool_schema.as_ref()).await?;
2668                let mut ctx = self.exec_ctx.write().await;
2669                {
2670                    let scope = self.scope.read().await;
2671                    ctx.scope = scope.clone();
2672                }
2673                let backend = ctx.backend.clone();
2674                match backend.call_tool(name, tool_args, &mut *ctx).await {
2675                    Ok(tool_result) => {
2676                        let mut scope = self.scope.write().await;
2677                        *scope = ctx.scope.clone();
2678                        let mut exec = ExecResult::from_output(
2679                            tool_result.code as i64, tool_result.stdout, tool_result.stderr,
2680                        );
2681                        exec.set_output(tool_result.output);
2682                        return Ok(exec);
2683                    }
2684                    Err(BackendError::ToolNotFound(_)) => {
2685                        // Fall through to "command not found"
2686                    }
2687                    Err(e) => {
2688                        // Backend dispatch is last-resort lookup — if it fails
2689                        // for any reason, the command simply doesn't exist.
2690                        tracing::debug!("backend error for {name}: {e}");
2691                    }
2692                }
2693
2694                return Ok(ExecResult::failure(127, format!("command not found: {}", name)));
2695            }
2696        };
2697
2698        // Build arguments (async to support command substitution, schema-aware for flag values)
2699        let schema = tool.schema();
2700        let tool_args = self.build_args_async(args, Some(&schema)).await?;
2701
2702        // --help / -h: show help unless the tool's schema claims that flag
2703        let schema_claims = |flag: &str| -> bool {
2704            let bare = flag.trim_start_matches('-');
2705            schema.params.iter().any(|p| p.matches_flag(flag) || p.matches_flag(bare))
2706        };
2707        let wants_help =
2708            (tool_args.flags.contains("help") && !schema_claims("help"))
2709            || (tool_args.flags.contains("h") && !schema_claims("-h"));
2710        if wants_help {
2711            let help_topic = crate::help::HelpTopic::Tool(name.to_string());
2712            let ctx = self.exec_ctx.read().await;
2713            let content = crate::help::get_help(&help_topic, &ctx.tool_schemas);
2714            return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(content)));
2715        }
2716
2717        // Snapshot exec_ctx into a local context and release the write lock
2718        // before calling tool.execute. Holding the write across tool execution
2719        // would deadlock any builtin that re-dispatches through ctx.dispatcher
2720        // (timeout, scatter) — the inner dispatch_command needs its own
2721        // exec_ctx.write() and would block forever.
2722        let mut ctx = {
2723            let ec = self.exec_ctx.write().await;
2724            let scope = self.scope.read().await;
2725            ExecContext {
2726                backend: ec.backend.clone(),
2727                scope: scope.clone(),
2728                cwd: ec.cwd.clone(),
2729                prev_cwd: ec.prev_cwd.clone(),
2730                stdin: ec.stdin.clone(),
2731                stdin_data: ec.stdin_data.clone(),
2732                pipe_stdin: None, // streaming pipes are per-pipeline; not snapshotted
2733                pipe_stdout: None,
2734                stderr: ec.stderr.clone(),
2735                tool_schemas: ec.tool_schemas.clone(),
2736                tools: ec.tools.clone(),
2737                job_manager: ec.job_manager.clone(),
2738                pipeline_position: ec.pipeline_position,
2739                interactive: self.interactive,
2740                aliases: ec.aliases.clone(),
2741                ignore_config: ec.ignore_config.clone(),
2742                output_limit: ec.output_limit.clone(),
2743                allow_external_commands: self.allow_external_commands,
2744                nonce_store: ec.nonce_store.clone(),
2745                trash_backend: ec.trash_backend.clone(),
2746                #[cfg(all(unix, feature = "subprocess"))]
2747                terminal_state: ec.terminal_state.clone(),
2748                dispatcher: self.dispatcher(),
2749                // Use ec.cancel (set by dispatch_command from the runner's
2750                // ctx.cancel) so any builtin-swapped child token (e.g. timeout's
2751                // child token) reaches the spawned external via wait_or_kill.
2752                // Falls back to the kernel's own token when ec.cancel is the
2753                // default fresh token from a non-dispatch path.
2754                cancel: ec.cancel.clone(),
2755                output_format: None,
2756                vfs_budget: self.vfs_budget.clone(),
2757                watchdog: ec.watchdog.clone(),
2758                #[cfg(all(feature = "localfs", feature = "overlay"))]
2759                overlay_handle: self.overlay_handle.clone(),
2760            }
2761        }; // both locks released — tool.execute can re-dispatch safely
2762
2763        // Move stdin out of self.exec_ctx into the snapshot (consumed-by-tool
2764        // semantics): take() so a later dispatch doesn't see stale stdin.
2765        // Done after the snapshot above so we hold the write briefly.
2766        {
2767            let mut ec = self.exec_ctx.write().await;
2768            ctx.stdin = ec.stdin.take();
2769            ctx.stdin_data = ec.stdin_data.take();
2770            ctx.pipe_stdin = ec.pipe_stdin.take();
2771            ctx.pipe_stdout = ec.pipe_stdout.take();
2772        }
2773
2774        // Honor --json before the builtin runs so its setting survives a clap
2775        // parse failure (e.g. `cmd --json --bogus-flag` would otherwise drop
2776        // --json on the floor when `try_parse_from` returns Err early).
2777        // The builtin's own `parsed.global.apply(ctx)` becomes idempotent.
2778        GlobalFlags::apply_from_args(&tool_args, &mut ctx);
2779
2780        let result = tool.execute(tool_args, &mut ctx).await;
2781
2782        // Sync mutations back. Tools may have changed scope (set/cd),
2783        // cwd/prev_cwd (cd), and aliases (alias). Also return any unused pipe
2784        // endpoints to self.exec_ctx so dispatch_command's post-execute sync
2785        // hands them back to the pipeline runner — the runner uses
2786        // stage_ctx.pipe_stdout to write the result to the next stage when
2787        // the tool itself didn't take and write to it.
2788        {
2789            let mut scope = self.scope.write().await;
2790            *scope = ctx.scope.clone();
2791        }
2792        {
2793            let mut ec = self.exec_ctx.write().await;
2794            ec.cwd = ctx.cwd;
2795            ec.prev_cwd = ctx.prev_cwd;
2796            ec.aliases = ctx.aliases;
2797            // A builtin (`set -o output-limit`, `kaish-output-limit set`) can
2798            // mutate the runtime output limit; without this sync the change is
2799            // dropped here and never reaches dispatch_command's read-back, so
2800            // it would not survive past the current statement.
2801            ec.output_limit = ctx.output_limit.clone();
2802            ec.pipe_stdin = ctx.pipe_stdin.take();
2803            ec.pipe_stdout = ctx.pipe_stdout.take();
2804        }
2805
2806        // Builtins parse --json via the GlobalFlags flatten in their clap
2807        // struct and write ctx.output_format. The kernel applies it — unless the
2808        // tool owns its own output (renders --json itself), in which case we
2809        // leave its bytes untouched.
2810        let result = finalize_output(result, ctx.output_format, schema.owns_output);
2811
2812        Ok(result)
2813    }
2814
2815    /// The session `HOME` from the kernel scope, if set. Tilde expansion reads
2816    /// this rather than `std::env::var("HOME")` so the kernel stays hermetic —
2817    /// a hermetic embedder (empty `initial_vars`) gets `None`, and `~` is left
2818    /// unexpanded rather than leaking the host home directory.
2819    async fn scope_home(&self) -> Option<String> {
2820        match self.scope.read().await.get("HOME") {
2821            Some(Value::String(s)) => Some(s.clone()),
2822            _ => None,
2823        }
2824    }
2825
2826    /// Pull `consumes` positional args after a non-bool flag and stash them
2827    /// on `tool_args.named` under the canonical param name.
2828    ///
2829    /// - `consumes == 1` keeps the historical contract: a single scalar value.
2830    /// - `consumes > 1` accumulates each occurrence as an inner
2831    ///   `serde_json::Value::Array` inside `named[canonical] =
2832    ///   Value::Json(Array(...))`, preserving invocation order. This is the
2833    ///   shape jq's `--arg NAME VAL` / `--argjson NAME VAL` land in.
2834    ///
2835    /// Errors loudly if the flag is missing required positionals — matches
2836    /// kaish's "no silent fallback" posture and mirrors real jq, which
2837    /// errors on `--arg NAME` with no value.
2838    #[allow(clippy::too_many_arguments)]
2839    async fn consume_flag_positionals(
2840        &self,
2841        args: &[Arg],
2842        flag_name: &str,
2843        canonical: &str,
2844        consumes: usize,
2845        positional_indices: &[usize],
2846        consumed: &mut std::collections::HashSet<usize>,
2847        current_idx: usize,
2848        tool_args: &mut ToolArgs,
2849    ) -> Result<()> {
2850        let home = self.scope_home().await;
2851        let mut collected: Vec<Value> = Vec::with_capacity(consumes.max(1));
2852        for _ in 0..consumes.max(1) {
2853            let next_pos = positional_indices
2854                .iter()
2855                .find(|idx| **idx > current_idx && !consumed.contains(idx))
2856                .copied();
2857            match next_pos {
2858                Some(pos_idx) => {
2859                    if let Arg::Positional(expr) = &args[pos_idx] {
2860                        let value = self.eval_expr_async(expr).await?;
2861                        let value = apply_tilde_expansion(value, home.as_deref());
2862                        collected.push(value);
2863                        consumed.insert(pos_idx);
2864                    }
2865                }
2866                None => {
2867                    if consumes <= 1 && collected.is_empty() {
2868                        // Back-compat: a flag with no follow-up positional
2869                        // becomes a bare flag. `--path` with nothing after
2870                        // lands in `flags`, same as before this refactor.
2871                        tool_args.flags.insert(flag_name.to_string());
2872                        return Ok(());
2873                    }
2874                    anyhow::bail!(
2875                        "--{flag_name} requires {consumes} argument{}, got {}",
2876                        if consumes == 1 { "" } else { "s" },
2877                        collected.len()
2878                    );
2879                }
2880            }
2881        }
2882
2883        if consumes <= 1 {
2884            if let Some(v) = collected.pop() {
2885                tool_args.named.insert(canonical.to_string(), v);
2886            }
2887            return Ok(());
2888        }
2889
2890        // Multi-consume: accumulate under named[canonical] as array-of-arrays.
2891        let occ: Vec<serde_json::Value> = collected
2892            .into_iter()
2893            .map(|v| crate::interpreter::value_to_json(&v))
2894            .collect();
2895        let entry = tool_args
2896            .named
2897            .entry(canonical.to_string())
2898            .or_insert_with(|| Value::Json(serde_json::Value::Array(Vec::new())));
2899        if let Value::Json(serde_json::Value::Array(outer)) = entry {
2900            outer.push(serde_json::Value::Array(occ));
2901        } else {
2902            anyhow::bail!(
2903                "--{flag_name}: named[{canonical}] already holds a non-array value"
2904            );
2905        }
2906        Ok(())
2907    }
2908
2909    /// Build tool arguments from AST args.
2910    ///
2911    /// Uses async evaluation to support command substitution in arguments.
2912    async fn build_args_async(&self, args: &[Arg], schema: Option<&crate::tools::ToolSchema>) -> Result<ToolArgs> {
2913        let mut tool_args = ToolArgs::new();
2914        let home = self.scope_home().await;
2915        // Subcommand-aware tools (e.g. `kj context list`) expose a tree of
2916        // schemas; pick the leaf the leading positionals route to and bind
2917        // flags against *its* params. Flat tools return the root. select_leaf
2918        // errors (fail loud) if a computed positional sits where a subcommand
2919        // selector is required.
2920        let leaf = match schema {
2921            Some(s) => Some(select_leaf(s, args)?),
2922            None => None,
2923        };
2924        // Bind against the leaf's params, but MERGE the root schema's params on
2925        // top as "global" flags: a value-flag declared at the tool's top level
2926        // (e.g. kj's `--confirm <nonce>`) must bind at every leaf, including when
2927        // it trails the subcommand path (`kj context retag a b --confirm <n>`).
2928        // The leaf wins on name conflicts. For a flat tool, leaf == root, so the
2929        // merge is a harmless no-op.
2930        let mut param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
2931        if let Some(l) = leaf {
2932            param_lookup.extend(schema_param_lookup(l));
2933        }
2934        // accepts_word_assign keys off the root tool name (the WORD_ASSIGN list),
2935        // not the leaf — it's a property of the command, not the subcommand.
2936        let accepts_word_assign = schema
2937            .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
2938            .unwrap_or(false);
2939
2940        // Track which positional indices have been consumed as flag values
2941        let mut consumed: std::collections::HashSet<usize> = std::collections::HashSet::new();
2942        let mut past_double_dash = false;
2943
2944        // Find positional arg indices for flag value consumption
2945        let positional_indices: Vec<usize> = args.iter().enumerate()
2946            .filter_map(|(i, a)| matches!(a, Arg::Positional(_)).then_some(i))
2947            .collect();
2948
2949        let mut i = 0;
2950        while i < args.len() {
2951            match &args[i] {
2952                Arg::DoubleDash => {
2953                    past_double_dash = true;
2954                }
2955                Arg::Positional(expr) => {
2956                    if !consumed.contains(&i) {
2957                        // Glob expansion: bare glob patterns expand to matching files
2958                        if let Expr::GlobPattern(pattern) = expr {
2959                            let glob_enabled = {
2960                                let scope = self.scope.read().await;
2961                                scope.glob_enabled()
2962                            };
2963                            if glob_enabled {
2964                                let (paths, cwd) = {
2965                                    let ctx = self.exec_ctx.read().await;
2966                                    let paths = ctx.expand_glob(pattern).await
2967                                        .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2968                                    let cwd = ctx.resolve_path(".");
2969                                    (paths, cwd)
2970                                };
2971                                if paths.is_empty() {
2972                                    return Err(anyhow::anyhow!("no matches: {}", pattern));
2973                                }
2974                                for path in paths {
2975                                    let display = if !pattern.starts_with('/') {
2976                                        path.strip_prefix(&cwd)
2977                                            .unwrap_or(&path)
2978                                            .to_string_lossy().into_owned()
2979                                    } else {
2980                                        path.to_string_lossy().into_owned()
2981                                    };
2982                                    tool_args.positional.push(Value::String(display));
2983                                }
2984                                i += 1;
2985                                continue;
2986                            }
2987                        }
2988                        let value = self.eval_expr_async(expr).await?;
2989                        let value = apply_tilde_expansion(value, home.as_deref());
2990                        tool_args.positional.push(value);
2991                    }
2992                }
2993                Arg::Named { key, value } => {
2994                    let val = self.eval_expr_async(value).await?;
2995                    let val = apply_tilde_expansion(val, home.as_deref());
2996                    tool_args.named.insert(key.clone(), val);
2997                }
2998                Arg::WordAssign { key, value } => {
2999                    let val = self.eval_expr_async(value).await?;
3000                    let val = apply_tilde_expansion(val, home.as_deref());
3001                    if accepts_word_assign {
3002                        tool_args.named.insert(key.clone(), val);
3003                    } else {
3004                        // Stringify "key=value" and pass as a positional.
3005                        // Matches bash: `cat foo=bar` opens a file named `foo=bar`.
3006                        let val_str = crate::interpreter::value_to_string(&val);
3007                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
3008                    }
3009                }
3010                Arg::ShortFlag(name) => {
3011                    if past_double_dash {
3012                        tool_args.positional.push(Value::String(format!("-{name}")));
3013                    } else if name.len() == 1 {
3014                        let flag_name = name.as_str();
3015                        let lookup = param_lookup.get(flag_name);
3016                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3017
3018                        if is_bool {
3019                            tool_args.flags.insert(flag_name.to_string());
3020                        } else {
3021                            // Non-bool: consume `consumes` positionals as value(s)
3022                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
3023                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3024                            self.consume_flag_positionals(
3025                                args,
3026                                name,
3027                                canonical,
3028                                consumes,
3029                                &positional_indices,
3030                                &mut consumed,
3031                                i,
3032                                &mut tool_args,
3033                            )
3034                            .await?;
3035                        }
3036                    } else if let Some(&(canonical, typ, consumes)) = param_lookup.get(name.as_str()) {
3037                        // Multi-char short flag matches a schema param (POSIX style: -name value)
3038                        if is_bool_type(typ) {
3039                            tool_args.flags.insert(canonical.to_string());
3040                        } else {
3041                            self.consume_flag_positionals(
3042                                args,
3043                                name,
3044                                canonical,
3045                                consumes,
3046                                &positional_indices,
3047                                &mut consumed,
3048                                i,
3049                                &mut tool_args,
3050                            )
3051                            .await?;
3052                        }
3053                    } else if let Some(&(canonical, _, _)) = param_lookup
3054                        .get(&name[..1])
3055                        .filter(|(_, typ, _)| !is_bool_type(typ))
3056                    {
3057                        // Glued short-flag value: `cut -f1`, `head -c5`, `cut -f1-3`,
3058                        // `grep -A1`. The first char is a declared value-taking short
3059                        // flag, so the rest of the token is its value — the coreutils
3060                        // idiom. The lexer's flag char class is `[a-zA-Z][a-zA-Z0-9-]*`,
3061                        // so the first byte is always ASCII (safe to slice) and the tail
3062                        // is a plain literal. Single value only; no builtin short flag
3063                        // declares consumes>1.
3064                        tool_args
3065                            .named
3066                            .insert(canonical.to_string(), Value::String(name[1..].to_string()));
3067                    } else {
3068                        // Multi-char combined flags like -la: always boolean
3069                        for c in name.chars() {
3070                            tool_args.flags.insert(c.to_string());
3071                        }
3072                    }
3073                }
3074                Arg::LongFlag(name) => {
3075                    if past_double_dash {
3076                        tool_args.positional.push(Value::String(format!("--{name}")));
3077                    } else {
3078                        let lookup = param_lookup.get(name.as_str());
3079                        // An *undeclared* long flag under a `map_positionals`
3080                        // (backend/MCP) schema that is immediately followed by an
3081                        // unconsumed positional is ambiguous: kaish can't tell the
3082                        // space-form value (`--type explorer`) from a bool flag
3083                        // before a real positional (`--force file.txt`). Defaulting
3084                        // to bool here silently divorces the value and misroutes it
3085                        // — a privilege-escalation-by-typo against deny-by-default
3086                        // embedders (docs/issues.md). Fail loud instead of guessing.
3087                        let ambiguous_value = (lookup.is_none()
3088                            && leaf.is_some_and(|s| s.map_positionals)
3089                            && !consumed.contains(&(i + 1)))
3090                            .then(|| match args.get(i + 1) {
3091                                // Echo a concrete value for a copy-pasteable fix
3092                                // when it's a plain literal; fall back to VALUE.
3093                                Some(Arg::Positional(Expr::Literal(Value::String(s)))) => {
3094                                    Some(s.clone())
3095                                }
3096                                Some(Arg::Positional(_)) => Some("VALUE".to_string()),
3097                                _ => None,
3098                            })
3099                            .flatten();
3100                        if let Some(val) = ambiguous_value {
3101                            let tool = leaf.map(|s| s.name.as_str()).unwrap_or("command");
3102                            anyhow::bail!(
3103                                "{tool}: --{name} is not a declared flag, so the \
3104                                 space-separated value would be silently dropped. \
3105                                 Use --{name}={val}, or have {tool} declare --{name} \
3106                                 in its schema."
3107                            );
3108                        }
3109                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3110
3111                        if is_bool {
3112                            tool_args.flags.insert(name.clone());
3113                        } else {
3114                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
3115                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3116                            self.consume_flag_positionals(
3117                                args,
3118                                name,
3119                                canonical,
3120                                consumes,
3121                                &positional_indices,
3122                                &mut consumed,
3123                                i,
3124                                &mut tool_args,
3125                            )
3126                            .await?;
3127                        }
3128                    }
3129                }
3130            }
3131            i += 1;
3132        }
3133
3134        // Map remaining positionals to unfilled non-bool schema params (in order).
3135        // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
3136        // Positionals that appeared after `--` are never mapped (they're raw data).
3137        // Only for backend/external tools (map_positionals=true). Builtins handle their own positionals.
3138        // Keyed off the routed leaf so a subcommand tool maps against the active
3139        // leaf's params (kj leaves keep map_positionals=false → block skipped).
3140        if let Some(schema) = leaf.filter(|s| s.map_positionals) {
3141            let pre_dash_count = if past_double_dash {
3142                let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
3143                positional_indices.iter()
3144                    .filter(|idx| **idx < dash_pos && !consumed.contains(idx))
3145                    .count()
3146            } else {
3147                tool_args.positional.len()
3148            };
3149
3150            let mut remaining = Vec::new();
3151            let mut positional_iter = tool_args.positional.drain(..).enumerate();
3152
3153            for param in &schema.params {
3154                if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
3155                    continue;
3156                }
3157                if is_bool_type(&param.param_type) {
3158                    continue;
3159                }
3160                loop {
3161                    match positional_iter.next() {
3162                        Some((idx, val)) if idx < pre_dash_count => {
3163                            tool_args.named.insert(param.name.clone(), val);
3164                            break;
3165                        }
3166                        Some((_, val)) => {
3167                            remaining.push(val);
3168                        }
3169                        None => break,
3170                    }
3171                }
3172            }
3173
3174            remaining.extend(positional_iter.map(|(_, v)| v));
3175            tool_args.positional = remaining;
3176        }
3177
3178        Ok(tool_args)
3179    }
3180
/// Render AST args as a flat `argv` string list for an external command.
///
/// `build_args_async` splits flags into a set for schema-aware builtins;
/// external programs instead expect the tokens as they were written, so this
/// keeps each argument's original spelling:
/// - `-l` is emitted as `-l` (combined flags such as `-la` stay combined)
/// - `--verbose` is emitted as `--verbose`
/// - `key=value` is emitted as `key=value`
/// - the `--` marker is passed through
///
/// Positional expressions are evaluated (with tilde expansion against the
/// scope's `HOME`); bare glob patterns expand to their matches when globbing
/// is enabled in the scope. Named args are emitted as `--key=value`.
///
/// # Errors
///
/// Fails when an expression cannot be evaluated, when glob expansion errors,
/// or when an enabled glob pattern matches nothing.
#[cfg(feature = "subprocess")]
async fn build_args_flat(&self, args: &[Arg]) -> Result<Vec<String>> {
    let home = self.scope_home().await;
    let mut argv: Vec<String> = Vec::new();

    for arg in args {
        match arg {
            // Flags and the `--` marker keep their original spelling.
            Arg::ShortFlag(name) => argv.push(format!("-{}", name)),
            Arg::LongFlag(name) => argv.push(format!("--{}", name)),
            Arg::DoubleDash => argv.push("--".to_string()),
            Arg::WordAssign { key, value } => {
                let evaluated = self.eval_expr_async(value).await?;
                let evaluated = apply_tilde_expansion(evaluated, home.as_deref());
                argv.push(format!("{}={}", key, value_to_string(&evaluated)));
            }
            Arg::Named { key, value } => {
                let evaluated = self.eval_expr_async(value).await?;
                let evaluated = apply_tilde_expansion(evaluated, home.as_deref());
                argv.push(format!("--{}={}", key, value_to_string(&evaluated)));
            }
            Arg::Positional(expr) => {
                // Glob expansion for external commands
                if let Expr::GlobPattern(pattern) = expr {
                    let glob_on = self.scope.read().await.glob_enabled();
                    if glob_on {
                        let ctx = self.exec_ctx.read().await;
                        let paths = ctx.expand_glob(pattern).await
                            .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
                        let cwd = ctx.resolve_path(".");
                        // Release the read lock before building argv entries.
                        drop(ctx);

                        if paths.is_empty() {
                            return Err(anyhow::anyhow!("no matches: {}", pattern));
                        }
                        // Absolute patterns keep full paths; relative ones
                        // are shown relative to the cwd when possible.
                        let absolute = pattern.starts_with('/');
                        for path in paths {
                            let display = if absolute {
                                path.to_string_lossy().into_owned()
                            } else {
                                path.strip_prefix(&cwd)
                                    .unwrap_or(&path)
                                    .to_string_lossy().into_owned()
                            };
                            argv.push(display);
                        }
                        continue;
                    }
                }
                let evaluated = self.eval_expr_async(expr).await?;
                let evaluated = apply_tilde_expansion(evaluated, home.as_deref());
                argv.push(value_to_string(&evaluated));
            }
        }
    }

    Ok(argv)
}
3257
3258    /// Async expression evaluator that supports command substitution.
3259    ///
3260    /// This is used for contexts where expressions may contain `$(...)` command
3261    /// substitution. Unlike the sync `eval_expr`, this can execute pipelines.
3262    fn eval_expr_async<'a>(&'a self, expr: &'a Expr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + 'a>> {
3263        Box::pin(async move {
3264        match expr {
3265            Expr::Literal(value) => Ok(value.clone()),
3266            Expr::VarRef(path) => {
3267                let scope = self.scope.read().await;
3268                scope.resolve_path(path)
3269                    .ok_or_else(|| anyhow::anyhow!("undefined variable"))
3270            }
3271            Expr::Interpolated(parts) => {
3272                let mut result = String::new();
3273                for part in parts {
3274                    result.push_str(&self.eval_string_part_async(part).await?);
3275                }
3276                Ok(Value::String(result))
3277            }
3278            Expr::HereDocBody { parts, strip_tabs } => {
3279                let mut result = String::new();
3280                for sp in parts {
3281                    result.push_str(&self.eval_string_part_async(&sp.part).await?);
3282                }
3283                if *strip_tabs {
3284                    Ok(Value::String(crate::interpreter::strip_leading_tabs(&result)))
3285                } else {
3286                    Ok(Value::String(result))
3287                }
3288            }
3289            Expr::BinaryOp { left, op, right } => match op {
3290                BinaryOp::And => {
3291                    let left_val = self.eval_expr_async(left).await?;
3292                    if !is_truthy(&left_val) {
3293                        return Ok(left_val);
3294                    }
3295                    self.eval_expr_async(right).await
3296                }
3297                BinaryOp::Or => {
3298                    let left_val = self.eval_expr_async(left).await?;
3299                    if is_truthy(&left_val) {
3300                        return Ok(left_val);
3301                    }
3302                    self.eval_expr_async(right).await
3303                }
3304            },
3305            Expr::CommandSubst(stmts) => {
3306                // Snapshot scope+cwd before running — only output escapes,
3307                // not side effects like `cd` or variable assignments.
3308                let saved_scope = { self.scope.read().await.clone() };
3309                let saved_cwd = {
3310                    let ec = self.exec_ctx.read().await;
3311                    (ec.cwd.clone(), ec.prev_cwd.clone())
3312                };
3313
3314                // Capture result without `?` — restore state unconditionally
3315                let run_result = self.execute_block_capturing(stmts).await;
3316
3317                // Restore scope and cwd regardless of success/failure
3318                {
3319                    let mut scope = self.scope.write().await;
3320                    *scope = saved_scope;
3321                    if let Ok(ref r) = run_result {
3322                        scope.set_last_result(r.clone());
3323                    }
3324                }
3325                {
3326                    let mut ec = self.exec_ctx.write().await;
3327                    ec.cwd = saved_cwd.0;
3328                    ec.prev_cwd = saved_cwd.1;
3329                }
3330
3331                // Now propagate the error
3332                let result = run_result?;
3333
3334                // A binary result is preserved as bytes — never lossy-decoded to
3335                // a string. No trailing-newline trim (every byte is significant).
3336                if let Some(bytes) = result.out_bytes() {
3337                    Ok(Value::Bytes(bytes.to_vec()))
3338                // Prefer structured data (enables `for i in $(cmd)` iteration)
3339                } else if let Some(data) = &result.data {
3340                    Ok(data.clone())
3341                } else if let Some(output) = result.output() {
3342                    // Flat non-text node lists (glob, ls, tree) → iterable array
3343                    if output.is_flat() && !output.is_simple_text() && !output.root.is_empty() {
3344                        let items: Vec<serde_json::Value> = output.root.iter()
3345                            .map(|n| serde_json::Value::String(n.display_name().to_string()))
3346                            .collect();
3347                        Ok(Value::Json(serde_json::Value::Array(items)))
3348                    } else {
3349                        Ok(Value::String(result.text_out().trim_end().to_string()))
3350                    }
3351                } else {
3352                    // Otherwise return stdout as single string (NO implicit splitting)
3353                    Ok(Value::String(result.text_out().trim_end().to_string()))
3354                }
3355            }
3356            Expr::Test(test_expr) => {
3357                Ok(Value::Bool(self.eval_test_async(test_expr).await?))
3358            }
3359            Expr::Positional(n) => {
3360                let scope = self.scope.read().await;
3361                match scope.get_positional(*n) {
3362                    Some(s) => Ok(Value::String(s.to_string())),
3363                    None => Ok(Value::String(String::new())),
3364                }
3365            }
3366            Expr::AllArgs => {
3367                let scope = self.scope.read().await;
3368                Ok(Value::String(scope.all_args().join(" ")))
3369            }
3370            Expr::ArgCount => {
3371                let scope = self.scope.read().await;
3372                Ok(Value::Int(scope.arg_count() as i64))
3373            }
3374            Expr::VarLength(name) => {
3375                let scope = self.scope.read().await;
3376                match scope.get(name) {
3377                    Some(value) => Ok(Value::Int(value_to_string(value).len() as i64)),
3378                    None => Ok(Value::Int(0)),
3379                }
3380            }
3381            Expr::VarWithDefault { name, default } => {
3382                let scope = self.scope.read().await;
3383                let use_default = match scope.get(name) {
3384                    Some(value) => value_to_string(value).is_empty(),
3385                    None => true,
3386                };
3387                drop(scope); // Release the lock before recursive evaluation
3388                if use_default {
3389                    // Evaluate the default parts (supports nested expansions)
3390                    self.eval_string_parts_async(default).await.map(Value::String)
3391                } else {
3392                    let scope = self.scope.read().await;
3393                    scope.get(name).cloned().ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))
3394                }
3395            }
3396            Expr::Arithmetic(expr_str) => {
3397                let scope = self.scope.read().await;
3398                crate::arithmetic::eval_arithmetic(expr_str, &scope)
3399                    .map(Value::Int)
3400                    .map_err(|e| anyhow::anyhow!("arithmetic error: {}", e))
3401            }
3402            Expr::Command(cmd) => {
3403                // Execute command and return boolean based on exit code
3404                let result = self.execute_command(&cmd.name, &cmd.args).await?;
3405                Ok(Value::Bool(result.code == 0))
3406            }
3407            Expr::LastExitCode => {
3408                let scope = self.scope.read().await;
3409                Ok(Value::Int(scope.last_result().code))
3410            }
3411            Expr::CurrentPid => {
3412                let scope = self.scope.read().await;
3413                Ok(Value::Int(scope.pid() as i64))
3414            }
3415            Expr::GlobPattern(s) => Ok(Value::String(s.clone())),
3416        }
3417        })
3418    }
3419
3420    /// Async helper to evaluate multiple StringParts into a single string.
3421    fn eval_string_parts_async<'a>(&'a self, parts: &'a [StringPart]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3422        Box::pin(async move {
3423            let mut result = String::new();
3424            for part in parts {
3425                result.push_str(&self.eval_string_part_async(part).await?);
3426            }
3427            Ok(result)
3428        })
3429    }
3430
3431    /// Async helper to evaluate a StringPart.
3432    /// Evaluate a `[[ ]]` test expression asynchronously, routing file tests
3433    /// through the VFS backend instead of using raw `std::path`.
3434    fn eval_test_async<'a>(&'a self, test_expr: &'a TestExpr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
3435        Box::pin(async move {
3436            match test_expr {
3437                TestExpr::FileTest { op, path } => {
3438                    let path_value = self.eval_expr_async(path).await?;
3439                    let path_str = value_to_string(&path_value);
3440                    let backend = self.exec_ctx.read().await.backend.clone();
3441                    let entry = backend.stat(std::path::Path::new(&path_str)).await.ok();
3442                    Ok(match op {
3443                        FileTestOp::Exists => entry.is_some(),
3444                        FileTestOp::IsFile => entry.as_ref().is_some_and(|e| e.is_file()),
3445                        FileTestOp::IsDir => entry.as_ref().is_some_and(|e| e.is_dir()),
3446                        FileTestOp::Readable => entry.is_some(),
3447                        FileTestOp::Writable => entry.as_ref().is_some_and(|e| {
3448                            e.permissions.is_none_or(|p| p & 0o222 != 0)
3449                        }),
3450                        FileTestOp::Executable => entry.as_ref().is_some_and(|e| {
3451                            e.permissions.is_some_and(|p| p & 0o111 != 0)
3452                        }),
3453                    })
3454                }
3455                TestExpr::StringTest { op, value } => {
3456                    let val = self.eval_expr_async(value).await?;
3457                    let s = value_to_string(&val);
3458                    Ok(match op {
3459                        crate::ast::StringTestOp::IsEmpty => s.is_empty(),
3460                        crate::ast::StringTestOp::IsNonEmpty => !s.is_empty(),
3461                    })
3462                }
3463                TestExpr::Comparison { left, op, right } => {
3464                    // Evaluate operands async (handles $(cmd)), then compare sync
3465                    let left_val = self.eval_expr_async(left).await?;
3466                    let right_val = self.eval_expr_async(right).await?;
3467                    let resolved = TestExpr::Comparison {
3468                        left: Box::new(Expr::Literal(left_val)),
3469                        op: *op,
3470                        right: Box::new(Expr::Literal(right_val)),
3471                    };
3472                    let expr = Expr::Test(Box::new(resolved));
3473                    let mut scope = self.scope.write().await;
3474                    let value = eval_expr(&expr, &mut scope)
3475                        .map_err(|e| anyhow::anyhow!("{}", e))?;
3476                    Ok(value_to_bool(&value))
3477                }
3478                TestExpr::And { left, right } => {
3479                    if !self.eval_test_async(left).await? {
3480                        Ok(false)
3481                    } else {
3482                        self.eval_test_async(right).await
3483                    }
3484                }
3485                TestExpr::Or { left, right } => {
3486                    if self.eval_test_async(left).await? {
3487                        Ok(true)
3488                    } else {
3489                        self.eval_test_async(right).await
3490                    }
3491                }
3492                TestExpr::Not { expr } => {
3493                    Ok(!self.eval_test_async(expr).await?)
3494                }
3495            }
3496        })
3497    }
3498
3499    fn eval_string_part_async<'a>(&'a self, part: &'a StringPart) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3500        Box::pin(async move {
3501            match part {
3502                StringPart::Literal(s) => Ok(s.clone()),
3503                StringPart::Var(path) => {
3504                    let scope = self.scope.read().await;
3505                    match scope.resolve_path(path) {
3506                        Some(value) => Ok(value_to_string(&value)),
3507                        None => Ok(String::new()), // Unset vars expand to empty
3508                    }
3509                }
3510                StringPart::VarWithDefault { name, default } => {
3511                    let scope = self.scope.read().await;
3512                    let use_default = match scope.get(name) {
3513                        Some(value) => value_to_string(value).is_empty(),
3514                        None => true,
3515                    };
3516                    drop(scope); // Release lock before recursive evaluation
3517                    if use_default {
3518                        // Evaluate the default parts (supports nested expansions)
3519                        self.eval_string_parts_async(default).await
3520                    } else {
3521                        let scope = self.scope.read().await;
3522                        Ok(value_to_string(scope.get(name).ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))?))
3523                    }
3524                }
3525            StringPart::VarLength(name) => {
3526                let scope = self.scope.read().await;
3527                match scope.get(name) {
3528                    Some(value) => Ok(value_to_string(value).len().to_string()),
3529                    None => Ok("0".to_string()),
3530                }
3531            }
3532            StringPart::Positional(n) => {
3533                let scope = self.scope.read().await;
3534                match scope.get_positional(*n) {
3535                    Some(s) => Ok(s.to_string()),
3536                    None => Ok(String::new()),
3537                }
3538            }
3539            StringPart::AllArgs => {
3540                let scope = self.scope.read().await;
3541                Ok(scope.all_args().join(" "))
3542            }
3543            StringPart::ArgCount => {
3544                let scope = self.scope.read().await;
3545                Ok(scope.arg_count().to_string())
3546            }
3547            StringPart::Arithmetic(expr) => {
3548                let scope = self.scope.read().await;
3549                match crate::arithmetic::eval_arithmetic(expr, &scope) {
3550                    Ok(value) => Ok(value.to_string()),
3551                    Err(_) => Ok(String::new()),
3552                }
3553            }
3554            StringPart::CommandSubst(stmts) => {
3555                // Snapshot scope+cwd — command substitution in strings must
3556                // not leak side effects (e.g., `"dir: $(cd /; pwd)"` must not change cwd).
3557                let saved_scope = { self.scope.read().await.clone() };
3558                let saved_cwd = {
3559                    let ec = self.exec_ctx.read().await;
3560                    (ec.cwd.clone(), ec.prev_cwd.clone())
3561                };
3562
3563                // Capture result without `?` — restore state unconditionally
3564                let run_result = self.execute_block_capturing(stmts).await;
3565
3566                // Restore scope and cwd regardless of success/failure
3567                {
3568                    let mut scope = self.scope.write().await;
3569                    *scope = saved_scope;
3570                    if let Ok(ref r) = run_result {
3571                        scope.set_last_result(r.clone());
3572                    }
3573                }
3574                {
3575                    let mut ec = self.exec_ctx.write().await;
3576                    ec.cwd = saved_cwd.0;
3577                    ec.prev_cwd = saved_cwd.1;
3578                }
3579
3580                // Now propagate the error
3581                let result = run_result?;
3582
3583                // Embedding binary into a string is a text context: fail loud
3584                // rather than splice in U+FFFD garbage.
3585                match result.try_text_out() {
3586                    Ok(s) => Ok(s.trim_end_matches('\n').to_string()),
3587                    Err(e) => anyhow::bail!(
3588                        "command substitution in a string produced binary data ({e}) — \
3589                         pipe through base64/xxd"
3590                    ),
3591                }
3592            }
3593            StringPart::LastExitCode => {
3594                let scope = self.scope.read().await;
3595                Ok(scope.last_result().code.to_string())
3596            }
3597            StringPart::CurrentPid => {
3598                let scope = self.scope.read().await;
3599                Ok(scope.pid().to_string())
3600            }
3601        }
3602        })
3603    }
3604
3605    /// Update the last result in scope.
3606    async fn update_last_result(&self, result: &ExecResult) {
3607        let mut scope = self.scope.write().await;
3608        scope.set_last_result(result.clone());
3609    }
3610
3611    /// Drain accumulated pipeline stderr into a result.
3612    ///
3613    /// Called after each sub-statement inside control structures (`if`, `for`,
3614    /// `while`, `case`, `&&`, `||`) so that stderr appears incrementally rather
3615    /// than batching until the entire structure finishes.
3616    async fn drain_stderr_into(&self, result: &mut ExecResult) {
3617        let drained = {
3618            let mut receiver = self.stderr_receiver.lock().await;
3619            receiver.drain_lossy()
3620        };
3621        if !drained.is_empty() {
3622            if !result.err.is_empty() && !result.err.ends_with('\n') {
3623                result.err.push('\n');
3624            }
3625            result.err.push_str(&drained);
3626        }
3627    }
3628
3629    /// Execute a user-defined function with local variable scoping.
3630    ///
3631    /// Functions push a new scope frame for local variables. Variables declared
3632    /// with `local` are scoped to the function; other assignments modify outer
3633    /// scopes (or create in root if new).
3634    async fn execute_user_tool(&self, def: ToolDef, args: &[Arg]) -> Result<ExecResult> {
3635        // 1. Build function args from AST args (async to support command substitution)
3636        let tool_args = self.build_args_async(args, None).await?;
3637
3638        // 2. Push a new scope frame for local variables
3639        {
3640            let mut scope = self.scope.write().await;
3641            scope.push_frame();
3642        }
3643
3644        // 3. Save current positional parameters and set new ones for this function
3645        let saved_positional = {
3646            let mut scope = self.scope.write().await;
3647            let saved = scope.save_positional();
3648
3649            // Set up new positional parameters ($0 = function name, $1, $2, ... = args)
3650            let positional_args: Vec<String> = tool_args.positional
3651                .iter()
3652                .map(value_to_string)
3653                .collect();
3654            scope.set_positional(&def.name, positional_args);
3655
3656            saved
3657        };
3658
3659        // 3. Execute body statements with control flow handling
3660        // Accumulate output across statements (like sh)
3661        // Accumulate stdout as raw bytes so a binary-producing statement in a
3662        // function body survives instead of being lossy-decoded here.
3663        let mut accumulated_out: Vec<u8> = Vec::new();
3664        let mut accumulated_err = String::new();
3665        let mut last_code = 0i64;
3666        let mut last_data: Option<Value> = None;
3667
3668        fn push_out(buf: &mut Vec<u8>, r: &ExecResult) {
3669            match r.out_bytes() {
3670                Some(b) => buf.extend_from_slice(b),
3671                None => buf.extend_from_slice(r.text_out().as_bytes()),
3672            }
3673        }
3674
3675        // Track execution error for propagation after cleanup
3676        let mut exec_error: Option<anyhow::Error> = None;
3677        let mut exit_code: Option<i64> = None;
3678
3679        for stmt in &def.body {
3680            match self.execute_stmt_flow(stmt).await {
3681                Ok(flow) => {
3682                    // Drain pipeline stderr after each sub-statement.
3683                    let drained = {
3684                        let mut receiver = self.stderr_receiver.lock().await;
3685                        receiver.drain_lossy()
3686                    };
3687                    if !drained.is_empty() {
3688                        accumulated_err.push_str(&drained);
3689                    }
3690
3691                    match flow {
3692                        ControlFlow::Normal(r) => {
3693                            push_out(&mut accumulated_out, &r);
3694                            accumulated_err.push_str(&r.err);
3695                            last_code = r.code;
3696                            last_data = r.data;
3697                        }
3698                        ControlFlow::Return { value } => {
3699                            push_out(&mut accumulated_out, &value);
3700                            accumulated_err.push_str(&value.err);
3701                            last_code = value.code;
3702                            last_data = value.data;
3703                            break;
3704                        }
3705                        ControlFlow::Exit { code } => {
3706                            exit_code = Some(code);
3707                            break;
3708                        }
3709                        ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3710                            push_out(&mut accumulated_out, &r);
3711                            accumulated_err.push_str(&r.err);
3712                            last_code = r.code;
3713                            last_data = r.data;
3714                        }
3715                    }
3716                }
3717                Err(e) => {
3718                    exec_error = Some(e);
3719                    break;
3720                }
3721            }
3722        }
3723
3724        // 4. Pop scope frame and restore original positional parameters (unconditionally)
3725        {
3726            let mut scope = self.scope.write().await;
3727            scope.pop_frame();
3728            scope.set_positional(saved_positional.0, saved_positional.1);
3729        }
3730
3731        // 5. Propagate error or exit after cleanup
3732        if let Some(e) = exec_error {
3733            return Err(e);
3734        }
3735        let code = exit_code.unwrap_or(last_code);
3736        let mut result = ExecResult::success_text_or_bytes(accumulated_out).with_code(code);
3737        result.err = accumulated_err;
3738        result.data = last_data;
3739        Ok(result)
3740    }
3741
3742    /// Execute a command-substitution body — a block of statements — and return
3743    /// the combined result. Stdout/stderr accumulate across statements with **no
3744    /// inserted separator** (matching bash and the `;`/`&&`/`||` output model),
3745    /// and the last statement's exit code and structured `.data` ride through,
3746    /// so `for x in $(seq 3)` still iterates the array and `$(printf a; printf b)`
3747    /// captures `ab`. Scope/cwd snapshotting (so `$(cd / && pwd)` cannot leak the
3748    /// cwd) is the caller's responsibility.
3749    async fn execute_block_capturing(&self, stmts: &[Stmt]) -> Result<ExecResult> {
3750        // Accumulate stdout as raw bytes so a binary-producing statement
3751        // (`$(dd …)`, `$(base64 -d …)`) isn't lossy-decoded here before the
3752        // caller can preserve it. The final result is text iff valid UTF-8.
3753        let mut accumulated_out: Vec<u8> = Vec::new();
3754        let mut accumulated_err = String::new();
3755        let mut last_code = 0i64;
3756        let mut last_data: Option<Value> = None;
3757
3758        // Append a statement's stdout as raw bytes (binary) or its UTF-8 bytes.
3759        fn push_out(buf: &mut Vec<u8>, r: &ExecResult) {
3760            match r.out_bytes() {
3761                Some(b) => buf.extend_from_slice(b),
3762                None => buf.extend_from_slice(r.text_out().as_bytes()),
3763            }
3764        }
3765
3766        for stmt in stmts {
3767            let flow = self.execute_stmt_flow(stmt).await?;
3768
3769            // Drain pipeline stderr after each sub-statement (incremental, like
3770            // the control-structure and function-body executors).
3771            let drained = {
3772                let mut receiver = self.stderr_receiver.lock().await;
3773                receiver.drain_lossy()
3774            };
3775            if !drained.is_empty() {
3776                accumulated_err.push_str(&drained);
3777            }
3778
3779            match flow {
3780                ControlFlow::Normal(r)
3781                | ControlFlow::Break { result: r, .. }
3782                | ControlFlow::Continue { result: r, .. } => {
3783                    push_out(&mut accumulated_out, &r);
3784                    accumulated_err.push_str(&r.err);
3785                    last_code = r.code;
3786                    last_data = r.data;
3787                }
3788                ControlFlow::Return { value } => {
3789                    push_out(&mut accumulated_out, &value);
3790                    accumulated_err.push_str(&value.err);
3791                    last_code = value.code;
3792                    last_data = value.data;
3793                    break;
3794                }
3795                ControlFlow::Exit { code } => {
3796                    last_code = code;
3797                    break;
3798                }
3799            }
3800        }
3801
3802        let mut result = ExecResult::success_text_or_bytes(accumulated_out).with_code(last_code);
3803        result.err = accumulated_err;
3804        result.data = last_data;
3805        Ok(result)
3806    }
3807
3808    /// Execute the `source` / `.` command to include and run a script.
3809    ///
3810    /// Unlike regular tool execution, `source` executes in the CURRENT scope,
3811    /// allowing the sourced script to set variables and modify shell state.
3812    async fn execute_source(&self, args: &[Arg]) -> Result<ExecResult> {
3813        // Get the file path from the first positional argument
3814        let tool_args = self.build_args_async(args, None).await?;
3815        let path = match tool_args.positional.first() {
3816            Some(Value::String(s)) => s.clone(),
3817            Some(v) => value_to_string(v),
3818            None => {
3819                return Ok(ExecResult::failure(1, "source: missing filename"));
3820            }
3821        };
3822
3823        // Resolve path relative to cwd
3824        let full_path = {
3825            let ctx = self.exec_ctx.read().await;
3826            if path.starts_with('/') {
3827                std::path::PathBuf::from(&path)
3828            } else {
3829                ctx.cwd.join(&path)
3830            }
3831        };
3832
3833        // Read file content via backend
3834        let content = {
3835            let ctx = self.exec_ctx.read().await;
3836            match ctx.backend.read(&full_path, None).await {
3837                Ok(bytes) => {
3838                    String::from_utf8(bytes).map_err(|e| {
3839                        anyhow::anyhow!("source: {}: invalid UTF-8: {}", path, e)
3840                    })?
3841                }
3842                Err(e) => {
3843                    return Ok(ExecResult::failure(
3844                        1,
3845                        format!("source: {}: {}", path, e),
3846                    ));
3847                }
3848            }
3849        };
3850
3851        // Parse the content
3852        let program = match crate::parser::parse(&content) {
3853            Ok(p) => p,
3854            Err(errors) => {
3855                let msg = errors
3856                    .iter()
3857                    .map(|e| format!("{}:{}: {}", path, e.span.start, e.message))
3858                    .collect::<Vec<_>>()
3859                    .join("\n");
3860                return Ok(ExecResult::failure(1, format!("source: {}", msg)));
3861            }
3862        };
3863
3864        // Execute each statement in the CURRENT scope (not isolated)
3865        let mut result = ExecResult::success("");
3866        for stmt in program.statements {
3867            if matches!(stmt, crate::ast::Stmt::Empty) {
3868                continue;
3869            }
3870
3871            match self.execute_stmt_flow(&stmt).await {
3872                Ok(flow) => {
3873                    self.drain_stderr_into(&mut result).await;
3874                    match flow {
3875                        ControlFlow::Normal(r) => {
3876                            result = r.clone();
3877                            self.update_last_result(&r).await;
3878                        }
3879                        ControlFlow::Break { .. } | ControlFlow::Continue { .. } => {
3880                            return Err(anyhow::anyhow!(
3881                                "source: {}: unexpected break/continue outside loop",
3882                                path
3883                            ));
3884                        }
3885                        ControlFlow::Return { value } => {
3886                            return Ok(value);
3887                        }
3888                        ControlFlow::Exit { code } => {
3889                            result.code = code;
3890                            return Ok(result);
3891                        }
3892                    }
3893                }
3894                Err(e) => {
3895                    return Err(e.context(format!("source: {}", path)));
3896                }
3897            }
3898        }
3899
3900        Ok(result)
3901    }
3902
3903    /// Try to execute a script from PATH directories.
3904    ///
3905    /// Searches PATH for `{name}.kai` files and executes them in isolated scope
3906    /// (like user-defined tools). Returns None if no script is found.
3907    async fn try_execute_script(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3908        // Get PATH from scope (default to "/bin")
3909        let path_value = {
3910            let scope = self.scope.read().await;
3911            scope
3912                .get("PATH")
3913                .map(value_to_string)
3914                .unwrap_or_else(|| "/bin".to_string())
3915        };
3916
3917        // Search PATH directories for script
3918        for dir in path_value.split(':') {
3919            if dir.is_empty() {
3920                continue;
3921            }
3922
3923            // Build script path: {dir}/{name}.kai
3924            let script_path = PathBuf::from(dir).join(format!("{}.kai", name));
3925
3926            // Check if script exists
3927            let exists = {
3928                let ctx = self.exec_ctx.read().await;
3929                ctx.backend.exists(&script_path).await
3930            };
3931
3932            if !exists {
3933                continue;
3934            }
3935
3936            // Read script content
3937            let content = {
3938                let ctx = self.exec_ctx.read().await;
3939                match ctx.backend.read(&script_path, None).await {
3940                    Ok(bytes) => match String::from_utf8(bytes) {
3941                        Ok(s) => s,
3942                        Err(e) => {
3943                            return Ok(Some(ExecResult::failure(
3944                                1,
3945                                format!("{}: invalid UTF-8: {}", script_path.display(), e),
3946                            )));
3947                        }
3948                    },
3949                    Err(e) => {
3950                        return Ok(Some(ExecResult::failure(
3951                            1,
3952                            format!("{}: {}", script_path.display(), e),
3953                        )));
3954                    }
3955                }
3956            };
3957
3958            // Parse the script
3959            let program = match crate::parser::parse(&content) {
3960                Ok(p) => p,
3961                Err(errors) => {
3962                    let msg = errors
3963                        .iter()
3964                        .map(|e| format!("{}:{}: {}", script_path.display(), e.span.start, e.message))
3965                        .collect::<Vec<_>>()
3966                        .join("\n");
3967                    return Ok(Some(ExecResult::failure(1, msg)));
3968                }
3969            };
3970
3971            // Build tool_args from args (async for command substitution support)
3972            let tool_args = self.build_args_async(args, None).await?;
3973
3974            // Create isolated scope (like user tools)
3975            let mut isolated_scope = Scope::new();
3976
3977            // Set up positional parameters ($0 = script name, $1, $2, ... = args)
3978            let positional_args: Vec<String> = tool_args.positional
3979                .iter()
3980                .map(value_to_string)
3981                .collect();
3982            isolated_scope.set_positional(name, positional_args);
3983
3984            // Save current scope and swap with isolated scope
3985            let original_scope = {
3986                let mut scope = self.scope.write().await;
3987                std::mem::replace(&mut *scope, isolated_scope)
3988            };
3989
3990            // Execute script statements — track outcome for cleanup
3991            let mut result = ExecResult::success("");
3992            let mut exec_error: Option<anyhow::Error> = None;
3993            let mut exit_code: Option<i64> = None;
3994
3995            for stmt in program.statements {
3996                if matches!(stmt, crate::ast::Stmt::Empty) {
3997                    continue;
3998                }
3999
4000                match self.execute_stmt_flow(&stmt).await {
4001                    Ok(flow) => {
4002                        match flow {
4003                            ControlFlow::Normal(r) => result = r,
4004                            ControlFlow::Return { value } => {
4005                                result = value;
4006                                break;
4007                            }
4008                            ControlFlow::Exit { code } => {
4009                                exit_code = Some(code);
4010                                break;
4011                            }
4012                            ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
4013                                result = r;
4014                            }
4015                        }
4016                    }
4017                    Err(e) => {
4018                        exec_error = Some(e);
4019                        break;
4020                    }
4021                }
4022            }
4023
4024            // Restore original scope unconditionally
4025            {
4026                let mut scope = self.scope.write().await;
4027                *scope = original_scope;
4028            }
4029
4030            // Propagate error or exit after cleanup
4031            if let Some(e) = exec_error {
4032                return Err(e.context(format!("script: {}", script_path.display())));
4033            }
4034            if let Some(code) = exit_code {
4035                result.code = code;
4036                return Ok(Some(result));
4037            }
4038
4039            return Ok(Some(result));
4040        }
4041
4042        // No script found
4043        Ok(None)
4044    }
4045
    /// Try to execute an external command from PATH (stub used when the
    /// `subprocess` feature is disabled).
    ///
    /// This is the fallback when no builtin or user-defined tool matches.
    /// With the `subprocess` feature enabled, external commands receive a
    /// clean argv (flags preserved in their original format). This build
    /// variant never spawns anything: both arguments are ignored and the
    /// result is always "not found".
    ///
    /// # Requirements (of the `subprocess` implementation)
    /// - Command must be found in PATH
    /// - Current working directory must be on a real filesystem (not virtual like /v)
    ///
    /// # Returns
    /// - `Ok(None)` always in this variant, so the caller falls through as if
    ///   the command was not found in PATH
    #[cfg(not(feature = "subprocess"))]
    async fn try_execute_external(&self, _name: &str, _args: &[Arg]) -> Result<Option<ExecResult>> {
        Ok(None)
    }
4063
4064    /// Try to execute an external command from PATH.
4065    #[cfg(feature = "subprocess")]
4066    #[tracing::instrument(level = "debug", skip(self, args), fields(command = %name))]
4067    async fn try_execute_external(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
4068        // Read the cancel token from `self.exec_ctx`, which `dispatch_command`
4069        // populates from the inbound ctx.cancel on every dispatch. This is
4070        // what makes the `timeout` builtin's swapped child token reach the
4071        // wait_or_kill discipline below — reading `self.cancel_token` would
4072        // give the kernel-wide token and miss the timeout's child cascade.
4073        let cancel = {
4074            let ec = self.exec_ctx.read().await;
4075            ec.cancel.clone()
4076        };
4077        let kill_grace = self.kill_grace;
4078        if !self.allow_external_commands {
4079            return Ok(None);
4080        }
4081
4082        // Get real working directory for relative path resolution and child cwd.
4083        // If the CWD is virtual (no real filesystem path), skip external command
4084        // execution entirely — return None so the dispatch can fall through to
4085        // backend-registered tools.
4086        let real_cwd = {
4087            let ctx = self.exec_ctx.read().await;
4088            match ctx.backend.resolve_real_path(&ctx.cwd) {
4089                Some(p) => p,
4090                None => return Ok(None),
4091            }
4092        };
4093
4094        let executable = if name.contains('/') {
4095            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
4096            let resolved = if std::path::Path::new(name).is_absolute() {
4097                std::path::PathBuf::from(name)
4098            } else {
4099                real_cwd.join(name)
4100            };
4101            if !resolved.exists() {
4102                return Ok(Some(ExecResult::failure(
4103                    127,
4104                    format!("{}: No such file or directory", name),
4105                )));
4106            }
4107            if !resolved.is_file() {
4108                return Ok(Some(ExecResult::failure(
4109                    126,
4110                    format!("{}: Is a directory", name),
4111                )));
4112            }
4113            #[cfg(unix)]
4114            {
4115                use std::os::unix::fs::PermissionsExt;
4116                let mode = std::fs::metadata(&resolved)
4117                    .map(|m| m.permissions().mode())
4118                    .unwrap_or(0);
4119                if mode & 0o111 == 0 {
4120                    return Ok(Some(ExecResult::failure(
4121                        126,
4122                        format!("{}: Permission denied", name),
4123                    )));
4124                }
4125            }
4126            resolved.to_string_lossy().into_owned()
4127        } else {
4128            // Get PATH from scope or environment
4129            let path_var = {
4130                let scope = self.scope.read().await;
4131                scope
4132                    .get("PATH")
4133                    .map(value_to_string)
4134                    .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default())
4135            };
4136
4137            // Resolve command in PATH
4138            match resolve_in_path(name, &path_var) {
4139                Some(path) => path,
4140                None => return Ok(None), // Not found - let caller handle error
4141            }
4142        };
4143
4144        tracing::debug!(executable = %executable, "resolved external command");
4145
4146        // Build flat argv (preserves flag format)
4147        let argv = self.build_args_flat(args).await?;
4148
4149        // Get stdin if available
4150        let stdin_data = {
4151            let mut ctx = self.exec_ctx.write().await;
4152            ctx.take_stdin()
4153        };
4154
4155        // Build and spawn the command
4156        use tokio::process::Command;
4157
4158        let mut cmd = Command::new(&executable);
4159        cmd.args(&argv);
4160        cmd.current_dir(&real_cwd);
4161
4162        // Hermetic env: child sees only kaish's exported vars, not the kaish
4163        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
4164        // populate it via KernelConfig::initial_vars at construction.
4165        cmd.env_clear();
4166        {
4167            let scope = self.scope.read().await;
4168            for (var_name, value) in scope.exported_vars() {
4169                cmd.env(var_name, value_to_string(&value));
4170            }
4171        }
4172
4173        // Handle stdin
4174        cmd.stdin(if stdin_data.is_some() {
4175            std::process::Stdio::piped()
4176        } else if self.interactive {
4177            std::process::Stdio::inherit()
4178        } else {
4179            std::process::Stdio::null()
4180        });
4181
4182        // In interactive mode, standalone or last-in-pipeline commands inherit
4183        // the terminal's stdout/stderr so output streams in real-time.
4184        // First/middle commands must capture stdout for the pipe — same as bash.
4185        let pipeline_position = {
4186            let ctx = self.exec_ctx.read().await;
4187            ctx.pipeline_position
4188        };
4189        let inherit_output = self.interactive
4190            && matches!(pipeline_position, PipelinePosition::Only | PipelinePosition::Last);
4191
4192        if inherit_output {
4193            cmd.stdout(std::process::Stdio::inherit());
4194            cmd.stderr(std::process::Stdio::inherit());
4195        } else {
4196            cmd.stdout(std::process::Stdio::piped());
4197            cmd.stderr(std::process::Stdio::piped());
4198        }
4199
4200        // On Unix, always put the child in its own process group so cancellation
4201        // can `killpg` the whole tree (the child plus any grandchildren).
4202        // Restoring default tty-related signal handlers stays gated on
4203        // job-control mode — those only matter when the child has a controlling
4204        // terminal.
4205        #[cfg(unix)]
4206        {
4207            let restore_jc_signals = self.terminal_state.is_some() && inherit_output;
4208            // SAFETY: setpgid and sigaction(SIG_DFL) are async-signal-safe per POSIX
4209            #[allow(unsafe_code)]
4210            unsafe {
4211                cmd.pre_exec(move || {
4212                    // Own process group — for kill scope.
4213                    nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
4214                        .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
4215                    if restore_jc_signals {
4216                        use nix::libc::{sigaction, SIGTSTP, SIGTTOU, SIGTTIN, SIGINT, SIG_DFL};
4217                        let mut sa: nix::libc::sigaction = std::mem::zeroed();
4218                        sa.sa_sigaction = SIG_DFL;
4219                        if sigaction(SIGTSTP, &sa, std::ptr::null_mut()) != 0 {
4220                            return Err(std::io::Error::last_os_error());
4221                        }
4222                        if sigaction(SIGTTOU, &sa, std::ptr::null_mut()) != 0 {
4223                            return Err(std::io::Error::last_os_error());
4224                        }
4225                        if sigaction(SIGTTIN, &sa, std::ptr::null_mut()) != 0 {
4226                            return Err(std::io::Error::last_os_error());
4227                        }
4228                        if sigaction(SIGINT, &sa, std::ptr::null_mut()) != 0 {
4229                            return Err(std::io::Error::last_os_error());
4230                        }
4231                    }
4232                    Ok(())
4233                });
4234            }
4235        }
4236
4237        // Backstop for kill on drop in case our explicit kill path is bypassed
4238        // (panic, early return, etc) on the **capture** wait path. We do NOT
4239        // set this on the JC inherit path: that uses sync `waitpid` outside
4240        // tokio's view of the child, so on drop tokio would try to kill an
4241        // already-reaped (possibly-reused) PID. The JC path has its own
4242        // cancel handling via the side-task watcher.
4243        let in_jc_inherit_path = inherit_output && self.terminal_state.is_some();
4244        if !in_jc_inherit_path {
4245            cmd.kill_on_drop(true);
4246        }
4247
4248        // Spawn the process. Capture a `KillTarget` immediately so cancel/
4249        // timeout paths can deliver signals via pidfd (Linux ≥ 5.3) — bound
4250        // to this process's generation, immune to PID reuse if the OS reaps
4251        // the child before our kill syscalls fire.
4252        let mut child = match cmd.spawn() {
4253            Ok(child) => child,
4254            Err(e) => {
4255                return Ok(Some(ExecResult::failure(
4256                    127,
4257                    format!("{}: {}", name, e),
4258                )));
4259            }
4260        };
4261        let kill_target = crate::pidfd::KillTarget::from_child(&child);
4262
4263        // If this external runs on behalf of a background job, record its
4264        // process group on the job so `kill -<sig> %N` can signal the real
4265        // process directly (STOP/CONT/USR1/…, not just terminate). The child
4266        // did `setpgid(0, 0)` in pre_exec, so its PGID equals its PID.
4267        if let Some(job_id) = self.bg_job_id
4268            && let Some(pid) = child.id()
4269        {
4270            self.jobs.add_pgid(job_id, pid).await;
4271        }
4272
4273        // Write stdin if present
4274        if let Some(data) = stdin_data
4275            && let Some(mut stdin) = child.stdin.take()
4276        {
4277            use tokio::io::AsyncWriteExt;
4278            if let Err(e) = stdin.write_all(data.as_bytes()).await {
4279                return Ok(Some(ExecResult::failure(
4280                    1,
4281                    format!("{}: failed to write stdin: {}", name, e),
4282                )));
4283            }
4284            // Drop stdin to signal EOF
4285        }
4286
4287        if inherit_output {
4288            // Job control path: use waitpid with WUNTRACED for Ctrl-Z support
4289            #[cfg(unix)]
4290            if let Some(ref term) = self.terminal_state {
4291                let child_id = child.id().unwrap_or(0);
4292                let pid = nix::unistd::Pid::from_raw(child_id as i32);
4293                let pgid = pid; // child is its own pgid leader
4294
4295                // Give the terminal to the child's process group
4296                if let Err(e) = term.give_terminal_to(pgid) {
4297                    tracing::warn!("failed to give terminal to child: {}", e);
4298                }
4299
4300                let term_clone = term.clone();
4301                let cmd_name = name.to_string();
4302                let cmd_display = format!("{} {}", name, argv.join(" "));
4303                let jobs = self.jobs.clone();
4304
4305                // Side task that watches for cancellation while the blocking
4306                // waitpid runs. On cancel, it SIGTERMs the process group, waits
4307                // the grace period, then SIGKILLs. The blocking waitpid returns
4308                // when the child dies. AbortOnDrop guard cancels the watcher
4309                // on the success path so it doesn't keep running after wait
4310                // returns naturally.
4311                //
4312                // `wait_complete` shrinks the PID-reuse race: the watcher
4313                // checks it before each kill syscall and bails out if
4314                // wait_for_foreground has already reaped the child. This
4315                // doesn't fully eliminate the race (atomic load + kill is
4316                // not atomic with the OS reap+reuse), but narrows the window
4317                // to nanoseconds — enough to be ignorable in practice.
4318                let wait_complete = std::sync::Arc::new(
4319                    std::sync::atomic::AtomicBool::new(false)
4320                );
4321                let cancel_watcher = {
4322                    let cancel = cancel.clone();
4323                    let wc = wait_complete.clone();
4324                    // Ownership transfer: the JC path's sync wait inside
4325                    // block_in_place owns the child's reaping, so the
4326                    // cancel_watcher drives the kill side via KillTarget
4327                    // (pidfd-bound on Linux). When kill_target is None
4328                    // (older kernel + open failure, or non-Linux), falls
4329                    // through to the older PID-based path the closure
4330                    // captures from `pid`.
4331                    let target = kill_target.as_ref().map(|t| {
4332                        // Re-borrow the components we need into Owned-ish form
4333                        // so the spawned task is 'static. We can't move
4334                        // KillTarget directly because try_execute_external
4335                        // still uses it after the spawn — but on the JC path
4336                        // there is no further use after the watcher spawn,
4337                        // so a clone-of-pid + owned None pidfd is safe.
4338                        // Simpler: signal via the existing target by cloning
4339                        // a fresh pidfd; the original keeps its handle.
4340                        // Pidfd is just an OwnedFd — not Clone — so do it
4341                        // by re-opening from the pid. Fall back if reopen
4342                        // fails (race already reaped → best-effort kill).
4343                        crate::pidfd::KillTarget::from_pid(t.pid())
4344                    });
4345                    tokio::spawn(async move {
4346                        cancel.cancelled().await;
4347                        if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4348                        use nix::sys::signal::Signal;
4349                        if let Some(t) = &target {
4350                            t.signal(Signal::SIGTERM);
4351                            t.signal_pg(Signal::SIGTERM);
4352                        } else {
4353                            let _ = nix::sys::signal::kill(pid, Signal::SIGTERM);
4354                            let _ = nix::sys::signal::killpg(pid, Signal::SIGTERM);
4355                        }
4356                        if kill_grace > Duration::ZERO {
4357                            tokio::time::sleep(kill_grace).await;
4358                            if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4359                        }
4360                        if let Some(t) = &target {
4361                            t.signal(Signal::SIGKILL);
4362                            t.signal_pg(Signal::SIGKILL);
4363                        } else {
4364                            let _ = nix::sys::signal::kill(pid, Signal::SIGKILL);
4365                            let _ = nix::sys::signal::killpg(pid, Signal::SIGKILL);
4366                        }
4367                    })
4368                };
4369                struct AbortOnDrop(tokio::task::JoinHandle<()>);
4370                impl Drop for AbortOnDrop {
4371                    fn drop(&mut self) {
4372                        self.0.abort();
4373                    }
4374                }
4375                let _watcher_guard = AbortOnDrop(cancel_watcher);
4376
4377                let wait_complete_setter = wait_complete.clone();
4378                let code = tokio::task::block_in_place(move || {
4379                    let result = term_clone.wait_for_foreground(pid);
4380                    // Mark wait done before the watcher might fire.
4381                    wait_complete_setter.store(true, std::sync::atomic::Ordering::SeqCst);
4382
4383                    // Always reclaim the terminal
4384                    if let Err(e) = term_clone.reclaim_terminal() {
4385                        tracing::warn!("failed to reclaim terminal: {}", e);
4386                    }
4387
4388                    match result {
4389                        crate::terminal::WaitResult::Exited(code) => code as i64,
4390                        crate::terminal::WaitResult::Signaled(sig) => 128 + sig as i64,
4391                        crate::terminal::WaitResult::Stopped(_sig) => {
4392                            // Register as a stopped job
4393                            let rt = tokio::runtime::Handle::current();
4394                            let job_id = rt.block_on(jobs.register_stopped(
4395                                cmd_display,
4396                                child_id,
4397                                child_id, // pgid = pid for group leader
4398                            ));
4399                            eprintln!("\n[{}]+ Stopped\t{}", job_id, cmd_name);
4400                            148 // 128 + SIGTSTP(20) on most systems, but we use a fixed value
4401                        }
4402                    }
4403                });
4404
4405                return Ok(Some(ExecResult::from_output(code, String::new(), String::new())));
4406            }
4407
4408            // Non-job-control path with inherited stdio.
4409            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4410                Ok(s) => s,
4411                Err(e) => {
4412                    return Ok(Some(ExecResult::failure(
4413                        1,
4414                        format!("{}: failed to wait: {}", name, e),
4415                    )));
4416                }
4417            };
4418
4419            let code = status.code().unwrap_or_else(|| {
4420                #[cfg(unix)]
4421                {
4422                    use std::os::unix::process::ExitStatusExt;
4423                    128 + status.signal().unwrap_or(0)
4424                }
4425                #[cfg(not(unix))]
4426                {
4427                    -1
4428                }
4429            }) as i64;
4430
4431            // stdout/stderr already went to the terminal
4432            Ok(Some(ExecResult::from_output(code, String::new(), String::new())))
4433        } else {
4434            // Capture output via bounded streams
4435            let stdout_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4436            let stderr_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4437
4438            let stdout_pipe = child.stdout.take();
4439            let stderr_pipe = child.stderr.take();
4440
4441            let stdout_clone = stdout_stream.clone();
4442            let stderr_clone = stderr_stream.clone();
4443
4444            let stdout_task = stdout_pipe.map(|pipe| {
4445                tokio::spawn(async move {
4446                    drain_to_stream(pipe, stdout_clone).await;
4447                })
4448            });
4449
4450            let stderr_task = stderr_pipe.map(|pipe| {
4451                tokio::spawn(async move {
4452                    drain_to_stream(pipe, stderr_clone).await;
4453                })
4454            });
4455
4456            let cancelled_before_wait = cancel.is_cancelled();
4457            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4458                Ok(s) => s,
4459                Err(e) => {
4460                    if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4461                    if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4462                    return Ok(Some(ExecResult::failure(
4463                        1,
4464                        format!("{}: failed to wait: {}", name, e),
4465                    )));
4466                }
4467            };
4468
4469            // On cancel, abort the drain tasks (the child's pipes are gone;
4470            // late output is lost but predictable death beats partial capture).
4471            // On normal exit, await drains so we don't lose buffered output.
4472            if cancelled_before_wait || cancel.is_cancelled() {
4473                if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4474                if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4475            } else {
4476                if let Some(task) = stdout_task {
4477                    // Ignore join error — the drain task logs its own errors
4478                    let _ = task.await;
4479                }
4480                if let Some(task) = stderr_task {
4481                    let _ = task.await;
4482                }
4483            }
4484
4485            let code = status.code().unwrap_or_else(|| {
4486                #[cfg(unix)]
4487                {
4488                    use std::os::unix::process::ExitStatusExt;
4489                    128 + status.signal().unwrap_or(0)
4490                }
4491                #[cfg(not(unix))]
4492                {
4493                    -1
4494                }
4495            }) as i64;
4496
4497            // Read stdout as RAW bytes: text if valid UTF-8, else a Bytes
4498            // result, so `curl url`, `curl url > file.bin`, etc. keep binary
4499            // intact. stderr stays text. See docs/binary-data.md.
4500            let stdout = stdout_stream.read().await;
4501            let stderr = stderr_stream.read_string().await;
4502            let mut result = ExecResult::success_text_or_bytes(stdout).with_code(code);
4503            result.err = stderr;
4504            Ok(Some(result))
4505        }
4506    }
4507
4508    // --- Variable Access ---
4509
4510    /// Get a variable value.
4511    pub async fn get_var(&self, name: &str) -> Option<Value> {
4512        let scope = self.scope.read().await;
4513        scope.get(name).cloned()
4514    }
4515
4516    /// Check if error-exit mode is enabled (for testing).
4517    #[cfg(test)]
4518    pub async fn error_exit_enabled(&self) -> bool {
4519        let scope = self.scope.read().await;
4520        scope.error_exit_enabled()
4521    }
4522
4523    /// Set a variable value.
4524    pub async fn set_var(&self, name: &str, value: Value) {
4525        let mut scope = self.scope.write().await;
4526        scope.set(name.to_string(), value);
4527    }
4528
4529    /// Set positional parameters ($0 script name and $1-$9 args).
4530    pub async fn set_positional(&self, script_name: impl Into<String>, args: Vec<String>) {
4531        let mut scope = self.scope.write().await;
4532        scope.set_positional(script_name, args);
4533    }
4534
4535    /// List all variables.
4536    pub async fn list_vars(&self) -> Vec<(String, Value)> {
4537        let scope = self.scope.read().await;
4538        scope.all()
4539    }
4540
4541    /// List exported variables (name, value), sorted by name. These are the
4542    /// vars a child process would see (see `dispatch`'s hermetic env build).
4543    pub async fn exported_vars(&self) -> Vec<(String, Value)> {
4544        let scope = self.scope.read().await;
4545        scope.exported_vars()
4546    }
4547
4548    // --- CWD ---
4549
4550    /// Get current working directory.
4551    pub async fn cwd(&self) -> PathBuf {
4552        self.exec_ctx.read().await.cwd.clone()
4553    }
4554
4555    /// Set current working directory.
4556    pub async fn set_cwd(&self, path: PathBuf) {
4557        let mut ctx = self.exec_ctx.write().await;
4558        ctx.set_cwd(path);
4559    }
4560
4561    /// Set the working directory only if `path` resolves to a directory in the
4562    /// kernel's backend — the same namespace `cd` validates against. Unlike a
4563    /// raw host-FS `is_dir()` check, this correctly accepts virtual mounts
4564    /// (`/v/docs`, in-memory scratch, …) and rejects real paths that have since
4565    /// disappeared. Returns whether the cwd was changed.
4566    pub async fn try_set_cwd(&self, path: PathBuf) -> bool {
4567        // Clone the backend Arc out before the stat so we never hold the
4568        // exec_ctx lock across the await.
4569        let backend = self.exec_ctx.read().await.backend.clone();
4570        let is_dir = matches!(backend.stat(&path).await, Ok(entry) if entry.is_dir());
4571        if is_dir {
4572            self.exec_ctx.write().await.set_cwd(path);
4573        }
4574        is_dir
4575    }
4576
4577    // --- Last Result ---
4578
4579    /// Get the last result ($?).
4580    pub async fn last_result(&self) -> ExecResult {
4581        let scope = self.scope.read().await;
4582        scope.last_result().clone()
4583    }
4584
4585    // --- Tools ---
4586
4587    /// Check if a user-defined function exists.
4588    pub async fn has_function(&self, name: &str) -> bool {
4589        self.user_tools.read().await.contains_key(name)
4590    }
4591
    /// Get available tool schemas.
    ///
    /// Returns whatever the kernel's tool registry (`self.tools`) reports;
    /// user-defined functions live in a separate map (`user_tools`) that this
    /// method does not consult.
    pub fn tool_schemas(&self) -> Vec<crate::tools::ToolSchema> {
        self.tools.schemas()
    }
4596
4597    // --- Jobs ---
4598
4599    /// Get job manager.
4600    pub fn jobs(&self) -> Arc<JobManager> {
4601        self.jobs.clone()
4602    }
4603
4604    // --- VFS ---
4605
4606    /// Get VFS router.
4607    pub fn vfs(&self) -> Arc<VfsRouter> {
4608        self.vfs.clone()
4609    }
4610
4611    // --- State ---
4612
4613    /// Reset kernel to initial state.
4614    ///
4615    /// Clears in-memory variables and resets cwd to root.
4616    /// History is not cleared (it persists across resets).
4617    pub async fn reset(&self) -> Result<()> {
4618        {
4619            let mut scope = self.scope.write().await;
4620            *scope = Scope::new();
4621        }
4622        {
4623            let mut ctx = self.exec_ctx.write().await;
4624            ctx.cwd = PathBuf::from("/");
4625        }
4626        Ok(())
4627    }
4628
    /// Shutdown the kernel.
    ///
    /// Consumes the kernel and returns once the job manager's `wait_all`
    /// completes. Always returns `Ok(())`.
    pub async fn shutdown(self) -> Result<()> {
        // Wait for all background jobs
        self.jobs.wait_all().await;
        Ok(())
    }
4635
4636    /// Dispatch a single command using the full resolution chain.
4637    ///
4638    /// This is the core of `CommandDispatcher` — it syncs state between the
4639    /// passed-in `ExecContext` and kernel-internal state (scope, exec_ctx),
4640    /// then delegates to `execute_command` for the actual dispatch.
4641    ///
4642    /// State flow:
4643    /// 1. ctx → self: sync scope, cwd, stdin so internal methods see current state
4644    /// 2. execute_command: full dispatch chain (user tools, builtins, scripts, external, backend)
4645    /// 3. self → ctx: sync scope, cwd changes back so the pipeline runner sees them
4646    async fn dispatch_command(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4647        // Ensure nested dispatch (e.g. the `timeout` builtin re-dispatching
4648        // its inner command via ctx.dispatcher) routes through THIS kernel,
4649        // not a stale parent. Critical for forks: the fork's builtins must
4650        // use the fork's dispatcher, not the parent's.
4651        if let Some(d) = self.dispatcher() {
4652            ctx.dispatcher = Some(d);
4653        }
4654
4655        // 1. Sync ctx → self internals
4656        {
4657            let mut scope = self.scope.write().await;
4658            *scope = ctx.scope.clone();
4659        }
4660        {
4661            let mut ec = self.exec_ctx.write().await;
4662            ec.cwd = ctx.cwd.clone();
4663            ec.prev_cwd = ctx.prev_cwd.clone();
4664            ec.stdin = ctx.stdin.take();
4665            ec.stdin_data = ctx.stdin_data.take();
4666            // Streaming pipe endpoints and kernel stderr must flow to the
4667            // tool via self.exec_ctx — execute_command reads that, not the
4668            // passed-in ctx. Without moving these, concurrent pipeline
4669            // stages dispatched via a fork get pipe_stdin = None and
4670            // silently read nothing.
4671            ec.pipe_stdin = ctx.pipe_stdin.take();
4672            ec.pipe_stdout = ctx.pipe_stdout.take();
4673            if let Some(stderr) = ctx.stderr.clone() {
4674                ec.stderr = Some(stderr);
4675            }
4676            ec.aliases = ctx.aliases.clone();
4677            ec.ignore_config = ctx.ignore_config.clone();
4678            ec.output_limit = ctx.output_limit.clone();
4679            ec.pipeline_position = ctx.pipeline_position;
4680            // Sync the cancel token from ctx → ec. Builtins like `timeout`
4681            // swap ctx.cancel to a derived child token before re-dispatching;
4682            // execute_command's snapshot reads ec.cancel (kept aligned by
4683            // this sync), so try_execute_external sees the right token.
4684            ec.cancel = ctx.cancel.clone();
4685            // Same alignment for the watchdog: a fork dispatching through its
4686            // own kernel must hand the shared script clock to the snapshot so
4687            // patient holds in forked stages suspend the right timer.
4688            ec.watchdog = ctx.watchdog.clone();
4689        }
4690
4691        // 2. Execute via the full dispatch chain
4692        let result = self.execute_command(&cmd.name, &cmd.args).await?;
4693
4694        // 3. Sync self → ctx
4695        {
4696            let scope = self.scope.read().await;
4697            ctx.scope = scope.clone();
4698        }
4699        {
4700            let mut ec = self.exec_ctx.write().await;
4701            ctx.cwd = ec.cwd.clone();
4702            ctx.prev_cwd = ec.prev_cwd.clone();
4703            ctx.aliases = ec.aliases.clone();
4704            ctx.ignore_config = ec.ignore_config.clone();
4705            ctx.output_limit = ec.output_limit.clone();
4706            // Return any pipe endpoints that the tool didn't consume.
4707            // `take()` here keeps the fork's exec_ctx in a clean state for
4708            // the next dispatch — these are per-command and shouldn't leak
4709            // between calls.
4710            ctx.pipe_stdin = ec.pipe_stdin.take();
4711            ctx.pipe_stdout = ec.pipe_stdout.take();
4712        }
4713
4714        Ok(result)
4715    }
4716}
4717
4718#[async_trait]
4719impl CommandDispatcher for Kernel {
4720    /// Dispatch a command through the Kernel's full resolution chain.
4721    ///
4722    /// This is the single path for all command execution when called from
4723    /// the pipeline runner. It provides the full dispatch chain:
4724    /// user tools → builtins → .kai scripts → external commands → backend tools.
4725    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4726        self.dispatch_command(cmd, ctx).await
4727    }
4728
4729    /// Evaluate an expression through the kernel's async chain, including
4730    /// command substitution. Delegates to `eval_expr_async`, which snapshots
4731    /// the kernel's scope/cwd and restores them after any `$(...)` runs, so
4732    /// only command output escapes. The `ctx` is unused here because the
4733    /// kernel evaluates against its own session state (a fork carries the
4734    /// pipeline stage's snapshot); var refs resolve against that scope.
4735    async fn eval_expr(&self, expr: &Expr, _ctx: &ExecContext) -> Result<Value> {
4736        self.eval_expr_async(expr).await
4737    }
4738
4739    /// Produce a forked dispatcher with independent mutable state (detached).
4740    ///
4741    /// Calls the inherent `Kernel::fork` method (note the UFCS to avoid
4742    /// recursing into the trait method we're defining) and coerces the
4743    /// returned `Arc<Kernel>` to `Arc<dyn CommandDispatcher>`.
4744    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
4745        let fork: Arc<Kernel> = Kernel::fork(self).await;
4746        fork
4747    }
4748
4749    /// Produce a forked dispatcher with cancellation cascading from this kernel.
4750    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
4751        let fork: Arc<Kernel> = Kernel::fork_attached(self).await;
4752        fork
4753    }
4754}
4755
4756/// Apply the requested output format to a builtin's result, unless the tool
4757/// owns its own output.
4758///
4759/// `format` is `ctx.output_format` (set from `--json`). When `owns_output` is
4760/// true the tool already rendered its bytes (bespoke JSON envelope), so the
4761/// kernel leaves the result untouched rather than re-formatting its
4762/// `OutputData`. Otherwise the kernel renders the typed `OutputData` uniformly.
4763fn finalize_output(
4764    result: ExecResult,
4765    format: Option<crate::interpreter::OutputFormat>,
4766    owns_output: bool,
4767) -> ExecResult {
4768    match format {
4769        Some(_) if owns_output => result,
4770        Some(format) => apply_output_format(result, format),
4771        None => result,
4772    }
4773}
4774
4775/// Accumulate output from one result into another.
4776///
4777/// Appends stdout and stderr verbatim and updates the exit code to match the
4778/// new result. Used to preserve output from multiple statements, loop
4779/// iterations, and command chains. No separator is inserted between outputs —
4780/// each command's output concatenates raw, matching bash (`printf a; printf b`
4781/// and `printf a && printf b` both yield `ab`; a trailing newline only appears
4782/// when a command emits its own, as `echo` does).
4783fn accumulate_result(accumulated: &mut ExecResult, new: &ExecResult) {
4784    // Materialize lazy OutputData into .out before accumulating.
4785    // Without this, the first command's output stays in .output while
4786    // the second's text gets appended to .out, losing the first.
4787    accumulated.materialize();
4788    match new.out_bytes() {
4789        // A binary result must not be lossy-decoded by text_out(): concatenate
4790        // raw bytes so the combined output stays binary (this is the path every
4791        // top-level statement's result flows through). See docs/binary-data.md.
4792        Some(new_bytes) => {
4793            let mut combined: Vec<u8> = match accumulated.out_bytes() {
4794                Some(b) => b.to_vec(),
4795                None => accumulated.text_out().into_owned().into_bytes(),
4796            };
4797            combined.extend_from_slice(new_bytes);
4798            accumulated.set_out_bytes(combined);
4799        }
4800        None => accumulated.push_out(&new.text_out()),
4801    }
4802    accumulated.err.push_str(&new.err);
4803    accumulated.code = new.code;
4804    accumulated.data = new.data.clone();
4805    accumulated.did_spill = new.did_spill;
4806    accumulated.original_code = new.original_code;
4807    accumulated.content_type = new.content_type.clone();
4808    accumulated.baggage.clone_from(&new.baggage);
4809}
4810
4811/// Fold a loop's accumulated output into a break/continue signal that is
4812/// propagating to an *outer* loop. Output printed before `break N`/`continue N`
4813/// (with `N > 1`) would otherwise be discarded when the signal replaces the
4814/// loop's result on its way up. The loop's output comes first (it ran before
4815/// the signal was raised), then the signal's already-carried output.
4816fn fold_loop_output_into_flow(loop_output: ExecResult, flow: &mut ControlFlow) {
4817    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4818        let mut merged = loop_output;
4819        accumulate_result(&mut merged, result);
4820        *result = merged;
4821    }
4822}
4823
4824/// Accumulate the output a break/continue signal carried (from inner loops it
4825/// propagated through) into the loop that finally handles it, so it survives
4826/// into that loop's result.
4827fn accumulate_flow_output(accumulated: &mut ExecResult, flow: &ControlFlow) {
4828    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4829        accumulate_result(accumulated, result);
4830    }
4831}
4832
4833/// Check if a value is truthy.
4834fn is_truthy(value: &Value) -> bool {
4835    match value {
4836        Value::Null => false,
4837        Value::Bool(b) => *b,
4838        Value::Int(i) => *i != 0,
4839        Value::Float(f) => *f != 0.0,
4840        Value::String(s) => !s.is_empty(),
4841        Value::Json(json) => match json {
4842            serde_json::Value::Null => false,
4843            serde_json::Value::Array(arr) => !arr.is_empty(),
4844            serde_json::Value::Object(obj) => !obj.is_empty(),
4845            serde_json::Value::Bool(b) => *b,
4846            serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
4847            serde_json::Value::String(s) => !s.is_empty(),
4848        },
4849        Value::Bytes(b) => !b.is_empty(), // empty bytes are falsy, like ""
4850    }
4851}
4852
4853/// Apply tilde expansion to a value.
4854///
4855/// Only string values starting with `~` are expanded. `home` is the session
4856/// `HOME` from the kernel scope (the kernel is hermetic and never reads the
4857/// host env); `None` leaves `~`/`~/path` unexpanded. See [`expand_tilde`].
4858fn apply_tilde_expansion(value: Value, home: Option<&str>) -> Value {
4859    match value {
4860        Value::String(s) if s.starts_with('~') => Value::String(expand_tilde(&s, home)),
4861        _ => value,
4862    }
4863}
4864
/// Await the child's exit, terminating it if `cancel` fires first.
///
/// The select is `biased`, so the exit branch is polled before the
/// cancellation branch. When cancellation wins, the child is handed to
/// [`kill_with_grace`] along with `target` and `grace`, and that call's
/// result is returned.
///
/// `target` carries a Linux pidfd (when available) for race-free direct-child
/// kill; fall-through to PID-based kill otherwise. On non-unix targets the
/// parameter is ignored and tokio's cross-platform `start_kill` is used.
#[cfg(all(unix, feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    target: Option<&crate::pidfd::KillTarget>,
    cancel: &tokio_util::sync::CancellationToken,
    grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        biased;
        exited = child.wait() => exited,
        () = cancel.cancelled() => kill_with_grace(child, target, grace).await,
    }
}
4883
/// Non-unix counterpart of `wait_or_kill`: await the child's exit, or force
/// it to exit once `cancel` fires.
///
/// `_target` and `_grace` exist only to keep the signature parallel with the
/// unix version; neither is used. Cancellation issues tokio's `start_kill`
/// (its error, if any, is ignored) and then waits for the child.
#[cfg(all(not(unix), feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    _target: Option<&()>,
    cancel: &tokio_util::sync::CancellationToken,
    _grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        biased;
        exited = child.wait() => exited,
        () = cancel.cancelled() => {
            let _ = child.start_kill();
            child.wait().await
        }
    }
}
4900
4901/// Send SIGTERM to the child and its process group; wait `grace`; then SIGKILL.
4902///
4903/// Direct-child kill goes through `target.signal()`, which on Linux uses a
4904/// pidfd (immune to PID reuse). Process-group kill uses `killpg` — there is
4905/// no PGID-equivalent of pidfd, so grandchildren retain a small reuse window.
4906#[cfg(all(unix, feature = "subprocess"))]
4907pub(crate) async fn kill_with_grace(
4908    child: &mut tokio::process::Child,
4909    target: Option<&crate::pidfd::KillTarget>,
4910    grace: Duration,
4911) -> std::io::Result<std::process::ExitStatus> {
4912    use nix::sys::signal::Signal;
4913
4914    if let Some(t) = target {
4915        t.signal(Signal::SIGTERM);
4916        t.signal_pg(Signal::SIGTERM);
4917        if grace > Duration::ZERO
4918            && let Ok(status) = tokio::time::timeout(grace, child.wait()).await
4919        {
4920            return status;
4921        }
4922        t.signal(Signal::SIGKILL);
4923        t.signal_pg(Signal::SIGKILL);
4924    }
4925    child.wait().await
4926}
4927
4928#[cfg(all(test, feature = "subprocess"))]
4929#[allow(clippy::expect_used)]
4930mod tests {
4931    use super::*;
4932
4933    #[tokio::test]
4934    async fn test_kernel_transient() {
4935        let kernel = Kernel::transient().expect("failed to create kernel");
4936        assert_eq!(kernel.name(), "transient");
4937    }
4938
4939    #[tokio::test]
4940    async fn test_kernel_execute_echo() {
4941        let kernel = Kernel::transient().expect("failed to create kernel");
4942        let result = kernel.execute("echo hello").await.expect("execution failed");
4943        assert!(result.ok());
4944        assert_eq!(result.text_out().trim(), "hello");
4945    }
4946
4947    #[tokio::test]
4948    async fn test_multiple_statements_accumulate_output() {
4949        let kernel = Kernel::transient().expect("failed to create kernel");
4950        let result = kernel
4951            .execute("echo one\necho two\necho three")
4952            .await
4953            .expect("execution failed");
4954        assert!(result.ok());
4955        // Should have all three outputs separated by newlines
4956        assert!(result.text_out().contains("one"), "missing 'one': {}", result.text_out());
4957        assert!(result.text_out().contains("two"), "missing 'two': {}", result.text_out());
4958        assert!(result.text_out().contains("three"), "missing 'three': {}", result.text_out());
4959    }
4960
4961    #[tokio::test]
4962    async fn test_and_chain_accumulates_output() {
4963        let kernel = Kernel::transient().expect("failed to create kernel");
4964        let result = kernel
4965            .execute("echo first && echo second")
4966            .await
4967            .expect("execution failed");
4968        assert!(result.ok());
4969        assert!(result.text_out().contains("first"), "missing 'first': {}", result.text_out());
4970        assert!(result.text_out().contains("second"), "missing 'second': {}", result.text_out());
4971    }
4972
4973    #[tokio::test]
4974    async fn test_for_loop_accumulates_output() {
4975        let kernel = Kernel::transient().expect("failed to create kernel");
4976        let result = kernel
4977            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
4978            .await
4979            .expect("execution failed");
4980        assert!(result.ok());
4981        assert!(result.text_out().contains("item: a"), "missing 'item: a': {}", result.text_out());
4982        assert!(result.text_out().contains("item: b"), "missing 'item: b': {}", result.text_out());
4983        assert!(result.text_out().contains("item: c"), "missing 'item: c': {}", result.text_out());
4984    }
4985
4986    #[tokio::test]
4987    async fn test_while_loop_accumulates_output() {
4988        let kernel = Kernel::transient().expect("failed to create kernel");
4989        let result = kernel
4990            .execute(r#"
4991                N=3
4992                while [[ ${N} -gt 0 ]]; do
4993                    echo "N=${N}"
4994                    N=$((N - 1))
4995                done
4996            "#)
4997            .await
4998            .expect("execution failed");
4999        assert!(result.ok());
5000        assert!(result.text_out().contains("N=3"), "missing 'N=3': {}", result.text_out());
5001        assert!(result.text_out().contains("N=2"), "missing 'N=2': {}", result.text_out());
5002        assert!(result.text_out().contains("N=1"), "missing 'N=1': {}", result.text_out());
5003    }
5004
5005    #[tokio::test]
5006    async fn test_kernel_set_var() {
5007        let kernel = Kernel::transient().expect("failed to create kernel");
5008
5009        kernel.execute("X=42").await.expect("set failed");
5010
5011        let value = kernel.get_var("X").await;
5012        assert_eq!(value, Some(Value::Int(42)));
5013    }
5014
5015    #[tokio::test]
5016    async fn test_kernel_var_expansion() {
5017        let kernel = Kernel::transient().expect("failed to create kernel");
5018
5019        kernel.execute("NAME=\"world\"").await.expect("set failed");
5020        let result = kernel.execute("echo \"hello ${NAME}\"").await.expect("echo failed");
5021
5022        assert!(result.ok());
5023        assert_eq!(result.text_out().trim(), "hello world");
5024    }
5025
5026    #[tokio::test]
5027    async fn test_kernel_last_result() {
5028        let kernel = Kernel::transient().expect("failed to create kernel");
5029
5030        kernel.execute("echo test").await.expect("echo failed");
5031
5032        let last = kernel.last_result().await;
5033        assert!(last.ok());
5034        assert_eq!(last.text_out().trim(), "test");
5035    }
5036
5037    #[tokio::test]
5038    async fn test_kernel_tool_not_found() {
5039        let kernel = Kernel::transient().expect("failed to create kernel");
5040
5041        let result = kernel.execute("nonexistent_tool").await.expect("execution failed");
5042        assert!(!result.ok());
5043        assert_eq!(result.code, 127);
5044        assert!(result.err.contains("command not found"));
5045    }
5046
5047    #[tokio::test]
5048    async fn test_external_command_true() {
5049        // Use REPL config for passthrough filesystem access
5050        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
5051
5052        // /bin/true should be available on any Unix system
5053        let result = kernel.execute("true").await.expect("execution failed");
5054        // This should use the builtin true, which returns 0
5055        assert!(result.ok(), "true should succeed: {:?}", result);
5056    }
5057
5058    #[tokio::test]
5059    async fn test_external_command_basic() {
5060        // Use REPL config for passthrough filesystem access
5061        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
5062
5063        // Test with /bin/echo which is external
5064        // Note: kaish has a builtin echo, so this will use the builtin
5065        // Let's test with a command that's not a builtin
5066        // Actually, let's just test that PATH resolution works by checking the PATH var
5067        let path_var = std::env::var("PATH").unwrap_or_default();
5068        eprintln!("System PATH: {}", path_var);
5069
5070        // Set PATH in kernel to ensure it's available
5071        kernel.execute(&format!(r#"PATH="{}""#, path_var)).await.expect("set PATH failed");
5072
5073        // Now try an external command like /usr/bin/env
5074        // But env is also a builtin... let's try uname
5075        let result = kernel.execute("uname").await.expect("execution failed");
5076        eprintln!("uname result: {:?}", result);
5077        // uname should succeed if external commands work
5078        assert!(result.ok() || result.code == 127, "uname: {:?}", result);
5079    }
5080
5081    #[tokio::test]
5082    async fn test_kernel_reset() {
5083        let kernel = Kernel::transient().expect("failed to create kernel");
5084
5085        kernel.execute("X=1").await.expect("set failed");
5086        assert!(kernel.get_var("X").await.is_some());
5087
5088        kernel.reset().await.expect("reset failed");
5089        assert!(kernel.get_var("X").await.is_none());
5090    }
5091
5092    #[tokio::test]
5093    async fn test_kernel_cwd() {
5094        let kernel = Kernel::transient().expect("failed to create kernel");
5095
5096        // Transient kernel uses sandboxed mode with cwd=$HOME
5097        let cwd = kernel.cwd().await;
5098        let home = std::env::var("HOME")
5099            .map(PathBuf::from)
5100            .unwrap_or_else(|_| PathBuf::from("/"));
5101        assert_eq!(cwd, home);
5102
5103        kernel.set_cwd(PathBuf::from("/tmp")).await;
5104        assert_eq!(kernel.cwd().await, PathBuf::from("/tmp"));
5105    }
5106
5107    #[tokio::test]
5108    async fn test_kernel_list_vars() {
5109        let kernel = Kernel::transient().expect("failed to create kernel");
5110
5111        kernel.execute("A=1").await.ok();
5112        kernel.execute("B=2").await.ok();
5113
5114        let vars = kernel.list_vars().await;
5115        assert!(vars.iter().any(|(n, v)| n == "A" && *v == Value::Int(1)));
5116        assert!(vars.iter().any(|(n, v)| n == "B" && *v == Value::Int(2)));
5117    }
5118
5119    #[tokio::test]
5120    async fn test_is_truthy() {
5121        assert!(!is_truthy(&Value::Null));
5122        assert!(!is_truthy(&Value::Bool(false)));
5123        assert!(is_truthy(&Value::Bool(true)));
5124        assert!(!is_truthy(&Value::Int(0)));
5125        assert!(is_truthy(&Value::Int(1)));
5126        assert!(!is_truthy(&Value::String("".into())));
5127        assert!(is_truthy(&Value::String("x".into())));
5128    }
5129
5130    #[tokio::test]
5131    async fn test_jq_in_pipeline() {
5132        let kernel = Kernel::transient().expect("failed to create kernel");
5133        // kaish uses double quotes only; escape inner quotes
5134        let result = kernel
5135            .execute(r#"echo "{\"name\": \"Alice\"}" | jq ".name" -r"#)
5136            .await
5137            .expect("execution failed");
5138        assert!(result.ok(), "jq pipeline failed: {}", result.err);
5139        assert_eq!(result.text_out().trim(), "Alice");
5140    }
5141
5142    #[tokio::test]
5143    async fn test_user_defined_tool() {
5144        let kernel = Kernel::transient().expect("failed to create kernel");
5145
5146        // Define a function
5147        kernel
5148            .execute(r#"greet() { echo "Hello, $1!" }"#)
5149            .await
5150            .expect("function definition failed");
5151
5152        // Call the function
5153        let result = kernel
5154            .execute(r#"greet "World""#)
5155            .await
5156            .expect("function call failed");
5157
5158        assert!(result.ok(), "greet failed: {}", result.err);
5159        assert_eq!(result.text_out().trim(), "Hello, World!");
5160    }
5161
5162    #[tokio::test]
5163    async fn test_user_tool_positional_args() {
5164        let kernel = Kernel::transient().expect("failed to create kernel");
5165
5166        // Define a function with positional param
5167        kernel
5168            .execute(r#"greet() { echo "Hi $1" }"#)
5169            .await
5170            .expect("function definition failed");
5171
5172        // Call with positional argument
5173        let result = kernel
5174            .execute(r#"greet "Amy""#)
5175            .await
5176            .expect("function call failed");
5177
5178        assert!(result.ok(), "greet failed: {}", result.err);
5179        assert_eq!(result.text_out().trim(), "Hi Amy");
5180    }
5181
5182    #[tokio::test]
5183    async fn test_function_shared_scope() {
5184        let kernel = Kernel::transient().expect("failed to create kernel");
5185
5186        // Set a variable in parent scope
5187        kernel
5188            .execute(r#"SECRET="hidden""#)
5189            .await
5190            .expect("set failed");
5191
5192        // Define a function that accesses and modifies parent variable
5193        kernel
5194            .execute(r#"access_parent() {
5195                echo "${SECRET}"
5196                SECRET="modified"
5197            }"#)
5198            .await
5199            .expect("function definition failed");
5200
5201        // Call the function - it SHOULD see SECRET (shared scope like sh)
5202        let result = kernel.execute("access_parent").await.expect("function call failed");
5203
5204        // Function should have access to parent scope
5205        assert!(
5206            result.text_out().contains("hidden"),
5207            "Function should access parent scope, got: {}",
5208            result.text_out()
5209        );
5210
5211        // Function should have modified the parent variable
5212        let secret = kernel.get_var("SECRET").await;
5213        assert_eq!(
5214            secret,
5215            Some(Value::String("modified".into())),
5216            "Function should modify parent scope"
5217        );
5218    }
5219
5220    #[tokio::test]
5221    #[ignore = "exec replaces the test binary via CommandExt::exec, hangs libtest; cannot be run under cargo test"]
5222    async fn test_exec_builtin() {
5223        let kernel = Kernel::transient().expect("failed to create kernel");
5224        // argv is now a space-separated string or JSON array string
5225        let result = kernel
5226            .execute(r#"exec command="/bin/echo" argv="hello world""#)
5227            .await
5228            .expect("exec failed");
5229
5230        assert!(result.ok(), "exec failed: {}", result.err);
5231        assert_eq!(result.text_out().trim(), "hello world");
5232    }
5233
5234    #[tokio::test]
5235    async fn test_while_false_never_runs() {
5236        let kernel = Kernel::transient().expect("failed to create kernel");
5237
5238        // A while loop with false condition should never run
5239        let result = kernel
5240            .execute(r#"
5241                while false; do
5242                    echo "should not run"
5243                done
5244            "#)
5245            .await
5246            .expect("while false failed");
5247
5248        assert!(result.ok());
5249        assert!(result.text_out().is_empty(), "while false should not execute body: {}", result.text_out());
5250    }
5251
5252    #[tokio::test]
5253    async fn test_while_string_comparison() {
5254        let kernel = Kernel::transient().expect("failed to create kernel");
5255
5256        // Set a flag
5257        kernel.execute(r#"FLAG="go""#).await.expect("set failed");
5258
5259        // Use string comparison as condition (shell-compatible [[ ]] syntax)
5260        // Note: Put echo last so we can check the output
5261        let result = kernel
5262            .execute(r#"
5263                while [[ ${FLAG} == "go" ]]; do
5264                    FLAG="stop"
5265                    echo "running"
5266                done
5267            "#)
5268            .await
5269            .expect("while with string cmp failed");
5270
5271        assert!(result.ok());
5272        assert!(result.text_out().contains("running"), "should have run once: {}", result.text_out());
5273
5274        // Verify flag was changed
5275        let flag = kernel.get_var("FLAG").await;
5276        assert_eq!(flag, Some(Value::String("stop".into())));
5277    }
5278
5279    #[tokio::test]
5280    async fn test_while_numeric_comparison() {
5281        let kernel = Kernel::transient().expect("failed to create kernel");
5282
5283        // Test > comparison (shell-compatible [[ ]] with -gt)
5284        kernel.execute("N=5").await.expect("set failed");
5285
5286        // Note: Put echo last so we can check the output
5287        let result = kernel
5288            .execute(r#"
5289                while [[ ${N} -gt 3 ]]; do
5290                    N=3
5291                    echo "N was greater"
5292                done
5293            "#)
5294            .await
5295            .expect("while with > failed");
5296
5297        assert!(result.ok());
5298        assert!(result.text_out().contains("N was greater"), "should have run once: {}", result.text_out());
5299    }
5300
5301    #[tokio::test]
5302    async fn test_break_in_while_loop() {
5303        let kernel = Kernel::transient().expect("failed to create kernel");
5304
5305        let result = kernel
5306            .execute(r#"
5307                I=0
5308                while true; do
5309                    I=1
5310                    echo "before break"
5311                    break
5312                    echo "after break"
5313                done
5314            "#)
5315            .await
5316            .expect("while with break failed");
5317
5318        assert!(result.ok());
5319        assert!(result.text_out().contains("before break"), "should see before break: {}", result.text_out());
5320        assert!(!result.text_out().contains("after break"), "should not see after break: {}", result.text_out());
5321
5322        // Verify we exited the loop
5323        let i = kernel.get_var("I").await;
5324        assert_eq!(i, Some(Value::Int(1)));
5325    }
5326
5327    #[tokio::test]
5328    async fn test_continue_in_while_loop() {
5329        let kernel = Kernel::transient().expect("failed to create kernel");
5330
5331        // Test continue in a while loop where variables persist
5332        // We use string state transition: "start" -> "middle" -> "end"
5333        // continue on "middle" should skip to next iteration
5334        // Shell-compatible: use [[ ]] for comparisons
5335        let result = kernel
5336            .execute(r#"
5337                STATE="start"
5338                AFTER_CONTINUE="no"
5339                while [[ ${STATE} != "done" ]]; do
5340                    if [[ ${STATE} == "start" ]]; then
5341                        STATE="middle"
5342                        continue
5343                        AFTER_CONTINUE="yes"
5344                    fi
5345                    if [[ ${STATE} == "middle" ]]; then
5346                        STATE="done"
5347                    fi
5348                done
5349            "#)
5350            .await
5351            .expect("while with continue failed");
5352
5353        assert!(result.ok());
5354
5355        // STATE should be "done" (we completed the loop)
5356        let state = kernel.get_var("STATE").await;
5357        assert_eq!(state, Some(Value::String("done".into())));
5358
5359        // AFTER_CONTINUE should still be "no" (continue skipped the assignment)
5360        let after = kernel.get_var("AFTER_CONTINUE").await;
5361        assert_eq!(after, Some(Value::String("no".into())));
5362    }
5363
5364    #[tokio::test]
5365    async fn test_break_with_level() {
5366        let kernel = Kernel::transient().expect("failed to create kernel");
5367
5368        // Nested loop with break 2 to exit both loops
5369        // We verify by checking OUTER value:
5370        // - If break 2 works, OUTER stays at 1 (set before for loop)
5371        // - If break 2 fails, OUTER becomes 2 (set after for loop)
5372        let result = kernel
5373            .execute(r#"
5374                OUTER=0
5375                while true; do
5376                    OUTER=1
5377                    for X in "1 2"; do
5378                        break 2
5379                    done
5380                    OUTER=2
5381                done
5382            "#)
5383            .await
5384            .expect("nested break failed");
5385
5386        assert!(result.ok());
5387
5388        // OUTER should be 1 (set before for loop), not 2 (would be set after for loop)
5389        let outer = kernel.get_var("OUTER").await;
5390        assert_eq!(outer, Some(Value::Int(1)), "break 2 should have skipped OUTER=2");
5391    }
5392
5393    #[tokio::test]
5394    async fn test_return_from_tool() {
5395        let kernel = Kernel::transient().expect("failed to create kernel");
5396
5397        // Define a function that returns early
5398        kernel
5399            .execute(r#"early_return() {
5400                if [[ $1 == 1 ]]; then
5401                    return 42
5402                fi
5403                echo "not returned"
5404            }"#)
5405            .await
5406            .expect("function definition failed");
5407
5408        // Call with arg=1 should return with exit code 42
5409        // (POSIX shell behavior: return N sets exit code, doesn't output N)
5410        let result = kernel
5411            .execute("early_return 1")
5412            .await
5413            .expect("function call failed");
5414
5415        // Exit code should be 42 (non-zero, so not ok())
5416        assert_eq!(result.code, 42);
5417        // Output should be empty (we returned before echo)
5418        assert!(result.text_out().is_empty());
5419    }
5420
5421    #[tokio::test]
5422    async fn test_return_without_value() {
5423        let kernel = Kernel::transient().expect("failed to create kernel");
5424
5425        // Define a function that returns without a value
5426        kernel
5427            .execute(r#"early_exit() {
5428                if [[ $1 == "stop" ]]; then
5429                    return
5430                fi
5431                echo "continued"
5432            }"#)
5433            .await
5434            .expect("function definition failed");
5435
5436        // Call with arg="stop" should return early
5437        let result = kernel
5438            .execute(r#"early_exit "stop""#)
5439            .await
5440            .expect("function call failed");
5441
5442        assert!(result.ok());
5443        assert!(result.text_out().is_empty() || result.text_out().trim().is_empty());
5444    }
5445
5446    #[tokio::test]
5447    async fn test_exit_stops_execution() {
5448        let kernel = Kernel::transient().expect("failed to create kernel");
5449
5450        // exit should stop further execution
5451        kernel
5452            .execute(r#"
5453                BEFORE="yes"
5454                exit 0
5455                AFTER="yes"
5456            "#)
5457            .await
5458            .expect("execution failed");
5459
5460        // BEFORE should be set, AFTER should not
5461        let before = kernel.get_var("BEFORE").await;
5462        assert_eq!(before, Some(Value::String("yes".into())));
5463
5464        let after = kernel.get_var("AFTER").await;
5465        assert!(after.is_none(), "AFTER should not be set after exit");
5466    }
5467
5468    #[tokio::test]
5469    async fn test_exit_with_code() {
5470        let kernel = Kernel::transient().expect("failed to create kernel");
5471
5472        // exit with code should propagate the exit code
5473        let result = kernel
5474            .execute("exit 42")
5475            .await
5476            .expect("exit failed");
5477
5478        assert_eq!(result.code, 42);
5479        assert!(result.text_out().is_empty(), "exit should not produce stdout");
5480    }
5481
5482    #[tokio::test]
5483    async fn test_set_e_stops_on_failure() {
5484        let kernel = Kernel::transient().expect("failed to create kernel");
5485
5486        // Enable error-exit mode
5487        kernel.execute("set -e").await.expect("set -e failed");
5488
5489        // Run a sequence where the middle command fails
5490        kernel
5491            .execute(r#"
5492                STEP1="done"
5493                false
5494                STEP2="done"
5495            "#)
5496            .await
5497            .expect("execution failed");
5498
5499        // STEP1 should be set, but STEP2 should NOT be set (exit on false)
5500        let step1 = kernel.get_var("STEP1").await;
5501        assert_eq!(step1, Some(Value::String("done".into())));
5502
5503        let step2 = kernel.get_var("STEP2").await;
5504        assert!(step2.is_none(), "STEP2 should not be set after false with set -e");
5505    }
5506
5507    #[tokio::test]
5508    async fn test_set_plus_e_disables_error_exit() {
5509        let kernel = Kernel::transient().expect("failed to create kernel");
5510
5511        // Enable then disable error-exit mode
5512        kernel.execute("set -e").await.expect("set -e failed");
5513        kernel.execute("set +e").await.expect("set +e failed");
5514
5515        // Now failure should NOT stop execution
5516        kernel
5517            .execute(r#"
5518                STEP1="done"
5519                false
5520                STEP2="done"
5521            "#)
5522            .await
5523            .expect("execution failed");
5524
5525        // Both should be set since +e disables error exit
5526        let step1 = kernel.get_var("STEP1").await;
5527        assert_eq!(step1, Some(Value::String("done".into())));
5528
5529        let step2 = kernel.get_var("STEP2").await;
5530        assert_eq!(step2, Some(Value::String("done".into())));
5531    }
5532
5533    #[tokio::test]
5534    async fn test_set_ignores_unknown_options() {
5535        let kernel = Kernel::transient().expect("failed to create kernel");
5536
5537        // Bash idiom: set -euo pipefail (we support -e, ignore the rest)
5538        let result = kernel
5539            .execute("set -e -u -o pipefail")
5540            .await
5541            .expect("set with unknown options failed");
5542
5543        assert!(result.ok(), "set should succeed with unknown options");
5544
5545        // -e should still be enabled
5546        kernel
5547            .execute(r#"
5548                BEFORE="yes"
5549                false
5550                AFTER="yes"
5551            "#)
5552            .await
5553            .ok();
5554
5555        let after = kernel.get_var("AFTER").await;
5556        assert!(after.is_none(), "-e should be enabled despite unknown options");
5557    }
5558
5559    #[tokio::test]
5560    async fn test_set_no_args_shows_settings() {
5561        let kernel = Kernel::transient().expect("failed to create kernel");
5562
5563        // Enable -e
5564        kernel.execute("set -e").await.expect("set -e failed");
5565
5566        // Call set with no args to see settings
5567        let result = kernel.execute("set").await.expect("set failed");
5568
5569        assert!(result.ok());
5570        assert!(result.text_out().contains("set -e"), "should show -e is enabled: {}", result.text_out());
5571    }
5572
5573    #[tokio::test]
5574    async fn test_set_e_in_pipeline() {
5575        let kernel = Kernel::transient().expect("failed to create kernel");
5576
5577        kernel.execute("set -e").await.expect("set -e failed");
5578
5579        // Pipeline failure should trigger exit
5580        kernel
5581            .execute(r#"
5582                BEFORE="yes"
5583                false | cat
5584                AFTER="yes"
5585            "#)
5586            .await
5587            .ok();
5588
5589        let before = kernel.get_var("BEFORE").await;
5590        assert_eq!(before, Some(Value::String("yes".into())));
5591
5592        // AFTER should not be set if pipeline failure triggers exit
5593        // Note: The exit code of a pipeline is the exit code of the last command
5594        // So `false | cat` returns 0 (cat succeeds). This is bash-compatible behavior.
5595        // To test pipeline failure, we need the last command to fail.
5596    }
5597
5598    #[tokio::test]
5599    async fn test_set_e_with_and_chain() {
5600        let kernel = Kernel::transient().expect("failed to create kernel");
5601
5602        kernel.execute("set -e").await.expect("set -e failed");
5603
5604        // Commands in && chain should not trigger -e on the first failure
5605        // because && explicitly handles the error
5606        kernel
5607            .execute(r#"
5608                RESULT="initial"
5609                false && RESULT="chained"
5610                RESULT="continued"
5611            "#)
5612            .await
5613            .ok();
5614
5615        // In bash, commands in && don't trigger -e. The chain handles the failure.
5616        // Our implementation may differ - let's verify current behavior.
5617        let result = kernel.get_var("RESULT").await;
5618        // If we follow bash semantics, RESULT should be "continued"
5619        // If we trigger -e on the false, RESULT stays "initial"
5620        assert!(result.is_some(), "RESULT should be set");
5621    }
5622
5623    #[tokio::test]
5624    async fn test_set_e_exits_in_for_loop() {
5625        let kernel = Kernel::transient().expect("failed to create kernel");
5626
5627        kernel.execute("set -e").await.expect("set -e failed");
5628
5629        kernel
5630            .execute(r#"
5631                REACHED="no"
5632                for x in 1 2 3; do
5633                    false
5634                    REACHED="yes"
5635                done
5636            "#)
5637            .await
5638            .ok();
5639
5640        // With set -e, false should trigger exit; REACHED should remain "no"
5641        let reached = kernel.get_var("REACHED").await;
5642        assert_eq!(reached, Some(Value::String("no".into())),
5643            "set -e should exit on failure in for loop body");
5644    }
5645
5646    #[tokio::test]
5647    async fn test_for_loop_continues_without_set_e() {
5648        let kernel = Kernel::transient().expect("failed to create kernel");
5649
5650        // Without set -e, for loop should continue normally
5651        kernel
5652            .execute(r#"
5653                COUNT=0
5654                for x in 1 2 3; do
5655                    false
5656                    COUNT=$((COUNT + 1))
5657                done
5658            "#)
5659            .await
5660            .ok();
5661
5662        let count = kernel.get_var("COUNT").await;
5663        // Arithmetic produces Int values; accept either Int or String representation
5664        let count_val = match &count {
5665            Some(Value::Int(n)) => *n,
5666            Some(Value::String(s)) => s.parse().unwrap_or(-1),
5667            _ => -1,
5668        };
5669        assert_eq!(count_val, 3,
5670            "without set -e, loop should complete all iterations (got {:?})", count);
5671    }
5672
5673    // ═══════════════════════════════════════════════════════════════════════════
5674    // Source Tests
5675    // ═══════════════════════════════════════════════════════════════════════════
5676
5677    #[tokio::test]
5678    async fn test_source_sets_variables() {
5679        let kernel = Kernel::transient().expect("failed to create kernel");
5680
5681        // Write a script to the VFS
5682        kernel
5683            .execute(r#"write "/test.kai" 'FOO="bar"'"#)
5684            .await
5685            .expect("write failed");
5686
5687        // Source the script
5688        let result = kernel
5689            .execute(r#"source "/test.kai""#)
5690            .await
5691            .expect("source failed");
5692
5693        assert!(result.ok(), "source should succeed");
5694
5695        // Variable should be set in current scope
5696        let foo = kernel.get_var("FOO").await;
5697        assert_eq!(foo, Some(Value::String("bar".into())));
5698    }
5699
5700    #[tokio::test]
5701    async fn test_source_with_dot_alias() {
5702        let kernel = Kernel::transient().expect("failed to create kernel");
5703
5704        // Write a script to the VFS
5705        kernel
5706            .execute(r#"write "/vars.kai" 'X=42'"#)
5707            .await
5708            .expect("write failed");
5709
5710        // Source using . alias
5711        let result = kernel
5712            .execute(r#". "/vars.kai""#)
5713            .await
5714            .expect(". failed");
5715
5716        assert!(result.ok(), ". should succeed");
5717
5718        // Variable should be set in current scope
5719        let x = kernel.get_var("X").await;
5720        assert_eq!(x, Some(Value::Int(42)));
5721    }
5722
5723    #[tokio::test]
5724    async fn test_source_not_found() {
5725        let kernel = Kernel::transient().expect("failed to create kernel");
5726
5727        // Try to source a non-existent file
5728        let result = kernel
5729            .execute(r#"source "/nonexistent.kai""#)
5730            .await
5731            .expect("source should not fail with error");
5732
5733        assert!(!result.ok(), "source of non-existent file should fail");
5734        assert!(result.err.contains("nonexistent.kai"), "error should mention filename");
5735    }
5736
5737    #[tokio::test]
5738    async fn test_source_missing_filename() {
5739        let kernel = Kernel::transient().expect("failed to create kernel");
5740
5741        // Call source with no arguments
5742        let result = kernel
5743            .execute("source")
5744            .await
5745            .expect("source should not fail with error");
5746
5747        assert!(!result.ok(), "source without filename should fail");
5748        assert!(result.err.contains("missing filename"), "error should mention missing filename");
5749    }
5750
5751    #[tokio::test]
5752    async fn test_source_executes_multiple_statements() {
5753        let kernel = Kernel::transient().expect("failed to create kernel");
5754
5755        // Write a script with multiple statements
5756        kernel
5757            .execute(r#"write "/multi.kai" 'A=1
5758B=2
5759C=3'"#)
5760            .await
5761            .expect("write failed");
5762
5763        // Source it
5764        kernel
5765            .execute(r#"source "/multi.kai""#)
5766            .await
5767            .expect("source failed");
5768
5769        // All variables should be set
5770        assert_eq!(kernel.get_var("A").await, Some(Value::Int(1)));
5771        assert_eq!(kernel.get_var("B").await, Some(Value::Int(2)));
5772        assert_eq!(kernel.get_var("C").await, Some(Value::Int(3)));
5773    }
5774
5775    #[tokio::test]
5776    async fn test_source_can_define_functions() {
5777        let kernel = Kernel::transient().expect("failed to create kernel");
5778
5779        // Write a script that defines a function
5780        kernel
5781            .execute(r#"write "/functions.kai" 'greet() {
5782    echo "Hello, $1!"
5783}'"#)
5784            .await
5785            .expect("write failed");
5786
5787        // Source it
5788        kernel
5789            .execute(r#"source "/functions.kai""#)
5790            .await
5791            .expect("source failed");
5792
5793        // Use the defined function
5794        let result = kernel
5795            .execute(r#"greet "World""#)
5796            .await
5797            .expect("greet failed");
5798
5799        assert!(result.ok());
5800        assert!(result.text_out().contains("Hello, World!"));
5801    }
5802
5803    #[tokio::test]
5804    async fn test_source_inherits_error_exit() {
5805        let kernel = Kernel::transient().expect("failed to create kernel");
5806
5807        // Enable error exit
5808        kernel.execute("set -e").await.expect("set -e failed");
5809
5810        // Write a script that has a failure
5811        kernel
5812            .execute(r#"write "/fail.kai" 'BEFORE="yes"
5813false
5814AFTER="yes"'"#)
5815            .await
5816            .expect("write failed");
5817
5818        // Source it (should exit on false due to set -e)
5819        kernel
5820            .execute(r#"source "/fail.kai""#)
5821            .await
5822            .ok();
5823
5824        // BEFORE should be set, AFTER should NOT be set due to error exit
5825        let before = kernel.get_var("BEFORE").await;
5826        assert_eq!(before, Some(Value::String("yes".into())));
5827
5828        // Note: This test depends on whether error exit is checked within source
5829        // Currently our implementation checks per-statement in the main kernel
5830    }
5831
5832    // ═══════════════════════════════════════════════════════════════════════════
5833    // set -e with && / || chains
5834    // ═══════════════════════════════════════════════════════════════════════════
5835
5836    #[tokio::test]
5837    async fn test_set_e_and_chain_left_fails() {
5838        // set -e; false && echo hi; REACHED=1 → REACHED should be set
5839        let kernel = Kernel::transient().expect("failed to create kernel");
5840        kernel.execute("set -e").await.expect("set -e failed");
5841
5842        kernel
5843            .execute("false && echo hi; REACHED=1")
5844            .await
5845            .expect("execution failed");
5846
5847        let reached = kernel.get_var("REACHED").await;
5848        assert_eq!(
5849            reached,
5850            Some(Value::Int(1)),
5851            "set -e should not trigger on left side of &&"
5852        );
5853    }
5854
5855    #[tokio::test]
5856    async fn test_set_e_and_chain_right_fails() {
5857        // set -e; true && false; REACHED=1 → REACHED should NOT be set
5858        let kernel = Kernel::transient().expect("failed to create kernel");
5859        kernel.execute("set -e").await.expect("set -e failed");
5860
5861        kernel
5862            .execute("true && false; REACHED=1")
5863            .await
5864            .expect("execution failed");
5865
5866        let reached = kernel.get_var("REACHED").await;
5867        assert!(
5868            reached.is_none(),
5869            "set -e should trigger when right side of && fails"
5870        );
5871    }
5872
5873    #[tokio::test]
5874    async fn test_set_e_or_chain_recovers() {
5875        // set -e; false || echo recovered; REACHED=1 → REACHED should be set
5876        let kernel = Kernel::transient().expect("failed to create kernel");
5877        kernel.execute("set -e").await.expect("set -e failed");
5878
5879        kernel
5880            .execute("false || echo recovered; REACHED=1")
5881            .await
5882            .expect("execution failed");
5883
5884        let reached = kernel.get_var("REACHED").await;
5885        assert_eq!(
5886            reached,
5887            Some(Value::Int(1)),
5888            "set -e should not trigger when || recovers the failure"
5889        );
5890    }
5891
5892    #[tokio::test]
5893    async fn test_set_e_or_chain_both_fail() {
5894        // set -e; false || false; REACHED=1 → REACHED should NOT be set
5895        let kernel = Kernel::transient().expect("failed to create kernel");
5896        kernel.execute("set -e").await.expect("set -e failed");
5897
5898        kernel
5899            .execute("false || false; REACHED=1")
5900            .await
5901            .expect("execution failed");
5902
5903        let reached = kernel.get_var("REACHED").await;
5904        assert!(
5905            reached.is_none(),
5906            "set -e should trigger when || chain ultimately fails"
5907        );
5908    }
5909
5910    // ═══════════════════════════════════════════════════════════════════════════
5911    // Cancellation Tests
5912    // ═══════════════════════════════════════════════════════════════════════════
5913
5914    /// Helper: schedule a cancel after a delay from a background thread.
5915    /// Uses std::thread because cancel() is sync and Kernel is not Send.
5916    fn schedule_cancel(kernel: &Arc<Kernel>, delay: std::time::Duration) {
5917        let k = Arc::clone(kernel);
5918        std::thread::spawn(move || {
5919            std::thread::sleep(delay);
5920            k.cancel();
5921        });
5922    }
5923
5924    #[tokio::test]
5925    async fn test_cancel_interrupts_for_loop() {
5926        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5927
5928        // Schedule cancel after a short delay from a background OS thread
5929        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5930
5931        let result = kernel
5932            .execute("for i in $(seq 1 100000); do X=$i; done")
5933            .await
5934            .expect("execute failed");
5935
5936        assert_eq!(result.code, 130, "cancelled execution should exit with code 130");
5937
5938        // The loop variable should be set to something < 100000
5939        let x = kernel.get_var("X").await;
5940        if let Some(Value::Int(n)) = x {
5941            assert!(n < 100000, "loop should have been interrupted before finishing, got X={n}");
5942        }
5943    }
5944
5945    #[tokio::test]
5946    async fn test_cancel_interrupts_while_loop() {
5947        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5948        kernel.execute("COUNT=0").await.expect("init failed");
5949
5950        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5951
5952        let result = kernel
5953            .execute("while true; do COUNT=$((COUNT + 1)); done")
5954            .await
5955            .expect("execute failed");
5956
5957        assert_eq!(result.code, 130);
5958
5959        let count = kernel.get_var("COUNT").await;
5960        if let Some(Value::Int(n)) = count {
5961            assert!(n > 0, "loop should have run at least once");
5962        }
5963    }
5964
5965    #[tokio::test]
5966    async fn test_reset_after_cancel() {
5967        // After cancellation, the next execute() should work normally
5968        let kernel = Kernel::transient().expect("failed to create kernel");
5969        kernel.cancel(); // cancel with nothing running
5970
5971        let result = kernel.execute("echo hello").await.expect("execute failed");
5972        assert!(result.ok(), "execute after cancel should succeed");
5973        assert_eq!(result.text_out().trim(), "hello");
5974    }
5975
5976    #[tokio::test]
5977    async fn test_cancel_interrupts_statement_sequence() {
5978        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5979
5980        // Schedule cancel after the first statement runs but before sleep finishes
5981        schedule_cancel(&kernel, std::time::Duration::from_millis(50));
5982
5983        let result = kernel
5984            .execute("STEP=1; sleep 5; STEP=2; sleep 5; STEP=3")
5985            .await
5986            .expect("execute failed");
5987
5988        assert_eq!(result.code, 130);
5989
5990        // STEP should be 1 (set before sleep), not 2 or 3
5991        let step = kernel.get_var("STEP").await;
5992        assert_eq!(step, Some(Value::Int(1)), "cancel should stop before STEP=2");
5993    }
5994
5995    // ═══════════════════════════════════════════════════════════════════════════
5996    // Case Statement Tests
5997    // ═══════════════════════════════════════════════════════════════════════════
5998
5999    #[tokio::test]
6000    async fn test_case_simple_match() {
6001        let kernel = Kernel::transient().expect("failed to create kernel");
6002
6003        let result = kernel
6004            .execute(r#"
6005                case "hello" in
6006                    hello) echo "matched hello" ;;
6007                    world) echo "matched world" ;;
6008                esac
6009            "#)
6010            .await
6011            .expect("case failed");
6012
6013        assert!(result.ok());
6014        assert_eq!(result.text_out().trim(), "matched hello");
6015    }
6016
6017    #[tokio::test]
6018    async fn test_case_wildcard_match() {
6019        let kernel = Kernel::transient().expect("failed to create kernel");
6020
6021        let result = kernel
6022            .execute(r#"
6023                case "main.rs" in
6024                    *.py) echo "Python" ;;
6025                    *.rs) echo "Rust" ;;
6026                    *) echo "Unknown" ;;
6027                esac
6028            "#)
6029            .await
6030            .expect("case failed");
6031
6032        assert!(result.ok());
6033        assert_eq!(result.text_out().trim(), "Rust");
6034    }
6035
6036    #[tokio::test]
6037    async fn test_case_default_match() {
6038        let kernel = Kernel::transient().expect("failed to create kernel");
6039
6040        let result = kernel
6041            .execute(r#"
6042                case "unknown.xyz" in
6043                    *.py) echo "Python" ;;
6044                    *.rs) echo "Rust" ;;
6045                    *) echo "Default" ;;
6046                esac
6047            "#)
6048            .await
6049            .expect("case failed");
6050
6051        assert!(result.ok());
6052        assert_eq!(result.text_out().trim(), "Default");
6053    }
6054
6055    #[tokio::test]
6056    async fn test_case_no_match() {
6057        let kernel = Kernel::transient().expect("failed to create kernel");
6058
6059        // Case with no default branch and no match
6060        let result = kernel
6061            .execute(r#"
6062                case "nope" in
6063                    "yes") echo "yes" ;;
6064                    "no") echo "no" ;;
6065                esac
6066            "#)
6067            .await
6068            .expect("case failed");
6069
6070        assert!(result.ok());
6071        assert!(result.text_out().is_empty(), "no match should produce empty output");
6072    }
6073
6074    #[tokio::test]
6075    async fn test_case_with_variable() {
6076        let kernel = Kernel::transient().expect("failed to create kernel");
6077
6078        kernel.execute(r#"LANG="rust""#).await.expect("set failed");
6079
6080        let result = kernel
6081            .execute(r#"
6082                case ${LANG} in
6083                    python) echo "snake" ;;
6084                    rust) echo "crab" ;;
6085                    go) echo "gopher" ;;
6086                esac
6087            "#)
6088            .await
6089            .expect("case failed");
6090
6091        assert!(result.ok());
6092        assert_eq!(result.text_out().trim(), "crab");
6093    }
6094
6095    #[tokio::test]
6096    async fn test_case_multiple_patterns() {
6097        let kernel = Kernel::transient().expect("failed to create kernel");
6098
6099        let result = kernel
6100            .execute(r#"
6101                case "yes" in
6102                    "y"|"yes"|"Y"|"YES") echo "affirmative" ;;
6103                    "n"|"no"|"N"|"NO") echo "negative" ;;
6104                esac
6105            "#)
6106            .await
6107            .expect("case failed");
6108
6109        assert!(result.ok());
6110        assert_eq!(result.text_out().trim(), "affirmative");
6111    }
6112
6113    #[tokio::test]
6114    async fn test_case_glob_question_mark() {
6115        let kernel = Kernel::transient().expect("failed to create kernel");
6116
6117        let result = kernel
6118            .execute(r#"
6119                case "test1" in
6120                    test?) echo "matched test?" ;;
6121                    *) echo "default" ;;
6122                esac
6123            "#)
6124            .await
6125            .expect("case failed");
6126
6127        assert!(result.ok());
6128        assert_eq!(result.text_out().trim(), "matched test?");
6129    }
6130
6131    #[tokio::test]
6132    async fn test_case_char_class() {
6133        let kernel = Kernel::transient().expect("failed to create kernel");
6134
6135        let result = kernel
6136            .execute(r#"
6137                case "Yes" in
6138                    [Yy]*) echo "yes-like" ;;
6139                    [Nn]*) echo "no-like" ;;
6140                esac
6141            "#)
6142            .await
6143            .expect("case failed");
6144
6145        assert!(result.ok());
6146        assert_eq!(result.text_out().trim(), "yes-like");
6147    }
6148
6149    // ═══════════════════════════════════════════════════════════════════════════
6150    // Cat Stdin Tests
6151    // ═══════════════════════════════════════════════════════════════════════════
6152
6153    #[tokio::test]
6154    async fn test_cat_from_pipeline() {
6155        let kernel = Kernel::transient().expect("failed to create kernel");
6156
6157        let result = kernel
6158            .execute(r#"echo "piped text" | cat"#)
6159            .await
6160            .expect("cat pipeline failed");
6161
6162        assert!(result.ok(), "cat failed: {}", result.err);
6163        assert_eq!(result.text_out().trim(), "piped text");
6164    }
6165
6166    #[tokio::test]
6167    async fn test_cat_from_pipeline_multiline() {
6168        let kernel = Kernel::transient().expect("failed to create kernel");
6169
6170        let result = kernel
6171            .execute(r#"echo "line1\nline2" | cat -n"#)
6172            .await
6173            .expect("cat pipeline failed");
6174
6175        assert!(result.ok(), "cat failed: {}", result.err);
6176        assert!(result.text_out().contains("1\t"), "output: {}", result.text_out());
6177    }
6178
6179    // ═══════════════════════════════════════════════════════════════════════════
6180    // Heredoc Tests
6181    // ═══════════════════════════════════════════════════════════════════════════
6182
6183    #[tokio::test]
6184    async fn test_heredoc_basic() {
6185        let kernel = Kernel::transient().expect("failed to create kernel");
6186
6187        let result = kernel
6188            .execute("cat <<EOF\nhello\nEOF")
6189            .await
6190            .expect("heredoc failed");
6191
6192        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6193        assert_eq!(result.text_out().trim(), "hello");
6194    }
6195
6196    #[tokio::test]
6197    async fn test_arithmetic_in_string() {
6198        let kernel = Kernel::transient().expect("failed to create kernel");
6199
6200        let result = kernel
6201            .execute(r#"echo "result: $((1 + 2))""#)
6202            .await
6203            .expect("arithmetic in string failed");
6204
6205        assert!(result.ok(), "echo failed: {}", result.err);
6206        assert_eq!(result.text_out().trim(), "result: 3");
6207    }
6208
6209    #[tokio::test]
6210    async fn test_heredoc_multiline() {
6211        let kernel = Kernel::transient().expect("failed to create kernel");
6212
6213        let result = kernel
6214            .execute("cat <<EOF\nline1\nline2\nline3\nEOF")
6215            .await
6216            .expect("heredoc failed");
6217
6218        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6219        assert!(result.text_out().contains("line1"), "output: {}", result.text_out());
6220        assert!(result.text_out().contains("line2"), "output: {}", result.text_out());
6221        assert!(result.text_out().contains("line3"), "output: {}", result.text_out());
6222    }
6223
6224    #[tokio::test]
6225    async fn test_heredoc_variable_expansion() {
6226        // Bug N: unquoted heredoc should expand variables
6227        let kernel = Kernel::transient().expect("failed to create kernel");
6228
6229        kernel.execute("GREETING=hello").await.expect("set var");
6230
6231        let result = kernel
6232            .execute("cat <<EOF\n$GREETING world\nEOF")
6233            .await
6234            .expect("heredoc expansion failed");
6235
6236        assert!(result.ok(), "heredoc expansion failed: {}", result.err);
6237        assert_eq!(result.text_out().trim(), "hello world");
6238    }
6239
6240    #[tokio::test]
6241    async fn test_heredoc_quoted_no_expansion() {
6242        // Bug N: quoted heredoc (<<'EOF') should NOT expand variables
6243        let kernel = Kernel::transient().expect("failed to create kernel");
6244
6245        kernel.execute("GREETING=hello").await.expect("set var");
6246
6247        let result = kernel
6248            .execute("cat <<'EOF'\n$GREETING world\nEOF")
6249            .await
6250            .expect("quoted heredoc failed");
6251
6252        assert!(result.ok(), "quoted heredoc failed: {}", result.err);
6253        assert_eq!(result.text_out().trim(), "$GREETING world");
6254    }
6255
6256    #[tokio::test]
6257    async fn test_heredoc_default_value_expansion() {
6258        // Bug N: ${VAR:-default} should expand in unquoted heredocs
6259        let kernel = Kernel::transient().expect("failed to create kernel");
6260
6261        let result = kernel
6262            .execute("cat <<EOF\n${UNSET:-fallback}\nEOF")
6263            .await
6264            .expect("heredoc default expansion failed");
6265
6266        assert!(result.ok(), "heredoc default expansion failed: {}", result.err);
6267        assert_eq!(result.text_out().trim(), "fallback");
6268    }
6269
6270    // ═══════════════════════════════════════════════════════════════════════════
6271    // Read Builtin Tests
6272    // ═══════════════════════════════════════════════════════════════════════════
6273
6274    #[tokio::test]
6275    async fn test_read_from_pipeline() {
6276        let kernel = Kernel::transient().expect("failed to create kernel");
6277
6278        // Pipe input to read
6279        let result = kernel
6280            .execute(r#"echo "Alice" | read NAME; echo "Hello, ${NAME}""#)
6281            .await
6282            .expect("read pipeline failed");
6283
6284        assert!(result.ok(), "read failed: {}", result.err);
6285        assert!(result.text_out().contains("Hello, Alice"), "output: {}", result.text_out());
6286    }
6287
6288    #[tokio::test]
6289    async fn test_read_multiple_vars_from_pipeline() {
6290        let kernel = Kernel::transient().expect("failed to create kernel");
6291
6292        let result = kernel
6293            .execute(r#"echo "John Doe 42" | read FIRST LAST AGE; echo "${FIRST} is ${AGE}""#)
6294            .await
6295            .expect("read pipeline failed");
6296
6297        assert!(result.ok(), "read failed: {}", result.err);
6298        assert!(result.text_out().contains("John is 42"), "output: {}", result.text_out());
6299    }
6300
6301    // ═══════════════════════════════════════════════════════════════════════════
6302    // Shell-Style Function Tests
6303    // ═══════════════════════════════════════════════════════════════════════════
6304
6305    #[tokio::test]
6306    async fn test_posix_function_with_positional_params() {
6307        let kernel = Kernel::transient().expect("failed to create kernel");
6308
6309        // Define POSIX-style function
6310        kernel
6311            .execute(r#"greet() { echo "Hello, $1!" }"#)
6312            .await
6313            .expect("function definition failed");
6314
6315        // Call the function
6316        let result = kernel
6317            .execute(r#"greet "Amy""#)
6318            .await
6319            .expect("function call failed");
6320
6321        assert!(result.ok(), "greet failed: {}", result.err);
6322        assert_eq!(result.text_out().trim(), "Hello, Amy!");
6323    }
6324
6325    #[tokio::test]
6326    async fn test_posix_function_multiple_args() {
6327        let kernel = Kernel::transient().expect("failed to create kernel");
6328
6329        // Define function using $1 and $2
6330        kernel
6331            .execute(r#"add_greeting() { echo "$1 $2!" }"#)
6332            .await
6333            .expect("function definition failed");
6334
6335        // Call the function
6336        let result = kernel
6337            .execute(r#"add_greeting "Hello" "World""#)
6338            .await
6339            .expect("function call failed");
6340
6341        assert!(result.ok(), "function failed: {}", result.err);
6342        assert_eq!(result.text_out().trim(), "Hello World!");
6343    }
6344
6345    #[tokio::test]
6346    async fn test_bash_function_with_positional_params() {
6347        let kernel = Kernel::transient().expect("failed to create kernel");
6348
6349        // Define bash-style function (function keyword, no parens)
6350        kernel
6351            .execute(r#"function greet { echo "Hi $1" }"#)
6352            .await
6353            .expect("function definition failed");
6354
6355        // Call the function
6356        let result = kernel
6357            .execute(r#"greet "Bob""#)
6358            .await
6359            .expect("function call failed");
6360
6361        assert!(result.ok(), "greet failed: {}", result.err);
6362        assert_eq!(result.text_out().trim(), "Hi Bob");
6363    }
6364
6365    #[tokio::test]
6366    async fn test_shell_function_with_all_args() {
6367        let kernel = Kernel::transient().expect("failed to create kernel");
6368
6369        // Define function using $@ (all args)
6370        kernel
6371            .execute(r#"echo_all() { echo "args: $@" }"#)
6372            .await
6373            .expect("function definition failed");
6374
6375        // Call with multiple args
6376        let result = kernel
6377            .execute(r#"echo_all "a" "b" "c""#)
6378            .await
6379            .expect("function call failed");
6380
6381        assert!(result.ok(), "function failed: {}", result.err);
6382        assert_eq!(result.text_out().trim(), "args: a b c");
6383    }
6384
6385    #[tokio::test]
6386    async fn test_shell_function_with_arg_count() {
6387        let kernel = Kernel::transient().expect("failed to create kernel");
6388
6389        // Define function using $# (arg count)
6390        kernel
6391            .execute(r#"count_args() { echo "count: $#" }"#)
6392            .await
6393            .expect("function definition failed");
6394
6395        // Call with three args
6396        let result = kernel
6397            .execute(r#"count_args "x" "y" "z""#)
6398            .await
6399            .expect("function call failed");
6400
6401        assert!(result.ok(), "function failed: {}", result.err);
6402        assert_eq!(result.text_out().trim(), "count: 3");
6403    }
6404
6405    #[tokio::test]
6406    async fn test_shell_function_shared_scope() {
6407        let kernel = Kernel::transient().expect("failed to create kernel");
6408
6409        // Set a variable in parent scope
6410        kernel
6411            .execute(r#"PARENT_VAR="visible""#)
6412            .await
6413            .expect("set failed");
6414
6415        // Define shell function that reads and writes parent variable
6416        kernel
6417            .execute(r#"modify_parent() {
6418                echo "saw: ${PARENT_VAR}"
6419                PARENT_VAR="changed by function"
6420            }"#)
6421            .await
6422            .expect("function definition failed");
6423
6424        // Call the function - it SHOULD see PARENT_VAR (bash-compatible shared scope)
6425        let result = kernel.execute("modify_parent").await.expect("function failed");
6426
6427        assert!(
6428            result.text_out().contains("visible"),
6429            "Shell function should access parent scope, got: {}",
6430            result.text_out()
6431        );
6432
6433        // Parent variable should be modified
6434        let var = kernel.get_var("PARENT_VAR").await;
6435        assert_eq!(
6436            var,
6437            Some(Value::String("changed by function".into())),
6438            "Shell function should modify parent scope"
6439        );
6440    }
6441
6442    // ═══════════════════════════════════════════════════════════════════════════
6443    // Script Execution via PATH Tests
6444    // ═══════════════════════════════════════════════════════════════════════════
6445
6446    #[tokio::test]
6447    async fn test_script_execution_from_path() {
6448        let kernel = Kernel::transient().expect("failed to create kernel");
6449
6450        // Create /bin directory and script
6451        kernel.execute(r#"mkdir "/bin""#).await.ok();
6452        kernel
6453            .execute(r#"write "/bin/hello.kai" 'echo "Hello from script!"'"#)
6454            .await
6455            .expect("write script failed");
6456
6457        // Set PATH to /bin
6458        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6459
6460        // Call script by name (without .kai extension)
6461        let result = kernel
6462            .execute("hello")
6463            .await
6464            .expect("script execution failed");
6465
6466        assert!(result.ok(), "script failed: {}", result.err);
6467        assert_eq!(result.text_out().trim(), "Hello from script!");
6468    }
6469
6470    #[tokio::test]
6471    async fn test_script_with_args() {
6472        let kernel = Kernel::transient().expect("failed to create kernel");
6473
6474        // Create script that uses positional params
6475        kernel.execute(r#"mkdir "/bin""#).await.ok();
6476        kernel
6477            .execute(r#"write "/bin/greet.kai" 'echo "Hello, $1!"'"#)
6478            .await
6479            .expect("write script failed");
6480
6481        // Set PATH
6482        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6483
6484        // Call script with arg
6485        let result = kernel
6486            .execute(r#"greet "World""#)
6487            .await
6488            .expect("script execution failed");
6489
6490        assert!(result.ok(), "script failed: {}", result.err);
6491        assert_eq!(result.text_out().trim(), "Hello, World!");
6492    }
6493
6494    #[tokio::test]
6495    async fn test_script_not_found() {
6496        let kernel = Kernel::transient().expect("failed to create kernel");
6497
6498        // Set empty PATH
6499        kernel.execute(r#"PATH="/nonexistent""#).await.expect("set PATH failed");
6500
6501        // Call non-existent script
6502        let result = kernel
6503            .execute("noscript")
6504            .await
6505            .expect("execution failed");
6506
6507        assert!(!result.ok(), "should fail with command not found");
6508        assert_eq!(result.code, 127);
6509        assert!(result.err.contains("command not found"));
6510    }
6511
6512    #[tokio::test]
6513    async fn test_script_path_search_order() {
6514        let kernel = Kernel::transient().expect("failed to create kernel");
6515
6516        // Create two directories with same-named script
6517        // Note: using "myscript" not "test" to avoid conflict with test builtin
6518        kernel.execute(r#"mkdir "/first""#).await.ok();
6519        kernel.execute(r#"mkdir "/second""#).await.ok();
6520        kernel
6521            .execute(r#"write "/first/myscript.kai" 'echo "from first"'"#)
6522            .await
6523            .expect("write failed");
6524        kernel
6525            .execute(r#"write "/second/myscript.kai" 'echo "from second"'"#)
6526            .await
6527            .expect("write failed");
6528
6529        // Set PATH with first before second
6530        kernel.execute(r#"PATH="/first:/second""#).await.expect("set PATH failed");
6531
6532        // Should find first one
6533        let result = kernel
6534            .execute("myscript")
6535            .await
6536            .expect("script execution failed");
6537
6538        assert!(result.ok(), "script failed: {}", result.err);
6539        assert_eq!(result.text_out().trim(), "from first");
6540    }
6541
6542    // ═══════════════════════════════════════════════════════════════════════════
6543    // Special Variable Tests ($?, $$, unset vars)
6544    // ═══════════════════════════════════════════════════════════════════════════
6545
6546    #[tokio::test]
6547    async fn test_last_exit_code_success() {
6548        let kernel = Kernel::transient().expect("failed to create kernel");
6549
6550        // true exits with 0
6551        let result = kernel.execute("true; echo $?").await.expect("execution failed");
6552        assert!(result.text_out().contains("0"), "expected 0, got: {}", result.text_out());
6553    }
6554
6555    #[tokio::test]
6556    async fn test_last_exit_code_failure() {
6557        let kernel = Kernel::transient().expect("failed to create kernel");
6558
6559        // false exits with 1
6560        let result = kernel.execute("false; echo $?").await.expect("execution failed");
6561        assert!(result.text_out().contains("1"), "expected 1, got: {}", result.text_out());
6562    }
6563
6564    #[tokio::test]
6565    async fn test_current_pid() {
6566        let kernel = Kernel::transient().expect("failed to create kernel");
6567
6568        let result = kernel.execute("echo $$").await.expect("execution failed");
6569        // PID should be a positive number
6570        let pid: u32 = result.text_out().trim().parse().expect("PID should be a number");
6571        assert!(pid > 0, "PID should be positive");
6572    }
6573
6574    #[tokio::test]
6575    async fn test_unset_variable_expands_to_empty() {
6576        let kernel = Kernel::transient().expect("failed to create kernel");
6577
6578        // Unset variable in interpolation should be empty
6579        let result = kernel.execute(r#"echo "prefix:${UNSET_VAR}:suffix""#).await.expect("execution failed");
6580        assert_eq!(result.text_out().trim(), "prefix::suffix");
6581    }
6582
6583    #[tokio::test]
6584    async fn test_eq_ne_operators() {
6585        let kernel = Kernel::transient().expect("failed to create kernel");
6586
6587        // Test -eq operator
6588        let result = kernel.execute(r#"if [[ 5 -eq 5 ]]; then echo "eq works"; fi"#).await.expect("execution failed");
6589        assert_eq!(result.text_out().trim(), "eq works");
6590
6591        // Test -ne operator
6592        let result = kernel.execute(r#"if [[ 5 -ne 3 ]]; then echo "ne works"; fi"#).await.expect("execution failed");
6593        assert_eq!(result.text_out().trim(), "ne works");
6594
6595        // Test -eq with different values
6596        let result = kernel.execute(r#"if [[ 5 -eq 3 ]]; then echo "wrong"; else echo "correct"; fi"#).await.expect("execution failed");
6597        assert_eq!(result.text_out().trim(), "correct");
6598    }
6599
6600    #[tokio::test]
6601    async fn test_escaped_dollar_in_string() {
6602        let kernel = Kernel::transient().expect("failed to create kernel");
6603
6604        // \$ should produce literal $
6605        let result = kernel.execute(r#"echo "\$100""#).await.expect("execution failed");
6606        assert_eq!(result.text_out().trim(), "$100");
6607    }
6608
6609    #[tokio::test]
6610    async fn test_special_vars_in_interpolation() {
6611        let kernel = Kernel::transient().expect("failed to create kernel");
6612
6613        // Test $? in string interpolation
6614        let result = kernel.execute(r#"true; echo "exit: $?""#).await.expect("execution failed");
6615        assert_eq!(result.text_out().trim(), "exit: 0");
6616
6617        // Test $$ in string interpolation
6618        let result = kernel.execute(r#"echo "pid: $$""#).await.expect("execution failed");
6619        assert!(result.text_out().starts_with("pid: "), "unexpected output: {}", result.text_out());
6620        let text = result.text_out();
6621        let pid_part = text.trim().strip_prefix("pid: ").unwrap();
6622        let _pid: u32 = pid_part.parse().expect("PID in string should be a number");
6623    }
6624
6625    // ═══════════════════════════════════════════════════════════════════════════
6626    // Command Substitution Tests
6627    // ═══════════════════════════════════════════════════════════════════════════
6628
6629    #[tokio::test]
6630    async fn test_command_subst_assignment() {
6631        let kernel = Kernel::transient().expect("failed to create kernel");
6632
6633        // Command substitution in assignment
6634        let result = kernel.execute(r#"X=$(echo hello); echo "$X""#).await.expect("execution failed");
6635        assert_eq!(result.text_out().trim(), "hello");
6636    }
6637
6638    #[tokio::test]
6639    async fn test_command_subst_with_args() {
6640        let kernel = Kernel::transient().expect("failed to create kernel");
6641
6642        // Command substitution with string argument
6643        let result = kernel.execute(r#"X=$(echo "a b c"); echo "$X""#).await.expect("execution failed");
6644        assert_eq!(result.text_out().trim(), "a b c");
6645    }
6646
6647    #[tokio::test]
6648    async fn test_command_subst_nested_vars() {
6649        let kernel = Kernel::transient().expect("failed to create kernel");
6650
6651        // Variables inside command substitution
6652        let result = kernel.execute(r#"Y=world; X=$(echo "hello $Y"); echo "$X""#).await.expect("execution failed");
6653        assert_eq!(result.text_out().trim(), "hello world");
6654    }
6655
6656    #[tokio::test]
6657    async fn test_background_job_basic() {
6658        use std::time::Duration;
6659
6660        let kernel = Kernel::new(KernelConfig::isolated()).expect("failed to create kernel");
6661
6662        // Run a simple background command
6663        let result = kernel.execute("echo hello &").await.expect("execution failed");
6664        assert!(result.ok(), "background command should succeed: {}", result.err);
6665        assert!(result.text_out().contains("[1]"), "should return job ID: {}", result.text_out());
6666
6667        // Give the job time to complete
6668        tokio::time::sleep(Duration::from_millis(100)).await;
6669
6670        // Check job status
6671        let status = kernel.execute("cat /v/jobs/1/status").await.expect("status check failed");
6672        assert!(status.ok(), "status should succeed: {}", status.err);
6673        assert!(
6674            status.text_out().contains("done:") || status.text_out().contains("running"),
6675            "should have valid status: {}",
6676            status.text_out()
6677        );
6678
6679        // Check stdout
6680        let stdout = kernel.execute("cat /v/jobs/1/stdout").await.expect("stdout check failed");
6681        assert!(stdout.ok());
6682        assert!(stdout.text_out().contains("hello"));
6683    }
6684
6685    #[tokio::test]
6686    async fn test_heredoc_piped_to_command() {
6687        // Bug 4: heredoc content should pipe through to next command
6688        let kernel = Kernel::transient().expect("kernel");
6689        let result = kernel.execute("cat <<EOF | cat\nhello world\nEOF").await.expect("exec");
6690        assert!(result.ok(), "heredoc | cat failed: {}", result.err);
6691        assert_eq!(result.text_out().trim(), "hello world");
6692    }
6693
6694    /// A transient kernel paired with a real, auto-cleaning tempdir. The
6695    /// transient (Sandboxed) kernel mounts `/tmp` as a real `LocalFs`, so glob
6696    /// tests need actual files on disk. Hold the returned `TempDir` for the
6697    /// test's lifetime: it removes the directory tree on drop — including on
6698    /// panic — so no test scratch leaks into `/tmp` (the project's tmp-builder
6699    /// convention; never hardcode `/tmp/...` paths). Returns the absolute path
6700    /// as a string for interpolation into scripts.
6701    fn transient_with_tempdir() -> (Kernel, tempfile::TempDir, String) {
6702        let kernel = Kernel::transient().expect("kernel");
6703        let tmp = tempfile::tempdir().expect("tempdir");
6704        let dir = tmp.path().display().to_string();
6705        (kernel, tmp, dir)
6706    }
6707
6708    #[tokio::test]
6709    async fn test_for_loop_glob_iterates() {
6710        // Bug 1: for F in $(glob ...) should iterate per file, not once
6711        let (kernel, _tmp, dir) = transient_with_tempdir();
6712        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6713        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6714        let result = kernel.execute(&format!(r#"
6715            N=0
6716            for F in $(glob "{dir}/*.txt"); do
6717                N=$((N + 1))
6718            done
6719            echo $N
6720        "#)).await.unwrap();
6721        assert!(result.ok(), "for glob failed: {}", result.err);
6722        assert_eq!(result.text_out().trim(), "2", "Should iterate 2 files, got: {}", result.text_out());
6723    }
6724
6725    #[tokio::test]
6726    async fn test_bare_glob_expansion_echo() {
6727        let (kernel, _tmp, dir) = transient_with_tempdir();
6728        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6729        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6730        kernel.execute(&format!("echo c > {dir}/c.rs")).await.unwrap();
6731        kernel.execute(&format!("cd {dir}")).await.unwrap();
6732        let result = kernel.execute("echo *.txt").await.unwrap();
6733        assert!(result.ok(), "echo *.txt failed: {}", result.err);
6734        let out = result.text_out();
6735        let out = out.trim();
6736        // Should contain both .txt files (order may vary)
6737        assert!(out.contains("a.txt"), "missing a.txt in: {}", out);
6738        assert!(out.contains("b.txt"), "missing b.txt in: {}", out);
6739        assert!(!out.contains("c.rs"), "should not contain c.rs in: {}", out);
6740    }
6741
6742    #[tokio::test]
6743    async fn test_bare_glob_no_matches_errors() {
6744        let (kernel, _tmp, dir) = transient_with_tempdir();
6745        kernel.execute(&format!("cd {dir}")).await.unwrap();
6746        let result = kernel.execute("echo *.nonexistent").await;
6747        match &result {
6748            Ok(exec) => {
6749                // No-match glob should produce a non-zero exit code
6750                assert!(!exec.ok(), "expected failure, got success: out={}, err={}", exec.text_out(), exec.err);
6751                assert!(exec.err.contains("no matches"), "error should say no matches: {}", exec.err);
6752            }
6753            Err(e) => {
6754                assert!(e.to_string().contains("no matches"), "error should say no matches: {}", e);
6755            }
6756        }
6757    }
6758
6759    #[tokio::test]
6760    async fn test_bare_glob_disabled_with_set() {
6761        let (kernel, _tmp, dir) = transient_with_tempdir();
6762        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6763        kernel.execute(&format!("cd {dir}")).await.unwrap();
6764        // Disable glob expansion
6765        kernel.execute("set +o glob").await.unwrap();
6766        let result = kernel.execute("echo *.txt").await.unwrap();
6767        // With glob disabled, *.txt should be passed as literal string
6768        assert!(result.ok(), "echo should succeed: {}", result.err);
6769        assert_eq!(result.text_out().trim(), "*.txt", "should be literal: {}", result.text_out());
6770    }
6771
6772    #[tokio::test]
6773    async fn test_bare_glob_quoted_not_expanded() {
6774        let (kernel, _tmp, dir) = transient_with_tempdir();
6775        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6776        kernel.execute(&format!("cd {dir}")).await.unwrap();
6777        // Quoted globs should NOT expand
6778        let result = kernel.execute("echo \"*.txt\"").await.unwrap();
6779        assert!(result.ok(), "echo should succeed: {}", result.err);
6780        assert_eq!(result.text_out().trim(), "*.txt", "quoted should be literal: {}", result.text_out());
6781    }
6782
6783    #[tokio::test]
6784    async fn test_bare_glob_for_loop() {
6785        let (kernel, _tmp, dir) = transient_with_tempdir();
6786        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6787        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6788        kernel.execute(&format!("cd {dir}")).await.unwrap();
6789        let result = kernel.execute(r#"
6790            N=0
6791            for f in *.txt; do
6792                N=$((N + 1))
6793            done
6794            echo $N
6795        "#).await.unwrap();
6796        assert!(result.ok(), "for loop failed: {}", result.err);
6797        assert_eq!(result.text_out().trim(), "2", "should iterate 2 files: {}", result.text_out());
6798    }
6799
6800    #[tokio::test]
6801    async fn test_glob_in_assignment_is_literal() {
6802        let kernel = Kernel::transient().expect("kernel");
6803        let result = kernel.execute("X=*.txt; echo $X").await.unwrap();
6804        assert!(result.ok());
6805        assert_eq!(result.text_out().trim(), "*.txt", "glob in assignment should be literal");
6806    }
6807
6808    #[tokio::test]
6809    async fn test_glob_in_test_expr_is_literal() {
6810        let kernel = Kernel::transient().expect("kernel");
6811        let result = kernel.execute(r#"
6812            if [[ *.txt == "*.txt" ]]; then
6813                echo "match"
6814            else
6815                echo "no"
6816            fi
6817        "#).await.unwrap();
6818        assert!(result.ok());
6819        assert_eq!(result.text_out().trim(), "match", "glob in test expr should be literal");
6820    }
6821
6822    #[tokio::test]
6823    async fn test_command_subst_echo_not_iterable() {
6824        // Regression guard: $(echo "a b c") must remain a single string
6825        let kernel = Kernel::transient().expect("kernel");
6826        let result = kernel.execute(r#"
6827            N=0
6828            for X in $(echo "a b c"); do N=$((N + 1)); done
6829            echo $N
6830        "#).await.unwrap();
6831        assert!(result.ok());
6832        assert_eq!(result.text_out().trim(), "1", "echo should be one item: {}", result.text_out());
6833    }
6834
6835    // -- accumulate_result / newline tests --
6836
#[test]
fn test_accumulate_preserves_own_newlines() {
    // Outputs are joined verbatim: each command's trailing newline is kept
    // and no extra one is added.
    let mut combined = ExecResult::success("line1\n");
    accumulate_result(&mut combined, &ExecResult::success("line2\n"));

    let text = combined.text_out();
    assert_eq!(&*text, "line1\nline2\n");
    assert!(!text.contains("\n\n"), "should not have double newlines: {:?}", text);
}
6847
#[test]
fn test_accumulate_inserts_no_separator() {
    // No separator is inserted between outputs: `printf a; printf b` style
    // yields `ab`, matching bash (regression for the 2026-06-09 finding).
    let mut combined = ExecResult::success("line1");
    accumulate_result(&mut combined, &ExecResult::success("line2"));

    let text = combined.text_out();
    assert_eq!(&*text, "line1line2");
}
6857
#[test]
fn test_accumulate_empty_into_nonempty() {
    // An empty accumulator takes on the incoming output unchanged.
    let mut combined = ExecResult::success("");
    accumulate_result(&mut combined, &ExecResult::success("hello\n"));

    let text = combined.text_out();
    assert_eq!(&*text, "hello\n");
}
6865
#[test]
fn test_accumulate_nonempty_into_empty() {
    // Accumulating an empty output leaves the existing text untouched.
    let mut combined = ExecResult::success("hello\n");
    accumulate_result(&mut combined, &ExecResult::success(""));

    let text = combined.text_out();
    assert_eq!(&*text, "hello\n");
}
6873
#[test]
fn test_accumulate_stderr_no_double_newlines() {
    // Two newline-terminated stderr payloads must not produce a blank line
    // when combined.
    let mut combined = ExecResult::failure(1, "err1\n");
    accumulate_result(&mut combined, &ExecResult::failure(1, "err2\n"));

    let has_blank_line = combined.err.contains("\n\n");
    assert!(!has_blank_line, "stderr should not have double newlines: {:?}", combined.err);
}
6881
6882    #[tokio::test]
6883    async fn test_multiple_echo_no_blank_lines() {
6884        let kernel = Kernel::transient().expect("kernel");
6885        let result = kernel
6886            .execute("echo one\necho two\necho three")
6887            .await
6888            .expect("execution failed");
6889        assert!(result.ok());
6890        assert_eq!(&*result.text_out(), "one\ntwo\nthree\n");
6891    }
6892
6893    #[tokio::test]
6894    async fn test_for_loop_no_blank_lines() {
6895        let kernel = Kernel::transient().expect("kernel");
6896        let result = kernel
6897            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
6898            .await
6899            .expect("execution failed");
6900        assert!(result.ok());
6901        assert_eq!(&*result.text_out(), "item: a\nitem: b\nitem: c\n");
6902    }
6903
6904    #[tokio::test]
6905    async fn test_for_command_subst_no_blank_lines() {
6906        let kernel = Kernel::transient().expect("kernel");
6907        let result = kernel
6908            .execute(r#"for N in $(seq 1 3); do echo "n=${N}"; done"#)
6909            .await
6910            .expect("execution failed");
6911        assert!(result.ok());
6912        assert_eq!(&*result.text_out(), "n=1\nn=2\nn=3\n");
6913    }
6914
6915    // ------------------------------------------------------------------
6916    // build_args_async: multi-consume flags (jq --arg NAME VALUE pattern)
6917    // ------------------------------------------------------------------
6918
6919    /// Helper: a throwaway schema with one `--pair` param declared as
6920    /// consuming two positionals per occurrence. Modelled after what
6921    /// jq_native will declare for `--arg` / `--argjson`.
6922    fn multi_consume_schema() -> crate::tools::ToolSchema {
6923        use crate::tools::{ParamSchema, ToolSchema};
6924        ToolSchema::new("test", "multi-consume smoke")
6925            .param(
6926                ParamSchema::optional("pair", "array", Value::Null, "name+value pair")
6927                    .consumes(2),
6928            )
6929    }
6930
6931    fn pos(s: &str) -> Arg {
6932        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
6933    }
6934
6935    #[tokio::test]
6936    async fn build_args_multi_consume_single_occurrence() {
6937        let kernel = Kernel::transient().expect("kernel");
6938        let schema = multi_consume_schema();
6939        // Simulates:  test --pair NAME VALUE filter
6940        let args = vec![
6941            Arg::LongFlag("pair".into()),
6942            pos("NAME"),
6943            pos("VALUE"),
6944            pos("filter"),
6945        ];
6946        let built = kernel
6947            .build_args_async(&args, Some(&schema))
6948            .await
6949            .expect("build_args should succeed");
6950
6951        // `--pair` + its two positionals are consumed into named["pair"],
6952        // which becomes an outer array of one inner 2-element array.
6953        let pair = built.named.get("pair").expect("named[pair] missing");
6954        match pair {
6955            Value::Json(serde_json::Value::Array(occurrences)) => {
6956                assert_eq!(occurrences.len(), 1, "expected one occurrence");
6957                match &occurrences[0] {
6958                    serde_json::Value::Array(values) => {
6959                        assert_eq!(values.len(), 2, "pair must have 2 values");
6960                        assert_eq!(values[0], serde_json::Value::String("NAME".into()));
6961                        assert_eq!(values[1], serde_json::Value::String("VALUE".into()));
6962                    }
6963                    other => panic!("expected inner array, got {other:?}"),
6964                }
6965            }
6966            other => panic!("expected Json(Array(...)) for named[pair], got {other:?}"),
6967        }
6968
6969        // The un-consumed positional ("filter") remains in `positional`.
6970        assert_eq!(built.positional.len(), 1);
6971        assert_eq!(built.positional[0], Value::String("filter".into()));
6972    }
6973    #[tokio::test]
6974    async fn build_args_multi_consume_two_occurrences_accumulate() {
6975        let kernel = Kernel::transient().expect("kernel");
6976        let schema = multi_consume_schema();
6977        // Simulates:  test --pair A 1 --pair B 2 filter
6978        let args = vec![
6979            Arg::LongFlag("pair".into()),
6980            pos("A"),
6981            pos("1"),
6982            Arg::LongFlag("pair".into()),
6983            pos("B"),
6984            pos("2"),
6985            pos("filter"),
6986        ];
6987        let built = kernel
6988            .build_args_async(&args, Some(&schema))
6989            .await
6990            .expect("build_args should succeed");
6991
6992        let pair = built.named.get("pair").expect("named[pair] missing");
6993        match pair {
6994            Value::Json(serde_json::Value::Array(occurrences)) => {
6995                assert_eq!(occurrences.len(), 2, "expected two occurrences");
6996                // Preserved in invocation order.
6997                match &occurrences[0] {
6998                    serde_json::Value::Array(values) => {
6999                        assert_eq!(values[0], serde_json::Value::String("A".into()));
7000                        assert_eq!(values[1], serde_json::Value::String("1".into()));
7001                    }
7002                    other => panic!("expected inner array, got {other:?}"),
7003                }
7004                match &occurrences[1] {
7005                    serde_json::Value::Array(values) => {
7006                        assert_eq!(values[0], serde_json::Value::String("B".into()));
7007                        assert_eq!(values[1], serde_json::Value::String("2".into()));
7008                    }
7009                    other => panic!("expected inner array, got {other:?}"),
7010                }
7011            }
7012            other => panic!("expected Json(Array(...)), got {other:?}"),
7013        }
7014    }
7015
7016    // ── undeclared space-form flag under map_positionals (kj --type val) ──
7017    //
7018    // A backend/MCP tool whose schema does NOT declare a flag must not let
7019    // `--flag value` (space form) silently divorce the value: that was a
7020    // privilege-escalation-by-typo against kaijutsu (see docs/issues.md).
7021    // kaish fails loud rather than guessing.
7022
7023    use crate::tools::{ParamSchema, ToolSchema};
7024
7025    /// Backend-style schema (map_positionals) declaring only a `name`
7026    /// positional — `--type` is intentionally undeclared.
7027    fn kj_like_schema() -> ToolSchema {
7028        ToolSchema::new("kj", "incomplete backend schema")
7029            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
7030            .with_positional_mapping()
7031    }
7032
7033    #[tokio::test]
7034    async fn build_args_undeclared_space_flag_errors_under_map_positionals() {
7035        let kernel = Kernel::transient().expect("kernel");
7036        let schema = kj_like_schema();
7037        // kj context create exp --type explorer
7038        let args = vec![
7039            pos("context"),
7040            pos("create"),
7041            pos("exp"),
7042            Arg::LongFlag("type".into()),
7043            pos("explorer"),
7044        ];
7045        let err = kernel
7046            .build_args_async(&args, Some(&schema))
7047            .await
7048            .expect_err("undeclared --type with a space value must fail loud");
7049        let msg = err.to_string();
7050        assert!(msg.contains("--type"), "message should name the flag: {msg}");
7051        assert!(msg.contains("--type=explorer"), "message should suggest the = form: {msg}");
7052        assert!(msg.contains("kj"), "message should name the tool: {msg}");
7053    }
7054
7055    #[tokio::test]
7056    async fn build_args_declared_space_flag_still_binds() {
7057        let kernel = Kernel::transient().expect("kernel");
7058        // Same tool, but now the schema DECLARES --type as a string param.
7059        let schema = ToolSchema::new("kj", "complete schema")
7060            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
7061            .param(ParamSchema::optional("type", "string", Value::Null, "role type"))
7062            .with_positional_mapping();
7063        let args = vec![
7064            pos("exp"),
7065            Arg::LongFlag("type".into()),
7066            pos("explorer"),
7067        ];
7068        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7069        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7070    }
7071
7072    #[tokio::test]
7073    async fn build_args_equals_form_binds_for_undeclared_flag() {
7074        let kernel = Kernel::transient().expect("kernel");
7075        let schema = kj_like_schema();
7076        // The unambiguous `=` form must keep working even when undeclared.
7077        let args = vec![
7078            pos("exp"),
7079            Arg::Named { key: "type".into(), value: Expr::Literal(Value::String("explorer".into())) },
7080        ];
7081        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7082        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7083    }
7084
7085    #[tokio::test]
7086    async fn build_args_undeclared_bool_flag_at_end_is_ok() {
7087        let kernel = Kernel::transient().expect("kernel");
7088        let schema = kj_like_schema();
7089        // No positional follows --force → unambiguously a bare flag.
7090        let args = vec![pos("exp"), Arg::LongFlag("force".into())];
7091        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7092        assert!(built.flags.contains("force"));
7093    }
7094
7095    #[tokio::test]
7096    async fn build_args_undeclared_flag_before_another_flag_is_ok() {
7097        let kernel = Kernel::transient().expect("kernel");
7098        let schema = kj_like_schema();
7099        // --verbose is followed by a flag, not a positional → not ambiguous.
7100        let args = vec![
7101            Arg::LongFlag("verbose".into()),
7102            Arg::Named { key: "name".into(), value: Expr::Literal(Value::String("x".into())) },
7103        ];
7104        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7105        assert!(built.flags.contains("verbose"));
7106    }
7107
7108    #[tokio::test]
7109    async fn build_args_undeclared_space_flag_ok_for_builtin_schema() {
7110        let kernel = Kernel::transient().expect("kernel");
7111        // Builtins set map_positionals=false; the ambiguity guard must not
7112        // fire there (clap validates their flags separately).
7113        let schema = ToolSchema::new("frobnicate", "builtin-style")
7114            .param(ParamSchema::optional("name", "string", Value::Null, "name"));
7115        let args = vec![Arg::LongFlag("frob".into()), pos("value")];
7116        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7117        assert!(built.flags.contains("frob"));
7118    }
7119
7120    // ── subcommand-aware binding (select_leaf wired into build_args_async) ──
7121    //
7122    // A tool exposing a subcommand tree binds flags against the *routed leaf's*
7123    // params, not the root's. The subcommand-path positionals stay positional
7124    // (kj re-parses them with its own clap), and a value flag declared only on
7125    // a deep leaf still binds in space form.
7126
7127    /// kj → context (alias ctx) → create{--type value, --force bool}.
7128    /// map_positionals defaults false on every node (builtin/kj style).
7129    fn kj_tree_schema() -> ToolSchema {
7130        ToolSchema::new("kj", "subcommand tool").subcommand(
7131            ToolSchema::new("context", "context ops")
7132                .with_command_aliases(["ctx"])
7133                .subcommand(
7134                    ToolSchema::new("create", "create context")
7135                        .param(ParamSchema::new("type", "string").with_aliases(["t"]))
7136                        .param(ParamSchema::new("force", "bool")),
7137                ),
7138        )
7139    }
7140
7141    #[tokio::test]
7142    async fn build_args_binds_deep_leaf_value_flag_space_form() {
7143        let kernel = Kernel::transient().expect("kernel");
7144        let schema = kj_tree_schema();
7145        // kj context create --type explorer
7146        let args = vec![
7147            pos("context"),
7148            pos("create"),
7149            Arg::LongFlag("type".into()),
7150            pos("explorer"),
7151        ];
7152        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7153        // --type (declared only on the create leaf) binds in space form.
7154        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7155        // The subcommand path survives as positionals for kj to re-parse.
7156        let positionals: Vec<&str> = built
7157            .positional
7158            .iter()
7159            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
7160            .collect();
7161        assert_eq!(positionals, vec!["context", "create"]);
7162    }
7163
7164    #[tokio::test]
7165    async fn build_args_leaf_bool_flag_does_not_swallow_positional() {
7166        let kernel = Kernel::transient().expect("kernel");
7167        let schema = kj_tree_schema();
7168        // kj context create --force somearg  → --force is a leaf bool flag,
7169        // it must NOT consume `somearg`.
7170        let args = vec![
7171            pos("context"),
7172            pos("create"),
7173            Arg::LongFlag("force".into()),
7174            pos("somearg"),
7175        ];
7176        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7177        assert!(built.flags.contains("force"), "force should be a bare flag");
7178        let positionals: Vec<&str> = built
7179            .positional
7180            .iter()
7181            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
7182            .collect();
7183        assert_eq!(positionals, vec!["context", "create", "somearg"]);
7184    }
7185
7186    #[tokio::test]
7187    async fn build_args_alias_routed_leaf_binds_value_flag() {
7188        let kernel = Kernel::transient().expect("kernel");
7189        let schema = kj_tree_schema();
7190        // kj ctx create -t explorer  → command alias + short flag alias.
7191        let args = vec![
7192            pos("ctx"),
7193            pos("create"),
7194            Arg::ShortFlag("t".into()),
7195            pos("explorer"),
7196        ];
7197        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7198        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7199    }
7200
7201    #[tokio::test]
7202    async fn build_args_computed_subcommand_selector_fails_loud() {
7203        let kernel = Kernel::transient().expect("kernel");
7204        let schema = kj_tree_schema();
7205        // kj $(echo context) — routing can't see the value; fail loud.
7206        let args = vec![Arg::Positional(Expr::CommandSubst(vec![Stmt::Command(
7207            crate::ast::Command { name: "echo".into(), args: vec![], redirects: vec![] },
7208        )]))];
7209        let err = kernel
7210            .build_args_async(&args, Some(&schema))
7211            .await
7212            .expect_err("computed subcommand selector must error");
7213        assert!(
7214            err.to_string().contains("subcommand name is required"),
7215            "got: {err}"
7216        );
7217    }
7218
7219    // ── finalize_output: --json rendering vs. owns_output opt-out ───────────
7220
7221    #[test]
7222    fn finalize_output_renders_when_kernel_owns_it() {
7223        use crate::interpreter::{OutputData, OutputFormat};
7224        let r = ExecResult::with_output(OutputData::text("RAW"));
7225        let out = finalize_output(r, Some(OutputFormat::Json), false);
7226        // Kernel renders the typed OutputData → JSON; text is no longer bare.
7227        assert_ne!(out.text_out(), "RAW", "kernel should reformat to JSON");
7228    }
7229
7230    #[test]
7231    fn finalize_output_skips_when_tool_owns_output() {
7232        use crate::interpreter::{OutputData, OutputFormat};
7233        let r = ExecResult::with_output(OutputData::text("RAW"));
7234        let out = finalize_output(r, Some(OutputFormat::Json), true);
7235        // owns_output: the tool already rendered; kernel leaves bytes untouched.
7236        assert_eq!(out.text_out(), "RAW", "owned output must be left as-is");
7237    }
7238
7239    #[test]
7240    fn finalize_output_no_format_is_noop() {
7241        use crate::interpreter::OutputData;
7242        let r = ExecResult::with_output(OutputData::text("RAW"));
7243        let out = finalize_output(r, None, false);
7244        assert_eq!(out.text_out(), "RAW");
7245    }
7246
7247    // ── initial_vars + execute_with_vars + hermetic env ───────────────────
7248
7249    #[tokio::test]
7250    async fn test_initial_vars_set_and_exported() {
7251        let config = KernelConfig::transient()
7252            .with_var("INIT_FOO", Value::String("bar".into()));
7253        let kernel = Kernel::new(config).expect("failed to create kernel");
7254
7255        assert_eq!(
7256            kernel.get_var("INIT_FOO").await,
7257            Some(Value::String("bar".into()))
7258        );
7259        assert!(
7260            kernel.scope.read().await.is_exported("INIT_FOO"),
7261            "initial_vars entries must be marked exported"
7262        );
7263    }
7264
7265    #[tokio::test]
7266    async fn test_execute_with_vars_overlay_visible() {
7267        let kernel = Kernel::transient().expect("failed to create kernel");
7268        let mut overlay = HashMap::new();
7269        overlay.insert("OVERLAY_X".to_string(), Value::String("yes".into()));
7270
7271        let result = kernel
7272            .execute_with_options(r#"echo "${OVERLAY_X}""#, ExecuteOptions::new().with_vars(overlay))
7273            .await
7274            .expect("execute failed");
7275
7276        assert!(result.ok());
7277        assert_eq!(result.text_out().trim(), "yes");
7278    }
7279
7280    #[tokio::test]
7281    async fn test_execute_with_vars_overlay_cleanup() {
7282        let kernel = Kernel::transient().expect("failed to create kernel");
7283        let mut overlay = HashMap::new();
7284        overlay.insert("EPHEMERAL".to_string(), Value::String("transient".into()));
7285
7286        kernel
7287            .execute_with_options("echo ignored", ExecuteOptions::new().with_vars(overlay))
7288            .await
7289            .expect("execute failed");
7290
7291        assert_eq!(kernel.get_var("EPHEMERAL").await, None);
7292        assert!(
7293            !kernel.scope.read().await.is_exported("EPHEMERAL"),
7294            "overlay-only export must be cleared on return"
7295        );
7296    }
7297
7298    #[tokio::test]
7299    async fn test_execute_with_vars_does_not_clobber_existing_export() {
7300        let kernel = Kernel::transient().expect("failed to create kernel");
7301        kernel
7302            .execute("export OUTER=outer")
7303            .await
7304            .expect("export failed");
7305
7306        let mut overlay = HashMap::new();
7307        overlay.insert("OUTER".to_string(), Value::String("inner".into()));
7308        let result = kernel
7309            .execute_with_options(r#"echo "${OUTER}""#, ExecuteOptions::new().with_vars(overlay))
7310            .await
7311            .expect("execute failed");
7312        assert_eq!(result.text_out().trim(), "inner");
7313
7314        assert_eq!(
7315            kernel.get_var("OUTER").await,
7316            Some(Value::String("outer".into())),
7317            "outer value must reappear after pop"
7318        );
7319        assert!(
7320            kernel.scope.read().await.is_exported("OUTER"),
7321            "outer export must survive overlay"
7322        );
7323    }
7324
7325    #[tokio::test]
7326    async fn test_execute_with_vars_inner_assignment_is_local() {
7327        let kernel = Kernel::transient().expect("failed to create kernel");
7328        let mut overlay = HashMap::new();
7329        overlay.insert("LOCAL_FOO".to_string(), Value::String("from-overlay".into()));
7330
7331        // Variable assignment inside a single statement uses set() (innermost
7332        // frame), not set_global() — this matches bash function-local semantics.
7333        // We explicitly use `local FOO=...` style by relying on the pushed
7334        // frame; the assignment in the script body modifies the same frame.
7335        let result = kernel
7336            .execute_with_options(
7337                r#"LOCAL_FOO="reassigned"; echo "${LOCAL_FOO}""#,
7338                ExecuteOptions::new().with_vars(overlay),
7339            )
7340            .await
7341            .expect("execute failed");
7342        assert!(result.ok());
7343
7344        // After the call the frame is popped, so LOCAL_FOO is gone regardless
7345        // of how the script reassigned it.
7346        assert_eq!(kernel.get_var("LOCAL_FOO").await, None);
7347    }
7348
7349    #[tokio::test]
7350    async fn test_external_command_sees_exported_var() {
7351        let kernel = Kernel::transient().expect("failed to create kernel");
7352        let result = kernel
7353            .execute("export EXT_FOO=bar; printenv EXT_FOO")
7354            .await
7355            .expect("execute failed");
7356
7357        assert!(result.ok(), "printenv should succeed: stderr={}", result.err);
7358        assert_eq!(result.text_out().trim(), "bar");
7359    }
7360
7361    #[tokio::test]
7362    async fn test_external_command_does_not_see_unexported_var() {
7363        let kernel = Kernel::transient().expect("failed to create kernel");
7364
7365        // Set without exporting; printenv must not see it (exit code != 0,
7366        // empty stdout per printenv semantics).
7367        let result = kernel
7368            .execute("EXT_BAR=hidden; printenv EXT_BAR")
7369            .await
7370            .expect("execute failed");
7371
7372        assert!(!result.ok(), "printenv should fail when var is unexported");
7373        assert!(
7374            result.text_out().trim().is_empty(),
7375            "no stdout when var is missing, got: {}",
7376            result.text_out()
7377        );
7378    }
7379
7380    #[tokio::test]
7381    async fn test_external_command_does_not_see_os_env() {
7382        // The kernel is hermetic: it never reads std::env::vars() and only
7383        // exports what it has been told to export. Cargo always sets PATH for
7384        // tests, so PATH is reliably present in the OS env — but a transient
7385        // kernel doesn't seed it into initial_vars, so `printenv PATH` from
7386        // inside the kernel must fail.
7387        assert!(
7388            std::env::var_os("PATH").is_some(),
7389            "test precondition: cargo should set PATH"
7390        );
7391
7392        let kernel = Kernel::transient().expect("failed to create kernel");
7393        let result = kernel
7394            .execute("printenv PATH")
7395            .await
7396            .expect("execute failed");
7397
7398        assert!(
7399            !result.ok(),
7400            "printenv PATH must fail in hermetic kernel, got stdout={:?}",
7401            result.text_out()
7402        );
7403        assert!(
7404            result.text_out().trim().is_empty(),
7405            "no PATH in subprocess env, got stdout={:?}",
7406            result.text_out()
7407        );
7408    }
7409
7410    #[tokio::test]
7411    async fn test_execute_with_vars_overlay_reaches_subprocess() {
7412        let kernel = Kernel::transient().expect("failed to create kernel");
7413        let mut overlay = HashMap::new();
7414        overlay.insert("SUB_FOO".to_string(), Value::String("subproc".into()));
7415
7416        let result = kernel
7417            .execute_with_options("printenv SUB_FOO", ExecuteOptions::new().with_vars(overlay))
7418            .await
7419            .expect("execute failed");
7420
7421        assert!(
7422            result.ok(),
7423            "printenv should succeed: code={} stdout={:?} stderr={:?}",
7424            result.code,
7425            result.text_out(),
7426            result.err
7427        );
7428        assert_eq!(result.text_out().trim(), "subproc");
7429    }
7430}