Skip to main content

kaish_kernel/
kernel.rs

1//! The Kernel (核) — the heart of kaish.
2//!
3//! The Kernel owns and coordinates all core components:
4//! - Interpreter state (scope, $?)
5//! - Tool registry (builtins, user tools)
6//! - VFS router (mount points)
7//! - Job manager (background jobs)
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────┐
13//! │                         Kernel (核)                         │
14//! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
15//! │  │   Scope      │  │ ToolRegistry │  │  VfsRouter       │  │
16//! │  │  (variables) │  │  (builtins,  │  │  (mount points)  │  │
17//! │  │              │  │   user tools)│  │                  │  │
18//! │  └──────────────┘  └──────────────┘  └──────────────────┘  │
19//! │  ┌──────────────────────────────┐  ┌──────────────────┐    │
20//! │  │  JobManager (background)     │  │  ExecResult ($?) │    │
21//! │  └──────────────────────────────┘  └──────────────────┘    │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::Duration;
30
31use anyhow::{Context, Result};
32use tokio::sync::RwLock;
33
/// Monotonic counter that supplies each Kernel's `$$` / `${$}` value.
///
/// Starts at 1; each new Kernel takes the next value at construction time.
/// `Kernel::fork()` inherits the parent's value (matching bash, where a
/// subshell keeps the parent's `$$`) because forks clone the parent's Scope
/// rather than calling `set_pid` again.
///
/// Deliberately *not* the OS PID — kaish runs as a long-lived MCP server
/// or embedded inside other binaries (kaijutsu), where the host PID is
/// meaningless to the script. It is an atomic so kernels constructed on
/// different threads can share it without a lock.
static KERNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
46
47use async_trait::async_trait;
48
49use crate::ast::{Arg, Command, Expr, FileTestOp, Stmt, StringPart, TestExpr, ToolDef, Value, BinaryOp};
50pub use kaish_types::ExecuteOptions;
51use crate::backend::{BackendError, KernelBackend};
52use kaish_glob::glob_match;
53use crate::dispatch::{CommandDispatcher, PipelinePosition};
54use crate::interpreter::{apply_output_format, eval_expr, expand_tilde, json_to_value, value_to_bool, value_to_string, ControlFlow, ExecResult, Scope};
55use crate::parser::parse;
56use crate::scheduler::{is_bool_type, schema_param_lookup, select_leaf, stderr_stream, BoundedStream, JobManager, PipelineRunner, StderrReceiver};
57#[cfg(feature = "subprocess")]
58use crate::scheduler::{drain_to_stream, DEFAULT_STREAM_MAX_SIZE};
59use crate::tools::{register_builtins, ExecContext, GlobalFlags, ToolArgs, ToolRegistry};
60#[cfg(feature = "subprocess")]
61use crate::tools::resolve_in_path;
62use crate::validator::{Severity, Validator};
63#[cfg(feature = "localfs")]
64use crate::vfs::LocalFs;
65use crate::vfs::{BuiltinFs, DevFs, JobFs, MemoryFs, VfsRouter};
66use kaish_vfs::ByteBudget;
67#[cfg(all(feature = "localfs", feature = "overlay"))]
68use kaish_vfs::OverlayFs;
69
/// How the host's local filesystem is exposed through the kernel's VFS.
///
/// The variants trade convenience against isolation:
/// - `Passthrough`: native paths reach the host directly (suited to a human at a REPL).
/// - `Sandboxed`: native-looking paths, confined to one subtree (safer for agents).
/// - `NoLocal`: no host filesystem at all (tests, pure in-memory execution).
#[derive(Debug, Clone)]
pub enum VfsMountMode {
    /// `LocalFs` mounted at `/`, so native paths work unchanged.
    ///
    /// Grants access to the whole host filesystem. Intended for sessions a
    /// human is driving, where typing `/home/user/project` should simply work.
    ///
    /// Mounts:
    /// - `/` → LocalFs("/")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Passthrough,

    /// A transparent sandbox: paths look native, but only one subtree is reachable.
    ///
    /// The host directory is mounted at its real location (e.g. `/home/user`),
    /// so `/home/user/src/project` resolves as expected. Paths outside the
    /// sandbox root land on an in-memory filesystem instead of the host.
    ///
    /// **Note:** only VFS (builtin) operations are confined. External commands
    /// ignore the sandbox completely — see [`KernelConfig::allow_external_commands`].
    ///
    /// Mounts:
    /// - `/` → MemoryFs (catches paths outside sandbox)
    /// - `{root}` → LocalFs(root)  (e.g., `/home/user` → LocalFs)
    /// - `/tmp` → LocalFs("/tmp")
    /// - `/dev` → DevFs (synthetic /dev/null, /dev/zero)
    /// - `/v` → MemoryFs (blob storage)
    ///
    /// NOTE(review): the `KernelConfig::overlay` docs also mention an XDG
    /// runtime mount in this mode; confirm against `setup_vfs` and list it here.
    #[cfg(feature = "localfs")]
    Sandboxed {
        /// Host directory to expose; `None` means `$HOME`.
        /// Narrow it further (e.g. `~/src`) to tighten the sandbox.
        root: Option<PathBuf>,
    },

    /// Memory only; the host filesystem is never mounted.
    ///
    /// Full isolation from the host, for tests or purely sandboxed execution.
    ///
    /// Kernel construction forces output spill to
    /// [`SpillMode::Memory`](crate::output_limit::SpillMode::Memory) in this
    /// mode, overriding even an explicit `SpillMode::Disk`. Without a host
    /// filesystem, oversized output must not land in a host spill file
    /// (`paths::spill_dir()` bypasses the VFS).
    ///
    /// Mounts:
    /// - `/` → MemoryFs
    /// - `/tmp` → MemoryFs
    /// - `/v` → MemoryFs
    /// - `/dev` → DevFs (synthetic /dev/null, /dev/zero)
    NoLocal,
}

/// With `localfs`, the default is a sandbox rooted at `$HOME`.
///
/// The default is a struct variant, which `#[default]` cannot mark, so this
/// impl is written by hand.
#[cfg(feature = "localfs")]
impl Default for VfsMountMode {
    fn default() -> Self {
        Self::Sandboxed { root: None }
    }
}

/// Without `localfs`, the only mode that exists is `NoLocal`.
// Clippy would suggest `#[derive(Default)]` here. That is kept hand-written
// because a derive's `#[default]` marker would have to move with the feature set.
#[cfg(not(feature = "localfs"))]
#[allow(clippy::derivable_impls)]
impl Default for VfsMountMode {
    fn default() -> Self {
        Self::NoLocal
    }
}
133        { VfsMountMode::Sandboxed { root: None } }
134        #[cfg(not(feature = "localfs"))]
135        { VfsMountMode::NoLocal }
136    }
137}
138
139/// Configuration for kernel initialization.
140#[derive(Debug, Clone)]
141pub struct KernelConfig {
142    /// Name of this kernel (for identification).
143    pub name: String,
144
145    /// VFS mount mode — controls how local filesystem is exposed.
146    pub vfs_mode: VfsMountMode,
147
148    /// Initial working directory (VFS path).
149    pub cwd: PathBuf,
150
151    /// Whether to skip pre-execution validation.
152    ///
153    /// When false (default), scripts are validated before execution to catch
154    /// errors early. Set to true to skip validation for performance or to
155    /// allow dynamic/external commands.
156    pub skip_validation: bool,
157
158    /// When true, standalone external commands inherit stdio for real-time output.
159    ///
160    /// Set by script runner and REPL for human-visible output.
161    /// Not set by MCP server (output must be captured for structured responses).
162    pub interactive: bool,
163
164    /// Ignore file configuration for file-walking tools.
165    pub ignore_config: crate::ignore_config::IgnoreConfig,
166
167    /// Output size limit configuration for agent safety.
168    pub output_limit: crate::output_limit::OutputLimitConfig,
169
170    /// Whether external command execution (PATH lookup, `exec`, `spawn`) is allowed.
171    ///
172    /// When `true` (default), commands not found as builtins are resolved via PATH
173    /// and executed as child processes. When `false`, only kaish builtins and
174    /// backend-registered tools are available.
175    ///
176    /// **Security:** External commands bypass the VFS sandbox entirely — they see
177    /// the real filesystem, network, and environment. Set to `false` when running
178    /// untrusted input.
179    pub allow_external_commands: bool,
180
181    /// Enable confirmation latch for dangerous operations (set -o latch).
182    ///
183    /// When enabled, destructive operations like `rm` require nonce confirmation.
184    /// Can also be enabled at runtime with `set -o latch` or via `KAISH_LATCH=1`.
185    pub latch_enabled: bool,
186
187    /// Enable trash-on-delete for rm (set -o trash).
188    ///
189    /// When enabled, small files are moved to freedesktop.org Trash instead of
190    /// being permanently deleted. Can also be enabled at runtime with `set -o trash`
191    /// or via `KAISH_TRASH=1`.
192    pub trash_enabled: bool,
193
194    /// Shared nonce store for cross-request confirmation latch.
195    ///
196    /// When `Some`, the kernel uses this store instead of creating a fresh one.
197    /// This allows nonces issued in one MCP `execute()` call to be validated
198    /// in a subsequent call. When `None` (default), a fresh store is created.
199    pub nonce_store: Option<crate::nonce::NonceStore>,
200
201    /// Variables to populate the root scope with at construction, all marked
202    /// for export to child processes.
203    ///
204    /// The kernel itself is hermetic — it never reads `std::env::vars()` —
205    /// so frontends that want OS-env passthrough (REPL, MCP) populate this
206    /// from `std::env::vars()`. Embedders that want isolation pass nothing
207    /// (or only the keys they curate).
208    pub initial_vars: HashMap<String, Value>,
209
210    /// Default per-request timeout. When `Some`, every `execute_with_options`
211    /// call without an explicit `ExecuteOptions::timeout` uses this duration.
212    /// When elapsed, the kernel cancels the request, kills any external
213    /// children with the configured grace, and returns exit code 124.
214    ///
215    /// `None` means no default timeout — only explicit per-call timeouts apply.
216    pub request_timeout: Option<Duration>,
217
218    /// Grace period between SIGTERM and SIGKILL when killing an external
219    /// child on cancellation or timeout.
220    ///
221    /// Defaults to 2 seconds. Set to `Duration::ZERO` to escalate immediately
222    /// to SIGKILL. Long-shutdown processes (databases, etc.) may need more.
223    pub kill_grace: Duration,
224
225    /// Cap on memory-resident bytes across all kernel-owned `MemoryFs` mounts.
226    ///
227    /// One shared `ByteBudget` (labeled `"vfs-memory"`) is created at kernel
228    /// construction and handed to every `MemoryFs` the kernel builds in
229    /// `setup_vfs` (Passthrough `/v`; Sandboxed `/` and `/v`; NoLocal `/`,
230    /// `/tmp`, `/v`). Writes that would exceed the cap fail loudly with
231    /// `StorageFull` — an in-band error a model reads and adapts to; fail
232    /// loud over quietly eating RAM.
233    ///
234    /// **Why MCP is bounded by default:** each `execute()` call creates a fresh
235    /// kernel (see `server/execute.rs`), so the 64 MiB cap is per-call, not
236    /// per-session. Embedders that know their workload needs more opt out with
237    /// `without_vfs_budget()` or raise the cap with `with_vfs_budget(bytes)` —
238    /// protection on by default, opt out knowingly. All other profiles default
239    /// to `None` (unbounded).
240    ///
241    /// Follows the same pattern as `OutputLimitConfig`: MCP bounded, rest unbounded.
242    pub vfs_budget_bytes: Option<u64>,
243
244    /// Enable copy-on-write overlay mode (opt-in).
245    ///
246    /// When `true`, the primary local filesystem mount is wrapped in an
247    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
248    /// Use `kaish-vfs status/diff/commit/reset` to inspect and manage the
249    /// overlay transaction.
250    ///
251    /// **Passthrough:** `/` becomes `OverlayFs over LocalFs::read_only("/")`.
252    /// **Sandboxed{root}:** the `{root}` mount becomes
253    /// `OverlayFs over LocalFs::read_only(root)`; the `/tmp` and XDG runtime
254    /// mounts stay as real `LocalFs` (real writes escape the transaction —
255    /// see `docs/kaish-overlayfs.md` for the escape-hatch inventory).
256    /// **NoLocal:** incompatible — construction fails loudly (everything is
257    /// already virtual; an overlay adds no value and no lower layer to wrap).
258    /// **with_backend:** incompatible — the embedder controls the VFS; the
259    /// kernel cannot wrap it without bypassing the embedder's semantics.
260    ///
261    /// **Not default-on for MCP:** each `execute()` call gets a fresh kernel,
262    /// making the overlay a per-call transaction — `kaish-vfs commit` must run
263    /// in the same call as the writes, or the transaction is discarded on drop.
264    /// Frontends (REPL, MCP) expose `--overlay` as an explicit opt-in flag.
265    pub overlay: bool,
266}
267
268/// Get the default sandbox root ($HOME).
269#[cfg(feature = "localfs")]
270fn default_sandbox_root() -> PathBuf {
271    std::env::var("HOME")
272        .map(PathBuf::from)
273        .unwrap_or_else(|_| PathBuf::from("/"))
274}
275
276impl Default for KernelConfig {
277    fn default() -> Self {
278        #[cfg(feature = "localfs")]
279        {
280            let home = default_sandbox_root();
281            Self {
282                name: "default".to_string(),
283                vfs_mode: VfsMountMode::Sandboxed { root: None },
284                cwd: home,
285                skip_validation: false,
286                interactive: false,
287                ignore_config: crate::ignore_config::IgnoreConfig::none(),
288                output_limit: crate::output_limit::OutputLimitConfig::none(),
289                allow_external_commands: cfg!(feature = "subprocess"),
290                latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
291                trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
292                nonce_store: None,
293                initial_vars: HashMap::new(),
294                request_timeout: None,
295                kill_grace: Duration::from_secs(2),
296                vfs_budget_bytes: None,
297                overlay: false,
298            }
299        }
300        #[cfg(not(feature = "localfs"))]
301        {
302            Self {
303                name: "default".to_string(),
304                vfs_mode: VfsMountMode::NoLocal,
305                cwd: PathBuf::from("/"),
306                skip_validation: false,
307                interactive: false,
308                ignore_config: crate::ignore_config::IgnoreConfig::none(),
309                output_limit: crate::output_limit::OutputLimitConfig::none(),
310                allow_external_commands: false,
311                latch_enabled: false,
312                trash_enabled: false,
313                nonce_store: None,
314                initial_vars: HashMap::new(),
315                request_timeout: None,
316                kill_grace: Duration::from_secs(2),
317                vfs_budget_bytes: None,
318                overlay: false,
319            }
320        }
321    }
322}
323
324impl KernelConfig {
325    /// Create a transient kernel config (sandboxed, for temporary use).
326    #[cfg(feature = "localfs")]
327    pub fn transient() -> Self {
328        let home = default_sandbox_root();
329        Self {
330            name: "transient".to_string(),
331            vfs_mode: VfsMountMode::Sandboxed { root: None },
332            cwd: home,
333            skip_validation: false,
334            interactive: false,
335            ignore_config: crate::ignore_config::IgnoreConfig::none(),
336            output_limit: crate::output_limit::OutputLimitConfig::none(),
337            allow_external_commands: cfg!(feature = "subprocess"),
338            latch_enabled: false,
339            trash_enabled: false,
340            nonce_store: None,
341            initial_vars: HashMap::new(),
342            request_timeout: None,
343            kill_grace: Duration::from_secs(2),
344            vfs_budget_bytes: None,
345            overlay: false,
346        }
347    }
348
349    /// Create a transient kernel config (isolated, no-default-features).
350    #[cfg(not(feature = "localfs"))]
351    pub fn transient() -> Self {
352        Self::isolated()
353    }
354
355    /// Create a kernel config with the given name (sandboxed by default).
356    #[cfg(feature = "localfs")]
357    pub fn named(name: &str) -> Self {
358        let home = default_sandbox_root();
359        Self {
360            name: name.to_string(),
361            vfs_mode: VfsMountMode::Sandboxed { root: None },
362            cwd: home,
363            skip_validation: false,
364            interactive: false,
365            ignore_config: crate::ignore_config::IgnoreConfig::none(),
366            output_limit: crate::output_limit::OutputLimitConfig::none(),
367            allow_external_commands: cfg!(feature = "subprocess"),
368            latch_enabled: false,
369            trash_enabled: false,
370            nonce_store: None,
371            initial_vars: HashMap::new(),
372            request_timeout: None,
373            kill_grace: Duration::from_secs(2),
374            vfs_budget_bytes: None,
375            overlay: false,
376        }
377    }
378
379    /// Create a kernel config with the given name (isolated, no-default-features).
380    #[cfg(not(feature = "localfs"))]
381    pub fn named(name: &str) -> Self {
382        Self {
383            name: name.to_string(),
384            ..Self::isolated()
385        }
386    }
387
388    /// Create a REPL config with passthrough filesystem access.
389    ///
390    /// Native paths like `/home/user/project` work directly.
391    /// The cwd is set to the actual current working directory.
392    #[cfg(feature = "localfs")]
393    pub fn repl() -> Self {
394        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
395        Self {
396            name: "repl".to_string(),
397            vfs_mode: VfsMountMode::Passthrough,
398            cwd,
399            skip_validation: false,
400            interactive: false,
401            ignore_config: crate::ignore_config::IgnoreConfig::none(),
402            output_limit: crate::output_limit::OutputLimitConfig::none(),
403            allow_external_commands: cfg!(feature = "subprocess"),
404            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
405            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
406            nonce_store: None,
407            initial_vars: HashMap::new(),
408            request_timeout: None,
409            kill_grace: Duration::from_secs(2),
410            vfs_budget_bytes: None,
411            overlay: false,
412        }
413    }
414
415    /// Create an MCP server config with sandboxed filesystem access.
416    ///
417    /// Local filesystem is accessible at its real path (e.g., `/home/user`),
418    /// but sandboxed to `$HOME`. Paths outside the sandbox are not accessible
419    /// through builtins. External commands still access the real filesystem —
420    /// use `.with_allow_external_commands(false)` to block them.
421    ///
422    /// VFS memory is bounded at 64 MiB per `execute()` call by default
423    /// (MCP creates a fresh kernel per call). Raise or remove with
424    /// `with_vfs_budget` / `without_vfs_budget`.
425    #[cfg(feature = "localfs")]
426    pub fn mcp() -> Self {
427        let home = default_sandbox_root();
428        Self {
429            name: "mcp".to_string(),
430            vfs_mode: VfsMountMode::Sandboxed { root: None },
431            cwd: home,
432            skip_validation: false,
433            interactive: false,
434            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
435            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
436            allow_external_commands: cfg!(feature = "subprocess"),
437            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
438            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
439            nonce_store: None,
440            initial_vars: HashMap::new(),
441            request_timeout: None,
442            kill_grace: Duration::from_secs(2),
443            vfs_budget_bytes: Some(64 * 1024 * 1024),
444            overlay: false,
445        }
446    }
447
448    /// Create an MCP server config with a custom sandbox root.
449    ///
450    /// Use this to restrict access to a subdirectory like `~/src`.
451    ///
452    /// VFS memory is bounded at 64 MiB per `execute()` call by default.
453    /// Raise or remove with `with_vfs_budget` / `without_vfs_budget`.
454    #[cfg(feature = "localfs")]
455    pub fn mcp_with_root(root: PathBuf) -> Self {
456        Self {
457            name: "mcp".to_string(),
458            vfs_mode: VfsMountMode::Sandboxed { root: Some(root.clone()) },
459            cwd: root,
460            skip_validation: false,
461            interactive: false,
462            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
463            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
464            allow_external_commands: cfg!(feature = "subprocess"),
465            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
466            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
467            nonce_store: None,
468            initial_vars: HashMap::new(),
469            request_timeout: None,
470            kill_grace: Duration::from_secs(2),
471            vfs_budget_bytes: Some(64 * 1024 * 1024),
472            overlay: false,
473        }
474    }
475
476    /// Create a config with no local filesystem (memory only).
477    ///
478    /// Complete isolation: no local filesystem and external commands are disabled.
479    /// Useful for tests or pure sandboxed execution.
480    pub fn isolated() -> Self {
481        Self {
482            name: "isolated".to_string(),
483            vfs_mode: VfsMountMode::NoLocal,
484            cwd: PathBuf::from("/"),
485            skip_validation: false,
486            interactive: false,
487            ignore_config: crate::ignore_config::IgnoreConfig::none(),
488            output_limit: crate::output_limit::OutputLimitConfig::none(),
489            allow_external_commands: false,
490            latch_enabled: false,
491            trash_enabled: false,
492            nonce_store: None,
493            initial_vars: HashMap::new(),
494            request_timeout: None,
495            kill_grace: Duration::from_secs(2),
496            vfs_budget_bytes: None,
497            overlay: false,
498        }
499    }
500
501    /// Set the VFS mount mode.
502    pub fn with_vfs_mode(mut self, mode: VfsMountMode) -> Self {
503        self.vfs_mode = mode;
504        self
505    }
506
507    /// Set the initial working directory.
508    pub fn with_cwd(mut self, cwd: PathBuf) -> Self {
509        self.cwd = cwd;
510        self
511    }
512
513    /// Skip pre-execution validation.
514    pub fn with_skip_validation(mut self, skip: bool) -> Self {
515        self.skip_validation = skip;
516        self
517    }
518
519    /// Enable interactive mode (external commands inherit stdio).
520    pub fn with_interactive(mut self, interactive: bool) -> Self {
521        self.interactive = interactive;
522        self
523    }
524
525    /// Set the ignore file configuration.
526    pub fn with_ignore_config(mut self, config: crate::ignore_config::IgnoreConfig) -> Self {
527        self.ignore_config = config;
528        self
529    }
530
531    /// Set the output limit configuration.
532    pub fn with_output_limit(mut self, config: crate::output_limit::OutputLimitConfig) -> Self {
533        self.output_limit = config;
534        self
535    }
536
537    /// Set whether external command execution is allowed.
538    ///
539    /// When `false`, commands not found as builtins produce "command not found"
540    /// instead of searching PATH. The `exec` and `spawn` builtins also return
541    /// errors. Use this to prevent VFS sandbox bypass via external binaries.
542    pub fn with_allow_external_commands(mut self, allow: bool) -> Self {
543        self.allow_external_commands = allow;
544        self
545    }
546
547    /// Enable or disable confirmation latch at startup.
548    pub fn with_latch(mut self, enabled: bool) -> Self {
549        self.latch_enabled = enabled;
550        self
551    }
552
553    /// Enable or disable trash-on-delete at startup.
554    pub fn with_trash(mut self, enabled: bool) -> Self {
555        self.trash_enabled = enabled;
556        self
557    }
558
559    /// Use a shared nonce store for cross-request confirmation latch.
560    ///
561    /// Pass a `NonceStore` that outlives individual kernel instances so nonces
562    /// issued in one MCP `execute()` call can be validated in subsequent calls.
563    pub fn with_nonce_store(mut self, store: crate::nonce::NonceStore) -> Self {
564        self.nonce_store = Some(store);
565        self
566    }
567
568    /// Add a single initial variable; marked exported when the kernel boots.
569    ///
570    /// Repeated calls add (last write wins on key collision).
571    pub fn with_var(mut self, name: impl Into<String>, value: Value) -> Self {
572        self.initial_vars.insert(name.into(), value);
573        self
574    }
575
576    /// Replace the entire initial-vars map. All entries are marked exported.
577    pub fn with_initial_vars(mut self, vars: HashMap<String, Value>) -> Self {
578        self.initial_vars = vars;
579        self
580    }
581
582    /// Extend the initial-vars map with the given entries (last write wins).
583    pub fn with_vars(mut self, vars: HashMap<String, Value>) -> Self {
584        self.initial_vars.extend(vars);
585        self
586    }
587
588    /// Set the default per-request timeout (kernel-wide).
589    ///
590    /// Each `execute_with_options` call without an explicit timeout uses
591    /// this. On elapsed, the kernel cancels and returns exit code 124.
592    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
593        self.request_timeout = Some(timeout);
594        self
595    }
596
597    /// Set the SIGTERM-to-SIGKILL grace period for child kills.
598    pub fn with_kill_grace(mut self, grace: Duration) -> Self {
599        self.kill_grace = grace;
600        self
601    }
602
603    /// Cap VFS memory-resident bytes at `bytes` across all kernel-owned
604    /// `MemoryFs` mounts. A shared `ByteBudget` labeled `"vfs-memory"` is
605    /// created at kernel construction and passed to every `MemoryFs` the
606    /// kernel builds (see `setup_vfs` and `with_backend`).
607    ///
608    /// Writes that would exceed the cap fail loudly with `StorageFull` — an
609    /// in-band error a model reads and adapts to; fail loud over quietly eating
610    /// RAM. Use `without_vfs_budget` to remove the cap entirely.
611    pub fn with_vfs_budget(mut self, bytes: u64) -> Self {
612        self.vfs_budget_bytes = Some(bytes);
613        self
614    }
615
616    /// Remove the VFS memory budget — all `MemoryFs` mounts are unbounded.
617    ///
618    /// Use when the caller knows the workload and the default 64 MiB cap
619    /// (set by `KernelConfig::mcp`) is too conservative.
620    pub fn without_vfs_budget(mut self) -> Self {
621        self.vfs_budget_bytes = None;
622        self
623    }
624
625    /// Enable or disable copy-on-write overlay mode.
626    ///
627    /// When `true`, the primary local filesystem mount is wrapped in an
628    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
629    /// Incompatible with `VfsMountMode::NoLocal` (fails loudly at construction)
630    /// and `with_backend` kernels (same — the embedder controls the VFS).
631    pub fn with_overlay(mut self, overlay: bool) -> Self {
632        self.overlay = overlay;
633        self
634    }
635}
636
637/// Handle to an active overlay session, kept on the kernel and shared to
638/// `ExecContext` so the `kaish-vfs` builtin can reach the `OverlayFs`.
639///
640/// The `mount_path` is the VFS prefix the overlay was mounted under (e.g.
641/// `/home/user`); `commit_root` is the real filesystem path the overlay's
642/// lower is backed by (used as the target for `kaish-vfs commit`).
643#[cfg(all(feature = "localfs", feature = "overlay"))]
644#[derive(Clone)]
645pub struct OverlayHandle {
646    /// The mounted `OverlayFs`, Arc-shared so the builtin can call inspection
647    /// methods without holding a VfsRouter lock.
648    pub fs: Arc<OverlayFs>,
649    /// VFS path this overlay is mounted at (e.g. `/home/user`).
650    pub mount_path: PathBuf,
651    /// Real filesystem root to commit into. Same as the lower's root.
652    pub commit_root: PathBuf,
653}
654
655/// The Kernel (核) — executes kaish code.
656///
657/// This is the primary interface for running kaish commands. It owns all
658/// the runtime state: variables, tools, VFS, jobs, and persistence.
659pub struct Kernel {
660    /// Kernel name.
661    name: String,
662    /// Variable scope.
663    scope: RwLock<Scope>,
664    /// Tool registry.
665    tools: Arc<ToolRegistry>,
666    /// User-defined tools (from `tool name { body }` statements).
667    user_tools: RwLock<HashMap<String, ToolDef>>,
668    /// Virtual filesystem router.
669    vfs: Arc<VfsRouter>,
670    /// Background job manager.
671    jobs: Arc<JobManager>,
672    /// Pipeline runner.
673    runner: PipelineRunner,
674    /// Execution context (cwd, stdin, etc.).
675    exec_ctx: RwLock<ExecContext>,
676    /// Whether to skip pre-execution validation.
677    skip_validation: bool,
678    /// When true, standalone external commands inherit stdio for real-time output.
679    interactive: bool,
680    /// Whether external command execution is allowed.
681    allow_external_commands: bool,
682    /// Shared memory budget for all kernel-owned `MemoryFs` mounts.
683    ///
684    /// `None` when `KernelConfig::vfs_budget_bytes` was `None` (unbounded).
685    /// `Some` is Arc-cloned into forks so all concurrent execution draws from
686    /// the same pool — a background job's writes reduce the same cap as
687    /// foreground writes, which is the correct behaviour.
688    vfs_budget: Option<Arc<kaish_vfs::ByteBudget>>,
689    /// Active overlay session handle, if this kernel was constructed with
690    /// `overlay: true`. Arc-shared so `ExecContext` (and thus the
691    /// `kaish-vfs` builtin) can inspect and mutate the overlay without
692    /// holding a kernel write lock. Propagated to forks via `fork_inner`
693    /// and `child_for_pipeline` so `kaish-vfs` works inside background
694    /// jobs, scatter workers, and pipeline stages.
695    #[cfg(all(feature = "localfs", feature = "overlay"))]
696    overlay_handle: Option<Arc<OverlayHandle>>,
697    /// Default per-request timeout (None = no default).
698    request_timeout: Option<Duration>,
699    /// SIGTERM-to-SIGKILL grace period for child kills.
700    kill_grace: Duration,
701    /// Receiver for the kernel stderr stream.
702    ///
703    /// Pipeline stages write to the corresponding `StderrStream` (set on ExecContext).
704    /// The kernel drains this after each statement in `execute_streaming`.
705    stderr_receiver: tokio::sync::Mutex<StderrReceiver>,
706    /// Cancellation token for interrupting execution (Ctrl-C).
707    ///
708    /// Protected by `std::sync::Mutex` (not tokio) because the SIGINT handler
709    /// needs sync access. Each `execute()` call gets a fresh child token;
710    /// `cancel()` cancels the current token and replaces it.
711    cancel_token: std::sync::Mutex<tokio_util::sync::CancellationToken>,
712    /// Terminal state for job control (interactive mode only, Unix only).
713    #[cfg(all(unix, feature = "subprocess"))]
714    terminal_state: Option<Arc<crate::terminal::TerminalState>>,
715    /// Weak self-reference for handing out `Arc<dyn CommandDispatcher>`.
716    ///
717    /// Set by `into_arc()`. Allows builtins to re-dispatch inner commands
718    /// through the full Kernel resolution chain.
719    self_weak: std::sync::OnceLock<std::sync::Weak<Self>>,
720    /// Background job this kernel (a fork) is executing on behalf of, if any.
721    /// Set on the fork created by `execute_background` and inherited by all its
722    /// sub-forks (pipeline stages, scatter workers), so an external command
723    /// spawned anywhere under a background job can record its process group on
724    /// that job for `kill -<sig> %N`. `None` for foreground execution.
725    bg_job_id: Option<crate::scheduler::JobId>,
726    /// Serializes concurrent `execute()` / `execute_streaming()` callers on
727    /// this Kernel instance. Tokio's Mutex is fair (FIFO) and acts as the
728    /// queue. Background jobs, scatter workers, and concurrent pipeline
729    /// stages do NOT take this lock — they run against a *forked* Kernel
730    /// (see [`Kernel::fork`]) so they never contend with the foreground.
731    execute_lock: tokio::sync::Mutex<()>,
732}
733
734/// Internal result of [`Kernel::setup_vfs`].
735struct VfsSetupResult {
736    vfs: VfsRouter,
737    budget: Option<Arc<ByteBudget>>,
738    #[cfg(all(feature = "localfs", feature = "overlay"))]
739    overlay_handle: Option<Arc<OverlayHandle>>,
740}
741
impl Kernel {
    /// Create a new kernel with the given configuration.
    ///
    /// Builds the VFS for `config.vfs_mode` (see `setup_vfs`), mounts the
    /// job-observability filesystem at `/v/jobs`, and assembles the rest of
    /// the kernel. When built with the `localfs` + `overlay` features and
    /// `config.overlay` is true, the overlay handle produced during VFS setup
    /// is installed on both the kernel and its `ExecContext`, so the
    /// `kaish-vfs` builtin can reach it.
    ///
    /// # Errors
    /// Propagates errors from VFS setup (e.g. `overlay=true` combined with
    /// `VfsMountMode::NoLocal`, or with a build lacking the `overlay`
    /// feature) and any error returned by kernel assembly.
    pub fn new(config: KernelConfig) -> Result<Self> {
        let mut setup = Self::setup_vfs(&config)?;
        let jobs = Arc::new(JobManager::new());

        // Mount JobFs for job observability at /v/jobs
        setup.vfs.mount("/v/jobs", JobFs::new(jobs.clone()));

        #[cfg(all(feature = "localfs", feature = "overlay"))]
        let overlay_handle = setup.overlay_handle.take();

        // Mode-based construction: the kernel owns its host mounts, so whether
        // host side channels are allowed is decided by the VFS mode inside
        // `assemble` (NoLocal forbids them).
        let kernel = Self::assemble(config, setup.vfs, jobs, false, setup.budget, |_| {}, |vfs_ref, tools| {
            ExecContext::with_vfs_and_tools(vfs_ref.clone(), tools.clone())
        })?;

        // `assemble` cannot see the overlay handle (it is created in
        // `setup_vfs`), so attach it here. A cfg-gated rebinding keeps a
        // single exit point instead of an early `return` followed by
        // `#[allow(unreachable_code)]` fallthrough code.
        #[cfg(all(feature = "localfs", feature = "overlay"))]
        let kernel = {
            let mut kernel = kernel;
            if let Some(handle) = overlay_handle {
                // Also set it on the ExecContext so builtins can access it.
                kernel.exec_ctx.get_mut().overlay_handle = Some(Arc::clone(&handle));
                kernel.overlay_handle = Some(handle);
            }
            kernel
        };

        Ok(kernel)
    }
775
    /// Build the kernel-owned VFS mount table for `config.vfs_mode`.
    ///
    /// Mounts created per mode (as set up below):
    /// - `Passthrough`: `/` is the host root (a `LocalFs`, or an `OverlayFs`
    ///   over a read-only `LocalFs` when `config.overlay` is true), plus an
    ///   in-memory `/v`.
    /// - `Sandboxed { root }`: in-memory `/` and `/v`, a synthetic `/dev`, the
    ///   host `/tmp`, the XDG runtime dir (only if it exists), and the sandbox
    ///   root (`root`, else `$HOME`, else `/`) mounted at its real path —
    ///   wrapped in an `OverlayFs` when `config.overlay` is true.
    /// - `NoLocal`: in-memory `/`, `/tmp`, and `/v`, plus a synthetic `/dev`.
    ///
    /// Returns the router, the budget handle (`Some` when
    /// `config.vfs_budget_bytes` is set), and — with the `localfs` + `overlay`
    /// features — the overlay handle when `config.overlay` is true. The
    /// budget is Arc-shared: every `MemoryFs` (and `OverlayFs`) created here
    /// holds a clone of the same `Arc<ByteBudget>`, so the total charged
    /// against it is the sum of all in-memory content across the
    /// kernel-owned mounts.
    ///
    /// # Errors
    /// Returns `Err` when `config.overlay` is true and either:
    /// - the mode is `NoLocal` (overlay is meaningless when everything is
    ///   already virtual — there is no real lower layer to wrap), or
    /// - the mode is `Passthrough` / `Sandboxed` but this build was compiled
    ///   without the `overlay` feature.
    ///
    /// The caller (`Kernel::new`) propagates this as an `anyhow::Error`.
    fn setup_vfs(config: &KernelConfig) -> Result<VfsSetupResult> {
        let mut vfs = VfsRouter::new();

        // One budget for all memory mounts this kernel owns — labeled so the
        // error message tells the user exactly which knob to raise.
        let budget: Option<Arc<ByteBudget>> = config
            .vfs_budget_bytes
            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));

        /// Helper: construct a `MemoryFs` wired to `budget` if present.
        fn mem(budget: &Option<Arc<ByteBudget>>) -> MemoryFs {
            match budget {
                Some(b) => MemoryFs::with_budget(Arc::clone(b)),
                None => MemoryFs::new(),
            }
        }

        // Overlay handle — populated below if config.overlay is true.
        #[cfg(all(feature = "localfs", feature = "overlay"))]
        let mut overlay_handle: Option<Arc<OverlayHandle>> = None;

        match &config.vfs_mode {
            #[cfg(feature = "localfs")]
            VfsMountMode::Passthrough => {
                #[cfg(feature = "overlay")]
                if config.overlay {
                    // Wrap "/" in an OverlayFs so writes are virtual; the host
                    // root underneath is opened read-only.
                    let lower = Arc::new(LocalFs::read_only(PathBuf::from("/")));
                    let overlay_fs = Arc::new(match &budget {
                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
                        None => OverlayFs::over(lower),
                    });
                    // The handle shares the same OverlayFs Arc as the mount,
                    // so `kaish-vfs` inspects exactly what the router serves.
                    let handle = Arc::new(OverlayHandle {
                        fs: Arc::clone(&overlay_fs),
                        mount_path: PathBuf::from("/"),
                        commit_root: PathBuf::from("/"),
                    });
                    vfs.mount_arc("/", overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
                    overlay_handle = Some(handle);
                } else {
                    // LocalFs at "/" — native paths work directly
                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
                }
                // Without the `overlay` feature, refuse overlay=true loudly
                // instead of silently falling back to a writable LocalFs.
                #[cfg(not(feature = "overlay"))]
                {
                    if config.overlay {
                        return Err(anyhow::anyhow!(
                            "overlay=true requires the `overlay` feature, but this build \
                             was compiled without it. Recompile with --features overlay \
                             (or the default feature set) to enable overlay mode."
                        ));
                    }
                    // LocalFs at "/" — native paths work directly
                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
                }
                // Memory for blobs
                vfs.mount("/v", mem(&budget));
            }
            #[cfg(feature = "localfs")]
            VfsMountMode::Sandboxed { root } => {
                // Memory at root for safety (catches paths outside sandbox).
                // Note: /tmp and the XDG runtime dir are LocalFs — writes
                // there escape the VFS budget and are NOT virtual. This is
                // intentional: /tmp interop with other processes matters more
                // than accounting for scratch files there.
                vfs.mount("/", mem(&budget));
                vfs.mount("/v", mem(&budget));

                // Synthetic /dev: the host's real /dev isn't reachable here, so
                // /dev/null and /dev/zero are software-backed (see DevFs).
                vfs.mount("/dev", DevFs::new());

                // Real /tmp for interop with other processes
                vfs.mount("/tmp", LocalFs::new(PathBuf::from("/tmp")));

                // Mount XDG runtime dir for spill files and socket access
                // (skipped entirely when the directory does not exist).
                let runtime = crate::paths::xdg_runtime_dir();
                if runtime.exists() {
                    let runtime_str = runtime.to_string_lossy().to_string();
                    vfs.mount(&runtime_str, LocalFs::new(runtime));
                }

                // Resolve the sandbox root: the configured `root`, else $HOME,
                // else "/".
                // NOTE(review): this reads the host environment, and when HOME
                // is unset the root falls back to "/", so the LocalFs mount
                // below would sit at "/" — presumably replacing the in-memory
                // safety-net root mounted above. Confirm this is intended.
                let local_root = root.clone().unwrap_or_else(|| {
                    std::env::var("HOME")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| PathBuf::from("/"))
                });

                let mount_point = local_root.to_string_lossy().to_string();

                #[cfg(feature = "overlay")]
                if config.overlay {
                    // Wrap the sandbox root in an OverlayFs over a read-only
                    // LocalFs of the same directory.
                    let lower = Arc::new(LocalFs::read_only(local_root.clone()));
                    let overlay_fs = Arc::new(match &budget {
                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
                        None => OverlayFs::over(lower),
                    });
                    let handle = Arc::new(OverlayHandle {
                        fs: Arc::clone(&overlay_fs),
                        mount_path: PathBuf::from(&mount_point),
                        commit_root: local_root,
                    });
                    vfs.mount_arc(&mount_point, overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
                    overlay_handle = Some(handle);
                } else {
                    // Mount at the real path for transparent access
                    // e.g., /home/alice → LocalFs("/home/alice")
                    // so /home/alice/src/project just works
                    vfs.mount(&mount_point, LocalFs::new(local_root));
                }
                // Without the `overlay` feature, refuse overlay=true loudly
                // instead of silently mounting a writable LocalFs.
                #[cfg(not(feature = "overlay"))]
                {
                    if config.overlay {
                        return Err(anyhow::anyhow!(
                            "overlay=true requires the `overlay` feature, but this build \
                             was compiled without it. Recompile with --features overlay \
                             (or the default feature set) to enable overlay mode."
                        ));
                    }
                    // Mount at the real path for transparent access
                    vfs.mount(&mount_point, LocalFs::new(local_root));
                }
            }
            VfsMountMode::NoLocal => {
                if config.overlay {
                    return Err(anyhow::anyhow!(
                        "overlay=true is incompatible with VfsMountMode::NoLocal: \
                         everything is already virtual, there is no real lower layer \
                         to wrap. Use with_overlay(false) or switch to a Passthrough \
                         or Sandboxed VFS mode."
                    ));
                }
                // Pure memory mode — no local filesystem
                vfs.mount("/", mem(&budget));
                vfs.mount("/tmp", mem(&budget));
                vfs.mount("/v", mem(&budget));
                // Synthetic /dev so /dev/null and /dev/zero work hermetically.
                vfs.mount("/dev", DevFs::new());
            }
        }

        Ok(VfsSetupResult {
            vfs,
            budget,
            #[cfg(all(feature = "localfs", feature = "overlay"))]
            overlay_handle,
        })
    }
939
940    /// Create a transient kernel (no persistence).
941    pub fn transient() -> Result<Self> {
942        Self::new(KernelConfig::transient())
943    }
944
945    /// Create a kernel with a custom backend and `/v/*` virtual path support.
946    ///
947    /// This is the constructor for embedding kaish in other systems that provide
948    /// their own storage backend (e.g., CRDT-backed storage in kaijutsu).
949    ///
950    /// A `VirtualOverlayBackend` routes paths automatically:
951    /// - `/v/*` → Internal VFS (JobFs at `/v/jobs`, MemoryFs at `/v/blobs`)
952    /// - Everything else → Your custom backend
953    ///
954    /// The optional `configure_vfs` closure lets you add additional virtual mounts
955    /// (e.g., `/v/docs` for CRDT blocks) after the built-in mounts are set up.
956    ///
957    /// **Note:** The config's `vfs_mode` is ignored — all non-`/v/*` path routing
958    /// is handled by your custom backend. The config is only used for `name`, `cwd`,
959    /// `skip_validation`, and `interactive`.
960    ///
961    /// # Example
962    ///
963    /// ```ignore
964    /// // Simple: default /v/* mounts only
965    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |_| {})?;
966    ///
967    /// // With custom mounts
968    /// let kernel = Kernel::with_backend(backend, config, |vfs| {
969    ///     vfs.mount_arc("/v/docs", docs_fs);
970    ///     vfs.mount_arc("/v/g", git_fs);
971    /// }, |_| {})?;
972    ///
973    /// // With custom tools
974    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |tools| {
975    ///     tools.register(MyCustomTool::new());
976    /// })?;
977    /// ```
978    pub fn with_backend(
979        backend: Arc<dyn KernelBackend>,
980        config: KernelConfig,
981        configure_vfs: impl FnOnce(&mut VfsRouter),
982        configure_tools: impl FnOnce(&mut ToolRegistry),
983    ) -> Result<Self> {
984        use crate::backend::VirtualOverlayBackend;
985
986        // overlay=true is incompatible with with_backend: the embedder controls
987        // the VFS and the kernel cannot wrap it without bypassing the embedder's
988        // semantics. Fail loudly rather than silently ignoring the flag.
989        if config.overlay {
990            return Err(anyhow::anyhow!(
991                "overlay=true is incompatible with Kernel::with_backend: the embedder \
992                 controls the VFS; the kernel cannot wrap it with an OverlayFs without \
993                 bypassing the embedder's storage semantics. Use KernelConfig::with_overlay(false)."
994            ));
995        }
996
997        let mut vfs = VfsRouter::new();
998        let jobs = Arc::new(JobManager::new());
999
1000        // Create the budget from config so `with_vfs_budget` / `without_vfs_budget`
1001        // work for `with_backend` callers too. The /v/blobs MemoryFs is the only
1002        // kernel-owned memory mount here — embedders own the rest of the VFS.
1003        let vfs_budget: Option<Arc<ByteBudget>> = config
1004            .vfs_budget_bytes
1005            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));
1006
1007        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
1008        let blobs_fs = match &vfs_budget {
1009            Some(b) => MemoryFs::with_budget(Arc::clone(b)),
1010            None => MemoryFs::new(),
1011        };
1012        vfs.mount("/v/blobs", blobs_fs);
1013
1014        // Let caller add custom mounts (e.g., /v/docs, /v/g)
1015        configure_vfs(&mut vfs);
1016
1017        // A custom-backend kernel owns no host mounts — the embedder supplies
1018        // the entire VFS — so any kernel write to a host filesystem via
1019        // `std::fs` (output spill, job output files) bypasses that VFS and its
1020        // read-only guarantees. Forbid host side channels unconditionally.
1021        Self::assemble(config, vfs, jobs, true, vfs_budget, configure_tools, |vfs_arc: &Arc<VfsRouter>, _: &Arc<ToolRegistry>| {
1022            let overlay: Arc<dyn KernelBackend> =
1023                Arc::new(VirtualOverlayBackend::new(backend, vfs_arc.clone()));
1024            ExecContext::with_backend(overlay)
1025        })
1026    }
1027
1028    /// Shared assembly: wires up tools, runner, scope, and ExecContext.
1029    ///
1030    /// The `make_ctx` closure receives the VFS and tools so backends that need
1031    /// them (like `LocalBackend::with_tools`) can capture them. Custom backends
1032    /// that already have their own storage can ignore these parameters.
1033    fn assemble(
1034        config: KernelConfig,
1035        mut vfs: VfsRouter,
1036        jobs: Arc<JobManager>,
1037        no_host_filesystem: bool,
1038        vfs_budget: Option<Arc<ByteBudget>>,
1039        configure_tools: impl FnOnce(&mut ToolRegistry),
1040        make_ctx: impl FnOnce(&Arc<VfsRouter>, &Arc<ToolRegistry>) -> ExecContext,
1041    ) -> Result<Self> {
1042        // A kernel with no host filesystem of its own must never write to one
1043        // through a side channel. Two paths bypass the VFS by going straight to
1044        // `std::fs`: output spill (`paths::spill_dir()` → host temp/cache) and
1045        // background-job output files (`Job::write_output_file` → host temp).
1046        // Both would punch through the isolation, so force them off:
1047        // in-memory truncation for spill, no host file for job output.
1048        //
1049        // This is true for a `NoLocal` kernel (mounts nothing) and for any
1050        // `with_backend` kernel (`no_host_filesystem` — the embedder owns the
1051        // VFS, so the kernel controls no host mounts and any host write is a
1052        // bypass). Overrides an explicit `SpillMode::Disk`, which is nonsensical
1053        // when there is no kernel-owned host filesystem to spill to.
1054        let no_host_side_channel =
1055            no_host_filesystem || matches!(config.vfs_mode, VfsMountMode::NoLocal);
1056
1057        let KernelConfig { name, cwd, skip_validation, interactive, ignore_config, mut output_limit, allow_external_commands, latch_enabled, trash_enabled, nonce_store, initial_vars, request_timeout, kill_grace, .. } = config;
1058
1059        if no_host_side_channel {
1060            output_limit.set_spill_mode(crate::output_limit::SpillMode::Memory);
1061            jobs.set_persist_output_files(false);
1062        }
1063
1064        let mut tools = ToolRegistry::new();
1065        register_builtins(&mut tools);
1066        configure_tools(&mut tools);
1067        let tools = Arc::new(tools);
1068
1069        // Mount BuiltinFs so `ls /v/bin` lists builtins
1070        vfs.mount("/v/bin", BuiltinFs::new(tools.clone()));
1071
1072        let vfs = Arc::new(vfs);
1073
1074        let runner = PipelineRunner::new(tools.clone());
1075
1076        let (stderr_writer, stderr_receiver) = stderr_stream();
1077
1078        let mut exec_ctx = make_ctx(&vfs, &tools);
1079        exec_ctx.set_cwd(cwd);
1080        exec_ctx.set_job_manager(jobs.clone());
1081        exec_ctx.set_tool_schemas(tools.schemas());
1082        exec_ctx.set_tools(tools.clone());
1083        #[cfg(feature = "os-integration")]
1084        exec_ctx.set_trash_backend(Arc::new(crate::trash_system::SystemTrash));
1085        exec_ctx.stderr = Some(stderr_writer);
1086        exec_ctx.ignore_config = ignore_config;
1087        exec_ctx.output_limit = output_limit;
1088        exec_ctx.allow_external_commands = allow_external_commands;
1089        exec_ctx.vfs_budget = vfs_budget.clone();
1090        if let Some(store) = nonce_store {
1091            exec_ctx.nonce_store = store;
1092        }
1093
1094        Ok(Self {
1095            name,
1096            scope: RwLock::new({
1097                let mut scope = Scope::new();
1098                scope.set_pid(KERNEL_COUNTER.fetch_add(1, Ordering::Relaxed));
1099                // HOME is NOT read from the host env here — the kernel is
1100                // hermetic. Frontends (REPL, MCP) seed it via `initial_vars`
1101                // below (from `std::env::vars()`); a hermetic embedder leaves
1102                // `initial_vars` empty and gets no HOME (tilde stays literal).
1103                // Apply caller-supplied initial variables, all marked exported.
1104                // Frontends (REPL, MCP) populate this from std::env::vars()
1105                // for shell-like UX; embedders that want hermetic behavior
1106                // simply leave it empty.
1107                for (name, value) in initial_vars {
1108                    scope.set_exported(name, value);
1109                }
1110                scope.set_latch_enabled(latch_enabled);
1111                scope.set_trash_enabled(trash_enabled);
1112                scope
1113            }),
1114            tools,
1115            user_tools: RwLock::new(HashMap::new()),
1116            vfs,
1117            jobs,
1118            runner,
1119            exec_ctx: RwLock::new(exec_ctx),
1120            skip_validation,
1121            interactive,
1122            allow_external_commands,
1123            vfs_budget,
1124            request_timeout,
1125            kill_grace,
1126            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1127            cancel_token: std::sync::Mutex::new(tokio_util::sync::CancellationToken::new()),
1128            #[cfg(all(unix, feature = "subprocess"))]
1129            terminal_state: None,
1130            self_weak: std::sync::OnceLock::new(),
1131            execute_lock: tokio::sync::Mutex::new(()),
1132            bg_job_id: None,
1133            // Overlay handle is set by Kernel::new after assemble returns;
1134            // assemble itself doesn't know the handle (it's constructed in setup_vfs).
1135            // with_backend always has None (overlay=true is rejected above).
1136            #[cfg(all(feature = "localfs", feature = "overlay"))]
1137            overlay_handle: None,
1138        })
1139    }
1140
1141    /// Get the kernel name.
1142    pub fn name(&self) -> &str {
1143        &self.name
1144    }
1145
1146    /// Wrap this Kernel in an Arc and initialize its self-reference.
1147    ///
1148    /// This enables the Kernel to hand out `Arc<dyn CommandDispatcher>` references
1149    /// to child contexts, allowing builtins like `timeout` to dispatch inner
1150    /// commands through the full resolution chain (user tools → builtins →
1151    /// .kai scripts → external commands).
1152    pub fn into_arc(self) -> Arc<Self> {
1153        let arc = Arc::new(self);
1154        let _ = arc.self_weak.set(Arc::downgrade(&arc));
1155        arc
1156    }
1157
1158    /// Fork a subsidiary kernel for concurrent execution.
1159    ///
1160    /// The fork is a fully-functional `Kernel` that:
1161    /// - **Snapshots** per-session state from the parent: scope (COW — cheap),
1162    ///   user-defined tools, cwd, aliases, ignore config, etc. Mutations on
1163    ///   the fork do NOT propagate back to the parent — matching bash
1164    ///   subshell / background-job semantics.
1165    /// - **Shares** read-mostly resources with the parent via `Arc`: the tool
1166    ///   registry, the VFS router, and the job manager. A job registered by
1167    ///   the fork is visible to the parent's `jobs` builtin, and the fork
1168    ///   sees the same VFS mounts.
1169    /// - **Owns** its own `stderr_receiver`, `cancel_token`, and
1170    ///   `execute_lock`. It is never the TTY owner, so `interactive` is
1171    ///   `false` and `terminal_state` is `None`.
1172    ///
1173    /// The returned Arc has its `self_weak` populated (via `into_arc`), so
1174    /// nested dispatch through `ctx.dispatcher` (e.g. the `timeout` builtin)
1175    /// routes through the fork itself, not the parent — which is essential
1176    /// for concurrency safety.
1177    ///
1178    /// Use this for **detached** background concurrency where the fork should
1179    /// survive parent cancellation: the `&` background-job operator and any
1180    /// other "fire and forget" worker. The fork gets a fresh, independent
1181    /// cancellation token.
1182    ///
1183    /// For foreground concurrency (scatter workers, concurrent pipeline
1184    /// stages, `$(...)` cmdsubs) where parent timeout/cancel must cascade
1185    /// into the fork's external children, use [`Self::fork_attached`].
1186    pub async fn fork(&self) -> Arc<Self> {
1187        self.fork_inner(tokio_util::sync::CancellationToken::new(), self.bg_job_id)
1188            .await
1189    }
1190
1191    /// Fork attached to the parent's cancellation.
1192    ///
1193    /// Same as [`Self::fork`] but the fork's `cancel_token` is a child of
1194    /// the parent's. When the parent cancels (request timeout, embedder
1195    /// `Kernel::cancel`, etc.), the fork's token also cancels, which in
1196    /// turn kills any external children spawned in the fork via the
1197    /// `wait_or_kill` / SIGTERM-grace-SIGKILL path.
1198    pub async fn fork_attached(&self) -> Arc<Self> {
1199        let child_token = {
1200            #[allow(clippy::expect_used)]
1201            let parent = self.cancel_token.lock().expect("cancel_token poisoned");
1202            parent.child_token()
1203        };
1204        self.fork_inner(child_token, self.bg_job_id).await
1205    }
1206
1207    /// Fork for a background job, stamping the job id so external commands
1208    /// spawned anywhere beneath it record their process groups on that job
1209    /// (for `kill -<sig> %N`). The caller owns `cancel` so it can also drive
1210    /// `JobManager::cancel`.
1211    pub async fn fork_for_background(
1212        &self,
1213        cancel: tokio_util::sync::CancellationToken,
1214        job_id: crate::scheduler::JobId,
1215    ) -> Arc<Self> {
1216        self.fork_inner(cancel, Some(job_id)).await
1217    }
1218
1219    /// Shared fork implementation. Caller decides the cancellation token and
1220    /// which background job (if any) this fork runs on behalf of.
1221    async fn fork_inner(
1222        &self,
1223        cancel: tokio_util::sync::CancellationToken,
1224        bg_job_id: Option<crate::scheduler::JobId>,
1225    ) -> Arc<Self> {
1226        let scope_snapshot = self.scope.read().await.clone();
1227        let user_tools_snapshot = self.user_tools.read().await.clone();
1228
1229        // Snapshot exec_ctx by cloning the cloneable fields, then override
1230        // the ones that should not carry over (stderr channel, dispatcher,
1231        // interactive flag, terminal state, cancel — set from `cancel` arg).
1232        let mut fork_ctx = {
1233            let parent_ctx = self.exec_ctx.read().await;
1234            parent_ctx.child_for_pipeline()
1235        };
1236        let (stderr_writer, stderr_receiver) = stderr_stream();
1237        fork_ctx.stderr = Some(stderr_writer);
1238        // Clear dispatcher; dispatch_command will repopulate it to point at
1239        // the fork on the first dispatch call.
1240        fork_ctx.dispatcher = None;
1241        fork_ctx.interactive = false;
1242        fork_ctx.cancel = cancel.clone();
1243        #[cfg(all(unix, feature = "subprocess"))]
1244        {
1245            fork_ctx.terminal_state = None;
1246        }
1247
1248        let fork = Self {
1249            name: format!("{}:fork", self.name),
1250            scope: RwLock::new(scope_snapshot),
1251            tools: Arc::clone(&self.tools),
1252            user_tools: RwLock::new(user_tools_snapshot),
1253            vfs: Arc::clone(&self.vfs),
1254            jobs: Arc::clone(&self.jobs),
1255            runner: self.runner.clone(),
1256            exec_ctx: RwLock::new(fork_ctx),
1257            skip_validation: self.skip_validation,
1258            // Forks are never the TTY owner — they run in the background.
1259            interactive: false,
1260            allow_external_commands: self.allow_external_commands,
1261            // Arc-clone the budget so the fork draws from the same pool as the
1262            // parent — background jobs and scatter workers count against the same
1263            // cap as foreground writes.
1264            vfs_budget: self.vfs_budget.clone(),
1265            request_timeout: self.request_timeout,
1266            kill_grace: self.kill_grace,
1267            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1268            cancel_token: std::sync::Mutex::new(cancel),
1269            #[cfg(all(unix, feature = "subprocess"))]
1270            terminal_state: None,
1271            self_weak: std::sync::OnceLock::new(),
1272            execute_lock: tokio::sync::Mutex::new(()),
1273            bg_job_id,
1274            // Arc-clone the overlay handle so forks (background jobs, scatter
1275            // workers, pipeline stages) can reach the same overlay transaction
1276            // via `kaish-vfs status/diff/commit/reset`.
1277            #[cfg(all(feature = "localfs", feature = "overlay"))]
1278            overlay_handle: self.overlay_handle.clone(),
1279        };
1280
1281        fork.into_arc()
1282    }
1283
1284    /// Get an `Arc<dyn CommandDispatcher>` to this Kernel, if wrapped via `into_arc()`.
1285    ///
1286    /// Returns `None` if the Kernel was not wrapped, or if all strong references
1287    /// have been dropped (the `Weak` can no longer upgrade).
1288    pub fn dispatcher(&self) -> Option<Arc<dyn CommandDispatcher>> {
1289        self.self_weak
1290            .get()
1291            .and_then(|weak| weak.upgrade())
1292            .map(|arc| arc as Arc<dyn CommandDispatcher>)
1293    }
1294
1295    /// Initialize terminal state for interactive job control.
1296    ///
1297    /// Call this after kernel creation when running as an interactive REPL
1298    /// and stdin is a TTY. Sets up process groups and signal handling.
1299    #[cfg(all(unix, feature = "subprocess"))]
1300    pub fn init_terminal(&mut self) {
1301        if !self.interactive {
1302            return;
1303        }
1304        match crate::terminal::TerminalState::init() {
1305            Ok(state) => {
1306                let state = Arc::new(state);
1307                self.terminal_state = Some(state.clone());
1308                // Set on exec_ctx so builtins (fg, bg, kill) can access it
1309                self.exec_ctx.get_mut().terminal_state = Some(state);
1310                tracing::debug!("terminal job control initialized");
1311            }
1312            Err(e) => {
1313                tracing::warn!("failed to initialize terminal job control: {}", e);
1314            }
1315        }
1316    }
1317
1318    /// Replace or remove the trash backend used by `rm` and `kaish-trash`.
1319    ///
1320    /// The kernel installs the OS trash (`SystemTrash`) automatically when
1321    /// built with the `os-integration` feature. Embedders and tests can swap
1322    /// in a custom [`crate::trash::TrashBackend`], or pass `None` to remove
1323    /// it — with trash enabled but no backend present, `rm` fails loud
1324    /// rather than falling through to permanent delete.
1325    pub fn set_trash_backend(&mut self, backend: Option<Arc<dyn crate::trash::TrashBackend>>) {
1326        self.exec_ctx.get_mut().trash_backend = backend;
1327    }
1328
1329    /// Cancel the current execution.
1330    ///
1331    /// This cancels the current cancellation token, causing any execution
1332    /// loop to exit at the next checkpoint with exit code 130 (SIGINT).
1333    /// A fresh token is installed for the next `execute()` call.
1334    pub fn cancel(&self) {
1335        #[allow(clippy::expect_used)]
1336        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1337        token.cancel();
1338    }
1339
1340    /// Check if the current execution has been cancelled.
1341    pub fn is_cancelled(&self) -> bool {
1342        #[allow(clippy::expect_used)]
1343        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1344        token.is_cancelled()
1345    }
1346
1347    /// Reset the cancellation token (called at the start of each execute).
1348    fn reset_cancel(&self) -> tokio_util::sync::CancellationToken {
1349        #[allow(clippy::expect_used)]
1350        let mut token = self.cancel_token.lock().expect("cancel_token poisoned");
1351        if token.is_cancelled() {
1352            *token = tokio_util::sync::CancellationToken::new();
1353        }
1354        token.clone()
1355    }
1356
1357    /// Acquire the per-Kernel execute lock, warning on contention.
1358    ///
1359    /// Tokio's Mutex is fair (FIFO) so callers queue in arrival order. When
1360    /// the lock is already held, emit a warning so the silent serialization
1361    /// is observable in logs — if you need real parallelism, fork the kernel.
1362    async fn acquire_execute_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
1363        match self.execute_lock.try_lock() {
1364            Ok(guard) => guard,
1365            Err(_) => {
1366                tracing::warn!(
1367                    target: "kaish::kernel::concurrency",
1368                    kernel = %self.name,
1369                    "execute() contended — serializing concurrent caller; \
1370                     use Kernel::fork() for parallelism instead of sharing"
1371                );
1372                self.execute_lock.lock().await
1373            }
1374        }
1375    }
1376
1377    /// Execute kaish source code with default options.
1378    ///
1379    /// Equivalent to `execute_with_options(input, ExecuteOptions::default())`.
1380    /// Returns the result of the last statement executed.
1381    pub async fn execute(&self, input: &str) -> Result<ExecResult> {
1382        self.run_inner(input, ExecuteOptions::default(), None).await
1383    }
1384
1385    /// Execute with per-call options. The primary entry point for embedders
1386    /// that don't need per-statement output streaming.
1387    ///
1388    /// `opts` carries timeout, transient vars overlay, optional cwd override,
1389    /// and optional embedder-owned cancellation token. See [`ExecuteOptions`]
1390    /// for semantics. For streaming, use [`Self::execute_with_options_streaming`].
1391    ///
1392    /// **Cancellation:** if `opts.cancel_token` is `Some`, it is *raced*
1393    /// against the kernel's internal token. Either firing cancels and kills
1394    /// external children. The embedder's token is read-only — kernel
1395    /// timeouts do NOT propagate into it. Distinguish via the returned
1396    /// `code`: 124 = timeout, 130 = cancellation.
1397    ///
1398    /// **Timeout:** `opts.timeout` overrides `KernelConfig::request_timeout`.
1399    /// `Some(Duration::ZERO)` returns 124 immediately without spawning.
1400    ///
1401    /// Concurrent callers on the same Kernel serialize on the kernel-wide
1402    /// execute lock. For true parallelism, call [`Kernel::fork`] (detached)
1403    /// or [`Kernel::fork_attached`] (cancellation cascades from this kernel).
1404    pub async fn execute_with_options(
1405        &self,
1406        input: &str,
1407        opts: ExecuteOptions,
1408    ) -> Result<ExecResult> {
1409        self.run_inner(input, opts, None).await
1410    }
1411
1412    /// Same as [`Self::execute_with_options`] but with a per-statement output
1413    /// callback. The callback fires after each top-level statement so the
1414    /// embedder (REPL, MCP streaming) can flush output incrementally.
1415    pub async fn execute_with_options_streaming(
1416        &self,
1417        input: &str,
1418        opts: ExecuteOptions,
1419        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1420    ) -> Result<ExecResult> {
1421        self.run_inner(input, opts, Some(on_output)).await
1422    }
1423
1424    /// Execute kaish source code with a transient overlay of exported variables.
1425    ///
1426    /// Deprecated thin wrapper over [`Self::execute_with_options`]. New code
1427    /// should use that method directly:
1428    /// `execute_with_options(input, ExecuteOptions::new().with_vars(vars))`.
1429    #[deprecated(note = "use Kernel::execute_with_options with ExecuteOptions::with_vars")]
1430    pub async fn execute_with_vars(
1431        &self,
1432        input: &str,
1433        vars: HashMap<String, Value>,
1434    ) -> Result<ExecResult> {
1435        self.run_inner(input, ExecuteOptions::new().with_vars(vars), None).await
1436    }
1437
1438    /// Execute kaish source code with a per-statement callback.
1439    ///
1440    /// Deprecated thin wrapper. New code should use
1441    /// [`Self::execute_with_options_streaming`].
1442    #[deprecated(note = "use Kernel::execute_with_options_streaming")]
1443    pub async fn execute_streaming(
1444        &self,
1445        input: &str,
1446        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1447    ) -> Result<ExecResult> {
1448        self.run_inner(input, ExecuteOptions::default(), Some(on_output)).await
1449    }
1450
1451    /// Link embedder trace context, then run [`Self::execute_with_options_inner`].
1452    ///
1453    /// The `#[instrument]` execution span resolves its parent from the *current*
1454    /// OpenTelemetry context (see `tracing-opentelemetry`'s `parent_context`),
1455    /// captured when the span is first entered — not when the future is
1456    /// constructed. So a thread-local `attach()` scoped to construction is too
1457    /// early to be seen (the integration test confirms this). `with_context`
1458    /// re-attaches the embedder's context on *every* poll of the inner future,
1459    /// so the context is current at first-enter and survives runtime thread
1460    /// hops. With no embedder trace context, the future runs unwrapped.
1461    async fn run_inner(
1462        &self,
1463        input: &str,
1464        opts: ExecuteOptions,
1465        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1466    ) -> Result<ExecResult> {
1467        use opentelemetry::context::FutureExt;
1468
1469        // Capture the embedder's baggage before `opts` is consumed so it can be
1470        // echoed back onto the result on egress (see `merge_egress_baggage`).
1471        let embedder_baggage = opts.baggage.clone();
1472
1473        let result = match crate::telemetry::extract_parent(&opts) {
1474            Some(parent) => self
1475                .execute_with_options_inner(input, opts, on_output)
1476                .with_context(parent)
1477                .await,
1478            None => self.execute_with_options_inner(input, opts, on_output).await,
1479        };
1480
1481        result.map(|mut r| {
1482            crate::telemetry::merge_egress_baggage(&mut r, embedder_baggage);
1483            r
1484        })
1485    }
1486
1487    /// Shared body for `execute`, `execute_with_options(_streaming)`, and
1488    /// the deprecated wrappers. Owns the per-call cancel token, vars overlay,
1489    /// cwd override, and timeout race.
1490    #[tracing::instrument(level = "info", skip(self, opts, on_output), fields(input_len = input.len()))]
1491    async fn execute_with_options_inner(
1492        &self,
1493        input: &str,
1494        opts: ExecuteOptions,
1495        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1496    ) -> Result<ExecResult> {
1497        let _guard = self.acquire_execute_lock().await;
1498
1499        // Always reset to a fresh internal token; this is the kernel's own
1500        // cancel surface for embedders calling `Kernel::cancel()`. The
1501        // embedder-supplied `opts.cancel_token` is a *read-only input* — it
1502        // is NOT written into `self.cancel_token`, because doing so would
1503        // (a) leak the embedder's token past this call's lifetime,
1504        // (b) re-route a later `Kernel::cancel()` into the embedder's token,
1505        // (c) extend the token's lifetime via the kernel's strong clone.
1506        let internal = self.reset_cancel();
1507        // Race the embedder token against the kernel's internal token via a
1508        // tracked watcher task. We hold the JoinHandle so we can abort the
1509        // task at function exit — otherwise it would wait forever for either
1510        // token to fire and leak per call.
1511        let (effective_cancel, watcher_handle): (
1512            tokio_util::sync::CancellationToken,
1513            Option<tokio::task::JoinHandle<()>>,
1514        ) = if let Some(ext) = opts.cancel_token {
1515            let combined = tokio_util::sync::CancellationToken::new();
1516            let combined_writer = combined.clone();
1517            let i = internal.clone();
1518            let handle = tokio::spawn(async move {
1519                tokio::select! {
1520                    _ = i.cancelled() => combined_writer.cancel(),
1521                    _ = ext.cancelled() => combined_writer.cancel(),
1522                }
1523            });
1524            (combined, Some(handle))
1525        } else {
1526            (internal, None)
1527        };
1528
1529        // Effective timeout: per-call wins over kernel-config default.
1530        let timeout = opts.timeout.or(self.request_timeout);
1531
1532        // ZERO timeout: return 124 immediately without spawning anything.
1533        if timeout == Some(Duration::ZERO) {
1534            if let Some(h) = watcher_handle {
1535                h.abort();
1536            }
1537            return Ok(ExecResult::failure(124, "timeout: timed out after 0s".to_string()));
1538        }
1539
1540        // Apply per-call vars overlay (push frame + set_exported), wrapped in
1541        // an RAII guard so a panic inside `execute_streaming_inner` still
1542        // pops the frame and unexports the temporarily-exported names.
1543        struct VarsFrameGuard<'a> {
1544            kernel: &'a Kernel,
1545            newly_exported: Vec<String>,
1546        }
1547        impl Drop for VarsFrameGuard<'_> {
1548            fn drop(&mut self) {
1549                // Best-effort cleanup using try_write. The execute_lock held
1550                // throughout execute_with_options means there is no concurrent
1551                // foreground caller; forks have their own scope and won't
1552                // block this. blocking_write would deadlock the runtime when
1553                // called from a tokio worker thread, so we explicitly do NOT
1554                // fall back to it — if try_write fails (which we've never
1555                // seen in practice), log loudly and accept the leak rather
1556                // than deadlock the entire kernel.
1557                let Ok(mut scope) = self.kernel.scope.try_write() else {
1558                    tracing::error!(
1559                        "vars frame guard: scope lock unexpectedly busy; \
1560                         skipping pop_frame to avoid runtime deadlock — \
1561                         transient vars may leak"
1562                    );
1563                    return;
1564                };
1565                scope.pop_frame();
1566                for name in self.newly_exported.drain(..) {
1567                    scope.unexport(&name);
1568                }
1569            }
1570        }
1571
1572        // Per-call cwd override: save current cwd, set the new one, restore
1573        // on Drop so the kernel's persistent cwd doesn't leak between calls.
1574        // Same RAII pattern as VarsFrameGuard, same blocking_write trade-off.
1575        struct CwdGuard<'a> {
1576            kernel: &'a Kernel,
1577            saved: PathBuf,
1578        }
1579        impl Drop for CwdGuard<'_> {
1580            fn drop(&mut self) {
1581                let Ok(mut ec) = self.kernel.exec_ctx.try_write() else {
1582                    tracing::error!(
1583                        "cwd guard: exec_ctx lock unexpectedly busy; \
1584                         skipping cwd restore — kernel cwd may be wrong for next call"
1585                    );
1586                    return;
1587                };
1588                ec.cwd = std::mem::take(&mut self.saved);
1589            }
1590        }
1591        let _cwd_guard: Option<CwdGuard<'_>> = if let Some(new_cwd) = opts.cwd {
1592            let mut ec = self.exec_ctx.write().await;
1593            let saved = std::mem::replace(&mut ec.cwd, new_cwd);
1594            drop(ec);
1595            Some(CwdGuard { kernel: self, saved })
1596        } else {
1597            None
1598        };
1599
1600        let _vars_guard: Option<VarsFrameGuard<'_>> = if !opts.vars.is_empty() {
1601            let mut scope = self.scope.write().await;
1602            scope.push_frame();
1603            let mut newly = Vec::with_capacity(opts.vars.len());
1604            for (name, value) in opts.vars {
1605                if !scope.is_exported(&name) {
1606                    newly.push(name.clone());
1607                }
1608                scope.set_exported(name, value);
1609            }
1610            drop(scope);
1611            Some(VarsFrameGuard { kernel: self, newly_exported: newly })
1612        } else {
1613            None
1614        };
1615
1616        // Sync the effective cancel into self.exec_ctx so try_execute_external
1617        // (which reads via self.cancel_token) sees cancellation. We also need
1618        // builtins to see it via ctx.cancel — handled in execute_command.
1619        // For simplicity here we mirror effective_cancel into self.cancel_token
1620        // for the duration of this call, then restore the internal token at
1621        // the end (so a later Kernel::cancel still hits our internal surface).
1622        {
1623            #[allow(clippy::expect_used)]
1624            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1625            *cur = effective_cancel.clone();
1626        }
1627
1628        // The movable-deadline watchdog for this call (None without a timeout),
1629        // mirrored into exec_ctx — like the cancel token — so builtins can
1630        // suspend the script clock via `ctx.patient`. Assigned unconditionally
1631        // (clearing any stale handle) and reset to None in the restore block.
1632        let watchdog = timeout.map(|d| Arc::new(crate::watchdog::Watchdog::new(d)));
1633        {
1634            let mut ec = self.exec_ctx.write().await;
1635            ec.watchdog = watchdog.clone();
1636        }
1637
1638        // Run inner with optional timeout. The watchdog task cancels our token
1639        // on elapsed; the cascade fires SIGTERM/SIGKILL on any external
1640        // children via the wait_or_kill discipline in try_execute_external.
1641        let mut noop_cb: Box<dyn FnMut(&ExecResult) + Send> = Box::new(|_| {});
1642        let cb_ref: &mut (dyn FnMut(&ExecResult) + Send) = match on_output {
1643            Some(cb) => cb,
1644            None => &mut *noop_cb,
1645        };
1646
1647        let result = if let Some(d) = timeout {
1648            #[allow(clippy::expect_used)]
1649            let watchdog = watchdog.clone().expect("watchdog constructed when timeout is set");
1650            let elapsed = Arc::new(std::sync::atomic::AtomicBool::new(false));
1651            let timer = tokio::spawn(watchdog.run(elapsed.clone(), effective_cancel.clone()));
1652            let r = self.execute_streaming_inner(input, cb_ref).await;
1653            timer.abort();
1654            match r {
1655                Ok(mut res) => {
1656                    if elapsed.load(std::sync::atomic::Ordering::SeqCst) {
1657                        res.code = 124;
1658                        if res.err.is_empty() {
1659                            res.err = format!("timeout: timed out after {:?}", d);
1660                        }
1661                    }
1662                    Ok(res)
1663                }
1664                Err(e) => Err(e),
1665            }
1666        } else {
1667            self.execute_streaming_inner(input, cb_ref).await
1668        };
1669
1670        // Restore self.cancel_token to a fresh, uncancelled token so the
1671        // embedder's view of `Kernel::cancel()` stays predictable on the
1672        // next call (it cancels the kernel's own token, not whatever was
1673        // left over from this call's combined token).
1674        {
1675            #[allow(clippy::expect_used)]
1676            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1677            *cur = tokio_util::sync::CancellationToken::new();
1678        }
1679
1680        // Drop the watchdog handle from exec_ctx — its timer task is gone
1681        // (fired or aborted above); a patient hold acquired against a stale
1682        // handle would silently suspend nothing.
1683        {
1684            let mut ec = self.exec_ctx.write().await;
1685            ec.watchdog = None;
1686        }
1687
1688        // Tear down the embedder-token race watcher (if any). Leaving it
1689        // alive would idle forever waiting for tokens that may never fire.
1690        if let Some(h) = watcher_handle {
1691            h.abort();
1692        }
1693
1694        // VarsFrameGuard drops here on the success path and on early-return
1695        // paths above (error path included). Panic safety preserved.
1696        result
1697    }
1698
1699    /// The actual body of `execute_streaming`, run while holding the execute lock.
1700    ///
1701    /// Split out so internal kernel paths that are already under the lock can
1702    /// call this without deadlocking on re-entry. External callers must go
1703    /// through [`Self::execute_streaming`] so they acquire the lock.
1704    async fn execute_streaming_inner(
1705        &self,
1706        input: &str,
1707        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1708    ) -> Result<ExecResult> {
1709        let program = parse(input).map_err(|errors| {
1710            let msg = errors
1711                .iter()
1712                .map(|e| e.format(input))
1713                .collect::<Vec<_>>()
1714                .join("\n");
1715            anyhow::anyhow!("parse error:\n{}", msg)
1716        })?;
1717
1718        // AST display mode: show AST instead of executing
1719        {
1720            let scope = self.scope.read().await;
1721            if scope.show_ast() {
1722                let output = format!("{:#?}\n", program);
1723                return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(output)));
1724            }
1725        }
1726
1727        // Pre-execution validation
1728        if !self.skip_validation {
1729            let user_tools = self.user_tools.read().await;
1730            let validator = Validator::new(&self.tools, &user_tools);
1731            let issues = validator.validate(&program);
1732
1733            // Collect errors (warnings are logged but don't prevent execution)
1734            let errors: Vec<_> = issues
1735                .iter()
1736                .filter(|i| i.severity == Severity::Error)
1737                .collect();
1738
1739            if !errors.is_empty() {
1740                let error_msg = errors
1741                    .iter()
1742                    .map(|e| e.format(input))
1743                    .collect::<Vec<_>>()
1744                    .join("\n");
1745                return Err(anyhow::anyhow!("validation failed:\n{}", error_msg));
1746            }
1747
1748            // Log warnings via tracing (trace level to avoid noise)
1749            for warning in issues.iter().filter(|i| i.severity == Severity::Warning) {
1750                tracing::trace!("validation: {}", warning.format(input));
1751            }
1752        }
1753
1754        let mut result = ExecResult::success("");
1755
1756        // Reset cancellation token for this execution.
1757        let cancel = self.reset_cancel();
1758
1759        for stmt in program.statements {
1760            if matches!(stmt, Stmt::Empty) {
1761                continue;
1762            }
1763
1764            // Cancellation checkpoint
1765            if cancel.is_cancelled() {
1766                result.code = 130;
1767                return Ok(result);
1768            }
1769
1770            let flow = self.execute_stmt_flow(&stmt).await?;
1771
1772            // Drain any stderr written by pipeline stages during this statement.
1773            // This captures stderr from intermediate pipeline stages that would
1774            // otherwise be lost (only the last stage's result is returned).
1775            let drained_stderr = {
1776                let mut receiver = self.stderr_receiver.lock().await;
1777                receiver.drain_lossy()
1778            };
1779
1780            match flow {
1781                ControlFlow::Normal(mut r) => {
1782                    if !drained_stderr.is_empty() {
1783                        if !r.err.is_empty() && !r.err.ends_with('\n') {
1784                            r.err.push('\n');
1785                        }
1786                        // Prepend pipeline stderr before the last stage's stderr
1787                        let combined = format!("{}{}", drained_stderr, r.err);
1788                        r.err = combined;
1789                    }
1790                    on_output(&r);
1791                    // Carry the last statement's structured output for MCP TOON encoding.
1792                    // Must be done here (not in accumulate_result) because accumulate_result
1793                    // is also used in loops where per-iteration output would be wrong.
1794                    let last_output = r.output().cloned();
1795                    accumulate_result(&mut result, &r);
1796                    result.set_output(last_output);
1797                }
1798                ControlFlow::Exit { code } => {
1799                    if !drained_stderr.is_empty() {
1800                        result.err.push_str(&drained_stderr);
1801                    }
1802                    result.code = code;
1803                    return Ok(result);
1804                }
1805                ControlFlow::Return { mut value } => {
1806                    if !drained_stderr.is_empty() {
1807                        value.err = format!("{}{}", drained_stderr, value.err);
1808                    }
1809                    on_output(&value);
1810                    result = value;
1811                }
1812                ControlFlow::Break { result: mut r, .. } | ControlFlow::Continue { result: mut r, .. } => {
1813                    if !drained_stderr.is_empty() {
1814                        r.err = format!("{}{}", drained_stderr, r.err);
1815                    }
1816                    on_output(&r);
1817                    result = r;
1818                }
1819            }
1820        }
1821
1822        Ok(result)
1823    }
1824
1825    /// Execute a single statement, returning control flow information.
1826    fn execute_stmt_flow<'a>(
1827        &'a self,
1828        stmt: &'a Stmt,
1829    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ControlFlow>> + Send + 'a>> {
1830        use tracing::Instrument;
1831        let span = tracing::debug_span!("execute_stmt_flow", stmt_type = %stmt.kind_name());
1832        Box::pin(async move {
1833        match stmt {
1834            Stmt::Assignment(assign) => {
1835                // Use async evaluator to support command substitution
1836                let value = self.eval_expr_async(&assign.value).await
1837                    .context("failed to evaluate assignment")?;
1838                let mut scope = self.scope.write().await;
1839                if assign.local {
1840                    // local: set in innermost (current function) frame
1841                    scope.set(&assign.name, value.clone());
1842                } else {
1843                    // non-local: update existing or create in root frame
1844                    scope.set_global(&assign.name, value.clone());
1845                }
1846                drop(scope);
1847
1848                // Assignments don't produce output (like sh)
1849                Ok(ControlFlow::ok(ExecResult::success("")))
1850            }
1851            Stmt::Command(cmd) => {
1852                // Route single commands through execute_pipeline for a unified path.
1853                // This ensures all commands go through the dispatcher chain.
1854                let pipeline = crate::ast::Pipeline {
1855                    commands: vec![cmd.clone()],
1856                    background: false,
1857                };
1858                let result = self.execute_pipeline(&pipeline).await?;
1859                self.update_last_result(&result).await;
1860
1861                // Check for error exit mode (set -e)
1862                if !result.ok() {
1863                    let scope = self.scope.read().await;
1864                    if scope.error_exit_enabled() {
1865                        return Ok(ControlFlow::exit_code(result.code));
1866                    }
1867                }
1868
1869                Ok(ControlFlow::ok(result))
1870            }
1871            Stmt::Pipeline(pipeline) => {
1872                let result = self.execute_pipeline(pipeline).await?;
1873                self.update_last_result(&result).await;
1874
1875                // Check for error exit mode (set -e)
1876                if !result.ok() {
1877                    let scope = self.scope.read().await;
1878                    if scope.error_exit_enabled() {
1879                        return Ok(ControlFlow::exit_code(result.code));
1880                    }
1881                }
1882
1883                Ok(ControlFlow::ok(result))
1884            }
1885            Stmt::If(if_stmt) => {
1886                // Use async evaluator to support command substitution in conditions
1887                let cond_value = self.eval_expr_async(&if_stmt.condition).await?;
1888
1889                let branch = if is_truthy(&cond_value) {
1890                    &if_stmt.then_branch
1891                } else {
1892                    if_stmt.else_branch.as_deref().unwrap_or(&[])
1893                };
1894
1895                let mut result = ExecResult::success("");
1896                for stmt in branch {
1897                    let flow = self.execute_stmt_flow(stmt).await?;
1898                    match flow {
1899                        ControlFlow::Normal(r) => {
1900                            accumulate_result(&mut result, &r);
1901                            self.drain_stderr_into(&mut result).await;
1902                        }
1903                        other => {
1904                            self.drain_stderr_into(&mut result).await;
1905                            return Ok(other);
1906                        }
1907                    }
1908                }
1909                Ok(ControlFlow::ok(result))
1910            }
1911            Stmt::For(for_loop) => {
1912                // Evaluate all items and collect values for iteration
1913                // Use async evaluator to support command substitution like $(seq 1 5)
1914                let mut items: Vec<Value> = Vec::new();
1915                for item_expr in &for_loop.items {
1916                    // Glob expansion in for-loop items: `for f in *.txt`
1917                    if let Expr::GlobPattern(pattern) = item_expr {
1918                        let glob_enabled = {
1919                            let scope = self.scope.read().await;
1920                            scope.glob_enabled()
1921                        };
1922                        if glob_enabled {
1923                            let (paths, cwd) = {
1924                                let ctx = self.exec_ctx.read().await;
1925                                let paths = ctx.expand_glob(pattern).await
1926                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
1927                                let cwd = ctx.resolve_path(".");
1928                                (paths, cwd)
1929                            };
1930                            if paths.is_empty() {
1931                                return Err(anyhow::anyhow!("no matches: {}", pattern));
1932                            }
1933                            for path in paths {
1934                                let display = if !pattern.starts_with('/') {
1935                                    path.strip_prefix(&cwd)
1936                                        .unwrap_or(&path)
1937                                        .to_string_lossy().into_owned()
1938                                } else {
1939                                    path.to_string_lossy().into_owned()
1940                                };
1941                                items.push(Value::String(display));
1942                            }
1943                            continue;
1944                        }
1945                    }
1946                    // Track whether this item came from $(cmd); that's the
1947                    // only position where multi-line stdout auto-splits per
1948                    // line. Arrays still spread element-by-element; bare
1949                    // $VAR is rejected upstream by validator E012. See
1950                    // docs/plan-for-loop-newline-split.md.
1951                    let from_command_subst = matches!(item_expr, Expr::CommandSubst(_));
1952                    let item = self.eval_expr_async(item_expr).await?;
1953                    match item {
1954                        // JSON arrays iterate over elements (preferred path
1955                        // when builtins emit .data — seq, jq, cut, find, …)
1956                        Value::Json(serde_json::Value::Array(arr)) => {
1957                            for elem in arr {
1958                                items.push(json_to_value(elem));
1959                            }
1960                        }
1961                        // Strings from $(cmd): empty → 0 iterations,
1962                        // multi-line → split per line (trimming trailing
1963                        // newlines and per-line trailing \r), single-line
1964                        // → one iteration. Whitespace within a line is
1965                        // NOT split — the "$VAR with spaces just works"
1966                        // promise is preserved because this only fires
1967                        // in CommandSubst position.
1968                        Value::String(s) if from_command_subst => {
1969                            let trimmed = s.trim_end_matches(['\n', '\r']);
1970                            if trimmed.is_empty() {
1971                                continue;
1972                            }
1973                            if trimmed.contains('\n') {
1974                                for line in trimmed.split('\n') {
1975                                    let line = line.trim_end_matches('\r');
1976                                    items.push(Value::String(line.to_string()));
1977                                }
1978                            } else {
1979                                items.push(Value::String(trimmed.to_string()));
1980                            }
1981                        }
1982                        // Binary isn't iterable — fail loud rather than loop
1983                        // once over an opaque byte blob.
1984                        Value::Bytes(_) => {
1985                            anyhow::bail!(
1986                                "for: cannot iterate over binary data — decode it \
1987                                 (base64/xxd) first"
1988                            );
1989                        }
1990                        // Strings not from $(cmd) stay as one value.
1991                        other => items.push(other),
1992                    }
1993                }
1994
1995                let mut result = ExecResult::success("");
1996                {
1997                    let mut scope = self.scope.write().await;
1998                    scope.push_frame();
1999                }
2000
2001                'outer: for item in items {
2002                    // Cancellation checkpoint per iteration
2003                    if self.is_cancelled() {
2004                        let mut scope = self.scope.write().await;
2005                        scope.pop_frame();
2006                        result.code = 130;
2007                        return Ok(ControlFlow::ok(result));
2008                    }
2009                    {
2010                        let mut scope = self.scope.write().await;
2011                        scope.set(&for_loop.variable, item);
2012                    }
2013                    for stmt in &for_loop.body {
2014                        let mut flow = match self.execute_stmt_flow(stmt).await {
2015                            Ok(f) => f,
2016                            Err(e) => {
2017                                let mut scope = self.scope.write().await;
2018                                scope.pop_frame();
2019                                return Err(e);
2020                            }
2021                        };
2022                        self.drain_stderr_into(&mut result).await;
2023                        match &mut flow {
2024                            ControlFlow::Normal(r) => {
2025                                accumulate_result(&mut result, r);
2026                                if !r.ok() {
2027                                    let scope = self.scope.read().await;
2028                                    if scope.error_exit_enabled() {
2029                                        drop(scope);
2030                                        let mut scope = self.scope.write().await;
2031                                        scope.pop_frame();
2032                                        return Ok(ControlFlow::exit_code(r.code));
2033                                    }
2034                                }
2035                            }
2036                            ControlFlow::Break { .. } => {
2037                                if flow.decrement_level() {
2038                                    accumulate_flow_output(&mut result, &flow);
2039                                    break 'outer;
2040                                }
2041                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2042                                let mut scope = self.scope.write().await;
2043                                scope.pop_frame();
2044                                return Ok(flow);
2045                            }
2046                            ControlFlow::Continue { .. } => {
2047                                if flow.decrement_level() {
2048                                    accumulate_flow_output(&mut result, &flow);
2049                                    continue 'outer;
2050                                }
2051                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2052                                let mut scope = self.scope.write().await;
2053                                scope.pop_frame();
2054                                return Ok(flow);
2055                            }
2056                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2057                                let mut scope = self.scope.write().await;
2058                                scope.pop_frame();
2059                                return Ok(flow);
2060                            }
2061                        }
2062                    }
2063                }
2064
2065                {
2066                    let mut scope = self.scope.write().await;
2067                    scope.pop_frame();
2068                }
2069                Ok(ControlFlow::ok(result))
2070            }
2071            Stmt::While(while_loop) => {
2072                let mut result = ExecResult::success("");
2073
2074                'outer: loop {
2075                    // Evaluate condition - use async to support command substitution
2076                    // Cancellation checkpoint per iteration
2077                    if self.is_cancelled() {
2078                        result.code = 130;
2079                        return Ok(ControlFlow::ok(result));
2080                    }
2081
2082                    let cond_value = self.eval_expr_async(&while_loop.condition).await?;
2083
2084                    if !is_truthy(&cond_value) {
2085                        break;
2086                    }
2087
2088                    // Execute body
2089                    for stmt in &while_loop.body {
2090                        let mut flow = self.execute_stmt_flow(stmt).await?;
2091                        self.drain_stderr_into(&mut result).await;
2092                        match &mut flow {
2093                            ControlFlow::Normal(r) => {
2094                                accumulate_result(&mut result, r);
2095                                if !r.ok() {
2096                                    let scope = self.scope.read().await;
2097                                    if scope.error_exit_enabled() {
2098                                        return Ok(ControlFlow::exit_code(r.code));
2099                                    }
2100                                }
2101                            }
2102                            ControlFlow::Break { .. } => {
2103                                if flow.decrement_level() {
2104                                    accumulate_flow_output(&mut result, &flow);
2105                                    break 'outer;
2106                                }
2107                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2108                                return Ok(flow);
2109                            }
2110                            ControlFlow::Continue { .. } => {
2111                                if flow.decrement_level() {
2112                                    accumulate_flow_output(&mut result, &flow);
2113                                    continue 'outer;
2114                                }
2115                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2116                                return Ok(flow);
2117                            }
2118                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2119                                return Ok(flow);
2120                            }
2121                        }
2122                    }
2123                }
2124
2125                Ok(ControlFlow::ok(result))
2126            }
2127            Stmt::Case(case_stmt) => {
2128                // Evaluate the expression to match against
2129                let match_value = {
2130                    let value = self.eval_expr_async(&case_stmt.expr).await?;
2131                    value_to_string(&value)
2132                };
2133
2134                // Try each branch until we find a match
2135                for branch in &case_stmt.branches {
2136                    let matched = branch.patterns.iter().any(|pattern| {
2137                        glob_match(pattern, &match_value)
2138                    });
2139
2140                    if matched {
2141                        // Execute the branch body
2142                        let mut result = ExecResult::success("");
2143                        for stmt in &branch.body {
2144                            let flow = self.execute_stmt_flow(stmt).await?;
2145                            match flow {
2146                                ControlFlow::Normal(r) => {
2147                                    accumulate_result(&mut result, &r);
2148                                    self.drain_stderr_into(&mut result).await;
2149                                }
2150                                other => {
2151                                    self.drain_stderr_into(&mut result).await;
2152                                    return Ok(other);
2153                                }
2154                            }
2155                        }
2156                        return Ok(ControlFlow::ok(result));
2157                    }
2158                }
2159
2160                // No match - return success with empty output (like sh)
2161                Ok(ControlFlow::ok(ExecResult::success("")))
2162            }
2163            Stmt::Break(levels) => {
2164                Ok(ControlFlow::break_n(levels.unwrap_or(1)))
2165            }
2166            Stmt::Continue(levels) => {
2167                Ok(ControlFlow::continue_n(levels.unwrap_or(1)))
2168            }
2169            Stmt::Return(expr) => {
2170                // return [N] - N becomes the exit code, NOT stdout
2171                // Shell semantics: return sets exit code, doesn't produce output
2172                let result = if let Some(e) = expr {
2173                    let val = self.eval_expr_async(e).await?;
2174                    let code = crate::interpreter::value_to_exit_code(&val)
2175                        .map_err(|e| anyhow::anyhow!("return: {}", e))?;
2176                    ExecResult::from_parts(code, String::new(), String::new(), None)
2177                } else {
2178                    ExecResult::success("")
2179                };
2180                Ok(ControlFlow::return_value(result))
2181            }
2182            Stmt::Exit(expr) => {
2183                let code = if let Some(e) = expr {
2184                    let val = self.eval_expr_async(e).await?;
2185                    crate::interpreter::value_to_exit_code(&val)
2186                        .map_err(|e| anyhow::anyhow!("exit: {}", e))?
2187                } else {
2188                    0
2189                };
2190                Ok(ControlFlow::exit_code(code))
2191            }
2192            Stmt::ToolDef(tool_def) => {
2193                let mut user_tools = self.user_tools.write().await;
2194                user_tools.insert(tool_def.name.clone(), tool_def.clone());
2195                Ok(ControlFlow::ok(ExecResult::success("")))
2196            }
2197            Stmt::AndChain { left, right } => {
2198                // cmd1 && cmd2 - run cmd2 only if cmd1 succeeds (exit code 0)
2199                // Suppress errexit for the left side — && handles failure itself.
2200                {
2201                    let mut scope = self.scope.write().await;
2202                    scope.suppress_errexit();
2203                }
2204                let left_flow = match self.execute_stmt_flow(left).await {
2205                    Ok(f) => f,
2206                    Err(e) => {
2207                        let mut scope = self.scope.write().await;
2208                        scope.unsuppress_errexit();
2209                        return Err(e);
2210                    }
2211                };
2212                {
2213                    let mut scope = self.scope.write().await;
2214                    scope.unsuppress_errexit();
2215                }
2216                match left_flow {
2217                    ControlFlow::Normal(mut left_result) => {
2218                        self.drain_stderr_into(&mut left_result).await;
2219                        self.update_last_result(&left_result).await;
2220                        if left_result.ok() {
2221                            let right_flow = self.execute_stmt_flow(right).await?;
2222                            match right_flow {
2223                                ControlFlow::Normal(mut right_result) => {
2224                                    self.drain_stderr_into(&mut right_result).await;
2225                                    self.update_last_result(&right_result).await;
2226                                    let mut combined = left_result;
2227                                    accumulate_result(&mut combined, &right_result);
2228                                    Ok(ControlFlow::ok(combined))
2229                                }
2230                                other => Ok(other),
2231                            }
2232                        } else {
2233                            Ok(ControlFlow::ok(left_result))
2234                        }
2235                    }
2236                    _ => Ok(left_flow),
2237                }
2238            }
2239            Stmt::OrChain { left, right } => {
2240                // cmd1 || cmd2 - run cmd2 only if cmd1 fails (non-zero exit code)
2241                // Suppress errexit for the left side — || handles failure itself.
2242                {
2243                    let mut scope = self.scope.write().await;
2244                    scope.suppress_errexit();
2245                }
2246                let left_flow = match self.execute_stmt_flow(left).await {
2247                    Ok(f) => f,
2248                    Err(e) => {
2249                        let mut scope = self.scope.write().await;
2250                        scope.unsuppress_errexit();
2251                        return Err(e);
2252                    }
2253                };
2254                {
2255                    let mut scope = self.scope.write().await;
2256                    scope.unsuppress_errexit();
2257                }
2258                match left_flow {
2259                    ControlFlow::Normal(mut left_result) => {
2260                        self.drain_stderr_into(&mut left_result).await;
2261                        self.update_last_result(&left_result).await;
2262                        if !left_result.ok() {
2263                            let right_flow = self.execute_stmt_flow(right).await?;
2264                            match right_flow {
2265                                ControlFlow::Normal(mut right_result) => {
2266                                    self.drain_stderr_into(&mut right_result).await;
2267                                    self.update_last_result(&right_result).await;
2268                                    let mut combined = left_result;
2269                                    accumulate_result(&mut combined, &right_result);
2270                                    Ok(ControlFlow::ok(combined))
2271                                }
2272                                other => Ok(other),
2273                            }
2274                        } else {
2275                            Ok(ControlFlow::ok(left_result))
2276                        }
2277                    }
2278                    _ => Ok(left_flow), // Propagate non-normal flow
2279                }
2280            }
2281            Stmt::Test(test_expr) => {
2282                let is_true = self.eval_test_async(test_expr).await?;
2283                if is_true {
2284                    Ok(ControlFlow::ok(ExecResult::success("")))
2285                } else {
2286                    Ok(ControlFlow::ok(ExecResult::failure(1, "")))
2287                }
2288            }
2289            Stmt::EnvScoped { assignments, body } => {
2290                // Inline env prefix (`NAME=value ... command`): apply the
2291                // assignments as EXPORTED vars in a fresh frame so the command
2292                // — and its subprocess environment — sees them, then unwind so
2293                // they do NOT persist (bash-style command-scoped env). Values
2294                // evaluate left-to-right with earlier ones already in scope, so
2295                // `A=1 B=$A cmd` works.
2296                {
2297                    let mut scope = self.scope.write().await;
2298                    scope.push_frame();
2299                }
2300                let mut prior_export: Vec<(String, bool)> =
2301                    Vec::with_capacity(assignments.len());
2302                let mut setup_err: Option<anyhow::Error> = None;
2303                for assign in assignments {
2304                    match self.eval_expr_async(&assign.value).await {
2305                        Ok(value) => {
2306                            let mut scope = self.scope.write().await;
2307                            prior_export
2308                                .push((assign.name.clone(), scope.is_exported(&assign.name)));
2309                            scope.set_exported(&assign.name, value);
2310                        }
2311                        Err(e) => {
2312                            setup_err = Some(e);
2313                            break;
2314                        }
2315                    }
2316                }
2317
2318                let flow = if setup_err.is_none() {
2319                    self.execute_stmt_flow(body).await
2320                } else {
2321                    Ok(ControlFlow::ok(ExecResult::success("")))
2322                };
2323
2324                // Unwind the env frame and restore export marks unconditionally
2325                // (names that were not exported before must not stay exported).
2326                {
2327                    let mut scope = self.scope.write().await;
2328                    scope.pop_frame();
2329                    for (name, was_exported) in &prior_export {
2330                        if !*was_exported {
2331                            scope.unexport(name);
2332                        }
2333                    }
2334                }
2335
2336                match setup_err {
2337                    Some(e) => Err(e),
2338                    None => flow,
2339                }
2340            }
2341            Stmt::Empty => Ok(ControlFlow::ok(ExecResult::success(""))),
2342        }
2343        }.instrument(span))
2344    }
2345
2346    /// Execute a pipeline.
2347    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(background = pipeline.background, command_count = pipeline.commands.len()))]
2348    async fn execute_pipeline(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2349        if pipeline.commands.is_empty() {
2350            return Ok(ExecResult::success(""));
2351        }
2352
2353        // Handle background execution (`&` operator)
2354        if pipeline.background {
2355            return self.execute_background(pipeline).await;
2356        }
2357
2358        // All commands go through the runner with the Kernel as dispatcher.
2359        // This is the single execution path — no fast path for single commands.
2360        //
2361        // IMPORTANT: We snapshot exec_ctx into a local context and release the
2362        // lock before running. This prevents deadlocks when dispatch_command
2363        // is called from within the pipeline and recursively triggers another
2364        // pipeline (e.g., via user-defined tools).
2365        let mut ctx = {
2366            let ec = self.exec_ctx.read().await;
2367            let scope = self.scope.read().await;
2368            ExecContext {
2369                backend: ec.backend.clone(),
2370                scope: scope.clone(),
2371                cwd: ec.cwd.clone(),
2372                prev_cwd: ec.prev_cwd.clone(),
2373                stdin: None,
2374                stdin_data: None,
2375                pipe_stdin: None,
2376                pipe_stdout: None,
2377                stderr: ec.stderr.clone(),
2378                tool_schemas: ec.tool_schemas.clone(),
2379                tools: ec.tools.clone(),
2380                job_manager: ec.job_manager.clone(),
2381                pipeline_position: PipelinePosition::Only,
2382                interactive: self.interactive,
2383                aliases: ec.aliases.clone(),
2384                ignore_config: ec.ignore_config.clone(),
2385                output_limit: ec.output_limit.clone(),
2386                allow_external_commands: self.allow_external_commands,
2387                nonce_store: ec.nonce_store.clone(),
2388                trash_backend: ec.trash_backend.clone(),
2389                #[cfg(all(unix, feature = "subprocess"))]
2390                terminal_state: ec.terminal_state.clone(),
2391                dispatcher: self.dispatcher(),
2392                cancel: {
2393                    #[allow(clippy::expect_used)]
2394                    let token = self.cancel_token.lock().expect("cancel_token poisoned");
2395                    token.clone()
2396                },
2397                output_format: None,
2398                vfs_budget: self.vfs_budget.clone(),
2399                watchdog: ec.watchdog.clone(),
2400                #[cfg(all(feature = "localfs", feature = "overlay"))]
2401                overlay_handle: self.overlay_handle.clone(),
2402            }
2403        }; // locks released
2404
2405        let mut result = self.runner.run(&pipeline.commands, &mut ctx, self).await;
2406
2407        // Post-hoc spill check (catches builtins and fast external commands)
2408        if ctx.output_limit.is_enabled() {
2409            let _ = crate::output_limit::spill_if_needed(&mut result, &ctx.output_limit).await;
2410        }
2411
2412        // Signal spill with exit 3; agent reads the spill file directly
2413        // (use `set +o output-limit` before cat/head/tail to bypass the limit)
2414        if result.did_spill {
2415            result.original_code = Some(result.code);
2416            result.code = 3;
2417        }
2418
2419        // Sync changes back from context
2420        {
2421            let mut ec = self.exec_ctx.write().await;
2422            ec.cwd = ctx.cwd.clone();
2423            ec.prev_cwd = ctx.prev_cwd.clone();
2424            ec.aliases = ctx.aliases.clone();
2425            ec.ignore_config = ctx.ignore_config.clone();
2426            ec.output_limit = ctx.output_limit.clone();
2427        }
2428        {
2429            let mut scope = self.scope.write().await;
2430            *scope = ctx.scope.clone();
2431        }
2432
2433        Ok(result)
2434    }
2435
2436    /// Execute a pipeline in the background.
2437    ///
2438    /// The command is spawned as a tokio task, registered with the JobManager,
2439    /// and its output is captured via BoundedStreams. The job is observable via
2440    /// `/v/jobs/{id}/stdout`, `/v/jobs/{id}/stderr`, and `/v/jobs/{id}/status`.
2441    ///
2442    /// Returns immediately with a job ID like "[1]".
2443    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(command_count = pipeline.commands.len()))]
2444    async fn execute_background(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2445        use tokio::sync::oneshot;
2446
2447        // Format the command for display in /v/jobs/{id}/command
2448        let command_str = self.format_pipeline(pipeline);
2449
2450        // Create bounded streams for output capture
2451        let stdout = Arc::new(BoundedStream::default_size());
2452        let stderr = Arc::new(BoundedStream::default_size());
2453
2454        // Create channel for result notification
2455        let (tx, rx) = oneshot::channel();
2456
2457        // Register with JobManager to get job ID and create VFS entries
2458        let job_id = self.jobs.register_with_streams(
2459            command_str.clone(),
2460            rx,
2461            stdout.clone(),
2462            stderr.clone(),
2463        ).await;
2464
2465        // Fork the kernel for this background job. The fork snapshots the
2466        // parent's scope/cwd/aliases/user_tools so mutations stay isolated,
2467        // while sharing the job manager, VFS, and tool registry. The fork's
2468        // full dispatch chain (user tools, .kai scripts, `$(...)` in args)
2469        // is available here — something BackendDispatcher couldn't provide.
2470        //
2471        // The fork gets its own cancellation token (recorded on the job so
2472        // `kill %N` can stop the job — including a pure-builtin job with no OS
2473        // process group) and is stamped with the job id so any external
2474        // command it spawns records its process group for `kill -<sig> %N`.
2475        let cancel = tokio_util::sync::CancellationToken::new();
2476        self.jobs.set_cancel_token(job_id, cancel.clone()).await;
2477        let fork = self.fork_for_background(cancel, job_id).await;
2478        let runner = self.runner.clone();
2479        let commands = pipeline.commands.clone();
2480
2481        // Snapshot the fork's exec_ctx for the spawned task. We have to do
2482        // this before tokio::spawn because the fork's exec_ctx is behind a
2483        // tokio RwLock and we want the spawned task to own its ctx.
2484        let mut bg_ctx = {
2485            let ec = fork.exec_ctx.read().await;
2486            ec.child_for_pipeline()
2487        };
2488        bg_ctx.scope = fork.scope.read().await.clone();
2489        // The fork's dispatcher points at the fork itself; set it here so
2490        // builtins inside the background task (e.g. timeout) re-dispatch
2491        // through the fork, not the parent.
2492        bg_ctx.dispatcher = fork.dispatcher();
2493
2494        // Spawn the background task. Propagate the embedder's trace context
2495        // across the spawn boundary so the job's spans stay in the same trace.
2496        tokio::spawn(crate::telemetry::bind_current_context(async move {
2497            // runner.run needs a &dyn CommandDispatcher; fork.as_ref()
2498            // gives us that (Kernel implements CommandDispatcher).
2499            let result = runner.run(&commands, &mut bg_ctx, fork.as_ref()).await;
2500
2501            // Write output to streams
2502            let text = result.text_out();
2503            if !text.is_empty() {
2504                stdout.write(text.as_bytes()).await;
2505            }
2506            if !result.err.is_empty() {
2507                stderr.write(result.err.as_bytes()).await;
2508            }
2509
2510            // Close streams
2511            stdout.close().await;
2512            stderr.close().await;
2513
2514            // Send result to JobManager (ignore error if receiver dropped)
2515            let _ = tx.send(result);
2516        }));
2517
2518        Ok(ExecResult::success(format!("[{}]", job_id)))
2519    }
2520
2521    /// Format a pipeline as a command string for display.
2522    fn format_pipeline(&self, pipeline: &crate::ast::Pipeline) -> String {
2523        pipeline.commands
2524            .iter()
2525            .map(|cmd| {
2526                let mut parts = vec![cmd.name.clone()];
2527                for arg in &cmd.args {
2528                    match arg {
2529                        Arg::Positional(expr) => {
2530                            parts.push(self.format_expr(expr));
2531                        }
2532                        Arg::Named { key, value } => {
2533                            parts.push(format!("--{}={}", key, self.format_expr(value)));
2534                        }
2535                        Arg::WordAssign { key, value } => {
2536                            parts.push(format!("{}={}", key, self.format_expr(value)));
2537                        }
2538                        Arg::ShortFlag(name) => {
2539                            parts.push(format!("-{}", name));
2540                        }
2541                        Arg::LongFlag(name) => {
2542                            parts.push(format!("--{}", name));
2543                        }
2544                        Arg::DoubleDash => {
2545                            parts.push("--".to_string());
2546                        }
2547                    }
2548                }
2549                parts.join(" ")
2550            })
2551            .collect::<Vec<_>>()
2552            .join(" | ")
2553    }
2554
2555    /// Format an expression as a string for display.
2556    fn format_expr(&self, expr: &Expr) -> String {
2557        match expr {
2558            Expr::Literal(Value::String(s)) => {
2559                if s.contains(' ') || s.contains('"') {
2560                    format!("'{}'", s.replace('\'', "\\'"))
2561                } else {
2562                    s.clone()
2563                }
2564            }
2565            Expr::Literal(Value::Int(i)) => i.to_string(),
2566            Expr::Literal(Value::Float(f)) => f.to_string(),
2567            Expr::Literal(Value::Bool(b)) => b.to_string(),
2568            Expr::Literal(Value::Null) => "null".to_string(),
2569            Expr::VarRef(path) => {
2570                let name = path.segments.iter()
2571                    .map(|seg| match seg {
2572                        crate::ast::VarSegment::Field(f) => f.clone(),
2573                    })
2574                    .collect::<Vec<_>>()
2575                    .join(".");
2576                format!("${{{}}}", name)
2577            }
2578            Expr::Interpolated(_) => "\"...\"".to_string(),
2579            Expr::HereDocBody { .. } => "<<heredoc".to_string(),
2580            _ => "...".to_string(),
2581        }
2582    }
2583
2584    /// Execute a single command.
2585    async fn execute_command(&self, name: &str, args: &[Arg]) -> Result<ExecResult> {
2586        self.execute_command_depth(name, args, 0).await
2587    }
2588
2589    #[tracing::instrument(level = "info", skip(self, args, alias_depth), fields(command = %name), err)]
2590    async fn execute_command_depth(&self, name: &str, args: &[Arg], alias_depth: u8) -> Result<ExecResult> {
2591        // Special built-ins
2592        match name {
2593            "true" => return Ok(ExecResult::success("")),
2594            "false" => return Ok(ExecResult::failure(1, "")),
2595            "source" | "." => return self.execute_source(args).await,
2596            _ => {}
2597        }
2598
2599        // Alias expansion (with recursion limit)
2600        if alias_depth < 10 {
2601            let alias_value = {
2602                let ctx = self.exec_ctx.read().await;
2603                ctx.aliases.get(name).cloned()
2604            };
2605            if let Some(alias_val) = alias_value {
2606                // Split alias value into command + args
2607                let parts: Vec<&str> = alias_val.split_whitespace().collect();
2608                if let Some((alias_cmd, alias_args)) = parts.split_first() {
2609                    let mut new_args: Vec<Arg> = alias_args
2610                        .iter()
2611                        .map(|a| Arg::Positional(Expr::Literal(Value::String(a.to_string()))))
2612                        .collect();
2613                    new_args.extend_from_slice(args);
2614                    return Box::pin(self.execute_command_depth(alias_cmd, &new_args, alias_depth + 1)).await;
2615                }
2616            }
2617        }
2618
2619        // Handle /v/bin/ prefix — dispatch to builtins via virtual path
2620        if let Some(builtin_name) = name.strip_prefix("/v/bin/") {
2621            return match self.tools.get(builtin_name) {
2622                Some(_) => Box::pin(self.execute_command_depth(builtin_name, args, alias_depth)).await,
2623                None => Ok(ExecResult::failure(127, format!("command not found: {}", name))),
2624            };
2625        }
2626
2627        // Check user-defined tools first
2628        {
2629            let user_tools = self.user_tools.read().await;
2630            if let Some(tool_def) = user_tools.get(name) {
2631                let tool_def = tool_def.clone();
2632                drop(user_tools);
2633                return self.execute_user_tool(tool_def, args).await;
2634            }
2635        }
2636
2637        // Look up builtin tool
2638        let tool = match self.tools.get(name) {
2639            Some(t) => t,
2640            None => {
2641                // Try executing as .kai script from PATH
2642                if let Some(result) = self.try_execute_script(name, args).await? {
2643                    return Ok(result);
2644                }
2645                // Try executing as external command from PATH
2646                if let Some(result) = self.try_execute_external(name, args).await? {
2647                    return Ok(result);
2648                }
2649
2650                // Try backend-registered tools (embedder engines, etc.)
2651                // Look up tool schema for positional→named mapping.
2652                // Clone backend and drop read lock before awaiting (may involve network I/O).
2653                // Backend tools expect named JSON params, so enable positional mapping.
2654                let backend = self.exec_ctx.read().await.backend.clone();
2655                let tool_schema = backend.get_tool(name).await.ok().flatten().map(|t| {
2656                    let mut s = t.schema;
2657                    // Flat backend/MCP tools expect named JSON params, so map
2658                    // bare positionals onto named params. Subcommand-aware tools
2659                    // route positionals through the subcommand path and declare
2660                    // map_positionals per leaf (kj keeps it false so it re-parses
2661                    // the argv with its own clap) — don't blanket-override them.
2662                    if s.subcommands.is_empty() {
2663                        s.map_positionals = true;
2664                    }
2665                    s
2666                });
2667                let tool_args = self.build_args_async(args, tool_schema.as_ref()).await?;
2668                let mut ctx = self.exec_ctx.write().await;
2669                {
2670                    let scope = self.scope.read().await;
2671                    ctx.scope = scope.clone();
2672                }
2673                let backend = ctx.backend.clone();
2674                match backend.call_tool(name, tool_args, &mut *ctx).await {
2675                    Ok(tool_result) => {
2676                        let mut scope = self.scope.write().await;
2677                        *scope = ctx.scope.clone();
2678                        let mut exec = ExecResult::from_output(
2679                            tool_result.code as i64, tool_result.stdout, tool_result.stderr,
2680                        );
2681                        exec.set_output(tool_result.output);
2682                        return Ok(exec);
2683                    }
2684                    Err(BackendError::ToolNotFound(_)) => {
2685                        // Fall through to "command not found"
2686                    }
2687                    Err(e) => {
2688                        // Backend dispatch is last-resort lookup — if it fails
2689                        // for any reason, the command simply doesn't exist.
2690                        tracing::debug!("backend error for {name}: {e}");
2691                    }
2692                }
2693
2694                return Ok(ExecResult::failure(127, format!("command not found: {}", name)));
2695            }
2696        };
2697
2698        // Build arguments (async to support command substitution, schema-aware for flag values)
2699        let schema = tool.schema();
2700        let tool_args = self.build_args_async(args, Some(&schema)).await?;
2701
2702        // --help / -h: show help unless the tool's schema claims that flag
2703        let schema_claims = |flag: &str| -> bool {
2704            let bare = flag.trim_start_matches('-');
2705            schema.params.iter().any(|p| p.matches_flag(flag) || p.matches_flag(bare))
2706        };
2707        let wants_help =
2708            (tool_args.flags.contains("help") && !schema_claims("help"))
2709            || (tool_args.flags.contains("h") && !schema_claims("-h"));
2710        if wants_help {
2711            let help_topic = crate::help::HelpTopic::Tool(name.to_string());
2712            let ctx = self.exec_ctx.read().await;
2713            let content = crate::help::get_help(&help_topic, &ctx.tool_schemas);
2714            return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(content)));
2715        }
2716
2717        // Snapshot exec_ctx into a local context and release the write lock
2718        // before calling tool.execute. Holding the write across tool execution
2719        // would deadlock any builtin that re-dispatches through ctx.dispatcher
2720        // (timeout, scatter) — the inner dispatch_command needs its own
2721        // exec_ctx.write() and would block forever.
2722        let mut ctx = {
2723            let ec = self.exec_ctx.write().await;
2724            let scope = self.scope.read().await;
2725            ExecContext {
2726                backend: ec.backend.clone(),
2727                scope: scope.clone(),
2728                cwd: ec.cwd.clone(),
2729                prev_cwd: ec.prev_cwd.clone(),
2730                stdin: ec.stdin.clone(),
2731                stdin_data: ec.stdin_data.clone(),
2732                pipe_stdin: None, // streaming pipes are per-pipeline; not snapshotted
2733                pipe_stdout: None,
2734                stderr: ec.stderr.clone(),
2735                tool_schemas: ec.tool_schemas.clone(),
2736                tools: ec.tools.clone(),
2737                job_manager: ec.job_manager.clone(),
2738                pipeline_position: ec.pipeline_position,
2739                interactive: self.interactive,
2740                aliases: ec.aliases.clone(),
2741                ignore_config: ec.ignore_config.clone(),
2742                output_limit: ec.output_limit.clone(),
2743                allow_external_commands: self.allow_external_commands,
2744                nonce_store: ec.nonce_store.clone(),
2745                trash_backend: ec.trash_backend.clone(),
2746                #[cfg(all(unix, feature = "subprocess"))]
2747                terminal_state: ec.terminal_state.clone(),
2748                dispatcher: self.dispatcher(),
2749                // Use ec.cancel (set by dispatch_command from the runner's
2750                // ctx.cancel) so any builtin-swapped child token (e.g. timeout's
2751                // child token) reaches the spawned external via wait_or_kill.
2752                // Falls back to the kernel's own token when ec.cancel is the
2753                // default fresh token from a non-dispatch path.
2754                cancel: ec.cancel.clone(),
2755                output_format: None,
2756                vfs_budget: self.vfs_budget.clone(),
2757                watchdog: ec.watchdog.clone(),
2758                #[cfg(all(feature = "localfs", feature = "overlay"))]
2759                overlay_handle: self.overlay_handle.clone(),
2760            }
2761        }; // both locks released — tool.execute can re-dispatch safely
2762
2763        // Move stdin out of self.exec_ctx into the snapshot (consumed-by-tool
2764        // semantics): take() so a later dispatch doesn't see stale stdin.
2765        // Done after the snapshot above so we hold the write briefly.
2766        {
2767            let mut ec = self.exec_ctx.write().await;
2768            ctx.stdin = ec.stdin.take();
2769            ctx.stdin_data = ec.stdin_data.take();
2770            ctx.pipe_stdin = ec.pipe_stdin.take();
2771            ctx.pipe_stdout = ec.pipe_stdout.take();
2772        }
2773
2774        // Honor --json before the builtin runs so its setting survives a clap
2775        // parse failure (e.g. `cmd --json --bogus-flag` would otherwise drop
2776        // --json on the floor when `try_parse_from` returns Err early).
2777        // The builtin's own `parsed.global.apply(ctx)` becomes idempotent.
2778        GlobalFlags::apply_from_args(&tool_args, &mut ctx);
2779
2780        let result = tool.execute(tool_args, &mut ctx).await;
2781
2782        // Sync mutations back. Tools may have changed scope (set/cd),
2783        // cwd/prev_cwd (cd), and aliases (alias). Also return any unused pipe
2784        // endpoints to self.exec_ctx so dispatch_command's post-execute sync
2785        // hands them back to the pipeline runner — the runner uses
2786        // stage_ctx.pipe_stdout to write the result to the next stage when
2787        // the tool itself didn't take and write to it.
2788        {
2789            let mut scope = self.scope.write().await;
2790            *scope = ctx.scope.clone();
2791        }
2792        {
2793            let mut ec = self.exec_ctx.write().await;
2794            ec.cwd = ctx.cwd;
2795            ec.prev_cwd = ctx.prev_cwd;
2796            ec.aliases = ctx.aliases;
2797            // A builtin (`set -o output-limit`, `kaish-output-limit set`) can
2798            // mutate the runtime output limit; without this sync the change is
2799            // dropped here and never reaches dispatch_command's read-back, so
2800            // it would not survive past the current statement.
2801            ec.output_limit = ctx.output_limit.clone();
2802            ec.pipe_stdin = ctx.pipe_stdin.take();
2803            ec.pipe_stdout = ctx.pipe_stdout.take();
2804        }
2805
2806        // Builtins parse --json via the GlobalFlags flatten in their clap
2807        // struct and write ctx.output_format. The kernel applies it — unless the
2808        // tool owns its own output (renders --json itself), in which case we
2809        // leave its bytes untouched.
2810        let result = finalize_output(result, ctx.output_format, schema.owns_output);
2811
2812        Ok(result)
2813    }
2814
2815    /// The session `HOME` from the kernel scope, if set. Tilde expansion reads
2816    /// this rather than `std::env::var("HOME")` so the kernel stays hermetic —
2817    /// a hermetic embedder (empty `initial_vars`) gets `None`, and `~` is left
2818    /// unexpanded rather than leaking the host home directory.
2819    async fn scope_home(&self) -> Option<String> {
2820        match self.scope.read().await.get("HOME") {
2821            Some(Value::String(s)) => Some(s.clone()),
2822            _ => None,
2823        }
2824    }
2825
2826    /// Pull `consumes` positional args after a non-bool flag and stash them
2827    /// on `tool_args.named` under the canonical param name.
2828    ///
2829    /// - `consumes == 1` keeps the historical contract: a single scalar value.
2830    /// - `consumes > 1` accumulates each occurrence as an inner
2831    ///   `serde_json::Value::Array` inside `named[canonical] =
2832    ///   Value::Json(Array(...))`, preserving invocation order. This is the
2833    ///   shape jq's `--arg NAME VAL` / `--argjson NAME VAL` land in.
2834    ///
2835    /// Errors loudly if the flag is missing required positionals — matches
2836    /// kaish's "no silent fallback" posture and mirrors real jq, which
2837    /// errors on `--arg NAME` with no value.
2838    #[allow(clippy::too_many_arguments)]
2839    async fn consume_flag_positionals(
2840        &self,
2841        args: &[Arg],
2842        flag_name: &str,
2843        canonical: &str,
2844        consumes: usize,
2845        positional_indices: &[usize],
2846        consumed: &mut std::collections::HashSet<usize>,
2847        current_idx: usize,
2848        tool_args: &mut ToolArgs,
2849    ) -> Result<()> {
2850        let home = self.scope_home().await;
2851        let mut collected: Vec<Value> = Vec::with_capacity(consumes.max(1));
2852        for _ in 0..consumes.max(1) {
2853            let next_pos = positional_indices
2854                .iter()
2855                .find(|idx| **idx > current_idx && !consumed.contains(idx))
2856                .copied();
2857            match next_pos {
2858                Some(pos_idx) => {
2859                    if let Arg::Positional(expr) = &args[pos_idx] {
2860                        let value = self.eval_expr_async(expr).await?;
2861                        let value = apply_tilde_expansion(value, home.as_deref());
2862                        collected.push(value);
2863                        consumed.insert(pos_idx);
2864                    }
2865                }
2866                None => {
2867                    if consumes <= 1 && collected.is_empty() {
2868                        // Back-compat: a flag with no follow-up positional
2869                        // becomes a bare flag. `--path` with nothing after
2870                        // lands in `flags`, same as before this refactor.
2871                        tool_args.flags.insert(flag_name.to_string());
2872                        return Ok(());
2873                    }
2874                    anyhow::bail!(
2875                        "--{flag_name} requires {consumes} argument{}, got {}",
2876                        if consumes == 1 { "" } else { "s" },
2877                        collected.len()
2878                    );
2879                }
2880            }
2881        }
2882
2883        if consumes <= 1 {
2884            if let Some(v) = collected.pop() {
2885                tool_args.named.insert(canonical.to_string(), v);
2886            }
2887            return Ok(());
2888        }
2889
2890        // Multi-consume: accumulate under named[canonical] as array-of-arrays.
2891        let occ: Vec<serde_json::Value> = collected
2892            .into_iter()
2893            .map(|v| crate::interpreter::value_to_json(&v))
2894            .collect();
2895        let entry = tool_args
2896            .named
2897            .entry(canonical.to_string())
2898            .or_insert_with(|| Value::Json(serde_json::Value::Array(Vec::new())));
2899        if let Value::Json(serde_json::Value::Array(outer)) = entry {
2900            outer.push(serde_json::Value::Array(occ));
2901        } else {
2902            anyhow::bail!(
2903                "--{flag_name}: named[{canonical}] already holds a non-array value"
2904            );
2905        }
2906        Ok(())
2907    }
2908
2909    /// Build tool arguments from AST args.
2910    ///
2911    /// Uses async evaluation to support command substitution in arguments.
2912    async fn build_args_async(&self, args: &[Arg], schema: Option<&crate::tools::ToolSchema>) -> Result<ToolArgs> {
2913        let mut tool_args = ToolArgs::new();
2914        let home = self.scope_home().await;
2915        // Subcommand-aware tools (e.g. `kj context list`) expose a tree of
2916        // schemas; pick the leaf the leading positionals route to and bind
2917        // flags against *its* params. Flat tools return the root. select_leaf
2918        // errors (fail loud) if a computed positional sits where a subcommand
2919        // selector is required.
2920        let leaf = match schema {
2921            Some(s) => Some(select_leaf(s, args)?),
2922            None => None,
2923        };
2924        // Bind against the leaf's params, but MERGE the root schema's params on
2925        // top as "global" flags: a value-flag declared at the tool's top level
2926        // (e.g. kj's `--confirm <nonce>`) must bind at every leaf, including when
2927        // it trails the subcommand path (`kj context retag a b --confirm <n>`).
2928        // The leaf wins on name conflicts. For a flat tool, leaf == root, so the
2929        // merge is a harmless no-op.
2930        let mut param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
2931        if let Some(l) = leaf {
2932            param_lookup.extend(schema_param_lookup(l));
2933        }
2934        // accepts_word_assign keys off the root tool name (the WORD_ASSIGN list),
2935        // not the leaf — it's a property of the command, not the subcommand.
2936        let accepts_word_assign = schema
2937            .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
2938            .unwrap_or(false);
2939
2940        // Track which positional indices have been consumed as flag values
2941        let mut consumed: std::collections::HashSet<usize> = std::collections::HashSet::new();
2942        let mut past_double_dash = false;
2943
2944        // Find positional arg indices for flag value consumption
2945        let positional_indices: Vec<usize> = args.iter().enumerate()
2946            .filter_map(|(i, a)| matches!(a, Arg::Positional(_)).then_some(i))
2947            .collect();
2948
2949        let mut i = 0;
2950        while i < args.len() {
2951            match &args[i] {
2952                Arg::DoubleDash => {
2953                    past_double_dash = true;
2954                }
2955                Arg::Positional(expr) => {
2956                    if !consumed.contains(&i) {
2957                        // Glob expansion: bare glob patterns expand to matching files
2958                        if let Expr::GlobPattern(pattern) = expr {
2959                            let glob_enabled = {
2960                                let scope = self.scope.read().await;
2961                                scope.glob_enabled()
2962                            };
2963                            if glob_enabled {
2964                                let (paths, cwd) = {
2965                                    let ctx = self.exec_ctx.read().await;
2966                                    let paths = ctx.expand_glob(pattern).await
2967                                        .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2968                                    let cwd = ctx.resolve_path(".");
2969                                    (paths, cwd)
2970                                };
2971                                if paths.is_empty() {
2972                                    return Err(anyhow::anyhow!("no matches: {}", pattern));
2973                                }
2974                                for path in paths {
2975                                    let display = if !pattern.starts_with('/') {
2976                                        path.strip_prefix(&cwd)
2977                                            .unwrap_or(&path)
2978                                            .to_string_lossy().into_owned()
2979                                    } else {
2980                                        path.to_string_lossy().into_owned()
2981                                    };
2982                                    tool_args.positional.push(Value::String(display));
2983                                }
2984                                i += 1;
2985                                continue;
2986                            }
2987                        }
2988                        let value = self.eval_expr_async(expr).await?;
2989                        let value = apply_tilde_expansion(value, home.as_deref());
2990                        tool_args.positional.push(value);
2991                    }
2992                }
2993                Arg::Named { key, value } => {
2994                    let val = self.eval_expr_async(value).await?;
2995                    let val = apply_tilde_expansion(val, home.as_deref());
2996                    tool_args.named.insert(key.clone(), val);
2997                }
2998                Arg::WordAssign { key, value } => {
2999                    let val = self.eval_expr_async(value).await?;
3000                    let val = apply_tilde_expansion(val, home.as_deref());
3001                    if accepts_word_assign {
3002                        tool_args.named.insert(key.clone(), val);
3003                    } else {
3004                        // Stringify "key=value" and pass as a positional.
3005                        // Matches bash: `cat foo=bar` opens a file named `foo=bar`.
3006                        let val_str = crate::interpreter::value_to_string(&val);
3007                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
3008                    }
3009                }
3010                Arg::ShortFlag(name) => {
3011                    if past_double_dash {
3012                        tool_args.positional.push(Value::String(format!("-{name}")));
3013                    } else if name.len() == 1 {
3014                        let flag_name = name.as_str();
3015                        let lookup = param_lookup.get(flag_name);
3016                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3017
3018                        if is_bool {
3019                            tool_args.flags.insert(flag_name.to_string());
3020                        } else {
3021                            // Non-bool: consume `consumes` positionals as value(s)
3022                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
3023                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3024                            self.consume_flag_positionals(
3025                                args,
3026                                name,
3027                                canonical,
3028                                consumes,
3029                                &positional_indices,
3030                                &mut consumed,
3031                                i,
3032                                &mut tool_args,
3033                            )
3034                            .await?;
3035                        }
3036                    } else if let Some(&(canonical, typ, consumes)) = param_lookup.get(name.as_str()) {
3037                        // Multi-char short flag matches a schema param (POSIX style: -name value)
3038                        if is_bool_type(typ) {
3039                            tool_args.flags.insert(canonical.to_string());
3040                        } else {
3041                            self.consume_flag_positionals(
3042                                args,
3043                                name,
3044                                canonical,
3045                                consumes,
3046                                &positional_indices,
3047                                &mut consumed,
3048                                i,
3049                                &mut tool_args,
3050                            )
3051                            .await?;
3052                        }
3053                    } else {
3054                        // Multi-char combined flags like -la: always boolean
3055                        for c in name.chars() {
3056                            tool_args.flags.insert(c.to_string());
3057                        }
3058                    }
3059                }
3060                Arg::LongFlag(name) => {
3061                    if past_double_dash {
3062                        tool_args.positional.push(Value::String(format!("--{name}")));
3063                    } else {
3064                        let lookup = param_lookup.get(name.as_str());
3065                        // An *undeclared* long flag under a `map_positionals`
3066                        // (backend/MCP) schema that is immediately followed by an
3067                        // unconsumed positional is ambiguous: kaish can't tell the
3068                        // space-form value (`--type explorer`) from a bool flag
3069                        // before a real positional (`--force file.txt`). Defaulting
3070                        // to bool here silently divorces the value and misroutes it
3071                        // — a privilege-escalation-by-typo against deny-by-default
3072                        // embedders (docs/issues.md). Fail loud instead of guessing.
3073                        let ambiguous_value = (lookup.is_none()
3074                            && leaf.is_some_and(|s| s.map_positionals)
3075                            && !consumed.contains(&(i + 1)))
3076                            .then(|| match args.get(i + 1) {
3077                                // Echo a concrete value for a copy-pasteable fix
3078                                // when it's a plain literal; fall back to VALUE.
3079                                Some(Arg::Positional(Expr::Literal(Value::String(s)))) => {
3080                                    Some(s.clone())
3081                                }
3082                                Some(Arg::Positional(_)) => Some("VALUE".to_string()),
3083                                _ => None,
3084                            })
3085                            .flatten();
3086                        if let Some(val) = ambiguous_value {
3087                            let tool = leaf.map(|s| s.name.as_str()).unwrap_or("command");
3088                            anyhow::bail!(
3089                                "{tool}: --{name} is not a declared flag, so the \
3090                                 space-separated value would be silently dropped. \
3091                                 Use --{name}={val}, or have {tool} declare --{name} \
3092                                 in its schema."
3093                            );
3094                        }
3095                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3096
3097                        if is_bool {
3098                            tool_args.flags.insert(name.clone());
3099                        } else {
3100                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
3101                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3102                            self.consume_flag_positionals(
3103                                args,
3104                                name,
3105                                canonical,
3106                                consumes,
3107                                &positional_indices,
3108                                &mut consumed,
3109                                i,
3110                                &mut tool_args,
3111                            )
3112                            .await?;
3113                        }
3114                    }
3115                }
3116            }
3117            i += 1;
3118        }
3119
3120        // Map remaining positionals to unfilled non-bool schema params (in order).
3121        // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
3122        // Positionals that appeared after `--` are never mapped (they're raw data).
3123        // Only for backend/external tools (map_positionals=true). Builtins handle their own positionals.
3124        // Keyed off the routed leaf so a subcommand tool maps against the active
3125        // leaf's params (kj leaves keep map_positionals=false → block skipped).
3126        if let Some(schema) = leaf.filter(|s| s.map_positionals) {
3127            let pre_dash_count = if past_double_dash {
3128                let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
3129                positional_indices.iter()
3130                    .filter(|idx| **idx < dash_pos && !consumed.contains(idx))
3131                    .count()
3132            } else {
3133                tool_args.positional.len()
3134            };
3135
3136            let mut remaining = Vec::new();
3137            let mut positional_iter = tool_args.positional.drain(..).enumerate();
3138
3139            for param in &schema.params {
3140                if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
3141                    continue;
3142                }
3143                if is_bool_type(&param.param_type) {
3144                    continue;
3145                }
3146                loop {
3147                    match positional_iter.next() {
3148                        Some((idx, val)) if idx < pre_dash_count => {
3149                            tool_args.named.insert(param.name.clone(), val);
3150                            break;
3151                        }
3152                        Some((_, val)) => {
3153                            remaining.push(val);
3154                        }
3155                        None => break,
3156                    }
3157                }
3158            }
3159
3160            remaining.extend(positional_iter.map(|(_, v)| v));
3161            tool_args.positional = remaining;
3162        }
3163
3164        Ok(tool_args)
3165    }
3166
3167    /// Build arguments as flat string list for external commands.
3168    ///
3169    /// Unlike `build_args_async` which separates flags into a HashSet (for schema-aware builtins),
3170    /// this preserves the original flag format as strings for external commands:
3171    /// - `-l` stays as `-l`
3172    /// - `--verbose` stays as `--verbose`
3173    /// - `key=value` stays as `key=value`
3174    ///
3175    /// This is what external commands expect in their argv.
3176    #[cfg(feature = "subprocess")]
3177    async fn build_args_flat(&self, args: &[Arg]) -> Result<Vec<String>> {
3178        let mut argv = Vec::new();
3179        let home = self.scope_home().await;
3180        for arg in args {
3181            match arg {
3182                Arg::Positional(expr) => {
3183                    // Glob expansion for external commands
3184                    if let Expr::GlobPattern(pattern) = expr {
3185                        let glob_enabled = {
3186                            let scope = self.scope.read().await;
3187                            scope.glob_enabled()
3188                        };
3189                        if glob_enabled {
3190                            let (paths, cwd) = {
3191                                let ctx = self.exec_ctx.read().await;
3192                                let paths = ctx.expand_glob(pattern).await
3193                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
3194                                let cwd = ctx.resolve_path(".");
3195                                (paths, cwd)
3196                            };
3197                            if paths.is_empty() {
3198                                return Err(anyhow::anyhow!("no matches: {}", pattern));
3199                            }
3200                            for path in paths {
3201                                let display = if !pattern.starts_with('/') {
3202                                    path.strip_prefix(&cwd)
3203                                        .unwrap_or(&path)
3204                                        .to_string_lossy().into_owned()
3205                                } else {
3206                                    path.to_string_lossy().into_owned()
3207                                };
3208                                argv.push(display);
3209                            }
3210                            continue;
3211                        }
3212                    }
3213                    let value = self.eval_expr_async(expr).await?;
3214                    let value = apply_tilde_expansion(value, home.as_deref());
3215                    argv.push(value_to_string(&value));
3216                }
3217                Arg::Named { key, value } => {
3218                    let val = self.eval_expr_async(value).await?;
3219                    let val = apply_tilde_expansion(val, home.as_deref());
3220                    argv.push(format!("--{}={}", key, value_to_string(&val)));
3221                }
3222                Arg::WordAssign { key, value } => {
3223                    let val = self.eval_expr_async(value).await?;
3224                    let val = apply_tilde_expansion(val, home.as_deref());
3225                    argv.push(format!("{}={}", key, value_to_string(&val)));
3226                }
3227                Arg::ShortFlag(name) => {
3228                    // Preserve original format: -l, -la (combined flags)
3229                    argv.push(format!("-{}", name));
3230                }
3231                Arg::LongFlag(name) => {
3232                    // Preserve original format: --verbose
3233                    argv.push(format!("--{}", name));
3234                }
3235                Arg::DoubleDash => {
3236                    // Preserve the -- marker
3237                    argv.push("--".to_string());
3238                }
3239            }
3240        }
3241        Ok(argv)
3242    }
3243
3244    /// Async expression evaluator that supports command substitution.
3245    ///
3246    /// This is used for contexts where expressions may contain `$(...)` command
3247    /// substitution. Unlike the sync `eval_expr`, this can execute pipelines.
3248    fn eval_expr_async<'a>(&'a self, expr: &'a Expr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + 'a>> {
3249        Box::pin(async move {
3250        match expr {
3251            Expr::Literal(value) => Ok(value.clone()),
3252            Expr::VarRef(path) => {
3253                let scope = self.scope.read().await;
3254                scope.resolve_path(path)
3255                    .ok_or_else(|| anyhow::anyhow!("undefined variable"))
3256            }
3257            Expr::Interpolated(parts) => {
3258                let mut result = String::new();
3259                for part in parts {
3260                    result.push_str(&self.eval_string_part_async(part).await?);
3261                }
3262                Ok(Value::String(result))
3263            }
3264            Expr::HereDocBody { parts, strip_tabs } => {
3265                let mut result = String::new();
3266                for sp in parts {
3267                    result.push_str(&self.eval_string_part_async(&sp.part).await?);
3268                }
3269                if *strip_tabs {
3270                    Ok(Value::String(crate::interpreter::strip_leading_tabs(&result)))
3271                } else {
3272                    Ok(Value::String(result))
3273                }
3274            }
3275            Expr::BinaryOp { left, op, right } => match op {
3276                BinaryOp::And => {
3277                    let left_val = self.eval_expr_async(left).await?;
3278                    if !is_truthy(&left_val) {
3279                        return Ok(left_val);
3280                    }
3281                    self.eval_expr_async(right).await
3282                }
3283                BinaryOp::Or => {
3284                    let left_val = self.eval_expr_async(left).await?;
3285                    if is_truthy(&left_val) {
3286                        return Ok(left_val);
3287                    }
3288                    self.eval_expr_async(right).await
3289                }
3290            },
3291            Expr::CommandSubst(stmts) => {
3292                // Snapshot scope+cwd before running — only output escapes,
3293                // not side effects like `cd` or variable assignments.
3294                let saved_scope = { self.scope.read().await.clone() };
3295                let saved_cwd = {
3296                    let ec = self.exec_ctx.read().await;
3297                    (ec.cwd.clone(), ec.prev_cwd.clone())
3298                };
3299
3300                // Capture result without `?` — restore state unconditionally
3301                let run_result = self.execute_block_capturing(stmts).await;
3302
3303                // Restore scope and cwd regardless of success/failure
3304                {
3305                    let mut scope = self.scope.write().await;
3306                    *scope = saved_scope;
3307                    if let Ok(ref r) = run_result {
3308                        scope.set_last_result(r.clone());
3309                    }
3310                }
3311                {
3312                    let mut ec = self.exec_ctx.write().await;
3313                    ec.cwd = saved_cwd.0;
3314                    ec.prev_cwd = saved_cwd.1;
3315                }
3316
3317                // Now propagate the error
3318                let result = run_result?;
3319
3320                // A binary result is preserved as bytes — never lossy-decoded to
3321                // a string. No trailing-newline trim (every byte is significant).
3322                if let Some(bytes) = result.out_bytes() {
3323                    Ok(Value::Bytes(bytes.to_vec()))
3324                // Prefer structured data (enables `for i in $(cmd)` iteration)
3325                } else if let Some(data) = &result.data {
3326                    Ok(data.clone())
3327                } else if let Some(output) = result.output() {
3328                    // Flat non-text node lists (glob, ls, tree) → iterable array
3329                    if output.is_flat() && !output.is_simple_text() && !output.root.is_empty() {
3330                        let items: Vec<serde_json::Value> = output.root.iter()
3331                            .map(|n| serde_json::Value::String(n.display_name().to_string()))
3332                            .collect();
3333                        Ok(Value::Json(serde_json::Value::Array(items)))
3334                    } else {
3335                        Ok(Value::String(result.text_out().trim_end().to_string()))
3336                    }
3337                } else {
3338                    // Otherwise return stdout as single string (NO implicit splitting)
3339                    Ok(Value::String(result.text_out().trim_end().to_string()))
3340                }
3341            }
3342            Expr::Test(test_expr) => {
3343                Ok(Value::Bool(self.eval_test_async(test_expr).await?))
3344            }
3345            Expr::Positional(n) => {
3346                let scope = self.scope.read().await;
3347                match scope.get_positional(*n) {
3348                    Some(s) => Ok(Value::String(s.to_string())),
3349                    None => Ok(Value::String(String::new())),
3350                }
3351            }
3352            Expr::AllArgs => {
3353                let scope = self.scope.read().await;
3354                Ok(Value::String(scope.all_args().join(" ")))
3355            }
3356            Expr::ArgCount => {
3357                let scope = self.scope.read().await;
3358                Ok(Value::Int(scope.arg_count() as i64))
3359            }
3360            Expr::VarLength(name) => {
3361                let scope = self.scope.read().await;
3362                match scope.get(name) {
3363                    Some(value) => Ok(Value::Int(value_to_string(value).len() as i64)),
3364                    None => Ok(Value::Int(0)),
3365                }
3366            }
3367            Expr::VarWithDefault { name, default } => {
3368                let scope = self.scope.read().await;
3369                let use_default = match scope.get(name) {
3370                    Some(value) => value_to_string(value).is_empty(),
3371                    None => true,
3372                };
3373                drop(scope); // Release the lock before recursive evaluation
3374                if use_default {
3375                    // Evaluate the default parts (supports nested expansions)
3376                    self.eval_string_parts_async(default).await.map(Value::String)
3377                } else {
3378                    let scope = self.scope.read().await;
3379                    scope.get(name).cloned().ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))
3380                }
3381            }
3382            Expr::Arithmetic(expr_str) => {
3383                let scope = self.scope.read().await;
3384                crate::arithmetic::eval_arithmetic(expr_str, &scope)
3385                    .map(Value::Int)
3386                    .map_err(|e| anyhow::anyhow!("arithmetic error: {}", e))
3387            }
3388            Expr::Command(cmd) => {
3389                // Execute command and return boolean based on exit code
3390                let result = self.execute_command(&cmd.name, &cmd.args).await?;
3391                Ok(Value::Bool(result.code == 0))
3392            }
3393            Expr::LastExitCode => {
3394                let scope = self.scope.read().await;
3395                Ok(Value::Int(scope.last_result().code))
3396            }
3397            Expr::CurrentPid => {
3398                let scope = self.scope.read().await;
3399                Ok(Value::Int(scope.pid() as i64))
3400            }
3401            Expr::GlobPattern(s) => Ok(Value::String(s.clone())),
3402        }
3403        })
3404    }
3405
3406    /// Async helper to evaluate multiple StringParts into a single string.
3407    fn eval_string_parts_async<'a>(&'a self, parts: &'a [StringPart]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3408        Box::pin(async move {
3409            let mut result = String::new();
3410            for part in parts {
3411                result.push_str(&self.eval_string_part_async(part).await?);
3412            }
3413            Ok(result)
3414        })
3415    }
3416
3417    /// Async helper to evaluate a StringPart.
3418    /// Evaluate a `[[ ]]` test expression asynchronously, routing file tests
3419    /// through the VFS backend instead of using raw `std::path`.
3420    fn eval_test_async<'a>(&'a self, test_expr: &'a TestExpr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
3421        Box::pin(async move {
3422            match test_expr {
3423                TestExpr::FileTest { op, path } => {
3424                    let path_value = self.eval_expr_async(path).await?;
3425                    let path_str = value_to_string(&path_value);
3426                    let backend = self.exec_ctx.read().await.backend.clone();
3427                    let entry = backend.stat(std::path::Path::new(&path_str)).await.ok();
3428                    Ok(match op {
3429                        FileTestOp::Exists => entry.is_some(),
3430                        FileTestOp::IsFile => entry.as_ref().is_some_and(|e| e.is_file()),
3431                        FileTestOp::IsDir => entry.as_ref().is_some_and(|e| e.is_dir()),
3432                        FileTestOp::Readable => entry.is_some(),
3433                        FileTestOp::Writable => entry.as_ref().is_some_and(|e| {
3434                            e.permissions.is_none_or(|p| p & 0o222 != 0)
3435                        }),
3436                        FileTestOp::Executable => entry.as_ref().is_some_and(|e| {
3437                            e.permissions.is_some_and(|p| p & 0o111 != 0)
3438                        }),
3439                    })
3440                }
3441                TestExpr::StringTest { op, value } => {
3442                    let val = self.eval_expr_async(value).await?;
3443                    let s = value_to_string(&val);
3444                    Ok(match op {
3445                        crate::ast::StringTestOp::IsEmpty => s.is_empty(),
3446                        crate::ast::StringTestOp::IsNonEmpty => !s.is_empty(),
3447                    })
3448                }
3449                TestExpr::Comparison { left, op, right } => {
3450                    // Evaluate operands async (handles $(cmd)), then compare sync
3451                    let left_val = self.eval_expr_async(left).await?;
3452                    let right_val = self.eval_expr_async(right).await?;
3453                    let resolved = TestExpr::Comparison {
3454                        left: Box::new(Expr::Literal(left_val)),
3455                        op: *op,
3456                        right: Box::new(Expr::Literal(right_val)),
3457                    };
3458                    let expr = Expr::Test(Box::new(resolved));
3459                    let mut scope = self.scope.write().await;
3460                    let value = eval_expr(&expr, &mut scope)
3461                        .map_err(|e| anyhow::anyhow!("{}", e))?;
3462                    Ok(value_to_bool(&value))
3463                }
3464                TestExpr::And { left, right } => {
3465                    if !self.eval_test_async(left).await? {
3466                        Ok(false)
3467                    } else {
3468                        self.eval_test_async(right).await
3469                    }
3470                }
3471                TestExpr::Or { left, right } => {
3472                    if self.eval_test_async(left).await? {
3473                        Ok(true)
3474                    } else {
3475                        self.eval_test_async(right).await
3476                    }
3477                }
3478                TestExpr::Not { expr } => {
3479                    Ok(!self.eval_test_async(expr).await?)
3480                }
3481            }
3482        })
3483    }
3484
3485    fn eval_string_part_async<'a>(&'a self, part: &'a StringPart) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3486        Box::pin(async move {
3487            match part {
3488                StringPart::Literal(s) => Ok(s.clone()),
3489                StringPart::Var(path) => {
3490                    let scope = self.scope.read().await;
3491                    match scope.resolve_path(path) {
3492                        Some(value) => Ok(value_to_string(&value)),
3493                        None => Ok(String::new()), // Unset vars expand to empty
3494                    }
3495                }
3496                StringPart::VarWithDefault { name, default } => {
3497                    let scope = self.scope.read().await;
3498                    let use_default = match scope.get(name) {
3499                        Some(value) => value_to_string(value).is_empty(),
3500                        None => true,
3501                    };
3502                    drop(scope); // Release lock before recursive evaluation
3503                    if use_default {
3504                        // Evaluate the default parts (supports nested expansions)
3505                        self.eval_string_parts_async(default).await
3506                    } else {
3507                        let scope = self.scope.read().await;
3508                        Ok(value_to_string(scope.get(name).ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))?))
3509                    }
3510                }
3511            StringPart::VarLength(name) => {
3512                let scope = self.scope.read().await;
3513                match scope.get(name) {
3514                    Some(value) => Ok(value_to_string(value).len().to_string()),
3515                    None => Ok("0".to_string()),
3516                }
3517            }
3518            StringPart::Positional(n) => {
3519                let scope = self.scope.read().await;
3520                match scope.get_positional(*n) {
3521                    Some(s) => Ok(s.to_string()),
3522                    None => Ok(String::new()),
3523                }
3524            }
3525            StringPart::AllArgs => {
3526                let scope = self.scope.read().await;
3527                Ok(scope.all_args().join(" "))
3528            }
3529            StringPart::ArgCount => {
3530                let scope = self.scope.read().await;
3531                Ok(scope.arg_count().to_string())
3532            }
3533            StringPart::Arithmetic(expr) => {
3534                let scope = self.scope.read().await;
3535                match crate::arithmetic::eval_arithmetic(expr, &scope) {
3536                    Ok(value) => Ok(value.to_string()),
3537                    Err(_) => Ok(String::new()),
3538                }
3539            }
3540            StringPart::CommandSubst(stmts) => {
3541                // Snapshot scope+cwd — command substitution in strings must
3542                // not leak side effects (e.g., `"dir: $(cd /; pwd)"` must not change cwd).
3543                let saved_scope = { self.scope.read().await.clone() };
3544                let saved_cwd = {
3545                    let ec = self.exec_ctx.read().await;
3546                    (ec.cwd.clone(), ec.prev_cwd.clone())
3547                };
3548
3549                // Capture result without `?` — restore state unconditionally
3550                let run_result = self.execute_block_capturing(stmts).await;
3551
3552                // Restore scope and cwd regardless of success/failure
3553                {
3554                    let mut scope = self.scope.write().await;
3555                    *scope = saved_scope;
3556                    if let Ok(ref r) = run_result {
3557                        scope.set_last_result(r.clone());
3558                    }
3559                }
3560                {
3561                    let mut ec = self.exec_ctx.write().await;
3562                    ec.cwd = saved_cwd.0;
3563                    ec.prev_cwd = saved_cwd.1;
3564                }
3565
3566                // Now propagate the error
3567                let result = run_result?;
3568
3569                // Embedding binary into a string is a text context: fail loud
3570                // rather than splice in U+FFFD garbage.
3571                match result.try_text_out() {
3572                    Ok(s) => Ok(s.trim_end_matches('\n').to_string()),
3573                    Err(e) => anyhow::bail!(
3574                        "command substitution in a string produced binary data ({e}) — \
3575                         pipe through base64/xxd"
3576                    ),
3577                }
3578            }
3579            StringPart::LastExitCode => {
3580                let scope = self.scope.read().await;
3581                Ok(scope.last_result().code.to_string())
3582            }
3583            StringPart::CurrentPid => {
3584                let scope = self.scope.read().await;
3585                Ok(scope.pid().to_string())
3586            }
3587        }
3588        })
3589    }
3590
3591    /// Update the last result in scope.
3592    async fn update_last_result(&self, result: &ExecResult) {
3593        let mut scope = self.scope.write().await;
3594        scope.set_last_result(result.clone());
3595    }
3596
3597    /// Drain accumulated pipeline stderr into a result.
3598    ///
3599    /// Called after each sub-statement inside control structures (`if`, `for`,
3600    /// `while`, `case`, `&&`, `||`) so that stderr appears incrementally rather
3601    /// than batching until the entire structure finishes.
3602    async fn drain_stderr_into(&self, result: &mut ExecResult) {
3603        let drained = {
3604            let mut receiver = self.stderr_receiver.lock().await;
3605            receiver.drain_lossy()
3606        };
3607        if !drained.is_empty() {
3608            if !result.err.is_empty() && !result.err.ends_with('\n') {
3609                result.err.push('\n');
3610            }
3611            result.err.push_str(&drained);
3612        }
3613    }
3614
3615    /// Execute a user-defined function with local variable scoping.
3616    ///
3617    /// Functions push a new scope frame for local variables. Variables declared
3618    /// with `local` are scoped to the function; other assignments modify outer
3619    /// scopes (or create in root if new).
3620    async fn execute_user_tool(&self, def: ToolDef, args: &[Arg]) -> Result<ExecResult> {
3621        // 1. Build function args from AST args (async to support command substitution)
3622        let tool_args = self.build_args_async(args, None).await?;
3623
3624        // 2. Push a new scope frame for local variables
3625        {
3626            let mut scope = self.scope.write().await;
3627            scope.push_frame();
3628        }
3629
3630        // 3. Save current positional parameters and set new ones for this function
3631        let saved_positional = {
3632            let mut scope = self.scope.write().await;
3633            let saved = scope.save_positional();
3634
3635            // Set up new positional parameters ($0 = function name, $1, $2, ... = args)
3636            let positional_args: Vec<String> = tool_args.positional
3637                .iter()
3638                .map(value_to_string)
3639                .collect();
3640            scope.set_positional(&def.name, positional_args);
3641
3642            saved
3643        };
3644
3645        // 3. Execute body statements with control flow handling
3646        // Accumulate output across statements (like sh)
3647        // Accumulate stdout as raw bytes so a binary-producing statement in a
3648        // function body survives instead of being lossy-decoded here.
3649        let mut accumulated_out: Vec<u8> = Vec::new();
3650        let mut accumulated_err = String::new();
3651        let mut last_code = 0i64;
3652        let mut last_data: Option<Value> = None;
3653
3654        fn push_out(buf: &mut Vec<u8>, r: &ExecResult) {
3655            match r.out_bytes() {
3656                Some(b) => buf.extend_from_slice(b),
3657                None => buf.extend_from_slice(r.text_out().as_bytes()),
3658            }
3659        }
3660
3661        // Track execution error for propagation after cleanup
3662        let mut exec_error: Option<anyhow::Error> = None;
3663        let mut exit_code: Option<i64> = None;
3664
3665        for stmt in &def.body {
3666            match self.execute_stmt_flow(stmt).await {
3667                Ok(flow) => {
3668                    // Drain pipeline stderr after each sub-statement.
3669                    let drained = {
3670                        let mut receiver = self.stderr_receiver.lock().await;
3671                        receiver.drain_lossy()
3672                    };
3673                    if !drained.is_empty() {
3674                        accumulated_err.push_str(&drained);
3675                    }
3676
3677                    match flow {
3678                        ControlFlow::Normal(r) => {
3679                            push_out(&mut accumulated_out, &r);
3680                            accumulated_err.push_str(&r.err);
3681                            last_code = r.code;
3682                            last_data = r.data;
3683                        }
3684                        ControlFlow::Return { value } => {
3685                            push_out(&mut accumulated_out, &value);
3686                            accumulated_err.push_str(&value.err);
3687                            last_code = value.code;
3688                            last_data = value.data;
3689                            break;
3690                        }
3691                        ControlFlow::Exit { code } => {
3692                            exit_code = Some(code);
3693                            break;
3694                        }
3695                        ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3696                            push_out(&mut accumulated_out, &r);
3697                            accumulated_err.push_str(&r.err);
3698                            last_code = r.code;
3699                            last_data = r.data;
3700                        }
3701                    }
3702                }
3703                Err(e) => {
3704                    exec_error = Some(e);
3705                    break;
3706                }
3707            }
3708        }
3709
3710        // 4. Pop scope frame and restore original positional parameters (unconditionally)
3711        {
3712            let mut scope = self.scope.write().await;
3713            scope.pop_frame();
3714            scope.set_positional(saved_positional.0, saved_positional.1);
3715        }
3716
3717        // 5. Propagate error or exit after cleanup
3718        if let Some(e) = exec_error {
3719            return Err(e);
3720        }
3721        let code = exit_code.unwrap_or(last_code);
3722        let mut result = ExecResult::success_text_or_bytes(accumulated_out).with_code(code);
3723        result.err = accumulated_err;
3724        result.data = last_data;
3725        Ok(result)
3726    }
3727
3728    /// Execute a command-substitution body — a block of statements — and return
3729    /// the combined result. Stdout/stderr accumulate across statements with **no
3730    /// inserted separator** (matching bash and the `;`/`&&`/`||` output model),
3731    /// and the last statement's exit code and structured `.data` ride through,
3732    /// so `for x in $(seq 3)` still iterates the array and `$(printf a; printf b)`
3733    /// captures `ab`. Scope/cwd snapshotting (so `$(cd / && pwd)` cannot leak the
3734    /// cwd) is the caller's responsibility.
3735    async fn execute_block_capturing(&self, stmts: &[Stmt]) -> Result<ExecResult> {
3736        // Accumulate stdout as raw bytes so a binary-producing statement
3737        // (`$(dd …)`, `$(base64 -d …)`) isn't lossy-decoded here before the
3738        // caller can preserve it. The final result is text iff valid UTF-8.
3739        let mut accumulated_out: Vec<u8> = Vec::new();
3740        let mut accumulated_err = String::new();
3741        let mut last_code = 0i64;
3742        let mut last_data: Option<Value> = None;
3743
3744        // Append a statement's stdout as raw bytes (binary) or its UTF-8 bytes.
3745        fn push_out(buf: &mut Vec<u8>, r: &ExecResult) {
3746            match r.out_bytes() {
3747                Some(b) => buf.extend_from_slice(b),
3748                None => buf.extend_from_slice(r.text_out().as_bytes()),
3749            }
3750        }
3751
3752        for stmt in stmts {
3753            let flow = self.execute_stmt_flow(stmt).await?;
3754
3755            // Drain pipeline stderr after each sub-statement (incremental, like
3756            // the control-structure and function-body executors).
3757            let drained = {
3758                let mut receiver = self.stderr_receiver.lock().await;
3759                receiver.drain_lossy()
3760            };
3761            if !drained.is_empty() {
3762                accumulated_err.push_str(&drained);
3763            }
3764
3765            match flow {
3766                ControlFlow::Normal(r)
3767                | ControlFlow::Break { result: r, .. }
3768                | ControlFlow::Continue { result: r, .. } => {
3769                    push_out(&mut accumulated_out, &r);
3770                    accumulated_err.push_str(&r.err);
3771                    last_code = r.code;
3772                    last_data = r.data;
3773                }
3774                ControlFlow::Return { value } => {
3775                    push_out(&mut accumulated_out, &value);
3776                    accumulated_err.push_str(&value.err);
3777                    last_code = value.code;
3778                    last_data = value.data;
3779                    break;
3780                }
3781                ControlFlow::Exit { code } => {
3782                    last_code = code;
3783                    break;
3784                }
3785            }
3786        }
3787
3788        let mut result = ExecResult::success_text_or_bytes(accumulated_out).with_code(last_code);
3789        result.err = accumulated_err;
3790        result.data = last_data;
3791        Ok(result)
3792    }
3793
3794    /// Execute the `source` / `.` command to include and run a script.
3795    ///
3796    /// Unlike regular tool execution, `source` executes in the CURRENT scope,
3797    /// allowing the sourced script to set variables and modify shell state.
3798    async fn execute_source(&self, args: &[Arg]) -> Result<ExecResult> {
3799        // Get the file path from the first positional argument
3800        let tool_args = self.build_args_async(args, None).await?;
3801        let path = match tool_args.positional.first() {
3802            Some(Value::String(s)) => s.clone(),
3803            Some(v) => value_to_string(v),
3804            None => {
3805                return Ok(ExecResult::failure(1, "source: missing filename"));
3806            }
3807        };
3808
3809        // Resolve path relative to cwd
3810        let full_path = {
3811            let ctx = self.exec_ctx.read().await;
3812            if path.starts_with('/') {
3813                std::path::PathBuf::from(&path)
3814            } else {
3815                ctx.cwd.join(&path)
3816            }
3817        };
3818
3819        // Read file content via backend
3820        let content = {
3821            let ctx = self.exec_ctx.read().await;
3822            match ctx.backend.read(&full_path, None).await {
3823                Ok(bytes) => {
3824                    String::from_utf8(bytes).map_err(|e| {
3825                        anyhow::anyhow!("source: {}: invalid UTF-8: {}", path, e)
3826                    })?
3827                }
3828                Err(e) => {
3829                    return Ok(ExecResult::failure(
3830                        1,
3831                        format!("source: {}: {}", path, e),
3832                    ));
3833                }
3834            }
3835        };
3836
3837        // Parse the content
3838        let program = match crate::parser::parse(&content) {
3839            Ok(p) => p,
3840            Err(errors) => {
3841                let msg = errors
3842                    .iter()
3843                    .map(|e| format!("{}:{}: {}", path, e.span.start, e.message))
3844                    .collect::<Vec<_>>()
3845                    .join("\n");
3846                return Ok(ExecResult::failure(1, format!("source: {}", msg)));
3847            }
3848        };
3849
3850        // Execute each statement in the CURRENT scope (not isolated)
3851        let mut result = ExecResult::success("");
3852        for stmt in program.statements {
3853            if matches!(stmt, crate::ast::Stmt::Empty) {
3854                continue;
3855            }
3856
3857            match self.execute_stmt_flow(&stmt).await {
3858                Ok(flow) => {
3859                    self.drain_stderr_into(&mut result).await;
3860                    match flow {
3861                        ControlFlow::Normal(r) => {
3862                            result = r.clone();
3863                            self.update_last_result(&r).await;
3864                        }
3865                        ControlFlow::Break { .. } | ControlFlow::Continue { .. } => {
3866                            return Err(anyhow::anyhow!(
3867                                "source: {}: unexpected break/continue outside loop",
3868                                path
3869                            ));
3870                        }
3871                        ControlFlow::Return { value } => {
3872                            return Ok(value);
3873                        }
3874                        ControlFlow::Exit { code } => {
3875                            result.code = code;
3876                            return Ok(result);
3877                        }
3878                    }
3879                }
3880                Err(e) => {
3881                    return Err(e.context(format!("source: {}", path)));
3882                }
3883            }
3884        }
3885
3886        Ok(result)
3887    }
3888
3889    /// Try to execute a script from PATH directories.
3890    ///
3891    /// Searches PATH for `{name}.kai` files and executes them in isolated scope
3892    /// (like user-defined tools). Returns None if no script is found.
3893    async fn try_execute_script(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3894        // Get PATH from scope (default to "/bin")
3895        let path_value = {
3896            let scope = self.scope.read().await;
3897            scope
3898                .get("PATH")
3899                .map(value_to_string)
3900                .unwrap_or_else(|| "/bin".to_string())
3901        };
3902
3903        // Search PATH directories for script
3904        for dir in path_value.split(':') {
3905            if dir.is_empty() {
3906                continue;
3907            }
3908
3909            // Build script path: {dir}/{name}.kai
3910            let script_path = PathBuf::from(dir).join(format!("{}.kai", name));
3911
3912            // Check if script exists
3913            let exists = {
3914                let ctx = self.exec_ctx.read().await;
3915                ctx.backend.exists(&script_path).await
3916            };
3917
3918            if !exists {
3919                continue;
3920            }
3921
3922            // Read script content
3923            let content = {
3924                let ctx = self.exec_ctx.read().await;
3925                match ctx.backend.read(&script_path, None).await {
3926                    Ok(bytes) => match String::from_utf8(bytes) {
3927                        Ok(s) => s,
3928                        Err(e) => {
3929                            return Ok(Some(ExecResult::failure(
3930                                1,
3931                                format!("{}: invalid UTF-8: {}", script_path.display(), e),
3932                            )));
3933                        }
3934                    },
3935                    Err(e) => {
3936                        return Ok(Some(ExecResult::failure(
3937                            1,
3938                            format!("{}: {}", script_path.display(), e),
3939                        )));
3940                    }
3941                }
3942            };
3943
3944            // Parse the script
3945            let program = match crate::parser::parse(&content) {
3946                Ok(p) => p,
3947                Err(errors) => {
3948                    let msg = errors
3949                        .iter()
3950                        .map(|e| format!("{}:{}: {}", script_path.display(), e.span.start, e.message))
3951                        .collect::<Vec<_>>()
3952                        .join("\n");
3953                    return Ok(Some(ExecResult::failure(1, msg)));
3954                }
3955            };
3956
3957            // Build tool_args from args (async for command substitution support)
3958            let tool_args = self.build_args_async(args, None).await?;
3959
3960            // Create isolated scope (like user tools)
3961            let mut isolated_scope = Scope::new();
3962
3963            // Set up positional parameters ($0 = script name, $1, $2, ... = args)
3964            let positional_args: Vec<String> = tool_args.positional
3965                .iter()
3966                .map(value_to_string)
3967                .collect();
3968            isolated_scope.set_positional(name, positional_args);
3969
3970            // Save current scope and swap with isolated scope
3971            let original_scope = {
3972                let mut scope = self.scope.write().await;
3973                std::mem::replace(&mut *scope, isolated_scope)
3974            };
3975
3976            // Execute script statements — track outcome for cleanup
3977            let mut result = ExecResult::success("");
3978            let mut exec_error: Option<anyhow::Error> = None;
3979            let mut exit_code: Option<i64> = None;
3980
3981            for stmt in program.statements {
3982                if matches!(stmt, crate::ast::Stmt::Empty) {
3983                    continue;
3984                }
3985
3986                match self.execute_stmt_flow(&stmt).await {
3987                    Ok(flow) => {
3988                        match flow {
3989                            ControlFlow::Normal(r) => result = r,
3990                            ControlFlow::Return { value } => {
3991                                result = value;
3992                                break;
3993                            }
3994                            ControlFlow::Exit { code } => {
3995                                exit_code = Some(code);
3996                                break;
3997                            }
3998                            ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3999                                result = r;
4000                            }
4001                        }
4002                    }
4003                    Err(e) => {
4004                        exec_error = Some(e);
4005                        break;
4006                    }
4007                }
4008            }
4009
4010            // Restore original scope unconditionally
4011            {
4012                let mut scope = self.scope.write().await;
4013                *scope = original_scope;
4014            }
4015
4016            // Propagate error or exit after cleanup
4017            if let Some(e) = exec_error {
4018                return Err(e.context(format!("script: {}", script_path.display())));
4019            }
4020            if let Some(code) = exit_code {
4021                result.code = code;
4022                return Ok(Some(result));
4023            }
4024
4025            return Ok(Some(result));
4026        }
4027
4028        // No script found
4029        Ok(None)
4030    }
4031
4032    /// Try to execute an external command from PATH.
4033    ///
4034    /// This is the fallback when no builtin or user-defined tool matches.
4035    /// External commands receive a clean argv (flags preserved in their original format).
4036    ///
4037    /// # Requirements
4038    /// - Command must be found in PATH
4039    /// - Current working directory must be on a real filesystem (not virtual like /v)
4040    ///
4041    /// # Returns
4042    /// - `Ok(Some(result))` if command was found and executed
4043    /// - `Ok(None)` if command was not found in PATH
4044    /// - `Err` on execution errors
4045    #[cfg(not(feature = "subprocess"))]
4046    async fn try_execute_external(&self, _name: &str, _args: &[Arg]) -> Result<Option<ExecResult>> {
4047        Ok(None)
4048    }
4049
4050    /// Try to execute an external command from PATH.
4051    #[cfg(feature = "subprocess")]
4052    #[tracing::instrument(level = "debug", skip(self, args), fields(command = %name))]
4053    async fn try_execute_external(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
4054        // Read the cancel token from `self.exec_ctx`, which `dispatch_command`
4055        // populates from the inbound ctx.cancel on every dispatch. This is
4056        // what makes the `timeout` builtin's swapped child token reach the
4057        // wait_or_kill discipline below — reading `self.cancel_token` would
4058        // give the kernel-wide token and miss the timeout's child cascade.
4059        let cancel = {
4060            let ec = self.exec_ctx.read().await;
4061            ec.cancel.clone()
4062        };
4063        let kill_grace = self.kill_grace;
4064        if !self.allow_external_commands {
4065            return Ok(None);
4066        }
4067
4068        // Get real working directory for relative path resolution and child cwd.
4069        // If the CWD is virtual (no real filesystem path), skip external command
4070        // execution entirely — return None so the dispatch can fall through to
4071        // backend-registered tools.
4072        let real_cwd = {
4073            let ctx = self.exec_ctx.read().await;
4074            match ctx.backend.resolve_real_path(&ctx.cwd) {
4075                Some(p) => p,
4076                None => return Ok(None),
4077            }
4078        };
4079
4080        let executable = if name.contains('/') {
4081            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
4082            let resolved = if std::path::Path::new(name).is_absolute() {
4083                std::path::PathBuf::from(name)
4084            } else {
4085                real_cwd.join(name)
4086            };
4087            if !resolved.exists() {
4088                return Ok(Some(ExecResult::failure(
4089                    127,
4090                    format!("{}: No such file or directory", name),
4091                )));
4092            }
4093            if !resolved.is_file() {
4094                return Ok(Some(ExecResult::failure(
4095                    126,
4096                    format!("{}: Is a directory", name),
4097                )));
4098            }
4099            #[cfg(unix)]
4100            {
4101                use std::os::unix::fs::PermissionsExt;
4102                let mode = std::fs::metadata(&resolved)
4103                    .map(|m| m.permissions().mode())
4104                    .unwrap_or(0);
4105                if mode & 0o111 == 0 {
4106                    return Ok(Some(ExecResult::failure(
4107                        126,
4108                        format!("{}: Permission denied", name),
4109                    )));
4110                }
4111            }
4112            resolved.to_string_lossy().into_owned()
4113        } else {
4114            // Get PATH from scope or environment
4115            let path_var = {
4116                let scope = self.scope.read().await;
4117                scope
4118                    .get("PATH")
4119                    .map(value_to_string)
4120                    .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default())
4121            };
4122
4123            // Resolve command in PATH
4124            match resolve_in_path(name, &path_var) {
4125                Some(path) => path,
4126                None => return Ok(None), // Not found - let caller handle error
4127            }
4128        };
4129
4130        tracing::debug!(executable = %executable, "resolved external command");
4131
4132        // Build flat argv (preserves flag format)
4133        let argv = self.build_args_flat(args).await?;
4134
4135        // Get stdin if available
4136        let stdin_data = {
4137            let mut ctx = self.exec_ctx.write().await;
4138            ctx.take_stdin()
4139        };
4140
4141        // Build and spawn the command
4142        use tokio::process::Command;
4143
4144        let mut cmd = Command::new(&executable);
4145        cmd.args(&argv);
4146        cmd.current_dir(&real_cwd);
4147
4148        // Hermetic env: child sees only kaish's exported vars, not the kaish
4149        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
4150        // populate it via KernelConfig::initial_vars at construction.
4151        cmd.env_clear();
4152        {
4153            let scope = self.scope.read().await;
4154            for (var_name, value) in scope.exported_vars() {
4155                cmd.env(var_name, value_to_string(&value));
4156            }
4157        }
4158
4159        // Handle stdin
4160        cmd.stdin(if stdin_data.is_some() {
4161            std::process::Stdio::piped()
4162        } else if self.interactive {
4163            std::process::Stdio::inherit()
4164        } else {
4165            std::process::Stdio::null()
4166        });
4167
4168        // In interactive mode, standalone or last-in-pipeline commands inherit
4169        // the terminal's stdout/stderr so output streams in real-time.
4170        // First/middle commands must capture stdout for the pipe — same as bash.
4171        let pipeline_position = {
4172            let ctx = self.exec_ctx.read().await;
4173            ctx.pipeline_position
4174        };
4175        let inherit_output = self.interactive
4176            && matches!(pipeline_position, PipelinePosition::Only | PipelinePosition::Last);
4177
4178        if inherit_output {
4179            cmd.stdout(std::process::Stdio::inherit());
4180            cmd.stderr(std::process::Stdio::inherit());
4181        } else {
4182            cmd.stdout(std::process::Stdio::piped());
4183            cmd.stderr(std::process::Stdio::piped());
4184        }
4185
4186        // On Unix, always put the child in its own process group so cancellation
4187        // can `killpg` the whole tree (the child plus any grandchildren).
4188        // Restoring default tty-related signal handlers stays gated on
4189        // job-control mode — those only matter when the child has a controlling
4190        // terminal.
4191        #[cfg(unix)]
4192        {
4193            let restore_jc_signals = self.terminal_state.is_some() && inherit_output;
4194            // SAFETY: setpgid and sigaction(SIG_DFL) are async-signal-safe per POSIX
4195            #[allow(unsafe_code)]
4196            unsafe {
4197                cmd.pre_exec(move || {
4198                    // Own process group — for kill scope.
4199                    nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
4200                        .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
4201                    if restore_jc_signals {
4202                        use nix::libc::{sigaction, SIGTSTP, SIGTTOU, SIGTTIN, SIGINT, SIG_DFL};
4203                        let mut sa: nix::libc::sigaction = std::mem::zeroed();
4204                        sa.sa_sigaction = SIG_DFL;
4205                        if sigaction(SIGTSTP, &sa, std::ptr::null_mut()) != 0 {
4206                            return Err(std::io::Error::last_os_error());
4207                        }
4208                        if sigaction(SIGTTOU, &sa, std::ptr::null_mut()) != 0 {
4209                            return Err(std::io::Error::last_os_error());
4210                        }
4211                        if sigaction(SIGTTIN, &sa, std::ptr::null_mut()) != 0 {
4212                            return Err(std::io::Error::last_os_error());
4213                        }
4214                        if sigaction(SIGINT, &sa, std::ptr::null_mut()) != 0 {
4215                            return Err(std::io::Error::last_os_error());
4216                        }
4217                    }
4218                    Ok(())
4219                });
4220            }
4221        }
4222
4223        // Backstop for kill on drop in case our explicit kill path is bypassed
4224        // (panic, early return, etc) on the **capture** wait path. We do NOT
4225        // set this on the JC inherit path: that uses sync `waitpid` outside
4226        // tokio's view of the child, so on drop tokio would try to kill an
4227        // already-reaped (possibly-reused) PID. The JC path has its own
4228        // cancel handling via the side-task watcher.
4229        let in_jc_inherit_path = inherit_output && self.terminal_state.is_some();
4230        if !in_jc_inherit_path {
4231            cmd.kill_on_drop(true);
4232        }
4233
4234        // Spawn the process. Capture a `KillTarget` immediately so cancel/
4235        // timeout paths can deliver signals via pidfd (Linux ≥ 5.3) — bound
4236        // to this process's generation, immune to PID reuse if the OS reaps
4237        // the child before our kill syscalls fire.
4238        let mut child = match cmd.spawn() {
4239            Ok(child) => child,
4240            Err(e) => {
4241                return Ok(Some(ExecResult::failure(
4242                    127,
4243                    format!("{}: {}", name, e),
4244                )));
4245            }
4246        };
4247        let kill_target = crate::pidfd::KillTarget::from_child(&child);
4248
4249        // If this external runs on behalf of a background job, record its
4250        // process group on the job so `kill -<sig> %N` can signal the real
4251        // process directly (STOP/CONT/USR1/…, not just terminate). The child
4252        // did `setpgid(0, 0)` in pre_exec, so its PGID equals its PID.
4253        if let Some(job_id) = self.bg_job_id
4254            && let Some(pid) = child.id()
4255        {
4256            self.jobs.add_pgid(job_id, pid).await;
4257        }
4258
4259        // Write stdin if present
4260        if let Some(data) = stdin_data
4261            && let Some(mut stdin) = child.stdin.take()
4262        {
4263            use tokio::io::AsyncWriteExt;
4264            if let Err(e) = stdin.write_all(data.as_bytes()).await {
4265                return Ok(Some(ExecResult::failure(
4266                    1,
4267                    format!("{}: failed to write stdin: {}", name, e),
4268                )));
4269            }
4270            // Drop stdin to signal EOF
4271        }
4272
4273        if inherit_output {
4274            // Job control path: use waitpid with WUNTRACED for Ctrl-Z support
4275            #[cfg(unix)]
4276            if let Some(ref term) = self.terminal_state {
4277                let child_id = child.id().unwrap_or(0);
4278                let pid = nix::unistd::Pid::from_raw(child_id as i32);
4279                let pgid = pid; // child is its own pgid leader
4280
4281                // Give the terminal to the child's process group
4282                if let Err(e) = term.give_terminal_to(pgid) {
4283                    tracing::warn!("failed to give terminal to child: {}", e);
4284                }
4285
4286                let term_clone = term.clone();
4287                let cmd_name = name.to_string();
4288                let cmd_display = format!("{} {}", name, argv.join(" "));
4289                let jobs = self.jobs.clone();
4290
4291                // Side task that watches for cancellation while the blocking
4292                // waitpid runs. On cancel, it SIGTERMs the process group, waits
4293                // the grace period, then SIGKILLs. The blocking waitpid returns
4294                // when the child dies. AbortOnDrop guard cancels the watcher
4295                // on the success path so it doesn't keep running after wait
4296                // returns naturally.
4297                //
4298                // `wait_complete` shrinks the PID-reuse race: the watcher
4299                // checks it before each kill syscall and bails out if
4300                // wait_for_foreground has already reaped the child. This
4301                // doesn't fully eliminate the race (atomic load + kill is
4302                // not atomic with the OS reap+reuse), but narrows the window
4303                // to nanoseconds — enough to be ignorable in practice.
4304                let wait_complete = std::sync::Arc::new(
4305                    std::sync::atomic::AtomicBool::new(false)
4306                );
4307                let cancel_watcher = {
4308                    let cancel = cancel.clone();
4309                    let wc = wait_complete.clone();
4310                    // Ownership transfer: the JC path's sync wait inside
4311                    // block_in_place owns the child's reaping, so the
4312                    // cancel_watcher drives the kill side via KillTarget
4313                    // (pidfd-bound on Linux). When kill_target is None
4314                    // (older kernel + open failure, or non-Linux), falls
4315                    // through to the older PID-based path the closure
4316                    // captures from `pid`.
4317                    let target = kill_target.as_ref().map(|t| {
4318                        // Re-borrow the components we need into Owned-ish form
4319                        // so the spawned task is 'static. We can't move
4320                        // KillTarget directly because try_execute_external
4321                        // still uses it after the spawn — but on the JC path
4322                        // there is no further use after the watcher spawn,
4323                        // so a clone-of-pid + owned None pidfd is safe.
4324                        // Simpler: signal via the existing target by cloning
4325                        // a fresh pidfd; the original keeps its handle.
4326                        // Pidfd is just an OwnedFd — not Clone — so do it
4327                        // by re-opening from the pid. Fall back if reopen
4328                        // fails (race already reaped → best-effort kill).
4329                        crate::pidfd::KillTarget::from_pid(t.pid())
4330                    });
4331                    tokio::spawn(async move {
4332                        cancel.cancelled().await;
4333                        if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4334                        use nix::sys::signal::Signal;
4335                        if let Some(t) = &target {
4336                            t.signal(Signal::SIGTERM);
4337                            t.signal_pg(Signal::SIGTERM);
4338                        } else {
4339                            let _ = nix::sys::signal::kill(pid, Signal::SIGTERM);
4340                            let _ = nix::sys::signal::killpg(pid, Signal::SIGTERM);
4341                        }
4342                        if kill_grace > Duration::ZERO {
4343                            tokio::time::sleep(kill_grace).await;
4344                            if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4345                        }
4346                        if let Some(t) = &target {
4347                            t.signal(Signal::SIGKILL);
4348                            t.signal_pg(Signal::SIGKILL);
4349                        } else {
4350                            let _ = nix::sys::signal::kill(pid, Signal::SIGKILL);
4351                            let _ = nix::sys::signal::killpg(pid, Signal::SIGKILL);
4352                        }
4353                    })
4354                };
4355                struct AbortOnDrop(tokio::task::JoinHandle<()>);
4356                impl Drop for AbortOnDrop {
4357                    fn drop(&mut self) {
4358                        self.0.abort();
4359                    }
4360                }
4361                let _watcher_guard = AbortOnDrop(cancel_watcher);
4362
4363                let wait_complete_setter = wait_complete.clone();
4364                let code = tokio::task::block_in_place(move || {
4365                    let result = term_clone.wait_for_foreground(pid);
4366                    // Mark wait done before the watcher might fire.
4367                    wait_complete_setter.store(true, std::sync::atomic::Ordering::SeqCst);
4368
4369                    // Always reclaim the terminal
4370                    if let Err(e) = term_clone.reclaim_terminal() {
4371                        tracing::warn!("failed to reclaim terminal: {}", e);
4372                    }
4373
4374                    match result {
4375                        crate::terminal::WaitResult::Exited(code) => code as i64,
4376                        crate::terminal::WaitResult::Signaled(sig) => 128 + sig as i64,
4377                        crate::terminal::WaitResult::Stopped(_sig) => {
4378                            // Register as a stopped job
4379                            let rt = tokio::runtime::Handle::current();
4380                            let job_id = rt.block_on(jobs.register_stopped(
4381                                cmd_display,
4382                                child_id,
4383                                child_id, // pgid = pid for group leader
4384                            ));
4385                            eprintln!("\n[{}]+ Stopped\t{}", job_id, cmd_name);
4386                            148 // 128 + SIGTSTP(20) on most systems, but we use a fixed value
4387                        }
4388                    }
4389                });
4390
4391                return Ok(Some(ExecResult::from_output(code, String::new(), String::new())));
4392            }
4393
4394            // Non-job-control path with inherited stdio.
4395            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4396                Ok(s) => s,
4397                Err(e) => {
4398                    return Ok(Some(ExecResult::failure(
4399                        1,
4400                        format!("{}: failed to wait: {}", name, e),
4401                    )));
4402                }
4403            };
4404
4405            let code = status.code().unwrap_or_else(|| {
4406                #[cfg(unix)]
4407                {
4408                    use std::os::unix::process::ExitStatusExt;
4409                    128 + status.signal().unwrap_or(0)
4410                }
4411                #[cfg(not(unix))]
4412                {
4413                    -1
4414                }
4415            }) as i64;
4416
4417            // stdout/stderr already went to the terminal
4418            Ok(Some(ExecResult::from_output(code, String::new(), String::new())))
4419        } else {
4420            // Capture output via bounded streams
4421            let stdout_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4422            let stderr_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4423
4424            let stdout_pipe = child.stdout.take();
4425            let stderr_pipe = child.stderr.take();
4426
4427            let stdout_clone = stdout_stream.clone();
4428            let stderr_clone = stderr_stream.clone();
4429
4430            let stdout_task = stdout_pipe.map(|pipe| {
4431                tokio::spawn(async move {
4432                    drain_to_stream(pipe, stdout_clone).await;
4433                })
4434            });
4435
4436            let stderr_task = stderr_pipe.map(|pipe| {
4437                tokio::spawn(async move {
4438                    drain_to_stream(pipe, stderr_clone).await;
4439                })
4440            });
4441
4442            let cancelled_before_wait = cancel.is_cancelled();
4443            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4444                Ok(s) => s,
4445                Err(e) => {
4446                    if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4447                    if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4448                    return Ok(Some(ExecResult::failure(
4449                        1,
4450                        format!("{}: failed to wait: {}", name, e),
4451                    )));
4452                }
4453            };
4454
4455            // On cancel, abort the drain tasks (the child's pipes are gone;
4456            // late output is lost but predictable death beats partial capture).
4457            // On normal exit, await drains so we don't lose buffered output.
4458            if cancelled_before_wait || cancel.is_cancelled() {
4459                if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4460                if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4461            } else {
4462                if let Some(task) = stdout_task {
4463                    // Ignore join error — the drain task logs its own errors
4464                    let _ = task.await;
4465                }
4466                if let Some(task) = stderr_task {
4467                    let _ = task.await;
4468                }
4469            }
4470
4471            let code = status.code().unwrap_or_else(|| {
4472                #[cfg(unix)]
4473                {
4474                    use std::os::unix::process::ExitStatusExt;
4475                    128 + status.signal().unwrap_or(0)
4476                }
4477                #[cfg(not(unix))]
4478                {
4479                    -1
4480                }
4481            }) as i64;
4482
4483            // Read stdout as RAW bytes: text if valid UTF-8, else a Bytes
4484            // result, so `curl url`, `curl url > file.bin`, etc. keep binary
4485            // intact. stderr stays text. See docs/binary-data.md.
4486            let stdout = stdout_stream.read().await;
4487            let stderr = stderr_stream.read_string().await;
4488            let mut result = ExecResult::success_text_or_bytes(stdout).with_code(code);
4489            result.err = stderr;
4490            Ok(Some(result))
4491        }
4492    }
4493
4494    // --- Variable Access ---
4495
4496    /// Get a variable value.
4497    pub async fn get_var(&self, name: &str) -> Option<Value> {
4498        let scope = self.scope.read().await;
4499        scope.get(name).cloned()
4500    }
4501
4502    /// Check if error-exit mode is enabled (for testing).
4503    #[cfg(test)]
4504    pub async fn error_exit_enabled(&self) -> bool {
4505        let scope = self.scope.read().await;
4506        scope.error_exit_enabled()
4507    }
4508
4509    /// Set a variable value.
4510    pub async fn set_var(&self, name: &str, value: Value) {
4511        let mut scope = self.scope.write().await;
4512        scope.set(name.to_string(), value);
4513    }
4514
4515    /// Set positional parameters ($0 script name and $1-$9 args).
4516    pub async fn set_positional(&self, script_name: impl Into<String>, args: Vec<String>) {
4517        let mut scope = self.scope.write().await;
4518        scope.set_positional(script_name, args);
4519    }
4520
4521    /// List all variables.
4522    pub async fn list_vars(&self) -> Vec<(String, Value)> {
4523        let scope = self.scope.read().await;
4524        scope.all()
4525    }
4526
4527    /// List exported variables (name, value), sorted by name. These are the
4528    /// vars a child process would see (see `dispatch`'s hermetic env build).
4529    pub async fn exported_vars(&self) -> Vec<(String, Value)> {
4530        let scope = self.scope.read().await;
4531        scope.exported_vars()
4532    }
4533
4534    // --- CWD ---
4535
4536    /// Get current working directory.
4537    pub async fn cwd(&self) -> PathBuf {
4538        self.exec_ctx.read().await.cwd.clone()
4539    }
4540
4541    /// Set current working directory.
4542    pub async fn set_cwd(&self, path: PathBuf) {
4543        let mut ctx = self.exec_ctx.write().await;
4544        ctx.set_cwd(path);
4545    }
4546
4547    /// Set the working directory only if `path` resolves to a directory in the
4548    /// kernel's backend — the same namespace `cd` validates against. Unlike a
4549    /// raw host-FS `is_dir()` check, this correctly accepts virtual mounts
4550    /// (`/v/docs`, in-memory scratch, …) and rejects real paths that have since
4551    /// disappeared. Returns whether the cwd was changed.
4552    pub async fn try_set_cwd(&self, path: PathBuf) -> bool {
4553        // Clone the backend Arc out before the stat so we never hold the
4554        // exec_ctx lock across the await.
4555        let backend = self.exec_ctx.read().await.backend.clone();
4556        let is_dir = matches!(backend.stat(&path).await, Ok(entry) if entry.is_dir());
4557        if is_dir {
4558            self.exec_ctx.write().await.set_cwd(path);
4559        }
4560        is_dir
4561    }
4562
4563    // --- Last Result ---
4564
4565    /// Get the last result ($?).
4566    pub async fn last_result(&self) -> ExecResult {
4567        let scope = self.scope.read().await;
4568        scope.last_result().clone()
4569    }
4570
4571    // --- Tools ---
4572
4573    /// Check if a user-defined function exists.
4574    pub async fn has_function(&self, name: &str) -> bool {
4575        self.user_tools.read().await.contains_key(name)
4576    }
4577
4578    /// Get available tool schemas.
4579    pub fn tool_schemas(&self) -> Vec<crate::tools::ToolSchema> {
4580        self.tools.schemas()
4581    }
4582
4583    // --- Jobs ---
4584
4585    /// Get job manager.
4586    pub fn jobs(&self) -> Arc<JobManager> {
4587        self.jobs.clone()
4588    }
4589
4590    // --- VFS ---
4591
4592    /// Get VFS router.
4593    pub fn vfs(&self) -> Arc<VfsRouter> {
4594        self.vfs.clone()
4595    }
4596
4597    // --- State ---
4598
4599    /// Reset kernel to initial state.
4600    ///
4601    /// Clears in-memory variables and resets cwd to root.
4602    /// History is not cleared (it persists across resets).
4603    pub async fn reset(&self) -> Result<()> {
4604        {
4605            let mut scope = self.scope.write().await;
4606            *scope = Scope::new();
4607        }
4608        {
4609            let mut ctx = self.exec_ctx.write().await;
4610            ctx.cwd = PathBuf::from("/");
4611        }
4612        Ok(())
4613    }
4614
4615    /// Shutdown the kernel.
4616    pub async fn shutdown(self) -> Result<()> {
4617        // Wait for all background jobs
4618        self.jobs.wait_all().await;
4619        Ok(())
4620    }
4621
4622    /// Dispatch a single command using the full resolution chain.
4623    ///
4624    /// This is the core of `CommandDispatcher` — it syncs state between the
4625    /// passed-in `ExecContext` and kernel-internal state (scope, exec_ctx),
4626    /// then delegates to `execute_command` for the actual dispatch.
4627    ///
4628    /// State flow:
4629    /// 1. ctx → self: sync scope, cwd, stdin so internal methods see current state
4630    /// 2. execute_command: full dispatch chain (user tools, builtins, scripts, external, backend)
4631    /// 3. self → ctx: sync scope, cwd changes back so the pipeline runner sees them
4632    async fn dispatch_command(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4633        // Ensure nested dispatch (e.g. the `timeout` builtin re-dispatching
4634        // its inner command via ctx.dispatcher) routes through THIS kernel,
4635        // not a stale parent. Critical for forks: the fork's builtins must
4636        // use the fork's dispatcher, not the parent's.
4637        if let Some(d) = self.dispatcher() {
4638            ctx.dispatcher = Some(d);
4639        }
4640
4641        // 1. Sync ctx → self internals
4642        {
4643            let mut scope = self.scope.write().await;
4644            *scope = ctx.scope.clone();
4645        }
4646        {
4647            let mut ec = self.exec_ctx.write().await;
4648            ec.cwd = ctx.cwd.clone();
4649            ec.prev_cwd = ctx.prev_cwd.clone();
4650            ec.stdin = ctx.stdin.take();
4651            ec.stdin_data = ctx.stdin_data.take();
4652            // Streaming pipe endpoints and kernel stderr must flow to the
4653            // tool via self.exec_ctx — execute_command reads that, not the
4654            // passed-in ctx. Without moving these, concurrent pipeline
4655            // stages dispatched via a fork get pipe_stdin = None and
4656            // silently read nothing.
4657            ec.pipe_stdin = ctx.pipe_stdin.take();
4658            ec.pipe_stdout = ctx.pipe_stdout.take();
4659            if let Some(stderr) = ctx.stderr.clone() {
4660                ec.stderr = Some(stderr);
4661            }
4662            ec.aliases = ctx.aliases.clone();
4663            ec.ignore_config = ctx.ignore_config.clone();
4664            ec.output_limit = ctx.output_limit.clone();
4665            ec.pipeline_position = ctx.pipeline_position;
4666            // Sync the cancel token from ctx → ec. Builtins like `timeout`
4667            // swap ctx.cancel to a derived child token before re-dispatching;
4668            // execute_command's snapshot reads ec.cancel (kept aligned by
4669            // this sync), so try_execute_external sees the right token.
4670            ec.cancel = ctx.cancel.clone();
4671            // Same alignment for the watchdog: a fork dispatching through its
4672            // own kernel must hand the shared script clock to the snapshot so
4673            // patient holds in forked stages suspend the right timer.
4674            ec.watchdog = ctx.watchdog.clone();
4675        }
4676
4677        // 2. Execute via the full dispatch chain
4678        let result = self.execute_command(&cmd.name, &cmd.args).await?;
4679
4680        // 3. Sync self → ctx
4681        {
4682            let scope = self.scope.read().await;
4683            ctx.scope = scope.clone();
4684        }
4685        {
4686            let mut ec = self.exec_ctx.write().await;
4687            ctx.cwd = ec.cwd.clone();
4688            ctx.prev_cwd = ec.prev_cwd.clone();
4689            ctx.aliases = ec.aliases.clone();
4690            ctx.ignore_config = ec.ignore_config.clone();
4691            ctx.output_limit = ec.output_limit.clone();
4692            // Return any pipe endpoints that the tool didn't consume.
4693            // `take()` here keeps the fork's exec_ctx in a clean state for
4694            // the next dispatch — these are per-command and shouldn't leak
4695            // between calls.
4696            ctx.pipe_stdin = ec.pipe_stdin.take();
4697            ctx.pipe_stdout = ec.pipe_stdout.take();
4698        }
4699
4700        Ok(result)
4701    }
4702}
4703
4704#[async_trait]
4705impl CommandDispatcher for Kernel {
4706    /// Dispatch a command through the Kernel's full resolution chain.
4707    ///
4708    /// This is the single path for all command execution when called from
4709    /// the pipeline runner. It provides the full dispatch chain:
4710    /// user tools → builtins → .kai scripts → external commands → backend tools.
4711    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4712        self.dispatch_command(cmd, ctx).await
4713    }
4714
4715    /// Evaluate an expression through the kernel's async chain, including
4716    /// command substitution. Delegates to `eval_expr_async`, which snapshots
4717    /// the kernel's scope/cwd and restores them after any `$(...)` runs, so
4718    /// only command output escapes. The `ctx` is unused here because the
4719    /// kernel evaluates against its own session state (a fork carries the
4720    /// pipeline stage's snapshot); var refs resolve against that scope.
4721    async fn eval_expr(&self, expr: &Expr, _ctx: &ExecContext) -> Result<Value> {
4722        self.eval_expr_async(expr).await
4723    }
4724
4725    /// Produce a forked dispatcher with independent mutable state (detached).
4726    ///
4727    /// Calls the inherent `Kernel::fork` method (note the UFCS to avoid
4728    /// recursing into the trait method we're defining) and coerces the
4729    /// returned `Arc<Kernel>` to `Arc<dyn CommandDispatcher>`.
4730    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
4731        let fork: Arc<Kernel> = Kernel::fork(self).await;
4732        fork
4733    }
4734
4735    /// Produce a forked dispatcher with cancellation cascading from this kernel.
4736    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
4737        let fork: Arc<Kernel> = Kernel::fork_attached(self).await;
4738        fork
4739    }
4740}
4741
4742/// Apply the requested output format to a builtin's result, unless the tool
4743/// owns its own output.
4744///
4745/// `format` is `ctx.output_format` (set from `--json`). When `owns_output` is
4746/// true the tool already rendered its bytes (bespoke JSON envelope), so the
4747/// kernel leaves the result untouched rather than re-formatting its
4748/// `OutputData`. Otherwise the kernel renders the typed `OutputData` uniformly.
4749fn finalize_output(
4750    result: ExecResult,
4751    format: Option<crate::interpreter::OutputFormat>,
4752    owns_output: bool,
4753) -> ExecResult {
4754    match format {
4755        Some(_) if owns_output => result,
4756        Some(format) => apply_output_format(result, format),
4757        None => result,
4758    }
4759}
4760
4761/// Accumulate output from one result into another.
4762///
4763/// Appends stdout and stderr verbatim and updates the exit code to match the
4764/// new result. Used to preserve output from multiple statements, loop
4765/// iterations, and command chains. No separator is inserted between outputs —
4766/// each command's output concatenates raw, matching bash (`printf a; printf b`
4767/// and `printf a && printf b` both yield `ab`; a trailing newline only appears
4768/// when a command emits its own, as `echo` does).
4769fn accumulate_result(accumulated: &mut ExecResult, new: &ExecResult) {
4770    // Materialize lazy OutputData into .out before accumulating.
4771    // Without this, the first command's output stays in .output while
4772    // the second's text gets appended to .out, losing the first.
4773    accumulated.materialize();
4774    match new.out_bytes() {
4775        // A binary result must not be lossy-decoded by text_out(): concatenate
4776        // raw bytes so the combined output stays binary (this is the path every
4777        // top-level statement's result flows through). See docs/binary-data.md.
4778        Some(new_bytes) => {
4779            let mut combined: Vec<u8> = match accumulated.out_bytes() {
4780                Some(b) => b.to_vec(),
4781                None => accumulated.text_out().into_owned().into_bytes(),
4782            };
4783            combined.extend_from_slice(new_bytes);
4784            accumulated.set_out_bytes(combined);
4785        }
4786        None => accumulated.push_out(&new.text_out()),
4787    }
4788    accumulated.err.push_str(&new.err);
4789    accumulated.code = new.code;
4790    accumulated.data = new.data.clone();
4791    accumulated.did_spill = new.did_spill;
4792    accumulated.original_code = new.original_code;
4793    accumulated.content_type = new.content_type.clone();
4794    accumulated.baggage.clone_from(&new.baggage);
4795}
4796
4797/// Fold a loop's accumulated output into a break/continue signal that is
4798/// propagating to an *outer* loop. Output printed before `break N`/`continue N`
4799/// (with `N > 1`) would otherwise be discarded when the signal replaces the
4800/// loop's result on its way up. The loop's output comes first (it ran before
4801/// the signal was raised), then the signal's already-carried output.
4802fn fold_loop_output_into_flow(loop_output: ExecResult, flow: &mut ControlFlow) {
4803    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4804        let mut merged = loop_output;
4805        accumulate_result(&mut merged, result);
4806        *result = merged;
4807    }
4808}
4809
4810/// Accumulate the output a break/continue signal carried (from inner loops it
4811/// propagated through) into the loop that finally handles it, so it survives
4812/// into that loop's result.
4813fn accumulate_flow_output(accumulated: &mut ExecResult, flow: &ControlFlow) {
4814    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4815        accumulate_result(accumulated, result);
4816    }
4817}
4818
4819/// Check if a value is truthy.
4820fn is_truthy(value: &Value) -> bool {
4821    match value {
4822        Value::Null => false,
4823        Value::Bool(b) => *b,
4824        Value::Int(i) => *i != 0,
4825        Value::Float(f) => *f != 0.0,
4826        Value::String(s) => !s.is_empty(),
4827        Value::Json(json) => match json {
4828            serde_json::Value::Null => false,
4829            serde_json::Value::Array(arr) => !arr.is_empty(),
4830            serde_json::Value::Object(obj) => !obj.is_empty(),
4831            serde_json::Value::Bool(b) => *b,
4832            serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
4833            serde_json::Value::String(s) => !s.is_empty(),
4834        },
4835        Value::Bytes(b) => !b.is_empty(), // empty bytes are falsy, like ""
4836    }
4837}
4838
4839/// Apply tilde expansion to a value.
4840///
4841/// Only string values starting with `~` are expanded. `home` is the session
4842/// `HOME` from the kernel scope (the kernel is hermetic and never reads the
4843/// host env); `None` leaves `~`/`~/path` unexpanded. See [`expand_tilde`].
4844fn apply_tilde_expansion(value: Value, home: Option<&str>) -> Value {
4845    match value {
4846        Value::String(s) if s.starts_with('~') => Value::String(expand_tilde(&s, home)),
4847        _ => value,
4848    }
4849}
4850
/// Reap `child`, or terminate it if `cancel` fires before it exits.
///
/// `target` optionally carries a Linux pidfd for race-free signalling of the
/// direct child; without one, PID-based kill is the fall-through. The actual
/// termination (SIGTERM, `grace` wait, SIGKILL) is delegated to
/// [`kill_with_grace`]. The non-unix variant ignores `target` and uses
/// tokio's cross-platform `start_kill` instead.
#[cfg(all(unix, feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    target: Option<&crate::pidfd::KillTarget>,
    cancel: &tokio_util::sync::CancellationToken,
    grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // Poll in declaration order: a child that has already exited wins
        // over a simultaneous cancellation.
        biased;
        exit = child.wait() => exit,
        () = cancel.cancelled() => kill_with_grace(child, target, grace).await,
    }
}
4869
#[cfg(all(not(unix), feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    _target: Option<&()>,
    cancel: &tokio_util::sync::CancellationToken,
    _grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // Poll in declaration order: a child that has already exited wins
        // over a simultaneous cancellation.
        biased;
        exit = child.wait() => exit,
        () = cancel.cancelled() => {
            // No pidfd, process groups or grace period here: ask tokio to
            // terminate the child (best effort), then reap it.
            let _ = child.start_kill();
            child.wait().await
        }
    }
}
4886
4887/// Send SIGTERM to the child and its process group; wait `grace`; then SIGKILL.
4888///
4889/// Direct-child kill goes through `target.signal()`, which on Linux uses a
4890/// pidfd (immune to PID reuse). Process-group kill uses `killpg` — there is
4891/// no PGID-equivalent of pidfd, so grandchildren retain a small reuse window.
4892#[cfg(all(unix, feature = "subprocess"))]
4893pub(crate) async fn kill_with_grace(
4894    child: &mut tokio::process::Child,
4895    target: Option<&crate::pidfd::KillTarget>,
4896    grace: Duration,
4897) -> std::io::Result<std::process::ExitStatus> {
4898    use nix::sys::signal::Signal;
4899
4900    if let Some(t) = target {
4901        t.signal(Signal::SIGTERM);
4902        t.signal_pg(Signal::SIGTERM);
4903        if grace > Duration::ZERO
4904            && let Ok(status) = tokio::time::timeout(grace, child.wait()).await
4905        {
4906            return status;
4907        }
4908        t.signal(Signal::SIGKILL);
4909        t.signal_pg(Signal::SIGKILL);
4910    }
4911    child.wait().await
4912}
4913
4914#[cfg(all(test, feature = "subprocess"))]
4915mod tests {
4916    use super::*;
4917
4918    #[tokio::test]
4919    async fn test_kernel_transient() {
4920        let kernel = Kernel::transient().expect("failed to create kernel");
4921        assert_eq!(kernel.name(), "transient");
4922    }
4923
4924    #[tokio::test]
4925    async fn test_kernel_execute_echo() {
4926        let kernel = Kernel::transient().expect("failed to create kernel");
4927        let result = kernel.execute("echo hello").await.expect("execution failed");
4928        assert!(result.ok());
4929        assert_eq!(result.text_out().trim(), "hello");
4930    }
4931
4932    #[tokio::test]
4933    async fn test_multiple_statements_accumulate_output() {
4934        let kernel = Kernel::transient().expect("failed to create kernel");
4935        let result = kernel
4936            .execute("echo one\necho two\necho three")
4937            .await
4938            .expect("execution failed");
4939        assert!(result.ok());
4940        // Should have all three outputs separated by newlines
4941        assert!(result.text_out().contains("one"), "missing 'one': {}", result.text_out());
4942        assert!(result.text_out().contains("two"), "missing 'two': {}", result.text_out());
4943        assert!(result.text_out().contains("three"), "missing 'three': {}", result.text_out());
4944    }
4945
4946    #[tokio::test]
4947    async fn test_and_chain_accumulates_output() {
4948        let kernel = Kernel::transient().expect("failed to create kernel");
4949        let result = kernel
4950            .execute("echo first && echo second")
4951            .await
4952            .expect("execution failed");
4953        assert!(result.ok());
4954        assert!(result.text_out().contains("first"), "missing 'first': {}", result.text_out());
4955        assert!(result.text_out().contains("second"), "missing 'second': {}", result.text_out());
4956    }
4957
4958    #[tokio::test]
4959    async fn test_for_loop_accumulates_output() {
4960        let kernel = Kernel::transient().expect("failed to create kernel");
4961        let result = kernel
4962            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
4963            .await
4964            .expect("execution failed");
4965        assert!(result.ok());
4966        assert!(result.text_out().contains("item: a"), "missing 'item: a': {}", result.text_out());
4967        assert!(result.text_out().contains("item: b"), "missing 'item: b': {}", result.text_out());
4968        assert!(result.text_out().contains("item: c"), "missing 'item: c': {}", result.text_out());
4969    }
4970
4971    #[tokio::test]
4972    async fn test_while_loop_accumulates_output() {
4973        let kernel = Kernel::transient().expect("failed to create kernel");
4974        let result = kernel
4975            .execute(r#"
4976                N=3
4977                while [[ ${N} -gt 0 ]]; do
4978                    echo "N=${N}"
4979                    N=$((N - 1))
4980                done
4981            "#)
4982            .await
4983            .expect("execution failed");
4984        assert!(result.ok());
4985        assert!(result.text_out().contains("N=3"), "missing 'N=3': {}", result.text_out());
4986        assert!(result.text_out().contains("N=2"), "missing 'N=2': {}", result.text_out());
4987        assert!(result.text_out().contains("N=1"), "missing 'N=1': {}", result.text_out());
4988    }
4989
4990    #[tokio::test]
4991    async fn test_kernel_set_var() {
4992        let kernel = Kernel::transient().expect("failed to create kernel");
4993
4994        kernel.execute("X=42").await.expect("set failed");
4995
4996        let value = kernel.get_var("X").await;
4997        assert_eq!(value, Some(Value::Int(42)));
4998    }
4999
5000    #[tokio::test]
5001    async fn test_kernel_var_expansion() {
5002        let kernel = Kernel::transient().expect("failed to create kernel");
5003
5004        kernel.execute("NAME=\"world\"").await.expect("set failed");
5005        let result = kernel.execute("echo \"hello ${NAME}\"").await.expect("echo failed");
5006
5007        assert!(result.ok());
5008        assert_eq!(result.text_out().trim(), "hello world");
5009    }
5010
5011    #[tokio::test]
5012    async fn test_kernel_last_result() {
5013        let kernel = Kernel::transient().expect("failed to create kernel");
5014
5015        kernel.execute("echo test").await.expect("echo failed");
5016
5017        let last = kernel.last_result().await;
5018        assert!(last.ok());
5019        assert_eq!(last.text_out().trim(), "test");
5020    }
5021
5022    #[tokio::test]
5023    async fn test_kernel_tool_not_found() {
5024        let kernel = Kernel::transient().expect("failed to create kernel");
5025
5026        let result = kernel.execute("nonexistent_tool").await.expect("execution failed");
5027        assert!(!result.ok());
5028        assert_eq!(result.code, 127);
5029        assert!(result.err.contains("command not found"));
5030    }
5031
5032    #[tokio::test]
5033    async fn test_external_command_true() {
5034        // Use REPL config for passthrough filesystem access
5035        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
5036
5037        // /bin/true should be available on any Unix system
5038        let result = kernel.execute("true").await.expect("execution failed");
5039        // This should use the builtin true, which returns 0
5040        assert!(result.ok(), "true should succeed: {:?}", result);
5041    }
5042
5043    #[tokio::test]
5044    async fn test_external_command_basic() {
5045        // Use REPL config for passthrough filesystem access
5046        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
5047
5048        // Test with /bin/echo which is external
5049        // Note: kaish has a builtin echo, so this will use the builtin
5050        // Let's test with a command that's not a builtin
5051        // Actually, let's just test that PATH resolution works by checking the PATH var
5052        let path_var = std::env::var("PATH").unwrap_or_default();
5053        eprintln!("System PATH: {}", path_var);
5054
5055        // Set PATH in kernel to ensure it's available
5056        kernel.execute(&format!(r#"PATH="{}""#, path_var)).await.expect("set PATH failed");
5057
5058        // Now try an external command like /usr/bin/env
5059        // But env is also a builtin... let's try uname
5060        let result = kernel.execute("uname").await.expect("execution failed");
5061        eprintln!("uname result: {:?}", result);
5062        // uname should succeed if external commands work
5063        assert!(result.ok() || result.code == 127, "uname: {:?}", result);
5064    }
5065
5066    #[tokio::test]
5067    async fn test_kernel_reset() {
5068        let kernel = Kernel::transient().expect("failed to create kernel");
5069
5070        kernel.execute("X=1").await.expect("set failed");
5071        assert!(kernel.get_var("X").await.is_some());
5072
5073        kernel.reset().await.expect("reset failed");
5074        assert!(kernel.get_var("X").await.is_none());
5075    }
5076
5077    #[tokio::test]
5078    async fn test_kernel_cwd() {
5079        let kernel = Kernel::transient().expect("failed to create kernel");
5080
5081        // Transient kernel uses sandboxed mode with cwd=$HOME
5082        let cwd = kernel.cwd().await;
5083        let home = std::env::var("HOME")
5084            .map(PathBuf::from)
5085            .unwrap_or_else(|_| PathBuf::from("/"));
5086        assert_eq!(cwd, home);
5087
5088        kernel.set_cwd(PathBuf::from("/tmp")).await;
5089        assert_eq!(kernel.cwd().await, PathBuf::from("/tmp"));
5090    }
5091
5092    #[tokio::test]
5093    async fn test_kernel_list_vars() {
5094        let kernel = Kernel::transient().expect("failed to create kernel");
5095
5096        kernel.execute("A=1").await.ok();
5097        kernel.execute("B=2").await.ok();
5098
5099        let vars = kernel.list_vars().await;
5100        assert!(vars.iter().any(|(n, v)| n == "A" && *v == Value::Int(1)));
5101        assert!(vars.iter().any(|(n, v)| n == "B" && *v == Value::Int(2)));
5102    }
5103
5104    #[tokio::test]
5105    async fn test_is_truthy() {
5106        assert!(!is_truthy(&Value::Null));
5107        assert!(!is_truthy(&Value::Bool(false)));
5108        assert!(is_truthy(&Value::Bool(true)));
5109        assert!(!is_truthy(&Value::Int(0)));
5110        assert!(is_truthy(&Value::Int(1)));
5111        assert!(!is_truthy(&Value::String("".into())));
5112        assert!(is_truthy(&Value::String("x".into())));
5113    }
5114
5115    #[tokio::test]
5116    async fn test_jq_in_pipeline() {
5117        let kernel = Kernel::transient().expect("failed to create kernel");
5118        // kaish uses double quotes only; escape inner quotes
5119        let result = kernel
5120            .execute(r#"echo "{\"name\": \"Alice\"}" | jq ".name" -r"#)
5121            .await
5122            .expect("execution failed");
5123        assert!(result.ok(), "jq pipeline failed: {}", result.err);
5124        assert_eq!(result.text_out().trim(), "Alice");
5125    }
5126
5127    #[tokio::test]
5128    async fn test_user_defined_tool() {
5129        let kernel = Kernel::transient().expect("failed to create kernel");
5130
5131        // Define a function
5132        kernel
5133            .execute(r#"greet() { echo "Hello, $1!" }"#)
5134            .await
5135            .expect("function definition failed");
5136
5137        // Call the function
5138        let result = kernel
5139            .execute(r#"greet "World""#)
5140            .await
5141            .expect("function call failed");
5142
5143        assert!(result.ok(), "greet failed: {}", result.err);
5144        assert_eq!(result.text_out().trim(), "Hello, World!");
5145    }
5146
5147    #[tokio::test]
5148    async fn test_user_tool_positional_args() {
5149        let kernel = Kernel::transient().expect("failed to create kernel");
5150
5151        // Define a function with positional param
5152        kernel
5153            .execute(r#"greet() { echo "Hi $1" }"#)
5154            .await
5155            .expect("function definition failed");
5156
5157        // Call with positional argument
5158        let result = kernel
5159            .execute(r#"greet "Amy""#)
5160            .await
5161            .expect("function call failed");
5162
5163        assert!(result.ok(), "greet failed: {}", result.err);
5164        assert_eq!(result.text_out().trim(), "Hi Amy");
5165    }
5166
5167    #[tokio::test]
5168    async fn test_function_shared_scope() {
5169        let kernel = Kernel::transient().expect("failed to create kernel");
5170
5171        // Set a variable in parent scope
5172        kernel
5173            .execute(r#"SECRET="hidden""#)
5174            .await
5175            .expect("set failed");
5176
5177        // Define a function that accesses and modifies parent variable
5178        kernel
5179            .execute(r#"access_parent() {
5180                echo "${SECRET}"
5181                SECRET="modified"
5182            }"#)
5183            .await
5184            .expect("function definition failed");
5185
5186        // Call the function - it SHOULD see SECRET (shared scope like sh)
5187        let result = kernel.execute("access_parent").await.expect("function call failed");
5188
5189        // Function should have access to parent scope
5190        assert!(
5191            result.text_out().contains("hidden"),
5192            "Function should access parent scope, got: {}",
5193            result.text_out()
5194        );
5195
5196        // Function should have modified the parent variable
5197        let secret = kernel.get_var("SECRET").await;
5198        assert_eq!(
5199            secret,
5200            Some(Value::String("modified".into())),
5201            "Function should modify parent scope"
5202        );
5203    }
5204
5205    #[tokio::test]
5206    #[ignore = "exec replaces the test binary via CommandExt::exec, hangs libtest; cannot be run under cargo test"]
5207    async fn test_exec_builtin() {
5208        let kernel = Kernel::transient().expect("failed to create kernel");
5209        // argv is now a space-separated string or JSON array string
5210        let result = kernel
5211            .execute(r#"exec command="/bin/echo" argv="hello world""#)
5212            .await
5213            .expect("exec failed");
5214
5215        assert!(result.ok(), "exec failed: {}", result.err);
5216        assert_eq!(result.text_out().trim(), "hello world");
5217    }
5218
5219    #[tokio::test]
5220    async fn test_while_false_never_runs() {
5221        let kernel = Kernel::transient().expect("failed to create kernel");
5222
5223        // A while loop with false condition should never run
5224        let result = kernel
5225            .execute(r#"
5226                while false; do
5227                    echo "should not run"
5228                done
5229            "#)
5230            .await
5231            .expect("while false failed");
5232
5233        assert!(result.ok());
5234        assert!(result.text_out().is_empty(), "while false should not execute body: {}", result.text_out());
5235    }
5236
5237    #[tokio::test]
5238    async fn test_while_string_comparison() {
5239        let kernel = Kernel::transient().expect("failed to create kernel");
5240
5241        // Set a flag
5242        kernel.execute(r#"FLAG="go""#).await.expect("set failed");
5243
5244        // Use string comparison as condition (shell-compatible [[ ]] syntax)
5245        // Note: Put echo last so we can check the output
5246        let result = kernel
5247            .execute(r#"
5248                while [[ ${FLAG} == "go" ]]; do
5249                    FLAG="stop"
5250                    echo "running"
5251                done
5252            "#)
5253            .await
5254            .expect("while with string cmp failed");
5255
5256        assert!(result.ok());
5257        assert!(result.text_out().contains("running"), "should have run once: {}", result.text_out());
5258
5259        // Verify flag was changed
5260        let flag = kernel.get_var("FLAG").await;
5261        assert_eq!(flag, Some(Value::String("stop".into())));
5262    }
5263
5264    #[tokio::test]
5265    async fn test_while_numeric_comparison() {
5266        let kernel = Kernel::transient().expect("failed to create kernel");
5267
5268        // Test > comparison (shell-compatible [[ ]] with -gt)
5269        kernel.execute("N=5").await.expect("set failed");
5270
5271        // Note: Put echo last so we can check the output
5272        let result = kernel
5273            .execute(r#"
5274                while [[ ${N} -gt 3 ]]; do
5275                    N=3
5276                    echo "N was greater"
5277                done
5278            "#)
5279            .await
5280            .expect("while with > failed");
5281
5282        assert!(result.ok());
5283        assert!(result.text_out().contains("N was greater"), "should have run once: {}", result.text_out());
5284    }
5285
5286    #[tokio::test]
5287    async fn test_break_in_while_loop() {
5288        let kernel = Kernel::transient().expect("failed to create kernel");
5289
5290        let result = kernel
5291            .execute(r#"
5292                I=0
5293                while true; do
5294                    I=1
5295                    echo "before break"
5296                    break
5297                    echo "after break"
5298                done
5299            "#)
5300            .await
5301            .expect("while with break failed");
5302
5303        assert!(result.ok());
5304        assert!(result.text_out().contains("before break"), "should see before break: {}", result.text_out());
5305        assert!(!result.text_out().contains("after break"), "should not see after break: {}", result.text_out());
5306
5307        // Verify we exited the loop
5308        let i = kernel.get_var("I").await;
5309        assert_eq!(i, Some(Value::Int(1)));
5310    }
5311
5312    #[tokio::test]
5313    async fn test_continue_in_while_loop() {
5314        let kernel = Kernel::transient().expect("failed to create kernel");
5315
5316        // Test continue in a while loop where variables persist
5317        // We use string state transition: "start" -> "middle" -> "end"
5318        // continue on "middle" should skip to next iteration
5319        // Shell-compatible: use [[ ]] for comparisons
5320        let result = kernel
5321            .execute(r#"
5322                STATE="start"
5323                AFTER_CONTINUE="no"
5324                while [[ ${STATE} != "done" ]]; do
5325                    if [[ ${STATE} == "start" ]]; then
5326                        STATE="middle"
5327                        continue
5328                        AFTER_CONTINUE="yes"
5329                    fi
5330                    if [[ ${STATE} == "middle" ]]; then
5331                        STATE="done"
5332                    fi
5333                done
5334            "#)
5335            .await
5336            .expect("while with continue failed");
5337
5338        assert!(result.ok());
5339
5340        // STATE should be "done" (we completed the loop)
5341        let state = kernel.get_var("STATE").await;
5342        assert_eq!(state, Some(Value::String("done".into())));
5343
5344        // AFTER_CONTINUE should still be "no" (continue skipped the assignment)
5345        let after = kernel.get_var("AFTER_CONTINUE").await;
5346        assert_eq!(after, Some(Value::String("no".into())));
5347    }
5348
5349    #[tokio::test]
5350    async fn test_break_with_level() {
5351        let kernel = Kernel::transient().expect("failed to create kernel");
5352
5353        // Nested loop with break 2 to exit both loops
5354        // We verify by checking OUTER value:
5355        // - If break 2 works, OUTER stays at 1 (set before for loop)
5356        // - If break 2 fails, OUTER becomes 2 (set after for loop)
5357        let result = kernel
5358            .execute(r#"
5359                OUTER=0
5360                while true; do
5361                    OUTER=1
5362                    for X in "1 2"; do
5363                        break 2
5364                    done
5365                    OUTER=2
5366                done
5367            "#)
5368            .await
5369            .expect("nested break failed");
5370
5371        assert!(result.ok());
5372
5373        // OUTER should be 1 (set before for loop), not 2 (would be set after for loop)
5374        let outer = kernel.get_var("OUTER").await;
5375        assert_eq!(outer, Some(Value::Int(1)), "break 2 should have skipped OUTER=2");
5376    }
5377
5378    #[tokio::test]
5379    async fn test_return_from_tool() {
5380        let kernel = Kernel::transient().expect("failed to create kernel");
5381
5382        // Define a function that returns early
5383        kernel
5384            .execute(r#"early_return() {
5385                if [[ $1 == 1 ]]; then
5386                    return 42
5387                fi
5388                echo "not returned"
5389            }"#)
5390            .await
5391            .expect("function definition failed");
5392
5393        // Call with arg=1 should return with exit code 42
5394        // (POSIX shell behavior: return N sets exit code, doesn't output N)
5395        let result = kernel
5396            .execute("early_return 1")
5397            .await
5398            .expect("function call failed");
5399
5400        // Exit code should be 42 (non-zero, so not ok())
5401        assert_eq!(result.code, 42);
5402        // Output should be empty (we returned before echo)
5403        assert!(result.text_out().is_empty());
5404    }
5405
5406    #[tokio::test]
5407    async fn test_return_without_value() {
5408        let kernel = Kernel::transient().expect("failed to create kernel");
5409
5410        // Define a function that returns without a value
5411        kernel
5412            .execute(r#"early_exit() {
5413                if [[ $1 == "stop" ]]; then
5414                    return
5415                fi
5416                echo "continued"
5417            }"#)
5418            .await
5419            .expect("function definition failed");
5420
5421        // Call with arg="stop" should return early
5422        let result = kernel
5423            .execute(r#"early_exit "stop""#)
5424            .await
5425            .expect("function call failed");
5426
5427        assert!(result.ok());
5428        assert!(result.text_out().is_empty() || result.text_out().trim().is_empty());
5429    }
5430
5431    #[tokio::test]
5432    async fn test_exit_stops_execution() {
5433        let kernel = Kernel::transient().expect("failed to create kernel");
5434
5435        // exit should stop further execution
5436        kernel
5437            .execute(r#"
5438                BEFORE="yes"
5439                exit 0
5440                AFTER="yes"
5441            "#)
5442            .await
5443            .expect("execution failed");
5444
5445        // BEFORE should be set, AFTER should not
5446        let before = kernel.get_var("BEFORE").await;
5447        assert_eq!(before, Some(Value::String("yes".into())));
5448
5449        let after = kernel.get_var("AFTER").await;
5450        assert!(after.is_none(), "AFTER should not be set after exit");
5451    }
5452
5453    #[tokio::test]
5454    async fn test_exit_with_code() {
5455        let kernel = Kernel::transient().expect("failed to create kernel");
5456
5457        // exit with code should propagate the exit code
5458        let result = kernel
5459            .execute("exit 42")
5460            .await
5461            .expect("exit failed");
5462
5463        assert_eq!(result.code, 42);
5464        assert!(result.text_out().is_empty(), "exit should not produce stdout");
5465    }
5466
5467    #[tokio::test]
5468    async fn test_set_e_stops_on_failure() {
5469        let kernel = Kernel::transient().expect("failed to create kernel");
5470
5471        // Enable error-exit mode
5472        kernel.execute("set -e").await.expect("set -e failed");
5473
5474        // Run a sequence where the middle command fails
5475        kernel
5476            .execute(r#"
5477                STEP1="done"
5478                false
5479                STEP2="done"
5480            "#)
5481            .await
5482            .expect("execution failed");
5483
5484        // STEP1 should be set, but STEP2 should NOT be set (exit on false)
5485        let step1 = kernel.get_var("STEP1").await;
5486        assert_eq!(step1, Some(Value::String("done".into())));
5487
5488        let step2 = kernel.get_var("STEP2").await;
5489        assert!(step2.is_none(), "STEP2 should not be set after false with set -e");
5490    }
5491
5492    #[tokio::test]
5493    async fn test_set_plus_e_disables_error_exit() {
5494        let kernel = Kernel::transient().expect("failed to create kernel");
5495
5496        // Enable then disable error-exit mode
5497        kernel.execute("set -e").await.expect("set -e failed");
5498        kernel.execute("set +e").await.expect("set +e failed");
5499
5500        // Now failure should NOT stop execution
5501        kernel
5502            .execute(r#"
5503                STEP1="done"
5504                false
5505                STEP2="done"
5506            "#)
5507            .await
5508            .expect("execution failed");
5509
5510        // Both should be set since +e disables error exit
5511        let step1 = kernel.get_var("STEP1").await;
5512        assert_eq!(step1, Some(Value::String("done".into())));
5513
5514        let step2 = kernel.get_var("STEP2").await;
5515        assert_eq!(step2, Some(Value::String("done".into())));
5516    }
5517
5518    #[tokio::test]
5519    async fn test_set_ignores_unknown_options() {
5520        let kernel = Kernel::transient().expect("failed to create kernel");
5521
5522        // Bash idiom: set -euo pipefail (we support -e, ignore the rest)
5523        let result = kernel
5524            .execute("set -e -u -o pipefail")
5525            .await
5526            .expect("set with unknown options failed");
5527
5528        assert!(result.ok(), "set should succeed with unknown options");
5529
5530        // -e should still be enabled
5531        kernel
5532            .execute(r#"
5533                BEFORE="yes"
5534                false
5535                AFTER="yes"
5536            "#)
5537            .await
5538            .ok();
5539
5540        let after = kernel.get_var("AFTER").await;
5541        assert!(after.is_none(), "-e should be enabled despite unknown options");
5542    }
5543
5544    #[tokio::test]
5545    async fn test_set_no_args_shows_settings() {
5546        let kernel = Kernel::transient().expect("failed to create kernel");
5547
5548        // Enable -e
5549        kernel.execute("set -e").await.expect("set -e failed");
5550
5551        // Call set with no args to see settings
5552        let result = kernel.execute("set").await.expect("set failed");
5553
5554        assert!(result.ok());
5555        assert!(result.text_out().contains("set -e"), "should show -e is enabled: {}", result.text_out());
5556    }
5557
5558    #[tokio::test]
5559    async fn test_set_e_in_pipeline() {
5560        let kernel = Kernel::transient().expect("failed to create kernel");
5561
5562        kernel.execute("set -e").await.expect("set -e failed");
5563
5564        // Pipeline failure should trigger exit
5565        kernel
5566            .execute(r#"
5567                BEFORE="yes"
5568                false | cat
5569                AFTER="yes"
5570            "#)
5571            .await
5572            .ok();
5573
5574        let before = kernel.get_var("BEFORE").await;
5575        assert_eq!(before, Some(Value::String("yes".into())));
5576
5577        // AFTER should not be set if pipeline failure triggers exit
5578        // Note: The exit code of a pipeline is the exit code of the last command
5579        // So `false | cat` returns 0 (cat succeeds). This is bash-compatible behavior.
5580        // To test pipeline failure, we need the last command to fail.
5581    }
5582
5583    #[tokio::test]
5584    async fn test_set_e_with_and_chain() {
5585        let kernel = Kernel::transient().expect("failed to create kernel");
5586
5587        kernel.execute("set -e").await.expect("set -e failed");
5588
5589        // Commands in && chain should not trigger -e on the first failure
5590        // because && explicitly handles the error
5591        kernel
5592            .execute(r#"
5593                RESULT="initial"
5594                false && RESULT="chained"
5595                RESULT="continued"
5596            "#)
5597            .await
5598            .ok();
5599
5600        // In bash, commands in && don't trigger -e. The chain handles the failure.
5601        // Our implementation may differ - let's verify current behavior.
5602        let result = kernel.get_var("RESULT").await;
5603        // If we follow bash semantics, RESULT should be "continued"
5604        // If we trigger -e on the false, RESULT stays "initial"
5605        assert!(result.is_some(), "RESULT should be set");
5606    }
5607
5608    #[tokio::test]
5609    async fn test_set_e_exits_in_for_loop() {
5610        let kernel = Kernel::transient().expect("failed to create kernel");
5611
5612        kernel.execute("set -e").await.expect("set -e failed");
5613
5614        kernel
5615            .execute(r#"
5616                REACHED="no"
5617                for x in 1 2 3; do
5618                    false
5619                    REACHED="yes"
5620                done
5621            "#)
5622            .await
5623            .ok();
5624
5625        // With set -e, false should trigger exit; REACHED should remain "no"
5626        let reached = kernel.get_var("REACHED").await;
5627        assert_eq!(reached, Some(Value::String("no".into())),
5628            "set -e should exit on failure in for loop body");
5629    }
5630
5631    #[tokio::test]
5632    async fn test_for_loop_continues_without_set_e() {
5633        let kernel = Kernel::transient().expect("failed to create kernel");
5634
5635        // Without set -e, for loop should continue normally
5636        kernel
5637            .execute(r#"
5638                COUNT=0
5639                for x in 1 2 3; do
5640                    false
5641                    COUNT=$((COUNT + 1))
5642                done
5643            "#)
5644            .await
5645            .ok();
5646
5647        let count = kernel.get_var("COUNT").await;
5648        // Arithmetic produces Int values; accept either Int or String representation
5649        let count_val = match &count {
5650            Some(Value::Int(n)) => *n,
5651            Some(Value::String(s)) => s.parse().unwrap_or(-1),
5652            _ => -1,
5653        };
5654        assert_eq!(count_val, 3,
5655            "without set -e, loop should complete all iterations (got {:?})", count);
5656    }
5657
5658    // ═══════════════════════════════════════════════════════════════════════════
5659    // Source Tests
5660    // ═══════════════════════════════════════════════════════════════════════════
5661
5662    #[tokio::test]
5663    async fn test_source_sets_variables() {
5664        let kernel = Kernel::transient().expect("failed to create kernel");
5665
5666        // Write a script to the VFS
5667        kernel
5668            .execute(r#"write "/test.kai" 'FOO="bar"'"#)
5669            .await
5670            .expect("write failed");
5671
5672        // Source the script
5673        let result = kernel
5674            .execute(r#"source "/test.kai""#)
5675            .await
5676            .expect("source failed");
5677
5678        assert!(result.ok(), "source should succeed");
5679
5680        // Variable should be set in current scope
5681        let foo = kernel.get_var("FOO").await;
5682        assert_eq!(foo, Some(Value::String("bar".into())));
5683    }
5684
5685    #[tokio::test]
5686    async fn test_source_with_dot_alias() {
5687        let kernel = Kernel::transient().expect("failed to create kernel");
5688
5689        // Write a script to the VFS
5690        kernel
5691            .execute(r#"write "/vars.kai" 'X=42'"#)
5692            .await
5693            .expect("write failed");
5694
5695        // Source using . alias
5696        let result = kernel
5697            .execute(r#". "/vars.kai""#)
5698            .await
5699            .expect(". failed");
5700
5701        assert!(result.ok(), ". should succeed");
5702
5703        // Variable should be set in current scope
5704        let x = kernel.get_var("X").await;
5705        assert_eq!(x, Some(Value::Int(42)));
5706    }
5707
5708    #[tokio::test]
5709    async fn test_source_not_found() {
5710        let kernel = Kernel::transient().expect("failed to create kernel");
5711
5712        // Try to source a non-existent file
5713        let result = kernel
5714            .execute(r#"source "/nonexistent.kai""#)
5715            .await
5716            .expect("source should not fail with error");
5717
5718        assert!(!result.ok(), "source of non-existent file should fail");
5719        assert!(result.err.contains("nonexistent.kai"), "error should mention filename");
5720    }
5721
5722    #[tokio::test]
5723    async fn test_source_missing_filename() {
5724        let kernel = Kernel::transient().expect("failed to create kernel");
5725
5726        // Call source with no arguments
5727        let result = kernel
5728            .execute("source")
5729            .await
5730            .expect("source should not fail with error");
5731
5732        assert!(!result.ok(), "source without filename should fail");
5733        assert!(result.err.contains("missing filename"), "error should mention missing filename");
5734    }
5735
5736    #[tokio::test]
5737    async fn test_source_executes_multiple_statements() {
5738        let kernel = Kernel::transient().expect("failed to create kernel");
5739
5740        // Write a script with multiple statements
5741        kernel
5742            .execute(r#"write "/multi.kai" 'A=1
5743B=2
5744C=3'"#)
5745            .await
5746            .expect("write failed");
5747
5748        // Source it
5749        kernel
5750            .execute(r#"source "/multi.kai""#)
5751            .await
5752            .expect("source failed");
5753
5754        // All variables should be set
5755        assert_eq!(kernel.get_var("A").await, Some(Value::Int(1)));
5756        assert_eq!(kernel.get_var("B").await, Some(Value::Int(2)));
5757        assert_eq!(kernel.get_var("C").await, Some(Value::Int(3)));
5758    }
5759
5760    #[tokio::test]
5761    async fn test_source_can_define_functions() {
5762        let kernel = Kernel::transient().expect("failed to create kernel");
5763
5764        // Write a script that defines a function
5765        kernel
5766            .execute(r#"write "/functions.kai" 'greet() {
5767    echo "Hello, $1!"
5768}'"#)
5769            .await
5770            .expect("write failed");
5771
5772        // Source it
5773        kernel
5774            .execute(r#"source "/functions.kai""#)
5775            .await
5776            .expect("source failed");
5777
5778        // Use the defined function
5779        let result = kernel
5780            .execute(r#"greet "World""#)
5781            .await
5782            .expect("greet failed");
5783
5784        assert!(result.ok());
5785        assert!(result.text_out().contains("Hello, World!"));
5786    }
5787
5788    #[tokio::test]
5789    async fn test_source_inherits_error_exit() {
5790        let kernel = Kernel::transient().expect("failed to create kernel");
5791
5792        // Enable error exit
5793        kernel.execute("set -e").await.expect("set -e failed");
5794
5795        // Write a script that has a failure
5796        kernel
5797            .execute(r#"write "/fail.kai" 'BEFORE="yes"
5798false
5799AFTER="yes"'"#)
5800            .await
5801            .expect("write failed");
5802
5803        // Source it (should exit on false due to set -e)
5804        kernel
5805            .execute(r#"source "/fail.kai""#)
5806            .await
5807            .ok();
5808
5809        // BEFORE should be set, AFTER should NOT be set due to error exit
5810        let before = kernel.get_var("BEFORE").await;
5811        assert_eq!(before, Some(Value::String("yes".into())));
5812
5813        // Note: This test depends on whether error exit is checked within source
5814        // Currently our implementation checks per-statement in the main kernel
5815    }
5816
5817    // ═══════════════════════════════════════════════════════════════════════════
5818    // set -e with && / || chains
5819    // ═══════════════════════════════════════════════════════════════════════════
5820
5821    #[tokio::test]
5822    async fn test_set_e_and_chain_left_fails() {
5823        // set -e; false && echo hi; REACHED=1 → REACHED should be set
5824        let kernel = Kernel::transient().expect("failed to create kernel");
5825        kernel.execute("set -e").await.expect("set -e failed");
5826
5827        kernel
5828            .execute("false && echo hi; REACHED=1")
5829            .await
5830            .expect("execution failed");
5831
5832        let reached = kernel.get_var("REACHED").await;
5833        assert_eq!(
5834            reached,
5835            Some(Value::Int(1)),
5836            "set -e should not trigger on left side of &&"
5837        );
5838    }
5839
5840    #[tokio::test]
5841    async fn test_set_e_and_chain_right_fails() {
5842        // set -e; true && false; REACHED=1 → REACHED should NOT be set
5843        let kernel = Kernel::transient().expect("failed to create kernel");
5844        kernel.execute("set -e").await.expect("set -e failed");
5845
5846        kernel
5847            .execute("true && false; REACHED=1")
5848            .await
5849            .expect("execution failed");
5850
5851        let reached = kernel.get_var("REACHED").await;
5852        assert!(
5853            reached.is_none(),
5854            "set -e should trigger when right side of && fails"
5855        );
5856    }
5857
5858    #[tokio::test]
5859    async fn test_set_e_or_chain_recovers() {
5860        // set -e; false || echo recovered; REACHED=1 → REACHED should be set
5861        let kernel = Kernel::transient().expect("failed to create kernel");
5862        kernel.execute("set -e").await.expect("set -e failed");
5863
5864        kernel
5865            .execute("false || echo recovered; REACHED=1")
5866            .await
5867            .expect("execution failed");
5868
5869        let reached = kernel.get_var("REACHED").await;
5870        assert_eq!(
5871            reached,
5872            Some(Value::Int(1)),
5873            "set -e should not trigger when || recovers the failure"
5874        );
5875    }
5876
5877    #[tokio::test]
5878    async fn test_set_e_or_chain_both_fail() {
5879        // set -e; false || false; REACHED=1 → REACHED should NOT be set
5880        let kernel = Kernel::transient().expect("failed to create kernel");
5881        kernel.execute("set -e").await.expect("set -e failed");
5882
5883        kernel
5884            .execute("false || false; REACHED=1")
5885            .await
5886            .expect("execution failed");
5887
5888        let reached = kernel.get_var("REACHED").await;
5889        assert!(
5890            reached.is_none(),
5891            "set -e should trigger when || chain ultimately fails"
5892        );
5893    }
5894
5895    // ═══════════════════════════════════════════════════════════════════════════
5896    // Cancellation Tests
5897    // ═══════════════════════════════════════════════════════════════════════════
5898
5899    /// Helper: schedule a cancel after a delay from a background thread.
5900    /// Uses std::thread because cancel() is sync and Kernel is not Send.
5901    fn schedule_cancel(kernel: &Arc<Kernel>, delay: std::time::Duration) {
5902        let k = Arc::clone(kernel);
5903        std::thread::spawn(move || {
5904            std::thread::sleep(delay);
5905            k.cancel();
5906        });
5907    }
5908
5909    #[tokio::test]
5910    async fn test_cancel_interrupts_for_loop() {
5911        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5912
5913        // Schedule cancel after a short delay from a background OS thread
5914        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5915
5916        let result = kernel
5917            .execute("for i in $(seq 1 100000); do X=$i; done")
5918            .await
5919            .expect("execute failed");
5920
5921        assert_eq!(result.code, 130, "cancelled execution should exit with code 130");
5922
5923        // The loop variable should be set to something < 100000
5924        let x = kernel.get_var("X").await;
5925        if let Some(Value::Int(n)) = x {
5926            assert!(n < 100000, "loop should have been interrupted before finishing, got X={n}");
5927        }
5928    }
5929
5930    #[tokio::test]
5931    async fn test_cancel_interrupts_while_loop() {
5932        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5933        kernel.execute("COUNT=0").await.expect("init failed");
5934
5935        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5936
5937        let result = kernel
5938            .execute("while true; do COUNT=$((COUNT + 1)); done")
5939            .await
5940            .expect("execute failed");
5941
5942        assert_eq!(result.code, 130);
5943
5944        let count = kernel.get_var("COUNT").await;
5945        if let Some(Value::Int(n)) = count {
5946            assert!(n > 0, "loop should have run at least once");
5947        }
5948    }
5949
5950    #[tokio::test]
5951    async fn test_reset_after_cancel() {
5952        // After cancellation, the next execute() should work normally
5953        let kernel = Kernel::transient().expect("failed to create kernel");
5954        kernel.cancel(); // cancel with nothing running
5955
5956        let result = kernel.execute("echo hello").await.expect("execute failed");
5957        assert!(result.ok(), "execute after cancel should succeed");
5958        assert_eq!(result.text_out().trim(), "hello");
5959    }
5960
5961    #[tokio::test]
5962    async fn test_cancel_interrupts_statement_sequence() {
5963        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5964
5965        // Schedule cancel after the first statement runs but before sleep finishes
5966        schedule_cancel(&kernel, std::time::Duration::from_millis(50));
5967
5968        let result = kernel
5969            .execute("STEP=1; sleep 5; STEP=2; sleep 5; STEP=3")
5970            .await
5971            .expect("execute failed");
5972
5973        assert_eq!(result.code, 130);
5974
5975        // STEP should be 1 (set before sleep), not 2 or 3
5976        let step = kernel.get_var("STEP").await;
5977        assert_eq!(step, Some(Value::Int(1)), "cancel should stop before STEP=2");
5978    }
5979
5980    // ═══════════════════════════════════════════════════════════════════════════
5981    // Case Statement Tests
5982    // ═══════════════════════════════════════════════════════════════════════════
5983
5984    #[tokio::test]
5985    async fn test_case_simple_match() {
5986        let kernel = Kernel::transient().expect("failed to create kernel");
5987
5988        let result = kernel
5989            .execute(r#"
5990                case "hello" in
5991                    hello) echo "matched hello" ;;
5992                    world) echo "matched world" ;;
5993                esac
5994            "#)
5995            .await
5996            .expect("case failed");
5997
5998        assert!(result.ok());
5999        assert_eq!(result.text_out().trim(), "matched hello");
6000    }
6001
6002    #[tokio::test]
6003    async fn test_case_wildcard_match() {
6004        let kernel = Kernel::transient().expect("failed to create kernel");
6005
6006        let result = kernel
6007            .execute(r#"
6008                case "main.rs" in
6009                    *.py) echo "Python" ;;
6010                    *.rs) echo "Rust" ;;
6011                    *) echo "Unknown" ;;
6012                esac
6013            "#)
6014            .await
6015            .expect("case failed");
6016
6017        assert!(result.ok());
6018        assert_eq!(result.text_out().trim(), "Rust");
6019    }
6020
6021    #[tokio::test]
6022    async fn test_case_default_match() {
6023        let kernel = Kernel::transient().expect("failed to create kernel");
6024
6025        let result = kernel
6026            .execute(r#"
6027                case "unknown.xyz" in
6028                    *.py) echo "Python" ;;
6029                    *.rs) echo "Rust" ;;
6030                    *) echo "Default" ;;
6031                esac
6032            "#)
6033            .await
6034            .expect("case failed");
6035
6036        assert!(result.ok());
6037        assert_eq!(result.text_out().trim(), "Default");
6038    }
6039
6040    #[tokio::test]
6041    async fn test_case_no_match() {
6042        let kernel = Kernel::transient().expect("failed to create kernel");
6043
6044        // Case with no default branch and no match
6045        let result = kernel
6046            .execute(r#"
6047                case "nope" in
6048                    "yes") echo "yes" ;;
6049                    "no") echo "no" ;;
6050                esac
6051            "#)
6052            .await
6053            .expect("case failed");
6054
6055        assert!(result.ok());
6056        assert!(result.text_out().is_empty(), "no match should produce empty output");
6057    }
6058
6059    #[tokio::test]
6060    async fn test_case_with_variable() {
6061        let kernel = Kernel::transient().expect("failed to create kernel");
6062
6063        kernel.execute(r#"LANG="rust""#).await.expect("set failed");
6064
6065        let result = kernel
6066            .execute(r#"
6067                case ${LANG} in
6068                    python) echo "snake" ;;
6069                    rust) echo "crab" ;;
6070                    go) echo "gopher" ;;
6071                esac
6072            "#)
6073            .await
6074            .expect("case failed");
6075
6076        assert!(result.ok());
6077        assert_eq!(result.text_out().trim(), "crab");
6078    }
6079
6080    #[tokio::test]
6081    async fn test_case_multiple_patterns() {
6082        let kernel = Kernel::transient().expect("failed to create kernel");
6083
6084        let result = kernel
6085            .execute(r#"
6086                case "yes" in
6087                    "y"|"yes"|"Y"|"YES") echo "affirmative" ;;
6088                    "n"|"no"|"N"|"NO") echo "negative" ;;
6089                esac
6090            "#)
6091            .await
6092            .expect("case failed");
6093
6094        assert!(result.ok());
6095        assert_eq!(result.text_out().trim(), "affirmative");
6096    }
6097
6098    #[tokio::test]
6099    async fn test_case_glob_question_mark() {
6100        let kernel = Kernel::transient().expect("failed to create kernel");
6101
6102        let result = kernel
6103            .execute(r#"
6104                case "test1" in
6105                    test?) echo "matched test?" ;;
6106                    *) echo "default" ;;
6107                esac
6108            "#)
6109            .await
6110            .expect("case failed");
6111
6112        assert!(result.ok());
6113        assert_eq!(result.text_out().trim(), "matched test?");
6114    }
6115
6116    #[tokio::test]
6117    async fn test_case_char_class() {
6118        let kernel = Kernel::transient().expect("failed to create kernel");
6119
6120        let result = kernel
6121            .execute(r#"
6122                case "Yes" in
6123                    [Yy]*) echo "yes-like" ;;
6124                    [Nn]*) echo "no-like" ;;
6125                esac
6126            "#)
6127            .await
6128            .expect("case failed");
6129
6130        assert!(result.ok());
6131        assert_eq!(result.text_out().trim(), "yes-like");
6132    }
6133
6134    // ═══════════════════════════════════════════════════════════════════════════
6135    // Cat Stdin Tests
6136    // ═══════════════════════════════════════════════════════════════════════════
6137
6138    #[tokio::test]
6139    async fn test_cat_from_pipeline() {
6140        let kernel = Kernel::transient().expect("failed to create kernel");
6141
6142        let result = kernel
6143            .execute(r#"echo "piped text" | cat"#)
6144            .await
6145            .expect("cat pipeline failed");
6146
6147        assert!(result.ok(), "cat failed: {}", result.err);
6148        assert_eq!(result.text_out().trim(), "piped text");
6149    }
6150
6151    #[tokio::test]
6152    async fn test_cat_from_pipeline_multiline() {
6153        let kernel = Kernel::transient().expect("failed to create kernel");
6154
6155        let result = kernel
6156            .execute(r#"echo "line1\nline2" | cat -n"#)
6157            .await
6158            .expect("cat pipeline failed");
6159
6160        assert!(result.ok(), "cat failed: {}", result.err);
6161        assert!(result.text_out().contains("1\t"), "output: {}", result.text_out());
6162    }
6163
6164    // ═══════════════════════════════════════════════════════════════════════════
6165    // Heredoc Tests
6166    // ═══════════════════════════════════════════════════════════════════════════
6167
6168    #[tokio::test]
6169    async fn test_heredoc_basic() {
6170        let kernel = Kernel::transient().expect("failed to create kernel");
6171
6172        let result = kernel
6173            .execute("cat <<EOF\nhello\nEOF")
6174            .await
6175            .expect("heredoc failed");
6176
6177        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6178        assert_eq!(result.text_out().trim(), "hello");
6179    }
6180
6181    #[tokio::test]
6182    async fn test_arithmetic_in_string() {
6183        let kernel = Kernel::transient().expect("failed to create kernel");
6184
6185        let result = kernel
6186            .execute(r#"echo "result: $((1 + 2))""#)
6187            .await
6188            .expect("arithmetic in string failed");
6189
6190        assert!(result.ok(), "echo failed: {}", result.err);
6191        assert_eq!(result.text_out().trim(), "result: 3");
6192    }
6193
6194    #[tokio::test]
6195    async fn test_heredoc_multiline() {
6196        let kernel = Kernel::transient().expect("failed to create kernel");
6197
6198        let result = kernel
6199            .execute("cat <<EOF\nline1\nline2\nline3\nEOF")
6200            .await
6201            .expect("heredoc failed");
6202
6203        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6204        assert!(result.text_out().contains("line1"), "output: {}", result.text_out());
6205        assert!(result.text_out().contains("line2"), "output: {}", result.text_out());
6206        assert!(result.text_out().contains("line3"), "output: {}", result.text_out());
6207    }
6208
6209    #[tokio::test]
6210    async fn test_heredoc_variable_expansion() {
6211        // Bug N: unquoted heredoc should expand variables
6212        let kernel = Kernel::transient().expect("failed to create kernel");
6213
6214        kernel.execute("GREETING=hello").await.expect("set var");
6215
6216        let result = kernel
6217            .execute("cat <<EOF\n$GREETING world\nEOF")
6218            .await
6219            .expect("heredoc expansion failed");
6220
6221        assert!(result.ok(), "heredoc expansion failed: {}", result.err);
6222        assert_eq!(result.text_out().trim(), "hello world");
6223    }
6224
6225    #[tokio::test]
6226    async fn test_heredoc_quoted_no_expansion() {
6227        // Bug N: quoted heredoc (<<'EOF') should NOT expand variables
6228        let kernel = Kernel::transient().expect("failed to create kernel");
6229
6230        kernel.execute("GREETING=hello").await.expect("set var");
6231
6232        let result = kernel
6233            .execute("cat <<'EOF'\n$GREETING world\nEOF")
6234            .await
6235            .expect("quoted heredoc failed");
6236
6237        assert!(result.ok(), "quoted heredoc failed: {}", result.err);
6238        assert_eq!(result.text_out().trim(), "$GREETING world");
6239    }
6240
6241    #[tokio::test]
6242    async fn test_heredoc_default_value_expansion() {
6243        // Bug N: ${VAR:-default} should expand in unquoted heredocs
6244        let kernel = Kernel::transient().expect("failed to create kernel");
6245
6246        let result = kernel
6247            .execute("cat <<EOF\n${UNSET:-fallback}\nEOF")
6248            .await
6249            .expect("heredoc default expansion failed");
6250
6251        assert!(result.ok(), "heredoc default expansion failed: {}", result.err);
6252        assert_eq!(result.text_out().trim(), "fallback");
6253    }
6254
6255    // ═══════════════════════════════════════════════════════════════════════════
6256    // Read Builtin Tests
6257    // ═══════════════════════════════════════════════════════════════════════════
6258
6259    #[tokio::test]
6260    async fn test_read_from_pipeline() {
6261        let kernel = Kernel::transient().expect("failed to create kernel");
6262
6263        // Pipe input to read
6264        let result = kernel
6265            .execute(r#"echo "Alice" | read NAME; echo "Hello, ${NAME}""#)
6266            .await
6267            .expect("read pipeline failed");
6268
6269        assert!(result.ok(), "read failed: {}", result.err);
6270        assert!(result.text_out().contains("Hello, Alice"), "output: {}", result.text_out());
6271    }
6272
6273    #[tokio::test]
6274    async fn test_read_multiple_vars_from_pipeline() {
6275        let kernel = Kernel::transient().expect("failed to create kernel");
6276
6277        let result = kernel
6278            .execute(r#"echo "John Doe 42" | read FIRST LAST AGE; echo "${FIRST} is ${AGE}""#)
6279            .await
6280            .expect("read pipeline failed");
6281
6282        assert!(result.ok(), "read failed: {}", result.err);
6283        assert!(result.text_out().contains("John is 42"), "output: {}", result.text_out());
6284    }
6285
6286    // ═══════════════════════════════════════════════════════════════════════════
6287    // Shell-Style Function Tests
6288    // ═══════════════════════════════════════════════════════════════════════════
6289
6290    #[tokio::test]
6291    async fn test_posix_function_with_positional_params() {
6292        let kernel = Kernel::transient().expect("failed to create kernel");
6293
6294        // Define POSIX-style function
6295        kernel
6296            .execute(r#"greet() { echo "Hello, $1!" }"#)
6297            .await
6298            .expect("function definition failed");
6299
6300        // Call the function
6301        let result = kernel
6302            .execute(r#"greet "Amy""#)
6303            .await
6304            .expect("function call failed");
6305
6306        assert!(result.ok(), "greet failed: {}", result.err);
6307        assert_eq!(result.text_out().trim(), "Hello, Amy!");
6308    }
6309
6310    #[tokio::test]
6311    async fn test_posix_function_multiple_args() {
6312        let kernel = Kernel::transient().expect("failed to create kernel");
6313
6314        // Define function using $1 and $2
6315        kernel
6316            .execute(r#"add_greeting() { echo "$1 $2!" }"#)
6317            .await
6318            .expect("function definition failed");
6319
6320        // Call the function
6321        let result = kernel
6322            .execute(r#"add_greeting "Hello" "World""#)
6323            .await
6324            .expect("function call failed");
6325
6326        assert!(result.ok(), "function failed: {}", result.err);
6327        assert_eq!(result.text_out().trim(), "Hello World!");
6328    }
6329
6330    #[tokio::test]
6331    async fn test_bash_function_with_positional_params() {
6332        let kernel = Kernel::transient().expect("failed to create kernel");
6333
6334        // Define bash-style function (function keyword, no parens)
6335        kernel
6336            .execute(r#"function greet { echo "Hi $1" }"#)
6337            .await
6338            .expect("function definition failed");
6339
6340        // Call the function
6341        let result = kernel
6342            .execute(r#"greet "Bob""#)
6343            .await
6344            .expect("function call failed");
6345
6346        assert!(result.ok(), "greet failed: {}", result.err);
6347        assert_eq!(result.text_out().trim(), "Hi Bob");
6348    }
6349
6350    #[tokio::test]
6351    async fn test_shell_function_with_all_args() {
6352        let kernel = Kernel::transient().expect("failed to create kernel");
6353
6354        // Define function using $@ (all args)
6355        kernel
6356            .execute(r#"echo_all() { echo "args: $@" }"#)
6357            .await
6358            .expect("function definition failed");
6359
6360        // Call with multiple args
6361        let result = kernel
6362            .execute(r#"echo_all "a" "b" "c""#)
6363            .await
6364            .expect("function call failed");
6365
6366        assert!(result.ok(), "function failed: {}", result.err);
6367        assert_eq!(result.text_out().trim(), "args: a b c");
6368    }
6369
6370    #[tokio::test]
6371    async fn test_shell_function_with_arg_count() {
6372        let kernel = Kernel::transient().expect("failed to create kernel");
6373
6374        // Define function using $# (arg count)
6375        kernel
6376            .execute(r#"count_args() { echo "count: $#" }"#)
6377            .await
6378            .expect("function definition failed");
6379
6380        // Call with three args
6381        let result = kernel
6382            .execute(r#"count_args "x" "y" "z""#)
6383            .await
6384            .expect("function call failed");
6385
6386        assert!(result.ok(), "function failed: {}", result.err);
6387        assert_eq!(result.text_out().trim(), "count: 3");
6388    }
6389
6390    #[tokio::test]
6391    async fn test_shell_function_shared_scope() {
6392        let kernel = Kernel::transient().expect("failed to create kernel");
6393
6394        // Set a variable in parent scope
6395        kernel
6396            .execute(r#"PARENT_VAR="visible""#)
6397            .await
6398            .expect("set failed");
6399
6400        // Define shell function that reads and writes parent variable
6401        kernel
6402            .execute(r#"modify_parent() {
6403                echo "saw: ${PARENT_VAR}"
6404                PARENT_VAR="changed by function"
6405            }"#)
6406            .await
6407            .expect("function definition failed");
6408
6409        // Call the function - it SHOULD see PARENT_VAR (bash-compatible shared scope)
6410        let result = kernel.execute("modify_parent").await.expect("function failed");
6411
6412        assert!(
6413            result.text_out().contains("visible"),
6414            "Shell function should access parent scope, got: {}",
6415            result.text_out()
6416        );
6417
6418        // Parent variable should be modified
6419        let var = kernel.get_var("PARENT_VAR").await;
6420        assert_eq!(
6421            var,
6422            Some(Value::String("changed by function".into())),
6423            "Shell function should modify parent scope"
6424        );
6425    }
6426
6427    // ═══════════════════════════════════════════════════════════════════════════
6428    // Script Execution via PATH Tests
6429    // ═══════════════════════════════════════════════════════════════════════════
6430
6431    #[tokio::test]
6432    async fn test_script_execution_from_path() {
6433        let kernel = Kernel::transient().expect("failed to create kernel");
6434
6435        // Create /bin directory and script
6436        kernel.execute(r#"mkdir "/bin""#).await.ok();
6437        kernel
6438            .execute(r#"write "/bin/hello.kai" 'echo "Hello from script!"'"#)
6439            .await
6440            .expect("write script failed");
6441
6442        // Set PATH to /bin
6443        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6444
6445        // Call script by name (without .kai extension)
6446        let result = kernel
6447            .execute("hello")
6448            .await
6449            .expect("script execution failed");
6450
6451        assert!(result.ok(), "script failed: {}", result.err);
6452        assert_eq!(result.text_out().trim(), "Hello from script!");
6453    }
6454
6455    #[tokio::test]
6456    async fn test_script_with_args() {
6457        let kernel = Kernel::transient().expect("failed to create kernel");
6458
6459        // Create script that uses positional params
6460        kernel.execute(r#"mkdir "/bin""#).await.ok();
6461        kernel
6462            .execute(r#"write "/bin/greet.kai" 'echo "Hello, $1!"'"#)
6463            .await
6464            .expect("write script failed");
6465
6466        // Set PATH
6467        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6468
6469        // Call script with arg
6470        let result = kernel
6471            .execute(r#"greet "World""#)
6472            .await
6473            .expect("script execution failed");
6474
6475        assert!(result.ok(), "script failed: {}", result.err);
6476        assert_eq!(result.text_out().trim(), "Hello, World!");
6477    }
6478
6479    #[tokio::test]
6480    async fn test_script_not_found() {
6481        let kernel = Kernel::transient().expect("failed to create kernel");
6482
6483        // Set empty PATH
6484        kernel.execute(r#"PATH="/nonexistent""#).await.expect("set PATH failed");
6485
6486        // Call non-existent script
6487        let result = kernel
6488            .execute("noscript")
6489            .await
6490            .expect("execution failed");
6491
6492        assert!(!result.ok(), "should fail with command not found");
6493        assert_eq!(result.code, 127);
6494        assert!(result.err.contains("command not found"));
6495    }
6496
6497    #[tokio::test]
6498    async fn test_script_path_search_order() {
6499        let kernel = Kernel::transient().expect("failed to create kernel");
6500
6501        // Create two directories with same-named script
6502        // Note: using "myscript" not "test" to avoid conflict with test builtin
6503        kernel.execute(r#"mkdir "/first""#).await.ok();
6504        kernel.execute(r#"mkdir "/second""#).await.ok();
6505        kernel
6506            .execute(r#"write "/first/myscript.kai" 'echo "from first"'"#)
6507            .await
6508            .expect("write failed");
6509        kernel
6510            .execute(r#"write "/second/myscript.kai" 'echo "from second"'"#)
6511            .await
6512            .expect("write failed");
6513
6514        // Set PATH with first before second
6515        kernel.execute(r#"PATH="/first:/second""#).await.expect("set PATH failed");
6516
6517        // Should find first one
6518        let result = kernel
6519            .execute("myscript")
6520            .await
6521            .expect("script execution failed");
6522
6523        assert!(result.ok(), "script failed: {}", result.err);
6524        assert_eq!(result.text_out().trim(), "from first");
6525    }
6526
6527    // ═══════════════════════════════════════════════════════════════════════════
6528    // Special Variable Tests ($?, $$, unset vars)
6529    // ═══════════════════════════════════════════════════════════════════════════
6530
6531    #[tokio::test]
6532    async fn test_last_exit_code_success() {
6533        let kernel = Kernel::transient().expect("failed to create kernel");
6534
6535        // true exits with 0
6536        let result = kernel.execute("true; echo $?").await.expect("execution failed");
6537        assert!(result.text_out().contains("0"), "expected 0, got: {}", result.text_out());
6538    }
6539
6540    #[tokio::test]
6541    async fn test_last_exit_code_failure() {
6542        let kernel = Kernel::transient().expect("failed to create kernel");
6543
6544        // false exits with 1
6545        let result = kernel.execute("false; echo $?").await.expect("execution failed");
6546        assert!(result.text_out().contains("1"), "expected 1, got: {}", result.text_out());
6547    }
6548
6549    #[tokio::test]
6550    async fn test_current_pid() {
6551        let kernel = Kernel::transient().expect("failed to create kernel");
6552
6553        let result = kernel.execute("echo $$").await.expect("execution failed");
6554        // PID should be a positive number
6555        let pid: u32 = result.text_out().trim().parse().expect("PID should be a number");
6556        assert!(pid > 0, "PID should be positive");
6557    }
6558
6559    #[tokio::test]
6560    async fn test_unset_variable_expands_to_empty() {
6561        let kernel = Kernel::transient().expect("failed to create kernel");
6562
6563        // Unset variable in interpolation should be empty
6564        let result = kernel.execute(r#"echo "prefix:${UNSET_VAR}:suffix""#).await.expect("execution failed");
6565        assert_eq!(result.text_out().trim(), "prefix::suffix");
6566    }
6567
6568    #[tokio::test]
6569    async fn test_eq_ne_operators() {
6570        let kernel = Kernel::transient().expect("failed to create kernel");
6571
6572        // Test -eq operator
6573        let result = kernel.execute(r#"if [[ 5 -eq 5 ]]; then echo "eq works"; fi"#).await.expect("execution failed");
6574        assert_eq!(result.text_out().trim(), "eq works");
6575
6576        // Test -ne operator
6577        let result = kernel.execute(r#"if [[ 5 -ne 3 ]]; then echo "ne works"; fi"#).await.expect("execution failed");
6578        assert_eq!(result.text_out().trim(), "ne works");
6579
6580        // Test -eq with different values
6581        let result = kernel.execute(r#"if [[ 5 -eq 3 ]]; then echo "wrong"; else echo "correct"; fi"#).await.expect("execution failed");
6582        assert_eq!(result.text_out().trim(), "correct");
6583    }
6584
6585    #[tokio::test]
6586    async fn test_escaped_dollar_in_string() {
6587        let kernel = Kernel::transient().expect("failed to create kernel");
6588
6589        // \$ should produce literal $
6590        let result = kernel.execute(r#"echo "\$100""#).await.expect("execution failed");
6591        assert_eq!(result.text_out().trim(), "$100");
6592    }
6593
6594    #[tokio::test]
6595    async fn test_special_vars_in_interpolation() {
6596        let kernel = Kernel::transient().expect("failed to create kernel");
6597
6598        // Test $? in string interpolation
6599        let result = kernel.execute(r#"true; echo "exit: $?""#).await.expect("execution failed");
6600        assert_eq!(result.text_out().trim(), "exit: 0");
6601
6602        // Test $$ in string interpolation
6603        let result = kernel.execute(r#"echo "pid: $$""#).await.expect("execution failed");
6604        assert!(result.text_out().starts_with("pid: "), "unexpected output: {}", result.text_out());
6605        let text = result.text_out();
6606        let pid_part = text.trim().strip_prefix("pid: ").unwrap();
6607        let _pid: u32 = pid_part.parse().expect("PID in string should be a number");
6608    }
6609
6610    // ═══════════════════════════════════════════════════════════════════════════
6611    // Command Substitution Tests
6612    // ═══════════════════════════════════════════════════════════════════════════
6613
6614    #[tokio::test]
6615    async fn test_command_subst_assignment() {
6616        let kernel = Kernel::transient().expect("failed to create kernel");
6617
6618        // Command substitution in assignment
6619        let result = kernel.execute(r#"X=$(echo hello); echo "$X""#).await.expect("execution failed");
6620        assert_eq!(result.text_out().trim(), "hello");
6621    }
6622
6623    #[tokio::test]
6624    async fn test_command_subst_with_args() {
6625        let kernel = Kernel::transient().expect("failed to create kernel");
6626
6627        // Command substitution with string argument
6628        let result = kernel.execute(r#"X=$(echo "a b c"); echo "$X""#).await.expect("execution failed");
6629        assert_eq!(result.text_out().trim(), "a b c");
6630    }
6631
6632    #[tokio::test]
6633    async fn test_command_subst_nested_vars() {
6634        let kernel = Kernel::transient().expect("failed to create kernel");
6635
6636        // Variables inside command substitution
6637        let result = kernel.execute(r#"Y=world; X=$(echo "hello $Y"); echo "$X""#).await.expect("execution failed");
6638        assert_eq!(result.text_out().trim(), "hello world");
6639    }
6640
6641    #[tokio::test]
6642    async fn test_background_job_basic() {
6643        use std::time::Duration;
6644
6645        let kernel = Kernel::new(KernelConfig::isolated()).expect("failed to create kernel");
6646
6647        // Run a simple background command
6648        let result = kernel.execute("echo hello &").await.expect("execution failed");
6649        assert!(result.ok(), "background command should succeed: {}", result.err);
6650        assert!(result.text_out().contains("[1]"), "should return job ID: {}", result.text_out());
6651
6652        // Give the job time to complete
6653        tokio::time::sleep(Duration::from_millis(100)).await;
6654
6655        // Check job status
6656        let status = kernel.execute("cat /v/jobs/1/status").await.expect("status check failed");
6657        assert!(status.ok(), "status should succeed: {}", status.err);
6658        assert!(
6659            status.text_out().contains("done:") || status.text_out().contains("running"),
6660            "should have valid status: {}",
6661            status.text_out()
6662        );
6663
6664        // Check stdout
6665        let stdout = kernel.execute("cat /v/jobs/1/stdout").await.expect("stdout check failed");
6666        assert!(stdout.ok());
6667        assert!(stdout.text_out().contains("hello"));
6668    }
6669
6670    #[tokio::test]
6671    async fn test_heredoc_piped_to_command() {
6672        // Bug 4: heredoc content should pipe through to next command
6673        let kernel = Kernel::transient().expect("kernel");
6674        let result = kernel.execute("cat <<EOF | cat\nhello world\nEOF").await.expect("exec");
6675        assert!(result.ok(), "heredoc | cat failed: {}", result.err);
6676        assert_eq!(result.text_out().trim(), "hello world");
6677    }
6678
6679    /// A transient kernel paired with a real, auto-cleaning tempdir. The
6680    /// transient (Sandboxed) kernel mounts `/tmp` as a real `LocalFs`, so glob
6681    /// tests need actual files on disk. Hold the returned `TempDir` for the
6682    /// test's lifetime: it removes the directory tree on drop — including on
6683    /// panic — so no test scratch leaks into `/tmp` (the project's tmp-builder
6684    /// convention; never hardcode `/tmp/...` paths). Returns the absolute path
6685    /// as a string for interpolation into scripts.
6686    fn transient_with_tempdir() -> (Kernel, tempfile::TempDir, String) {
6687        let kernel = Kernel::transient().expect("kernel");
6688        let tmp = tempfile::tempdir().expect("tempdir");
6689        let dir = tmp.path().display().to_string();
6690        (kernel, tmp, dir)
6691    }
6692
6693    #[tokio::test]
6694    async fn test_for_loop_glob_iterates() {
6695        // Bug 1: for F in $(glob ...) should iterate per file, not once
6696        let (kernel, _tmp, dir) = transient_with_tempdir();
6697        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6698        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6699        let result = kernel.execute(&format!(r#"
6700            N=0
6701            for F in $(glob "{dir}/*.txt"); do
6702                N=$((N + 1))
6703            done
6704            echo $N
6705        "#)).await.unwrap();
6706        assert!(result.ok(), "for glob failed: {}", result.err);
6707        assert_eq!(result.text_out().trim(), "2", "Should iterate 2 files, got: {}", result.text_out());
6708    }
6709
6710    #[tokio::test]
6711    async fn test_bare_glob_expansion_echo() {
6712        let (kernel, _tmp, dir) = transient_with_tempdir();
6713        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6714        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6715        kernel.execute(&format!("echo c > {dir}/c.rs")).await.unwrap();
6716        kernel.execute(&format!("cd {dir}")).await.unwrap();
6717        let result = kernel.execute("echo *.txt").await.unwrap();
6718        assert!(result.ok(), "echo *.txt failed: {}", result.err);
6719        let out = result.text_out();
6720        let out = out.trim();
6721        // Should contain both .txt files (order may vary)
6722        assert!(out.contains("a.txt"), "missing a.txt in: {}", out);
6723        assert!(out.contains("b.txt"), "missing b.txt in: {}", out);
6724        assert!(!out.contains("c.rs"), "should not contain c.rs in: {}", out);
6725    }
6726
6727    #[tokio::test]
6728    async fn test_bare_glob_no_matches_errors() {
6729        let (kernel, _tmp, dir) = transient_with_tempdir();
6730        kernel.execute(&format!("cd {dir}")).await.unwrap();
6731        let result = kernel.execute("echo *.nonexistent").await;
6732        match &result {
6733            Ok(exec) => {
6734                // No-match glob should produce a non-zero exit code
6735                assert!(!exec.ok(), "expected failure, got success: out={}, err={}", exec.text_out(), exec.err);
6736                assert!(exec.err.contains("no matches"), "error should say no matches: {}", exec.err);
6737            }
6738            Err(e) => {
6739                assert!(e.to_string().contains("no matches"), "error should say no matches: {}", e);
6740            }
6741        }
6742    }
6743
6744    #[tokio::test]
6745    async fn test_bare_glob_disabled_with_set() {
6746        let (kernel, _tmp, dir) = transient_with_tempdir();
6747        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6748        kernel.execute(&format!("cd {dir}")).await.unwrap();
6749        // Disable glob expansion
6750        kernel.execute("set +o glob").await.unwrap();
6751        let result = kernel.execute("echo *.txt").await.unwrap();
6752        // With glob disabled, *.txt should be passed as literal string
6753        assert!(result.ok(), "echo should succeed: {}", result.err);
6754        assert_eq!(result.text_out().trim(), "*.txt", "should be literal: {}", result.text_out());
6755    }
6756
6757    #[tokio::test]
6758    async fn test_bare_glob_quoted_not_expanded() {
6759        let (kernel, _tmp, dir) = transient_with_tempdir();
6760        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6761        kernel.execute(&format!("cd {dir}")).await.unwrap();
6762        // Quoted globs should NOT expand
6763        let result = kernel.execute("echo \"*.txt\"").await.unwrap();
6764        assert!(result.ok(), "echo should succeed: {}", result.err);
6765        assert_eq!(result.text_out().trim(), "*.txt", "quoted should be literal: {}", result.text_out());
6766    }
6767
6768    #[tokio::test]
6769    async fn test_bare_glob_for_loop() {
6770        let (kernel, _tmp, dir) = transient_with_tempdir();
6771        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6772        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6773        kernel.execute(&format!("cd {dir}")).await.unwrap();
6774        let result = kernel.execute(r#"
6775            N=0
6776            for f in *.txt; do
6777                N=$((N + 1))
6778            done
6779            echo $N
6780        "#).await.unwrap();
6781        assert!(result.ok(), "for loop failed: {}", result.err);
6782        assert_eq!(result.text_out().trim(), "2", "should iterate 2 files: {}", result.text_out());
6783    }
6784
6785    #[tokio::test]
6786    async fn test_glob_in_assignment_is_literal() {
6787        let kernel = Kernel::transient().expect("kernel");
6788        let result = kernel.execute("X=*.txt; echo $X").await.unwrap();
6789        assert!(result.ok());
6790        assert_eq!(result.text_out().trim(), "*.txt", "glob in assignment should be literal");
6791    }
6792
6793    #[tokio::test]
6794    async fn test_glob_in_test_expr_is_literal() {
6795        let kernel = Kernel::transient().expect("kernel");
6796        let result = kernel.execute(r#"
6797            if [[ *.txt == "*.txt" ]]; then
6798                echo "match"
6799            else
6800                echo "no"
6801            fi
6802        "#).await.unwrap();
6803        assert!(result.ok());
6804        assert_eq!(result.text_out().trim(), "match", "glob in test expr should be literal");
6805    }
6806
6807    #[tokio::test]
6808    async fn test_command_subst_echo_not_iterable() {
6809        // Regression guard: $(echo "a b c") must remain a single string
6810        let kernel = Kernel::transient().expect("kernel");
6811        let result = kernel.execute(r#"
6812            N=0
6813            for X in $(echo "a b c"); do N=$((N + 1)); done
6814            echo $N
6815        "#).await.unwrap();
6816        assert!(result.ok());
6817        assert_eq!(result.text_out().trim(), "1", "echo should be one item: {}", result.text_out());
6818    }
6819
6820    // -- accumulate_result / newline tests --
6821
6822    #[test]
6823    fn test_accumulate_preserves_own_newlines() {
6824        // Outputs concatenate verbatim — a command's own trailing newline is
6825        // kept, none is invented.
6826        let mut acc = ExecResult::success("line1\n");
6827        let new = ExecResult::success("line2\n");
6828        accumulate_result(&mut acc, &new);
6829        assert_eq!(&*acc.text_out(), "line1\nline2\n");
6830        assert!(!acc.text_out().contains("\n\n"), "should not have double newlines: {:?}", acc.text_out());
6831    }
6832
6833    #[test]
6834    fn test_accumulate_inserts_no_separator() {
6835        // No artificial separator: `printf a; printf b` style concatenates to
6836        // `ab`, matching bash (regression for the 2026-06-09 finding).
6837        let mut acc = ExecResult::success("line1");
6838        let new = ExecResult::success("line2");
6839        accumulate_result(&mut acc, &new);
6840        assert_eq!(&*acc.text_out(), "line1line2");
6841    }
6842
6843    #[test]
6844    fn test_accumulate_empty_into_nonempty() {
6845        let mut acc = ExecResult::success("");
6846        let new = ExecResult::success("hello\n");
6847        accumulate_result(&mut acc, &new);
6848        assert_eq!(&*acc.text_out(), "hello\n");
6849    }
6850
6851    #[test]
6852    fn test_accumulate_nonempty_into_empty() {
6853        let mut acc = ExecResult::success("hello\n");
6854        let new = ExecResult::success("");
6855        accumulate_result(&mut acc, &new);
6856        assert_eq!(&*acc.text_out(), "hello\n");
6857    }
6858
6859    #[test]
6860    fn test_accumulate_stderr_no_double_newlines() {
6861        let mut acc = ExecResult::failure(1, "err1\n");
6862        let new = ExecResult::failure(1, "err2\n");
6863        accumulate_result(&mut acc, &new);
6864        assert!(!acc.err.contains("\n\n"), "stderr should not have double newlines: {:?}", acc.err);
6865    }
6866
6867    #[tokio::test]
6868    async fn test_multiple_echo_no_blank_lines() {
6869        let kernel = Kernel::transient().expect("kernel");
6870        let result = kernel
6871            .execute("echo one\necho two\necho three")
6872            .await
6873            .expect("execution failed");
6874        assert!(result.ok());
6875        assert_eq!(&*result.text_out(), "one\ntwo\nthree\n");
6876    }
6877
6878    #[tokio::test]
6879    async fn test_for_loop_no_blank_lines() {
6880        let kernel = Kernel::transient().expect("kernel");
6881        let result = kernel
6882            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
6883            .await
6884            .expect("execution failed");
6885        assert!(result.ok());
6886        assert_eq!(&*result.text_out(), "item: a\nitem: b\nitem: c\n");
6887    }
6888
6889    #[tokio::test]
6890    async fn test_for_command_subst_no_blank_lines() {
6891        let kernel = Kernel::transient().expect("kernel");
6892        let result = kernel
6893            .execute(r#"for N in $(seq 1 3); do echo "n=${N}"; done"#)
6894            .await
6895            .expect("execution failed");
6896        assert!(result.ok());
6897        assert_eq!(&*result.text_out(), "n=1\nn=2\nn=3\n");
6898    }
6899
6900    // ------------------------------------------------------------------
6901    // build_args_async: multi-consume flags (jq --arg NAME VALUE pattern)
6902    // ------------------------------------------------------------------
6903
6904    /// Helper: a throwaway schema with one `--pair` param declared as
6905    /// consuming two positionals per occurrence. Modelled after what
6906    /// jq_native will declare for `--arg` / `--argjson`.
6907    fn multi_consume_schema() -> crate::tools::ToolSchema {
6908        use crate::tools::{ParamSchema, ToolSchema};
6909        ToolSchema::new("test", "multi-consume smoke")
6910            .param(
6911                ParamSchema::optional("pair", "array", Value::Null, "name+value pair")
6912                    .consumes(2),
6913            )
6914    }
6915
6916    fn pos(s: &str) -> Arg {
6917        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
6918    }
6919
6920    #[tokio::test]
6921    async fn build_args_multi_consume_single_occurrence() {
6922        let kernel = Kernel::transient().expect("kernel");
6923        let schema = multi_consume_schema();
6924        // Simulates:  test --pair NAME VALUE filter
6925        let args = vec![
6926            Arg::LongFlag("pair".into()),
6927            pos("NAME"),
6928            pos("VALUE"),
6929            pos("filter"),
6930        ];
6931        let built = kernel
6932            .build_args_async(&args, Some(&schema))
6933            .await
6934            .expect("build_args should succeed");
6935
6936        // `--pair` + its two positionals are consumed into named["pair"],
6937        // which becomes an outer array of one inner 2-element array.
6938        let pair = built.named.get("pair").expect("named[pair] missing");
6939        match pair {
6940            Value::Json(serde_json::Value::Array(occurrences)) => {
6941                assert_eq!(occurrences.len(), 1, "expected one occurrence");
6942                match &occurrences[0] {
6943                    serde_json::Value::Array(values) => {
6944                        assert_eq!(values.len(), 2, "pair must have 2 values");
6945                        assert_eq!(values[0], serde_json::Value::String("NAME".into()));
6946                        assert_eq!(values[1], serde_json::Value::String("VALUE".into()));
6947                    }
6948                    other => panic!("expected inner array, got {other:?}"),
6949                }
6950            }
6951            other => panic!("expected Json(Array(...)) for named[pair], got {other:?}"),
6952        }
6953
6954        // The un-consumed positional ("filter") remains in `positional`.
6955        assert_eq!(built.positional.len(), 1);
6956        assert_eq!(built.positional[0], Value::String("filter".into()));
6957    }
6958    #[tokio::test]
6959    async fn build_args_multi_consume_two_occurrences_accumulate() {
6960        let kernel = Kernel::transient().expect("kernel");
6961        let schema = multi_consume_schema();
6962        // Simulates:  test --pair A 1 --pair B 2 filter
6963        let args = vec![
6964            Arg::LongFlag("pair".into()),
6965            pos("A"),
6966            pos("1"),
6967            Arg::LongFlag("pair".into()),
6968            pos("B"),
6969            pos("2"),
6970            pos("filter"),
6971        ];
6972        let built = kernel
6973            .build_args_async(&args, Some(&schema))
6974            .await
6975            .expect("build_args should succeed");
6976
6977        let pair = built.named.get("pair").expect("named[pair] missing");
6978        match pair {
6979            Value::Json(serde_json::Value::Array(occurrences)) => {
6980                assert_eq!(occurrences.len(), 2, "expected two occurrences");
6981                // Preserved in invocation order.
6982                match &occurrences[0] {
6983                    serde_json::Value::Array(values) => {
6984                        assert_eq!(values[0], serde_json::Value::String("A".into()));
6985                        assert_eq!(values[1], serde_json::Value::String("1".into()));
6986                    }
6987                    other => panic!("expected inner array, got {other:?}"),
6988                }
6989                match &occurrences[1] {
6990                    serde_json::Value::Array(values) => {
6991                        assert_eq!(values[0], serde_json::Value::String("B".into()));
6992                        assert_eq!(values[1], serde_json::Value::String("2".into()));
6993                    }
6994                    other => panic!("expected inner array, got {other:?}"),
6995                }
6996            }
6997            other => panic!("expected Json(Array(...)), got {other:?}"),
6998        }
6999    }
7000
7001    // ── undeclared space-form flag under map_positionals (kj --type val) ──
7002    //
7003    // A backend/MCP tool whose schema does NOT declare a flag must not let
7004    // `--flag value` (space form) silently divorce the value: that was a
7005    // privilege-escalation-by-typo against kaijutsu (see docs/issues.md).
7006    // kaish fails loud rather than guessing.
7007
7008    use crate::tools::{ParamSchema, ToolSchema};
7009
7010    /// Backend-style schema (map_positionals) declaring only a `name`
7011    /// positional — `--type` is intentionally undeclared.
7012    fn kj_like_schema() -> ToolSchema {
7013        ToolSchema::new("kj", "incomplete backend schema")
7014            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
7015            .with_positional_mapping()
7016    }
7017
7018    #[tokio::test]
7019    async fn build_args_undeclared_space_flag_errors_under_map_positionals() {
7020        let kernel = Kernel::transient().expect("kernel");
7021        let schema = kj_like_schema();
7022        // kj context create exp --type explorer
7023        let args = vec![
7024            pos("context"),
7025            pos("create"),
7026            pos("exp"),
7027            Arg::LongFlag("type".into()),
7028            pos("explorer"),
7029        ];
7030        let err = kernel
7031            .build_args_async(&args, Some(&schema))
7032            .await
7033            .expect_err("undeclared --type with a space value must fail loud");
7034        let msg = err.to_string();
7035        assert!(msg.contains("--type"), "message should name the flag: {msg}");
7036        assert!(msg.contains("--type=explorer"), "message should suggest the = form: {msg}");
7037        assert!(msg.contains("kj"), "message should name the tool: {msg}");
7038    }
7039
7040    #[tokio::test]
7041    async fn build_args_declared_space_flag_still_binds() {
7042        let kernel = Kernel::transient().expect("kernel");
7043        // Same tool, but now the schema DECLARES --type as a string param.
7044        let schema = ToolSchema::new("kj", "complete schema")
7045            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
7046            .param(ParamSchema::optional("type", "string", Value::Null, "role type"))
7047            .with_positional_mapping();
7048        let args = vec![
7049            pos("exp"),
7050            Arg::LongFlag("type".into()),
7051            pos("explorer"),
7052        ];
7053        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7054        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7055    }
7056
7057    #[tokio::test]
7058    async fn build_args_equals_form_binds_for_undeclared_flag() {
7059        let kernel = Kernel::transient().expect("kernel");
7060        let schema = kj_like_schema();
7061        // The unambiguous `=` form must keep working even when undeclared.
7062        let args = vec![
7063            pos("exp"),
7064            Arg::Named { key: "type".into(), value: Expr::Literal(Value::String("explorer".into())) },
7065        ];
7066        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7067        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7068    }
7069
7070    #[tokio::test]
7071    async fn build_args_undeclared_bool_flag_at_end_is_ok() {
7072        let kernel = Kernel::transient().expect("kernel");
7073        let schema = kj_like_schema();
7074        // No positional follows --force → unambiguously a bare flag.
7075        let args = vec![pos("exp"), Arg::LongFlag("force".into())];
7076        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7077        assert!(built.flags.contains("force"));
7078    }
7079
7080    #[tokio::test]
7081    async fn build_args_undeclared_flag_before_another_flag_is_ok() {
7082        let kernel = Kernel::transient().expect("kernel");
7083        let schema = kj_like_schema();
7084        // --verbose is followed by a flag, not a positional → not ambiguous.
7085        let args = vec![
7086            Arg::LongFlag("verbose".into()),
7087            Arg::Named { key: "name".into(), value: Expr::Literal(Value::String("x".into())) },
7088        ];
7089        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7090        assert!(built.flags.contains("verbose"));
7091    }
7092
7093    #[tokio::test]
7094    async fn build_args_undeclared_space_flag_ok_for_builtin_schema() {
7095        let kernel = Kernel::transient().expect("kernel");
7096        // Builtins set map_positionals=false; the ambiguity guard must not
7097        // fire there (clap validates their flags separately).
7098        let schema = ToolSchema::new("frobnicate", "builtin-style")
7099            .param(ParamSchema::optional("name", "string", Value::Null, "name"));
7100        let args = vec![Arg::LongFlag("frob".into()), pos("value")];
7101        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7102        assert!(built.flags.contains("frob"));
7103    }
7104
7105    // ── subcommand-aware binding (select_leaf wired into build_args_async) ──
7106    //
7107    // A tool exposing a subcommand tree binds flags against the *routed leaf's*
7108    // params, not the root's. The subcommand-path positionals stay positional
7109    // (kj re-parses them with its own clap), and a value flag declared only on
7110    // a deep leaf still binds in space form.
7111
7112    /// kj → context (alias ctx) → create{--type value, --force bool}.
7113    /// map_positionals defaults false on every node (builtin/kj style).
7114    fn kj_tree_schema() -> ToolSchema {
7115        ToolSchema::new("kj", "subcommand tool").subcommand(
7116            ToolSchema::new("context", "context ops")
7117                .with_command_aliases(["ctx"])
7118                .subcommand(
7119                    ToolSchema::new("create", "create context")
7120                        .param(ParamSchema::new("type", "string").with_aliases(["t"]))
7121                        .param(ParamSchema::new("force", "bool")),
7122                ),
7123        )
7124    }
7125
7126    #[tokio::test]
7127    async fn build_args_binds_deep_leaf_value_flag_space_form() {
7128        let kernel = Kernel::transient().expect("kernel");
7129        let schema = kj_tree_schema();
7130        // kj context create --type explorer
7131        let args = vec![
7132            pos("context"),
7133            pos("create"),
7134            Arg::LongFlag("type".into()),
7135            pos("explorer"),
7136        ];
7137        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7138        // --type (declared only on the create leaf) binds in space form.
7139        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7140        // The subcommand path survives as positionals for kj to re-parse.
7141        let positionals: Vec<&str> = built
7142            .positional
7143            .iter()
7144            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
7145            .collect();
7146        assert_eq!(positionals, vec!["context", "create"]);
7147    }
7148
7149    #[tokio::test]
7150    async fn build_args_leaf_bool_flag_does_not_swallow_positional() {
7151        let kernel = Kernel::transient().expect("kernel");
7152        let schema = kj_tree_schema();
7153        // kj context create --force somearg  → --force is a leaf bool flag,
7154        // it must NOT consume `somearg`.
7155        let args = vec![
7156            pos("context"),
7157            pos("create"),
7158            Arg::LongFlag("force".into()),
7159            pos("somearg"),
7160        ];
7161        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7162        assert!(built.flags.contains("force"), "force should be a bare flag");
7163        let positionals: Vec<&str> = built
7164            .positional
7165            .iter()
7166            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
7167            .collect();
7168        assert_eq!(positionals, vec!["context", "create", "somearg"]);
7169    }
7170
7171    #[tokio::test]
7172    async fn build_args_alias_routed_leaf_binds_value_flag() {
7173        let kernel = Kernel::transient().expect("kernel");
7174        let schema = kj_tree_schema();
7175        // kj ctx create -t explorer  → command alias + short flag alias.
7176        let args = vec![
7177            pos("ctx"),
7178            pos("create"),
7179            Arg::ShortFlag("t".into()),
7180            pos("explorer"),
7181        ];
7182        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7183        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7184    }
7185
7186    #[tokio::test]
7187    async fn build_args_computed_subcommand_selector_fails_loud() {
7188        let kernel = Kernel::transient().expect("kernel");
7189        let schema = kj_tree_schema();
7190        // kj $(echo context) — routing can't see the value; fail loud.
7191        let args = vec![Arg::Positional(Expr::CommandSubst(vec![Stmt::Command(
7192            crate::ast::Command { name: "echo".into(), args: vec![], redirects: vec![] },
7193        )]))];
7194        let err = kernel
7195            .build_args_async(&args, Some(&schema))
7196            .await
7197            .expect_err("computed subcommand selector must error");
7198        assert!(
7199            err.to_string().contains("subcommand name is required"),
7200            "got: {err}"
7201        );
7202    }
7203
7204    // ── finalize_output: --json rendering vs. owns_output opt-out ───────────
7205
7206    #[test]
7207    fn finalize_output_renders_when_kernel_owns_it() {
7208        use crate::interpreter::{OutputData, OutputFormat};
7209        let r = ExecResult::with_output(OutputData::text("RAW"));
7210        let out = finalize_output(r, Some(OutputFormat::Json), false);
7211        // Kernel renders the typed OutputData → JSON; text is no longer bare.
7212        assert_ne!(out.text_out(), "RAW", "kernel should reformat to JSON");
7213    }
7214
7215    #[test]
7216    fn finalize_output_skips_when_tool_owns_output() {
7217        use crate::interpreter::{OutputData, OutputFormat};
7218        let r = ExecResult::with_output(OutputData::text("RAW"));
7219        let out = finalize_output(r, Some(OutputFormat::Json), true);
7220        // owns_output: the tool already rendered; kernel leaves bytes untouched.
7221        assert_eq!(out.text_out(), "RAW", "owned output must be left as-is");
7222    }
7223
7224    #[test]
7225    fn finalize_output_no_format_is_noop() {
7226        use crate::interpreter::OutputData;
7227        let r = ExecResult::with_output(OutputData::text("RAW"));
7228        let out = finalize_output(r, None, false);
7229        assert_eq!(out.text_out(), "RAW");
7230    }
7231
7232    // ── initial_vars + execute_with_vars + hermetic env ───────────────────
7233
7234    #[tokio::test]
7235    async fn test_initial_vars_set_and_exported() {
7236        let config = KernelConfig::transient()
7237            .with_var("INIT_FOO", Value::String("bar".into()));
7238        let kernel = Kernel::new(config).expect("failed to create kernel");
7239
7240        assert_eq!(
7241            kernel.get_var("INIT_FOO").await,
7242            Some(Value::String("bar".into()))
7243        );
7244        assert!(
7245            kernel.scope.read().await.is_exported("INIT_FOO"),
7246            "initial_vars entries must be marked exported"
7247        );
7248    }
7249
7250    #[tokio::test]
7251    async fn test_execute_with_vars_overlay_visible() {
7252        let kernel = Kernel::transient().expect("failed to create kernel");
7253        let mut overlay = HashMap::new();
7254        overlay.insert("OVERLAY_X".to_string(), Value::String("yes".into()));
7255
7256        let result = kernel
7257            .execute_with_options(r#"echo "${OVERLAY_X}""#, ExecuteOptions::new().with_vars(overlay))
7258            .await
7259            .expect("execute failed");
7260
7261        assert!(result.ok());
7262        assert_eq!(result.text_out().trim(), "yes");
7263    }
7264
7265    #[tokio::test]
7266    async fn test_execute_with_vars_overlay_cleanup() {
7267        let kernel = Kernel::transient().expect("failed to create kernel");
7268        let mut overlay = HashMap::new();
7269        overlay.insert("EPHEMERAL".to_string(), Value::String("transient".into()));
7270
7271        kernel
7272            .execute_with_options("echo ignored", ExecuteOptions::new().with_vars(overlay))
7273            .await
7274            .expect("execute failed");
7275
7276        assert_eq!(kernel.get_var("EPHEMERAL").await, None);
7277        assert!(
7278            !kernel.scope.read().await.is_exported("EPHEMERAL"),
7279            "overlay-only export must be cleared on return"
7280        );
7281    }
7282
7283    #[tokio::test]
7284    async fn test_execute_with_vars_does_not_clobber_existing_export() {
7285        let kernel = Kernel::transient().expect("failed to create kernel");
7286        kernel
7287            .execute("export OUTER=outer")
7288            .await
7289            .expect("export failed");
7290
7291        let mut overlay = HashMap::new();
7292        overlay.insert("OUTER".to_string(), Value::String("inner".into()));
7293        let result = kernel
7294            .execute_with_options(r#"echo "${OUTER}""#, ExecuteOptions::new().with_vars(overlay))
7295            .await
7296            .expect("execute failed");
7297        assert_eq!(result.text_out().trim(), "inner");
7298
7299        assert_eq!(
7300            kernel.get_var("OUTER").await,
7301            Some(Value::String("outer".into())),
7302            "outer value must reappear after pop"
7303        );
7304        assert!(
7305            kernel.scope.read().await.is_exported("OUTER"),
7306            "outer export must survive overlay"
7307        );
7308    }
7309
7310    #[tokio::test]
7311    async fn test_execute_with_vars_inner_assignment_is_local() {
7312        let kernel = Kernel::transient().expect("failed to create kernel");
7313        let mut overlay = HashMap::new();
7314        overlay.insert("LOCAL_FOO".to_string(), Value::String("from-overlay".into()));
7315
7316        // Variable assignment inside a single statement uses set() (innermost
7317        // frame), not set_global() — this matches bash function-local semantics.
7318        // We explicitly use `local FOO=...` style by relying on the pushed
7319        // frame; the assignment in the script body modifies the same frame.
7320        let result = kernel
7321            .execute_with_options(
7322                r#"LOCAL_FOO="reassigned"; echo "${LOCAL_FOO}""#,
7323                ExecuteOptions::new().with_vars(overlay),
7324            )
7325            .await
7326            .expect("execute failed");
7327        assert!(result.ok());
7328
7329        // After the call the frame is popped, so LOCAL_FOO is gone regardless
7330        // of how the script reassigned it.
7331        assert_eq!(kernel.get_var("LOCAL_FOO").await, None);
7332    }
7333
7334    #[tokio::test]
7335    async fn test_external_command_sees_exported_var() {
7336        let kernel = Kernel::transient().expect("failed to create kernel");
7337        let result = kernel
7338            .execute("export EXT_FOO=bar; printenv EXT_FOO")
7339            .await
7340            .expect("execute failed");
7341
7342        assert!(result.ok(), "printenv should succeed: stderr={}", result.err);
7343        assert_eq!(result.text_out().trim(), "bar");
7344    }
7345
7346    #[tokio::test]
7347    async fn test_external_command_does_not_see_unexported_var() {
7348        let kernel = Kernel::transient().expect("failed to create kernel");
7349
7350        // Set without exporting; printenv must not see it (exit code != 0,
7351        // empty stdout per printenv semantics).
7352        let result = kernel
7353            .execute("EXT_BAR=hidden; printenv EXT_BAR")
7354            .await
7355            .expect("execute failed");
7356
7357        assert!(!result.ok(), "printenv should fail when var is unexported");
7358        assert!(
7359            result.text_out().trim().is_empty(),
7360            "no stdout when var is missing, got: {}",
7361            result.text_out()
7362        );
7363    }
7364
7365    #[tokio::test]
7366    async fn test_external_command_does_not_see_os_env() {
7367        // The kernel is hermetic: it never reads std::env::vars() and only
7368        // exports what it has been told to export. Cargo always sets PATH for
7369        // tests, so PATH is reliably present in the OS env — but a transient
7370        // kernel doesn't seed it into initial_vars, so `printenv PATH` from
7371        // inside the kernel must fail.
7372        assert!(
7373            std::env::var_os("PATH").is_some(),
7374            "test precondition: cargo should set PATH"
7375        );
7376
7377        let kernel = Kernel::transient().expect("failed to create kernel");
7378        let result = kernel
7379            .execute("printenv PATH")
7380            .await
7381            .expect("execute failed");
7382
7383        assert!(
7384            !result.ok(),
7385            "printenv PATH must fail in hermetic kernel, got stdout={:?}",
7386            result.text_out()
7387        );
7388        assert!(
7389            result.text_out().trim().is_empty(),
7390            "no PATH in subprocess env, got stdout={:?}",
7391            result.text_out()
7392        );
7393    }
7394
7395    #[tokio::test]
7396    async fn test_execute_with_vars_overlay_reaches_subprocess() {
7397        let kernel = Kernel::transient().expect("failed to create kernel");
7398        let mut overlay = HashMap::new();
7399        overlay.insert("SUB_FOO".to_string(), Value::String("subproc".into()));
7400
7401        let result = kernel
7402            .execute_with_options("printenv SUB_FOO", ExecuteOptions::new().with_vars(overlay))
7403            .await
7404            .expect("execute failed");
7405
7406        assert!(
7407            result.ok(),
7408            "printenv should succeed: code={} stdout={:?} stderr={:?}",
7409            result.code,
7410            result.text_out(),
7411            result.err
7412        );
7413        assert_eq!(result.text_out().trim(), "subproc");
7414    }
7415}