Skip to main content

kaish_kernel/
kernel.rs

1//! The Kernel (核) — the heart of kaish.
2//!
3//! The Kernel owns and coordinates all core components:
4//! - Interpreter state (scope, $?)
5//! - Tool registry (builtins, user tools)
6//! - VFS router (mount points)
7//! - Job manager (background jobs)
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────┐
13//! │                         Kernel (核)                         │
14//! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
15//! │  │   Scope      │  │ ToolRegistry │  │  VfsRouter       │  │
16//! │  │  (variables) │  │  (builtins,  │  │  (mount points)  │  │
17//! │  │              │  │   user tools)│  │                  │  │
18//! │  └──────────────┘  └──────────────┘  └──────────────────┘  │
19//! │  ┌──────────────────────────────┐  ┌──────────────────┐    │
20//! │  │  JobManager (background)     │  │  ExecResult ($?) │    │
21//! │  └──────────────────────────────┘  └──────────────────┘    │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::Duration;
30
31use anyhow::{Context, Result};
32use tokio::sync::RwLock;
33
/// Monotonic counter assigned to each Kernel at construction time, exposed
/// via `$$` / `${$}`. Starts at 1; each new Kernel gets the next value.
/// `Kernel::fork()` inherits the parent's value (matching bash's "subshell
/// keeps parent's $$" semantics) because forks clone the parent's Scope
/// rather than calling `set_pid` again.
///
/// Deliberately *not* the OS PID — kaish runs as a long-lived MCP server
/// or embedded inside other binaries (kaijutsu), where the host PID is
/// meaningless to the script. See
/// `~/.claude/projects/-home-atobey-src-kaish/memory/lang_dollar_dollar_identifier.md`
/// for the design rationale.
///
/// The counter is process-global: every Kernel created in this process
/// draws from the same sequence.
// NOTE(review): the site that advances the counter is outside this excerpt;
// presumably a `fetch_add` during kernel construction — confirm there.
static KERNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
46
47use async_trait::async_trait;
48
49use crate::ast::{Arg, Command, Expr, FileTestOp, Stmt, StringPart, TestExpr, ToolDef, Value, BinaryOp};
50pub use kaish_types::ExecuteOptions;
51use crate::backend::{BackendError, KernelBackend};
52use kaish_glob::glob_match;
53use crate::dispatch::{CommandDispatcher, PipelinePosition};
54use crate::interpreter::{apply_output_format, eval_expr, expand_tilde, json_to_value, value_to_bool, value_to_string, ControlFlow, ExecResult, Scope};
55use crate::parser::parse;
56use crate::scheduler::{is_bool_type, schema_param_lookup, select_leaf, stderr_stream, BoundedStream, JobManager, PipelineRunner, StderrReceiver};
57#[cfg(feature = "subprocess")]
58use crate::scheduler::{drain_to_stream, DEFAULT_STREAM_MAX_SIZE};
59use crate::tools::{register_builtins, ExecContext, GlobalFlags, ToolArgs, ToolRegistry};
60#[cfg(feature = "subprocess")]
61use crate::tools::resolve_in_path;
62use crate::validator::{Severity, Validator};
63#[cfg(feature = "localfs")]
64use crate::vfs::LocalFs;
65use crate::vfs::{BuiltinFs, JobFs, MemoryFs, VfsRouter};
66use kaish_vfs::ByteBudget;
67#[cfg(all(feature = "localfs", feature = "overlay"))]
68use kaish_vfs::OverlayFs;
69
/// VFS mount mode determines how the local filesystem is exposed.
///
/// Different modes trade off convenience vs. security:
/// - `Passthrough` gives native path access (best for human REPL use)
/// - `Sandboxed` restricts access to a subtree (safer for agents)
/// - `NoLocal` provides complete isolation (tests, pure memory mode)
///
/// `Passthrough` and `Sandboxed` only exist when the `localfs` feature is
/// enabled; without it `NoLocal` is the sole variant.
#[derive(Debug, Clone)]
pub enum VfsMountMode {
    /// LocalFs at "/" — native paths work directly.
    ///
    /// Full filesystem access. Use for human-operated REPL sessions where
    /// native paths like `/home/user/project` should just work.
    ///
    /// Mounts:
    /// - `/` → LocalFs("/")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Passthrough,

    /// Transparent sandbox — paths look native but access is restricted.
    ///
    /// The local filesystem is mounted at its real path (e.g., `/home/user`),
    /// so `/home/user/src/project` just works. But paths outside the sandbox
    /// root are not accessible.
    ///
    /// **Note:** This only restricts VFS (builtin) operations. External commands
    /// bypass the sandbox entirely — see [`KernelConfig::allow_external_commands`].
    ///
    /// Mounts:
    /// - `/` → MemoryFs (catches paths outside sandbox)
    /// - `{root}` → LocalFs(root)  (e.g., `/home/user` → LocalFs)
    /// - `/tmp` → LocalFs("/tmp")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Sandboxed {
        /// Root path for local filesystem. Defaults to `$HOME`.
        /// Can be restricted further, e.g., `~/src`.
        ///
        /// `None` selects the default root.
        root: Option<PathBuf>,
    },

    /// No local filesystem. Memory only.
    ///
    /// Complete isolation — no access to the host filesystem.
    /// Useful for tests or pure sandboxed execution.
    ///
    /// Output spill is forced to [`SpillMode::Memory`](crate::output_limit::SpillMode::Memory)
    /// for this mode at kernel construction: with no host filesystem mounted,
    /// large output must not write a host spill file (`paths::spill_dir()`
    /// bypasses the VFS). This overrides any explicit `SpillMode::Disk`.
    ///
    /// Mounts:
    /// - `/` → MemoryFs
    /// - `/tmp` → MemoryFs
    /// - `/v` → MemoryFs
    NoLocal,
}
126
127#[allow(clippy::derivable_impls)] // native has multiple variants; not derivable cross-feature
128impl Default for VfsMountMode {
129    fn default() -> Self {
130        #[cfg(feature = "localfs")]
131        { VfsMountMode::Sandboxed { root: None } }
132        #[cfg(not(feature = "localfs"))]
133        { VfsMountMode::NoLocal }
134    }
135}
136
/// Configuration for kernel initialization.
///
/// Build one with a profile constructor (`KernelConfig::named`, `repl`,
/// `mcp`, `isolated`, ...) and refine it with the chainable `with_*`
/// methods. All fields are public, so a config can also be adjusted
/// field-by-field before it is handed to `Kernel::new`.
#[derive(Debug, Clone)]
pub struct KernelConfig {
    /// Name of this kernel (for identification).
    pub name: String,

    /// VFS mount mode — controls how local filesystem is exposed.
    pub vfs_mode: VfsMountMode,

    /// Initial working directory (VFS path).
    pub cwd: PathBuf,

    /// Whether to skip pre-execution validation.
    ///
    /// When false (default), scripts are validated before execution to catch
    /// errors early. Set to true to skip validation for performance or to
    /// allow dynamic/external commands.
    pub skip_validation: bool,

    /// When true, standalone external commands inherit stdio for real-time output.
    ///
    /// Set by script runner and REPL for human-visible output.
    /// Not set by MCP server (output must be captured for structured responses).
    pub interactive: bool,

    /// Ignore file configuration for file-walking tools.
    pub ignore_config: crate::ignore_config::IgnoreConfig,

    /// Output size limit configuration for agent safety.
    pub output_limit: crate::output_limit::OutputLimitConfig,

    /// Whether external command execution (PATH lookup, `exec`, `spawn`) is allowed.
    ///
    /// When `true` (default), commands not found as builtins are resolved via PATH
    /// and executed as child processes. When `false`, only kaish builtins and
    /// backend-registered tools are available.
    ///
    /// **Security:** External commands bypass the VFS sandbox entirely — they see
    /// the real filesystem, network, and environment. Set to `false` when running
    /// untrusted input.
    pub allow_external_commands: bool,

    /// Enable confirmation latch for dangerous operations (set -o latch).
    ///
    /// When enabled, destructive operations like `rm` require nonce confirmation.
    /// Can also be enabled at runtime with `set -o latch` or via `KAISH_LATCH=1`.
    pub latch_enabled: bool,

    /// Enable trash-on-delete for rm (set -o trash).
    ///
    /// When enabled, small files are moved to freedesktop.org Trash instead of
    /// being permanently deleted. Can also be enabled at runtime with `set -o trash`
    /// or via `KAISH_TRASH=1`.
    pub trash_enabled: bool,

    /// Shared nonce store for cross-request confirmation latch.
    ///
    /// When `Some`, the kernel uses this store instead of creating a fresh one.
    /// This allows nonces issued in one MCP `execute()` call to be validated
    /// in a subsequent call. When `None` (default), a fresh store is created.
    pub nonce_store: Option<crate::nonce::NonceStore>,

    /// Variables to populate the root scope with at construction, all marked
    /// for export to child processes.
    ///
    /// The kernel itself is hermetic — it never reads `std::env::vars()` —
    /// so frontends that want OS-env passthrough (REPL, MCP) populate this
    /// from `std::env::vars()`. Embedders that want isolation pass nothing
    /// (or only the keys they curate).
    pub initial_vars: HashMap<String, Value>,

    /// Default per-request timeout. When `Some`, every `execute_with_options`
    /// call without an explicit `ExecuteOptions::timeout` uses this duration.
    /// When elapsed, the kernel cancels the request, kills any external
    /// children with the configured grace, and returns exit code 124.
    ///
    /// `None` means no default timeout — only explicit per-call timeouts apply.
    pub request_timeout: Option<Duration>,

    /// Grace period between SIGTERM and SIGKILL when killing an external
    /// child on cancellation or timeout.
    ///
    /// Defaults to 2 seconds. Set to `Duration::ZERO` to escalate immediately
    /// to SIGKILL. Long-shutdown processes (databases, etc.) may need more.
    pub kill_grace: Duration,

    /// Cap on memory-resident bytes across all kernel-owned `MemoryFs` mounts.
    ///
    /// One shared `ByteBudget` (labeled `"vfs-memory"`) is created at kernel
    /// construction and handed to every `MemoryFs` the kernel builds in
    /// `setup_vfs` (Passthrough `/v`; Sandboxed `/` and `/v`; NoLocal `/`,
    /// `/tmp`, `/v`). Writes that would exceed the cap fail loudly with
    /// `StorageFull` — an in-band error a model reads and adapts to; fail
    /// loud over quietly eating RAM.
    ///
    /// **Why MCP is bounded by default:** each `execute()` call creates a fresh
    /// kernel (see `server/execute.rs`), so the 64 MiB cap is per-call, not
    /// per-session. Embedders that know their workload needs more opt out with
    /// `without_vfs_budget()` or raise the cap with `with_vfs_budget(bytes)` —
    /// protection on by default, opt out knowingly. All other profiles default
    /// to `None` (unbounded).
    ///
    /// Follows the same pattern as `OutputLimitConfig`: MCP bounded, rest unbounded.
    pub vfs_budget_bytes: Option<u64>,

    /// Enable copy-on-write overlay mode (opt-in).
    ///
    /// When `true`, the primary local filesystem mount is wrapped in an
    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
    /// Use `kaish-vfs status/diff/commit/reset` to inspect and manage the
    /// overlay transaction.
    ///
    /// **Passthrough:** `/` becomes `OverlayFs over LocalFs::read_only("/")`.
    /// **Sandboxed{root}:** the `{root}` mount becomes
    /// `OverlayFs over LocalFs::read_only(root)`; the `/tmp` and XDG runtime
    /// mounts stay as real `LocalFs` (real writes escape the transaction —
    /// see `docs/kaish-overlayfs.md` for the escape-hatch inventory).
    /// **NoLocal:** incompatible — construction fails loudly (everything is
    /// already virtual; an overlay adds no value and no lower layer to wrap).
    /// **with_backend:** incompatible — the embedder controls the VFS; the
    /// kernel cannot wrap it without bypassing the embedder's semantics.
    ///
    /// **Not default-on for MCP:** each `execute()` call gets a fresh kernel,
    /// making the overlay a per-call transaction — `kaish-vfs commit` must run
    /// in the same call as the writes, or the transaction is discarded on drop.
    /// Frontends (REPL, MCP) expose `--overlay` as an explicit opt-in flag.
    pub overlay: bool,
}
265
266/// Get the default sandbox root ($HOME).
267#[cfg(feature = "localfs")]
fn default_sandbox_root() -> PathBuf {
    // `var_os` instead of `var`: a home directory whose path is not valid
    // UTF-8 is still a perfectly good sandbox root, whereas `var` would
    // reject it and silently widen the fallback to "/".
    //
    // An empty `$HOME` is treated as unset; an empty root path would not name
    // any real directory, so fall back to "/" just like the unset case.
    std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("/"))
}
273
274impl Default for KernelConfig {
275    fn default() -> Self {
276        #[cfg(feature = "localfs")]
277        {
278            let home = default_sandbox_root();
279            Self {
280                name: "default".to_string(),
281                vfs_mode: VfsMountMode::Sandboxed { root: None },
282                cwd: home,
283                skip_validation: false,
284                interactive: false,
285                ignore_config: crate::ignore_config::IgnoreConfig::none(),
286                output_limit: crate::output_limit::OutputLimitConfig::none(),
287                allow_external_commands: cfg!(feature = "subprocess"),
288                latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
289                trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
290                nonce_store: None,
291                initial_vars: HashMap::new(),
292                request_timeout: None,
293                kill_grace: Duration::from_secs(2),
294                vfs_budget_bytes: None,
295                overlay: false,
296            }
297        }
298        #[cfg(not(feature = "localfs"))]
299        {
300            Self {
301                name: "default".to_string(),
302                vfs_mode: VfsMountMode::NoLocal,
303                cwd: PathBuf::from("/"),
304                skip_validation: false,
305                interactive: false,
306                ignore_config: crate::ignore_config::IgnoreConfig::none(),
307                output_limit: crate::output_limit::OutputLimitConfig::none(),
308                allow_external_commands: false,
309                latch_enabled: false,
310                trash_enabled: false,
311                nonce_store: None,
312                initial_vars: HashMap::new(),
313                request_timeout: None,
314                kill_grace: Duration::from_secs(2),
315                vfs_budget_bytes: None,
316                overlay: false,
317            }
318        }
319    }
320}
321
322impl KernelConfig {
323    /// Create a transient kernel config (sandboxed, for temporary use).
324    #[cfg(feature = "localfs")]
325    pub fn transient() -> Self {
326        let home = default_sandbox_root();
327        Self {
328            name: "transient".to_string(),
329            vfs_mode: VfsMountMode::Sandboxed { root: None },
330            cwd: home,
331            skip_validation: false,
332            interactive: false,
333            ignore_config: crate::ignore_config::IgnoreConfig::none(),
334            output_limit: crate::output_limit::OutputLimitConfig::none(),
335            allow_external_commands: cfg!(feature = "subprocess"),
336            latch_enabled: false,
337            trash_enabled: false,
338            nonce_store: None,
339            initial_vars: HashMap::new(),
340            request_timeout: None,
341            kill_grace: Duration::from_secs(2),
342            vfs_budget_bytes: None,
343            overlay: false,
344        }
345    }
346
347    /// Create a transient kernel config (isolated, no-default-features).
348    #[cfg(not(feature = "localfs"))]
349    pub fn transient() -> Self {
350        Self::isolated()
351    }
352
353    /// Create a kernel config with the given name (sandboxed by default).
354    #[cfg(feature = "localfs")]
355    pub fn named(name: &str) -> Self {
356        let home = default_sandbox_root();
357        Self {
358            name: name.to_string(),
359            vfs_mode: VfsMountMode::Sandboxed { root: None },
360            cwd: home,
361            skip_validation: false,
362            interactive: false,
363            ignore_config: crate::ignore_config::IgnoreConfig::none(),
364            output_limit: crate::output_limit::OutputLimitConfig::none(),
365            allow_external_commands: cfg!(feature = "subprocess"),
366            latch_enabled: false,
367            trash_enabled: false,
368            nonce_store: None,
369            initial_vars: HashMap::new(),
370            request_timeout: None,
371            kill_grace: Duration::from_secs(2),
372            vfs_budget_bytes: None,
373            overlay: false,
374        }
375    }
376
377    /// Create a kernel config with the given name (isolated, no-default-features).
378    #[cfg(not(feature = "localfs"))]
379    pub fn named(name: &str) -> Self {
380        Self {
381            name: name.to_string(),
382            ..Self::isolated()
383        }
384    }
385
386    /// Create a REPL config with passthrough filesystem access.
387    ///
388    /// Native paths like `/home/user/project` work directly.
389    /// The cwd is set to the actual current working directory.
390    #[cfg(feature = "localfs")]
391    pub fn repl() -> Self {
392        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
393        Self {
394            name: "repl".to_string(),
395            vfs_mode: VfsMountMode::Passthrough,
396            cwd,
397            skip_validation: false,
398            interactive: false,
399            ignore_config: crate::ignore_config::IgnoreConfig::none(),
400            output_limit: crate::output_limit::OutputLimitConfig::none(),
401            allow_external_commands: cfg!(feature = "subprocess"),
402            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
403            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
404            nonce_store: None,
405            initial_vars: HashMap::new(),
406            request_timeout: None,
407            kill_grace: Duration::from_secs(2),
408            vfs_budget_bytes: None,
409            overlay: false,
410        }
411    }
412
413    /// Create an MCP server config with sandboxed filesystem access.
414    ///
415    /// Local filesystem is accessible at its real path (e.g., `/home/user`),
416    /// but sandboxed to `$HOME`. Paths outside the sandbox are not accessible
417    /// through builtins. External commands still access the real filesystem —
418    /// use `.with_allow_external_commands(false)` to block them.
419    ///
420    /// VFS memory is bounded at 64 MiB per `execute()` call by default
421    /// (MCP creates a fresh kernel per call). Raise or remove with
422    /// `with_vfs_budget` / `without_vfs_budget`.
423    #[cfg(feature = "localfs")]
424    pub fn mcp() -> Self {
425        let home = default_sandbox_root();
426        Self {
427            name: "mcp".to_string(),
428            vfs_mode: VfsMountMode::Sandboxed { root: None },
429            cwd: home,
430            skip_validation: false,
431            interactive: false,
432            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
433            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
434            allow_external_commands: cfg!(feature = "subprocess"),
435            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
436            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
437            nonce_store: None,
438            initial_vars: HashMap::new(),
439            request_timeout: None,
440            kill_grace: Duration::from_secs(2),
441            vfs_budget_bytes: Some(64 * 1024 * 1024),
442            overlay: false,
443        }
444    }
445
446    /// Create an MCP server config with a custom sandbox root.
447    ///
448    /// Use this to restrict access to a subdirectory like `~/src`.
449    ///
450    /// VFS memory is bounded at 64 MiB per `execute()` call by default.
451    /// Raise or remove with `with_vfs_budget` / `without_vfs_budget`.
452    #[cfg(feature = "localfs")]
453    pub fn mcp_with_root(root: PathBuf) -> Self {
454        Self {
455            name: "mcp".to_string(),
456            vfs_mode: VfsMountMode::Sandboxed { root: Some(root.clone()) },
457            cwd: root,
458            skip_validation: false,
459            interactive: false,
460            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
461            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
462            allow_external_commands: cfg!(feature = "subprocess"),
463            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
464            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
465            nonce_store: None,
466            initial_vars: HashMap::new(),
467            request_timeout: None,
468            kill_grace: Duration::from_secs(2),
469            vfs_budget_bytes: Some(64 * 1024 * 1024),
470            overlay: false,
471        }
472    }
473
474    /// Create a config with no local filesystem (memory only).
475    ///
476    /// Complete isolation: no local filesystem and external commands are disabled.
477    /// Useful for tests or pure sandboxed execution.
478    pub fn isolated() -> Self {
479        Self {
480            name: "isolated".to_string(),
481            vfs_mode: VfsMountMode::NoLocal,
482            cwd: PathBuf::from("/"),
483            skip_validation: false,
484            interactive: false,
485            ignore_config: crate::ignore_config::IgnoreConfig::none(),
486            output_limit: crate::output_limit::OutputLimitConfig::none(),
487            allow_external_commands: false,
488            latch_enabled: false,
489            trash_enabled: false,
490            nonce_store: None,
491            initial_vars: HashMap::new(),
492            request_timeout: None,
493            kill_grace: Duration::from_secs(2),
494            vfs_budget_bytes: None,
495            overlay: false,
496        }
497    }
498
499    /// Set the VFS mount mode.
500    pub fn with_vfs_mode(mut self, mode: VfsMountMode) -> Self {
501        self.vfs_mode = mode;
502        self
503    }
504
505    /// Set the initial working directory.
506    pub fn with_cwd(mut self, cwd: PathBuf) -> Self {
507        self.cwd = cwd;
508        self
509    }
510
511    /// Skip pre-execution validation.
512    pub fn with_skip_validation(mut self, skip: bool) -> Self {
513        self.skip_validation = skip;
514        self
515    }
516
517    /// Enable interactive mode (external commands inherit stdio).
518    pub fn with_interactive(mut self, interactive: bool) -> Self {
519        self.interactive = interactive;
520        self
521    }
522
523    /// Set the ignore file configuration.
524    pub fn with_ignore_config(mut self, config: crate::ignore_config::IgnoreConfig) -> Self {
525        self.ignore_config = config;
526        self
527    }
528
529    /// Set the output limit configuration.
530    pub fn with_output_limit(mut self, config: crate::output_limit::OutputLimitConfig) -> Self {
531        self.output_limit = config;
532        self
533    }
534
535    /// Set whether external command execution is allowed.
536    ///
537    /// When `false`, commands not found as builtins produce "command not found"
538    /// instead of searching PATH. The `exec` and `spawn` builtins also return
539    /// errors. Use this to prevent VFS sandbox bypass via external binaries.
540    pub fn with_allow_external_commands(mut self, allow: bool) -> Self {
541        self.allow_external_commands = allow;
542        self
543    }
544
545    /// Enable or disable confirmation latch at startup.
546    pub fn with_latch(mut self, enabled: bool) -> Self {
547        self.latch_enabled = enabled;
548        self
549    }
550
551    /// Enable or disable trash-on-delete at startup.
552    pub fn with_trash(mut self, enabled: bool) -> Self {
553        self.trash_enabled = enabled;
554        self
555    }
556
557    /// Use a shared nonce store for cross-request confirmation latch.
558    ///
559    /// Pass a `NonceStore` that outlives individual kernel instances so nonces
560    /// issued in one MCP `execute()` call can be validated in subsequent calls.
561    pub fn with_nonce_store(mut self, store: crate::nonce::NonceStore) -> Self {
562        self.nonce_store = Some(store);
563        self
564    }
565
566    /// Add a single initial variable; marked exported when the kernel boots.
567    ///
568    /// Repeated calls add (last write wins on key collision).
569    pub fn with_var(mut self, name: impl Into<String>, value: Value) -> Self {
570        self.initial_vars.insert(name.into(), value);
571        self
572    }
573
574    /// Replace the entire initial-vars map. All entries are marked exported.
575    pub fn with_initial_vars(mut self, vars: HashMap<String, Value>) -> Self {
576        self.initial_vars = vars;
577        self
578    }
579
580    /// Extend the initial-vars map with the given entries (last write wins).
581    pub fn with_vars(mut self, vars: HashMap<String, Value>) -> Self {
582        self.initial_vars.extend(vars);
583        self
584    }
585
586    /// Set the default per-request timeout (kernel-wide).
587    ///
588    /// Each `execute_with_options` call without an explicit timeout uses
589    /// this. On elapsed, the kernel cancels and returns exit code 124.
590    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
591        self.request_timeout = Some(timeout);
592        self
593    }
594
595    /// Set the SIGTERM-to-SIGKILL grace period for child kills.
596    pub fn with_kill_grace(mut self, grace: Duration) -> Self {
597        self.kill_grace = grace;
598        self
599    }
600
601    /// Cap VFS memory-resident bytes at `bytes` across all kernel-owned
602    /// `MemoryFs` mounts. A shared `ByteBudget` labeled `"vfs-memory"` is
603    /// created at kernel construction and passed to every `MemoryFs` the
604    /// kernel builds (see `setup_vfs` and `with_backend`).
605    ///
606    /// Writes that would exceed the cap fail loudly with `StorageFull` — an
607    /// in-band error a model reads and adapts to; fail loud over quietly eating
608    /// RAM. Use `without_vfs_budget` to remove the cap entirely.
609    pub fn with_vfs_budget(mut self, bytes: u64) -> Self {
610        self.vfs_budget_bytes = Some(bytes);
611        self
612    }
613
614    /// Remove the VFS memory budget — all `MemoryFs` mounts are unbounded.
615    ///
616    /// Use when the caller knows the workload and the default 64 MiB cap
617    /// (set by `KernelConfig::mcp`) is too conservative.
618    pub fn without_vfs_budget(mut self) -> Self {
619        self.vfs_budget_bytes = None;
620        self
621    }
622
623    /// Enable or disable copy-on-write overlay mode.
624    ///
625    /// When `true`, the primary local filesystem mount is wrapped in an
626    /// `OverlayFs` so writes are virtual — the lower layer is never touched.
627    /// Incompatible with `VfsMountMode::NoLocal` (fails loudly at construction)
628    /// and `with_backend` kernels (same — the embedder controls the VFS).
629    pub fn with_overlay(mut self, overlay: bool) -> Self {
630        self.overlay = overlay;
631        self
632    }
633}
634
/// Handle to an active overlay session, kept on the kernel and shared to
/// `ExecContext` so the `kaish-vfs` builtin can reach the `OverlayFs`.
///
/// The `mount_path` is the VFS prefix the overlay was mounted under (e.g.
/// `/home/user`); `commit_root` is the real filesystem path the overlay's
/// lower is backed by (used as the target for `kaish-vfs commit`).
///
/// Only compiled when both the `localfs` and `overlay` features are enabled.
/// Cloning the handle clones the `Arc`, not the overlay itself.
#[cfg(all(feature = "localfs", feature = "overlay"))]
#[derive(Clone)]
pub struct OverlayHandle {
    /// The mounted `OverlayFs`, Arc-shared so the builtin can call inspection
    /// methods without holding a VfsRouter lock.
    pub fs: Arc<OverlayFs>,
    /// VFS path this overlay is mounted at (e.g. `/home/user`).
    pub mount_path: PathBuf,
    /// Real filesystem root to commit into. Same as the lower's root.
    pub commit_root: PathBuf,
}
652
/// The Kernel (核) — executes kaish code.
///
/// This is the primary interface for running kaish commands. It owns all
/// the runtime state: variables, tools, VFS, jobs, and persistence.
///
/// Mutable per-kernel state (`scope`, `user_tools`, `exec_ctx`) sits behind
/// async `RwLock`s, while the tool registry, VFS router and job manager are
/// held in `Arc`s. Settings copied from [`KernelConfig`] (`skip_validation`,
/// `interactive`, `allow_external_commands`, `request_timeout`,
/// `kill_grace`) are plain fields.
// NOTE(review): no field here is obviously a persistence store; the
// "persistence" mentioned above presumably lives behind the VFS — confirm.
pub struct Kernel {
    /// Kernel name.
    name: String,
    /// Variable scope.
    scope: RwLock<Scope>,
    /// Tool registry.
    tools: Arc<ToolRegistry>,
    /// User-defined tools (from `tool name { body }` statements).
    user_tools: RwLock<HashMap<String, ToolDef>>,
    /// Virtual filesystem router.
    vfs: Arc<VfsRouter>,
    /// Background job manager.
    jobs: Arc<JobManager>,
    /// Pipeline runner.
    runner: PipelineRunner,
    /// Execution context (cwd, stdin, etc.).
    exec_ctx: RwLock<ExecContext>,
    /// Whether to skip pre-execution validation.
    skip_validation: bool,
    /// When true, standalone external commands inherit stdio for real-time output.
    interactive: bool,
    /// Whether external command execution is allowed.
    allow_external_commands: bool,
    /// Shared memory budget for all kernel-owned `MemoryFs` mounts.
    ///
    /// `None` when `KernelConfig::vfs_budget_bytes` was `None` (unbounded).
    /// `Some` is Arc-cloned into forks so all concurrent execution draws from
    /// the same pool — a background job's writes reduce the same cap as
    /// foreground writes, which is the correct behaviour.
    vfs_budget: Option<Arc<kaish_vfs::ByteBudget>>,
    /// Active overlay session handle, if this kernel was constructed with
    /// `overlay: true`. Arc-shared so `ExecContext` (and thus the
    /// `kaish-vfs` builtin) can inspect and mutate the overlay without
    /// holding a kernel write lock. Propagated to forks via `fork_inner`
    /// and `child_for_pipeline` so `kaish-vfs` works inside background
    /// jobs, scatter workers, and pipeline stages.
    #[cfg(all(feature = "localfs", feature = "overlay"))]
    overlay_handle: Option<Arc<OverlayHandle>>,
    /// Default per-request timeout (None = no default).
    request_timeout: Option<Duration>,
    /// SIGTERM-to-SIGKILL grace period for child kills.
    kill_grace: Duration,
    /// Receiver for the kernel stderr stream.
    ///
    /// Pipeline stages write to the corresponding `StderrStream` (set on ExecContext).
    /// The kernel drains this after each statement in `execute_streaming`.
    stderr_receiver: tokio::sync::Mutex<StderrReceiver>,
    /// Cancellation token for interrupting execution (Ctrl-C).
    ///
    /// Protected by `std::sync::Mutex` (not tokio) because the SIGINT handler
    /// needs sync access. Each `execute()` call gets a fresh child token;
    /// `cancel()` cancels the current token and replaces it.
    cancel_token: std::sync::Mutex<tokio_util::sync::CancellationToken>,
    /// Terminal state for job control (interactive mode only, Unix only).
    ///
    /// The field only exists on Unix builds with the `subprocess` feature.
    #[cfg(all(unix, feature = "subprocess"))]
    terminal_state: Option<Arc<crate::terminal::TerminalState>>,
    /// Weak self-reference for handing out `Arc<dyn CommandDispatcher>`.
    ///
    /// Set by `into_arc()`. Allows builtins to re-dispatch inner commands
    /// through the full Kernel resolution chain.
    self_weak: std::sync::OnceLock<std::sync::Weak<Self>>,
    /// Background job this kernel (a fork) is executing on behalf of, if any.
    /// Set on the fork created by `execute_background` and inherited by all its
    /// sub-forks (pipeline stages, scatter workers), so an external command
    /// spawned anywhere under a background job can record its process group on
    /// that job for `kill -<sig> %N`. `None` for foreground execution.
    bg_job_id: Option<crate::scheduler::JobId>,
    /// Serializes concurrent `execute()` / `execute_streaming()` callers on
    /// this Kernel instance. Tokio's Mutex is fair (FIFO) and acts as the
    /// queue. Background jobs, scatter workers, and concurrent pipeline
    /// stages do NOT take this lock — they run against a *forked* Kernel
    /// (see [`Kernel::fork`]) so they never contend with the foreground.
    execute_lock: tokio::sync::Mutex<()>,
}
731
732/// Internal result of [`Kernel::setup_vfs`].
733struct VfsSetupResult {
734    vfs: VfsRouter,
735    budget: Option<Arc<ByteBudget>>,
736    #[cfg(all(feature = "localfs", feature = "overlay"))]
737    overlay_handle: Option<Arc<OverlayHandle>>,
738}
739
740impl Kernel {
741    /// Create a new kernel with the given configuration.
742    pub fn new(config: KernelConfig) -> Result<Self> {
743        let mut setup = Self::setup_vfs(&config)?;
744        let jobs = Arc::new(JobManager::new());
745
746        // Mount JobFs for job observability at /v/jobs
747        setup.vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
748
749        #[cfg(all(feature = "localfs", feature = "overlay"))]
750        let overlay_handle = setup.overlay_handle.take();
751
752        // Mode-based construction: the kernel owns its host mounts, so whether
753        // host side channels are allowed is decided by the VFS mode inside
754        // `assemble` (NoLocal forbids them).
755        let kernel = Self::assemble(config, setup.vfs, jobs, false, setup.budget, |_| {}, |vfs_ref, tools| {
756            ExecContext::with_vfs_and_tools(vfs_ref.clone(), tools.clone())
757        })?;
758
759        #[cfg(all(feature = "localfs", feature = "overlay"))]
760        {
761            let mut kernel = kernel;
762            kernel.overlay_handle = overlay_handle;
763            // Also set it on the ExecContext so builtins can access it.
764            if let Some(ref handle) = kernel.overlay_handle {
765                kernel.exec_ctx.get_mut().overlay_handle = Some(Arc::clone(handle));
766            }
767            return Ok(kernel);
768        }
769
770        #[allow(unreachable_code)]
771        Ok(kernel)
772    }
773
774    /// Set up VFS based on mount mode.
775    ///
776    /// Returns the router, the budget handle (if bounded), and an optional
777    /// overlay handle when `config.overlay` is true. The budget is Arc-shared:
778    /// every `MemoryFs` the kernel creates here holds a clone of the same
779    /// `Arc<ByteBudget>`, so the total charged against it is the sum of all
780    /// in-memory content across all kernel-owned memory mounts.
781    ///
782    /// # Errors
783    /// Returns `Err` if `config.overlay` is true and the mode is `NoLocal`
784    /// (overlay is meaningless when everything is already virtual — there is
785    /// no real lower layer to wrap). The caller (`Kernel::new`) propagates
786    /// this as an `anyhow::Error`.
787    fn setup_vfs(config: &KernelConfig) -> Result<VfsSetupResult> {
788        let mut vfs = VfsRouter::new();
789
790        // One budget for all memory mounts this kernel owns — labeled so the
791        // error message tells the user exactly which knob to raise.
792        let budget: Option<Arc<ByteBudget>> = config
793            .vfs_budget_bytes
794            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));
795
796        /// Helper: construct a `MemoryFs` wired to `budget` if present.
797        fn mem(budget: &Option<Arc<ByteBudget>>) -> MemoryFs {
798            match budget {
799                Some(b) => MemoryFs::with_budget(Arc::clone(b)),
800                None => MemoryFs::new(),
801            }
802        }
803
804        // Overlay handle — populated below if config.overlay is true.
805        #[cfg(all(feature = "localfs", feature = "overlay"))]
806        let mut overlay_handle: Option<Arc<OverlayHandle>> = None;
807
808        match &config.vfs_mode {
809            #[cfg(feature = "localfs")]
810            VfsMountMode::Passthrough => {
811                #[cfg(feature = "overlay")]
812                if config.overlay {
813                    // Wrap "/" in an OverlayFs so writes are virtual.
814                    let lower = Arc::new(LocalFs::read_only(PathBuf::from("/")));
815                    let overlay_fs = Arc::new(match &budget {
816                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
817                        None => OverlayFs::over(lower),
818                    });
819                    let handle = Arc::new(OverlayHandle {
820                        fs: Arc::clone(&overlay_fs),
821                        mount_path: PathBuf::from("/"),
822                        commit_root: PathBuf::from("/"),
823                    });
824                    vfs.mount_arc("/", overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
825                    overlay_handle = Some(handle);
826                } else {
827                    // LocalFs at "/" — native paths work directly
828                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
829                }
830                #[cfg(not(feature = "overlay"))]
831                {
832                    if config.overlay {
833                        return Err(anyhow::anyhow!(
834                            "overlay=true requires the `overlay` feature, but this build \
835                             was compiled without it. Recompile with --features overlay \
836                             (or the default feature set) to enable overlay mode."
837                        ));
838                    }
839                    // LocalFs at "/" — native paths work directly
840                    vfs.mount("/", LocalFs::new(PathBuf::from("/")));
841                }
842                // Memory for blobs
843                vfs.mount("/v", mem(&budget));
844            }
845            #[cfg(feature = "localfs")]
846            VfsMountMode::Sandboxed { root } => {
847                // Memory at root for safety (catches paths outside sandbox).
848                // Note: /tmp and the XDG runtime dir are LocalFs — writes
849                // there escape the VFS budget and are NOT virtual. This is
850                // intentional: /tmp interop with other processes matters more
851                // than accounting for scratch files there.
852                vfs.mount("/", mem(&budget));
853                vfs.mount("/v", mem(&budget));
854
855                // Real /tmp for interop with other processes
856                vfs.mount("/tmp", LocalFs::new(PathBuf::from("/tmp")));
857
858                // Mount XDG runtime dir for spill files and socket access
859                let runtime = crate::paths::xdg_runtime_dir();
860                if runtime.exists() {
861                    let runtime_str = runtime.to_string_lossy().to_string();
862                    vfs.mount(&runtime_str, LocalFs::new(runtime));
863                }
864
865                // Resolve the sandbox root (defaults to $HOME)
866                let local_root = root.clone().unwrap_or_else(|| {
867                    std::env::var("HOME")
868                        .map(PathBuf::from)
869                        .unwrap_or_else(|_| PathBuf::from("/"))
870                });
871
872                let mount_point = local_root.to_string_lossy().to_string();
873
874                #[cfg(feature = "overlay")]
875                if config.overlay {
876                    // Wrap the sandbox root in an OverlayFs.
877                    let lower = Arc::new(LocalFs::read_only(local_root.clone()));
878                    let overlay_fs = Arc::new(match &budget {
879                        Some(b) => OverlayFs::over_with_budget(lower, Arc::clone(b)),
880                        None => OverlayFs::over(lower),
881                    });
882                    let handle = Arc::new(OverlayHandle {
883                        fs: Arc::clone(&overlay_fs),
884                        mount_path: PathBuf::from(&mount_point),
885                        commit_root: local_root,
886                    });
887                    vfs.mount_arc(&mount_point, overlay_fs as Arc<dyn kaish_vfs::Filesystem>);
888                    overlay_handle = Some(handle);
889                } else {
890                    // Mount at the real path for transparent access
891                    // e.g., /home/atobey → LocalFs("/home/atobey")
892                    // so /home/atobey/src/kaish just works
893                    vfs.mount(&mount_point, LocalFs::new(local_root));
894                }
895                #[cfg(not(feature = "overlay"))]
896                {
897                    if config.overlay {
898                        return Err(anyhow::anyhow!(
899                            "overlay=true requires the `overlay` feature, but this build \
900                             was compiled without it. Recompile with --features overlay \
901                             (or the default feature set) to enable overlay mode."
902                        ));
903                    }
904                    // Mount at the real path for transparent access
905                    vfs.mount(&mount_point, LocalFs::new(local_root));
906                }
907            }
908            VfsMountMode::NoLocal => {
909                if config.overlay {
910                    return Err(anyhow::anyhow!(
911                        "overlay=true is incompatible with VfsMountMode::NoLocal: \
912                         everything is already virtual, there is no real lower layer \
913                         to wrap. Use with_overlay(false) or switch to a Passthrough \
914                         or Sandboxed VFS mode."
915                    ));
916                }
917                // Pure memory mode — no local filesystem
918                vfs.mount("/", mem(&budget));
919                vfs.mount("/tmp", mem(&budget));
920                vfs.mount("/v", mem(&budget));
921            }
922        }
923
924        Ok(VfsSetupResult {
925            vfs,
926            budget,
927            #[cfg(all(feature = "localfs", feature = "overlay"))]
928            overlay_handle,
929        })
930    }
931
932    /// Create a transient kernel (no persistence).
933    pub fn transient() -> Result<Self> {
934        Self::new(KernelConfig::transient())
935    }
936
937    /// Create a kernel with a custom backend and `/v/*` virtual path support.
938    ///
939    /// This is the constructor for embedding kaish in other systems that provide
940    /// their own storage backend (e.g., CRDT-backed storage in kaijutsu).
941    ///
942    /// A `VirtualOverlayBackend` routes paths automatically:
943    /// - `/v/*` → Internal VFS (JobFs at `/v/jobs`, MemoryFs at `/v/blobs`)
944    /// - Everything else → Your custom backend
945    ///
946    /// The optional `configure_vfs` closure lets you add additional virtual mounts
947    /// (e.g., `/v/docs` for CRDT blocks) after the built-in mounts are set up.
948    ///
949    /// **Note:** The config's `vfs_mode` is ignored — all non-`/v/*` path routing
950    /// is handled by your custom backend. The config is only used for `name`, `cwd`,
951    /// `skip_validation`, and `interactive`.
952    ///
953    /// # Example
954    ///
955    /// ```ignore
956    /// // Simple: default /v/* mounts only
957    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |_| {})?;
958    ///
959    /// // With custom mounts
960    /// let kernel = Kernel::with_backend(backend, config, |vfs| {
961    ///     vfs.mount_arc("/v/docs", docs_fs);
962    ///     vfs.mount_arc("/v/g", git_fs);
963    /// }, |_| {})?;
964    ///
965    /// // With custom tools
966    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |tools| {
967    ///     tools.register(MyCustomTool::new());
968    /// })?;
969    /// ```
970    pub fn with_backend(
971        backend: Arc<dyn KernelBackend>,
972        config: KernelConfig,
973        configure_vfs: impl FnOnce(&mut VfsRouter),
974        configure_tools: impl FnOnce(&mut ToolRegistry),
975    ) -> Result<Self> {
976        use crate::backend::VirtualOverlayBackend;
977
978        // overlay=true is incompatible with with_backend: the embedder controls
979        // the VFS and the kernel cannot wrap it without bypassing the embedder's
980        // semantics. Fail loudly rather than silently ignoring the flag.
981        if config.overlay {
982            return Err(anyhow::anyhow!(
983                "overlay=true is incompatible with Kernel::with_backend: the embedder \
984                 controls the VFS; the kernel cannot wrap it with an OverlayFs without \
985                 bypassing the embedder's storage semantics. Use KernelConfig::with_overlay(false)."
986            ));
987        }
988
989        let mut vfs = VfsRouter::new();
990        let jobs = Arc::new(JobManager::new());
991
992        // Create the budget from config so `with_vfs_budget` / `without_vfs_budget`
993        // work for `with_backend` callers too. The /v/blobs MemoryFs is the only
994        // kernel-owned memory mount here — embedders own the rest of the VFS.
995        let vfs_budget: Option<Arc<ByteBudget>> = config
996            .vfs_budget_bytes
997            .map(|bytes| Arc::new(ByteBudget::labeled(bytes, "vfs-memory")));
998
999        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
1000        let blobs_fs = match &vfs_budget {
1001            Some(b) => MemoryFs::with_budget(Arc::clone(b)),
1002            None => MemoryFs::new(),
1003        };
1004        vfs.mount("/v/blobs", blobs_fs);
1005
1006        // Let caller add custom mounts (e.g., /v/docs, /v/g)
1007        configure_vfs(&mut vfs);
1008
1009        // A custom-backend kernel owns no host mounts — the embedder supplies
1010        // the entire VFS — so any kernel write to a host filesystem via
1011        // `std::fs` (output spill, job output files) bypasses that VFS and its
1012        // read-only guarantees. Forbid host side channels unconditionally.
1013        Self::assemble(config, vfs, jobs, true, vfs_budget, configure_tools, |vfs_arc: &Arc<VfsRouter>, _: &Arc<ToolRegistry>| {
1014            let overlay: Arc<dyn KernelBackend> =
1015                Arc::new(VirtualOverlayBackend::new(backend, vfs_arc.clone()));
1016            ExecContext::with_backend(overlay)
1017        })
1018    }
1019
1020    /// Shared assembly: wires up tools, runner, scope, and ExecContext.
1021    ///
1022    /// The `make_ctx` closure receives the VFS and tools so backends that need
1023    /// them (like `LocalBackend::with_tools`) can capture them. Custom backends
1024    /// that already have their own storage can ignore these parameters.
1025    fn assemble(
1026        config: KernelConfig,
1027        mut vfs: VfsRouter,
1028        jobs: Arc<JobManager>,
1029        no_host_filesystem: bool,
1030        vfs_budget: Option<Arc<ByteBudget>>,
1031        configure_tools: impl FnOnce(&mut ToolRegistry),
1032        make_ctx: impl FnOnce(&Arc<VfsRouter>, &Arc<ToolRegistry>) -> ExecContext,
1033    ) -> Result<Self> {
1034        // A kernel with no host filesystem of its own must never write to one
1035        // through a side channel. Two paths bypass the VFS by going straight to
1036        // `std::fs`: output spill (`paths::spill_dir()` → host temp/cache) and
1037        // background-job output files (`Job::write_output_file` → host temp).
1038        // Both would punch through the isolation, so force them off:
1039        // in-memory truncation for spill, no host file for job output.
1040        //
1041        // This is true for a `NoLocal` kernel (mounts nothing) and for any
1042        // `with_backend` kernel (`no_host_filesystem` — the embedder owns the
1043        // VFS, so the kernel controls no host mounts and any host write is a
1044        // bypass). Overrides an explicit `SpillMode::Disk`, which is nonsensical
1045        // when there is no kernel-owned host filesystem to spill to.
1046        let no_host_side_channel =
1047            no_host_filesystem || matches!(config.vfs_mode, VfsMountMode::NoLocal);
1048
1049        let KernelConfig { name, cwd, skip_validation, interactive, ignore_config, mut output_limit, allow_external_commands, latch_enabled, trash_enabled, nonce_store, initial_vars, request_timeout, kill_grace, .. } = config;
1050
1051        if no_host_side_channel {
1052            output_limit.set_spill_mode(crate::output_limit::SpillMode::Memory);
1053            jobs.set_persist_output_files(false);
1054        }
1055
1056        let mut tools = ToolRegistry::new();
1057        register_builtins(&mut tools);
1058        configure_tools(&mut tools);
1059        let tools = Arc::new(tools);
1060
1061        // Mount BuiltinFs so `ls /v/bin` lists builtins
1062        vfs.mount("/v/bin", BuiltinFs::new(tools.clone()));
1063
1064        let vfs = Arc::new(vfs);
1065
1066        let runner = PipelineRunner::new(tools.clone());
1067
1068        let (stderr_writer, stderr_receiver) = stderr_stream();
1069
1070        let mut exec_ctx = make_ctx(&vfs, &tools);
1071        exec_ctx.set_cwd(cwd);
1072        exec_ctx.set_job_manager(jobs.clone());
1073        exec_ctx.set_tool_schemas(tools.schemas());
1074        exec_ctx.set_tools(tools.clone());
1075        #[cfg(feature = "os-integration")]
1076        exec_ctx.set_trash_backend(Arc::new(crate::trash_system::SystemTrash));
1077        exec_ctx.stderr = Some(stderr_writer);
1078        exec_ctx.ignore_config = ignore_config;
1079        exec_ctx.output_limit = output_limit;
1080        exec_ctx.allow_external_commands = allow_external_commands;
1081        exec_ctx.vfs_budget = vfs_budget.clone();
1082        if let Some(store) = nonce_store {
1083            exec_ctx.nonce_store = store;
1084        }
1085
1086        Ok(Self {
1087            name,
1088            scope: RwLock::new({
1089                let mut scope = Scope::new();
1090                scope.set_pid(KERNEL_COUNTER.fetch_add(1, Ordering::Relaxed));
1091                // HOME is NOT read from the host env here — the kernel is
1092                // hermetic. Frontends (REPL, MCP) seed it via `initial_vars`
1093                // below (from `std::env::vars()`); a hermetic embedder leaves
1094                // `initial_vars` empty and gets no HOME (tilde stays literal).
1095                // Apply caller-supplied initial variables, all marked exported.
1096                // Frontends (REPL, MCP) populate this from std::env::vars()
1097                // for shell-like UX; embedders that want hermetic behavior
1098                // simply leave it empty.
1099                for (name, value) in initial_vars {
1100                    scope.set_exported(name, value);
1101                }
1102                scope.set_latch_enabled(latch_enabled);
1103                scope.set_trash_enabled(trash_enabled);
1104                scope
1105            }),
1106            tools,
1107            user_tools: RwLock::new(HashMap::new()),
1108            vfs,
1109            jobs,
1110            runner,
1111            exec_ctx: RwLock::new(exec_ctx),
1112            skip_validation,
1113            interactive,
1114            allow_external_commands,
1115            vfs_budget,
1116            request_timeout,
1117            kill_grace,
1118            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1119            cancel_token: std::sync::Mutex::new(tokio_util::sync::CancellationToken::new()),
1120            #[cfg(all(unix, feature = "subprocess"))]
1121            terminal_state: None,
1122            self_weak: std::sync::OnceLock::new(),
1123            execute_lock: tokio::sync::Mutex::new(()),
1124            bg_job_id: None,
1125            // Overlay handle is set by Kernel::new after assemble returns;
1126            // assemble itself doesn't know the handle (it's constructed in setup_vfs).
1127            // with_backend always has None (overlay=true is rejected above).
1128            #[cfg(all(feature = "localfs", feature = "overlay"))]
1129            overlay_handle: None,
1130        })
1131    }
1132
1133    /// Get the kernel name.
1134    pub fn name(&self) -> &str {
1135        &self.name
1136    }
1137
1138    /// Wrap this Kernel in an Arc and initialize its self-reference.
1139    ///
1140    /// This enables the Kernel to hand out `Arc<dyn CommandDispatcher>` references
1141    /// to child contexts, allowing builtins like `timeout` to dispatch inner
1142    /// commands through the full resolution chain (user tools → builtins →
1143    /// .kai scripts → external commands).
1144    pub fn into_arc(self) -> Arc<Self> {
1145        let arc = Arc::new(self);
1146        let _ = arc.self_weak.set(Arc::downgrade(&arc));
1147        arc
1148    }
1149
1150    /// Fork a subsidiary kernel for concurrent execution.
1151    ///
1152    /// The fork is a fully-functional `Kernel` that:
1153    /// - **Snapshots** per-session state from the parent: scope (COW — cheap),
1154    ///   user-defined tools, cwd, aliases, ignore config, etc. Mutations on
1155    ///   the fork do NOT propagate back to the parent — matching bash
1156    ///   subshell / background-job semantics.
1157    /// - **Shares** read-mostly resources with the parent via `Arc`: the tool
1158    ///   registry, the VFS router, and the job manager. A job registered by
1159    ///   the fork is visible to the parent's `jobs` builtin, and the fork
1160    ///   sees the same VFS mounts.
1161    /// - **Owns** its own `stderr_receiver`, `cancel_token`, and
1162    ///   `execute_lock`. It is never the TTY owner, so `interactive` is
1163    ///   `false` and `terminal_state` is `None`.
1164    ///
1165    /// The returned Arc has its `self_weak` populated (via `into_arc`), so
1166    /// nested dispatch through `ctx.dispatcher` (e.g. the `timeout` builtin)
1167    /// routes through the fork itself, not the parent — which is essential
1168    /// for concurrency safety.
1169    ///
1170    /// Use this for **detached** background concurrency where the fork should
1171    /// survive parent cancellation: the `&` background-job operator and any
1172    /// other "fire and forget" worker. The fork gets a fresh, independent
1173    /// cancellation token.
1174    ///
1175    /// For foreground concurrency (scatter workers, concurrent pipeline
1176    /// stages, `$(...)` cmdsubs) where parent timeout/cancel must cascade
1177    /// into the fork's external children, use [`Self::fork_attached`].
1178    pub async fn fork(&self) -> Arc<Self> {
1179        self.fork_inner(tokio_util::sync::CancellationToken::new(), self.bg_job_id)
1180            .await
1181    }
1182
1183    /// Fork attached to the parent's cancellation.
1184    ///
1185    /// Same as [`Self::fork`] but the fork's `cancel_token` is a child of
1186    /// the parent's. When the parent cancels (request timeout, embedder
1187    /// `Kernel::cancel`, etc.), the fork's token also cancels, which in
1188    /// turn kills any external children spawned in the fork via the
1189    /// `wait_or_kill` / SIGTERM-grace-SIGKILL path.
1190    pub async fn fork_attached(&self) -> Arc<Self> {
1191        let child_token = {
1192            #[allow(clippy::expect_used)]
1193            let parent = self.cancel_token.lock().expect("cancel_token poisoned");
1194            parent.child_token()
1195        };
1196        self.fork_inner(child_token, self.bg_job_id).await
1197    }
1198
1199    /// Fork for a background job, stamping the job id so external commands
1200    /// spawned anywhere beneath it record their process groups on that job
1201    /// (for `kill -<sig> %N`). The caller owns `cancel` so it can also drive
1202    /// `JobManager::cancel`.
1203    pub async fn fork_for_background(
1204        &self,
1205        cancel: tokio_util::sync::CancellationToken,
1206        job_id: crate::scheduler::JobId,
1207    ) -> Arc<Self> {
1208        self.fork_inner(cancel, Some(job_id)).await
1209    }
1210
1211    /// Shared fork implementation. Caller decides the cancellation token and
1212    /// which background job (if any) this fork runs on behalf of.
1213    async fn fork_inner(
1214        &self,
1215        cancel: tokio_util::sync::CancellationToken,
1216        bg_job_id: Option<crate::scheduler::JobId>,
1217    ) -> Arc<Self> {
1218        let scope_snapshot = self.scope.read().await.clone();
1219        let user_tools_snapshot = self.user_tools.read().await.clone();
1220
1221        // Snapshot exec_ctx by cloning the cloneable fields, then override
1222        // the ones that should not carry over (stderr channel, dispatcher,
1223        // interactive flag, terminal state, cancel — set from `cancel` arg).
1224        let mut fork_ctx = {
1225            let parent_ctx = self.exec_ctx.read().await;
1226            parent_ctx.child_for_pipeline()
1227        };
1228        let (stderr_writer, stderr_receiver) = stderr_stream();
1229        fork_ctx.stderr = Some(stderr_writer);
1230        // Clear dispatcher; dispatch_command will repopulate it to point at
1231        // the fork on the first dispatch call.
1232        fork_ctx.dispatcher = None;
1233        fork_ctx.interactive = false;
1234        fork_ctx.cancel = cancel.clone();
1235        #[cfg(all(unix, feature = "subprocess"))]
1236        {
1237            fork_ctx.terminal_state = None;
1238        }
1239
1240        let fork = Self {
1241            name: format!("{}:fork", self.name),
1242            scope: RwLock::new(scope_snapshot),
1243            tools: Arc::clone(&self.tools),
1244            user_tools: RwLock::new(user_tools_snapshot),
1245            vfs: Arc::clone(&self.vfs),
1246            jobs: Arc::clone(&self.jobs),
1247            runner: self.runner.clone(),
1248            exec_ctx: RwLock::new(fork_ctx),
1249            skip_validation: self.skip_validation,
1250            // Forks are never the TTY owner — they run in the background.
1251            interactive: false,
1252            allow_external_commands: self.allow_external_commands,
1253            // Arc-clone the budget so the fork draws from the same pool as the
1254            // parent — background jobs and scatter workers count against the same
1255            // cap as foreground writes.
1256            vfs_budget: self.vfs_budget.clone(),
1257            request_timeout: self.request_timeout,
1258            kill_grace: self.kill_grace,
1259            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
1260            cancel_token: std::sync::Mutex::new(cancel),
1261            #[cfg(all(unix, feature = "subprocess"))]
1262            terminal_state: None,
1263            self_weak: std::sync::OnceLock::new(),
1264            execute_lock: tokio::sync::Mutex::new(()),
1265            bg_job_id,
1266            // Arc-clone the overlay handle so forks (background jobs, scatter
1267            // workers, pipeline stages) can reach the same overlay transaction
1268            // via `kaish-vfs status/diff/commit/reset`.
1269            #[cfg(all(feature = "localfs", feature = "overlay"))]
1270            overlay_handle: self.overlay_handle.clone(),
1271        };
1272
1273        fork.into_arc()
1274    }
1275
1276    /// Get an `Arc<dyn CommandDispatcher>` to this Kernel, if wrapped via `into_arc()`.
1277    ///
1278    /// Returns `None` if the Kernel was not wrapped, or if all strong references
1279    /// have been dropped (the `Weak` can no longer upgrade).
1280    pub fn dispatcher(&self) -> Option<Arc<dyn CommandDispatcher>> {
1281        self.self_weak
1282            .get()
1283            .and_then(|weak| weak.upgrade())
1284            .map(|arc| arc as Arc<dyn CommandDispatcher>)
1285    }
1286
1287    /// Initialize terminal state for interactive job control.
1288    ///
1289    /// Call this after kernel creation when running as an interactive REPL
1290    /// and stdin is a TTY. Sets up process groups and signal handling.
1291    #[cfg(all(unix, feature = "subprocess"))]
1292    pub fn init_terminal(&mut self) {
1293        if !self.interactive {
1294            return;
1295        }
1296        match crate::terminal::TerminalState::init() {
1297            Ok(state) => {
1298                let state = Arc::new(state);
1299                self.terminal_state = Some(state.clone());
1300                // Set on exec_ctx so builtins (fg, bg, kill) can access it
1301                self.exec_ctx.get_mut().terminal_state = Some(state);
1302                tracing::debug!("terminal job control initialized");
1303            }
1304            Err(e) => {
1305                tracing::warn!("failed to initialize terminal job control: {}", e);
1306            }
1307        }
1308    }
1309
1310    /// Replace or remove the trash backend used by `rm` and `kaish-trash`.
1311    ///
1312    /// The kernel installs the OS trash (`SystemTrash`) automatically when
1313    /// built with the `os-integration` feature. Embedders and tests can swap
1314    /// in a custom [`crate::trash::TrashBackend`], or pass `None` to remove
1315    /// it — with trash enabled but no backend present, `rm` fails loud
1316    /// rather than falling through to permanent delete.
1317    pub fn set_trash_backend(&mut self, backend: Option<Arc<dyn crate::trash::TrashBackend>>) {
1318        self.exec_ctx.get_mut().trash_backend = backend;
1319    }
1320
1321    /// Cancel the current execution.
1322    ///
1323    /// This cancels the current cancellation token, causing any execution
1324    /// loop to exit at the next checkpoint with exit code 130 (SIGINT).
1325    /// A fresh token is installed for the next `execute()` call.
1326    pub fn cancel(&self) {
1327        #[allow(clippy::expect_used)]
1328        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1329        token.cancel();
1330    }
1331
1332    /// Check if the current execution has been cancelled.
1333    pub fn is_cancelled(&self) -> bool {
1334        #[allow(clippy::expect_used)]
1335        let token = self.cancel_token.lock().expect("cancel_token poisoned");
1336        token.is_cancelled()
1337    }
1338
1339    /// Reset the cancellation token (called at the start of each execute).
1340    fn reset_cancel(&self) -> tokio_util::sync::CancellationToken {
1341        #[allow(clippy::expect_used)]
1342        let mut token = self.cancel_token.lock().expect("cancel_token poisoned");
1343        if token.is_cancelled() {
1344            *token = tokio_util::sync::CancellationToken::new();
1345        }
1346        token.clone()
1347    }
1348
1349    /// Acquire the per-Kernel execute lock, warning on contention.
1350    ///
1351    /// Tokio's Mutex is fair (FIFO) so callers queue in arrival order. When
1352    /// the lock is already held, emit a warning so the silent serialization
1353    /// is observable in logs — if you need real parallelism, fork the kernel.
1354    async fn acquire_execute_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
1355        match self.execute_lock.try_lock() {
1356            Ok(guard) => guard,
1357            Err(_) => {
1358                tracing::warn!(
1359                    target: "kaish::kernel::concurrency",
1360                    kernel = %self.name,
1361                    "execute() contended — serializing concurrent caller; \
1362                     use Kernel::fork() for parallelism instead of sharing"
1363                );
1364                self.execute_lock.lock().await
1365            }
1366        }
1367    }
1368
1369    /// Execute kaish source code with default options.
1370    ///
1371    /// Equivalent to `execute_with_options(input, ExecuteOptions::default())`.
1372    /// Returns the result of the last statement executed.
1373    pub async fn execute(&self, input: &str) -> Result<ExecResult> {
1374        self.run_inner(input, ExecuteOptions::default(), None).await
1375    }
1376
1377    /// Execute with per-call options. The primary entry point for embedders
1378    /// that don't need per-statement output streaming.
1379    ///
1380    /// `opts` carries timeout, transient vars overlay, optional cwd override,
1381    /// and optional embedder-owned cancellation token. See [`ExecuteOptions`]
1382    /// for semantics. For streaming, use [`Self::execute_with_options_streaming`].
1383    ///
1384    /// **Cancellation:** if `opts.cancel_token` is `Some`, it is *raced*
1385    /// against the kernel's internal token. Either firing cancels and kills
1386    /// external children. The embedder's token is read-only — kernel
1387    /// timeouts do NOT propagate into it. Distinguish via the returned
1388    /// `code`: 124 = timeout, 130 = cancellation.
1389    ///
1390    /// **Timeout:** `opts.timeout` overrides `KernelConfig::request_timeout`.
1391    /// `Some(Duration::ZERO)` returns 124 immediately without spawning.
1392    ///
1393    /// Concurrent callers on the same Kernel serialize on the kernel-wide
1394    /// execute lock. For true parallelism, call [`Kernel::fork`] (detached)
1395    /// or [`Kernel::fork_attached`] (cancellation cascades from this kernel).
1396    pub async fn execute_with_options(
1397        &self,
1398        input: &str,
1399        opts: ExecuteOptions,
1400    ) -> Result<ExecResult> {
1401        self.run_inner(input, opts, None).await
1402    }
1403
1404    /// Same as [`Self::execute_with_options`] but with a per-statement output
1405    /// callback. The callback fires after each top-level statement so the
1406    /// embedder (REPL, MCP streaming) can flush output incrementally.
1407    pub async fn execute_with_options_streaming(
1408        &self,
1409        input: &str,
1410        opts: ExecuteOptions,
1411        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1412    ) -> Result<ExecResult> {
1413        self.run_inner(input, opts, Some(on_output)).await
1414    }
1415
1416    /// Execute kaish source code with a transient overlay of exported variables.
1417    ///
1418    /// Deprecated thin wrapper over [`Self::execute_with_options`]. New code
1419    /// should use that method directly:
1420    /// `execute_with_options(input, ExecuteOptions::new().with_vars(vars))`.
1421    #[deprecated(note = "use Kernel::execute_with_options with ExecuteOptions::with_vars")]
1422    pub async fn execute_with_vars(
1423        &self,
1424        input: &str,
1425        vars: HashMap<String, Value>,
1426    ) -> Result<ExecResult> {
1427        self.run_inner(input, ExecuteOptions::new().with_vars(vars), None).await
1428    }
1429
1430    /// Execute kaish source code with a per-statement callback.
1431    ///
1432    /// Deprecated thin wrapper. New code should use
1433    /// [`Self::execute_with_options_streaming`].
1434    #[deprecated(note = "use Kernel::execute_with_options_streaming")]
1435    pub async fn execute_streaming(
1436        &self,
1437        input: &str,
1438        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1439    ) -> Result<ExecResult> {
1440        self.run_inner(input, ExecuteOptions::default(), Some(on_output)).await
1441    }
1442
1443    /// Link embedder trace context, then run [`Self::execute_with_options_inner`].
1444    ///
1445    /// The `#[instrument]` execution span resolves its parent from the *current*
1446    /// OpenTelemetry context (see `tracing-opentelemetry`'s `parent_context`),
1447    /// captured when the span is first entered — not when the future is
1448    /// constructed. So a thread-local `attach()` scoped to construction is too
1449    /// early to be seen (the integration test confirms this). `with_context`
1450    /// re-attaches the embedder's context on *every* poll of the inner future,
1451    /// so the context is current at first-enter and survives runtime thread
1452    /// hops. With no embedder trace context, the future runs unwrapped.
1453    async fn run_inner(
1454        &self,
1455        input: &str,
1456        opts: ExecuteOptions,
1457        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1458    ) -> Result<ExecResult> {
1459        use opentelemetry::context::FutureExt;
1460
1461        // Capture the embedder's baggage before `opts` is consumed so it can be
1462        // echoed back onto the result on egress (see `merge_egress_baggage`).
1463        let embedder_baggage = opts.baggage.clone();
1464
1465        let result = match crate::telemetry::extract_parent(&opts) {
1466            Some(parent) => self
1467                .execute_with_options_inner(input, opts, on_output)
1468                .with_context(parent)
1469                .await,
1470            None => self.execute_with_options_inner(input, opts, on_output).await,
1471        };
1472
1473        result.map(|mut r| {
1474            crate::telemetry::merge_egress_baggage(&mut r, embedder_baggage);
1475            r
1476        })
1477    }
1478
1479    /// Shared body for `execute`, `execute_with_options(_streaming)`, and
1480    /// the deprecated wrappers. Owns the per-call cancel token, vars overlay,
1481    /// cwd override, and timeout race.
1482    #[tracing::instrument(level = "info", skip(self, opts, on_output), fields(input_len = input.len()))]
1483    async fn execute_with_options_inner(
1484        &self,
1485        input: &str,
1486        opts: ExecuteOptions,
1487        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1488    ) -> Result<ExecResult> {
1489        let _guard = self.acquire_execute_lock().await;
1490
1491        // Always reset to a fresh internal token; this is the kernel's own
1492        // cancel surface for embedders calling `Kernel::cancel()`. The
1493        // embedder-supplied `opts.cancel_token` is a *read-only input* — it
1494        // is NOT written into `self.cancel_token`, because doing so would
1495        // (a) leak the embedder's token past this call's lifetime,
1496        // (b) re-route a later `Kernel::cancel()` into the embedder's token,
1497        // (c) extend the token's lifetime via the kernel's strong clone.
1498        let internal = self.reset_cancel();
1499        // Race the embedder token against the kernel's internal token via a
1500        // tracked watcher task. We hold the JoinHandle so we can abort the
1501        // task at function exit — otherwise it would wait forever for either
1502        // token to fire and leak per call.
1503        let (effective_cancel, watcher_handle): (
1504            tokio_util::sync::CancellationToken,
1505            Option<tokio::task::JoinHandle<()>>,
1506        ) = if let Some(ext) = opts.cancel_token {
1507            let combined = tokio_util::sync::CancellationToken::new();
1508            let combined_writer = combined.clone();
1509            let i = internal.clone();
1510            let handle = tokio::spawn(async move {
1511                tokio::select! {
1512                    _ = i.cancelled() => combined_writer.cancel(),
1513                    _ = ext.cancelled() => combined_writer.cancel(),
1514                }
1515            });
1516            (combined, Some(handle))
1517        } else {
1518            (internal, None)
1519        };
1520
1521        // Effective timeout: per-call wins over kernel-config default.
1522        let timeout = opts.timeout.or(self.request_timeout);
1523
1524        // ZERO timeout: return 124 immediately without spawning anything.
1525        if timeout == Some(Duration::ZERO) {
1526            if let Some(h) = watcher_handle {
1527                h.abort();
1528            }
1529            return Ok(ExecResult::failure(124, "timeout: timed out after 0s".to_string()));
1530        }
1531
1532        // Apply per-call vars overlay (push frame + set_exported), wrapped in
1533        // an RAII guard so a panic inside `execute_streaming_inner` still
1534        // pops the frame and unexports the temporarily-exported names.
1535        struct VarsFrameGuard<'a> {
1536            kernel: &'a Kernel,
1537            newly_exported: Vec<String>,
1538        }
1539        impl Drop for VarsFrameGuard<'_> {
1540            fn drop(&mut self) {
1541                // Best-effort cleanup using try_write. The execute_lock held
1542                // throughout execute_with_options means there is no concurrent
1543                // foreground caller; forks have their own scope and won't
1544                // block this. blocking_write would deadlock the runtime when
1545                // called from a tokio worker thread, so we explicitly do NOT
1546                // fall back to it — if try_write fails (which we've never
1547                // seen in practice), log loudly and accept the leak rather
1548                // than deadlock the entire kernel.
1549                let Ok(mut scope) = self.kernel.scope.try_write() else {
1550                    tracing::error!(
1551                        "vars frame guard: scope lock unexpectedly busy; \
1552                         skipping pop_frame to avoid runtime deadlock — \
1553                         transient vars may leak"
1554                    );
1555                    return;
1556                };
1557                scope.pop_frame();
1558                for name in self.newly_exported.drain(..) {
1559                    scope.unexport(&name);
1560                }
1561            }
1562        }
1563
1564        // Per-call cwd override: save current cwd, set the new one, restore
1565        // on Drop so the kernel's persistent cwd doesn't leak between calls.
1566        // Same RAII pattern as VarsFrameGuard, same blocking_write trade-off.
1567        struct CwdGuard<'a> {
1568            kernel: &'a Kernel,
1569            saved: PathBuf,
1570        }
1571        impl Drop for CwdGuard<'_> {
1572            fn drop(&mut self) {
1573                let Ok(mut ec) = self.kernel.exec_ctx.try_write() else {
1574                    tracing::error!(
1575                        "cwd guard: exec_ctx lock unexpectedly busy; \
1576                         skipping cwd restore — kernel cwd may be wrong for next call"
1577                    );
1578                    return;
1579                };
1580                ec.cwd = std::mem::take(&mut self.saved);
1581            }
1582        }
1583        let _cwd_guard: Option<CwdGuard<'_>> = if let Some(new_cwd) = opts.cwd {
1584            let mut ec = self.exec_ctx.write().await;
1585            let saved = std::mem::replace(&mut ec.cwd, new_cwd);
1586            drop(ec);
1587            Some(CwdGuard { kernel: self, saved })
1588        } else {
1589            None
1590        };
1591
1592        let _vars_guard: Option<VarsFrameGuard<'_>> = if !opts.vars.is_empty() {
1593            let mut scope = self.scope.write().await;
1594            scope.push_frame();
1595            let mut newly = Vec::with_capacity(opts.vars.len());
1596            for (name, value) in opts.vars {
1597                if !scope.is_exported(&name) {
1598                    newly.push(name.clone());
1599                }
1600                scope.set_exported(name, value);
1601            }
1602            drop(scope);
1603            Some(VarsFrameGuard { kernel: self, newly_exported: newly })
1604        } else {
1605            None
1606        };
1607
1608        // Sync the effective cancel into self.exec_ctx so try_execute_external
1609        // (which reads via self.cancel_token) sees cancellation. We also need
1610        // builtins to see it via ctx.cancel — handled in execute_command.
1611        // For simplicity here we mirror effective_cancel into self.cancel_token
1612        // for the duration of this call, then restore the internal token at
1613        // the end (so a later Kernel::cancel still hits our internal surface).
1614        {
1615            #[allow(clippy::expect_used)]
1616            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1617            *cur = effective_cancel.clone();
1618        }
1619
1620        // The movable-deadline watchdog for this call (None without a timeout),
1621        // mirrored into exec_ctx — like the cancel token — so builtins can
1622        // suspend the script clock via `ctx.patient`. Assigned unconditionally
1623        // (clearing any stale handle) and reset to None in the restore block.
1624        let watchdog = timeout.map(|d| Arc::new(crate::watchdog::Watchdog::new(d)));
1625        {
1626            let mut ec = self.exec_ctx.write().await;
1627            ec.watchdog = watchdog.clone();
1628        }
1629
1630        // Run inner with optional timeout. The watchdog task cancels our token
1631        // on elapsed; the cascade fires SIGTERM/SIGKILL on any external
1632        // children via the wait_or_kill discipline in try_execute_external.
1633        let mut noop_cb: Box<dyn FnMut(&ExecResult) + Send> = Box::new(|_| {});
1634        let cb_ref: &mut (dyn FnMut(&ExecResult) + Send) = match on_output {
1635            Some(cb) => cb,
1636            None => &mut *noop_cb,
1637        };
1638
1639        let result = if let Some(d) = timeout {
1640            #[allow(clippy::expect_used)]
1641            let watchdog = watchdog.clone().expect("watchdog constructed when timeout is set");
1642            let elapsed = Arc::new(std::sync::atomic::AtomicBool::new(false));
1643            let timer = tokio::spawn(watchdog.run(elapsed.clone(), effective_cancel.clone()));
1644            let r = self.execute_streaming_inner(input, cb_ref).await;
1645            timer.abort();
1646            match r {
1647                Ok(mut res) => {
1648                    if elapsed.load(std::sync::atomic::Ordering::SeqCst) {
1649                        res.code = 124;
1650                        if res.err.is_empty() {
1651                            res.err = format!("timeout: timed out after {:?}", d);
1652                        }
1653                    }
1654                    Ok(res)
1655                }
1656                Err(e) => Err(e),
1657            }
1658        } else {
1659            self.execute_streaming_inner(input, cb_ref).await
1660        };
1661
1662        // Restore self.cancel_token to a fresh, uncancelled token so the
1663        // embedder's view of `Kernel::cancel()` stays predictable on the
1664        // next call (it cancels the kernel's own token, not whatever was
1665        // left over from this call's combined token).
1666        {
1667            #[allow(clippy::expect_used)]
1668            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1669            *cur = tokio_util::sync::CancellationToken::new();
1670        }
1671
1672        // Drop the watchdog handle from exec_ctx — its timer task is gone
1673        // (fired or aborted above); a patient hold acquired against a stale
1674        // handle would silently suspend nothing.
1675        {
1676            let mut ec = self.exec_ctx.write().await;
1677            ec.watchdog = None;
1678        }
1679
1680        // Tear down the embedder-token race watcher (if any). Leaving it
1681        // alive would idle forever waiting for tokens that may never fire.
1682        if let Some(h) = watcher_handle {
1683            h.abort();
1684        }
1685
1686        // VarsFrameGuard drops here on the success path and on early-return
1687        // paths above (error path included). Panic safety preserved.
1688        result
1689    }
1690
1691    /// The actual body of `execute_streaming`, run while holding the execute lock.
1692    ///
1693    /// Split out so internal kernel paths that are already under the lock can
1694    /// call this without deadlocking on re-entry. External callers must go
1695    /// through [`Self::execute_streaming`] so they acquire the lock.
1696    async fn execute_streaming_inner(
1697        &self,
1698        input: &str,
1699        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1700    ) -> Result<ExecResult> {
1701        let program = parse(input).map_err(|errors| {
1702            let msg = errors
1703                .iter()
1704                .map(|e| e.format(input))
1705                .collect::<Vec<_>>()
1706                .join("\n");
1707            anyhow::anyhow!("parse error:\n{}", msg)
1708        })?;
1709
1710        // AST display mode: show AST instead of executing
1711        {
1712            let scope = self.scope.read().await;
1713            if scope.show_ast() {
1714                let output = format!("{:#?}\n", program);
1715                return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(output)));
1716            }
1717        }
1718
1719        // Pre-execution validation
1720        if !self.skip_validation {
1721            let user_tools = self.user_tools.read().await;
1722            let validator = Validator::new(&self.tools, &user_tools);
1723            let issues = validator.validate(&program);
1724
1725            // Collect errors (warnings are logged but don't prevent execution)
1726            let errors: Vec<_> = issues
1727                .iter()
1728                .filter(|i| i.severity == Severity::Error)
1729                .collect();
1730
1731            if !errors.is_empty() {
1732                let error_msg = errors
1733                    .iter()
1734                    .map(|e| e.format(input))
1735                    .collect::<Vec<_>>()
1736                    .join("\n");
1737                return Err(anyhow::anyhow!("validation failed:\n{}", error_msg));
1738            }
1739
1740            // Log warnings via tracing (trace level to avoid noise)
1741            for warning in issues.iter().filter(|i| i.severity == Severity::Warning) {
1742                tracing::trace!("validation: {}", warning.format(input));
1743            }
1744        }
1745
1746        let mut result = ExecResult::success("");
1747
1748        // Reset cancellation token for this execution.
1749        let cancel = self.reset_cancel();
1750
1751        for stmt in program.statements {
1752            if matches!(stmt, Stmt::Empty) {
1753                continue;
1754            }
1755
1756            // Cancellation checkpoint
1757            if cancel.is_cancelled() {
1758                result.code = 130;
1759                return Ok(result);
1760            }
1761
1762            let flow = self.execute_stmt_flow(&stmt).await?;
1763
1764            // Drain any stderr written by pipeline stages during this statement.
1765            // This captures stderr from intermediate pipeline stages that would
1766            // otherwise be lost (only the last stage's result is returned).
1767            let drained_stderr = {
1768                let mut receiver = self.stderr_receiver.lock().await;
1769                receiver.drain_lossy()
1770            };
1771
1772            match flow {
1773                ControlFlow::Normal(mut r) => {
1774                    if !drained_stderr.is_empty() {
1775                        if !r.err.is_empty() && !r.err.ends_with('\n') {
1776                            r.err.push('\n');
1777                        }
1778                        // Prepend pipeline stderr before the last stage's stderr
1779                        let combined = format!("{}{}", drained_stderr, r.err);
1780                        r.err = combined;
1781                    }
1782                    on_output(&r);
1783                    // Carry the last statement's structured output for MCP TOON encoding.
1784                    // Must be done here (not in accumulate_result) because accumulate_result
1785                    // is also used in loops where per-iteration output would be wrong.
1786                    let last_output = r.output().cloned();
1787                    accumulate_result(&mut result, &r);
1788                    result.set_output(last_output);
1789                }
1790                ControlFlow::Exit { code } => {
1791                    if !drained_stderr.is_empty() {
1792                        result.err.push_str(&drained_stderr);
1793                    }
1794                    result.code = code;
1795                    return Ok(result);
1796                }
1797                ControlFlow::Return { mut value } => {
1798                    if !drained_stderr.is_empty() {
1799                        value.err = format!("{}{}", drained_stderr, value.err);
1800                    }
1801                    on_output(&value);
1802                    result = value;
1803                }
1804                ControlFlow::Break { result: mut r, .. } | ControlFlow::Continue { result: mut r, .. } => {
1805                    if !drained_stderr.is_empty() {
1806                        r.err = format!("{}{}", drained_stderr, r.err);
1807                    }
1808                    on_output(&r);
1809                    result = r;
1810                }
1811            }
1812        }
1813
1814        Ok(result)
1815    }
1816
1817    /// Execute a single statement, returning control flow information.
1818    fn execute_stmt_flow<'a>(
1819        &'a self,
1820        stmt: &'a Stmt,
1821    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ControlFlow>> + Send + 'a>> {
1822        use tracing::Instrument;
1823        let span = tracing::debug_span!("execute_stmt_flow", stmt_type = %stmt.kind_name());
1824        Box::pin(async move {
1825        match stmt {
1826            Stmt::Assignment(assign) => {
1827                // Use async evaluator to support command substitution
1828                let value = self.eval_expr_async(&assign.value).await
1829                    .context("failed to evaluate assignment")?;
1830                let mut scope = self.scope.write().await;
1831                if assign.local {
1832                    // local: set in innermost (current function) frame
1833                    scope.set(&assign.name, value.clone());
1834                } else {
1835                    // non-local: update existing or create in root frame
1836                    scope.set_global(&assign.name, value.clone());
1837                }
1838                drop(scope);
1839
1840                // Assignments don't produce output (like sh)
1841                Ok(ControlFlow::ok(ExecResult::success("")))
1842            }
1843            Stmt::Command(cmd) => {
1844                // Route single commands through execute_pipeline for a unified path.
1845                // This ensures all commands go through the dispatcher chain.
1846                let pipeline = crate::ast::Pipeline {
1847                    commands: vec![cmd.clone()],
1848                    background: false,
1849                };
1850                let result = self.execute_pipeline(&pipeline).await?;
1851                self.update_last_result(&result).await;
1852
1853                // Check for error exit mode (set -e)
1854                if !result.ok() {
1855                    let scope = self.scope.read().await;
1856                    if scope.error_exit_enabled() {
1857                        return Ok(ControlFlow::exit_code(result.code));
1858                    }
1859                }
1860
1861                Ok(ControlFlow::ok(result))
1862            }
1863            Stmt::Pipeline(pipeline) => {
1864                let result = self.execute_pipeline(pipeline).await?;
1865                self.update_last_result(&result).await;
1866
1867                // Check for error exit mode (set -e)
1868                if !result.ok() {
1869                    let scope = self.scope.read().await;
1870                    if scope.error_exit_enabled() {
1871                        return Ok(ControlFlow::exit_code(result.code));
1872                    }
1873                }
1874
1875                Ok(ControlFlow::ok(result))
1876            }
1877            Stmt::If(if_stmt) => {
1878                // Use async evaluator to support command substitution in conditions
1879                let cond_value = self.eval_expr_async(&if_stmt.condition).await?;
1880
1881                let branch = if is_truthy(&cond_value) {
1882                    &if_stmt.then_branch
1883                } else {
1884                    if_stmt.else_branch.as_deref().unwrap_or(&[])
1885                };
1886
1887                let mut result = ExecResult::success("");
1888                for stmt in branch {
1889                    let flow = self.execute_stmt_flow(stmt).await?;
1890                    match flow {
1891                        ControlFlow::Normal(r) => {
1892                            accumulate_result(&mut result, &r);
1893                            self.drain_stderr_into(&mut result).await;
1894                        }
1895                        other => {
1896                            self.drain_stderr_into(&mut result).await;
1897                            return Ok(other);
1898                        }
1899                    }
1900                }
1901                Ok(ControlFlow::ok(result))
1902            }
1903            Stmt::For(for_loop) => {
1904                // Evaluate all items and collect values for iteration
1905                // Use async evaluator to support command substitution like $(seq 1 5)
1906                let mut items: Vec<Value> = Vec::new();
1907                for item_expr in &for_loop.items {
1908                    // Glob expansion in for-loop items: `for f in *.txt`
1909                    if let Expr::GlobPattern(pattern) = item_expr {
1910                        let glob_enabled = {
1911                            let scope = self.scope.read().await;
1912                            scope.glob_enabled()
1913                        };
1914                        if glob_enabled {
1915                            let (paths, cwd) = {
1916                                let ctx = self.exec_ctx.read().await;
1917                                let paths = ctx.expand_glob(pattern).await
1918                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
1919                                let cwd = ctx.resolve_path(".");
1920                                (paths, cwd)
1921                            };
1922                            if paths.is_empty() {
1923                                return Err(anyhow::anyhow!("no matches: {}", pattern));
1924                            }
1925                            for path in paths {
1926                                let display = if !pattern.starts_with('/') {
1927                                    path.strip_prefix(&cwd)
1928                                        .unwrap_or(&path)
1929                                        .to_string_lossy().into_owned()
1930                                } else {
1931                                    path.to_string_lossy().into_owned()
1932                                };
1933                                items.push(Value::String(display));
1934                            }
1935                            continue;
1936                        }
1937                    }
1938                    // Track whether this item came from $(cmd); that's the
1939                    // only position where multi-line stdout auto-splits per
1940                    // line. Arrays still spread element-by-element; bare
1941                    // $VAR is rejected upstream by validator E012. See
1942                    // docs/plan-for-loop-newline-split.md.
1943                    let from_command_subst = matches!(item_expr, Expr::CommandSubst(_));
1944                    let item = self.eval_expr_async(item_expr).await?;
1945                    match item {
1946                        // JSON arrays iterate over elements (preferred path
1947                        // when builtins emit .data — seq, jq, cut, find, …)
1948                        Value::Json(serde_json::Value::Array(arr)) => {
1949                            for elem in arr {
1950                                items.push(json_to_value(elem));
1951                            }
1952                        }
1953                        // Strings from $(cmd): empty → 0 iterations,
1954                        // multi-line → split per line (trimming trailing
1955                        // newlines and per-line trailing \r), single-line
1956                        // → one iteration. Whitespace within a line is
1957                        // NOT split — the "$VAR with spaces just works"
1958                        // promise is preserved because this only fires
1959                        // in CommandSubst position.
1960                        Value::String(s) if from_command_subst => {
1961                            let trimmed = s.trim_end_matches(['\n', '\r']);
1962                            if trimmed.is_empty() {
1963                                continue;
1964                            }
1965                            if trimmed.contains('\n') {
1966                                for line in trimmed.split('\n') {
1967                                    let line = line.trim_end_matches('\r');
1968                                    items.push(Value::String(line.to_string()));
1969                                }
1970                            } else {
1971                                items.push(Value::String(trimmed.to_string()));
1972                            }
1973                        }
1974                        // Strings not from $(cmd) stay as one value.
1975                        other => items.push(other),
1976                    }
1977                }
1978
1979                let mut result = ExecResult::success("");
1980                {
1981                    let mut scope = self.scope.write().await;
1982                    scope.push_frame();
1983                }
1984
1985                'outer: for item in items {
1986                    // Cancellation checkpoint per iteration
1987                    if self.is_cancelled() {
1988                        let mut scope = self.scope.write().await;
1989                        scope.pop_frame();
1990                        result.code = 130;
1991                        return Ok(ControlFlow::ok(result));
1992                    }
1993                    {
1994                        let mut scope = self.scope.write().await;
1995                        scope.set(&for_loop.variable, item);
1996                    }
1997                    for stmt in &for_loop.body {
1998                        let mut flow = match self.execute_stmt_flow(stmt).await {
1999                            Ok(f) => f,
2000                            Err(e) => {
2001                                let mut scope = self.scope.write().await;
2002                                scope.pop_frame();
2003                                return Err(e);
2004                            }
2005                        };
2006                        self.drain_stderr_into(&mut result).await;
2007                        match &mut flow {
2008                            ControlFlow::Normal(r) => {
2009                                accumulate_result(&mut result, r);
2010                                if !r.ok() {
2011                                    let scope = self.scope.read().await;
2012                                    if scope.error_exit_enabled() {
2013                                        drop(scope);
2014                                        let mut scope = self.scope.write().await;
2015                                        scope.pop_frame();
2016                                        return Ok(ControlFlow::exit_code(r.code));
2017                                    }
2018                                }
2019                            }
2020                            ControlFlow::Break { .. } => {
2021                                if flow.decrement_level() {
2022                                    accumulate_flow_output(&mut result, &flow);
2023                                    break 'outer;
2024                                }
2025                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2026                                let mut scope = self.scope.write().await;
2027                                scope.pop_frame();
2028                                return Ok(flow);
2029                            }
2030                            ControlFlow::Continue { .. } => {
2031                                if flow.decrement_level() {
2032                                    accumulate_flow_output(&mut result, &flow);
2033                                    continue 'outer;
2034                                }
2035                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2036                                let mut scope = self.scope.write().await;
2037                                scope.pop_frame();
2038                                return Ok(flow);
2039                            }
2040                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2041                                let mut scope = self.scope.write().await;
2042                                scope.pop_frame();
2043                                return Ok(flow);
2044                            }
2045                        }
2046                    }
2047                }
2048
2049                {
2050                    let mut scope = self.scope.write().await;
2051                    scope.pop_frame();
2052                }
2053                Ok(ControlFlow::ok(result))
2054            }
2055            Stmt::While(while_loop) => {
2056                let mut result = ExecResult::success("");
2057
2058                'outer: loop {
2059                    // Evaluate condition - use async to support command substitution
2060                    // Cancellation checkpoint per iteration
2061                    if self.is_cancelled() {
2062                        result.code = 130;
2063                        return Ok(ControlFlow::ok(result));
2064                    }
2065
2066                    let cond_value = self.eval_expr_async(&while_loop.condition).await?;
2067
2068                    if !is_truthy(&cond_value) {
2069                        break;
2070                    }
2071
2072                    // Execute body
2073                    for stmt in &while_loop.body {
2074                        let mut flow = self.execute_stmt_flow(stmt).await?;
2075                        self.drain_stderr_into(&mut result).await;
2076                        match &mut flow {
2077                            ControlFlow::Normal(r) => {
2078                                accumulate_result(&mut result, r);
2079                                if !r.ok() {
2080                                    let scope = self.scope.read().await;
2081                                    if scope.error_exit_enabled() {
2082                                        return Ok(ControlFlow::exit_code(r.code));
2083                                    }
2084                                }
2085                            }
2086                            ControlFlow::Break { .. } => {
2087                                if flow.decrement_level() {
2088                                    accumulate_flow_output(&mut result, &flow);
2089                                    break 'outer;
2090                                }
2091                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2092                                return Ok(flow);
2093                            }
2094                            ControlFlow::Continue { .. } => {
2095                                if flow.decrement_level() {
2096                                    accumulate_flow_output(&mut result, &flow);
2097                                    continue 'outer;
2098                                }
2099                                fold_loop_output_into_flow(std::mem::take(&mut result), &mut flow);
2100                                return Ok(flow);
2101                            }
2102                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
2103                                return Ok(flow);
2104                            }
2105                        }
2106                    }
2107                }
2108
2109                Ok(ControlFlow::ok(result))
2110            }
2111            Stmt::Case(case_stmt) => {
2112                // Evaluate the expression to match against
2113                let match_value = {
2114                    let value = self.eval_expr_async(&case_stmt.expr).await?;
2115                    value_to_string(&value)
2116                };
2117
2118                // Try each branch until we find a match
2119                for branch in &case_stmt.branches {
2120                    let matched = branch.patterns.iter().any(|pattern| {
2121                        glob_match(pattern, &match_value)
2122                    });
2123
2124                    if matched {
2125                        // Execute the branch body
2126                        let mut result = ExecResult::success("");
2127                        for stmt in &branch.body {
2128                            let flow = self.execute_stmt_flow(stmt).await?;
2129                            match flow {
2130                                ControlFlow::Normal(r) => {
2131                                    accumulate_result(&mut result, &r);
2132                                    self.drain_stderr_into(&mut result).await;
2133                                }
2134                                other => {
2135                                    self.drain_stderr_into(&mut result).await;
2136                                    return Ok(other);
2137                                }
2138                            }
2139                        }
2140                        return Ok(ControlFlow::ok(result));
2141                    }
2142                }
2143
2144                // No match - return success with empty output (like sh)
2145                Ok(ControlFlow::ok(ExecResult::success("")))
2146            }
2147            Stmt::Break(levels) => {
2148                Ok(ControlFlow::break_n(levels.unwrap_or(1)))
2149            }
2150            Stmt::Continue(levels) => {
2151                Ok(ControlFlow::continue_n(levels.unwrap_or(1)))
2152            }
2153            Stmt::Return(expr) => {
2154                // return [N] - N becomes the exit code, NOT stdout
2155                // Shell semantics: return sets exit code, doesn't produce output
2156                let result = if let Some(e) = expr {
2157                    let val = self.eval_expr_async(e).await?;
2158                    let code = crate::interpreter::value_to_exit_code(&val)
2159                        .map_err(|e| anyhow::anyhow!("return: {}", e))?;
2160                    ExecResult::from_parts(code, String::new(), String::new(), None)
2161                } else {
2162                    ExecResult::success("")
2163                };
2164                Ok(ControlFlow::return_value(result))
2165            }
2166            Stmt::Exit(expr) => {
2167                let code = if let Some(e) = expr {
2168                    let val = self.eval_expr_async(e).await?;
2169                    crate::interpreter::value_to_exit_code(&val)
2170                        .map_err(|e| anyhow::anyhow!("exit: {}", e))?
2171                } else {
2172                    0
2173                };
2174                Ok(ControlFlow::exit_code(code))
2175            }
2176            Stmt::ToolDef(tool_def) => {
2177                let mut user_tools = self.user_tools.write().await;
2178                user_tools.insert(tool_def.name.clone(), tool_def.clone());
2179                Ok(ControlFlow::ok(ExecResult::success("")))
2180            }
2181            Stmt::AndChain { left, right } => {
2182                // cmd1 && cmd2 - run cmd2 only if cmd1 succeeds (exit code 0)
2183                // Suppress errexit for the left side — && handles failure itself.
2184                {
2185                    let mut scope = self.scope.write().await;
2186                    scope.suppress_errexit();
2187                }
2188                let left_flow = match self.execute_stmt_flow(left).await {
2189                    Ok(f) => f,
2190                    Err(e) => {
2191                        let mut scope = self.scope.write().await;
2192                        scope.unsuppress_errexit();
2193                        return Err(e);
2194                    }
2195                };
2196                {
2197                    let mut scope = self.scope.write().await;
2198                    scope.unsuppress_errexit();
2199                }
2200                match left_flow {
2201                    ControlFlow::Normal(mut left_result) => {
2202                        self.drain_stderr_into(&mut left_result).await;
2203                        self.update_last_result(&left_result).await;
2204                        if left_result.ok() {
2205                            let right_flow = self.execute_stmt_flow(right).await?;
2206                            match right_flow {
2207                                ControlFlow::Normal(mut right_result) => {
2208                                    self.drain_stderr_into(&mut right_result).await;
2209                                    self.update_last_result(&right_result).await;
2210                                    let mut combined = left_result;
2211                                    accumulate_result(&mut combined, &right_result);
2212                                    Ok(ControlFlow::ok(combined))
2213                                }
2214                                other => Ok(other),
2215                            }
2216                        } else {
2217                            Ok(ControlFlow::ok(left_result))
2218                        }
2219                    }
2220                    _ => Ok(left_flow),
2221                }
2222            }
2223            Stmt::OrChain { left, right } => {
2224                // cmd1 || cmd2 - run cmd2 only if cmd1 fails (non-zero exit code)
2225                // Suppress errexit for the left side — || handles failure itself.
2226                {
2227                    let mut scope = self.scope.write().await;
2228                    scope.suppress_errexit();
2229                }
2230                let left_flow = match self.execute_stmt_flow(left).await {
2231                    Ok(f) => f,
2232                    Err(e) => {
2233                        let mut scope = self.scope.write().await;
2234                        scope.unsuppress_errexit();
2235                        return Err(e);
2236                    }
2237                };
2238                {
2239                    let mut scope = self.scope.write().await;
2240                    scope.unsuppress_errexit();
2241                }
2242                match left_flow {
2243                    ControlFlow::Normal(mut left_result) => {
2244                        self.drain_stderr_into(&mut left_result).await;
2245                        self.update_last_result(&left_result).await;
2246                        if !left_result.ok() {
2247                            let right_flow = self.execute_stmt_flow(right).await?;
2248                            match right_flow {
2249                                ControlFlow::Normal(mut right_result) => {
2250                                    self.drain_stderr_into(&mut right_result).await;
2251                                    self.update_last_result(&right_result).await;
2252                                    let mut combined = left_result;
2253                                    accumulate_result(&mut combined, &right_result);
2254                                    Ok(ControlFlow::ok(combined))
2255                                }
2256                                other => Ok(other),
2257                            }
2258                        } else {
2259                            Ok(ControlFlow::ok(left_result))
2260                        }
2261                    }
2262                    _ => Ok(left_flow), // Propagate non-normal flow
2263                }
2264            }
2265            Stmt::Test(test_expr) => {
2266                let is_true = self.eval_test_async(test_expr).await?;
2267                if is_true {
2268                    Ok(ControlFlow::ok(ExecResult::success("")))
2269                } else {
2270                    Ok(ControlFlow::ok(ExecResult::failure(1, "")))
2271                }
2272            }
2273            Stmt::EnvScoped { assignments, body } => {
2274                // Inline env prefix (`NAME=value ... command`): apply the
2275                // assignments as EXPORTED vars in a fresh frame so the command
2276                // — and its subprocess environment — sees them, then unwind so
2277                // they do NOT persist (bash-style command-scoped env). Values
2278                // evaluate left-to-right with earlier ones already in scope, so
2279                // `A=1 B=$A cmd` works.
2280                {
2281                    let mut scope = self.scope.write().await;
2282                    scope.push_frame();
2283                }
2284                let mut prior_export: Vec<(String, bool)> =
2285                    Vec::with_capacity(assignments.len());
2286                let mut setup_err: Option<anyhow::Error> = None;
2287                for assign in assignments {
2288                    match self.eval_expr_async(&assign.value).await {
2289                        Ok(value) => {
2290                            let mut scope = self.scope.write().await;
2291                            prior_export
2292                                .push((assign.name.clone(), scope.is_exported(&assign.name)));
2293                            scope.set_exported(&assign.name, value);
2294                        }
2295                        Err(e) => {
2296                            setup_err = Some(e);
2297                            break;
2298                        }
2299                    }
2300                }
2301
2302                let flow = if setup_err.is_none() {
2303                    self.execute_stmt_flow(body).await
2304                } else {
2305                    Ok(ControlFlow::ok(ExecResult::success("")))
2306                };
2307
2308                // Unwind the env frame and restore export marks unconditionally
2309                // (names that were not exported before must not stay exported).
2310                {
2311                    let mut scope = self.scope.write().await;
2312                    scope.pop_frame();
2313                    for (name, was_exported) in &prior_export {
2314                        if !*was_exported {
2315                            scope.unexport(name);
2316                        }
2317                    }
2318                }
2319
2320                match setup_err {
2321                    Some(e) => Err(e),
2322                    None => flow,
2323                }
2324            }
2325            Stmt::Empty => Ok(ControlFlow::ok(ExecResult::success(""))),
2326        }
2327        }.instrument(span))
2328    }
2329
2330    /// Execute a pipeline.
2331    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(background = pipeline.background, command_count = pipeline.commands.len()))]
2332    async fn execute_pipeline(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2333        if pipeline.commands.is_empty() {
2334            return Ok(ExecResult::success(""));
2335        }
2336
2337        // Handle background execution (`&` operator)
2338        if pipeline.background {
2339            return self.execute_background(pipeline).await;
2340        }
2341
2342        // All commands go through the runner with the Kernel as dispatcher.
2343        // This is the single execution path — no fast path for single commands.
2344        //
2345        // IMPORTANT: We snapshot exec_ctx into a local context and release the
2346        // lock before running. This prevents deadlocks when dispatch_command
2347        // is called from within the pipeline and recursively triggers another
2348        // pipeline (e.g., via user-defined tools).
2349        let mut ctx = {
2350            let ec = self.exec_ctx.read().await;
2351            let scope = self.scope.read().await;
2352            ExecContext {
2353                backend: ec.backend.clone(),
2354                scope: scope.clone(),
2355                cwd: ec.cwd.clone(),
2356                prev_cwd: ec.prev_cwd.clone(),
2357                stdin: None,
2358                stdin_data: None,
2359                pipe_stdin: None,
2360                pipe_stdout: None,
2361                stderr: ec.stderr.clone(),
2362                tool_schemas: ec.tool_schemas.clone(),
2363                tools: ec.tools.clone(),
2364                job_manager: ec.job_manager.clone(),
2365                pipeline_position: PipelinePosition::Only,
2366                interactive: self.interactive,
2367                aliases: ec.aliases.clone(),
2368                ignore_config: ec.ignore_config.clone(),
2369                output_limit: ec.output_limit.clone(),
2370                allow_external_commands: self.allow_external_commands,
2371                nonce_store: ec.nonce_store.clone(),
2372                trash_backend: ec.trash_backend.clone(),
2373                #[cfg(all(unix, feature = "subprocess"))]
2374                terminal_state: ec.terminal_state.clone(),
2375                dispatcher: self.dispatcher(),
2376                cancel: {
2377                    #[allow(clippy::expect_used)]
2378                    let token = self.cancel_token.lock().expect("cancel_token poisoned");
2379                    token.clone()
2380                },
2381                output_format: None,
2382                vfs_budget: self.vfs_budget.clone(),
2383                watchdog: ec.watchdog.clone(),
2384                #[cfg(all(feature = "localfs", feature = "overlay"))]
2385                overlay_handle: self.overlay_handle.clone(),
2386            }
2387        }; // locks released
2388
2389        let mut result = self.runner.run(&pipeline.commands, &mut ctx, self).await;
2390
2391        // Post-hoc spill check (catches builtins and fast external commands)
2392        if ctx.output_limit.is_enabled() {
2393            let _ = crate::output_limit::spill_if_needed(&mut result, &ctx.output_limit).await;
2394        }
2395
2396        // Signal spill with exit 3; agent reads the spill file directly
2397        // (use `set +o output-limit` before cat/head/tail to bypass the limit)
2398        if result.did_spill {
2399            result.original_code = Some(result.code);
2400            result.code = 3;
2401        }
2402
2403        // Sync changes back from context
2404        {
2405            let mut ec = self.exec_ctx.write().await;
2406            ec.cwd = ctx.cwd.clone();
2407            ec.prev_cwd = ctx.prev_cwd.clone();
2408            ec.aliases = ctx.aliases.clone();
2409            ec.ignore_config = ctx.ignore_config.clone();
2410            ec.output_limit = ctx.output_limit.clone();
2411        }
2412        {
2413            let mut scope = self.scope.write().await;
2414            *scope = ctx.scope.clone();
2415        }
2416
2417        Ok(result)
2418    }
2419
2420    /// Execute a pipeline in the background.
2421    ///
2422    /// The command is spawned as a tokio task, registered with the JobManager,
2423    /// and its output is captured via BoundedStreams. The job is observable via
2424    /// `/v/jobs/{id}/stdout`, `/v/jobs/{id}/stderr`, and `/v/jobs/{id}/status`.
2425    ///
2426    /// Returns immediately with a job ID like "[1]".
2427    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(command_count = pipeline.commands.len()))]
2428    async fn execute_background(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2429        use tokio::sync::oneshot;
2430
2431        // Format the command for display in /v/jobs/{id}/command
2432        let command_str = self.format_pipeline(pipeline);
2433
2434        // Create bounded streams for output capture
2435        let stdout = Arc::new(BoundedStream::default_size());
2436        let stderr = Arc::new(BoundedStream::default_size());
2437
2438        // Create channel for result notification
2439        let (tx, rx) = oneshot::channel();
2440
2441        // Register with JobManager to get job ID and create VFS entries
2442        let job_id = self.jobs.register_with_streams(
2443            command_str.clone(),
2444            rx,
2445            stdout.clone(),
2446            stderr.clone(),
2447        ).await;
2448
2449        // Fork the kernel for this background job. The fork snapshots the
2450        // parent's scope/cwd/aliases/user_tools so mutations stay isolated,
2451        // while sharing the job manager, VFS, and tool registry. The fork's
2452        // full dispatch chain (user tools, .kai scripts, `$(...)` in args)
2453        // is available here — something BackendDispatcher couldn't provide.
2454        //
2455        // The fork gets its own cancellation token (recorded on the job so
2456        // `kill %N` can stop the job — including a pure-builtin job with no OS
2457        // process group) and is stamped with the job id so any external
2458        // command it spawns records its process group for `kill -<sig> %N`.
2459        let cancel = tokio_util::sync::CancellationToken::new();
2460        self.jobs.set_cancel_token(job_id, cancel.clone()).await;
2461        let fork = self.fork_for_background(cancel, job_id).await;
2462        let runner = self.runner.clone();
2463        let commands = pipeline.commands.clone();
2464
2465        // Snapshot the fork's exec_ctx for the spawned task. We have to do
2466        // this before tokio::spawn because the fork's exec_ctx is behind a
2467        // tokio RwLock and we want the spawned task to own its ctx.
2468        let mut bg_ctx = {
2469            let ec = fork.exec_ctx.read().await;
2470            ec.child_for_pipeline()
2471        };
2472        bg_ctx.scope = fork.scope.read().await.clone();
2473        // The fork's dispatcher points at the fork itself; set it here so
2474        // builtins inside the background task (e.g. timeout) re-dispatch
2475        // through the fork, not the parent.
2476        bg_ctx.dispatcher = fork.dispatcher();
2477
2478        // Spawn the background task. Propagate the embedder's trace context
2479        // across the spawn boundary so the job's spans stay in the same trace.
2480        tokio::spawn(crate::telemetry::bind_current_context(async move {
2481            // runner.run needs a &dyn CommandDispatcher; fork.as_ref()
2482            // gives us that (Kernel implements CommandDispatcher).
2483            let result = runner.run(&commands, &mut bg_ctx, fork.as_ref()).await;
2484
2485            // Write output to streams
2486            let text = result.text_out();
2487            if !text.is_empty() {
2488                stdout.write(text.as_bytes()).await;
2489            }
2490            if !result.err.is_empty() {
2491                stderr.write(result.err.as_bytes()).await;
2492            }
2493
2494            // Close streams
2495            stdout.close().await;
2496            stderr.close().await;
2497
2498            // Send result to JobManager (ignore error if receiver dropped)
2499            let _ = tx.send(result);
2500        }));
2501
2502        Ok(ExecResult::success(format!("[{}]", job_id)))
2503    }
2504
2505    /// Format a pipeline as a command string for display.
2506    fn format_pipeline(&self, pipeline: &crate::ast::Pipeline) -> String {
2507        pipeline.commands
2508            .iter()
2509            .map(|cmd| {
2510                let mut parts = vec![cmd.name.clone()];
2511                for arg in &cmd.args {
2512                    match arg {
2513                        Arg::Positional(expr) => {
2514                            parts.push(self.format_expr(expr));
2515                        }
2516                        Arg::Named { key, value } => {
2517                            parts.push(format!("--{}={}", key, self.format_expr(value)));
2518                        }
2519                        Arg::WordAssign { key, value } => {
2520                            parts.push(format!("{}={}", key, self.format_expr(value)));
2521                        }
2522                        Arg::ShortFlag(name) => {
2523                            parts.push(format!("-{}", name));
2524                        }
2525                        Arg::LongFlag(name) => {
2526                            parts.push(format!("--{}", name));
2527                        }
2528                        Arg::DoubleDash => {
2529                            parts.push("--".to_string());
2530                        }
2531                    }
2532                }
2533                parts.join(" ")
2534            })
2535            .collect::<Vec<_>>()
2536            .join(" | ")
2537    }
2538
2539    /// Format an expression as a string for display.
2540    fn format_expr(&self, expr: &Expr) -> String {
2541        match expr {
2542            Expr::Literal(Value::String(s)) => {
2543                if s.contains(' ') || s.contains('"') {
2544                    format!("'{}'", s.replace('\'', "\\'"))
2545                } else {
2546                    s.clone()
2547                }
2548            }
2549            Expr::Literal(Value::Int(i)) => i.to_string(),
2550            Expr::Literal(Value::Float(f)) => f.to_string(),
2551            Expr::Literal(Value::Bool(b)) => b.to_string(),
2552            Expr::Literal(Value::Null) => "null".to_string(),
2553            Expr::VarRef(path) => {
2554                let name = path.segments.iter()
2555                    .map(|seg| match seg {
2556                        crate::ast::VarSegment::Field(f) => f.clone(),
2557                    })
2558                    .collect::<Vec<_>>()
2559                    .join(".");
2560                format!("${{{}}}", name)
2561            }
2562            Expr::Interpolated(_) => "\"...\"".to_string(),
2563            Expr::HereDocBody { .. } => "<<heredoc".to_string(),
2564            _ => "...".to_string(),
2565        }
2566    }
2567
2568    /// Execute a single command.
2569    async fn execute_command(&self, name: &str, args: &[Arg]) -> Result<ExecResult> {
2570        self.execute_command_depth(name, args, 0).await
2571    }
2572
2573    #[tracing::instrument(level = "info", skip(self, args, alias_depth), fields(command = %name), err)]
2574    async fn execute_command_depth(&self, name: &str, args: &[Arg], alias_depth: u8) -> Result<ExecResult> {
2575        // Special built-ins
2576        match name {
2577            "true" => return Ok(ExecResult::success("")),
2578            "false" => return Ok(ExecResult::failure(1, "")),
2579            "source" | "." => return self.execute_source(args).await,
2580            _ => {}
2581        }
2582
2583        // Alias expansion (with recursion limit)
2584        if alias_depth < 10 {
2585            let alias_value = {
2586                let ctx = self.exec_ctx.read().await;
2587                ctx.aliases.get(name).cloned()
2588            };
2589            if let Some(alias_val) = alias_value {
2590                // Split alias value into command + args
2591                let parts: Vec<&str> = alias_val.split_whitespace().collect();
2592                if let Some((alias_cmd, alias_args)) = parts.split_first() {
2593                    let mut new_args: Vec<Arg> = alias_args
2594                        .iter()
2595                        .map(|a| Arg::Positional(Expr::Literal(Value::String(a.to_string()))))
2596                        .collect();
2597                    new_args.extend_from_slice(args);
2598                    return Box::pin(self.execute_command_depth(alias_cmd, &new_args, alias_depth + 1)).await;
2599                }
2600            }
2601        }
2602
2603        // Handle /v/bin/ prefix — dispatch to builtins via virtual path
2604        if let Some(builtin_name) = name.strip_prefix("/v/bin/") {
2605            return match self.tools.get(builtin_name) {
2606                Some(_) => Box::pin(self.execute_command_depth(builtin_name, args, alias_depth)).await,
2607                None => Ok(ExecResult::failure(127, format!("command not found: {}", name))),
2608            };
2609        }
2610
2611        // Check user-defined tools first
2612        {
2613            let user_tools = self.user_tools.read().await;
2614            if let Some(tool_def) = user_tools.get(name) {
2615                let tool_def = tool_def.clone();
2616                drop(user_tools);
2617                return self.execute_user_tool(tool_def, args).await;
2618            }
2619        }
2620
2621        // Look up builtin tool
2622        let tool = match self.tools.get(name) {
2623            Some(t) => t,
2624            None => {
2625                // Try executing as .kai script from PATH
2626                if let Some(result) = self.try_execute_script(name, args).await? {
2627                    return Ok(result);
2628                }
2629                // Try executing as external command from PATH
2630                if let Some(result) = self.try_execute_external(name, args).await? {
2631                    return Ok(result);
2632                }
2633
2634                // Try backend-registered tools (embedder engines, etc.)
2635                // Look up tool schema for positional→named mapping.
2636                // Clone backend and drop read lock before awaiting (may involve network I/O).
2637                // Backend tools expect named JSON params, so enable positional mapping.
2638                let backend = self.exec_ctx.read().await.backend.clone();
2639                let tool_schema = backend.get_tool(name).await.ok().flatten().map(|t| {
2640                    let mut s = t.schema;
2641                    // Flat backend/MCP tools expect named JSON params, so map
2642                    // bare positionals onto named params. Subcommand-aware tools
2643                    // route positionals through the subcommand path and declare
2644                    // map_positionals per leaf (kj keeps it false so it re-parses
2645                    // the argv with its own clap) — don't blanket-override them.
2646                    if s.subcommands.is_empty() {
2647                        s.map_positionals = true;
2648                    }
2649                    s
2650                });
2651                let tool_args = self.build_args_async(args, tool_schema.as_ref()).await?;
2652                let mut ctx = self.exec_ctx.write().await;
2653                {
2654                    let scope = self.scope.read().await;
2655                    ctx.scope = scope.clone();
2656                }
2657                let backend = ctx.backend.clone();
2658                match backend.call_tool(name, tool_args, &mut *ctx).await {
2659                    Ok(tool_result) => {
2660                        let mut scope = self.scope.write().await;
2661                        *scope = ctx.scope.clone();
2662                        let mut exec = ExecResult::from_output(
2663                            tool_result.code as i64, tool_result.stdout, tool_result.stderr,
2664                        );
2665                        exec.set_output(tool_result.output);
2666                        return Ok(exec);
2667                    }
2668                    Err(BackendError::ToolNotFound(_)) => {
2669                        // Fall through to "command not found"
2670                    }
2671                    Err(e) => {
2672                        // Backend dispatch is last-resort lookup — if it fails
2673                        // for any reason, the command simply doesn't exist.
2674                        tracing::debug!("backend error for {name}: {e}");
2675                    }
2676                }
2677
2678                return Ok(ExecResult::failure(127, format!("command not found: {}", name)));
2679            }
2680        };
2681
2682        // Build arguments (async to support command substitution, schema-aware for flag values)
2683        let schema = tool.schema();
2684        let tool_args = self.build_args_async(args, Some(&schema)).await?;
2685
2686        // --help / -h: show help unless the tool's schema claims that flag
2687        let schema_claims = |flag: &str| -> bool {
2688            let bare = flag.trim_start_matches('-');
2689            schema.params.iter().any(|p| p.matches_flag(flag) || p.matches_flag(bare))
2690        };
2691        let wants_help =
2692            (tool_args.flags.contains("help") && !schema_claims("help"))
2693            || (tool_args.flags.contains("h") && !schema_claims("-h"));
2694        if wants_help {
2695            let help_topic = crate::help::HelpTopic::Tool(name.to_string());
2696            let ctx = self.exec_ctx.read().await;
2697            let content = crate::help::get_help(&help_topic, &ctx.tool_schemas);
2698            return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(content)));
2699        }
2700
2701        // Snapshot exec_ctx into a local context and release the write lock
2702        // before calling tool.execute. Holding the write across tool execution
2703        // would deadlock any builtin that re-dispatches through ctx.dispatcher
2704        // (timeout, scatter) — the inner dispatch_command needs its own
2705        // exec_ctx.write() and would block forever.
2706        let mut ctx = {
2707            let ec = self.exec_ctx.write().await;
2708            let scope = self.scope.read().await;
2709            ExecContext {
2710                backend: ec.backend.clone(),
2711                scope: scope.clone(),
2712                cwd: ec.cwd.clone(),
2713                prev_cwd: ec.prev_cwd.clone(),
2714                stdin: ec.stdin.clone(),
2715                stdin_data: ec.stdin_data.clone(),
2716                pipe_stdin: None, // streaming pipes are per-pipeline; not snapshotted
2717                pipe_stdout: None,
2718                stderr: ec.stderr.clone(),
2719                tool_schemas: ec.tool_schemas.clone(),
2720                tools: ec.tools.clone(),
2721                job_manager: ec.job_manager.clone(),
2722                pipeline_position: ec.pipeline_position,
2723                interactive: self.interactive,
2724                aliases: ec.aliases.clone(),
2725                ignore_config: ec.ignore_config.clone(),
2726                output_limit: ec.output_limit.clone(),
2727                allow_external_commands: self.allow_external_commands,
2728                nonce_store: ec.nonce_store.clone(),
2729                trash_backend: ec.trash_backend.clone(),
2730                #[cfg(all(unix, feature = "subprocess"))]
2731                terminal_state: ec.terminal_state.clone(),
2732                dispatcher: self.dispatcher(),
2733                // Use ec.cancel (set by dispatch_command from the runner's
2734                // ctx.cancel) so any builtin-swapped child token (e.g. timeout's
2735                // child token) reaches the spawned external via wait_or_kill.
2736                // Falls back to the kernel's own token when ec.cancel is the
2737                // default fresh token from a non-dispatch path.
2738                cancel: ec.cancel.clone(),
2739                output_format: None,
2740                vfs_budget: self.vfs_budget.clone(),
2741                watchdog: ec.watchdog.clone(),
2742                #[cfg(all(feature = "localfs", feature = "overlay"))]
2743                overlay_handle: self.overlay_handle.clone(),
2744            }
2745        }; // both locks released — tool.execute can re-dispatch safely
2746
2747        // Move stdin out of self.exec_ctx into the snapshot (consumed-by-tool
2748        // semantics): take() so a later dispatch doesn't see stale stdin.
2749        // Done after the snapshot above so we hold the write briefly.
2750        {
2751            let mut ec = self.exec_ctx.write().await;
2752            ctx.stdin = ec.stdin.take();
2753            ctx.stdin_data = ec.stdin_data.take();
2754            ctx.pipe_stdin = ec.pipe_stdin.take();
2755            ctx.pipe_stdout = ec.pipe_stdout.take();
2756        }
2757
2758        // Honor --json before the builtin runs so its setting survives a clap
2759        // parse failure (e.g. `cmd --json --bogus-flag` would otherwise drop
2760        // --json on the floor when `try_parse_from` returns Err early).
2761        // The builtin's own `parsed.global.apply(ctx)` becomes idempotent.
2762        GlobalFlags::apply_from_args(&tool_args, &mut ctx);
2763
2764        let result = tool.execute(tool_args, &mut ctx).await;
2765
2766        // Sync mutations back. Tools may have changed scope (set/cd),
2767        // cwd/prev_cwd (cd), and aliases (alias). Also return any unused pipe
2768        // endpoints to self.exec_ctx so dispatch_command's post-execute sync
2769        // hands them back to the pipeline runner — the runner uses
2770        // stage_ctx.pipe_stdout to write the result to the next stage when
2771        // the tool itself didn't take and write to it.
2772        {
2773            let mut scope = self.scope.write().await;
2774            *scope = ctx.scope.clone();
2775        }
2776        {
2777            let mut ec = self.exec_ctx.write().await;
2778            ec.cwd = ctx.cwd;
2779            ec.prev_cwd = ctx.prev_cwd;
2780            ec.aliases = ctx.aliases;
2781            // A builtin (`set -o output-limit`, `kaish-output-limit set`) can
2782            // mutate the runtime output limit; without this sync the change is
2783            // dropped here and never reaches dispatch_command's read-back, so
2784            // it would not survive past the current statement.
2785            ec.output_limit = ctx.output_limit.clone();
2786            ec.pipe_stdin = ctx.pipe_stdin.take();
2787            ec.pipe_stdout = ctx.pipe_stdout.take();
2788        }
2789
2790        // Builtins parse --json via the GlobalFlags flatten in their clap
2791        // struct and write ctx.output_format. The kernel applies it — unless the
2792        // tool owns its own output (renders --json itself), in which case we
2793        // leave its bytes untouched.
2794        let result = finalize_output(result, ctx.output_format, schema.owns_output);
2795
2796        Ok(result)
2797    }
2798
2799    /// The session `HOME` from the kernel scope, if set. Tilde expansion reads
2800    /// this rather than `std::env::var("HOME")` so the kernel stays hermetic —
2801    /// a hermetic embedder (empty `initial_vars`) gets `None`, and `~` is left
2802    /// unexpanded rather than leaking the host home directory.
2803    async fn scope_home(&self) -> Option<String> {
2804        match self.scope.read().await.get("HOME") {
2805            Some(Value::String(s)) => Some(s.clone()),
2806            _ => None,
2807        }
2808    }
2809
2810    /// Pull `consumes` positional args after a non-bool flag and stash them
2811    /// on `tool_args.named` under the canonical param name.
2812    ///
2813    /// - `consumes == 1` keeps the historical contract: a single scalar value.
2814    /// - `consumes > 1` accumulates each occurrence as an inner
2815    ///   `serde_json::Value::Array` inside `named[canonical] =
2816    ///   Value::Json(Array(...))`, preserving invocation order. This is the
2817    ///   shape jq's `--arg NAME VAL` / `--argjson NAME VAL` land in.
2818    ///
2819    /// Errors loudly if the flag is missing required positionals — matches
2820    /// kaish's "no silent fallback" posture and mirrors real jq, which
2821    /// errors on `--arg NAME` with no value.
2822    #[allow(clippy::too_many_arguments)]
2823    async fn consume_flag_positionals(
2824        &self,
2825        args: &[Arg],
2826        flag_name: &str,
2827        canonical: &str,
2828        consumes: usize,
2829        positional_indices: &[usize],
2830        consumed: &mut std::collections::HashSet<usize>,
2831        current_idx: usize,
2832        tool_args: &mut ToolArgs,
2833    ) -> Result<()> {
2834        let home = self.scope_home().await;
2835        let mut collected: Vec<Value> = Vec::with_capacity(consumes.max(1));
2836        for _ in 0..consumes.max(1) {
2837            let next_pos = positional_indices
2838                .iter()
2839                .find(|idx| **idx > current_idx && !consumed.contains(idx))
2840                .copied();
2841            match next_pos {
2842                Some(pos_idx) => {
2843                    if let Arg::Positional(expr) = &args[pos_idx] {
2844                        let value = self.eval_expr_async(expr).await?;
2845                        let value = apply_tilde_expansion(value, home.as_deref());
2846                        collected.push(value);
2847                        consumed.insert(pos_idx);
2848                    }
2849                }
2850                None => {
2851                    if consumes <= 1 && collected.is_empty() {
2852                        // Back-compat: a flag with no follow-up positional
2853                        // becomes a bare flag. `--path` with nothing after
2854                        // lands in `flags`, same as before this refactor.
2855                        tool_args.flags.insert(flag_name.to_string());
2856                        return Ok(());
2857                    }
2858                    anyhow::bail!(
2859                        "--{flag_name} requires {consumes} argument{}, got {}",
2860                        if consumes == 1 { "" } else { "s" },
2861                        collected.len()
2862                    );
2863                }
2864            }
2865        }
2866
2867        if consumes <= 1 {
2868            if let Some(v) = collected.pop() {
2869                tool_args.named.insert(canonical.to_string(), v);
2870            }
2871            return Ok(());
2872        }
2873
2874        // Multi-consume: accumulate under named[canonical] as array-of-arrays.
2875        let occ: Vec<serde_json::Value> = collected
2876            .into_iter()
2877            .map(|v| crate::interpreter::value_to_json(&v))
2878            .collect();
2879        let entry = tool_args
2880            .named
2881            .entry(canonical.to_string())
2882            .or_insert_with(|| Value::Json(serde_json::Value::Array(Vec::new())));
2883        if let Value::Json(serde_json::Value::Array(outer)) = entry {
2884            outer.push(serde_json::Value::Array(occ));
2885        } else {
2886            anyhow::bail!(
2887                "--{flag_name}: named[{canonical}] already holds a non-array value"
2888            );
2889        }
2890        Ok(())
2891    }
2892
2893    /// Build tool arguments from AST args.
2894    ///
2895    /// Uses async evaluation to support command substitution in arguments.
2896    async fn build_args_async(&self, args: &[Arg], schema: Option<&crate::tools::ToolSchema>) -> Result<ToolArgs> {
2897        let mut tool_args = ToolArgs::new();
2898        let home = self.scope_home().await;
2899        // Subcommand-aware tools (e.g. `kj context list`) expose a tree of
2900        // schemas; pick the leaf the leading positionals route to and bind
2901        // flags against *its* params. Flat tools return the root. select_leaf
2902        // errors (fail loud) if a computed positional sits where a subcommand
2903        // selector is required.
2904        let leaf = match schema {
2905            Some(s) => Some(select_leaf(s, args)?),
2906            None => None,
2907        };
2908        // Bind against the leaf's params, but MERGE the root schema's params on
2909        // top as "global" flags: a value-flag declared at the tool's top level
2910        // (e.g. kj's `--confirm <nonce>`) must bind at every leaf, including when
2911        // it trails the subcommand path (`kj context retag a b --confirm <n>`).
2912        // The leaf wins on name conflicts. For a flat tool, leaf == root, so the
2913        // merge is a harmless no-op.
2914        let mut param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
2915        if let Some(l) = leaf {
2916            param_lookup.extend(schema_param_lookup(l));
2917        }
2918        // accepts_word_assign keys off the root tool name (the WORD_ASSIGN list),
2919        // not the leaf — it's a property of the command, not the subcommand.
2920        let accepts_word_assign = schema
2921            .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
2922            .unwrap_or(false);
2923
2924        // Track which positional indices have been consumed as flag values
2925        let mut consumed: std::collections::HashSet<usize> = std::collections::HashSet::new();
2926        let mut past_double_dash = false;
2927
2928        // Find positional arg indices for flag value consumption
2929        let positional_indices: Vec<usize> = args.iter().enumerate()
2930            .filter_map(|(i, a)| matches!(a, Arg::Positional(_)).then_some(i))
2931            .collect();
2932
2933        let mut i = 0;
2934        while i < args.len() {
2935            match &args[i] {
2936                Arg::DoubleDash => {
2937                    past_double_dash = true;
2938                }
2939                Arg::Positional(expr) => {
2940                    if !consumed.contains(&i) {
2941                        // Glob expansion: bare glob patterns expand to matching files
2942                        if let Expr::GlobPattern(pattern) = expr {
2943                            let glob_enabled = {
2944                                let scope = self.scope.read().await;
2945                                scope.glob_enabled()
2946                            };
2947                            if glob_enabled {
2948                                let (paths, cwd) = {
2949                                    let ctx = self.exec_ctx.read().await;
2950                                    let paths = ctx.expand_glob(pattern).await
2951                                        .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2952                                    let cwd = ctx.resolve_path(".");
2953                                    (paths, cwd)
2954                                };
2955                                if paths.is_empty() {
2956                                    return Err(anyhow::anyhow!("no matches: {}", pattern));
2957                                }
2958                                for path in paths {
2959                                    let display = if !pattern.starts_with('/') {
2960                                        path.strip_prefix(&cwd)
2961                                            .unwrap_or(&path)
2962                                            .to_string_lossy().into_owned()
2963                                    } else {
2964                                        path.to_string_lossy().into_owned()
2965                                    };
2966                                    tool_args.positional.push(Value::String(display));
2967                                }
2968                                i += 1;
2969                                continue;
2970                            }
2971                        }
2972                        let value = self.eval_expr_async(expr).await?;
2973                        let value = apply_tilde_expansion(value, home.as_deref());
2974                        tool_args.positional.push(value);
2975                    }
2976                }
2977                Arg::Named { key, value } => {
2978                    let val = self.eval_expr_async(value).await?;
2979                    let val = apply_tilde_expansion(val, home.as_deref());
2980                    tool_args.named.insert(key.clone(), val);
2981                }
2982                Arg::WordAssign { key, value } => {
2983                    let val = self.eval_expr_async(value).await?;
2984                    let val = apply_tilde_expansion(val, home.as_deref());
2985                    if accepts_word_assign {
2986                        tool_args.named.insert(key.clone(), val);
2987                    } else {
2988                        // Stringify "key=value" and pass as a positional.
2989                        // Matches bash: `cat foo=bar` opens a file named `foo=bar`.
2990                        let val_str = crate::interpreter::value_to_string(&val);
2991                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
2992                    }
2993                }
2994                Arg::ShortFlag(name) => {
2995                    if past_double_dash {
2996                        tool_args.positional.push(Value::String(format!("-{name}")));
2997                    } else if name.len() == 1 {
2998                        let flag_name = name.as_str();
2999                        let lookup = param_lookup.get(flag_name);
3000                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3001
3002                        if is_bool {
3003                            tool_args.flags.insert(flag_name.to_string());
3004                        } else {
3005                            // Non-bool: consume `consumes` positionals as value(s)
3006                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
3007                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3008                            self.consume_flag_positionals(
3009                                args,
3010                                name,
3011                                canonical,
3012                                consumes,
3013                                &positional_indices,
3014                                &mut consumed,
3015                                i,
3016                                &mut tool_args,
3017                            )
3018                            .await?;
3019                        }
3020                    } else if let Some(&(canonical, typ, consumes)) = param_lookup.get(name.as_str()) {
3021                        // Multi-char short flag matches a schema param (POSIX style: -name value)
3022                        if is_bool_type(typ) {
3023                            tool_args.flags.insert(canonical.to_string());
3024                        } else {
3025                            self.consume_flag_positionals(
3026                                args,
3027                                name,
3028                                canonical,
3029                                consumes,
3030                                &positional_indices,
3031                                &mut consumed,
3032                                i,
3033                                &mut tool_args,
3034                            )
3035                            .await?;
3036                        }
3037                    } else {
3038                        // Multi-char combined flags like -la: always boolean
3039                        for c in name.chars() {
3040                            tool_args.flags.insert(c.to_string());
3041                        }
3042                    }
3043                }
3044                Arg::LongFlag(name) => {
3045                    if past_double_dash {
3046                        tool_args.positional.push(Value::String(format!("--{name}")));
3047                    } else {
3048                        let lookup = param_lookup.get(name.as_str());
3049                        // An *undeclared* long flag under a `map_positionals`
3050                        // (backend/MCP) schema that is immediately followed by an
3051                        // unconsumed positional is ambiguous: kaish can't tell the
3052                        // space-form value (`--type explorer`) from a bool flag
3053                        // before a real positional (`--force file.txt`). Defaulting
3054                        // to bool here silently divorces the value and misroutes it
3055                        // — a privilege-escalation-by-typo against deny-by-default
3056                        // embedders (docs/issues.md). Fail loud instead of guessing.
3057                        let ambiguous_value = (lookup.is_none()
3058                            && leaf.is_some_and(|s| s.map_positionals)
3059                            && !consumed.contains(&(i + 1)))
3060                            .then(|| match args.get(i + 1) {
3061                                // Echo a concrete value for a copy-pasteable fix
3062                                // when it's a plain literal; fall back to VALUE.
3063                                Some(Arg::Positional(Expr::Literal(Value::String(s)))) => {
3064                                    Some(s.clone())
3065                                }
3066                                Some(Arg::Positional(_)) => Some("VALUE".to_string()),
3067                                _ => None,
3068                            })
3069                            .flatten();
3070                        if let Some(val) = ambiguous_value {
3071                            let tool = leaf.map(|s| s.name.as_str()).unwrap_or("command");
3072                            anyhow::bail!(
3073                                "{tool}: --{name} is not a declared flag, so the \
3074                                 space-separated value would be silently dropped. \
3075                                 Use --{name}={val}, or have {tool} declare --{name} \
3076                                 in its schema."
3077                            );
3078                        }
3079                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
3080
3081                        if is_bool {
3082                            tool_args.flags.insert(name.clone());
3083                        } else {
3084                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
3085                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
3086                            self.consume_flag_positionals(
3087                                args,
3088                                name,
3089                                canonical,
3090                                consumes,
3091                                &positional_indices,
3092                                &mut consumed,
3093                                i,
3094                                &mut tool_args,
3095                            )
3096                            .await?;
3097                        }
3098                    }
3099                }
3100            }
3101            i += 1;
3102        }
3103
3104        // Map remaining positionals to unfilled non-bool schema params (in order).
3105        // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
3106        // Positionals that appeared after `--` are never mapped (they're raw data).
3107        // Only for backend/external tools (map_positionals=true). Builtins handle their own positionals.
3108        // Keyed off the routed leaf so a subcommand tool maps against the active
3109        // leaf's params (kj leaves keep map_positionals=false → block skipped).
3110        if let Some(schema) = leaf.filter(|s| s.map_positionals) {
3111            let pre_dash_count = if past_double_dash {
3112                let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
3113                positional_indices.iter()
3114                    .filter(|idx| **idx < dash_pos && !consumed.contains(idx))
3115                    .count()
3116            } else {
3117                tool_args.positional.len()
3118            };
3119
3120            let mut remaining = Vec::new();
3121            let mut positional_iter = tool_args.positional.drain(..).enumerate();
3122
3123            for param in &schema.params {
3124                if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
3125                    continue;
3126                }
3127                if is_bool_type(&param.param_type) {
3128                    continue;
3129                }
3130                loop {
3131                    match positional_iter.next() {
3132                        Some((idx, val)) if idx < pre_dash_count => {
3133                            tool_args.named.insert(param.name.clone(), val);
3134                            break;
3135                        }
3136                        Some((_, val)) => {
3137                            remaining.push(val);
3138                        }
3139                        None => break,
3140                    }
3141                }
3142            }
3143
3144            remaining.extend(positional_iter.map(|(_, v)| v));
3145            tool_args.positional = remaining;
3146        }
3147
3148        Ok(tool_args)
3149    }
3150
3151    /// Build arguments as flat string list for external commands.
3152    ///
3153    /// Unlike `build_args_async` which separates flags into a HashSet (for schema-aware builtins),
3154    /// this preserves the original flag format as strings for external commands:
3155    /// - `-l` stays as `-l`
3156    /// - `--verbose` stays as `--verbose`
3157    /// - `key=value` stays as `key=value`
3158    ///
3159    /// This is what external commands expect in their argv.
3160    #[cfg(feature = "subprocess")]
3161    async fn build_args_flat(&self, args: &[Arg]) -> Result<Vec<String>> {
3162        let mut argv = Vec::new();
3163        let home = self.scope_home().await;
3164        for arg in args {
3165            match arg {
3166                Arg::Positional(expr) => {
3167                    // Glob expansion for external commands
3168                    if let Expr::GlobPattern(pattern) = expr {
3169                        let glob_enabled = {
3170                            let scope = self.scope.read().await;
3171                            scope.glob_enabled()
3172                        };
3173                        if glob_enabled {
3174                            let (paths, cwd) = {
3175                                let ctx = self.exec_ctx.read().await;
3176                                let paths = ctx.expand_glob(pattern).await
3177                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
3178                                let cwd = ctx.resolve_path(".");
3179                                (paths, cwd)
3180                            };
3181                            if paths.is_empty() {
3182                                return Err(anyhow::anyhow!("no matches: {}", pattern));
3183                            }
3184                            for path in paths {
3185                                let display = if !pattern.starts_with('/') {
3186                                    path.strip_prefix(&cwd)
3187                                        .unwrap_or(&path)
3188                                        .to_string_lossy().into_owned()
3189                                } else {
3190                                    path.to_string_lossy().into_owned()
3191                                };
3192                                argv.push(display);
3193                            }
3194                            continue;
3195                        }
3196                    }
3197                    let value = self.eval_expr_async(expr).await?;
3198                    let value = apply_tilde_expansion(value, home.as_deref());
3199                    argv.push(value_to_string(&value));
3200                }
3201                Arg::Named { key, value } => {
3202                    let val = self.eval_expr_async(value).await?;
3203                    let val = apply_tilde_expansion(val, home.as_deref());
3204                    argv.push(format!("--{}={}", key, value_to_string(&val)));
3205                }
3206                Arg::WordAssign { key, value } => {
3207                    let val = self.eval_expr_async(value).await?;
3208                    let val = apply_tilde_expansion(val, home.as_deref());
3209                    argv.push(format!("{}={}", key, value_to_string(&val)));
3210                }
3211                Arg::ShortFlag(name) => {
3212                    // Preserve original format: -l, -la (combined flags)
3213                    argv.push(format!("-{}", name));
3214                }
3215                Arg::LongFlag(name) => {
3216                    // Preserve original format: --verbose
3217                    argv.push(format!("--{}", name));
3218                }
3219                Arg::DoubleDash => {
3220                    // Preserve the -- marker
3221                    argv.push("--".to_string());
3222                }
3223            }
3224        }
3225        Ok(argv)
3226    }
3227
3228    /// Async expression evaluator that supports command substitution.
3229    ///
3230    /// This is used for contexts where expressions may contain `$(...)` command
3231    /// substitution. Unlike the sync `eval_expr`, this can execute pipelines.
3232    fn eval_expr_async<'a>(&'a self, expr: &'a Expr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + 'a>> {
3233        Box::pin(async move {
3234        match expr {
3235            Expr::Literal(value) => Ok(value.clone()),
3236            Expr::VarRef(path) => {
3237                let scope = self.scope.read().await;
3238                scope.resolve_path(path)
3239                    .ok_or_else(|| anyhow::anyhow!("undefined variable"))
3240            }
3241            Expr::Interpolated(parts) => {
3242                let mut result = String::new();
3243                for part in parts {
3244                    result.push_str(&self.eval_string_part_async(part).await?);
3245                }
3246                Ok(Value::String(result))
3247            }
3248            Expr::HereDocBody { parts, strip_tabs } => {
3249                let mut result = String::new();
3250                for sp in parts {
3251                    result.push_str(&self.eval_string_part_async(&sp.part).await?);
3252                }
3253                if *strip_tabs {
3254                    Ok(Value::String(crate::interpreter::strip_leading_tabs(&result)))
3255                } else {
3256                    Ok(Value::String(result))
3257                }
3258            }
3259            Expr::BinaryOp { left, op, right } => match op {
3260                BinaryOp::And => {
3261                    let left_val = self.eval_expr_async(left).await?;
3262                    if !is_truthy(&left_val) {
3263                        return Ok(left_val);
3264                    }
3265                    self.eval_expr_async(right).await
3266                }
3267                BinaryOp::Or => {
3268                    let left_val = self.eval_expr_async(left).await?;
3269                    if is_truthy(&left_val) {
3270                        return Ok(left_val);
3271                    }
3272                    self.eval_expr_async(right).await
3273                }
3274            },
3275            Expr::CommandSubst(stmts) => {
3276                // Snapshot scope+cwd before running — only output escapes,
3277                // not side effects like `cd` or variable assignments.
3278                let saved_scope = { self.scope.read().await.clone() };
3279                let saved_cwd = {
3280                    let ec = self.exec_ctx.read().await;
3281                    (ec.cwd.clone(), ec.prev_cwd.clone())
3282                };
3283
3284                // Capture result without `?` — restore state unconditionally
3285                let run_result = self.execute_block_capturing(stmts).await;
3286
3287                // Restore scope and cwd regardless of success/failure
3288                {
3289                    let mut scope = self.scope.write().await;
3290                    *scope = saved_scope;
3291                    if let Ok(ref r) = run_result {
3292                        scope.set_last_result(r.clone());
3293                    }
3294                }
3295                {
3296                    let mut ec = self.exec_ctx.write().await;
3297                    ec.cwd = saved_cwd.0;
3298                    ec.prev_cwd = saved_cwd.1;
3299                }
3300
3301                // Now propagate the error
3302                let result = run_result?;
3303
3304                // Prefer structured data (enables `for i in $(cmd)` iteration)
3305                if let Some(data) = &result.data {
3306                    Ok(data.clone())
3307                } else if let Some(output) = result.output() {
3308                    // Flat non-text node lists (glob, ls, tree) → iterable array
3309                    if output.is_flat() && !output.is_simple_text() && !output.root.is_empty() {
3310                        let items: Vec<serde_json::Value> = output.root.iter()
3311                            .map(|n| serde_json::Value::String(n.display_name().to_string()))
3312                            .collect();
3313                        Ok(Value::Json(serde_json::Value::Array(items)))
3314                    } else {
3315                        Ok(Value::String(result.text_out().trim_end().to_string()))
3316                    }
3317                } else {
3318                    // Otherwise return stdout as single string (NO implicit splitting)
3319                    Ok(Value::String(result.text_out().trim_end().to_string()))
3320                }
3321            }
3322            Expr::Test(test_expr) => {
3323                Ok(Value::Bool(self.eval_test_async(test_expr).await?))
3324            }
3325            Expr::Positional(n) => {
3326                let scope = self.scope.read().await;
3327                match scope.get_positional(*n) {
3328                    Some(s) => Ok(Value::String(s.to_string())),
3329                    None => Ok(Value::String(String::new())),
3330                }
3331            }
3332            Expr::AllArgs => {
3333                let scope = self.scope.read().await;
3334                Ok(Value::String(scope.all_args().join(" ")))
3335            }
3336            Expr::ArgCount => {
3337                let scope = self.scope.read().await;
3338                Ok(Value::Int(scope.arg_count() as i64))
3339            }
3340            Expr::VarLength(name) => {
3341                let scope = self.scope.read().await;
3342                match scope.get(name) {
3343                    Some(value) => Ok(Value::Int(value_to_string(value).len() as i64)),
3344                    None => Ok(Value::Int(0)),
3345                }
3346            }
3347            Expr::VarWithDefault { name, default } => {
3348                let scope = self.scope.read().await;
3349                let use_default = match scope.get(name) {
3350                    Some(value) => value_to_string(value).is_empty(),
3351                    None => true,
3352                };
3353                drop(scope); // Release the lock before recursive evaluation
3354                if use_default {
3355                    // Evaluate the default parts (supports nested expansions)
3356                    self.eval_string_parts_async(default).await.map(Value::String)
3357                } else {
3358                    let scope = self.scope.read().await;
3359                    scope.get(name).cloned().ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))
3360                }
3361            }
3362            Expr::Arithmetic(expr_str) => {
3363                let scope = self.scope.read().await;
3364                crate::arithmetic::eval_arithmetic(expr_str, &scope)
3365                    .map(Value::Int)
3366                    .map_err(|e| anyhow::anyhow!("arithmetic error: {}", e))
3367            }
3368            Expr::Command(cmd) => {
3369                // Execute command and return boolean based on exit code
3370                let result = self.execute_command(&cmd.name, &cmd.args).await?;
3371                Ok(Value::Bool(result.code == 0))
3372            }
3373            Expr::LastExitCode => {
3374                let scope = self.scope.read().await;
3375                Ok(Value::Int(scope.last_result().code))
3376            }
3377            Expr::CurrentPid => {
3378                let scope = self.scope.read().await;
3379                Ok(Value::Int(scope.pid() as i64))
3380            }
3381            Expr::GlobPattern(s) => Ok(Value::String(s.clone())),
3382        }
3383        })
3384    }
3385
3386    /// Async helper to evaluate multiple StringParts into a single string.
3387    fn eval_string_parts_async<'a>(&'a self, parts: &'a [StringPart]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3388        Box::pin(async move {
3389            let mut result = String::new();
3390            for part in parts {
3391                result.push_str(&self.eval_string_part_async(part).await?);
3392            }
3393            Ok(result)
3394        })
3395    }
3396
3397    /// Async helper to evaluate a StringPart.
3398    /// Evaluate a `[[ ]]` test expression asynchronously, routing file tests
3399    /// through the VFS backend instead of using raw `std::path`.
3400    fn eval_test_async<'a>(&'a self, test_expr: &'a TestExpr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
3401        Box::pin(async move {
3402            match test_expr {
3403                TestExpr::FileTest { op, path } => {
3404                    let path_value = self.eval_expr_async(path).await?;
3405                    let path_str = value_to_string(&path_value);
3406                    let backend = self.exec_ctx.read().await.backend.clone();
3407                    let entry = backend.stat(std::path::Path::new(&path_str)).await.ok();
3408                    Ok(match op {
3409                        FileTestOp::Exists => entry.is_some(),
3410                        FileTestOp::IsFile => entry.as_ref().is_some_and(|e| e.is_file()),
3411                        FileTestOp::IsDir => entry.as_ref().is_some_and(|e| e.is_dir()),
3412                        FileTestOp::Readable => entry.is_some(),
3413                        FileTestOp::Writable => entry.as_ref().is_some_and(|e| {
3414                            e.permissions.is_none_or(|p| p & 0o222 != 0)
3415                        }),
3416                        FileTestOp::Executable => entry.as_ref().is_some_and(|e| {
3417                            e.permissions.is_some_and(|p| p & 0o111 != 0)
3418                        }),
3419                    })
3420                }
3421                TestExpr::StringTest { op, value } => {
3422                    let val = self.eval_expr_async(value).await?;
3423                    let s = value_to_string(&val);
3424                    Ok(match op {
3425                        crate::ast::StringTestOp::IsEmpty => s.is_empty(),
3426                        crate::ast::StringTestOp::IsNonEmpty => !s.is_empty(),
3427                    })
3428                }
3429                TestExpr::Comparison { left, op, right } => {
3430                    // Evaluate operands async (handles $(cmd)), then compare sync
3431                    let left_val = self.eval_expr_async(left).await?;
3432                    let right_val = self.eval_expr_async(right).await?;
3433                    let resolved = TestExpr::Comparison {
3434                        left: Box::new(Expr::Literal(left_val)),
3435                        op: *op,
3436                        right: Box::new(Expr::Literal(right_val)),
3437                    };
3438                    let expr = Expr::Test(Box::new(resolved));
3439                    let mut scope = self.scope.write().await;
3440                    let value = eval_expr(&expr, &mut scope)
3441                        .map_err(|e| anyhow::anyhow!("{}", e))?;
3442                    Ok(value_to_bool(&value))
3443                }
3444                TestExpr::And { left, right } => {
3445                    if !self.eval_test_async(left).await? {
3446                        Ok(false)
3447                    } else {
3448                        self.eval_test_async(right).await
3449                    }
3450                }
3451                TestExpr::Or { left, right } => {
3452                    if self.eval_test_async(left).await? {
3453                        Ok(true)
3454                    } else {
3455                        self.eval_test_async(right).await
3456                    }
3457                }
3458                TestExpr::Not { expr } => {
3459                    Ok(!self.eval_test_async(expr).await?)
3460                }
3461            }
3462        })
3463    }
3464
3465    fn eval_string_part_async<'a>(&'a self, part: &'a StringPart) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3466        Box::pin(async move {
3467            match part {
3468                StringPart::Literal(s) => Ok(s.clone()),
3469                StringPart::Var(path) => {
3470                    let scope = self.scope.read().await;
3471                    match scope.resolve_path(path) {
3472                        Some(value) => Ok(value_to_string(&value)),
3473                        None => Ok(String::new()), // Unset vars expand to empty
3474                    }
3475                }
3476                StringPart::VarWithDefault { name, default } => {
3477                    let scope = self.scope.read().await;
3478                    let use_default = match scope.get(name) {
3479                        Some(value) => value_to_string(value).is_empty(),
3480                        None => true,
3481                    };
3482                    drop(scope); // Release lock before recursive evaluation
3483                    if use_default {
3484                        // Evaluate the default parts (supports nested expansions)
3485                        self.eval_string_parts_async(default).await
3486                    } else {
3487                        let scope = self.scope.read().await;
3488                        Ok(value_to_string(scope.get(name).ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))?))
3489                    }
3490                }
3491            StringPart::VarLength(name) => {
3492                let scope = self.scope.read().await;
3493                match scope.get(name) {
3494                    Some(value) => Ok(value_to_string(value).len().to_string()),
3495                    None => Ok("0".to_string()),
3496                }
3497            }
3498            StringPart::Positional(n) => {
3499                let scope = self.scope.read().await;
3500                match scope.get_positional(*n) {
3501                    Some(s) => Ok(s.to_string()),
3502                    None => Ok(String::new()),
3503                }
3504            }
3505            StringPart::AllArgs => {
3506                let scope = self.scope.read().await;
3507                Ok(scope.all_args().join(" "))
3508            }
3509            StringPart::ArgCount => {
3510                let scope = self.scope.read().await;
3511                Ok(scope.arg_count().to_string())
3512            }
3513            StringPart::Arithmetic(expr) => {
3514                let scope = self.scope.read().await;
3515                match crate::arithmetic::eval_arithmetic(expr, &scope) {
3516                    Ok(value) => Ok(value.to_string()),
3517                    Err(_) => Ok(String::new()),
3518                }
3519            }
3520            StringPart::CommandSubst(stmts) => {
3521                // Snapshot scope+cwd — command substitution in strings must
3522                // not leak side effects (e.g., `"dir: $(cd /; pwd)"` must not change cwd).
3523                let saved_scope = { self.scope.read().await.clone() };
3524                let saved_cwd = {
3525                    let ec = self.exec_ctx.read().await;
3526                    (ec.cwd.clone(), ec.prev_cwd.clone())
3527                };
3528
3529                // Capture result without `?` — restore state unconditionally
3530                let run_result = self.execute_block_capturing(stmts).await;
3531
3532                // Restore scope and cwd regardless of success/failure
3533                {
3534                    let mut scope = self.scope.write().await;
3535                    *scope = saved_scope;
3536                    if let Ok(ref r) = run_result {
3537                        scope.set_last_result(r.clone());
3538                    }
3539                }
3540                {
3541                    let mut ec = self.exec_ctx.write().await;
3542                    ec.cwd = saved_cwd.0;
3543                    ec.prev_cwd = saved_cwd.1;
3544                }
3545
3546                // Now propagate the error
3547                let result = run_result?;
3548
3549                Ok(result.text_out().trim_end_matches('\n').to_string())
3550            }
3551            StringPart::LastExitCode => {
3552                let scope = self.scope.read().await;
3553                Ok(scope.last_result().code.to_string())
3554            }
3555            StringPart::CurrentPid => {
3556                let scope = self.scope.read().await;
3557                Ok(scope.pid().to_string())
3558            }
3559        }
3560        })
3561    }
3562
3563    /// Update the last result in scope.
3564    async fn update_last_result(&self, result: &ExecResult) {
3565        let mut scope = self.scope.write().await;
3566        scope.set_last_result(result.clone());
3567    }
3568
3569    /// Drain accumulated pipeline stderr into a result.
3570    ///
3571    /// Called after each sub-statement inside control structures (`if`, `for`,
3572    /// `while`, `case`, `&&`, `||`) so that stderr appears incrementally rather
3573    /// than batching until the entire structure finishes.
3574    async fn drain_stderr_into(&self, result: &mut ExecResult) {
3575        let drained = {
3576            let mut receiver = self.stderr_receiver.lock().await;
3577            receiver.drain_lossy()
3578        };
3579        if !drained.is_empty() {
3580            if !result.err.is_empty() && !result.err.ends_with('\n') {
3581                result.err.push('\n');
3582            }
3583            result.err.push_str(&drained);
3584        }
3585    }
3586
3587    /// Execute a user-defined function with local variable scoping.
3588    ///
3589    /// Functions push a new scope frame for local variables. Variables declared
3590    /// with `local` are scoped to the function; other assignments modify outer
3591    /// scopes (or create in root if new).
3592    async fn execute_user_tool(&self, def: ToolDef, args: &[Arg]) -> Result<ExecResult> {
3593        // 1. Build function args from AST args (async to support command substitution)
3594        let tool_args = self.build_args_async(args, None).await?;
3595
3596        // 2. Push a new scope frame for local variables
3597        {
3598            let mut scope = self.scope.write().await;
3599            scope.push_frame();
3600        }
3601
3602        // 3. Save current positional parameters and set new ones for this function
3603        let saved_positional = {
3604            let mut scope = self.scope.write().await;
3605            let saved = scope.save_positional();
3606
3607            // Set up new positional parameters ($0 = function name, $1, $2, ... = args)
3608            let positional_args: Vec<String> = tool_args.positional
3609                .iter()
3610                .map(value_to_string)
3611                .collect();
3612            scope.set_positional(&def.name, positional_args);
3613
3614            saved
3615        };
3616
3617        // 3. Execute body statements with control flow handling
3618        // Accumulate output across statements (like sh)
3619        let mut accumulated_out = String::new();
3620        let mut accumulated_err = String::new();
3621        let mut last_code = 0i64;
3622        let mut last_data: Option<Value> = None;
3623
3624        // Track execution error for propagation after cleanup
3625        let mut exec_error: Option<anyhow::Error> = None;
3626        let mut exit_code: Option<i64> = None;
3627
3628        for stmt in &def.body {
3629            match self.execute_stmt_flow(stmt).await {
3630                Ok(flow) => {
3631                    // Drain pipeline stderr after each sub-statement.
3632                    let drained = {
3633                        let mut receiver = self.stderr_receiver.lock().await;
3634                        receiver.drain_lossy()
3635                    };
3636                    if !drained.is_empty() {
3637                        accumulated_err.push_str(&drained);
3638                    }
3639
3640                    match flow {
3641                        ControlFlow::Normal(r) => {
3642                            accumulated_out.push_str(&r.text_out());
3643                            accumulated_err.push_str(&r.err);
3644                            last_code = r.code;
3645                            last_data = r.data;
3646                        }
3647                        ControlFlow::Return { value } => {
3648                            accumulated_out.push_str(&value.text_out());
3649                            accumulated_err.push_str(&value.err);
3650                            last_code = value.code;
3651                            last_data = value.data;
3652                            break;
3653                        }
3654                        ControlFlow::Exit { code } => {
3655                            exit_code = Some(code);
3656                            break;
3657                        }
3658                        ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3659                            accumulated_out.push_str(&r.text_out());
3660                            accumulated_err.push_str(&r.err);
3661                            last_code = r.code;
3662                            last_data = r.data;
3663                        }
3664                    }
3665                }
3666                Err(e) => {
3667                    exec_error = Some(e);
3668                    break;
3669                }
3670            }
3671        }
3672
3673        // 4. Pop scope frame and restore original positional parameters (unconditionally)
3674        {
3675            let mut scope = self.scope.write().await;
3676            scope.pop_frame();
3677            scope.set_positional(saved_positional.0, saved_positional.1);
3678        }
3679
3680        // 5. Propagate error or exit after cleanup
3681        if let Some(e) = exec_error {
3682            return Err(e);
3683        }
3684        if let Some(code) = exit_code {
3685            return Ok(ExecResult::from_parts(code, accumulated_out, accumulated_err, last_data));
3686        }
3687
3688        Ok(ExecResult::from_parts(last_code, accumulated_out, accumulated_err, last_data))
3689    }
3690
3691    /// Execute a command-substitution body — a block of statements — and return
3692    /// the combined result. Stdout/stderr accumulate across statements with **no
3693    /// inserted separator** (matching bash and the `;`/`&&`/`||` output model),
3694    /// and the last statement's exit code and structured `.data` ride through,
3695    /// so `for x in $(seq 3)` still iterates the array and `$(printf a; printf b)`
3696    /// captures `ab`. Scope/cwd snapshotting (so `$(cd / && pwd)` cannot leak the
3697    /// cwd) is the caller's responsibility.
3698    async fn execute_block_capturing(&self, stmts: &[Stmt]) -> Result<ExecResult> {
3699        let mut accumulated_out = String::new();
3700        let mut accumulated_err = String::new();
3701        let mut last_code = 0i64;
3702        let mut last_data: Option<Value> = None;
3703
3704        for stmt in stmts {
3705            let flow = self.execute_stmt_flow(stmt).await?;
3706
3707            // Drain pipeline stderr after each sub-statement (incremental, like
3708            // the control-structure and function-body executors).
3709            let drained = {
3710                let mut receiver = self.stderr_receiver.lock().await;
3711                receiver.drain_lossy()
3712            };
3713            if !drained.is_empty() {
3714                accumulated_err.push_str(&drained);
3715            }
3716
3717            match flow {
3718                ControlFlow::Normal(r)
3719                | ControlFlow::Break { result: r, .. }
3720                | ControlFlow::Continue { result: r, .. } => {
3721                    accumulated_out.push_str(&r.text_out());
3722                    accumulated_err.push_str(&r.err);
3723                    last_code = r.code;
3724                    last_data = r.data;
3725                }
3726                ControlFlow::Return { value } => {
3727                    accumulated_out.push_str(&value.text_out());
3728                    accumulated_err.push_str(&value.err);
3729                    last_code = value.code;
3730                    last_data = value.data;
3731                    break;
3732                }
3733                ControlFlow::Exit { code } => {
3734                    last_code = code;
3735                    break;
3736                }
3737            }
3738        }
3739
3740        Ok(ExecResult::from_parts(
3741            last_code,
3742            accumulated_out,
3743            accumulated_err,
3744            last_data,
3745        ))
3746    }
3747
3748    /// Execute the `source` / `.` command to include and run a script.
3749    ///
3750    /// Unlike regular tool execution, `source` executes in the CURRENT scope,
3751    /// allowing the sourced script to set variables and modify shell state.
3752    async fn execute_source(&self, args: &[Arg]) -> Result<ExecResult> {
3753        // Get the file path from the first positional argument
3754        let tool_args = self.build_args_async(args, None).await?;
3755        let path = match tool_args.positional.first() {
3756            Some(Value::String(s)) => s.clone(),
3757            Some(v) => value_to_string(v),
3758            None => {
3759                return Ok(ExecResult::failure(1, "source: missing filename"));
3760            }
3761        };
3762
3763        // Resolve path relative to cwd
3764        let full_path = {
3765            let ctx = self.exec_ctx.read().await;
3766            if path.starts_with('/') {
3767                std::path::PathBuf::from(&path)
3768            } else {
3769                ctx.cwd.join(&path)
3770            }
3771        };
3772
3773        // Read file content via backend
3774        let content = {
3775            let ctx = self.exec_ctx.read().await;
3776            match ctx.backend.read(&full_path, None).await {
3777                Ok(bytes) => {
3778                    String::from_utf8(bytes).map_err(|e| {
3779                        anyhow::anyhow!("source: {}: invalid UTF-8: {}", path, e)
3780                    })?
3781                }
3782                Err(e) => {
3783                    return Ok(ExecResult::failure(
3784                        1,
3785                        format!("source: {}: {}", path, e),
3786                    ));
3787                }
3788            }
3789        };
3790
3791        // Parse the content
3792        let program = match crate::parser::parse(&content) {
3793            Ok(p) => p,
3794            Err(errors) => {
3795                let msg = errors
3796                    .iter()
3797                    .map(|e| format!("{}:{}: {}", path, e.span.start, e.message))
3798                    .collect::<Vec<_>>()
3799                    .join("\n");
3800                return Ok(ExecResult::failure(1, format!("source: {}", msg)));
3801            }
3802        };
3803
3804        // Execute each statement in the CURRENT scope (not isolated)
3805        let mut result = ExecResult::success("");
3806        for stmt in program.statements {
3807            if matches!(stmt, crate::ast::Stmt::Empty) {
3808                continue;
3809            }
3810
3811            match self.execute_stmt_flow(&stmt).await {
3812                Ok(flow) => {
3813                    self.drain_stderr_into(&mut result).await;
3814                    match flow {
3815                        ControlFlow::Normal(r) => {
3816                            result = r.clone();
3817                            self.update_last_result(&r).await;
3818                        }
3819                        ControlFlow::Break { .. } | ControlFlow::Continue { .. } => {
3820                            return Err(anyhow::anyhow!(
3821                                "source: {}: unexpected break/continue outside loop",
3822                                path
3823                            ));
3824                        }
3825                        ControlFlow::Return { value } => {
3826                            return Ok(value);
3827                        }
3828                        ControlFlow::Exit { code } => {
3829                            result.code = code;
3830                            return Ok(result);
3831                        }
3832                    }
3833                }
3834                Err(e) => {
3835                    return Err(e.context(format!("source: {}", path)));
3836                }
3837            }
3838        }
3839
3840        Ok(result)
3841    }
3842
3843    /// Try to execute a script from PATH directories.
3844    ///
3845    /// Searches PATH for `{name}.kai` files and executes them in isolated scope
3846    /// (like user-defined tools). Returns None if no script is found.
3847    async fn try_execute_script(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3848        // Get PATH from scope (default to "/bin")
3849        let path_value = {
3850            let scope = self.scope.read().await;
3851            scope
3852                .get("PATH")
3853                .map(value_to_string)
3854                .unwrap_or_else(|| "/bin".to_string())
3855        };
3856
3857        // Search PATH directories for script
3858        for dir in path_value.split(':') {
3859            if dir.is_empty() {
3860                continue;
3861            }
3862
3863            // Build script path: {dir}/{name}.kai
3864            let script_path = PathBuf::from(dir).join(format!("{}.kai", name));
3865
3866            // Check if script exists
3867            let exists = {
3868                let ctx = self.exec_ctx.read().await;
3869                ctx.backend.exists(&script_path).await
3870            };
3871
3872            if !exists {
3873                continue;
3874            }
3875
3876            // Read script content
3877            let content = {
3878                let ctx = self.exec_ctx.read().await;
3879                match ctx.backend.read(&script_path, None).await {
3880                    Ok(bytes) => match String::from_utf8(bytes) {
3881                        Ok(s) => s,
3882                        Err(e) => {
3883                            return Ok(Some(ExecResult::failure(
3884                                1,
3885                                format!("{}: invalid UTF-8: {}", script_path.display(), e),
3886                            )));
3887                        }
3888                    },
3889                    Err(e) => {
3890                        return Ok(Some(ExecResult::failure(
3891                            1,
3892                            format!("{}: {}", script_path.display(), e),
3893                        )));
3894                    }
3895                }
3896            };
3897
3898            // Parse the script
3899            let program = match crate::parser::parse(&content) {
3900                Ok(p) => p,
3901                Err(errors) => {
3902                    let msg = errors
3903                        .iter()
3904                        .map(|e| format!("{}:{}: {}", script_path.display(), e.span.start, e.message))
3905                        .collect::<Vec<_>>()
3906                        .join("\n");
3907                    return Ok(Some(ExecResult::failure(1, msg)));
3908                }
3909            };
3910
3911            // Build tool_args from args (async for command substitution support)
3912            let tool_args = self.build_args_async(args, None).await?;
3913
3914            // Create isolated scope (like user tools)
3915            let mut isolated_scope = Scope::new();
3916
3917            // Set up positional parameters ($0 = script name, $1, $2, ... = args)
3918            let positional_args: Vec<String> = tool_args.positional
3919                .iter()
3920                .map(value_to_string)
3921                .collect();
3922            isolated_scope.set_positional(name, positional_args);
3923
3924            // Save current scope and swap with isolated scope
3925            let original_scope = {
3926                let mut scope = self.scope.write().await;
3927                std::mem::replace(&mut *scope, isolated_scope)
3928            };
3929
3930            // Execute script statements — track outcome for cleanup
3931            let mut result = ExecResult::success("");
3932            let mut exec_error: Option<anyhow::Error> = None;
3933            let mut exit_code: Option<i64> = None;
3934
3935            for stmt in program.statements {
3936                if matches!(stmt, crate::ast::Stmt::Empty) {
3937                    continue;
3938                }
3939
3940                match self.execute_stmt_flow(&stmt).await {
3941                    Ok(flow) => {
3942                        match flow {
3943                            ControlFlow::Normal(r) => result = r,
3944                            ControlFlow::Return { value } => {
3945                                result = value;
3946                                break;
3947                            }
3948                            ControlFlow::Exit { code } => {
3949                                exit_code = Some(code);
3950                                break;
3951                            }
3952                            ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3953                                result = r;
3954                            }
3955                        }
3956                    }
3957                    Err(e) => {
3958                        exec_error = Some(e);
3959                        break;
3960                    }
3961                }
3962            }
3963
3964            // Restore original scope unconditionally
3965            {
3966                let mut scope = self.scope.write().await;
3967                *scope = original_scope;
3968            }
3969
3970            // Propagate error or exit after cleanup
3971            if let Some(e) = exec_error {
3972                return Err(e.context(format!("script: {}", script_path.display())));
3973            }
3974            if let Some(code) = exit_code {
3975                result.code = code;
3976                return Ok(Some(result));
3977            }
3978
3979            return Ok(Some(result));
3980        }
3981
3982        // No script found
3983        Ok(None)
3984    }
3985
3986    /// Try to execute an external command from PATH.
3987    ///
3988    /// This is the fallback when no builtin or user-defined tool matches.
3989    /// External commands receive a clean argv (flags preserved in their original format).
3990    ///
3991    /// # Requirements
3992    /// - Command must be found in PATH
3993    /// - Current working directory must be on a real filesystem (not virtual like /v)
3994    ///
3995    /// # Returns
3996    /// - `Ok(Some(result))` if command was found and executed
3997    /// - `Ok(None)` if command was not found in PATH
3998    /// - `Err` on execution errors
3999    #[cfg(not(feature = "subprocess"))]
4000    async fn try_execute_external(&self, _name: &str, _args: &[Arg]) -> Result<Option<ExecResult>> {
4001        Ok(None)
4002    }
4003
4004    /// Try to execute an external command from PATH.
4005    #[cfg(feature = "subprocess")]
4006    #[tracing::instrument(level = "debug", skip(self, args), fields(command = %name))]
4007    async fn try_execute_external(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
4008        // Read the cancel token from `self.exec_ctx`, which `dispatch_command`
4009        // populates from the inbound ctx.cancel on every dispatch. This is
4010        // what makes the `timeout` builtin's swapped child token reach the
4011        // wait_or_kill discipline below — reading `self.cancel_token` would
4012        // give the kernel-wide token and miss the timeout's child cascade.
4013        let cancel = {
4014            let ec = self.exec_ctx.read().await;
4015            ec.cancel.clone()
4016        };
4017        let kill_grace = self.kill_grace;
4018        if !self.allow_external_commands {
4019            return Ok(None);
4020        }
4021
4022        // Get real working directory for relative path resolution and child cwd.
4023        // If the CWD is virtual (no real filesystem path), skip external command
4024        // execution entirely — return None so the dispatch can fall through to
4025        // backend-registered tools.
4026        let real_cwd = {
4027            let ctx = self.exec_ctx.read().await;
4028            match ctx.backend.resolve_real_path(&ctx.cwd) {
4029                Some(p) => p,
4030                None => return Ok(None),
4031            }
4032        };
4033
4034        let executable = if name.contains('/') {
4035            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
4036            let resolved = if std::path::Path::new(name).is_absolute() {
4037                std::path::PathBuf::from(name)
4038            } else {
4039                real_cwd.join(name)
4040            };
4041            if !resolved.exists() {
4042                return Ok(Some(ExecResult::failure(
4043                    127,
4044                    format!("{}: No such file or directory", name),
4045                )));
4046            }
4047            if !resolved.is_file() {
4048                return Ok(Some(ExecResult::failure(
4049                    126,
4050                    format!("{}: Is a directory", name),
4051                )));
4052            }
4053            #[cfg(unix)]
4054            {
4055                use std::os::unix::fs::PermissionsExt;
4056                let mode = std::fs::metadata(&resolved)
4057                    .map(|m| m.permissions().mode())
4058                    .unwrap_or(0);
4059                if mode & 0o111 == 0 {
4060                    return Ok(Some(ExecResult::failure(
4061                        126,
4062                        format!("{}: Permission denied", name),
4063                    )));
4064                }
4065            }
4066            resolved.to_string_lossy().into_owned()
4067        } else {
4068            // Get PATH from scope or environment
4069            let path_var = {
4070                let scope = self.scope.read().await;
4071                scope
4072                    .get("PATH")
4073                    .map(value_to_string)
4074                    .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default())
4075            };
4076
4077            // Resolve command in PATH
4078            match resolve_in_path(name, &path_var) {
4079                Some(path) => path,
4080                None => return Ok(None), // Not found - let caller handle error
4081            }
4082        };
4083
4084        tracing::debug!(executable = %executable, "resolved external command");
4085
4086        // Build flat argv (preserves flag format)
4087        let argv = self.build_args_flat(args).await?;
4088
4089        // Get stdin if available
4090        let stdin_data = {
4091            let mut ctx = self.exec_ctx.write().await;
4092            ctx.take_stdin()
4093        };
4094
4095        // Build and spawn the command
4096        use tokio::process::Command;
4097
4098        let mut cmd = Command::new(&executable);
4099        cmd.args(&argv);
4100        cmd.current_dir(&real_cwd);
4101
4102        // Hermetic env: child sees only kaish's exported vars, not the kaish
4103        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
4104        // populate it via KernelConfig::initial_vars at construction.
4105        cmd.env_clear();
4106        {
4107            let scope = self.scope.read().await;
4108            for (var_name, value) in scope.exported_vars() {
4109                cmd.env(var_name, value_to_string(&value));
4110            }
4111        }
4112
4113        // Handle stdin
4114        cmd.stdin(if stdin_data.is_some() {
4115            std::process::Stdio::piped()
4116        } else if self.interactive {
4117            std::process::Stdio::inherit()
4118        } else {
4119            std::process::Stdio::null()
4120        });
4121
4122        // In interactive mode, standalone or last-in-pipeline commands inherit
4123        // the terminal's stdout/stderr so output streams in real-time.
4124        // First/middle commands must capture stdout for the pipe — same as bash.
4125        let pipeline_position = {
4126            let ctx = self.exec_ctx.read().await;
4127            ctx.pipeline_position
4128        };
4129        let inherit_output = self.interactive
4130            && matches!(pipeline_position, PipelinePosition::Only | PipelinePosition::Last);
4131
4132        if inherit_output {
4133            cmd.stdout(std::process::Stdio::inherit());
4134            cmd.stderr(std::process::Stdio::inherit());
4135        } else {
4136            cmd.stdout(std::process::Stdio::piped());
4137            cmd.stderr(std::process::Stdio::piped());
4138        }
4139
4140        // On Unix, always put the child in its own process group so cancellation
4141        // can `killpg` the whole tree (the child plus any grandchildren).
4142        // Restoring default tty-related signal handlers stays gated on
4143        // job-control mode — those only matter when the child has a controlling
4144        // terminal.
4145        #[cfg(unix)]
4146        {
4147            let restore_jc_signals = self.terminal_state.is_some() && inherit_output;
4148            // SAFETY: setpgid and sigaction(SIG_DFL) are async-signal-safe per POSIX
4149            #[allow(unsafe_code)]
4150            unsafe {
4151                cmd.pre_exec(move || {
4152                    // Own process group — for kill scope.
4153                    nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
4154                        .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
4155                    if restore_jc_signals {
4156                        use nix::libc::{sigaction, SIGTSTP, SIGTTOU, SIGTTIN, SIGINT, SIG_DFL};
4157                        let mut sa: nix::libc::sigaction = std::mem::zeroed();
4158                        sa.sa_sigaction = SIG_DFL;
4159                        if sigaction(SIGTSTP, &sa, std::ptr::null_mut()) != 0 {
4160                            return Err(std::io::Error::last_os_error());
4161                        }
4162                        if sigaction(SIGTTOU, &sa, std::ptr::null_mut()) != 0 {
4163                            return Err(std::io::Error::last_os_error());
4164                        }
4165                        if sigaction(SIGTTIN, &sa, std::ptr::null_mut()) != 0 {
4166                            return Err(std::io::Error::last_os_error());
4167                        }
4168                        if sigaction(SIGINT, &sa, std::ptr::null_mut()) != 0 {
4169                            return Err(std::io::Error::last_os_error());
4170                        }
4171                    }
4172                    Ok(())
4173                });
4174            }
4175        }
4176
4177        // Backstop for kill on drop in case our explicit kill path is bypassed
4178        // (panic, early return, etc) on the **capture** wait path. We do NOT
4179        // set this on the JC inherit path: that uses sync `waitpid` outside
4180        // tokio's view of the child, so on drop tokio would try to kill an
4181        // already-reaped (possibly-reused) PID. The JC path has its own
4182        // cancel handling via the side-task watcher.
4183        let in_jc_inherit_path = inherit_output && self.terminal_state.is_some();
4184        if !in_jc_inherit_path {
4185            cmd.kill_on_drop(true);
4186        }
4187
4188        // Spawn the process. Capture a `KillTarget` immediately so cancel/
4189        // timeout paths can deliver signals via pidfd (Linux ≥ 5.3) — bound
4190        // to this process's generation, immune to PID reuse if the OS reaps
4191        // the child before our kill syscalls fire.
4192        let mut child = match cmd.spawn() {
4193            Ok(child) => child,
4194            Err(e) => {
4195                return Ok(Some(ExecResult::failure(
4196                    127,
4197                    format!("{}: {}", name, e),
4198                )));
4199            }
4200        };
4201        let kill_target = crate::pidfd::KillTarget::from_child(&child);
4202
4203        // If this external runs on behalf of a background job, record its
4204        // process group on the job so `kill -<sig> %N` can signal the real
4205        // process directly (STOP/CONT/USR1/…, not just terminate). The child
4206        // did `setpgid(0, 0)` in pre_exec, so its PGID equals its PID.
4207        if let Some(job_id) = self.bg_job_id
4208            && let Some(pid) = child.id()
4209        {
4210            self.jobs.add_pgid(job_id, pid).await;
4211        }
4212
4213        // Write stdin if present
4214        if let Some(data) = stdin_data
4215            && let Some(mut stdin) = child.stdin.take()
4216        {
4217            use tokio::io::AsyncWriteExt;
4218            if let Err(e) = stdin.write_all(data.as_bytes()).await {
4219                return Ok(Some(ExecResult::failure(
4220                    1,
4221                    format!("{}: failed to write stdin: {}", name, e),
4222                )));
4223            }
4224            // Drop stdin to signal EOF
4225        }
4226
4227        if inherit_output {
4228            // Job control path: use waitpid with WUNTRACED for Ctrl-Z support
4229            #[cfg(unix)]
4230            if let Some(ref term) = self.terminal_state {
4231                let child_id = child.id().unwrap_or(0);
4232                let pid = nix::unistd::Pid::from_raw(child_id as i32);
4233                let pgid = pid; // child is its own pgid leader
4234
4235                // Give the terminal to the child's process group
4236                if let Err(e) = term.give_terminal_to(pgid) {
4237                    tracing::warn!("failed to give terminal to child: {}", e);
4238                }
4239
4240                let term_clone = term.clone();
4241                let cmd_name = name.to_string();
4242                let cmd_display = format!("{} {}", name, argv.join(" "));
4243                let jobs = self.jobs.clone();
4244
4245                // Side task that watches for cancellation while the blocking
4246                // waitpid runs. On cancel, it SIGTERMs the process group, waits
4247                // the grace period, then SIGKILLs. The blocking waitpid returns
4248                // when the child dies. AbortOnDrop guard cancels the watcher
4249                // on the success path so it doesn't keep running after wait
4250                // returns naturally.
4251                //
4252                // `wait_complete` shrinks the PID-reuse race: the watcher
4253                // checks it before each kill syscall and bails out if
4254                // wait_for_foreground has already reaped the child. This
4255                // doesn't fully eliminate the race (atomic load + kill is
4256                // not atomic with the OS reap+reuse), but narrows the window
4257                // to nanoseconds — enough to be ignorable in practice.
4258                let wait_complete = std::sync::Arc::new(
4259                    std::sync::atomic::AtomicBool::new(false)
4260                );
4261                let cancel_watcher = {
4262                    let cancel = cancel.clone();
4263                    let wc = wait_complete.clone();
4264                    // Ownership transfer: the JC path's sync wait inside
4265                    // block_in_place owns the child's reaping, so the
4266                    // cancel_watcher drives the kill side via KillTarget
4267                    // (pidfd-bound on Linux). When kill_target is None
4268                    // (older kernel + open failure, or non-Linux), falls
4269                    // through to the older PID-based path the closure
4270                    // captures from `pid`.
4271                    let target = kill_target.as_ref().map(|t| {
4272                        // Re-borrow the components we need into Owned-ish form
4273                        // so the spawned task is 'static. We can't move
4274                        // KillTarget directly because try_execute_external
4275                        // still uses it after the spawn — but on the JC path
4276                        // there is no further use after the watcher spawn,
4277                        // so a clone-of-pid + owned None pidfd is safe.
4278                        // Simpler: signal via the existing target by cloning
4279                        // a fresh pidfd; the original keeps its handle.
4280                        // Pidfd is just an OwnedFd — not Clone — so do it
4281                        // by re-opening from the pid. Fall back if reopen
4282                        // fails (race already reaped → best-effort kill).
4283                        crate::pidfd::KillTarget::from_pid(t.pid())
4284                    });
4285                    tokio::spawn(async move {
4286                        cancel.cancelled().await;
4287                        if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4288                        use nix::sys::signal::Signal;
4289                        if let Some(t) = &target {
4290                            t.signal(Signal::SIGTERM);
4291                            t.signal_pg(Signal::SIGTERM);
4292                        } else {
4293                            let _ = nix::sys::signal::kill(pid, Signal::SIGTERM);
4294                            let _ = nix::sys::signal::killpg(pid, Signal::SIGTERM);
4295                        }
4296                        if kill_grace > Duration::ZERO {
4297                            tokio::time::sleep(kill_grace).await;
4298                            if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
4299                        }
4300                        if let Some(t) = &target {
4301                            t.signal(Signal::SIGKILL);
4302                            t.signal_pg(Signal::SIGKILL);
4303                        } else {
4304                            let _ = nix::sys::signal::kill(pid, Signal::SIGKILL);
4305                            let _ = nix::sys::signal::killpg(pid, Signal::SIGKILL);
4306                        }
4307                    })
4308                };
4309                struct AbortOnDrop(tokio::task::JoinHandle<()>);
4310                impl Drop for AbortOnDrop {
4311                    fn drop(&mut self) {
4312                        self.0.abort();
4313                    }
4314                }
4315                let _watcher_guard = AbortOnDrop(cancel_watcher);
4316
4317                let wait_complete_setter = wait_complete.clone();
4318                let code = tokio::task::block_in_place(move || {
4319                    let result = term_clone.wait_for_foreground(pid);
4320                    // Mark wait done before the watcher might fire.
4321                    wait_complete_setter.store(true, std::sync::atomic::Ordering::SeqCst);
4322
4323                    // Always reclaim the terminal
4324                    if let Err(e) = term_clone.reclaim_terminal() {
4325                        tracing::warn!("failed to reclaim terminal: {}", e);
4326                    }
4327
4328                    match result {
4329                        crate::terminal::WaitResult::Exited(code) => code as i64,
4330                        crate::terminal::WaitResult::Signaled(sig) => 128 + sig as i64,
4331                        crate::terminal::WaitResult::Stopped(_sig) => {
4332                            // Register as a stopped job
4333                            let rt = tokio::runtime::Handle::current();
4334                            let job_id = rt.block_on(jobs.register_stopped(
4335                                cmd_display,
4336                                child_id,
4337                                child_id, // pgid = pid for group leader
4338                            ));
4339                            eprintln!("\n[{}]+ Stopped\t{}", job_id, cmd_name);
4340                            148 // 128 + SIGTSTP(20) on most systems, but we use a fixed value
4341                        }
4342                    }
4343                });
4344
4345                return Ok(Some(ExecResult::from_output(code, String::new(), String::new())));
4346            }
4347
4348            // Non-job-control path with inherited stdio.
4349            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4350                Ok(s) => s,
4351                Err(e) => {
4352                    return Ok(Some(ExecResult::failure(
4353                        1,
4354                        format!("{}: failed to wait: {}", name, e),
4355                    )));
4356                }
4357            };
4358
4359            let code = status.code().unwrap_or_else(|| {
4360                #[cfg(unix)]
4361                {
4362                    use std::os::unix::process::ExitStatusExt;
4363                    128 + status.signal().unwrap_or(0)
4364                }
4365                #[cfg(not(unix))]
4366                {
4367                    -1
4368                }
4369            }) as i64;
4370
4371            // stdout/stderr already went to the terminal
4372            Ok(Some(ExecResult::from_output(code, String::new(), String::new())))
4373        } else {
4374            // Capture output via bounded streams
4375            let stdout_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4376            let stderr_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
4377
4378            let stdout_pipe = child.stdout.take();
4379            let stderr_pipe = child.stderr.take();
4380
4381            let stdout_clone = stdout_stream.clone();
4382            let stderr_clone = stderr_stream.clone();
4383
4384            let stdout_task = stdout_pipe.map(|pipe| {
4385                tokio::spawn(async move {
4386                    drain_to_stream(pipe, stdout_clone).await;
4387                })
4388            });
4389
4390            let stderr_task = stderr_pipe.map(|pipe| {
4391                tokio::spawn(async move {
4392                    drain_to_stream(pipe, stderr_clone).await;
4393                })
4394            });
4395
4396            let cancelled_before_wait = cancel.is_cancelled();
4397            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
4398                Ok(s) => s,
4399                Err(e) => {
4400                    if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4401                    if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4402                    return Ok(Some(ExecResult::failure(
4403                        1,
4404                        format!("{}: failed to wait: {}", name, e),
4405                    )));
4406                }
4407            };
4408
4409            // On cancel, abort the drain tasks (the child's pipes are gone;
4410            // late output is lost but predictable death beats partial capture).
4411            // On normal exit, await drains so we don't lose buffered output.
4412            if cancelled_before_wait || cancel.is_cancelled() {
4413                if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
4414                if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
4415            } else {
4416                if let Some(task) = stdout_task {
4417                    // Ignore join error — the drain task logs its own errors
4418                    let _ = task.await;
4419                }
4420                if let Some(task) = stderr_task {
4421                    let _ = task.await;
4422                }
4423            }
4424
4425            let code = status.code().unwrap_or_else(|| {
4426                #[cfg(unix)]
4427                {
4428                    use std::os::unix::process::ExitStatusExt;
4429                    128 + status.signal().unwrap_or(0)
4430                }
4431                #[cfg(not(unix))]
4432                {
4433                    -1
4434                }
4435            }) as i64;
4436
4437            let stdout = stdout_stream.read_string().await;
4438            let stderr = stderr_stream.read_string().await;
4439
4440            Ok(Some(ExecResult::from_output(code, stdout, stderr)))
4441        }
4442    }
4443
4444    // --- Variable Access ---
4445
4446    /// Get a variable value.
4447    pub async fn get_var(&self, name: &str) -> Option<Value> {
4448        let scope = self.scope.read().await;
4449        scope.get(name).cloned()
4450    }
4451
4452    /// Check if error-exit mode is enabled (for testing).
4453    #[cfg(test)]
4454    pub async fn error_exit_enabled(&self) -> bool {
4455        let scope = self.scope.read().await;
4456        scope.error_exit_enabled()
4457    }
4458
4459    /// Set a variable value.
4460    pub async fn set_var(&self, name: &str, value: Value) {
4461        let mut scope = self.scope.write().await;
4462        scope.set(name.to_string(), value);
4463    }
4464
4465    /// Set positional parameters ($0 script name and $1-$9 args).
4466    pub async fn set_positional(&self, script_name: impl Into<String>, args: Vec<String>) {
4467        let mut scope = self.scope.write().await;
4468        scope.set_positional(script_name, args);
4469    }
4470
4471    /// List all variables.
4472    pub async fn list_vars(&self) -> Vec<(String, Value)> {
4473        let scope = self.scope.read().await;
4474        scope.all()
4475    }
4476
4477    /// List exported variables (name, value), sorted by name. These are the
4478    /// vars a child process would see (see `dispatch`'s hermetic env build).
4479    pub async fn exported_vars(&self) -> Vec<(String, Value)> {
4480        let scope = self.scope.read().await;
4481        scope.exported_vars()
4482    }
4483
4484    // --- CWD ---
4485
4486    /// Get current working directory.
4487    pub async fn cwd(&self) -> PathBuf {
4488        self.exec_ctx.read().await.cwd.clone()
4489    }
4490
4491    /// Set current working directory.
4492    pub async fn set_cwd(&self, path: PathBuf) {
4493        let mut ctx = self.exec_ctx.write().await;
4494        ctx.set_cwd(path);
4495    }
4496
4497    /// Set the working directory only if `path` resolves to a directory in the
4498    /// kernel's backend — the same namespace `cd` validates against. Unlike a
4499    /// raw host-FS `is_dir()` check, this correctly accepts virtual mounts
4500    /// (`/v/docs`, in-memory scratch, …) and rejects real paths that have since
4501    /// disappeared. Returns whether the cwd was changed.
4502    pub async fn try_set_cwd(&self, path: PathBuf) -> bool {
4503        // Clone the backend Arc out before the stat so we never hold the
4504        // exec_ctx lock across the await.
4505        let backend = self.exec_ctx.read().await.backend.clone();
4506        let is_dir = matches!(backend.stat(&path).await, Ok(entry) if entry.is_dir());
4507        if is_dir {
4508            self.exec_ctx.write().await.set_cwd(path);
4509        }
4510        is_dir
4511    }
4512
4513    // --- Last Result ---
4514
4515    /// Get the last result ($?).
4516    pub async fn last_result(&self) -> ExecResult {
4517        let scope = self.scope.read().await;
4518        scope.last_result().clone()
4519    }
4520
4521    // --- Tools ---
4522
4523    /// Check if a user-defined function exists.
4524    pub async fn has_function(&self, name: &str) -> bool {
4525        self.user_tools.read().await.contains_key(name)
4526    }
4527
4528    /// Get available tool schemas.
4529    pub fn tool_schemas(&self) -> Vec<crate::tools::ToolSchema> {
4530        self.tools.schemas()
4531    }
4532
4533    // --- Jobs ---
4534
4535    /// Get job manager.
4536    pub fn jobs(&self) -> Arc<JobManager> {
4537        self.jobs.clone()
4538    }
4539
4540    // --- VFS ---
4541
4542    /// Get VFS router.
4543    pub fn vfs(&self) -> Arc<VfsRouter> {
4544        self.vfs.clone()
4545    }
4546
4547    // --- State ---
4548
4549    /// Reset kernel to initial state.
4550    ///
4551    /// Clears in-memory variables and resets cwd to root.
4552    /// History is not cleared (it persists across resets).
4553    pub async fn reset(&self) -> Result<()> {
4554        {
4555            let mut scope = self.scope.write().await;
4556            *scope = Scope::new();
4557        }
4558        {
4559            let mut ctx = self.exec_ctx.write().await;
4560            ctx.cwd = PathBuf::from("/");
4561        }
4562        Ok(())
4563    }
4564
4565    /// Shutdown the kernel.
4566    pub async fn shutdown(self) -> Result<()> {
4567        // Wait for all background jobs
4568        self.jobs.wait_all().await;
4569        Ok(())
4570    }
4571
4572    /// Dispatch a single command using the full resolution chain.
4573    ///
4574    /// This is the core of `CommandDispatcher` — it syncs state between the
4575    /// passed-in `ExecContext` and kernel-internal state (scope, exec_ctx),
4576    /// then delegates to `execute_command` for the actual dispatch.
4577    ///
4578    /// State flow:
4579    /// 1. ctx → self: sync scope, cwd, stdin so internal methods see current state
4580    /// 2. execute_command: full dispatch chain (user tools, builtins, scripts, external, backend)
4581    /// 3. self → ctx: sync scope, cwd changes back so the pipeline runner sees them
4582    async fn dispatch_command(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4583        // Ensure nested dispatch (e.g. the `timeout` builtin re-dispatching
4584        // its inner command via ctx.dispatcher) routes through THIS kernel,
4585        // not a stale parent. Critical for forks: the fork's builtins must
4586        // use the fork's dispatcher, not the parent's.
4587        if let Some(d) = self.dispatcher() {
4588            ctx.dispatcher = Some(d);
4589        }
4590
4591        // 1. Sync ctx → self internals
4592        {
4593            let mut scope = self.scope.write().await;
4594            *scope = ctx.scope.clone();
4595        }
4596        {
4597            let mut ec = self.exec_ctx.write().await;
4598            ec.cwd = ctx.cwd.clone();
4599            ec.prev_cwd = ctx.prev_cwd.clone();
4600            ec.stdin = ctx.stdin.take();
4601            ec.stdin_data = ctx.stdin_data.take();
4602            // Streaming pipe endpoints and kernel stderr must flow to the
4603            // tool via self.exec_ctx — execute_command reads that, not the
4604            // passed-in ctx. Without moving these, concurrent pipeline
4605            // stages dispatched via a fork get pipe_stdin = None and
4606            // silently read nothing.
4607            ec.pipe_stdin = ctx.pipe_stdin.take();
4608            ec.pipe_stdout = ctx.pipe_stdout.take();
4609            if let Some(stderr) = ctx.stderr.clone() {
4610                ec.stderr = Some(stderr);
4611            }
4612            ec.aliases = ctx.aliases.clone();
4613            ec.ignore_config = ctx.ignore_config.clone();
4614            ec.output_limit = ctx.output_limit.clone();
4615            ec.pipeline_position = ctx.pipeline_position;
4616            // Sync the cancel token from ctx → ec. Builtins like `timeout`
4617            // swap ctx.cancel to a derived child token before re-dispatching;
4618            // execute_command's snapshot reads ec.cancel (kept aligned by
4619            // this sync), so try_execute_external sees the right token.
4620            ec.cancel = ctx.cancel.clone();
4621            // Same alignment for the watchdog: a fork dispatching through its
4622            // own kernel must hand the shared script clock to the snapshot so
4623            // patient holds in forked stages suspend the right timer.
4624            ec.watchdog = ctx.watchdog.clone();
4625        }
4626
4627        // 2. Execute via the full dispatch chain
4628        let result = self.execute_command(&cmd.name, &cmd.args).await?;
4629
4630        // 3. Sync self → ctx
4631        {
4632            let scope = self.scope.read().await;
4633            ctx.scope = scope.clone();
4634        }
4635        {
4636            let mut ec = self.exec_ctx.write().await;
4637            ctx.cwd = ec.cwd.clone();
4638            ctx.prev_cwd = ec.prev_cwd.clone();
4639            ctx.aliases = ec.aliases.clone();
4640            ctx.ignore_config = ec.ignore_config.clone();
4641            ctx.output_limit = ec.output_limit.clone();
4642            // Return any pipe endpoints that the tool didn't consume.
4643            // `take()` here keeps the fork's exec_ctx in a clean state for
4644            // the next dispatch — these are per-command and shouldn't leak
4645            // between calls.
4646            ctx.pipe_stdin = ec.pipe_stdin.take();
4647            ctx.pipe_stdout = ec.pipe_stdout.take();
4648        }
4649
4650        Ok(result)
4651    }
4652}
4653
4654#[async_trait]
4655impl CommandDispatcher for Kernel {
4656    /// Dispatch a command through the Kernel's full resolution chain.
4657    ///
4658    /// This is the single path for all command execution when called from
4659    /// the pipeline runner. It provides the full dispatch chain:
4660    /// user tools → builtins → .kai scripts → external commands → backend tools.
4661    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4662        self.dispatch_command(cmd, ctx).await
4663    }
4664
4665    /// Evaluate an expression through the kernel's async chain, including
4666    /// command substitution. Delegates to `eval_expr_async`, which snapshots
4667    /// the kernel's scope/cwd and restores them after any `$(...)` runs, so
4668    /// only command output escapes. The `ctx` is unused here because the
4669    /// kernel evaluates against its own session state (a fork carries the
4670    /// pipeline stage's snapshot); var refs resolve against that scope.
4671    async fn eval_expr(&self, expr: &Expr, _ctx: &ExecContext) -> Result<Value> {
4672        self.eval_expr_async(expr).await
4673    }
4674
4675    /// Produce a forked dispatcher with independent mutable state (detached).
4676    ///
4677    /// Calls the inherent `Kernel::fork` method (note the UFCS to avoid
4678    /// recursing into the trait method we're defining) and coerces the
4679    /// returned `Arc<Kernel>` to `Arc<dyn CommandDispatcher>`.
4680    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
4681        let fork: Arc<Kernel> = Kernel::fork(self).await;
4682        fork
4683    }
4684
4685    /// Produce a forked dispatcher with cancellation cascading from this kernel.
4686    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
4687        let fork: Arc<Kernel> = Kernel::fork_attached(self).await;
4688        fork
4689    }
4690}
4691
4692/// Apply the requested output format to a builtin's result, unless the tool
4693/// owns its own output.
4694///
4695/// `format` is `ctx.output_format` (set from `--json`). When `owns_output` is
4696/// true the tool already rendered its bytes (bespoke JSON envelope), so the
4697/// kernel leaves the result untouched rather than re-formatting its
4698/// `OutputData`. Otherwise the kernel renders the typed `OutputData` uniformly.
4699fn finalize_output(
4700    result: ExecResult,
4701    format: Option<crate::interpreter::OutputFormat>,
4702    owns_output: bool,
4703) -> ExecResult {
4704    match format {
4705        Some(_) if owns_output => result,
4706        Some(format) => apply_output_format(result, format),
4707        None => result,
4708    }
4709}
4710
4711/// Accumulate output from one result into another.
4712///
4713/// Appends stdout and stderr verbatim and updates the exit code to match the
4714/// new result. Used to preserve output from multiple statements, loop
4715/// iterations, and command chains. No separator is inserted between outputs —
4716/// each command's output concatenates raw, matching bash (`printf a; printf b`
4717/// and `printf a && printf b` both yield `ab`; a trailing newline only appears
4718/// when a command emits its own, as `echo` does).
4719fn accumulate_result(accumulated: &mut ExecResult, new: &ExecResult) {
4720    // Materialize lazy OutputData into .out before accumulating.
4721    // Without this, the first command's output stays in .output while
4722    // the second's text gets appended to .out, losing the first.
4723    accumulated.materialize();
4724    accumulated.push_out(&new.text_out());
4725    accumulated.err.push_str(&new.err);
4726    accumulated.code = new.code;
4727    accumulated.data = new.data.clone();
4728    accumulated.did_spill = new.did_spill;
4729    accumulated.original_code = new.original_code;
4730    accumulated.content_type = new.content_type.clone();
4731    accumulated.baggage.clone_from(&new.baggage);
4732}
4733
4734/// Fold a loop's accumulated output into a break/continue signal that is
4735/// propagating to an *outer* loop. Output printed before `break N`/`continue N`
4736/// (with `N > 1`) would otherwise be discarded when the signal replaces the
4737/// loop's result on its way up. The loop's output comes first (it ran before
4738/// the signal was raised), then the signal's already-carried output.
4739fn fold_loop_output_into_flow(loop_output: ExecResult, flow: &mut ControlFlow) {
4740    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4741        let mut merged = loop_output;
4742        accumulate_result(&mut merged, result);
4743        *result = merged;
4744    }
4745}
4746
4747/// Accumulate the output a break/continue signal carried (from inner loops it
4748/// propagated through) into the loop that finally handles it, so it survives
4749/// into that loop's result.
4750fn accumulate_flow_output(accumulated: &mut ExecResult, flow: &ControlFlow) {
4751    if let ControlFlow::Break { result, .. } | ControlFlow::Continue { result, .. } = flow {
4752        accumulate_result(accumulated, result);
4753    }
4754}
4755
4756/// Check if a value is truthy.
4757fn is_truthy(value: &Value) -> bool {
4758    match value {
4759        Value::Null => false,
4760        Value::Bool(b) => *b,
4761        Value::Int(i) => *i != 0,
4762        Value::Float(f) => *f != 0.0,
4763        Value::String(s) => !s.is_empty(),
4764        Value::Json(json) => match json {
4765            serde_json::Value::Null => false,
4766            serde_json::Value::Array(arr) => !arr.is_empty(),
4767            serde_json::Value::Object(obj) => !obj.is_empty(),
4768            serde_json::Value::Bool(b) => *b,
4769            serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
4770            serde_json::Value::String(s) => !s.is_empty(),
4771        },
4772        Value::Blob(_) => true, // Blob references are always truthy
4773    }
4774}
4775
4776/// Apply tilde expansion to a value.
4777///
4778/// Only string values starting with `~` are expanded. `home` is the session
4779/// `HOME` from the kernel scope (the kernel is hermetic and never reads the
4780/// host env); `None` leaves `~`/`~/path` unexpanded. See [`expand_tilde`].
4781fn apply_tilde_expansion(value: Value, home: Option<&str>) -> Value {
4782    match value {
4783        Value::String(s) if s.starts_with('~') => Value::String(expand_tilde(&s, home)),
4784        _ => value,
4785    }
4786}
4787
/// Await the child's exit, or kill it as soon as `cancel` fires.
///
/// `target` carries a Linux pidfd (when available) for race-free direct-child
/// kill; otherwise the kill falls through to a PID-based one. On non-unix
/// targets the parameter is ignored and tokio's cross-platform `start_kill`
/// is used instead. `grace` is handed to [`kill_with_grace`] unchanged.
#[cfg(all(unix, feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    target: Option<&crate::pidfd::KillTarget>,
    cancel: &tokio_util::sync::CancellationToken,
    grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // `biased` polls the arms in the order written, so the exit arm is
        // checked before the cancellation arm.
        biased;
        exited = child.wait() => exited,
        () = cancel.cancelled() => kill_with_grace(child, target, grace).await,
    }
}
4806
/// Non-unix variant: await the child's exit, or kill it once `cancel` fires.
///
/// `_target` and `_grace` are accepted but unused here. On cancellation the
/// child is killed with tokio's `start_kill` (its error is ignored) and then
/// awaited so the exit status can be returned.
#[cfg(all(not(unix), feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    _target: Option<&()>,
    cancel: &tokio_util::sync::CancellationToken,
    _grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // `biased` polls the arms in the order written, so the exit arm is
        // checked before the cancellation arm.
        biased;
        exited = child.wait() => exited,
        () = cancel.cancelled() => {
            let _ = child.start_kill();
            child.wait().await
        }
    }
}
4823
4824/// Send SIGTERM to the child and its process group; wait `grace`; then SIGKILL.
4825///
4826/// Direct-child kill goes through `target.signal()`, which on Linux uses a
4827/// pidfd (immune to PID reuse). Process-group kill uses `killpg` — there is
4828/// no PGID-equivalent of pidfd, so grandchildren retain a small reuse window.
4829#[cfg(all(unix, feature = "subprocess"))]
4830pub(crate) async fn kill_with_grace(
4831    child: &mut tokio::process::Child,
4832    target: Option<&crate::pidfd::KillTarget>,
4833    grace: Duration,
4834) -> std::io::Result<std::process::ExitStatus> {
4835    use nix::sys::signal::Signal;
4836
4837    if let Some(t) = target {
4838        t.signal(Signal::SIGTERM);
4839        t.signal_pg(Signal::SIGTERM);
4840        if grace > Duration::ZERO
4841            && let Ok(status) = tokio::time::timeout(grace, child.wait()).await
4842        {
4843            return status;
4844        }
4845        t.signal(Signal::SIGKILL);
4846        t.signal_pg(Signal::SIGKILL);
4847    }
4848    child.wait().await
4849}
4850
4851#[cfg(all(test, feature = "subprocess"))]
4852mod tests {
4853    use super::*;
4854
4855    #[tokio::test]
4856    async fn test_kernel_transient() {
4857        let kernel = Kernel::transient().expect("failed to create kernel");
4858        assert_eq!(kernel.name(), "transient");
4859    }
4860
4861    #[tokio::test]
4862    async fn test_kernel_execute_echo() {
4863        let kernel = Kernel::transient().expect("failed to create kernel");
4864        let result = kernel.execute("echo hello").await.expect("execution failed");
4865        assert!(result.ok());
4866        assert_eq!(result.text_out().trim(), "hello");
4867    }
4868
4869    #[tokio::test]
4870    async fn test_multiple_statements_accumulate_output() {
4871        let kernel = Kernel::transient().expect("failed to create kernel");
4872        let result = kernel
4873            .execute("echo one\necho two\necho three")
4874            .await
4875            .expect("execution failed");
4876        assert!(result.ok());
4877        // Should have all three outputs separated by newlines
4878        assert!(result.text_out().contains("one"), "missing 'one': {}", result.text_out());
4879        assert!(result.text_out().contains("two"), "missing 'two': {}", result.text_out());
4880        assert!(result.text_out().contains("three"), "missing 'three': {}", result.text_out());
4881    }
4882
4883    #[tokio::test]
4884    async fn test_and_chain_accumulates_output() {
4885        let kernel = Kernel::transient().expect("failed to create kernel");
4886        let result = kernel
4887            .execute("echo first && echo second")
4888            .await
4889            .expect("execution failed");
4890        assert!(result.ok());
4891        assert!(result.text_out().contains("first"), "missing 'first': {}", result.text_out());
4892        assert!(result.text_out().contains("second"), "missing 'second': {}", result.text_out());
4893    }
4894
4895    #[tokio::test]
4896    async fn test_for_loop_accumulates_output() {
4897        let kernel = Kernel::transient().expect("failed to create kernel");
4898        let result = kernel
4899            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
4900            .await
4901            .expect("execution failed");
4902        assert!(result.ok());
4903        assert!(result.text_out().contains("item: a"), "missing 'item: a': {}", result.text_out());
4904        assert!(result.text_out().contains("item: b"), "missing 'item: b': {}", result.text_out());
4905        assert!(result.text_out().contains("item: c"), "missing 'item: c': {}", result.text_out());
4906    }
4907
4908    #[tokio::test]
4909    async fn test_while_loop_accumulates_output() {
4910        let kernel = Kernel::transient().expect("failed to create kernel");
4911        let result = kernel
4912            .execute(r#"
4913                N=3
4914                while [[ ${N} -gt 0 ]]; do
4915                    echo "N=${N}"
4916                    N=$((N - 1))
4917                done
4918            "#)
4919            .await
4920            .expect("execution failed");
4921        assert!(result.ok());
4922        assert!(result.text_out().contains("N=3"), "missing 'N=3': {}", result.text_out());
4923        assert!(result.text_out().contains("N=2"), "missing 'N=2': {}", result.text_out());
4924        assert!(result.text_out().contains("N=1"), "missing 'N=1': {}", result.text_out());
4925    }
4926
4927    #[tokio::test]
4928    async fn test_kernel_set_var() {
4929        let kernel = Kernel::transient().expect("failed to create kernel");
4930
4931        kernel.execute("X=42").await.expect("set failed");
4932
4933        let value = kernel.get_var("X").await;
4934        assert_eq!(value, Some(Value::Int(42)));
4935    }
4936
4937    #[tokio::test]
4938    async fn test_kernel_var_expansion() {
4939        let kernel = Kernel::transient().expect("failed to create kernel");
4940
4941        kernel.execute("NAME=\"world\"").await.expect("set failed");
4942        let result = kernel.execute("echo \"hello ${NAME}\"").await.expect("echo failed");
4943
4944        assert!(result.ok());
4945        assert_eq!(result.text_out().trim(), "hello world");
4946    }
4947
4948    #[tokio::test]
4949    async fn test_kernel_last_result() {
4950        let kernel = Kernel::transient().expect("failed to create kernel");
4951
4952        kernel.execute("echo test").await.expect("echo failed");
4953
4954        let last = kernel.last_result().await;
4955        assert!(last.ok());
4956        assert_eq!(last.text_out().trim(), "test");
4957    }
4958
4959    #[tokio::test]
4960    async fn test_kernel_tool_not_found() {
4961        let kernel = Kernel::transient().expect("failed to create kernel");
4962
4963        let result = kernel.execute("nonexistent_tool").await.expect("execution failed");
4964        assert!(!result.ok());
4965        assert_eq!(result.code, 127);
4966        assert!(result.err.contains("command not found"));
4967    }
4968
4969    #[tokio::test]
4970    async fn test_external_command_true() {
4971        // Use REPL config for passthrough filesystem access
4972        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4973
4974        // /bin/true should be available on any Unix system
4975        let result = kernel.execute("true").await.expect("execution failed");
4976        // This should use the builtin true, which returns 0
4977        assert!(result.ok(), "true should succeed: {:?}", result);
4978    }
4979
4980    #[tokio::test]
4981    async fn test_external_command_basic() {
4982        // Use REPL config for passthrough filesystem access
4983        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4984
4985        // Test with /bin/echo which is external
4986        // Note: kaish has a builtin echo, so this will use the builtin
4987        // Let's test with a command that's not a builtin
4988        // Actually, let's just test that PATH resolution works by checking the PATH var
4989        let path_var = std::env::var("PATH").unwrap_or_default();
4990        eprintln!("System PATH: {}", path_var);
4991
4992        // Set PATH in kernel to ensure it's available
4993        kernel.execute(&format!(r#"PATH="{}""#, path_var)).await.expect("set PATH failed");
4994
4995        // Now try an external command like /usr/bin/env
4996        // But env is also a builtin... let's try uname
4997        let result = kernel.execute("uname").await.expect("execution failed");
4998        eprintln!("uname result: {:?}", result);
4999        // uname should succeed if external commands work
5000        assert!(result.ok() || result.code == 127, "uname: {:?}", result);
5001    }
5002
5003    #[tokio::test]
5004    async fn test_kernel_reset() {
5005        let kernel = Kernel::transient().expect("failed to create kernel");
5006
5007        kernel.execute("X=1").await.expect("set failed");
5008        assert!(kernel.get_var("X").await.is_some());
5009
5010        kernel.reset().await.expect("reset failed");
5011        assert!(kernel.get_var("X").await.is_none());
5012    }
5013
5014    #[tokio::test]
5015    async fn test_kernel_cwd() {
5016        let kernel = Kernel::transient().expect("failed to create kernel");
5017
5018        // Transient kernel uses sandboxed mode with cwd=$HOME
5019        let cwd = kernel.cwd().await;
5020        let home = std::env::var("HOME")
5021            .map(PathBuf::from)
5022            .unwrap_or_else(|_| PathBuf::from("/"));
5023        assert_eq!(cwd, home);
5024
5025        kernel.set_cwd(PathBuf::from("/tmp")).await;
5026        assert_eq!(kernel.cwd().await, PathBuf::from("/tmp"));
5027    }
5028
5029    #[tokio::test]
5030    async fn test_kernel_list_vars() {
5031        let kernel = Kernel::transient().expect("failed to create kernel");
5032
5033        kernel.execute("A=1").await.ok();
5034        kernel.execute("B=2").await.ok();
5035
5036        let vars = kernel.list_vars().await;
5037        assert!(vars.iter().any(|(n, v)| n == "A" && *v == Value::Int(1)));
5038        assert!(vars.iter().any(|(n, v)| n == "B" && *v == Value::Int(2)));
5039    }
5040
5041    #[tokio::test]
5042    async fn test_is_truthy() {
5043        assert!(!is_truthy(&Value::Null));
5044        assert!(!is_truthy(&Value::Bool(false)));
5045        assert!(is_truthy(&Value::Bool(true)));
5046        assert!(!is_truthy(&Value::Int(0)));
5047        assert!(is_truthy(&Value::Int(1)));
5048        assert!(!is_truthy(&Value::String("".into())));
5049        assert!(is_truthy(&Value::String("x".into())));
5050    }
5051
5052    #[tokio::test]
5053    async fn test_jq_in_pipeline() {
5054        let kernel = Kernel::transient().expect("failed to create kernel");
5055        // kaish uses double quotes only; escape inner quotes
5056        let result = kernel
5057            .execute(r#"echo "{\"name\": \"Alice\"}" | jq ".name" -r"#)
5058            .await
5059            .expect("execution failed");
5060        assert!(result.ok(), "jq pipeline failed: {}", result.err);
5061        assert_eq!(result.text_out().trim(), "Alice");
5062    }
5063
5064    #[tokio::test]
5065    async fn test_user_defined_tool() {
5066        let kernel = Kernel::transient().expect("failed to create kernel");
5067
5068        // Define a function
5069        kernel
5070            .execute(r#"greet() { echo "Hello, $1!" }"#)
5071            .await
5072            .expect("function definition failed");
5073
5074        // Call the function
5075        let result = kernel
5076            .execute(r#"greet "World""#)
5077            .await
5078            .expect("function call failed");
5079
5080        assert!(result.ok(), "greet failed: {}", result.err);
5081        assert_eq!(result.text_out().trim(), "Hello, World!");
5082    }
5083
5084    #[tokio::test]
5085    async fn test_user_tool_positional_args() {
5086        let kernel = Kernel::transient().expect("failed to create kernel");
5087
5088        // Define a function with positional param
5089        kernel
5090            .execute(r#"greet() { echo "Hi $1" }"#)
5091            .await
5092            .expect("function definition failed");
5093
5094        // Call with positional argument
5095        let result = kernel
5096            .execute(r#"greet "Amy""#)
5097            .await
5098            .expect("function call failed");
5099
5100        assert!(result.ok(), "greet failed: {}", result.err);
5101        assert_eq!(result.text_out().trim(), "Hi Amy");
5102    }
5103
5104    #[tokio::test]
5105    async fn test_function_shared_scope() {
5106        let kernel = Kernel::transient().expect("failed to create kernel");
5107
5108        // Set a variable in parent scope
5109        kernel
5110            .execute(r#"SECRET="hidden""#)
5111            .await
5112            .expect("set failed");
5113
5114        // Define a function that accesses and modifies parent variable
5115        kernel
5116            .execute(r#"access_parent() {
5117                echo "${SECRET}"
5118                SECRET="modified"
5119            }"#)
5120            .await
5121            .expect("function definition failed");
5122
5123        // Call the function - it SHOULD see SECRET (shared scope like sh)
5124        let result = kernel.execute("access_parent").await.expect("function call failed");
5125
5126        // Function should have access to parent scope
5127        assert!(
5128            result.text_out().contains("hidden"),
5129            "Function should access parent scope, got: {}",
5130            result.text_out()
5131        );
5132
5133        // Function should have modified the parent variable
5134        let secret = kernel.get_var("SECRET").await;
5135        assert_eq!(
5136            secret,
5137            Some(Value::String("modified".into())),
5138            "Function should modify parent scope"
5139        );
5140    }
5141
5142    #[tokio::test]
5143    #[ignore = "exec replaces the test binary via CommandExt::exec, hangs libtest; cannot be run under cargo test"]
5144    async fn test_exec_builtin() {
5145        let kernel = Kernel::transient().expect("failed to create kernel");
5146        // argv is now a space-separated string or JSON array string
5147        let result = kernel
5148            .execute(r#"exec command="/bin/echo" argv="hello world""#)
5149            .await
5150            .expect("exec failed");
5151
5152        assert!(result.ok(), "exec failed: {}", result.err);
5153        assert_eq!(result.text_out().trim(), "hello world");
5154    }
5155
5156    #[tokio::test]
5157    async fn test_while_false_never_runs() {
5158        let kernel = Kernel::transient().expect("failed to create kernel");
5159
5160        // A while loop with false condition should never run
5161        let result = kernel
5162            .execute(r#"
5163                while false; do
5164                    echo "should not run"
5165                done
5166            "#)
5167            .await
5168            .expect("while false failed");
5169
5170        assert!(result.ok());
5171        assert!(result.text_out().is_empty(), "while false should not execute body: {}", result.text_out());
5172    }
5173
5174    #[tokio::test]
5175    async fn test_while_string_comparison() {
5176        let kernel = Kernel::transient().expect("failed to create kernel");
5177
5178        // Set a flag
5179        kernel.execute(r#"FLAG="go""#).await.expect("set failed");
5180
5181        // Use string comparison as condition (shell-compatible [[ ]] syntax)
5182        // Note: Put echo last so we can check the output
5183        let result = kernel
5184            .execute(r#"
5185                while [[ ${FLAG} == "go" ]]; do
5186                    FLAG="stop"
5187                    echo "running"
5188                done
5189            "#)
5190            .await
5191            .expect("while with string cmp failed");
5192
5193        assert!(result.ok());
5194        assert!(result.text_out().contains("running"), "should have run once: {}", result.text_out());
5195
5196        // Verify flag was changed
5197        let flag = kernel.get_var("FLAG").await;
5198        assert_eq!(flag, Some(Value::String("stop".into())));
5199    }
5200
5201    #[tokio::test]
5202    async fn test_while_numeric_comparison() {
5203        let kernel = Kernel::transient().expect("failed to create kernel");
5204
5205        // Test > comparison (shell-compatible [[ ]] with -gt)
5206        kernel.execute("N=5").await.expect("set failed");
5207
5208        // Note: Put echo last so we can check the output
5209        let result = kernel
5210            .execute(r#"
5211                while [[ ${N} -gt 3 ]]; do
5212                    N=3
5213                    echo "N was greater"
5214                done
5215            "#)
5216            .await
5217            .expect("while with > failed");
5218
5219        assert!(result.ok());
5220        assert!(result.text_out().contains("N was greater"), "should have run once: {}", result.text_out());
5221    }
5222
5223    #[tokio::test]
5224    async fn test_break_in_while_loop() {
5225        let kernel = Kernel::transient().expect("failed to create kernel");
5226
5227        let result = kernel
5228            .execute(r#"
5229                I=0
5230                while true; do
5231                    I=1
5232                    echo "before break"
5233                    break
5234                    echo "after break"
5235                done
5236            "#)
5237            .await
5238            .expect("while with break failed");
5239
5240        assert!(result.ok());
5241        assert!(result.text_out().contains("before break"), "should see before break: {}", result.text_out());
5242        assert!(!result.text_out().contains("after break"), "should not see after break: {}", result.text_out());
5243
5244        // Verify we exited the loop
5245        let i = kernel.get_var("I").await;
5246        assert_eq!(i, Some(Value::Int(1)));
5247    }
5248
5249    #[tokio::test]
5250    async fn test_continue_in_while_loop() {
5251        let kernel = Kernel::transient().expect("failed to create kernel");
5252
5253        // Test continue in a while loop where variables persist
5254        // We use string state transition: "start" -> "middle" -> "end"
5255        // continue on "middle" should skip to next iteration
5256        // Shell-compatible: use [[ ]] for comparisons
5257        let result = kernel
5258            .execute(r#"
5259                STATE="start"
5260                AFTER_CONTINUE="no"
5261                while [[ ${STATE} != "done" ]]; do
5262                    if [[ ${STATE} == "start" ]]; then
5263                        STATE="middle"
5264                        continue
5265                        AFTER_CONTINUE="yes"
5266                    fi
5267                    if [[ ${STATE} == "middle" ]]; then
5268                        STATE="done"
5269                    fi
5270                done
5271            "#)
5272            .await
5273            .expect("while with continue failed");
5274
5275        assert!(result.ok());
5276
5277        // STATE should be "done" (we completed the loop)
5278        let state = kernel.get_var("STATE").await;
5279        assert_eq!(state, Some(Value::String("done".into())));
5280
5281        // AFTER_CONTINUE should still be "no" (continue skipped the assignment)
5282        let after = kernel.get_var("AFTER_CONTINUE").await;
5283        assert_eq!(after, Some(Value::String("no".into())));
5284    }
5285
5286    #[tokio::test]
5287    async fn test_break_with_level() {
5288        let kernel = Kernel::transient().expect("failed to create kernel");
5289
5290        // Nested loop with break 2 to exit both loops
5291        // We verify by checking OUTER value:
5292        // - If break 2 works, OUTER stays at 1 (set before for loop)
5293        // - If break 2 fails, OUTER becomes 2 (set after for loop)
5294        let result = kernel
5295            .execute(r#"
5296                OUTER=0
5297                while true; do
5298                    OUTER=1
5299                    for X in "1 2"; do
5300                        break 2
5301                    done
5302                    OUTER=2
5303                done
5304            "#)
5305            .await
5306            .expect("nested break failed");
5307
5308        assert!(result.ok());
5309
5310        // OUTER should be 1 (set before for loop), not 2 (would be set after for loop)
5311        let outer = kernel.get_var("OUTER").await;
5312        assert_eq!(outer, Some(Value::Int(1)), "break 2 should have skipped OUTER=2");
5313    }
5314
5315    #[tokio::test]
5316    async fn test_return_from_tool() {
5317        let kernel = Kernel::transient().expect("failed to create kernel");
5318
5319        // Define a function that returns early
5320        kernel
5321            .execute(r#"early_return() {
5322                if [[ $1 == 1 ]]; then
5323                    return 42
5324                fi
5325                echo "not returned"
5326            }"#)
5327            .await
5328            .expect("function definition failed");
5329
5330        // Call with arg=1 should return with exit code 42
5331        // (POSIX shell behavior: return N sets exit code, doesn't output N)
5332        let result = kernel
5333            .execute("early_return 1")
5334            .await
5335            .expect("function call failed");
5336
5337        // Exit code should be 42 (non-zero, so not ok())
5338        assert_eq!(result.code, 42);
5339        // Output should be empty (we returned before echo)
5340        assert!(result.text_out().is_empty());
5341    }
5342
5343    #[tokio::test]
5344    async fn test_return_without_value() {
5345        let kernel = Kernel::transient().expect("failed to create kernel");
5346
5347        // Define a function that returns without a value
5348        kernel
5349            .execute(r#"early_exit() {
5350                if [[ $1 == "stop" ]]; then
5351                    return
5352                fi
5353                echo "continued"
5354            }"#)
5355            .await
5356            .expect("function definition failed");
5357
5358        // Call with arg="stop" should return early
5359        let result = kernel
5360            .execute(r#"early_exit "stop""#)
5361            .await
5362            .expect("function call failed");
5363
5364        assert!(result.ok());
5365        assert!(result.text_out().is_empty() || result.text_out().trim().is_empty());
5366    }
5367
5368    #[tokio::test]
5369    async fn test_exit_stops_execution() {
5370        let kernel = Kernel::transient().expect("failed to create kernel");
5371
5372        // exit should stop further execution
5373        kernel
5374            .execute(r#"
5375                BEFORE="yes"
5376                exit 0
5377                AFTER="yes"
5378            "#)
5379            .await
5380            .expect("execution failed");
5381
5382        // BEFORE should be set, AFTER should not
5383        let before = kernel.get_var("BEFORE").await;
5384        assert_eq!(before, Some(Value::String("yes".into())));
5385
5386        let after = kernel.get_var("AFTER").await;
5387        assert!(after.is_none(), "AFTER should not be set after exit");
5388    }
5389
5390    #[tokio::test]
5391    async fn test_exit_with_code() {
5392        let kernel = Kernel::transient().expect("failed to create kernel");
5393
5394        // exit with code should propagate the exit code
5395        let result = kernel
5396            .execute("exit 42")
5397            .await
5398            .expect("exit failed");
5399
5400        assert_eq!(result.code, 42);
5401        assert!(result.text_out().is_empty(), "exit should not produce stdout");
5402    }
5403
5404    #[tokio::test]
5405    async fn test_set_e_stops_on_failure() {
5406        let kernel = Kernel::transient().expect("failed to create kernel");
5407
5408        // Enable error-exit mode
5409        kernel.execute("set -e").await.expect("set -e failed");
5410
5411        // Run a sequence where the middle command fails
5412        kernel
5413            .execute(r#"
5414                STEP1="done"
5415                false
5416                STEP2="done"
5417            "#)
5418            .await
5419            .expect("execution failed");
5420
5421        // STEP1 should be set, but STEP2 should NOT be set (exit on false)
5422        let step1 = kernel.get_var("STEP1").await;
5423        assert_eq!(step1, Some(Value::String("done".into())));
5424
5425        let step2 = kernel.get_var("STEP2").await;
5426        assert!(step2.is_none(), "STEP2 should not be set after false with set -e");
5427    }
5428
5429    #[tokio::test]
5430    async fn test_set_plus_e_disables_error_exit() {
5431        let kernel = Kernel::transient().expect("failed to create kernel");
5432
5433        // Enable then disable error-exit mode
5434        kernel.execute("set -e").await.expect("set -e failed");
5435        kernel.execute("set +e").await.expect("set +e failed");
5436
5437        // Now failure should NOT stop execution
5438        kernel
5439            .execute(r#"
5440                STEP1="done"
5441                false
5442                STEP2="done"
5443            "#)
5444            .await
5445            .expect("execution failed");
5446
5447        // Both should be set since +e disables error exit
5448        let step1 = kernel.get_var("STEP1").await;
5449        assert_eq!(step1, Some(Value::String("done".into())));
5450
5451        let step2 = kernel.get_var("STEP2").await;
5452        assert_eq!(step2, Some(Value::String("done".into())));
5453    }
5454
5455    #[tokio::test]
5456    async fn test_set_ignores_unknown_options() {
5457        let kernel = Kernel::transient().expect("failed to create kernel");
5458
5459        // Bash idiom: set -euo pipefail (we support -e, ignore the rest)
5460        let result = kernel
5461            .execute("set -e -u -o pipefail")
5462            .await
5463            .expect("set with unknown options failed");
5464
5465        assert!(result.ok(), "set should succeed with unknown options");
5466
5467        // -e should still be enabled
5468        kernel
5469            .execute(r#"
5470                BEFORE="yes"
5471                false
5472                AFTER="yes"
5473            "#)
5474            .await
5475            .ok();
5476
5477        let after = kernel.get_var("AFTER").await;
5478        assert!(after.is_none(), "-e should be enabled despite unknown options");
5479    }
5480
5481    #[tokio::test]
5482    async fn test_set_no_args_shows_settings() {
5483        let kernel = Kernel::transient().expect("failed to create kernel");
5484
5485        // Enable -e
5486        kernel.execute("set -e").await.expect("set -e failed");
5487
5488        // Call set with no args to see settings
5489        let result = kernel.execute("set").await.expect("set failed");
5490
5491        assert!(result.ok());
5492        assert!(result.text_out().contains("set -e"), "should show -e is enabled: {}", result.text_out());
5493    }
5494
5495    #[tokio::test]
5496    async fn test_set_e_in_pipeline() {
5497        let kernel = Kernel::transient().expect("failed to create kernel");
5498
5499        kernel.execute("set -e").await.expect("set -e failed");
5500
5501        // Pipeline failure should trigger exit
5502        kernel
5503            .execute(r#"
5504                BEFORE="yes"
5505                false | cat
5506                AFTER="yes"
5507            "#)
5508            .await
5509            .ok();
5510
5511        let before = kernel.get_var("BEFORE").await;
5512        assert_eq!(before, Some(Value::String("yes".into())));
5513
5514        // AFTER should not be set if pipeline failure triggers exit
5515        // Note: The exit code of a pipeline is the exit code of the last command
5516        // So `false | cat` returns 0 (cat succeeds). This is bash-compatible behavior.
5517        // To test pipeline failure, we need the last command to fail.
5518    }
5519
5520    #[tokio::test]
5521    async fn test_set_e_with_and_chain() {
5522        let kernel = Kernel::transient().expect("failed to create kernel");
5523
5524        kernel.execute("set -e").await.expect("set -e failed");
5525
5526        // Commands in && chain should not trigger -e on the first failure
5527        // because && explicitly handles the error
5528        kernel
5529            .execute(r#"
5530                RESULT="initial"
5531                false && RESULT="chained"
5532                RESULT="continued"
5533            "#)
5534            .await
5535            .ok();
5536
5537        // In bash, commands in && don't trigger -e. The chain handles the failure.
5538        // Our implementation may differ - let's verify current behavior.
5539        let result = kernel.get_var("RESULT").await;
5540        // If we follow bash semantics, RESULT should be "continued"
5541        // If we trigger -e on the false, RESULT stays "initial"
5542        assert!(result.is_some(), "RESULT should be set");
5543    }
5544
5545    #[tokio::test]
5546    async fn test_set_e_exits_in_for_loop() {
5547        let kernel = Kernel::transient().expect("failed to create kernel");
5548
5549        kernel.execute("set -e").await.expect("set -e failed");
5550
5551        kernel
5552            .execute(r#"
5553                REACHED="no"
5554                for x in 1 2 3; do
5555                    false
5556                    REACHED="yes"
5557                done
5558            "#)
5559            .await
5560            .ok();
5561
5562        // With set -e, false should trigger exit; REACHED should remain "no"
5563        let reached = kernel.get_var("REACHED").await;
5564        assert_eq!(reached, Some(Value::String("no".into())),
5565            "set -e should exit on failure in for loop body");
5566    }
5567
5568    #[tokio::test]
5569    async fn test_for_loop_continues_without_set_e() {
5570        let kernel = Kernel::transient().expect("failed to create kernel");
5571
5572        // Without set -e, for loop should continue normally
5573        kernel
5574            .execute(r#"
5575                COUNT=0
5576                for x in 1 2 3; do
5577                    false
5578                    COUNT=$((COUNT + 1))
5579                done
5580            "#)
5581            .await
5582            .ok();
5583
5584        let count = kernel.get_var("COUNT").await;
5585        // Arithmetic produces Int values; accept either Int or String representation
5586        let count_val = match &count {
5587            Some(Value::Int(n)) => *n,
5588            Some(Value::String(s)) => s.parse().unwrap_or(-1),
5589            _ => -1,
5590        };
5591        assert_eq!(count_val, 3,
5592            "without set -e, loop should complete all iterations (got {:?})", count);
5593    }
5594
5595    // ═══════════════════════════════════════════════════════════════════════════
5596    // Source Tests
5597    // ═══════════════════════════════════════════════════════════════════════════
5598
5599    #[tokio::test]
5600    async fn test_source_sets_variables() {
5601        let kernel = Kernel::transient().expect("failed to create kernel");
5602
5603        // Write a script to the VFS
5604        kernel
5605            .execute(r#"write "/test.kai" 'FOO="bar"'"#)
5606            .await
5607            .expect("write failed");
5608
5609        // Source the script
5610        let result = kernel
5611            .execute(r#"source "/test.kai""#)
5612            .await
5613            .expect("source failed");
5614
5615        assert!(result.ok(), "source should succeed");
5616
5617        // Variable should be set in current scope
5618        let foo = kernel.get_var("FOO").await;
5619        assert_eq!(foo, Some(Value::String("bar".into())));
5620    }
5621
5622    #[tokio::test]
5623    async fn test_source_with_dot_alias() {
5624        let kernel = Kernel::transient().expect("failed to create kernel");
5625
5626        // Write a script to the VFS
5627        kernel
5628            .execute(r#"write "/vars.kai" 'X=42'"#)
5629            .await
5630            .expect("write failed");
5631
5632        // Source using . alias
5633        let result = kernel
5634            .execute(r#". "/vars.kai""#)
5635            .await
5636            .expect(". failed");
5637
5638        assert!(result.ok(), ". should succeed");
5639
5640        // Variable should be set in current scope
5641        let x = kernel.get_var("X").await;
5642        assert_eq!(x, Some(Value::Int(42)));
5643    }
5644
5645    #[tokio::test]
5646    async fn test_source_not_found() {
5647        let kernel = Kernel::transient().expect("failed to create kernel");
5648
5649        // Try to source a non-existent file
5650        let result = kernel
5651            .execute(r#"source "/nonexistent.kai""#)
5652            .await
5653            .expect("source should not fail with error");
5654
5655        assert!(!result.ok(), "source of non-existent file should fail");
5656        assert!(result.err.contains("nonexistent.kai"), "error should mention filename");
5657    }
5658
5659    #[tokio::test]
5660    async fn test_source_missing_filename() {
5661        let kernel = Kernel::transient().expect("failed to create kernel");
5662
5663        // Call source with no arguments
5664        let result = kernel
5665            .execute("source")
5666            .await
5667            .expect("source should not fail with error");
5668
5669        assert!(!result.ok(), "source without filename should fail");
5670        assert!(result.err.contains("missing filename"), "error should mention missing filename");
5671    }
5672
5673    #[tokio::test]
5674    async fn test_source_executes_multiple_statements() {
5675        let kernel = Kernel::transient().expect("failed to create kernel");
5676
5677        // Write a script with multiple statements
5678        kernel
5679            .execute(r#"write "/multi.kai" 'A=1
5680B=2
5681C=3'"#)
5682            .await
5683            .expect("write failed");
5684
5685        // Source it
5686        kernel
5687            .execute(r#"source "/multi.kai""#)
5688            .await
5689            .expect("source failed");
5690
5691        // All variables should be set
5692        assert_eq!(kernel.get_var("A").await, Some(Value::Int(1)));
5693        assert_eq!(kernel.get_var("B").await, Some(Value::Int(2)));
5694        assert_eq!(kernel.get_var("C").await, Some(Value::Int(3)));
5695    }
5696
5697    #[tokio::test]
5698    async fn test_source_can_define_functions() {
5699        let kernel = Kernel::transient().expect("failed to create kernel");
5700
5701        // Write a script that defines a function
5702        kernel
5703            .execute(r#"write "/functions.kai" 'greet() {
5704    echo "Hello, $1!"
5705}'"#)
5706            .await
5707            .expect("write failed");
5708
5709        // Source it
5710        kernel
5711            .execute(r#"source "/functions.kai""#)
5712            .await
5713            .expect("source failed");
5714
5715        // Use the defined function
5716        let result = kernel
5717            .execute(r#"greet "World""#)
5718            .await
5719            .expect("greet failed");
5720
5721        assert!(result.ok());
5722        assert!(result.text_out().contains("Hello, World!"));
5723    }
5724
5725    #[tokio::test]
5726    async fn test_source_inherits_error_exit() {
5727        let kernel = Kernel::transient().expect("failed to create kernel");
5728
5729        // Enable error exit
5730        kernel.execute("set -e").await.expect("set -e failed");
5731
5732        // Write a script that has a failure
5733        kernel
5734            .execute(r#"write "/fail.kai" 'BEFORE="yes"
5735false
5736AFTER="yes"'"#)
5737            .await
5738            .expect("write failed");
5739
5740        // Source it (should exit on false due to set -e)
5741        kernel
5742            .execute(r#"source "/fail.kai""#)
5743            .await
5744            .ok();
5745
5746        // BEFORE should be set, AFTER should NOT be set due to error exit
5747        let before = kernel.get_var("BEFORE").await;
5748        assert_eq!(before, Some(Value::String("yes".into())));
5749
5750        // Note: This test depends on whether error exit is checked within source
5751        // Currently our implementation checks per-statement in the main kernel
5752    }
5753
5754    // ═══════════════════════════════════════════════════════════════════════════
5755    // set -e with && / || chains
5756    // ═══════════════════════════════════════════════════════════════════════════
5757
5758    #[tokio::test]
5759    async fn test_set_e_and_chain_left_fails() {
5760        // set -e; false && echo hi; REACHED=1 → REACHED should be set
5761        let kernel = Kernel::transient().expect("failed to create kernel");
5762        kernel.execute("set -e").await.expect("set -e failed");
5763
5764        kernel
5765            .execute("false && echo hi; REACHED=1")
5766            .await
5767            .expect("execution failed");
5768
5769        let reached = kernel.get_var("REACHED").await;
5770        assert_eq!(
5771            reached,
5772            Some(Value::Int(1)),
5773            "set -e should not trigger on left side of &&"
5774        );
5775    }
5776
5777    #[tokio::test]
5778    async fn test_set_e_and_chain_right_fails() {
5779        // set -e; true && false; REACHED=1 → REACHED should NOT be set
5780        let kernel = Kernel::transient().expect("failed to create kernel");
5781        kernel.execute("set -e").await.expect("set -e failed");
5782
5783        kernel
5784            .execute("true && false; REACHED=1")
5785            .await
5786            .expect("execution failed");
5787
5788        let reached = kernel.get_var("REACHED").await;
5789        assert!(
5790            reached.is_none(),
5791            "set -e should trigger when right side of && fails"
5792        );
5793    }
5794
5795    #[tokio::test]
5796    async fn test_set_e_or_chain_recovers() {
5797        // set -e; false || echo recovered; REACHED=1 → REACHED should be set
5798        let kernel = Kernel::transient().expect("failed to create kernel");
5799        kernel.execute("set -e").await.expect("set -e failed");
5800
5801        kernel
5802            .execute("false || echo recovered; REACHED=1")
5803            .await
5804            .expect("execution failed");
5805
5806        let reached = kernel.get_var("REACHED").await;
5807        assert_eq!(
5808            reached,
5809            Some(Value::Int(1)),
5810            "set -e should not trigger when || recovers the failure"
5811        );
5812    }
5813
5814    #[tokio::test]
5815    async fn test_set_e_or_chain_both_fail() {
5816        // set -e; false || false; REACHED=1 → REACHED should NOT be set
5817        let kernel = Kernel::transient().expect("failed to create kernel");
5818        kernel.execute("set -e").await.expect("set -e failed");
5819
5820        kernel
5821            .execute("false || false; REACHED=1")
5822            .await
5823            .expect("execution failed");
5824
5825        let reached = kernel.get_var("REACHED").await;
5826        assert!(
5827            reached.is_none(),
5828            "set -e should trigger when || chain ultimately fails"
5829        );
5830    }
5831
5832    // ═══════════════════════════════════════════════════════════════════════════
5833    // Cancellation Tests
5834    // ═══════════════════════════════════════════════════════════════════════════
5835
5836    /// Helper: schedule a cancel after a delay from a background thread.
5837    /// Uses std::thread because cancel() is sync and Kernel is not Send.
5838    fn schedule_cancel(kernel: &Arc<Kernel>, delay: std::time::Duration) {
5839        let k = Arc::clone(kernel);
5840        std::thread::spawn(move || {
5841            std::thread::sleep(delay);
5842            k.cancel();
5843        });
5844    }
5845
5846    #[tokio::test]
5847    async fn test_cancel_interrupts_for_loop() {
5848        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5849
5850        // Schedule cancel after a short delay from a background OS thread
5851        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5852
5853        let result = kernel
5854            .execute("for i in $(seq 1 100000); do X=$i; done")
5855            .await
5856            .expect("execute failed");
5857
5858        assert_eq!(result.code, 130, "cancelled execution should exit with code 130");
5859
5860        // The loop variable should be set to something < 100000
5861        let x = kernel.get_var("X").await;
5862        if let Some(Value::Int(n)) = x {
5863            assert!(n < 100000, "loop should have been interrupted before finishing, got X={n}");
5864        }
5865    }
5866
5867    #[tokio::test]
5868    async fn test_cancel_interrupts_while_loop() {
5869        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5870        kernel.execute("COUNT=0").await.expect("init failed");
5871
5872        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5873
5874        let result = kernel
5875            .execute("while true; do COUNT=$((COUNT + 1)); done")
5876            .await
5877            .expect("execute failed");
5878
5879        assert_eq!(result.code, 130);
5880
5881        let count = kernel.get_var("COUNT").await;
5882        if let Some(Value::Int(n)) = count {
5883            assert!(n > 0, "loop should have run at least once");
5884        }
5885    }
5886
5887    #[tokio::test]
5888    async fn test_reset_after_cancel() {
5889        // After cancellation, the next execute() should work normally
5890        let kernel = Kernel::transient().expect("failed to create kernel");
5891        kernel.cancel(); // cancel with nothing running
5892
5893        let result = kernel.execute("echo hello").await.expect("execute failed");
5894        assert!(result.ok(), "execute after cancel should succeed");
5895        assert_eq!(result.text_out().trim(), "hello");
5896    }
5897
5898    #[tokio::test]
5899    async fn test_cancel_interrupts_statement_sequence() {
5900        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5901
5902        // Schedule cancel after the first statement runs but before sleep finishes
5903        schedule_cancel(&kernel, std::time::Duration::from_millis(50));
5904
5905        let result = kernel
5906            .execute("STEP=1; sleep 5; STEP=2; sleep 5; STEP=3")
5907            .await
5908            .expect("execute failed");
5909
5910        assert_eq!(result.code, 130);
5911
5912        // STEP should be 1 (set before sleep), not 2 or 3
5913        let step = kernel.get_var("STEP").await;
5914        assert_eq!(step, Some(Value::Int(1)), "cancel should stop before STEP=2");
5915    }
5916
5917    // ═══════════════════════════════════════════════════════════════════════════
5918    // Case Statement Tests
5919    // ═══════════════════════════════════════════════════════════════════════════
5920
5921    #[tokio::test]
5922    async fn test_case_simple_match() {
5923        let kernel = Kernel::transient().expect("failed to create kernel");
5924
5925        let result = kernel
5926            .execute(r#"
5927                case "hello" in
5928                    hello) echo "matched hello" ;;
5929                    world) echo "matched world" ;;
5930                esac
5931            "#)
5932            .await
5933            .expect("case failed");
5934
5935        assert!(result.ok());
5936        assert_eq!(result.text_out().trim(), "matched hello");
5937    }
5938
5939    #[tokio::test]
5940    async fn test_case_wildcard_match() {
5941        let kernel = Kernel::transient().expect("failed to create kernel");
5942
5943        let result = kernel
5944            .execute(r#"
5945                case "main.rs" in
5946                    *.py) echo "Python" ;;
5947                    *.rs) echo "Rust" ;;
5948                    *) echo "Unknown" ;;
5949                esac
5950            "#)
5951            .await
5952            .expect("case failed");
5953
5954        assert!(result.ok());
5955        assert_eq!(result.text_out().trim(), "Rust");
5956    }
5957
5958    #[tokio::test]
5959    async fn test_case_default_match() {
5960        let kernel = Kernel::transient().expect("failed to create kernel");
5961
5962        let result = kernel
5963            .execute(r#"
5964                case "unknown.xyz" in
5965                    *.py) echo "Python" ;;
5966                    *.rs) echo "Rust" ;;
5967                    *) echo "Default" ;;
5968                esac
5969            "#)
5970            .await
5971            .expect("case failed");
5972
5973        assert!(result.ok());
5974        assert_eq!(result.text_out().trim(), "Default");
5975    }
5976
5977    #[tokio::test]
5978    async fn test_case_no_match() {
5979        let kernel = Kernel::transient().expect("failed to create kernel");
5980
5981        // Case with no default branch and no match
5982        let result = kernel
5983            .execute(r#"
5984                case "nope" in
5985                    "yes") echo "yes" ;;
5986                    "no") echo "no" ;;
5987                esac
5988            "#)
5989            .await
5990            .expect("case failed");
5991
5992        assert!(result.ok());
5993        assert!(result.text_out().is_empty(), "no match should produce empty output");
5994    }
5995
5996    #[tokio::test]
5997    async fn test_case_with_variable() {
5998        let kernel = Kernel::transient().expect("failed to create kernel");
5999
6000        kernel.execute(r#"LANG="rust""#).await.expect("set failed");
6001
6002        let result = kernel
6003            .execute(r#"
6004                case ${LANG} in
6005                    python) echo "snake" ;;
6006                    rust) echo "crab" ;;
6007                    go) echo "gopher" ;;
6008                esac
6009            "#)
6010            .await
6011            .expect("case failed");
6012
6013        assert!(result.ok());
6014        assert_eq!(result.text_out().trim(), "crab");
6015    }
6016
6017    #[tokio::test]
6018    async fn test_case_multiple_patterns() {
6019        let kernel = Kernel::transient().expect("failed to create kernel");
6020
6021        let result = kernel
6022            .execute(r#"
6023                case "yes" in
6024                    "y"|"yes"|"Y"|"YES") echo "affirmative" ;;
6025                    "n"|"no"|"N"|"NO") echo "negative" ;;
6026                esac
6027            "#)
6028            .await
6029            .expect("case failed");
6030
6031        assert!(result.ok());
6032        assert_eq!(result.text_out().trim(), "affirmative");
6033    }
6034
6035    #[tokio::test]
6036    async fn test_case_glob_question_mark() {
6037        let kernel = Kernel::transient().expect("failed to create kernel");
6038
6039        let result = kernel
6040            .execute(r#"
6041                case "test1" in
6042                    test?) echo "matched test?" ;;
6043                    *) echo "default" ;;
6044                esac
6045            "#)
6046            .await
6047            .expect("case failed");
6048
6049        assert!(result.ok());
6050        assert_eq!(result.text_out().trim(), "matched test?");
6051    }
6052
6053    #[tokio::test]
6054    async fn test_case_char_class() {
6055        let kernel = Kernel::transient().expect("failed to create kernel");
6056
6057        let result = kernel
6058            .execute(r#"
6059                case "Yes" in
6060                    [Yy]*) echo "yes-like" ;;
6061                    [Nn]*) echo "no-like" ;;
6062                esac
6063            "#)
6064            .await
6065            .expect("case failed");
6066
6067        assert!(result.ok());
6068        assert_eq!(result.text_out().trim(), "yes-like");
6069    }
6070
6071    // ═══════════════════════════════════════════════════════════════════════════
6072    // Cat Stdin Tests
6073    // ═══════════════════════════════════════════════════════════════════════════
6074
6075    #[tokio::test]
6076    async fn test_cat_from_pipeline() {
6077        let kernel = Kernel::transient().expect("failed to create kernel");
6078
6079        let result = kernel
6080            .execute(r#"echo "piped text" | cat"#)
6081            .await
6082            .expect("cat pipeline failed");
6083
6084        assert!(result.ok(), "cat failed: {}", result.err);
6085        assert_eq!(result.text_out().trim(), "piped text");
6086    }
6087
6088    #[tokio::test]
6089    async fn test_cat_from_pipeline_multiline() {
6090        let kernel = Kernel::transient().expect("failed to create kernel");
6091
6092        let result = kernel
6093            .execute(r#"echo "line1\nline2" | cat -n"#)
6094            .await
6095            .expect("cat pipeline failed");
6096
6097        assert!(result.ok(), "cat failed: {}", result.err);
6098        assert!(result.text_out().contains("1\t"), "output: {}", result.text_out());
6099    }
6100
6101    // ═══════════════════════════════════════════════════════════════════════════
6102    // Heredoc Tests
6103    // ═══════════════════════════════════════════════════════════════════════════
6104
6105    #[tokio::test]
6106    async fn test_heredoc_basic() {
6107        let kernel = Kernel::transient().expect("failed to create kernel");
6108
6109        let result = kernel
6110            .execute("cat <<EOF\nhello\nEOF")
6111            .await
6112            .expect("heredoc failed");
6113
6114        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6115        assert_eq!(result.text_out().trim(), "hello");
6116    }
6117
6118    #[tokio::test]
6119    async fn test_arithmetic_in_string() {
6120        let kernel = Kernel::transient().expect("failed to create kernel");
6121
6122        let result = kernel
6123            .execute(r#"echo "result: $((1 + 2))""#)
6124            .await
6125            .expect("arithmetic in string failed");
6126
6127        assert!(result.ok(), "echo failed: {}", result.err);
6128        assert_eq!(result.text_out().trim(), "result: 3");
6129    }
6130
6131    #[tokio::test]
6132    async fn test_heredoc_multiline() {
6133        let kernel = Kernel::transient().expect("failed to create kernel");
6134
6135        let result = kernel
6136            .execute("cat <<EOF\nline1\nline2\nline3\nEOF")
6137            .await
6138            .expect("heredoc failed");
6139
6140        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
6141        assert!(result.text_out().contains("line1"), "output: {}", result.text_out());
6142        assert!(result.text_out().contains("line2"), "output: {}", result.text_out());
6143        assert!(result.text_out().contains("line3"), "output: {}", result.text_out());
6144    }
6145
6146    #[tokio::test]
6147    async fn test_heredoc_variable_expansion() {
6148        // Bug N: unquoted heredoc should expand variables
6149        let kernel = Kernel::transient().expect("failed to create kernel");
6150
6151        kernel.execute("GREETING=hello").await.expect("set var");
6152
6153        let result = kernel
6154            .execute("cat <<EOF\n$GREETING world\nEOF")
6155            .await
6156            .expect("heredoc expansion failed");
6157
6158        assert!(result.ok(), "heredoc expansion failed: {}", result.err);
6159        assert_eq!(result.text_out().trim(), "hello world");
6160    }
6161
6162    #[tokio::test]
6163    async fn test_heredoc_quoted_no_expansion() {
6164        // Bug N: quoted heredoc (<<'EOF') should NOT expand variables
6165        let kernel = Kernel::transient().expect("failed to create kernel");
6166
6167        kernel.execute("GREETING=hello").await.expect("set var");
6168
6169        let result = kernel
6170            .execute("cat <<'EOF'\n$GREETING world\nEOF")
6171            .await
6172            .expect("quoted heredoc failed");
6173
6174        assert!(result.ok(), "quoted heredoc failed: {}", result.err);
6175        assert_eq!(result.text_out().trim(), "$GREETING world");
6176    }
6177
6178    #[tokio::test]
6179    async fn test_heredoc_default_value_expansion() {
6180        // Bug N: ${VAR:-default} should expand in unquoted heredocs
6181        let kernel = Kernel::transient().expect("failed to create kernel");
6182
6183        let result = kernel
6184            .execute("cat <<EOF\n${UNSET:-fallback}\nEOF")
6185            .await
6186            .expect("heredoc default expansion failed");
6187
6188        assert!(result.ok(), "heredoc default expansion failed: {}", result.err);
6189        assert_eq!(result.text_out().trim(), "fallback");
6190    }
6191
6192    // ═══════════════════════════════════════════════════════════════════════════
6193    // Read Builtin Tests
6194    // ═══════════════════════════════════════════════════════════════════════════
6195
6196    #[tokio::test]
6197    async fn test_read_from_pipeline() {
6198        let kernel = Kernel::transient().expect("failed to create kernel");
6199
6200        // Pipe input to read
6201        let result = kernel
6202            .execute(r#"echo "Alice" | read NAME; echo "Hello, ${NAME}""#)
6203            .await
6204            .expect("read pipeline failed");
6205
6206        assert!(result.ok(), "read failed: {}", result.err);
6207        assert!(result.text_out().contains("Hello, Alice"), "output: {}", result.text_out());
6208    }
6209
6210    #[tokio::test]
6211    async fn test_read_multiple_vars_from_pipeline() {
6212        let kernel = Kernel::transient().expect("failed to create kernel");
6213
6214        let result = kernel
6215            .execute(r#"echo "John Doe 42" | read FIRST LAST AGE; echo "${FIRST} is ${AGE}""#)
6216            .await
6217            .expect("read pipeline failed");
6218
6219        assert!(result.ok(), "read failed: {}", result.err);
6220        assert!(result.text_out().contains("John is 42"), "output: {}", result.text_out());
6221    }
6222
6223    // ═══════════════════════════════════════════════════════════════════════════
6224    // Shell-Style Function Tests
6225    // ═══════════════════════════════════════════════════════════════════════════
6226
6227    #[tokio::test]
6228    async fn test_posix_function_with_positional_params() {
6229        let kernel = Kernel::transient().expect("failed to create kernel");
6230
6231        // Define POSIX-style function
6232        kernel
6233            .execute(r#"greet() { echo "Hello, $1!" }"#)
6234            .await
6235            .expect("function definition failed");
6236
6237        // Call the function
6238        let result = kernel
6239            .execute(r#"greet "Amy""#)
6240            .await
6241            .expect("function call failed");
6242
6243        assert!(result.ok(), "greet failed: {}", result.err);
6244        assert_eq!(result.text_out().trim(), "Hello, Amy!");
6245    }
6246
6247    #[tokio::test]
6248    async fn test_posix_function_multiple_args() {
6249        let kernel = Kernel::transient().expect("failed to create kernel");
6250
6251        // Define function using $1 and $2
6252        kernel
6253            .execute(r#"add_greeting() { echo "$1 $2!" }"#)
6254            .await
6255            .expect("function definition failed");
6256
6257        // Call the function
6258        let result = kernel
6259            .execute(r#"add_greeting "Hello" "World""#)
6260            .await
6261            .expect("function call failed");
6262
6263        assert!(result.ok(), "function failed: {}", result.err);
6264        assert_eq!(result.text_out().trim(), "Hello World!");
6265    }
6266
6267    #[tokio::test]
6268    async fn test_bash_function_with_positional_params() {
6269        let kernel = Kernel::transient().expect("failed to create kernel");
6270
6271        // Define bash-style function (function keyword, no parens)
6272        kernel
6273            .execute(r#"function greet { echo "Hi $1" }"#)
6274            .await
6275            .expect("function definition failed");
6276
6277        // Call the function
6278        let result = kernel
6279            .execute(r#"greet "Bob""#)
6280            .await
6281            .expect("function call failed");
6282
6283        assert!(result.ok(), "greet failed: {}", result.err);
6284        assert_eq!(result.text_out().trim(), "Hi Bob");
6285    }
6286
6287    #[tokio::test]
6288    async fn test_shell_function_with_all_args() {
6289        let kernel = Kernel::transient().expect("failed to create kernel");
6290
6291        // Define function using $@ (all args)
6292        kernel
6293            .execute(r#"echo_all() { echo "args: $@" }"#)
6294            .await
6295            .expect("function definition failed");
6296
6297        // Call with multiple args
6298        let result = kernel
6299            .execute(r#"echo_all "a" "b" "c""#)
6300            .await
6301            .expect("function call failed");
6302
6303        assert!(result.ok(), "function failed: {}", result.err);
6304        assert_eq!(result.text_out().trim(), "args: a b c");
6305    }
6306
6307    #[tokio::test]
6308    async fn test_shell_function_with_arg_count() {
6309        let kernel = Kernel::transient().expect("failed to create kernel");
6310
6311        // Define function using $# (arg count)
6312        kernel
6313            .execute(r#"count_args() { echo "count: $#" }"#)
6314            .await
6315            .expect("function definition failed");
6316
6317        // Call with three args
6318        let result = kernel
6319            .execute(r#"count_args "x" "y" "z""#)
6320            .await
6321            .expect("function call failed");
6322
6323        assert!(result.ok(), "function failed: {}", result.err);
6324        assert_eq!(result.text_out().trim(), "count: 3");
6325    }
6326
6327    #[tokio::test]
6328    async fn test_shell_function_shared_scope() {
6329        let kernel = Kernel::transient().expect("failed to create kernel");
6330
6331        // Set a variable in parent scope
6332        kernel
6333            .execute(r#"PARENT_VAR="visible""#)
6334            .await
6335            .expect("set failed");
6336
6337        // Define shell function that reads and writes parent variable
6338        kernel
6339            .execute(r#"modify_parent() {
6340                echo "saw: ${PARENT_VAR}"
6341                PARENT_VAR="changed by function"
6342            }"#)
6343            .await
6344            .expect("function definition failed");
6345
6346        // Call the function - it SHOULD see PARENT_VAR (bash-compatible shared scope)
6347        let result = kernel.execute("modify_parent").await.expect("function failed");
6348
6349        assert!(
6350            result.text_out().contains("visible"),
6351            "Shell function should access parent scope, got: {}",
6352            result.text_out()
6353        );
6354
6355        // Parent variable should be modified
6356        let var = kernel.get_var("PARENT_VAR").await;
6357        assert_eq!(
6358            var,
6359            Some(Value::String("changed by function".into())),
6360            "Shell function should modify parent scope"
6361        );
6362    }
6363
6364    // ═══════════════════════════════════════════════════════════════════════════
6365    // Script Execution via PATH Tests
6366    // ═══════════════════════════════════════════════════════════════════════════
6367
6368    #[tokio::test]
6369    async fn test_script_execution_from_path() {
6370        let kernel = Kernel::transient().expect("failed to create kernel");
6371
6372        // Create /bin directory and script
6373        kernel.execute(r#"mkdir "/bin""#).await.ok();
6374        kernel
6375            .execute(r#"write "/bin/hello.kai" 'echo "Hello from script!"'"#)
6376            .await
6377            .expect("write script failed");
6378
6379        // Set PATH to /bin
6380        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6381
6382        // Call script by name (without .kai extension)
6383        let result = kernel
6384            .execute("hello")
6385            .await
6386            .expect("script execution failed");
6387
6388        assert!(result.ok(), "script failed: {}", result.err);
6389        assert_eq!(result.text_out().trim(), "Hello from script!");
6390    }
6391
6392    #[tokio::test]
6393    async fn test_script_with_args() {
6394        let kernel = Kernel::transient().expect("failed to create kernel");
6395
6396        // Create script that uses positional params
6397        kernel.execute(r#"mkdir "/bin""#).await.ok();
6398        kernel
6399            .execute(r#"write "/bin/greet.kai" 'echo "Hello, $1!"'"#)
6400            .await
6401            .expect("write script failed");
6402
6403        // Set PATH
6404        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
6405
6406        // Call script with arg
6407        let result = kernel
6408            .execute(r#"greet "World""#)
6409            .await
6410            .expect("script execution failed");
6411
6412        assert!(result.ok(), "script failed: {}", result.err);
6413        assert_eq!(result.text_out().trim(), "Hello, World!");
6414    }
6415
6416    #[tokio::test]
6417    async fn test_script_not_found() {
6418        let kernel = Kernel::transient().expect("failed to create kernel");
6419
6420        // Set empty PATH
6421        kernel.execute(r#"PATH="/nonexistent""#).await.expect("set PATH failed");
6422
6423        // Call non-existent script
6424        let result = kernel
6425            .execute("noscript")
6426            .await
6427            .expect("execution failed");
6428
6429        assert!(!result.ok(), "should fail with command not found");
6430        assert_eq!(result.code, 127);
6431        assert!(result.err.contains("command not found"));
6432    }
6433
6434    #[tokio::test]
6435    async fn test_script_path_search_order() {
6436        let kernel = Kernel::transient().expect("failed to create kernel");
6437
6438        // Create two directories with same-named script
6439        // Note: using "myscript" not "test" to avoid conflict with test builtin
6440        kernel.execute(r#"mkdir "/first""#).await.ok();
6441        kernel.execute(r#"mkdir "/second""#).await.ok();
6442        kernel
6443            .execute(r#"write "/first/myscript.kai" 'echo "from first"'"#)
6444            .await
6445            .expect("write failed");
6446        kernel
6447            .execute(r#"write "/second/myscript.kai" 'echo "from second"'"#)
6448            .await
6449            .expect("write failed");
6450
6451        // Set PATH with first before second
6452        kernel.execute(r#"PATH="/first:/second""#).await.expect("set PATH failed");
6453
6454        // Should find first one
6455        let result = kernel
6456            .execute("myscript")
6457            .await
6458            .expect("script execution failed");
6459
6460        assert!(result.ok(), "script failed: {}", result.err);
6461        assert_eq!(result.text_out().trim(), "from first");
6462    }
6463
6464    // ═══════════════════════════════════════════════════════════════════════════
6465    // Special Variable Tests ($?, $$, unset vars)
6466    // ═══════════════════════════════════════════════════════════════════════════
6467
6468    #[tokio::test]
6469    async fn test_last_exit_code_success() {
6470        let kernel = Kernel::transient().expect("failed to create kernel");
6471
6472        // true exits with 0
6473        let result = kernel.execute("true; echo $?").await.expect("execution failed");
6474        assert!(result.text_out().contains("0"), "expected 0, got: {}", result.text_out());
6475    }
6476
6477    #[tokio::test]
6478    async fn test_last_exit_code_failure() {
6479        let kernel = Kernel::transient().expect("failed to create kernel");
6480
6481        // false exits with 1
6482        let result = kernel.execute("false; echo $?").await.expect("execution failed");
6483        assert!(result.text_out().contains("1"), "expected 1, got: {}", result.text_out());
6484    }
6485
6486    #[tokio::test]
6487    async fn test_current_pid() {
6488        let kernel = Kernel::transient().expect("failed to create kernel");
6489
6490        let result = kernel.execute("echo $$").await.expect("execution failed");
6491        // PID should be a positive number
6492        let pid: u32 = result.text_out().trim().parse().expect("PID should be a number");
6493        assert!(pid > 0, "PID should be positive");
6494    }
6495
6496    #[tokio::test]
6497    async fn test_unset_variable_expands_to_empty() {
6498        let kernel = Kernel::transient().expect("failed to create kernel");
6499
6500        // Unset variable in interpolation should be empty
6501        let result = kernel.execute(r#"echo "prefix:${UNSET_VAR}:suffix""#).await.expect("execution failed");
6502        assert_eq!(result.text_out().trim(), "prefix::suffix");
6503    }
6504
6505    #[tokio::test]
6506    async fn test_eq_ne_operators() {
6507        let kernel = Kernel::transient().expect("failed to create kernel");
6508
6509        // Test -eq operator
6510        let result = kernel.execute(r#"if [[ 5 -eq 5 ]]; then echo "eq works"; fi"#).await.expect("execution failed");
6511        assert_eq!(result.text_out().trim(), "eq works");
6512
6513        // Test -ne operator
6514        let result = kernel.execute(r#"if [[ 5 -ne 3 ]]; then echo "ne works"; fi"#).await.expect("execution failed");
6515        assert_eq!(result.text_out().trim(), "ne works");
6516
6517        // Test -eq with different values
6518        let result = kernel.execute(r#"if [[ 5 -eq 3 ]]; then echo "wrong"; else echo "correct"; fi"#).await.expect("execution failed");
6519        assert_eq!(result.text_out().trim(), "correct");
6520    }
6521
6522    #[tokio::test]
6523    async fn test_escaped_dollar_in_string() {
6524        let kernel = Kernel::transient().expect("failed to create kernel");
6525
6526        // \$ should produce literal $
6527        let result = kernel.execute(r#"echo "\$100""#).await.expect("execution failed");
6528        assert_eq!(result.text_out().trim(), "$100");
6529    }
6530
6531    #[tokio::test]
6532    async fn test_special_vars_in_interpolation() {
6533        let kernel = Kernel::transient().expect("failed to create kernel");
6534
6535        // Test $? in string interpolation
6536        let result = kernel.execute(r#"true; echo "exit: $?""#).await.expect("execution failed");
6537        assert_eq!(result.text_out().trim(), "exit: 0");
6538
6539        // Test $$ in string interpolation
6540        let result = kernel.execute(r#"echo "pid: $$""#).await.expect("execution failed");
6541        assert!(result.text_out().starts_with("pid: "), "unexpected output: {}", result.text_out());
6542        let text = result.text_out();
6543        let pid_part = text.trim().strip_prefix("pid: ").unwrap();
6544        let _pid: u32 = pid_part.parse().expect("PID in string should be a number");
6545    }
6546
6547    // ═══════════════════════════════════════════════════════════════════════════
6548    // Command Substitution Tests
6549    // ═══════════════════════════════════════════════════════════════════════════
6550
6551    #[tokio::test]
6552    async fn test_command_subst_assignment() {
6553        let kernel = Kernel::transient().expect("failed to create kernel");
6554
6555        // Command substitution in assignment
6556        let result = kernel.execute(r#"X=$(echo hello); echo "$X""#).await.expect("execution failed");
6557        assert_eq!(result.text_out().trim(), "hello");
6558    }
6559
6560    #[tokio::test]
6561    async fn test_command_subst_with_args() {
6562        let kernel = Kernel::transient().expect("failed to create kernel");
6563
6564        // Command substitution with string argument
6565        let result = kernel.execute(r#"X=$(echo "a b c"); echo "$X""#).await.expect("execution failed");
6566        assert_eq!(result.text_out().trim(), "a b c");
6567    }
6568
6569    #[tokio::test]
6570    async fn test_command_subst_nested_vars() {
6571        let kernel = Kernel::transient().expect("failed to create kernel");
6572
6573        // Variables inside command substitution
6574        let result = kernel.execute(r#"Y=world; X=$(echo "hello $Y"); echo "$X""#).await.expect("execution failed");
6575        assert_eq!(result.text_out().trim(), "hello world");
6576    }
6577
6578    #[tokio::test]
6579    async fn test_background_job_basic() {
6580        use std::time::Duration;
6581
6582        let kernel = Kernel::new(KernelConfig::isolated()).expect("failed to create kernel");
6583
6584        // Run a simple background command
6585        let result = kernel.execute("echo hello &").await.expect("execution failed");
6586        assert!(result.ok(), "background command should succeed: {}", result.err);
6587        assert!(result.text_out().contains("[1]"), "should return job ID: {}", result.text_out());
6588
6589        // Give the job time to complete
6590        tokio::time::sleep(Duration::from_millis(100)).await;
6591
6592        // Check job status
6593        let status = kernel.execute("cat /v/jobs/1/status").await.expect("status check failed");
6594        assert!(status.ok(), "status should succeed: {}", status.err);
6595        assert!(
6596            status.text_out().contains("done:") || status.text_out().contains("running"),
6597            "should have valid status: {}",
6598            status.text_out()
6599        );
6600
6601        // Check stdout
6602        let stdout = kernel.execute("cat /v/jobs/1/stdout").await.expect("stdout check failed");
6603        assert!(stdout.ok());
6604        assert!(stdout.text_out().contains("hello"));
6605    }
6606
6607    #[tokio::test]
6608    async fn test_heredoc_piped_to_command() {
6609        // Bug 4: heredoc content should pipe through to next command
6610        let kernel = Kernel::transient().expect("kernel");
6611        let result = kernel.execute("cat <<EOF | cat\nhello world\nEOF").await.expect("exec");
6612        assert!(result.ok(), "heredoc | cat failed: {}", result.err);
6613        assert_eq!(result.text_out().trim(), "hello world");
6614    }
6615
6616    /// A transient kernel paired with a real, auto-cleaning tempdir. The
6617    /// transient (Sandboxed) kernel mounts `/tmp` as a real `LocalFs`, so glob
6618    /// tests need actual files on disk. Hold the returned `TempDir` for the
6619    /// test's lifetime: it removes the directory tree on drop — including on
6620    /// panic — so no test scratch leaks into `/tmp` (the project's tmp-builder
6621    /// convention; never hardcode `/tmp/...` paths). Returns the absolute path
6622    /// as a string for interpolation into scripts.
6623    fn transient_with_tempdir() -> (Kernel, tempfile::TempDir, String) {
6624        let kernel = Kernel::transient().expect("kernel");
6625        let tmp = tempfile::tempdir().expect("tempdir");
6626        let dir = tmp.path().display().to_string();
6627        (kernel, tmp, dir)
6628    }
6629
6630    #[tokio::test]
6631    async fn test_for_loop_glob_iterates() {
6632        // Bug 1: for F in $(glob ...) should iterate per file, not once
6633        let (kernel, _tmp, dir) = transient_with_tempdir();
6634        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6635        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6636        let result = kernel.execute(&format!(r#"
6637            N=0
6638            for F in $(glob "{dir}/*.txt"); do
6639                N=$((N + 1))
6640            done
6641            echo $N
6642        "#)).await.unwrap();
6643        assert!(result.ok(), "for glob failed: {}", result.err);
6644        assert_eq!(result.text_out().trim(), "2", "Should iterate 2 files, got: {}", result.text_out());
6645    }
6646
6647    #[tokio::test]
6648    async fn test_bare_glob_expansion_echo() {
6649        let (kernel, _tmp, dir) = transient_with_tempdir();
6650        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6651        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6652        kernel.execute(&format!("echo c > {dir}/c.rs")).await.unwrap();
6653        kernel.execute(&format!("cd {dir}")).await.unwrap();
6654        let result = kernel.execute("echo *.txt").await.unwrap();
6655        assert!(result.ok(), "echo *.txt failed: {}", result.err);
6656        let out = result.text_out();
6657        let out = out.trim();
6658        // Should contain both .txt files (order may vary)
6659        assert!(out.contains("a.txt"), "missing a.txt in: {}", out);
6660        assert!(out.contains("b.txt"), "missing b.txt in: {}", out);
6661        assert!(!out.contains("c.rs"), "should not contain c.rs in: {}", out);
6662    }
6663
6664    #[tokio::test]
6665    async fn test_bare_glob_no_matches_errors() {
6666        let (kernel, _tmp, dir) = transient_with_tempdir();
6667        kernel.execute(&format!("cd {dir}")).await.unwrap();
6668        let result = kernel.execute("echo *.nonexistent").await;
6669        match &result {
6670            Ok(exec) => {
6671                // No-match glob should produce a non-zero exit code
6672                assert!(!exec.ok(), "expected failure, got success: out={}, err={}", exec.text_out(), exec.err);
6673                assert!(exec.err.contains("no matches"), "error should say no matches: {}", exec.err);
6674            }
6675            Err(e) => {
6676                assert!(e.to_string().contains("no matches"), "error should say no matches: {}", e);
6677            }
6678        }
6679    }
6680
6681    #[tokio::test]
6682    async fn test_bare_glob_disabled_with_set() {
6683        let (kernel, _tmp, dir) = transient_with_tempdir();
6684        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6685        kernel.execute(&format!("cd {dir}")).await.unwrap();
6686        // Disable glob expansion
6687        kernel.execute("set +o glob").await.unwrap();
6688        let result = kernel.execute("echo *.txt").await.unwrap();
6689        // With glob disabled, *.txt should be passed as literal string
6690        assert!(result.ok(), "echo should succeed: {}", result.err);
6691        assert_eq!(result.text_out().trim(), "*.txt", "should be literal: {}", result.text_out());
6692    }
6693
6694    #[tokio::test]
6695    async fn test_bare_glob_quoted_not_expanded() {
6696        let (kernel, _tmp, dir) = transient_with_tempdir();
6697        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6698        kernel.execute(&format!("cd {dir}")).await.unwrap();
6699        // Quoted globs should NOT expand
6700        let result = kernel.execute("echo \"*.txt\"").await.unwrap();
6701        assert!(result.ok(), "echo should succeed: {}", result.err);
6702        assert_eq!(result.text_out().trim(), "*.txt", "quoted should be literal: {}", result.text_out());
6703    }
6704
6705    #[tokio::test]
6706    async fn test_bare_glob_for_loop() {
6707        let (kernel, _tmp, dir) = transient_with_tempdir();
6708        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6709        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6710        kernel.execute(&format!("cd {dir}")).await.unwrap();
6711        let result = kernel.execute(r#"
6712            N=0
6713            for f in *.txt; do
6714                N=$((N + 1))
6715            done
6716            echo $N
6717        "#).await.unwrap();
6718        assert!(result.ok(), "for loop failed: {}", result.err);
6719        assert_eq!(result.text_out().trim(), "2", "should iterate 2 files: {}", result.text_out());
6720    }
6721
6722    #[tokio::test]
6723    async fn test_glob_in_assignment_is_literal() {
6724        let kernel = Kernel::transient().expect("kernel");
6725        let result = kernel.execute("X=*.txt; echo $X").await.unwrap();
6726        assert!(result.ok());
6727        assert_eq!(result.text_out().trim(), "*.txt", "glob in assignment should be literal");
6728    }
6729
6730    #[tokio::test]
6731    async fn test_glob_in_test_expr_is_literal() {
6732        let kernel = Kernel::transient().expect("kernel");
6733        let result = kernel.execute(r#"
6734            if [[ *.txt == "*.txt" ]]; then
6735                echo "match"
6736            else
6737                echo "no"
6738            fi
6739        "#).await.unwrap();
6740        assert!(result.ok());
6741        assert_eq!(result.text_out().trim(), "match", "glob in test expr should be literal");
6742    }
6743
6744    #[tokio::test]
6745    async fn test_command_subst_echo_not_iterable() {
6746        // Regression guard: $(echo "a b c") must remain a single string
6747        let kernel = Kernel::transient().expect("kernel");
6748        let result = kernel.execute(r#"
6749            N=0
6750            for X in $(echo "a b c"); do N=$((N + 1)); done
6751            echo $N
6752        "#).await.unwrap();
6753        assert!(result.ok());
6754        assert_eq!(result.text_out().trim(), "1", "echo should be one item: {}", result.text_out());
6755    }
6756
6757    // -- accumulate_result / newline tests --
6758
#[test]
fn test_accumulate_preserves_own_newlines() {
    // Accumulation is plain concatenation: each command's own trailing
    // newline survives and no extra one is introduced.
    let mut combined = ExecResult::success("line1\n");
    let next = ExecResult::success("line2\n");
    accumulate_result(&mut combined, &next);

    let merged = combined.text_out();
    assert_eq!(&*merged, "line1\nline2\n");
    assert!(!merged.contains("\n\n"), "should not have double newlines: {:?}", combined.text_out());
}
6769
#[test]
fn test_accumulate_inserts_no_separator() {
    // Nothing is inserted between outputs: a `printf a; printf b` style
    // sequence yields `ab`, as in bash (regression for the 2026-06-09
    // finding).
    let mut combined = ExecResult::success("line1");
    let next = ExecResult::success("line2");
    accumulate_result(&mut combined, &next);

    let merged = combined.text_out();
    assert_eq!(&*merged, "line1line2");
}
6779
#[test]
fn test_accumulate_empty_into_nonempty() {
    // An empty accumulator simply takes on the new output.
    let mut combined = ExecResult::success("");
    let next = ExecResult::success("hello\n");
    accumulate_result(&mut combined, &next);

    let merged = combined.text_out();
    assert_eq!(&*merged, "hello\n");
}
6787
#[test]
fn test_accumulate_nonempty_into_empty() {
    // Folding in an empty result leaves the accumulated output untouched.
    let mut combined = ExecResult::success("hello\n");
    let next = ExecResult::success("");
    accumulate_result(&mut combined, &next);

    let merged = combined.text_out();
    assert_eq!(&*merged, "hello\n");
}
6795
#[test]
fn test_accumulate_stderr_no_double_newlines() {
    // Merging two newline-terminated stderr payloads must not leave a blank
    // line between them.
    let mut combined = ExecResult::failure(1, "err1\n");
    let next = ExecResult::failure(1, "err2\n");
    accumulate_result(&mut combined, &next);

    let has_blank_line = combined.err.contains("\n\n");
    assert!(!has_blank_line, "stderr should not have double newlines: {:?}", combined.err);
}
6803
6804    #[tokio::test]
6805    async fn test_multiple_echo_no_blank_lines() {
6806        let kernel = Kernel::transient().expect("kernel");
6807        let result = kernel
6808            .execute("echo one\necho two\necho three")
6809            .await
6810            .expect("execution failed");
6811        assert!(result.ok());
6812        assert_eq!(&*result.text_out(), "one\ntwo\nthree\n");
6813    }
6814
6815    #[tokio::test]
6816    async fn test_for_loop_no_blank_lines() {
6817        let kernel = Kernel::transient().expect("kernel");
6818        let result = kernel
6819            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
6820            .await
6821            .expect("execution failed");
6822        assert!(result.ok());
6823        assert_eq!(&*result.text_out(), "item: a\nitem: b\nitem: c\n");
6824    }
6825
6826    #[tokio::test]
6827    async fn test_for_command_subst_no_blank_lines() {
6828        let kernel = Kernel::transient().expect("kernel");
6829        let result = kernel
6830            .execute(r#"for N in $(seq 1 3); do echo "n=${N}"; done"#)
6831            .await
6832            .expect("execution failed");
6833        assert!(result.ok());
6834        assert_eq!(&*result.text_out(), "n=1\nn=2\nn=3\n");
6835    }
6836
6837    // ------------------------------------------------------------------
6838    // build_args_async: multi-consume flags (jq --arg NAME VALUE pattern)
6839    // ------------------------------------------------------------------
6840
6841    /// Helper: a throwaway schema with one `--pair` param declared as
6842    /// consuming two positionals per occurrence. Modelled after what
6843    /// jq_native will declare for `--arg` / `--argjson`.
6844    fn multi_consume_schema() -> crate::tools::ToolSchema {
6845        use crate::tools::{ParamSchema, ToolSchema};
6846        ToolSchema::new("test", "multi-consume smoke")
6847            .param(
6848                ParamSchema::optional("pair", "array", Value::Null, "name+value pair")
6849                    .consumes(2),
6850            )
6851    }
6852
6853    fn pos(s: &str) -> Arg {
6854        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
6855    }
6856
6857    #[tokio::test]
6858    async fn build_args_multi_consume_single_occurrence() {
6859        let kernel = Kernel::transient().expect("kernel");
6860        let schema = multi_consume_schema();
6861        // Simulates:  test --pair NAME VALUE filter
6862        let args = vec![
6863            Arg::LongFlag("pair".into()),
6864            pos("NAME"),
6865            pos("VALUE"),
6866            pos("filter"),
6867        ];
6868        let built = kernel
6869            .build_args_async(&args, Some(&schema))
6870            .await
6871            .expect("build_args should succeed");
6872
6873        // `--pair` + its two positionals are consumed into named["pair"],
6874        // which becomes an outer array of one inner 2-element array.
6875        let pair = built.named.get("pair").expect("named[pair] missing");
6876        match pair {
6877            Value::Json(serde_json::Value::Array(occurrences)) => {
6878                assert_eq!(occurrences.len(), 1, "expected one occurrence");
6879                match &occurrences[0] {
6880                    serde_json::Value::Array(values) => {
6881                        assert_eq!(values.len(), 2, "pair must have 2 values");
6882                        assert_eq!(values[0], serde_json::Value::String("NAME".into()));
6883                        assert_eq!(values[1], serde_json::Value::String("VALUE".into()));
6884                    }
6885                    other => panic!("expected inner array, got {other:?}"),
6886                }
6887            }
6888            other => panic!("expected Json(Array(...)) for named[pair], got {other:?}"),
6889        }
6890
6891        // The un-consumed positional ("filter") remains in `positional`.
6892        assert_eq!(built.positional.len(), 1);
6893        assert_eq!(built.positional[0], Value::String("filter".into()));
6894    }
6895    #[tokio::test]
6896    async fn build_args_multi_consume_two_occurrences_accumulate() {
6897        let kernel = Kernel::transient().expect("kernel");
6898        let schema = multi_consume_schema();
6899        // Simulates:  test --pair A 1 --pair B 2 filter
6900        let args = vec![
6901            Arg::LongFlag("pair".into()),
6902            pos("A"),
6903            pos("1"),
6904            Arg::LongFlag("pair".into()),
6905            pos("B"),
6906            pos("2"),
6907            pos("filter"),
6908        ];
6909        let built = kernel
6910            .build_args_async(&args, Some(&schema))
6911            .await
6912            .expect("build_args should succeed");
6913
6914        let pair = built.named.get("pair").expect("named[pair] missing");
6915        match pair {
6916            Value::Json(serde_json::Value::Array(occurrences)) => {
6917                assert_eq!(occurrences.len(), 2, "expected two occurrences");
6918                // Preserved in invocation order.
6919                match &occurrences[0] {
6920                    serde_json::Value::Array(values) => {
6921                        assert_eq!(values[0], serde_json::Value::String("A".into()));
6922                        assert_eq!(values[1], serde_json::Value::String("1".into()));
6923                    }
6924                    other => panic!("expected inner array, got {other:?}"),
6925                }
6926                match &occurrences[1] {
6927                    serde_json::Value::Array(values) => {
6928                        assert_eq!(values[0], serde_json::Value::String("B".into()));
6929                        assert_eq!(values[1], serde_json::Value::String("2".into()));
6930                    }
6931                    other => panic!("expected inner array, got {other:?}"),
6932                }
6933            }
6934            other => panic!("expected Json(Array(...)), got {other:?}"),
6935        }
6936    }
6937
6938    // ── undeclared space-form flag under map_positionals (kj --type val) ──
6939    //
6940    // A backend/MCP tool whose schema does NOT declare a flag must not let
6941    // `--flag value` (space form) silently divorce the value: that was a
6942    // privilege-escalation-by-typo against kaijutsu (see docs/issues.md).
6943    // kaish fails loud rather than guessing.
6944
6945    use crate::tools::{ParamSchema, ToolSchema};
6946
6947    /// Backend-style schema (map_positionals) declaring only a `name`
6948    /// positional — `--type` is intentionally undeclared.
6949    fn kj_like_schema() -> ToolSchema {
6950        ToolSchema::new("kj", "incomplete backend schema")
6951            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
6952            .with_positional_mapping()
6953    }
6954
6955    #[tokio::test]
6956    async fn build_args_undeclared_space_flag_errors_under_map_positionals() {
6957        let kernel = Kernel::transient().expect("kernel");
6958        let schema = kj_like_schema();
6959        // kj context create exp --type explorer
6960        let args = vec![
6961            pos("context"),
6962            pos("create"),
6963            pos("exp"),
6964            Arg::LongFlag("type".into()),
6965            pos("explorer"),
6966        ];
6967        let err = kernel
6968            .build_args_async(&args, Some(&schema))
6969            .await
6970            .expect_err("undeclared --type with a space value must fail loud");
6971        let msg = err.to_string();
6972        assert!(msg.contains("--type"), "message should name the flag: {msg}");
6973        assert!(msg.contains("--type=explorer"), "message should suggest the = form: {msg}");
6974        assert!(msg.contains("kj"), "message should name the tool: {msg}");
6975    }
6976
6977    #[tokio::test]
6978    async fn build_args_declared_space_flag_still_binds() {
6979        let kernel = Kernel::transient().expect("kernel");
6980        // Same tool, but now the schema DECLARES --type as a string param.
6981        let schema = ToolSchema::new("kj", "complete schema")
6982            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
6983            .param(ParamSchema::optional("type", "string", Value::Null, "role type"))
6984            .with_positional_mapping();
6985        let args = vec![
6986            pos("exp"),
6987            Arg::LongFlag("type".into()),
6988            pos("explorer"),
6989        ];
6990        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6991        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6992    }
6993
6994    #[tokio::test]
6995    async fn build_args_equals_form_binds_for_undeclared_flag() {
6996        let kernel = Kernel::transient().expect("kernel");
6997        let schema = kj_like_schema();
6998        // The unambiguous `=` form must keep working even when undeclared.
6999        let args = vec![
7000            pos("exp"),
7001            Arg::Named { key: "type".into(), value: Expr::Literal(Value::String("explorer".into())) },
7002        ];
7003        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7004        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7005    }
7006
7007    #[tokio::test]
7008    async fn build_args_undeclared_bool_flag_at_end_is_ok() {
7009        let kernel = Kernel::transient().expect("kernel");
7010        let schema = kj_like_schema();
7011        // No positional follows --force → unambiguously a bare flag.
7012        let args = vec![pos("exp"), Arg::LongFlag("force".into())];
7013        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7014        assert!(built.flags.contains("force"));
7015    }
7016
7017    #[tokio::test]
7018    async fn build_args_undeclared_flag_before_another_flag_is_ok() {
7019        let kernel = Kernel::transient().expect("kernel");
7020        let schema = kj_like_schema();
7021        // --verbose is followed by a flag, not a positional → not ambiguous.
7022        let args = vec![
7023            Arg::LongFlag("verbose".into()),
7024            Arg::Named { key: "name".into(), value: Expr::Literal(Value::String("x".into())) },
7025        ];
7026        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7027        assert!(built.flags.contains("verbose"));
7028    }
7029
7030    #[tokio::test]
7031    async fn build_args_undeclared_space_flag_ok_for_builtin_schema() {
7032        let kernel = Kernel::transient().expect("kernel");
7033        // Builtins set map_positionals=false; the ambiguity guard must not
7034        // fire there (clap validates their flags separately).
7035        let schema = ToolSchema::new("frobnicate", "builtin-style")
7036            .param(ParamSchema::optional("name", "string", Value::Null, "name"));
7037        let args = vec![Arg::LongFlag("frob".into()), pos("value")];
7038        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
7039        assert!(built.flags.contains("frob"));
7040    }
7041
7042    // ── subcommand-aware binding (select_leaf wired into build_args_async) ──
7043    //
7044    // A tool exposing a subcommand tree binds flags against the *routed leaf's*
7045    // params, not the root's. The subcommand-path positionals stay positional
7046    // (kj re-parses them with its own clap), and a value flag declared only on
7047    // a deep leaf still binds in space form.
7048
7049    /// kj → context (alias ctx) → create{--type value, --force bool}.
7050    /// map_positionals defaults false on every node (builtin/kj style).
7051    fn kj_tree_schema() -> ToolSchema {
7052        ToolSchema::new("kj", "subcommand tool").subcommand(
7053            ToolSchema::new("context", "context ops")
7054                .with_command_aliases(["ctx"])
7055                .subcommand(
7056                    ToolSchema::new("create", "create context")
7057                        .param(ParamSchema::new("type", "string").with_aliases(["t"]))
7058                        .param(ParamSchema::new("force", "bool")),
7059                ),
7060        )
7061    }
7062
7063    #[tokio::test]
7064    async fn build_args_binds_deep_leaf_value_flag_space_form() {
7065        let kernel = Kernel::transient().expect("kernel");
7066        let schema = kj_tree_schema();
7067        // kj context create --type explorer
7068        let args = vec![
7069            pos("context"),
7070            pos("create"),
7071            Arg::LongFlag("type".into()),
7072            pos("explorer"),
7073        ];
7074        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7075        // --type (declared only on the create leaf) binds in space form.
7076        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7077        // The subcommand path survives as positionals for kj to re-parse.
7078        let positionals: Vec<&str> = built
7079            .positional
7080            .iter()
7081            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
7082            .collect();
7083        assert_eq!(positionals, vec!["context", "create"]);
7084    }
7085
7086    #[tokio::test]
7087    async fn build_args_leaf_bool_flag_does_not_swallow_positional() {
7088        let kernel = Kernel::transient().expect("kernel");
7089        let schema = kj_tree_schema();
7090        // kj context create --force somearg  → --force is a leaf bool flag,
7091        // it must NOT consume `somearg`.
7092        let args = vec![
7093            pos("context"),
7094            pos("create"),
7095            Arg::LongFlag("force".into()),
7096            pos("somearg"),
7097        ];
7098        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7099        assert!(built.flags.contains("force"), "force should be a bare flag");
7100        let positionals: Vec<&str> = built
7101            .positional
7102            .iter()
7103            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
7104            .collect();
7105        assert_eq!(positionals, vec!["context", "create", "somearg"]);
7106    }
7107
7108    #[tokio::test]
7109    async fn build_args_alias_routed_leaf_binds_value_flag() {
7110        let kernel = Kernel::transient().expect("kernel");
7111        let schema = kj_tree_schema();
7112        // kj ctx create -t explorer  → command alias + short flag alias.
7113        let args = vec![
7114            pos("ctx"),
7115            pos("create"),
7116            Arg::ShortFlag("t".into()),
7117            pos("explorer"),
7118        ];
7119        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
7120        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
7121    }
7122
7123    #[tokio::test]
7124    async fn build_args_computed_subcommand_selector_fails_loud() {
7125        let kernel = Kernel::transient().expect("kernel");
7126        let schema = kj_tree_schema();
7127        // kj $(echo context) — routing can't see the value; fail loud.
7128        let args = vec![Arg::Positional(Expr::CommandSubst(vec![Stmt::Command(
7129            crate::ast::Command { name: "echo".into(), args: vec![], redirects: vec![] },
7130        )]))];
7131        let err = kernel
7132            .build_args_async(&args, Some(&schema))
7133            .await
7134            .expect_err("computed subcommand selector must error");
7135        assert!(
7136            err.to_string().contains("subcommand name is required"),
7137            "got: {err}"
7138        );
7139    }
7140
7141    // ── finalize_output: --json rendering vs. owns_output opt-out ───────────
7142
/// With a JSON format requested and the final argument `false`,
/// `finalize_output` changes the text away from the raw input.
#[test]
fn finalize_output_renders_when_kernel_owns_it() {
    use crate::interpreter::{OutputData, OutputFormat};
    let raw = ExecResult::with_output(OutputData::text("RAW"));
    // The tool did not claim its output, so the kernel renders the typed
    // OutputData as JSON and the text is no longer the bare string.
    let rendered = finalize_output(raw, Some(OutputFormat::Json), false);
    assert_ne!(rendered.text_out(), "RAW", "kernel should reformat to JSON");
}
7151
/// With a JSON format requested but the final argument `true`,
/// `finalize_output` leaves the text exactly as the tool produced it.
#[test]
fn finalize_output_skips_when_tool_owns_output() {
    use crate::interpreter::{OutputData, OutputFormat};
    let raw = ExecResult::with_output(OutputData::text("RAW"));
    // owns_output: the tool has already rendered, so the kernel must not
    // touch the bytes.
    let untouched = finalize_output(raw, Some(OutputFormat::Json), true);
    assert_eq!(untouched.text_out(), "RAW", "owned output must be left as-is");
}
7160
/// With no output format requested, `finalize_output` returns the text
/// unchanged.
#[test]
fn finalize_output_no_format_is_noop() {
    use crate::interpreter::OutputData;
    let raw = ExecResult::with_output(OutputData::text("RAW"));
    // No format means there is nothing to render.
    let passthrough = finalize_output(raw, None, false);
    assert_eq!(passthrough.text_out(), "RAW");
}
7168
7169    // ── initial_vars + execute_with_vars + hermetic env ───────────────────
7170
7171    #[tokio::test]
7172    async fn test_initial_vars_set_and_exported() {
7173        let config = KernelConfig::transient()
7174            .with_var("INIT_FOO", Value::String("bar".into()));
7175        let kernel = Kernel::new(config).expect("failed to create kernel");
7176
7177        assert_eq!(
7178            kernel.get_var("INIT_FOO").await,
7179            Some(Value::String("bar".into()))
7180        );
7181        assert!(
7182            kernel.scope.read().await.is_exported("INIT_FOO"),
7183            "initial_vars entries must be marked exported"
7184        );
7185    }
7186
7187    #[tokio::test]
7188    async fn test_execute_with_vars_overlay_visible() {
7189        let kernel = Kernel::transient().expect("failed to create kernel");
7190        let mut overlay = HashMap::new();
7191        overlay.insert("OVERLAY_X".to_string(), Value::String("yes".into()));
7192
7193        let result = kernel
7194            .execute_with_options(r#"echo "${OVERLAY_X}""#, ExecuteOptions::new().with_vars(overlay))
7195            .await
7196            .expect("execute failed");
7197
7198        assert!(result.ok());
7199        assert_eq!(result.text_out().trim(), "yes");
7200    }
7201
7202    #[tokio::test]
7203    async fn test_execute_with_vars_overlay_cleanup() {
7204        let kernel = Kernel::transient().expect("failed to create kernel");
7205        let mut overlay = HashMap::new();
7206        overlay.insert("EPHEMERAL".to_string(), Value::String("transient".into()));
7207
7208        kernel
7209            .execute_with_options("echo ignored", ExecuteOptions::new().with_vars(overlay))
7210            .await
7211            .expect("execute failed");
7212
7213        assert_eq!(kernel.get_var("EPHEMERAL").await, None);
7214        assert!(
7215            !kernel.scope.read().await.is_exported("EPHEMERAL"),
7216            "overlay-only export must be cleared on return"
7217        );
7218    }
7219
7220    #[tokio::test]
7221    async fn test_execute_with_vars_does_not_clobber_existing_export() {
7222        let kernel = Kernel::transient().expect("failed to create kernel");
7223        kernel
7224            .execute("export OUTER=outer")
7225            .await
7226            .expect("export failed");
7227
7228        let mut overlay = HashMap::new();
7229        overlay.insert("OUTER".to_string(), Value::String("inner".into()));
7230        let result = kernel
7231            .execute_with_options(r#"echo "${OUTER}""#, ExecuteOptions::new().with_vars(overlay))
7232            .await
7233            .expect("execute failed");
7234        assert_eq!(result.text_out().trim(), "inner");
7235
7236        assert_eq!(
7237            kernel.get_var("OUTER").await,
7238            Some(Value::String("outer".into())),
7239            "outer value must reappear after pop"
7240        );
7241        assert!(
7242            kernel.scope.read().await.is_exported("OUTER"),
7243            "outer export must survive overlay"
7244        );
7245    }
7246
7247    #[tokio::test]
7248    async fn test_execute_with_vars_inner_assignment_is_local() {
7249        let kernel = Kernel::transient().expect("failed to create kernel");
7250        let mut overlay = HashMap::new();
7251        overlay.insert("LOCAL_FOO".to_string(), Value::String("from-overlay".into()));
7252
7253        // Variable assignment inside a single statement uses set() (innermost
7254        // frame), not set_global() — this matches bash function-local semantics.
7255        // We explicitly use `local FOO=...` style by relying on the pushed
7256        // frame; the assignment in the script body modifies the same frame.
7257        let result = kernel
7258            .execute_with_options(
7259                r#"LOCAL_FOO="reassigned"; echo "${LOCAL_FOO}""#,
7260                ExecuteOptions::new().with_vars(overlay),
7261            )
7262            .await
7263            .expect("execute failed");
7264        assert!(result.ok());
7265
7266        // After the call the frame is popped, so LOCAL_FOO is gone regardless
7267        // of how the script reassigned it.
7268        assert_eq!(kernel.get_var("LOCAL_FOO").await, None);
7269    }
7270
7271    #[tokio::test]
7272    async fn test_external_command_sees_exported_var() {
7273        let kernel = Kernel::transient().expect("failed to create kernel");
7274        let result = kernel
7275            .execute("export EXT_FOO=bar; printenv EXT_FOO")
7276            .await
7277            .expect("execute failed");
7278
7279        assert!(result.ok(), "printenv should succeed: stderr={}", result.err);
7280        assert_eq!(result.text_out().trim(), "bar");
7281    }
7282
7283    #[tokio::test]
7284    async fn test_external_command_does_not_see_unexported_var() {
7285        let kernel = Kernel::transient().expect("failed to create kernel");
7286
7287        // Set without exporting; printenv must not see it (exit code != 0,
7288        // empty stdout per printenv semantics).
7289        let result = kernel
7290            .execute("EXT_BAR=hidden; printenv EXT_BAR")
7291            .await
7292            .expect("execute failed");
7293
7294        assert!(!result.ok(), "printenv should fail when var is unexported");
7295        assert!(
7296            result.text_out().trim().is_empty(),
7297            "no stdout when var is missing, got: {}",
7298            result.text_out()
7299        );
7300    }
7301
7302    #[tokio::test]
7303    async fn test_external_command_does_not_see_os_env() {
7304        // The kernel is hermetic: it never reads std::env::vars() and only
7305        // exports what it has been told to export. Cargo always sets PATH for
7306        // tests, so PATH is reliably present in the OS env — but a transient
7307        // kernel doesn't seed it into initial_vars, so `printenv PATH` from
7308        // inside the kernel must fail.
7309        assert!(
7310            std::env::var_os("PATH").is_some(),
7311            "test precondition: cargo should set PATH"
7312        );
7313
7314        let kernel = Kernel::transient().expect("failed to create kernel");
7315        let result = kernel
7316            .execute("printenv PATH")
7317            .await
7318            .expect("execute failed");
7319
7320        assert!(
7321            !result.ok(),
7322            "printenv PATH must fail in hermetic kernel, got stdout={:?}",
7323            result.text_out()
7324        );
7325        assert!(
7326            result.text_out().trim().is_empty(),
7327            "no PATH in subprocess env, got stdout={:?}",
7328            result.text_out()
7329        );
7330    }
7331
7332    #[tokio::test]
7333    async fn test_execute_with_vars_overlay_reaches_subprocess() {
7334        let kernel = Kernel::transient().expect("failed to create kernel");
7335        let mut overlay = HashMap::new();
7336        overlay.insert("SUB_FOO".to_string(), Value::String("subproc".into()));
7337
7338        let result = kernel
7339            .execute_with_options("printenv SUB_FOO", ExecuteOptions::new().with_vars(overlay))
7340            .await
7341            .expect("execute failed");
7342
7343        assert!(
7344            result.ok(),
7345            "printenv should succeed: code={} stdout={:?} stderr={:?}",
7346            result.code,
7347            result.text_out(),
7348            result.err
7349        );
7350        assert_eq!(result.text_out().trim(), "subproc");
7351    }
7352}