Skip to main content

kaish_kernel/
kernel.rs

1//! The Kernel (核) — the heart of kaish.
2//!
3//! The Kernel owns and coordinates all core components:
4//! - Interpreter state (scope, $?)
5//! - Tool registry (builtins, user tools)
6//! - VFS router (mount points)
7//! - Job manager (background jobs)
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────┐
13//! │                         Kernel (核)                         │
14//! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
15//! │  │   Scope      │  │ ToolRegistry │  │  VfsRouter       │  │
16//! │  │  (variables) │  │  (builtins,  │  │  (mount points)  │  │
17//! │  │              │  │   user tools)│  │                  │  │
18//! │  └──────────────┘  └──────────────┘  └──────────────────┘  │
19//! │  ┌──────────────────────────────┐  ┌──────────────────┐    │
20//! │  │  JobManager (background)     │  │  ExecResult ($?) │    │
21//! │  └──────────────────────────────┘  └──────────────────┘    │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::Duration;
30
31use anyhow::{Context, Result};
32use tokio::sync::RwLock;
33
/// Process-wide source of kernel identifiers, surfaced to scripts as `$$` /
/// `${$}`.
///
/// The counter begins at 1 and every freshly constructed Kernel takes the
/// next value. A kernel produced by `Kernel::fork()` keeps its parent's
/// identifier — the fork clones the parent's Scope instead of calling
/// `set_pid` again — which mirrors bash, where a subshell reports the
/// parent's `$$`.
///
/// This is intentionally *not* the OS PID: kaish is typically a long-lived
/// MCP server or is embedded in another binary (kaijutsu), so the host PID
/// carries no meaning for a script. The design rationale is recorded in
/// `~/.claude/projects/-home-atobey-src-kaish/memory/lang_dollar_dollar_identifier.md`.
static KERNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
46
47use async_trait::async_trait;
48
49use crate::ast::{Arg, Command, Expr, FileTestOp, Stmt, StringPart, TestExpr, ToolDef, Value, BinaryOp};
50pub use kaish_types::ExecuteOptions;
51use crate::backend::{BackendError, KernelBackend};
52use kaish_glob::glob_match;
53use crate::dispatch::{CommandDispatcher, PipelinePosition};
54use crate::interpreter::{apply_output_format, eval_expr, expand_tilde, json_to_value, value_to_bool, value_to_string, ControlFlow, ExecResult, Scope};
55use crate::parser::parse;
56use crate::scheduler::{is_bool_type, schema_param_lookup, select_leaf, stderr_stream, BoundedStream, JobManager, PipelineRunner, StderrReceiver};
57#[cfg(feature = "subprocess")]
58use crate::scheduler::{drain_to_stream, DEFAULT_STREAM_MAX_SIZE};
59use crate::tools::{register_builtins, ExecContext, GlobalFlags, ToolArgs, ToolRegistry};
60#[cfg(feature = "subprocess")]
61use crate::tools::resolve_in_path;
62use crate::validator::{Severity, Validator};
63#[cfg(feature = "localfs")]
64use crate::vfs::LocalFs;
65use crate::vfs::{BuiltinFs, JobFs, MemoryFs, VfsRouter};
66
/// Selects how (and whether) the host filesystem appears inside the VFS.
///
/// Each variant sits at a different point between convenience and isolation:
/// - `Passthrough` — native paths everywhere (human-driven REPL sessions)
/// - `Sandboxed` — host access confined to one subtree (safer for agents)
/// - `NoLocal` — no host access at all (tests, memory-only execution)
#[derive(Debug, Clone)]
pub enum VfsMountMode {
    /// Host filesystem mounted at `/`, so native paths resolve unchanged.
    ///
    /// Grants full filesystem access. Intended for REPL sessions driven by a
    /// person, where a path such as `/home/user/project` should simply work.
    ///
    /// Mounts:
    /// - `/` → LocalFs("/")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Passthrough,

    /// Transparent sandbox: paths keep their native spelling, access is limited.
    ///
    /// The sandbox root is mounted at its real location (e.g., `/home/user`),
    /// which means `/home/user/src/project` resolves as expected, while
    /// anything outside the mounted locations never reaches the host.
    ///
    /// **Note:** Only VFS (builtin) operations are restricted. External
    /// commands are not confined by the sandbox at all — see
    /// [`KernelConfig::allow_external_commands`].
    ///
    /// Mounts:
    /// - `/` → MemoryFs (catches paths outside sandbox)
    /// - `{root}` → LocalFs(root)  (e.g., `/home/user` → LocalFs)
    /// - `/tmp` → LocalFs("/tmp")
    /// - XDG runtime dir → LocalFs (mounted only when the directory exists)
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "localfs")]
    Sandboxed {
        /// Directory exposed from the host. `None` means `$HOME`; pass a
        /// narrower path such as `~/src` to tighten the sandbox.
        root: Option<PathBuf>,
    },

    /// Memory-only VFS with nothing from the host mounted.
    ///
    /// Provides complete isolation from the host filesystem, which suits
    /// tests and purely sandboxed execution.
    ///
    /// At kernel construction this mode forces output spill to
    /// [`SpillMode::Memory`](crate::output_limit::SpillMode::Memory): a spill
    /// file would be written on the host (`paths::spill_dir()` bypasses the
    /// VFS), which must not happen when no host filesystem is mounted. An
    /// explicitly configured `SpillMode::Disk` is overridden as well.
    ///
    /// Mounts:
    /// - `/` → MemoryFs
    /// - `/tmp` → MemoryFs
    /// - `/v` → MemoryFs
    NoLocal,
}
123
124#[allow(clippy::derivable_impls)] // native has multiple variants; not derivable cross-feature
125impl Default for VfsMountMode {
126    fn default() -> Self {
127        #[cfg(feature = "localfs")]
128        { VfsMountMode::Sandboxed { root: None } }
129        #[cfg(not(feature = "localfs"))]
130        { VfsMountMode::NoLocal }
131    }
132}
133
134/// Configuration for kernel initialization.
135#[derive(Debug, Clone)]
136pub struct KernelConfig {
137    /// Name of this kernel (for identification).
138    pub name: String,
139
140    /// VFS mount mode — controls how local filesystem is exposed.
141    pub vfs_mode: VfsMountMode,
142
143    /// Initial working directory (VFS path).
144    pub cwd: PathBuf,
145
146    /// Whether to skip pre-execution validation.
147    ///
148    /// When false (default), scripts are validated before execution to catch
149    /// errors early. Set to true to skip validation for performance or to
150    /// allow dynamic/external commands.
151    pub skip_validation: bool,
152
153    /// When true, standalone external commands inherit stdio for real-time output.
154    ///
155    /// Set by script runner and REPL for human-visible output.
156    /// Not set by MCP server (output must be captured for structured responses).
157    pub interactive: bool,
158
159    /// Ignore file configuration for file-walking tools.
160    pub ignore_config: crate::ignore_config::IgnoreConfig,
161
162    /// Output size limit configuration for agent safety.
163    pub output_limit: crate::output_limit::OutputLimitConfig,
164
165    /// Whether external command execution (PATH lookup, `exec`, `spawn`) is allowed.
166    ///
167    /// When `true` (default), commands not found as builtins are resolved via PATH
168    /// and executed as child processes. When `false`, only kaish builtins and
169    /// backend-registered tools are available.
170    ///
171    /// **Security:** External commands bypass the VFS sandbox entirely — they see
172    /// the real filesystem, network, and environment. Set to `false` when running
173    /// untrusted input.
174    pub allow_external_commands: bool,
175
176    /// Enable confirmation latch for dangerous operations (set -o latch).
177    ///
178    /// When enabled, destructive operations like `rm` require nonce confirmation.
179    /// Can also be enabled at runtime with `set -o latch` or via `KAISH_LATCH=1`.
180    pub latch_enabled: bool,
181
182    /// Enable trash-on-delete for rm (set -o trash).
183    ///
184    /// When enabled, small files are moved to freedesktop.org Trash instead of
185    /// being permanently deleted. Can also be enabled at runtime with `set -o trash`
186    /// or via `KAISH_TRASH=1`.
187    pub trash_enabled: bool,
188
189    /// Shared nonce store for cross-request confirmation latch.
190    ///
191    /// When `Some`, the kernel uses this store instead of creating a fresh one.
192    /// This allows nonces issued in one MCP `execute()` call to be validated
193    /// in a subsequent call. When `None` (default), a fresh store is created.
194    pub nonce_store: Option<crate::nonce::NonceStore>,
195
196    /// Variables to populate the root scope with at construction, all marked
197    /// for export to child processes.
198    ///
199    /// The kernel itself is hermetic — it never reads `std::env::vars()` —
200    /// so frontends that want OS-env passthrough (REPL, MCP) populate this
201    /// from `std::env::vars()`. Embedders that want isolation pass nothing
202    /// (or only the keys they curate).
203    pub initial_vars: HashMap<String, Value>,
204
205    /// Default per-request timeout. When `Some`, every `execute_with_options`
206    /// call without an explicit `ExecuteOptions::timeout` uses this duration.
207    /// When elapsed, the kernel cancels the request, kills any external
208    /// children with the configured grace, and returns exit code 124.
209    ///
210    /// `None` means no default timeout — only explicit per-call timeouts apply.
211    pub request_timeout: Option<Duration>,
212
213    /// Grace period between SIGTERM and SIGKILL when killing an external
214    /// child on cancellation or timeout.
215    ///
216    /// Defaults to 2 seconds. Set to `Duration::ZERO` to escalate immediately
217    /// to SIGKILL. Long-shutdown processes (databases, etc.) may need more.
218    pub kill_grace: Duration,
219}
220
/// Resolve the default sandbox root: `$HOME`, or `/` when `HOME` cannot be
/// read (unset or not valid Unicode).
#[cfg(feature = "localfs")]
fn default_sandbox_root() -> PathBuf {
    match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from("/"),
    }
}
228
229impl Default for KernelConfig {
230    fn default() -> Self {
231        #[cfg(feature = "localfs")]
232        {
233            let home = default_sandbox_root();
234            Self {
235                name: "default".to_string(),
236                vfs_mode: VfsMountMode::Sandboxed { root: None },
237                cwd: home,
238                skip_validation: false,
239                interactive: false,
240                ignore_config: crate::ignore_config::IgnoreConfig::none(),
241                output_limit: crate::output_limit::OutputLimitConfig::none(),
242                allow_external_commands: cfg!(feature = "subprocess"),
243                latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
244                trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
245                nonce_store: None,
246                initial_vars: HashMap::new(),
247                request_timeout: None,
248                kill_grace: Duration::from_secs(2),
249            }
250        }
251        #[cfg(not(feature = "localfs"))]
252        {
253            Self {
254                name: "default".to_string(),
255                vfs_mode: VfsMountMode::NoLocal,
256                cwd: PathBuf::from("/"),
257                skip_validation: false,
258                interactive: false,
259                ignore_config: crate::ignore_config::IgnoreConfig::none(),
260                output_limit: crate::output_limit::OutputLimitConfig::none(),
261                allow_external_commands: false,
262                latch_enabled: false,
263                trash_enabled: false,
264                nonce_store: None,
265                initial_vars: HashMap::new(),
266                request_timeout: None,
267                kill_grace: Duration::from_secs(2),
268            }
269        }
270    }
271}
272
273impl KernelConfig {
274    /// Create a transient kernel config (sandboxed, for temporary use).
275    #[cfg(feature = "localfs")]
276    pub fn transient() -> Self {
277        let home = default_sandbox_root();
278        Self {
279            name: "transient".to_string(),
280            vfs_mode: VfsMountMode::Sandboxed { root: None },
281            cwd: home,
282            skip_validation: false,
283            interactive: false,
284            ignore_config: crate::ignore_config::IgnoreConfig::none(),
285            output_limit: crate::output_limit::OutputLimitConfig::none(),
286            allow_external_commands: cfg!(feature = "subprocess"),
287            latch_enabled: false,
288            trash_enabled: false,
289            nonce_store: None,
290            initial_vars: HashMap::new(),
291            request_timeout: None,
292            kill_grace: Duration::from_secs(2),
293        }
294    }
295
296    /// Create a transient kernel config (isolated, no-default-features).
297    #[cfg(not(feature = "localfs"))]
298    pub fn transient() -> Self {
299        Self::isolated()
300    }
301
302    /// Create a kernel config with the given name (sandboxed by default).
303    #[cfg(feature = "localfs")]
304    pub fn named(name: &str) -> Self {
305        let home = default_sandbox_root();
306        Self {
307            name: name.to_string(),
308            vfs_mode: VfsMountMode::Sandboxed { root: None },
309            cwd: home,
310            skip_validation: false,
311            interactive: false,
312            ignore_config: crate::ignore_config::IgnoreConfig::none(),
313            output_limit: crate::output_limit::OutputLimitConfig::none(),
314            allow_external_commands: cfg!(feature = "subprocess"),
315            latch_enabled: false,
316            trash_enabled: false,
317            nonce_store: None,
318            initial_vars: HashMap::new(),
319            request_timeout: None,
320            kill_grace: Duration::from_secs(2),
321        }
322    }
323
324    /// Create a kernel config with the given name (isolated, no-default-features).
325    #[cfg(not(feature = "localfs"))]
326    pub fn named(name: &str) -> Self {
327        Self {
328            name: name.to_string(),
329            ..Self::isolated()
330        }
331    }
332
333    /// Create a REPL config with passthrough filesystem access.
334    ///
335    /// Native paths like `/home/user/project` work directly.
336    /// The cwd is set to the actual current working directory.
337    #[cfg(feature = "localfs")]
338    pub fn repl() -> Self {
339        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
340        Self {
341            name: "repl".to_string(),
342            vfs_mode: VfsMountMode::Passthrough,
343            cwd,
344            skip_validation: false,
345            interactive: false,
346            ignore_config: crate::ignore_config::IgnoreConfig::none(),
347            output_limit: crate::output_limit::OutputLimitConfig::none(),
348            allow_external_commands: cfg!(feature = "subprocess"),
349            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
350            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
351            nonce_store: None,
352            initial_vars: HashMap::new(),
353            request_timeout: None,
354            kill_grace: Duration::from_secs(2),
355        }
356    }
357
358    /// Create an MCP server config with sandboxed filesystem access.
359    ///
360    /// Local filesystem is accessible at its real path (e.g., `/home/user`),
361    /// but sandboxed to `$HOME`. Paths outside the sandbox are not accessible
362    /// through builtins. External commands still access the real filesystem —
363    /// use `.with_allow_external_commands(false)` to block them.
364    #[cfg(feature = "localfs")]
365    pub fn mcp() -> Self {
366        let home = default_sandbox_root();
367        Self {
368            name: "mcp".to_string(),
369            vfs_mode: VfsMountMode::Sandboxed { root: None },
370            cwd: home,
371            skip_validation: false,
372            interactive: false,
373            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
374            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
375            allow_external_commands: cfg!(feature = "subprocess"),
376            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
377            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
378            nonce_store: None,
379            initial_vars: HashMap::new(),
380            request_timeout: None,
381            kill_grace: Duration::from_secs(2),
382        }
383    }
384
385    /// Create an MCP server config with a custom sandbox root.
386    ///
387    /// Use this to restrict access to a subdirectory like `~/src`.
388    #[cfg(feature = "localfs")]
389    pub fn mcp_with_root(root: PathBuf) -> Self {
390        Self {
391            name: "mcp".to_string(),
392            vfs_mode: VfsMountMode::Sandboxed { root: Some(root.clone()) },
393            cwd: root,
394            skip_validation: false,
395            interactive: false,
396            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
397            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
398            allow_external_commands: cfg!(feature = "subprocess"),
399            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
400            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
401            nonce_store: None,
402            initial_vars: HashMap::new(),
403            request_timeout: None,
404            kill_grace: Duration::from_secs(2),
405        }
406    }
407
408    /// Create a config with no local filesystem (memory only).
409    ///
410    /// Complete isolation: no local filesystem and external commands are disabled.
411    /// Useful for tests or pure sandboxed execution.
412    pub fn isolated() -> Self {
413        Self {
414            name: "isolated".to_string(),
415            vfs_mode: VfsMountMode::NoLocal,
416            cwd: PathBuf::from("/"),
417            skip_validation: false,
418            interactive: false,
419            ignore_config: crate::ignore_config::IgnoreConfig::none(),
420            output_limit: crate::output_limit::OutputLimitConfig::none(),
421            allow_external_commands: false,
422            latch_enabled: false,
423            trash_enabled: false,
424            nonce_store: None,
425            initial_vars: HashMap::new(),
426            request_timeout: None,
427            kill_grace: Duration::from_secs(2),
428        }
429    }
430
431    /// Set the VFS mount mode.
432    pub fn with_vfs_mode(mut self, mode: VfsMountMode) -> Self {
433        self.vfs_mode = mode;
434        self
435    }
436
437    /// Set the initial working directory.
438    pub fn with_cwd(mut self, cwd: PathBuf) -> Self {
439        self.cwd = cwd;
440        self
441    }
442
443    /// Skip pre-execution validation.
444    pub fn with_skip_validation(mut self, skip: bool) -> Self {
445        self.skip_validation = skip;
446        self
447    }
448
449    /// Enable interactive mode (external commands inherit stdio).
450    pub fn with_interactive(mut self, interactive: bool) -> Self {
451        self.interactive = interactive;
452        self
453    }
454
455    /// Set the ignore file configuration.
456    pub fn with_ignore_config(mut self, config: crate::ignore_config::IgnoreConfig) -> Self {
457        self.ignore_config = config;
458        self
459    }
460
461    /// Set the output limit configuration.
462    pub fn with_output_limit(mut self, config: crate::output_limit::OutputLimitConfig) -> Self {
463        self.output_limit = config;
464        self
465    }
466
467    /// Set whether external command execution is allowed.
468    ///
469    /// When `false`, commands not found as builtins produce "command not found"
470    /// instead of searching PATH. The `exec` and `spawn` builtins also return
471    /// errors. Use this to prevent VFS sandbox bypass via external binaries.
472    pub fn with_allow_external_commands(mut self, allow: bool) -> Self {
473        self.allow_external_commands = allow;
474        self
475    }
476
477    /// Enable or disable confirmation latch at startup.
478    pub fn with_latch(mut self, enabled: bool) -> Self {
479        self.latch_enabled = enabled;
480        self
481    }
482
483    /// Enable or disable trash-on-delete at startup.
484    pub fn with_trash(mut self, enabled: bool) -> Self {
485        self.trash_enabled = enabled;
486        self
487    }
488
489    /// Use a shared nonce store for cross-request confirmation latch.
490    ///
491    /// Pass a `NonceStore` that outlives individual kernel instances so nonces
492    /// issued in one MCP `execute()` call can be validated in subsequent calls.
493    pub fn with_nonce_store(mut self, store: crate::nonce::NonceStore) -> Self {
494        self.nonce_store = Some(store);
495        self
496    }
497
498    /// Add a single initial variable; marked exported when the kernel boots.
499    ///
500    /// Repeated calls add (last write wins on key collision).
501    pub fn with_var(mut self, name: impl Into<String>, value: Value) -> Self {
502        self.initial_vars.insert(name.into(), value);
503        self
504    }
505
506    /// Replace the entire initial-vars map. All entries are marked exported.
507    pub fn with_initial_vars(mut self, vars: HashMap<String, Value>) -> Self {
508        self.initial_vars = vars;
509        self
510    }
511
512    /// Extend the initial-vars map with the given entries (last write wins).
513    pub fn with_vars(mut self, vars: HashMap<String, Value>) -> Self {
514        self.initial_vars.extend(vars);
515        self
516    }
517
518    /// Set the default per-request timeout (kernel-wide).
519    ///
520    /// Each `execute_with_options` call without an explicit timeout uses
521    /// this. On elapsed, the kernel cancels and returns exit code 124.
522    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
523        self.request_timeout = Some(timeout);
524        self
525    }
526
527    /// Set the SIGTERM-to-SIGKILL grace period for child kills.
528    pub fn with_kill_grace(mut self, grace: Duration) -> Self {
529        self.kill_grace = grace;
530        self
531    }
532}
533
534/// The Kernel (核) — executes kaish code.
535///
536/// This is the primary interface for running kaish commands. It owns all
537/// the runtime state: variables, tools, VFS, jobs, and persistence.
538pub struct Kernel {
539    /// Kernel name.
540    name: String,
541    /// Variable scope.
542    scope: RwLock<Scope>,
543    /// Tool registry.
544    tools: Arc<ToolRegistry>,
545    /// User-defined tools (from `tool name { body }` statements).
546    user_tools: RwLock<HashMap<String, ToolDef>>,
547    /// Virtual filesystem router.
548    vfs: Arc<VfsRouter>,
549    /// Background job manager.
550    jobs: Arc<JobManager>,
551    /// Pipeline runner.
552    runner: PipelineRunner,
553    /// Execution context (cwd, stdin, etc.).
554    exec_ctx: RwLock<ExecContext>,
555    /// Whether to skip pre-execution validation.
556    skip_validation: bool,
557    /// When true, standalone external commands inherit stdio for real-time output.
558    interactive: bool,
559    /// Whether external command execution is allowed.
560    allow_external_commands: bool,
561    /// Default per-request timeout (None = no default).
562    request_timeout: Option<Duration>,
563    /// SIGTERM-to-SIGKILL grace period for child kills.
564    kill_grace: Duration,
565    /// Receiver for the kernel stderr stream.
566    ///
567    /// Pipeline stages write to the corresponding `StderrStream` (set on ExecContext).
568    /// The kernel drains this after each statement in `execute_streaming`.
569    stderr_receiver: tokio::sync::Mutex<StderrReceiver>,
570    /// Cancellation token for interrupting execution (Ctrl-C).
571    ///
572    /// Protected by `std::sync::Mutex` (not tokio) because the SIGINT handler
573    /// needs sync access. Each `execute()` call gets a fresh child token;
574    /// `cancel()` cancels the current token and replaces it.
575    cancel_token: std::sync::Mutex<tokio_util::sync::CancellationToken>,
576    /// Terminal state for job control (interactive mode only, Unix only).
577    #[cfg(all(unix, feature = "subprocess"))]
578    terminal_state: Option<Arc<crate::terminal::TerminalState>>,
579    /// Weak self-reference for handing out `Arc<dyn CommandDispatcher>`.
580    ///
581    /// Set by `into_arc()`. Allows builtins to re-dispatch inner commands
582    /// through the full Kernel resolution chain.
583    self_weak: std::sync::OnceLock<std::sync::Weak<Self>>,
584    /// Serializes concurrent `execute()` / `execute_streaming()` callers on
585    /// this Kernel instance. Tokio's Mutex is fair (FIFO) and acts as the
586    /// queue. Background jobs, scatter workers, and concurrent pipeline
587    /// stages do NOT take this lock — they run against a *forked* Kernel
588    /// (see [`Kernel::fork`]) so they never contend with the foreground.
589    execute_lock: tokio::sync::Mutex<()>,
590}
591
592impl Kernel {
593    /// Create a new kernel with the given configuration.
594    pub fn new(config: KernelConfig) -> Result<Self> {
595        let mut vfs = Self::setup_vfs(&config);
596        let jobs = Arc::new(JobManager::new());
597
598        // Mount JobFs for job observability at /v/jobs
599        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
600
601        // Mode-based construction: the kernel owns its host mounts, so whether
602        // host side channels are allowed is decided by the VFS mode inside
603        // `assemble` (NoLocal forbids them).
604        Self::assemble(config, vfs, jobs, false, |_| {}, |vfs_ref, tools| {
605            ExecContext::with_vfs_and_tools(vfs_ref.clone(), tools.clone())
606        })
607    }
608
609    /// Set up VFS based on mount mode.
610    fn setup_vfs(config: &KernelConfig) -> VfsRouter {
611        let mut vfs = VfsRouter::new();
612
613        match &config.vfs_mode {
614            #[cfg(feature = "localfs")]
615            VfsMountMode::Passthrough => {
616                // LocalFs at "/" — native paths work directly
617                vfs.mount("/", LocalFs::new(PathBuf::from("/")));
618                // Memory for blobs
619                vfs.mount("/v", MemoryFs::new());
620            }
621            #[cfg(feature = "localfs")]
622            VfsMountMode::Sandboxed { root } => {
623                // Memory at root for safety (catches paths outside sandbox)
624                vfs.mount("/", MemoryFs::new());
625                vfs.mount("/v", MemoryFs::new());
626
627                // Real /tmp for interop with other processes
628                vfs.mount("/tmp", LocalFs::new(PathBuf::from("/tmp")));
629
630                // Mount XDG runtime dir for spill files and socket access
631                let runtime = crate::paths::xdg_runtime_dir();
632                if runtime.exists() {
633                    let runtime_str = runtime.to_string_lossy().to_string();
634                    vfs.mount(&runtime_str, LocalFs::new(runtime));
635                }
636
637                // Resolve the sandbox root (defaults to $HOME)
638                let local_root = root.clone().unwrap_or_else(|| {
639                    std::env::var("HOME")
640                        .map(PathBuf::from)
641                        .unwrap_or_else(|_| PathBuf::from("/"))
642                });
643
644                // Mount at the real path for transparent access
645                // e.g., /home/atobey → LocalFs("/home/atobey")
646                // so /home/atobey/src/kaish just works
647                let mount_point = local_root.to_string_lossy().to_string();
648                vfs.mount(&mount_point, LocalFs::new(local_root));
649            }
650            VfsMountMode::NoLocal => {
651                // Pure memory mode — no local filesystem
652                vfs.mount("/", MemoryFs::new());
653                vfs.mount("/tmp", MemoryFs::new());
654                vfs.mount("/v", MemoryFs::new());
655            }
656        }
657
658        vfs
659    }
660
661    /// Create a transient kernel (no persistence).
662    pub fn transient() -> Result<Self> {
663        Self::new(KernelConfig::transient())
664    }
665
666    /// Create a kernel with a custom backend and `/v/*` virtual path support.
667    ///
668    /// This is the constructor for embedding kaish in other systems that provide
669    /// their own storage backend (e.g., CRDT-backed storage in kaijutsu).
670    ///
671    /// A `VirtualOverlayBackend` routes paths automatically:
672    /// - `/v/*` → Internal VFS (JobFs at `/v/jobs`, MemoryFs at `/v/blobs`)
673    /// - Everything else → Your custom backend
674    ///
675    /// The optional `configure_vfs` closure lets you add additional virtual mounts
676    /// (e.g., `/v/docs` for CRDT blocks) after the built-in mounts are set up.
677    ///
678    /// **Note:** The config's `vfs_mode` is ignored — all non-`/v/*` path routing
679    /// is handled by your custom backend. The config is only used for `name`, `cwd`,
680    /// `skip_validation`, and `interactive`.
681    ///
682    /// # Example
683    ///
684    /// ```ignore
685    /// // Simple: default /v/* mounts only
686    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |_| {})?;
687    ///
688    /// // With custom mounts
689    /// let kernel = Kernel::with_backend(backend, config, |vfs| {
690    ///     vfs.mount_arc("/v/docs", docs_fs);
691    ///     vfs.mount_arc("/v/g", git_fs);
692    /// }, |_| {})?;
693    ///
694    /// // With custom tools
695    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |tools| {
696    ///     tools.register(MyCustomTool::new());
697    /// })?;
698    /// ```
699    pub fn with_backend(
700        backend: Arc<dyn KernelBackend>,
701        config: KernelConfig,
702        configure_vfs: impl FnOnce(&mut VfsRouter),
703        configure_tools: impl FnOnce(&mut ToolRegistry),
704    ) -> Result<Self> {
705        use crate::backend::VirtualOverlayBackend;
706
707        let mut vfs = VfsRouter::new();
708        let jobs = Arc::new(JobManager::new());
709
710        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
711        vfs.mount("/v/blobs", MemoryFs::new());
712
713        // Let caller add custom mounts (e.g., /v/docs, /v/g)
714        configure_vfs(&mut vfs);
715
716        // A custom-backend kernel owns no host mounts — the embedder supplies
717        // the entire VFS — so any kernel write to a host filesystem via
718        // `std::fs` (output spill, job output files) bypasses that VFS and its
719        // read-only guarantees. Forbid host side channels unconditionally.
720        Self::assemble(config, vfs, jobs, true, configure_tools, |vfs_arc: &Arc<VfsRouter>, _: &Arc<ToolRegistry>| {
721            let overlay: Arc<dyn KernelBackend> =
722                Arc::new(VirtualOverlayBackend::new(backend, vfs_arc.clone()));
723            ExecContext::with_backend(overlay)
724        })
725    }
726
727    /// Shared assembly: wires up tools, runner, scope, and ExecContext.
728    ///
729    /// The `make_ctx` closure receives the VFS and tools so backends that need
730    /// them (like `LocalBackend::with_tools`) can capture them. Custom backends
731    /// that already have their own storage can ignore these parameters.
732    fn assemble(
733        config: KernelConfig,
734        mut vfs: VfsRouter,
735        jobs: Arc<JobManager>,
736        no_host_filesystem: bool,
737        configure_tools: impl FnOnce(&mut ToolRegistry),
738        make_ctx: impl FnOnce(&Arc<VfsRouter>, &Arc<ToolRegistry>) -> ExecContext,
739    ) -> Result<Self> {
740        // A kernel with no host filesystem of its own must never write to one
741        // through a side channel. Two paths bypass the VFS by going straight to
742        // `std::fs`: output spill (`paths::spill_dir()` → host temp/cache) and
743        // background-job output files (`Job::write_output_file` → host temp).
744        // Both would punch through the isolation, so force them off:
745        // in-memory truncation for spill, no host file for job output.
746        //
747        // This is true for a `NoLocal` kernel (mounts nothing) and for any
748        // `with_backend` kernel (`no_host_filesystem` — the embedder owns the
749        // VFS, so the kernel controls no host mounts and any host write is a
750        // bypass). Overrides an explicit `SpillMode::Disk`, which is nonsensical
751        // when there is no kernel-owned host filesystem to spill to.
752        let no_host_side_channel =
753            no_host_filesystem || matches!(config.vfs_mode, VfsMountMode::NoLocal);
754
755        let KernelConfig { name, cwd, skip_validation, interactive, ignore_config, mut output_limit, allow_external_commands, latch_enabled, trash_enabled, nonce_store, initial_vars, request_timeout, kill_grace, .. } = config;
756
757        if no_host_side_channel {
758            output_limit.set_spill_mode(crate::output_limit::SpillMode::Memory);
759            jobs.set_persist_output_files(false);
760        }
761
762        let mut tools = ToolRegistry::new();
763        register_builtins(&mut tools);
764        configure_tools(&mut tools);
765        let tools = Arc::new(tools);
766
767        // Mount BuiltinFs so `ls /v/bin` lists builtins
768        vfs.mount("/v/bin", BuiltinFs::new(tools.clone()));
769
770        let vfs = Arc::new(vfs);
771
772        let runner = PipelineRunner::new(tools.clone());
773
774        let (stderr_writer, stderr_receiver) = stderr_stream();
775
776        let mut exec_ctx = make_ctx(&vfs, &tools);
777        exec_ctx.set_cwd(cwd);
778        exec_ctx.set_job_manager(jobs.clone());
779        exec_ctx.set_tool_schemas(tools.schemas());
780        exec_ctx.set_tools(tools.clone());
781        #[cfg(feature = "os-integration")]
782        exec_ctx.set_trash_backend(Arc::new(crate::trash_system::SystemTrash));
783        exec_ctx.stderr = Some(stderr_writer);
784        exec_ctx.ignore_config = ignore_config;
785        exec_ctx.output_limit = output_limit;
786        exec_ctx.allow_external_commands = allow_external_commands;
787        if let Some(store) = nonce_store {
788            exec_ctx.nonce_store = store;
789        }
790
791        Ok(Self {
792            name,
793            scope: RwLock::new({
794                let mut scope = Scope::new();
795                scope.set_pid(KERNEL_COUNTER.fetch_add(1, Ordering::Relaxed));
796                // HOME is NOT read from the host env here — the kernel is
797                // hermetic. Frontends (REPL, MCP) seed it via `initial_vars`
798                // below (from `std::env::vars()`); a hermetic embedder leaves
799                // `initial_vars` empty and gets no HOME (tilde stays literal).
800                // Apply caller-supplied initial variables, all marked exported.
801                // Frontends (REPL, MCP) populate this from std::env::vars()
802                // for shell-like UX; embedders that want hermetic behavior
803                // simply leave it empty.
804                for (name, value) in initial_vars {
805                    scope.set_exported(name, value);
806                }
807                scope.set_latch_enabled(latch_enabled);
808                scope.set_trash_enabled(trash_enabled);
809                scope
810            }),
811            tools,
812            user_tools: RwLock::new(HashMap::new()),
813            vfs,
814            jobs,
815            runner,
816            exec_ctx: RwLock::new(exec_ctx),
817            skip_validation,
818            interactive,
819            allow_external_commands,
820            request_timeout,
821            kill_grace,
822            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
823            cancel_token: std::sync::Mutex::new(tokio_util::sync::CancellationToken::new()),
824            #[cfg(all(unix, feature = "subprocess"))]
825            terminal_state: None,
826            self_weak: std::sync::OnceLock::new(),
827            execute_lock: tokio::sync::Mutex::new(()),
828        })
829    }
830
831    /// Get the kernel name.
832    pub fn name(&self) -> &str {
833        &self.name
834    }
835
836    /// Wrap this Kernel in an Arc and initialize its self-reference.
837    ///
838    /// This enables the Kernel to hand out `Arc<dyn CommandDispatcher>` references
839    /// to child contexts, allowing builtins like `timeout` to dispatch inner
840    /// commands through the full resolution chain (user tools → builtins →
841    /// .kai scripts → external commands).
842    pub fn into_arc(self) -> Arc<Self> {
843        let arc = Arc::new(self);
844        let _ = arc.self_weak.set(Arc::downgrade(&arc));
845        arc
846    }
847
848    /// Fork a subsidiary kernel for concurrent execution.
849    ///
850    /// The fork is a fully-functional `Kernel` that:
851    /// - **Snapshots** per-session state from the parent: scope (COW — cheap),
852    ///   user-defined tools, cwd, aliases, ignore config, etc. Mutations on
853    ///   the fork do NOT propagate back to the parent — matching bash
854    ///   subshell / background-job semantics.
855    /// - **Shares** read-mostly resources with the parent via `Arc`: the tool
856    ///   registry, the VFS router, and the job manager. A job registered by
857    ///   the fork is visible to the parent's `jobs` builtin, and the fork
858    ///   sees the same VFS mounts.
859    /// - **Owns** its own `stderr_receiver`, `cancel_token`, and
860    ///   `execute_lock`. It is never the TTY owner, so `interactive` is
861    ///   `false` and `terminal_state` is `None`.
862    ///
863    /// The returned Arc has its `self_weak` populated (via `into_arc`), so
864    /// nested dispatch through `ctx.dispatcher` (e.g. the `timeout` builtin)
865    /// routes through the fork itself, not the parent — which is essential
866    /// for concurrency safety.
867    ///
868    /// Use this for **detached** background concurrency where the fork should
869    /// survive parent cancellation: the `&` background-job operator and any
870    /// other "fire and forget" worker. The fork gets a fresh, independent
871    /// cancellation token.
872    ///
873    /// For foreground concurrency (scatter workers, concurrent pipeline
874    /// stages, `$(...)` cmdsubs) where parent timeout/cancel must cascade
875    /// into the fork's external children, use [`Self::fork_attached`].
876    pub async fn fork(&self) -> Arc<Self> {
877        self.fork_inner(tokio_util::sync::CancellationToken::new()).await
878    }
879
880    /// Fork attached to the parent's cancellation.
881    ///
882    /// Same as [`Self::fork`] but the fork's `cancel_token` is a child of
883    /// the parent's. When the parent cancels (request timeout, embedder
884    /// `Kernel::cancel`, etc.), the fork's token also cancels, which in
885    /// turn kills any external children spawned in the fork via the
886    /// `wait_or_kill` / SIGTERM-grace-SIGKILL path.
887    pub async fn fork_attached(&self) -> Arc<Self> {
888        let child_token = {
889            #[allow(clippy::expect_used)]
890            let parent = self.cancel_token.lock().expect("cancel_token poisoned");
891            parent.child_token()
892        };
893        self.fork_inner(child_token).await
894    }
895
896    /// Shared fork implementation. Caller decides the cancellation token.
897    async fn fork_inner(&self, cancel: tokio_util::sync::CancellationToken) -> Arc<Self> {
898        let scope_snapshot = self.scope.read().await.clone();
899        let user_tools_snapshot = self.user_tools.read().await.clone();
900
901        // Snapshot exec_ctx by cloning the cloneable fields, then override
902        // the ones that should not carry over (stderr channel, dispatcher,
903        // interactive flag, terminal state, cancel — set from `cancel` arg).
904        let mut fork_ctx = {
905            let parent_ctx = self.exec_ctx.read().await;
906            parent_ctx.child_for_pipeline()
907        };
908        let (stderr_writer, stderr_receiver) = stderr_stream();
909        fork_ctx.stderr = Some(stderr_writer);
910        // Clear dispatcher; dispatch_command will repopulate it to point at
911        // the fork on the first dispatch call.
912        fork_ctx.dispatcher = None;
913        fork_ctx.interactive = false;
914        fork_ctx.cancel = cancel.clone();
915        #[cfg(all(unix, feature = "subprocess"))]
916        {
917            fork_ctx.terminal_state = None;
918        }
919
920        let fork = Self {
921            name: format!("{}:fork", self.name),
922            scope: RwLock::new(scope_snapshot),
923            tools: Arc::clone(&self.tools),
924            user_tools: RwLock::new(user_tools_snapshot),
925            vfs: Arc::clone(&self.vfs),
926            jobs: Arc::clone(&self.jobs),
927            runner: self.runner.clone(),
928            exec_ctx: RwLock::new(fork_ctx),
929            skip_validation: self.skip_validation,
930            // Forks are never the TTY owner — they run in the background.
931            interactive: false,
932            allow_external_commands: self.allow_external_commands,
933            request_timeout: self.request_timeout,
934            kill_grace: self.kill_grace,
935            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
936            cancel_token: std::sync::Mutex::new(cancel),
937            #[cfg(all(unix, feature = "subprocess"))]
938            terminal_state: None,
939            self_weak: std::sync::OnceLock::new(),
940            execute_lock: tokio::sync::Mutex::new(()),
941        };
942
943        fork.into_arc()
944    }
945
946    /// Get an `Arc<dyn CommandDispatcher>` to this Kernel, if wrapped via `into_arc()`.
947    ///
948    /// Returns `None` if the Kernel was not wrapped, or if all strong references
949    /// have been dropped (the `Weak` can no longer upgrade).
950    pub fn dispatcher(&self) -> Option<Arc<dyn CommandDispatcher>> {
951        self.self_weak
952            .get()
953            .and_then(|weak| weak.upgrade())
954            .map(|arc| arc as Arc<dyn CommandDispatcher>)
955    }
956
957    /// Initialize terminal state for interactive job control.
958    ///
959    /// Call this after kernel creation when running as an interactive REPL
960    /// and stdin is a TTY. Sets up process groups and signal handling.
961    #[cfg(all(unix, feature = "subprocess"))]
962    pub fn init_terminal(&mut self) {
963        if !self.interactive {
964            return;
965        }
966        match crate::terminal::TerminalState::init() {
967            Ok(state) => {
968                let state = Arc::new(state);
969                self.terminal_state = Some(state.clone());
970                // Set on exec_ctx so builtins (fg, bg, kill) can access it
971                self.exec_ctx.get_mut().terminal_state = Some(state);
972                tracing::debug!("terminal job control initialized");
973            }
974            Err(e) => {
975                tracing::warn!("failed to initialize terminal job control: {}", e);
976            }
977        }
978    }
979
980    /// Cancel the current execution.
981    ///
982    /// This cancels the current cancellation token, causing any execution
983    /// loop to exit at the next checkpoint with exit code 130 (SIGINT).
984    /// A fresh token is installed for the next `execute()` call.
985    pub fn cancel(&self) {
986        #[allow(clippy::expect_used)]
987        let token = self.cancel_token.lock().expect("cancel_token poisoned");
988        token.cancel();
989    }
990
991    /// Check if the current execution has been cancelled.
992    pub fn is_cancelled(&self) -> bool {
993        #[allow(clippy::expect_used)]
994        let token = self.cancel_token.lock().expect("cancel_token poisoned");
995        token.is_cancelled()
996    }
997
998    /// Reset the cancellation token (called at the start of each execute).
999    fn reset_cancel(&self) -> tokio_util::sync::CancellationToken {
1000        #[allow(clippy::expect_used)]
1001        let mut token = self.cancel_token.lock().expect("cancel_token poisoned");
1002        if token.is_cancelled() {
1003            *token = tokio_util::sync::CancellationToken::new();
1004        }
1005        token.clone()
1006    }
1007
1008    /// Acquire the per-Kernel execute lock, warning on contention.
1009    ///
1010    /// Tokio's Mutex is fair (FIFO) so callers queue in arrival order. When
1011    /// the lock is already held, emit a warning so the silent serialization
1012    /// is observable in logs — if you need real parallelism, fork the kernel.
1013    async fn acquire_execute_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
1014        match self.execute_lock.try_lock() {
1015            Ok(guard) => guard,
1016            Err(_) => {
1017                tracing::warn!(
1018                    target: "kaish::kernel::concurrency",
1019                    kernel = %self.name,
1020                    "execute() contended — serializing concurrent caller; \
1021                     use Kernel::fork() for parallelism instead of sharing"
1022                );
1023                self.execute_lock.lock().await
1024            }
1025        }
1026    }
1027
1028    /// Execute kaish source code with default options.
1029    ///
1030    /// Equivalent to `execute_with_options(input, ExecuteOptions::default())`.
1031    /// Returns the result of the last statement executed.
1032    pub async fn execute(&self, input: &str) -> Result<ExecResult> {
1033        self.run_inner(input, ExecuteOptions::default(), None).await
1034    }
1035
1036    /// Execute with per-call options. The primary entry point for embedders
1037    /// that don't need per-statement output streaming.
1038    ///
1039    /// `opts` carries timeout, transient vars overlay, optional cwd override,
1040    /// and optional embedder-owned cancellation token. See [`ExecuteOptions`]
1041    /// for semantics. For streaming, use [`Self::execute_with_options_streaming`].
1042    ///
1043    /// **Cancellation:** if `opts.cancel_token` is `Some`, it is *raced*
1044    /// against the kernel's internal token. Either firing cancels and kills
1045    /// external children. The embedder's token is read-only — kernel
1046    /// timeouts do NOT propagate into it. Distinguish via the returned
1047    /// `code`: 124 = timeout, 130 = cancellation.
1048    ///
1049    /// **Timeout:** `opts.timeout` overrides `KernelConfig::request_timeout`.
1050    /// `Some(Duration::ZERO)` returns 124 immediately without spawning.
1051    ///
1052    /// Concurrent callers on the same Kernel serialize on the kernel-wide
1053    /// execute lock. For true parallelism, call [`Kernel::fork`] (detached)
1054    /// or [`Kernel::fork_attached`] (cancellation cascades from this kernel).
1055    pub async fn execute_with_options(
1056        &self,
1057        input: &str,
1058        opts: ExecuteOptions,
1059    ) -> Result<ExecResult> {
1060        self.run_inner(input, opts, None).await
1061    }
1062
1063    /// Same as [`Self::execute_with_options`] but with a per-statement output
1064    /// callback. The callback fires after each top-level statement so the
1065    /// embedder (REPL, MCP streaming) can flush output incrementally.
1066    pub async fn execute_with_options_streaming(
1067        &self,
1068        input: &str,
1069        opts: ExecuteOptions,
1070        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1071    ) -> Result<ExecResult> {
1072        self.run_inner(input, opts, Some(on_output)).await
1073    }
1074
1075    /// Execute kaish source code with a transient overlay of exported variables.
1076    ///
1077    /// Deprecated thin wrapper over [`Self::execute_with_options`]. New code
1078    /// should use that method directly:
1079    /// `execute_with_options(input, ExecuteOptions::new().with_vars(vars))`.
1080    #[deprecated(note = "use Kernel::execute_with_options with ExecuteOptions::with_vars")]
1081    pub async fn execute_with_vars(
1082        &self,
1083        input: &str,
1084        vars: HashMap<String, Value>,
1085    ) -> Result<ExecResult> {
1086        self.run_inner(input, ExecuteOptions::new().with_vars(vars), None).await
1087    }
1088
1089    /// Execute kaish source code with a per-statement callback.
1090    ///
1091    /// Deprecated thin wrapper. New code should use
1092    /// [`Self::execute_with_options_streaming`].
1093    #[deprecated(note = "use Kernel::execute_with_options_streaming")]
1094    pub async fn execute_streaming(
1095        &self,
1096        input: &str,
1097        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1098    ) -> Result<ExecResult> {
1099        self.run_inner(input, ExecuteOptions::default(), Some(on_output)).await
1100    }
1101
1102    /// Link embedder trace context, then run [`Self::execute_with_options_inner`].
1103    ///
1104    /// The `#[instrument]` execution span resolves its parent from the *current*
1105    /// OpenTelemetry context (see `tracing-opentelemetry`'s `parent_context`),
1106    /// captured when the span is first entered — not when the future is
1107    /// constructed. So a thread-local `attach()` scoped to construction is too
1108    /// early to be seen (the integration test confirms this). `with_context`
1109    /// re-attaches the embedder's context on *every* poll of the inner future,
1110    /// so the context is current at first-enter and survives runtime thread
1111    /// hops. With no embedder trace context, the future runs unwrapped.
1112    async fn run_inner(
1113        &self,
1114        input: &str,
1115        opts: ExecuteOptions,
1116        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1117    ) -> Result<ExecResult> {
1118        use opentelemetry::context::FutureExt;
1119
1120        // Capture the embedder's baggage before `opts` is consumed so it can be
1121        // echoed back onto the result on egress (see `merge_egress_baggage`).
1122        let embedder_baggage = opts.baggage.clone();
1123
1124        let result = match crate::telemetry::extract_parent(&opts) {
1125            Some(parent) => self
1126                .execute_with_options_inner(input, opts, on_output)
1127                .with_context(parent)
1128                .await,
1129            None => self.execute_with_options_inner(input, opts, on_output).await,
1130        };
1131
1132        result.map(|mut r| {
1133            crate::telemetry::merge_egress_baggage(&mut r, embedder_baggage);
1134            r
1135        })
1136    }
1137
1138    /// Shared body for `execute`, `execute_with_options(_streaming)`, and
1139    /// the deprecated wrappers. Owns the per-call cancel token, vars overlay,
1140    /// cwd override, and timeout race.
1141    #[tracing::instrument(level = "info", skip(self, opts, on_output), fields(input_len = input.len()))]
1142    async fn execute_with_options_inner(
1143        &self,
1144        input: &str,
1145        opts: ExecuteOptions,
1146        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1147    ) -> Result<ExecResult> {
1148        let _guard = self.acquire_execute_lock().await;
1149
1150        // Always reset to a fresh internal token; this is the kernel's own
1151        // cancel surface for embedders calling `Kernel::cancel()`. The
1152        // embedder-supplied `opts.cancel_token` is a *read-only input* — it
1153        // is NOT written into `self.cancel_token`, because doing so would
1154        // (a) leak the embedder's token past this call's lifetime,
1155        // (b) re-route a later `Kernel::cancel()` into the embedder's token,
1156        // (c) extend the token's lifetime via the kernel's strong clone.
1157        let internal = self.reset_cancel();
1158        // Race the embedder token against the kernel's internal token via a
1159        // tracked watcher task. We hold the JoinHandle so we can abort the
1160        // task at function exit — otherwise it would wait forever for either
1161        // token to fire and leak per call.
1162        let (effective_cancel, watcher_handle): (
1163            tokio_util::sync::CancellationToken,
1164            Option<tokio::task::JoinHandle<()>>,
1165        ) = if let Some(ext) = opts.cancel_token {
1166            let combined = tokio_util::sync::CancellationToken::new();
1167            let combined_writer = combined.clone();
1168            let i = internal.clone();
1169            let handle = tokio::spawn(async move {
1170                tokio::select! {
1171                    _ = i.cancelled() => combined_writer.cancel(),
1172                    _ = ext.cancelled() => combined_writer.cancel(),
1173                }
1174            });
1175            (combined, Some(handle))
1176        } else {
1177            (internal, None)
1178        };
1179
1180        // Effective timeout: per-call wins over kernel-config default.
1181        let timeout = opts.timeout.or(self.request_timeout);
1182
1183        // ZERO timeout: return 124 immediately without spawning anything.
1184        if timeout == Some(Duration::ZERO) {
1185            if let Some(h) = watcher_handle {
1186                h.abort();
1187            }
1188            return Ok(ExecResult::failure(124, "timeout: timed out after 0s".to_string()));
1189        }
1190
1191        // Apply per-call vars overlay (push frame + set_exported), wrapped in
1192        // an RAII guard so a panic inside `execute_streaming_inner` still
1193        // pops the frame and unexports the temporarily-exported names.
1194        struct VarsFrameGuard<'a> {
1195            kernel: &'a Kernel,
1196            newly_exported: Vec<String>,
1197        }
1198        impl Drop for VarsFrameGuard<'_> {
1199            fn drop(&mut self) {
1200                // Best-effort cleanup using try_write. The execute_lock held
1201                // throughout execute_with_options means there is no concurrent
1202                // foreground caller; forks have their own scope and won't
1203                // block this. blocking_write would deadlock the runtime when
1204                // called from a tokio worker thread, so we explicitly do NOT
1205                // fall back to it — if try_write fails (which we've never
1206                // seen in practice), log loudly and accept the leak rather
1207                // than deadlock the entire kernel.
1208                let Ok(mut scope) = self.kernel.scope.try_write() else {
1209                    tracing::error!(
1210                        "vars frame guard: scope lock unexpectedly busy; \
1211                         skipping pop_frame to avoid runtime deadlock — \
1212                         transient vars may leak"
1213                    );
1214                    return;
1215                };
1216                scope.pop_frame();
1217                for name in self.newly_exported.drain(..) {
1218                    scope.unexport(&name);
1219                }
1220            }
1221        }
1222
1223        // Per-call cwd override: save current cwd, set the new one, restore
1224        // on Drop so the kernel's persistent cwd doesn't leak between calls.
1225        // Same RAII pattern as VarsFrameGuard, same blocking_write trade-off.
1226        struct CwdGuard<'a> {
1227            kernel: &'a Kernel,
1228            saved: PathBuf,
1229        }
1230        impl Drop for CwdGuard<'_> {
1231            fn drop(&mut self) {
1232                let Ok(mut ec) = self.kernel.exec_ctx.try_write() else {
1233                    tracing::error!(
1234                        "cwd guard: exec_ctx lock unexpectedly busy; \
1235                         skipping cwd restore — kernel cwd may be wrong for next call"
1236                    );
1237                    return;
1238                };
1239                ec.cwd = std::mem::take(&mut self.saved);
1240            }
1241        }
1242        let _cwd_guard: Option<CwdGuard<'_>> = if let Some(new_cwd) = opts.cwd {
1243            let mut ec = self.exec_ctx.write().await;
1244            let saved = std::mem::replace(&mut ec.cwd, new_cwd);
1245            drop(ec);
1246            Some(CwdGuard { kernel: self, saved })
1247        } else {
1248            None
1249        };
1250
1251        let _vars_guard: Option<VarsFrameGuard<'_>> = if !opts.vars.is_empty() {
1252            let mut scope = self.scope.write().await;
1253            scope.push_frame();
1254            let mut newly = Vec::with_capacity(opts.vars.len());
1255            for (name, value) in opts.vars {
1256                if !scope.is_exported(&name) {
1257                    newly.push(name.clone());
1258                }
1259                scope.set_exported(name, value);
1260            }
1261            drop(scope);
1262            Some(VarsFrameGuard { kernel: self, newly_exported: newly })
1263        } else {
1264            None
1265        };
1266
1267        // Sync the effective cancel into self.exec_ctx so try_execute_external
1268        // (which reads via self.cancel_token) sees cancellation. We also need
1269        // builtins to see it via ctx.cancel — handled in execute_command.
1270        // For simplicity here we mirror effective_cancel into self.cancel_token
1271        // for the duration of this call, then restore the internal token at
1272        // the end (so a later Kernel::cancel still hits our internal surface).
1273        {
1274            #[allow(clippy::expect_used)]
1275            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1276            *cur = effective_cancel.clone();
1277        }
1278
1279        // Run inner with optional timeout. The timer task cancels our token
1280        // on elapsed; the cascade fires SIGTERM/SIGKILL on any external
1281        // children via the wait_or_kill discipline in try_execute_external.
1282        let mut noop_cb: Box<dyn FnMut(&ExecResult) + Send> = Box::new(|_| {});
1283        let cb_ref: &mut (dyn FnMut(&ExecResult) + Send) = match on_output {
1284            Some(cb) => cb,
1285            None => &mut *noop_cb,
1286        };
1287
1288        let result = if let Some(d) = timeout {
1289            let elapsed = Arc::new(std::sync::atomic::AtomicBool::new(false));
1290            let elapsed_writer = elapsed.clone();
1291            let timer_token = effective_cancel.clone();
1292            let timer = tokio::spawn(async move {
1293                tokio::time::sleep(d).await;
1294                elapsed_writer.store(true, std::sync::atomic::Ordering::SeqCst);
1295                timer_token.cancel();
1296            });
1297            let r = self.execute_streaming_inner(input, cb_ref).await;
1298            timer.abort();
1299            match r {
1300                Ok(mut res) => {
1301                    if elapsed.load(std::sync::atomic::Ordering::SeqCst) {
1302                        res.code = 124;
1303                        if res.err.is_empty() {
1304                            res.err = format!("timeout: timed out after {:?}", d);
1305                        }
1306                    }
1307                    Ok(res)
1308                }
1309                Err(e) => Err(e),
1310            }
1311        } else {
1312            self.execute_streaming_inner(input, cb_ref).await
1313        };
1314
1315        // Restore self.cancel_token to a fresh, uncancelled token so the
1316        // embedder's view of `Kernel::cancel()` stays predictable on the
1317        // next call (it cancels the kernel's own token, not whatever was
1318        // left over from this call's combined token).
1319        {
1320            #[allow(clippy::expect_used)]
1321            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1322            *cur = tokio_util::sync::CancellationToken::new();
1323        }
1324
1325        // Tear down the embedder-token race watcher (if any). Leaving it
1326        // alive would idle forever waiting for tokens that may never fire.
1327        if let Some(h) = watcher_handle {
1328            h.abort();
1329        }
1330
1331        // VarsFrameGuard drops here on the success path and on early-return
1332        // paths above (error path included). Panic safety preserved.
1333        result
1334    }
1335
1336    /// The actual body of `execute_streaming`, run while holding the execute lock.
1337    ///
1338    /// Split out so internal kernel paths that are already under the lock can
1339    /// call this without deadlocking on re-entry. External callers must go
1340    /// through [`Self::execute_streaming`] so they acquire the lock.
1341    async fn execute_streaming_inner(
1342        &self,
1343        input: &str,
1344        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1345    ) -> Result<ExecResult> {
1346        let program = parse(input).map_err(|errors| {
1347            let msg = errors
1348                .iter()
1349                .map(|e| e.format(input))
1350                .collect::<Vec<_>>()
1351                .join("\n");
1352            anyhow::anyhow!("parse error:\n{}", msg)
1353        })?;
1354
1355        // AST display mode: show AST instead of executing
1356        {
1357            let scope = self.scope.read().await;
1358            if scope.show_ast() {
1359                let output = format!("{:#?}\n", program);
1360                return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(output)));
1361            }
1362        }
1363
1364        // Pre-execution validation
1365        if !self.skip_validation {
1366            let user_tools = self.user_tools.read().await;
1367            let validator = Validator::new(&self.tools, &user_tools);
1368            let issues = validator.validate(&program);
1369
1370            // Collect errors (warnings are logged but don't prevent execution)
1371            let errors: Vec<_> = issues
1372                .iter()
1373                .filter(|i| i.severity == Severity::Error)
1374                .collect();
1375
1376            if !errors.is_empty() {
1377                let error_msg = errors
1378                    .iter()
1379                    .map(|e| e.format(input))
1380                    .collect::<Vec<_>>()
1381                    .join("\n");
1382                return Err(anyhow::anyhow!("validation failed:\n{}", error_msg));
1383            }
1384
1385            // Log warnings via tracing (trace level to avoid noise)
1386            for warning in issues.iter().filter(|i| i.severity == Severity::Warning) {
1387                tracing::trace!("validation: {}", warning.format(input));
1388            }
1389        }
1390
1391        let mut result = ExecResult::success("");
1392
1393        // Reset cancellation token for this execution.
1394        let cancel = self.reset_cancel();
1395
1396        for stmt in program.statements {
1397            if matches!(stmt, Stmt::Empty) {
1398                continue;
1399            }
1400
1401            // Cancellation checkpoint
1402            if cancel.is_cancelled() {
1403                result.code = 130;
1404                return Ok(result);
1405            }
1406
1407            let flow = self.execute_stmt_flow(&stmt).await?;
1408
1409            // Drain any stderr written by pipeline stages during this statement.
1410            // This captures stderr from intermediate pipeline stages that would
1411            // otherwise be lost (only the last stage's result is returned).
1412            let drained_stderr = {
1413                let mut receiver = self.stderr_receiver.lock().await;
1414                receiver.drain_lossy()
1415            };
1416
1417            match flow {
1418                ControlFlow::Normal(mut r) => {
1419                    if !drained_stderr.is_empty() {
1420                        if !r.err.is_empty() && !r.err.ends_with('\n') {
1421                            r.err.push('\n');
1422                        }
1423                        // Prepend pipeline stderr before the last stage's stderr
1424                        let combined = format!("{}{}", drained_stderr, r.err);
1425                        r.err = combined;
1426                    }
1427                    on_output(&r);
1428                    // Carry the last statement's structured output for MCP TOON encoding.
1429                    // Must be done here (not in accumulate_result) because accumulate_result
1430                    // is also used in loops where per-iteration output would be wrong.
1431                    let last_output = r.output().cloned();
1432                    accumulate_result(&mut result, &r);
1433                    result.set_output(last_output);
1434                }
1435                ControlFlow::Exit { code } => {
1436                    if !drained_stderr.is_empty() {
1437                        result.err.push_str(&drained_stderr);
1438                    }
1439                    result.code = code;
1440                    return Ok(result);
1441                }
1442                ControlFlow::Return { mut value } => {
1443                    if !drained_stderr.is_empty() {
1444                        value.err = format!("{}{}", drained_stderr, value.err);
1445                    }
1446                    on_output(&value);
1447                    result = value;
1448                }
1449                ControlFlow::Break { result: mut r, .. } | ControlFlow::Continue { result: mut r, .. } => {
1450                    if !drained_stderr.is_empty() {
1451                        r.err = format!("{}{}", drained_stderr, r.err);
1452                    }
1453                    on_output(&r);
1454                    result = r;
1455                }
1456            }
1457        }
1458
1459        Ok(result)
1460    }
1461
    /// Execute a single statement, returning control flow information.
    ///
    /// This is the recursive core of the interpreter: compound statements
    /// (`if`, `for`, `while`, `case`, `&&`, `||`) call back into this method
    /// for each nested statement. Because of that recursion the method
    /// returns a boxed, pinned future instead of being declared `async fn`.
    /// The returned future is instrumented with a debug-level
    /// `execute_stmt_flow` span that records the statement kind.
    ///
    /// # Returns
    ///
    /// A [`ControlFlow`] describing how execution should continue: a normal
    /// result, or a break / continue / return / exit that the caller (an
    /// enclosing loop or the top-level statement loop) must act on.
    ///
    /// # Errors
    ///
    /// Propagates errors from expression evaluation, test evaluation,
    /// pipeline execution, glob expansion (including a `for` glob pattern
    /// that matches nothing), and exit-code conversion in `return` / `exit`.
    fn execute_stmt_flow<'a>(
        &'a self,
        stmt: &'a Stmt,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ControlFlow>> + Send + 'a>> {
        use tracing::Instrument;
        let span = tracing::debug_span!("execute_stmt_flow", stmt_type = %stmt.kind_name());
        Box::pin(async move {
        match stmt {
            Stmt::Assignment(assign) => {
                // Use async evaluator to support command substitution
                let value = self.eval_expr_async(&assign.value).await
                    .context("failed to evaluate assignment")?;
                // The value is fully evaluated before the write lock is taken,
                // so the lock is never held across expression evaluation.
                let mut scope = self.scope.write().await;
                if assign.local {
                    // local: set in innermost (current function) frame
                    scope.set(&assign.name, value.clone());
                } else {
                    // non-local: update existing or create in root frame
                    scope.set_global(&assign.name, value.clone());
                }
                drop(scope);

                // Assignments don't produce output (like sh)
                Ok(ControlFlow::ok(ExecResult::success("")))
            }
            Stmt::Command(cmd) => {
                // Route single commands through execute_pipeline for a unified path.
                // This ensures all commands go through the dispatcher chain.
                let pipeline = crate::ast::Pipeline {
                    commands: vec![cmd.clone()],
                    background: false,
                };
                let result = self.execute_pipeline(&pipeline).await?;
                self.update_last_result(&result).await;

                // Check for error exit mode (set -e)
                if !result.ok() {
                    let scope = self.scope.read().await;
                    if scope.error_exit_enabled() {
                        return Ok(ControlFlow::exit_code(result.code));
                    }
                }

                Ok(ControlFlow::ok(result))
            }
            Stmt::Pipeline(pipeline) => {
                // Same handling as `Stmt::Command`, but the pipeline is used as
                // written (including its own `background` flag).
                let result = self.execute_pipeline(pipeline).await?;
                self.update_last_result(&result).await;

                // Check for error exit mode (set -e)
                if !result.ok() {
                    let scope = self.scope.read().await;
                    if scope.error_exit_enabled() {
                        return Ok(ControlFlow::exit_code(result.code));
                    }
                }

                Ok(ControlFlow::ok(result))
            }
            Stmt::If(if_stmt) => {
                // Use async evaluator to support command substitution in conditions
                let cond_value = self.eval_expr_async(&if_stmt.condition).await?;

                // A missing else-branch is treated as an empty statement list.
                let branch = if is_truthy(&cond_value) {
                    &if_stmt.then_branch
                } else {
                    if_stmt.else_branch.as_deref().unwrap_or(&[])
                };

                let mut result = ExecResult::success("");
                for stmt in branch {
                    let flow = self.execute_stmt_flow(stmt).await?;
                    match flow {
                        ControlFlow::Normal(r) => {
                            accumulate_result(&mut result, &r);
                            self.drain_stderr_into(&mut result).await;
                        }
                        other => {
                            // NOTE(review): `result` is a local that is dropped
                            // when `other` is returned, so whatever this drain
                            // puts into it (and any output accumulated so far)
                            // does not reach the caller — confirm intended.
                            self.drain_stderr_into(&mut result).await;
                            return Ok(other);
                        }
                    }
                }
                Ok(ControlFlow::ok(result))
            }
            Stmt::For(for_loop) => {
                // Evaluate all items and collect values for iteration
                // Use async evaluator to support command substitution like $(seq 1 5)
                let mut items: Vec<Value> = Vec::new();
                for item_expr in &for_loop.items {
                    // Glob expansion in for-loop items: `for f in *.txt`
                    if let Expr::GlobPattern(pattern) = item_expr {
                        let glob_enabled = {
                            let scope = self.scope.read().await;
                            scope.glob_enabled()
                        };
                        if glob_enabled {
                            let (paths, cwd) = {
                                let ctx = self.exec_ctx.read().await;
                                let paths = ctx.expand_glob(pattern).await
                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
                                let cwd = ctx.resolve_path(".");
                                (paths, cwd)
                            };
                            // A pattern with no matches is an error, not an
                            // empty iteration.
                            if paths.is_empty() {
                                return Err(anyhow::anyhow!("no matches: {}", pattern));
                            }
                            for path in paths {
                                // Patterns not starting with '/' produce items
                                // relative to the cwd (falling back to the full
                                // path if it is not under cwd); absolute
                                // patterns keep the full path.
                                let display = if !pattern.starts_with('/') {
                                    path.strip_prefix(&cwd)
                                        .unwrap_or(&path)
                                        .to_string_lossy().into_owned()
                                } else {
                                    path.to_string_lossy().into_owned()
                                };
                                items.push(Value::String(display));
                            }
                            continue;
                        }
                        // Globbing disabled: fall through and evaluate the
                        // pattern like any other expression.
                    }
                    // Track whether this item came from $(cmd); that's the
                    // only position where multi-line stdout auto-splits per
                    // line. Arrays still spread element-by-element; bare
                    // $VAR is rejected upstream by validator E012. See
                    // docs/plan-for-loop-newline-split.md.
                    let from_command_subst = matches!(item_expr, Expr::CommandSubst(_));
                    let item = self.eval_expr_async(item_expr).await?;
                    match item {
                        // JSON arrays iterate over elements (preferred path
                        // when builtins emit .data — seq, jq, cut, find, …)
                        Value::Json(serde_json::Value::Array(arr)) => {
                            for elem in arr {
                                items.push(json_to_value(elem));
                            }
                        }
                        // Strings from $(cmd): empty → 0 iterations,
                        // multi-line → split per line (trimming trailing
                        // newlines and per-line trailing \r), single-line
                        // → one iteration. Whitespace within a line is
                        // NOT split — the "$VAR with spaces just works"
                        // promise is preserved because this only fires
                        // in CommandSubst position.
                        Value::String(s) if from_command_subst => {
                            let trimmed = s.trim_end_matches(['\n', '\r']);
                            if trimmed.is_empty() {
                                continue;
                            }
                            if trimmed.contains('\n') {
                                for line in trimmed.split('\n') {
                                    let line = line.trim_end_matches('\r');
                                    items.push(Value::String(line.to_string()));
                                }
                            } else {
                                items.push(Value::String(trimmed.to_string()));
                            }
                        }
                        // Strings not from $(cmd) stay as one value.
                        other => items.push(other),
                    }
                }

                let mut result = ExecResult::success("");
                // The loop runs inside its own scope frame. Every exit path
                // below (cancellation, error, errexit, break/continue
                // propagation, return/exit, normal completion) pops this
                // frame before leaving.
                {
                    let mut scope = self.scope.write().await;
                    scope.push_frame();
                }

                'outer: for item in items {
                    // Cancellation checkpoint per iteration
                    if self.is_cancelled() {
                        let mut scope = self.scope.write().await;
                        scope.pop_frame();
                        result.code = 130;
                        return Ok(ControlFlow::ok(result));
                    }
                    // Bind the loop variable in the frame pushed above.
                    {
                        let mut scope = self.scope.write().await;
                        scope.set(&for_loop.variable, item);
                    }
                    for stmt in &for_loop.body {
                        // Not using `?` here: the frame must be popped before
                        // the error is propagated.
                        let mut flow = match self.execute_stmt_flow(stmt).await {
                            Ok(f) => f,
                            Err(e) => {
                                let mut scope = self.scope.write().await;
                                scope.pop_frame();
                                return Err(e);
                            }
                        };
                        self.drain_stderr_into(&mut result).await;
                        match &mut flow {
                            ControlFlow::Normal(r) => {
                                accumulate_result(&mut result, r);
                                if !r.ok() {
                                    let scope = self.scope.read().await;
                                    if scope.error_exit_enabled() {
                                        // Release the read guard before taking
                                        // the write lock on the same RwLock.
                                        drop(scope);
                                        let mut scope = self.scope.write().await;
                                        scope.pop_frame();
                                        return Ok(ControlFlow::exit_code(r.code));
                                    }
                                }
                            }
                            ControlFlow::Break { .. } => {
                                // `decrement_level()` returning true means this
                                // loop consumes the break; otherwise the flow is
                                // handed to the enclosing construct.
                                if flow.decrement_level() {
                                    break 'outer;
                                }
                                let mut scope = self.scope.write().await;
                                scope.pop_frame();
                                return Ok(flow);
                            }
                            ControlFlow::Continue { .. } => {
                                // Same protocol as Break: true → skip to this
                                // loop's next item, false → propagate outward.
                                if flow.decrement_level() {
                                    continue 'outer;
                                }
                                let mut scope = self.scope.write().await;
                                scope.pop_frame();
                                return Ok(flow);
                            }
                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
                                // NOTE(review): the accumulated `result` is
                                // dropped on this path; only `flow` is returned
                                // — confirm intended.
                                let mut scope = self.scope.write().await;
                                scope.pop_frame();
                                return Ok(flow);
                            }
                        }
                    }
                }

                {
                    let mut scope = self.scope.write().await;
                    scope.pop_frame();
                }
                Ok(ControlFlow::ok(result))
            }
            Stmt::While(while_loop) => {
                // Unlike `for`, no scope frame is pushed for a while loop.
                let mut result = ExecResult::success("");

                'outer: loop {
                    // Cancellation checkpoint per iteration
                    if self.is_cancelled() {
                        result.code = 130;
                        return Ok(ControlFlow::ok(result));
                    }

                    // Evaluate condition - use async to support command substitution
                    let cond_value = self.eval_expr_async(&while_loop.condition).await?;

                    if !is_truthy(&cond_value) {
                        break;
                    }

                    // Execute body
                    for stmt in &while_loop.body {
                        let mut flow = self.execute_stmt_flow(stmt).await?;
                        self.drain_stderr_into(&mut result).await;
                        match &mut flow {
                            ControlFlow::Normal(r) => {
                                accumulate_result(&mut result, r);
                                // Check for error exit mode (set -e)
                                if !r.ok() {
                                    let scope = self.scope.read().await;
                                    if scope.error_exit_enabled() {
                                        return Ok(ControlFlow::exit_code(r.code));
                                    }
                                }
                            }
                            ControlFlow::Break { .. } => {
                                // true → this loop consumes the break;
                                // false → propagate to the enclosing construct.
                                if flow.decrement_level() {
                                    break 'outer;
                                }
                                return Ok(flow);
                            }
                            ControlFlow::Continue { .. } => {
                                // true → re-check the condition of this loop;
                                // false → propagate to the enclosing construct.
                                if flow.decrement_level() {
                                    continue 'outer;
                                }
                                return Ok(flow);
                            }
                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
                                return Ok(flow);
                            }
                        }
                    }
                }

                Ok(ControlFlow::ok(result))
            }
            Stmt::Case(case_stmt) => {
                // Evaluate the expression to match against
                let match_value = {
                    let value = self.eval_expr_async(&case_stmt.expr).await?;
                    value_to_string(&value)
                };

                // Try each branch until we find a match
                for branch in &case_stmt.branches {
                    // A branch matches if any one of its patterns glob-matches
                    // the stringified value.
                    let matched = branch.patterns.iter().any(|pattern| {
                        glob_match(pattern, &match_value)
                    });

                    if matched {
                        // Execute the branch body; only the first matching
                        // branch runs (we return right after it).
                        let mut result = ExecResult::success("");
                        for stmt in &branch.body {
                            let flow = self.execute_stmt_flow(stmt).await?;
                            match flow {
                                ControlFlow::Normal(r) => {
                                    accumulate_result(&mut result, &r);
                                    self.drain_stderr_into(&mut result).await;
                                }
                                other => {
                                    // NOTE(review): as in the `if` arm, `result`
                                    // is dropped when `other` is returned, so
                                    // this drain's target never reaches the
                                    // caller — confirm intended.
                                    self.drain_stderr_into(&mut result).await;
                                    return Ok(other);
                                }
                            }
                        }
                        return Ok(ControlFlow::ok(result));
                    }
                }

                // No match - return success with empty output (like sh)
                Ok(ControlFlow::ok(ExecResult::success("")))
            }
            Stmt::Break(levels) => {
                // `break` with no count targets one enclosing loop.
                Ok(ControlFlow::break_n(levels.unwrap_or(1)))
            }
            Stmt::Continue(levels) => {
                // `continue` with no count targets one enclosing loop.
                Ok(ControlFlow::continue_n(levels.unwrap_or(1)))
            }
            Stmt::Return(expr) => {
                // return [N] - N becomes the exit code, NOT stdout
                // Shell semantics: return sets exit code, doesn't produce output
                let result = if let Some(e) = expr {
                    let val = self.eval_expr_async(e).await?;
                    let code = crate::interpreter::value_to_exit_code(&val)
                        .map_err(|e| anyhow::anyhow!("return: {}", e))?;
                    ExecResult::from_parts(code, String::new(), String::new(), None)
                } else {
                    ExecResult::success("")
                };
                Ok(ControlFlow::return_value(result))
            }
            Stmt::Exit(expr) => {
                // exit [N] - a bare `exit` uses code 0.
                let code = if let Some(e) = expr {
                    let val = self.eval_expr_async(e).await?;
                    crate::interpreter::value_to_exit_code(&val)
                        .map_err(|e| anyhow::anyhow!("exit: {}", e))?
                } else {
                    0
                };
                Ok(ControlFlow::exit_code(code))
            }
            Stmt::ToolDef(tool_def) => {
                // Register (or overwrite) the user tool under its name.
                let mut user_tools = self.user_tools.write().await;
                user_tools.insert(tool_def.name.clone(), tool_def.clone());
                Ok(ControlFlow::ok(ExecResult::success("")))
            }
            Stmt::AndChain { left, right } => {
                // cmd1 && cmd2 - run cmd2 only if cmd1 succeeds (exit code 0)
                // Suppress errexit for the left side — && handles failure itself.
                {
                    let mut scope = self.scope.write().await;
                    scope.suppress_errexit();
                }
                // Not using `?` here: errexit suppression must be undone
                // before the error is propagated.
                let left_flow = match self.execute_stmt_flow(left).await {
                    Ok(f) => f,
                    Err(e) => {
                        let mut scope = self.scope.write().await;
                        scope.unsuppress_errexit();
                        return Err(e);
                    }
                };
                {
                    let mut scope = self.scope.write().await;
                    scope.unsuppress_errexit();
                }
                match left_flow {
                    ControlFlow::Normal(mut left_result) => {
                        self.drain_stderr_into(&mut left_result).await;
                        self.update_last_result(&left_result).await;
                        if left_result.ok() {
                            // The right side runs with errexit no longer
                            // suppressed by this chain.
                            let right_flow = self.execute_stmt_flow(right).await?;
                            match right_flow {
                                ControlFlow::Normal(mut right_result) => {
                                    self.drain_stderr_into(&mut right_result).await;
                                    self.update_last_result(&right_result).await;
                                    let mut combined = left_result;
                                    accumulate_result(&mut combined, &right_result);
                                    Ok(ControlFlow::ok(combined))
                                }
                                // Non-normal flow from the right side is
                                // propagated as-is; `left_result` is dropped.
                                other => Ok(other),
                            }
                        } else {
                            // Left failed: short-circuit with the left result.
                            Ok(ControlFlow::ok(left_result))
                        }
                    }
                    // Propagate non-normal flow from the left side.
                    _ => Ok(left_flow),
                }
            }
            Stmt::OrChain { left, right } => {
                // cmd1 || cmd2 - run cmd2 only if cmd1 fails (non-zero exit code)
                // Suppress errexit for the left side — || handles failure itself.
                {
                    let mut scope = self.scope.write().await;
                    scope.suppress_errexit();
                }
                // Not using `?` here: errexit suppression must be undone
                // before the error is propagated.
                let left_flow = match self.execute_stmt_flow(left).await {
                    Ok(f) => f,
                    Err(e) => {
                        let mut scope = self.scope.write().await;
                        scope.unsuppress_errexit();
                        return Err(e);
                    }
                };
                {
                    let mut scope = self.scope.write().await;
                    scope.unsuppress_errexit();
                }
                match left_flow {
                    ControlFlow::Normal(mut left_result) => {
                        self.drain_stderr_into(&mut left_result).await;
                        self.update_last_result(&left_result).await;
                        if !left_result.ok() {
                            // The right side runs with errexit no longer
                            // suppressed by this chain.
                            let right_flow = self.execute_stmt_flow(right).await?;
                            match right_flow {
                                ControlFlow::Normal(mut right_result) => {
                                    self.drain_stderr_into(&mut right_result).await;
                                    self.update_last_result(&right_result).await;
                                    let mut combined = left_result;
                                    accumulate_result(&mut combined, &right_result);
                                    Ok(ControlFlow::ok(combined))
                                }
                                // Non-normal flow from the right side is
                                // propagated as-is; `left_result` is dropped.
                                other => Ok(other),
                            }
                        } else {
                            // Left succeeded: short-circuit with the left result.
                            Ok(ControlFlow::ok(left_result))
                        }
                    }
                    _ => Ok(left_flow), // Propagate non-normal flow
                }
            }
            Stmt::Test(test_expr) => {
                // A test produces no output; it only yields success or a
                // failure with exit code 1.
                let is_true = self.eval_test_async(test_expr).await?;
                if is_true {
                    Ok(ControlFlow::ok(ExecResult::success("")))
                } else {
                    Ok(ControlFlow::ok(ExecResult::failure(1, "")))
                }
            }
            Stmt::Empty => Ok(ControlFlow::ok(ExecResult::success(""))),
        }
        }.instrument(span))
    }
1914
1915    /// Execute a pipeline.
1916    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(background = pipeline.background, command_count = pipeline.commands.len()))]
1917    async fn execute_pipeline(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
1918        if pipeline.commands.is_empty() {
1919            return Ok(ExecResult::success(""));
1920        }
1921
1922        // Handle background execution (`&` operator)
1923        if pipeline.background {
1924            return self.execute_background(pipeline).await;
1925        }
1926
1927        // All commands go through the runner with the Kernel as dispatcher.
1928        // This is the single execution path — no fast path for single commands.
1929        //
1930        // IMPORTANT: We snapshot exec_ctx into a local context and release the
1931        // lock before running. This prevents deadlocks when dispatch_command
1932        // is called from within the pipeline and recursively triggers another
1933        // pipeline (e.g., via user-defined tools).
1934        let mut ctx = {
1935            let ec = self.exec_ctx.read().await;
1936            let scope = self.scope.read().await;
1937            ExecContext {
1938                backend: ec.backend.clone(),
1939                scope: scope.clone(),
1940                cwd: ec.cwd.clone(),
1941                prev_cwd: ec.prev_cwd.clone(),
1942                stdin: None,
1943                stdin_data: None,
1944                pipe_stdin: None,
1945                pipe_stdout: None,
1946                stderr: ec.stderr.clone(),
1947                tool_schemas: ec.tool_schemas.clone(),
1948                tools: ec.tools.clone(),
1949                job_manager: ec.job_manager.clone(),
1950                pipeline_position: PipelinePosition::Only,
1951                interactive: self.interactive,
1952                aliases: ec.aliases.clone(),
1953                ignore_config: ec.ignore_config.clone(),
1954                output_limit: ec.output_limit.clone(),
1955                allow_external_commands: self.allow_external_commands,
1956                nonce_store: ec.nonce_store.clone(),
1957                trash_backend: ec.trash_backend.clone(),
1958                #[cfg(all(unix, feature = "subprocess"))]
1959                terminal_state: ec.terminal_state.clone(),
1960                dispatcher: self.dispatcher(),
1961                cancel: {
1962                    #[allow(clippy::expect_used)]
1963                    let token = self.cancel_token.lock().expect("cancel_token poisoned");
1964                    token.clone()
1965                },
1966                output_format: None,
1967            }
1968        }; // locks released
1969
1970        let mut result = self.runner.run(&pipeline.commands, &mut ctx, self).await;
1971
1972        // Post-hoc spill check (catches builtins and fast external commands)
1973        if ctx.output_limit.is_enabled() {
1974            let _ = crate::output_limit::spill_if_needed(&mut result, &ctx.output_limit).await;
1975        }
1976
1977        // Signal spill with exit 3; agent reads the spill file directly
1978        // (use `set +o output-limit` before cat/head/tail to bypass the limit)
1979        if result.did_spill {
1980            result.original_code = Some(result.code);
1981            result.code = 3;
1982        }
1983
1984        // Sync changes back from context
1985        {
1986            let mut ec = self.exec_ctx.write().await;
1987            ec.cwd = ctx.cwd.clone();
1988            ec.prev_cwd = ctx.prev_cwd.clone();
1989            ec.aliases = ctx.aliases.clone();
1990            ec.ignore_config = ctx.ignore_config.clone();
1991            ec.output_limit = ctx.output_limit.clone();
1992        }
1993        {
1994            let mut scope = self.scope.write().await;
1995            *scope = ctx.scope.clone();
1996        }
1997
1998        Ok(result)
1999    }
2000
2001    /// Execute a pipeline in the background.
2002    ///
2003    /// The command is spawned as a tokio task, registered with the JobManager,
2004    /// and its output is captured via BoundedStreams. The job is observable via
2005    /// `/v/jobs/{id}/stdout`, `/v/jobs/{id}/stderr`, and `/v/jobs/{id}/status`.
2006    ///
2007    /// Returns immediately with a job ID like "[1]".
2008    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(command_count = pipeline.commands.len()))]
2009    async fn execute_background(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
2010        use tokio::sync::oneshot;
2011
2012        // Format the command for display in /v/jobs/{id}/command
2013        let command_str = self.format_pipeline(pipeline);
2014
2015        // Create bounded streams for output capture
2016        let stdout = Arc::new(BoundedStream::default_size());
2017        let stderr = Arc::new(BoundedStream::default_size());
2018
2019        // Create channel for result notification
2020        let (tx, rx) = oneshot::channel();
2021
2022        // Register with JobManager to get job ID and create VFS entries
2023        let job_id = self.jobs.register_with_streams(
2024            command_str.clone(),
2025            rx,
2026            stdout.clone(),
2027            stderr.clone(),
2028        ).await;
2029
2030        // Fork the kernel for this background job. The fork snapshots the
2031        // parent's scope/cwd/aliases/user_tools so mutations stay isolated,
2032        // while sharing the job manager, VFS, and tool registry. The fork's
2033        // full dispatch chain (user tools, .kai scripts, `$(...)` in args)
2034        // is available here — something BackendDispatcher couldn't provide.
2035        let fork = self.fork().await;
2036        let runner = self.runner.clone();
2037        let commands = pipeline.commands.clone();
2038
2039        // Snapshot the fork's exec_ctx for the spawned task. We have to do
2040        // this before tokio::spawn because the fork's exec_ctx is behind a
2041        // tokio RwLock and we want the spawned task to own its ctx.
2042        let mut bg_ctx = {
2043            let ec = fork.exec_ctx.read().await;
2044            ec.child_for_pipeline()
2045        };
2046        bg_ctx.scope = fork.scope.read().await.clone();
2047        // The fork's dispatcher points at the fork itself; set it here so
2048        // builtins inside the background task (e.g. timeout) re-dispatch
2049        // through the fork, not the parent.
2050        bg_ctx.dispatcher = fork.dispatcher();
2051
2052        // Spawn the background task. Propagate the embedder's trace context
2053        // across the spawn boundary so the job's spans stay in the same trace.
2054        tokio::spawn(crate::telemetry::bind_current_context(async move {
2055            // runner.run needs a &dyn CommandDispatcher; fork.as_ref()
2056            // gives us that (Kernel implements CommandDispatcher).
2057            let result = runner.run(&commands, &mut bg_ctx, fork.as_ref()).await;
2058
2059            // Write output to streams
2060            let text = result.text_out();
2061            if !text.is_empty() {
2062                stdout.write(text.as_bytes()).await;
2063            }
2064            if !result.err.is_empty() {
2065                stderr.write(result.err.as_bytes()).await;
2066            }
2067
2068            // Close streams
2069            stdout.close().await;
2070            stderr.close().await;
2071
2072            // Send result to JobManager (ignore error if receiver dropped)
2073            let _ = tx.send(result);
2074        }));
2075
2076        Ok(ExecResult::success(format!("[{}]", job_id)))
2077    }
2078
2079    /// Format a pipeline as a command string for display.
2080    fn format_pipeline(&self, pipeline: &crate::ast::Pipeline) -> String {
2081        pipeline.commands
2082            .iter()
2083            .map(|cmd| {
2084                let mut parts = vec![cmd.name.clone()];
2085                for arg in &cmd.args {
2086                    match arg {
2087                        Arg::Positional(expr) => {
2088                            parts.push(self.format_expr(expr));
2089                        }
2090                        Arg::Named { key, value } => {
2091                            parts.push(format!("--{}={}", key, self.format_expr(value)));
2092                        }
2093                        Arg::WordAssign { key, value } => {
2094                            parts.push(format!("{}={}", key, self.format_expr(value)));
2095                        }
2096                        Arg::ShortFlag(name) => {
2097                            parts.push(format!("-{}", name));
2098                        }
2099                        Arg::LongFlag(name) => {
2100                            parts.push(format!("--{}", name));
2101                        }
2102                        Arg::DoubleDash => {
2103                            parts.push("--".to_string());
2104                        }
2105                    }
2106                }
2107                parts.join(" ")
2108            })
2109            .collect::<Vec<_>>()
2110            .join(" | ")
2111    }
2112
2113    /// Format an expression as a string for display.
2114    fn format_expr(&self, expr: &Expr) -> String {
2115        match expr {
2116            Expr::Literal(Value::String(s)) => {
2117                if s.contains(' ') || s.contains('"') {
2118                    format!("'{}'", s.replace('\'', "\\'"))
2119                } else {
2120                    s.clone()
2121                }
2122            }
2123            Expr::Literal(Value::Int(i)) => i.to_string(),
2124            Expr::Literal(Value::Float(f)) => f.to_string(),
2125            Expr::Literal(Value::Bool(b)) => b.to_string(),
2126            Expr::Literal(Value::Null) => "null".to_string(),
2127            Expr::VarRef(path) => {
2128                let name = path.segments.iter()
2129                    .map(|seg| match seg {
2130                        crate::ast::VarSegment::Field(f) => f.clone(),
2131                    })
2132                    .collect::<Vec<_>>()
2133                    .join(".");
2134                format!("${{{}}}", name)
2135            }
2136            Expr::Interpolated(_) => "\"...\"".to_string(),
2137            Expr::HereDocBody { .. } => "<<heredoc".to_string(),
2138            _ => "...".to_string(),
2139        }
2140    }
2141
2142    /// Execute a single command.
2143    async fn execute_command(&self, name: &str, args: &[Arg]) -> Result<ExecResult> {
2144        self.execute_command_depth(name, args, 0).await
2145    }
2146
2147    #[tracing::instrument(level = "info", skip(self, args, alias_depth), fields(command = %name), err)]
2148    async fn execute_command_depth(&self, name: &str, args: &[Arg], alias_depth: u8) -> Result<ExecResult> {
2149        // Special built-ins
2150        match name {
2151            "true" => return Ok(ExecResult::success("")),
2152            "false" => return Ok(ExecResult::failure(1, "")),
2153            "source" | "." => return self.execute_source(args).await,
2154            _ => {}
2155        }
2156
2157        // Alias expansion (with recursion limit)
2158        if alias_depth < 10 {
2159            let alias_value = {
2160                let ctx = self.exec_ctx.read().await;
2161                ctx.aliases.get(name).cloned()
2162            };
2163            if let Some(alias_val) = alias_value {
2164                // Split alias value into command + args
2165                let parts: Vec<&str> = alias_val.split_whitespace().collect();
2166                if let Some((alias_cmd, alias_args)) = parts.split_first() {
2167                    let mut new_args: Vec<Arg> = alias_args
2168                        .iter()
2169                        .map(|a| Arg::Positional(Expr::Literal(Value::String(a.to_string()))))
2170                        .collect();
2171                    new_args.extend_from_slice(args);
2172                    return Box::pin(self.execute_command_depth(alias_cmd, &new_args, alias_depth + 1)).await;
2173                }
2174            }
2175        }
2176
2177        // Handle /v/bin/ prefix — dispatch to builtins via virtual path
2178        if let Some(builtin_name) = name.strip_prefix("/v/bin/") {
2179            return match self.tools.get(builtin_name) {
2180                Some(_) => Box::pin(self.execute_command_depth(builtin_name, args, alias_depth)).await,
2181                None => Ok(ExecResult::failure(127, format!("command not found: {}", name))),
2182            };
2183        }
2184
2185        // Check user-defined tools first
2186        {
2187            let user_tools = self.user_tools.read().await;
2188            if let Some(tool_def) = user_tools.get(name) {
2189                let tool_def = tool_def.clone();
2190                drop(user_tools);
2191                return self.execute_user_tool(tool_def, args).await;
2192            }
2193        }
2194
2195        // Look up builtin tool
2196        let tool = match self.tools.get(name) {
2197            Some(t) => t,
2198            None => {
2199                // Try executing as .kai script from PATH
2200                if let Some(result) = self.try_execute_script(name, args).await? {
2201                    return Ok(result);
2202                }
2203                // Try executing as external command from PATH
2204                if let Some(result) = self.try_execute_external(name, args).await? {
2205                    return Ok(result);
2206                }
2207
2208                // Try backend-registered tools (embedder engines, etc.)
2209                // Look up tool schema for positional→named mapping.
2210                // Clone backend and drop read lock before awaiting (may involve network I/O).
2211                // Backend tools expect named JSON params, so enable positional mapping.
2212                let backend = self.exec_ctx.read().await.backend.clone();
2213                let tool_schema = backend.get_tool(name).await.ok().flatten().map(|t| {
2214                    let mut s = t.schema;
2215                    // Flat backend/MCP tools expect named JSON params, so map
2216                    // bare positionals onto named params. Subcommand-aware tools
2217                    // route positionals through the subcommand path and declare
2218                    // map_positionals per leaf (kj keeps it false so it re-parses
2219                    // the argv with its own clap) — don't blanket-override them.
2220                    if s.subcommands.is_empty() {
2221                        s.map_positionals = true;
2222                    }
2223                    s
2224                });
2225                let tool_args = self.build_args_async(args, tool_schema.as_ref()).await?;
2226                let mut ctx = self.exec_ctx.write().await;
2227                {
2228                    let scope = self.scope.read().await;
2229                    ctx.scope = scope.clone();
2230                }
2231                let backend = ctx.backend.clone();
2232                match backend.call_tool(name, tool_args, &mut *ctx).await {
2233                    Ok(tool_result) => {
2234                        let mut scope = self.scope.write().await;
2235                        *scope = ctx.scope.clone();
2236                        let mut exec = ExecResult::from_output(
2237                            tool_result.code as i64, tool_result.stdout, tool_result.stderr,
2238                        );
2239                        exec.set_output(tool_result.output);
2240                        return Ok(exec);
2241                    }
2242                    Err(BackendError::ToolNotFound(_)) => {
2243                        // Fall through to "command not found"
2244                    }
2245                    Err(e) => {
2246                        // Backend dispatch is last-resort lookup — if it fails
2247                        // for any reason, the command simply doesn't exist.
2248                        tracing::debug!("backend error for {name}: {e}");
2249                    }
2250                }
2251
2252                return Ok(ExecResult::failure(127, format!("command not found: {}", name)));
2253            }
2254        };
2255
2256        // Build arguments (async to support command substitution, schema-aware for flag values)
2257        let schema = tool.schema();
2258        let tool_args = self.build_args_async(args, Some(&schema)).await?;
2259
2260        // --help / -h: show help unless the tool's schema claims that flag
2261        let schema_claims = |flag: &str| -> bool {
2262            let bare = flag.trim_start_matches('-');
2263            schema.params.iter().any(|p| p.matches_flag(flag) || p.matches_flag(bare))
2264        };
2265        let wants_help =
2266            (tool_args.flags.contains("help") && !schema_claims("help"))
2267            || (tool_args.flags.contains("h") && !schema_claims("-h"));
2268        if wants_help {
2269            let help_topic = crate::help::HelpTopic::Tool(name.to_string());
2270            let ctx = self.exec_ctx.read().await;
2271            let content = crate::help::get_help(&help_topic, &ctx.tool_schemas);
2272            return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(content)));
2273        }
2274
2275        // Snapshot exec_ctx into a local context and release the write lock
2276        // before calling tool.execute. Holding the write across tool execution
2277        // would deadlock any builtin that re-dispatches through ctx.dispatcher
2278        // (timeout, scatter) — the inner dispatch_command needs its own
2279        // exec_ctx.write() and would block forever.
2280        let mut ctx = {
2281            let ec = self.exec_ctx.write().await;
2282            let scope = self.scope.read().await;
2283            ExecContext {
2284                backend: ec.backend.clone(),
2285                scope: scope.clone(),
2286                cwd: ec.cwd.clone(),
2287                prev_cwd: ec.prev_cwd.clone(),
2288                stdin: ec.stdin.clone(),
2289                stdin_data: ec.stdin_data.clone(),
2290                pipe_stdin: None, // streaming pipes are per-pipeline; not snapshotted
2291                pipe_stdout: None,
2292                stderr: ec.stderr.clone(),
2293                tool_schemas: ec.tool_schemas.clone(),
2294                tools: ec.tools.clone(),
2295                job_manager: ec.job_manager.clone(),
2296                pipeline_position: ec.pipeline_position,
2297                interactive: self.interactive,
2298                aliases: ec.aliases.clone(),
2299                ignore_config: ec.ignore_config.clone(),
2300                output_limit: ec.output_limit.clone(),
2301                allow_external_commands: self.allow_external_commands,
2302                nonce_store: ec.nonce_store.clone(),
2303                trash_backend: ec.trash_backend.clone(),
2304                #[cfg(all(unix, feature = "subprocess"))]
2305                terminal_state: ec.terminal_state.clone(),
2306                dispatcher: self.dispatcher(),
2307                // Use ec.cancel (set by dispatch_command from the runner's
2308                // ctx.cancel) so any builtin-swapped child token (e.g. timeout's
2309                // child token) reaches the spawned external via wait_or_kill.
2310                // Falls back to the kernel's own token when ec.cancel is the
2311                // default fresh token from a non-dispatch path.
2312                cancel: ec.cancel.clone(),
2313                output_format: None,
2314            }
2315        }; // both locks released — tool.execute can re-dispatch safely
2316
2317        // Move stdin out of self.exec_ctx into the snapshot (consumed-by-tool
2318        // semantics): take() so a later dispatch doesn't see stale stdin.
2319        // Done after the snapshot above so we hold the write briefly.
2320        {
2321            let mut ec = self.exec_ctx.write().await;
2322            ctx.stdin = ec.stdin.take();
2323            ctx.stdin_data = ec.stdin_data.take();
2324            ctx.pipe_stdin = ec.pipe_stdin.take();
2325            ctx.pipe_stdout = ec.pipe_stdout.take();
2326        }
2327
2328        // Honor --json before the builtin runs so its setting survives a clap
2329        // parse failure (e.g. `cmd --json --bogus-flag` would otherwise drop
2330        // --json on the floor when `try_parse_from` returns Err early).
2331        // The builtin's own `parsed.global.apply(ctx)` becomes idempotent.
2332        GlobalFlags::apply_from_args(&tool_args, &mut ctx);
2333
2334        let result = tool.execute(tool_args, &mut ctx).await;
2335
2336        // Sync mutations back. Tools may have changed scope (set/cd),
2337        // cwd/prev_cwd (cd), and aliases (alias). Also return any unused pipe
2338        // endpoints to self.exec_ctx so dispatch_command's post-execute sync
2339        // hands them back to the pipeline runner — the runner uses
2340        // stage_ctx.pipe_stdout to write the result to the next stage when
2341        // the tool itself didn't take and write to it.
2342        {
2343            let mut scope = self.scope.write().await;
2344            *scope = ctx.scope.clone();
2345        }
2346        {
2347            let mut ec = self.exec_ctx.write().await;
2348            ec.cwd = ctx.cwd;
2349            ec.prev_cwd = ctx.prev_cwd;
2350            ec.aliases = ctx.aliases;
2351            ec.pipe_stdin = ctx.pipe_stdin.take();
2352            ec.pipe_stdout = ctx.pipe_stdout.take();
2353        }
2354
2355        // Builtins parse --json via the GlobalFlags flatten in their clap
2356        // struct and write ctx.output_format. The kernel applies it — unless the
2357        // tool owns its own output (renders --json itself), in which case we
2358        // leave its bytes untouched.
2359        let result = finalize_output(result, ctx.output_format, schema.owns_output);
2360
2361        Ok(result)
2362    }
2363
2364    /// The session `HOME` from the kernel scope, if set. Tilde expansion reads
2365    /// this rather than `std::env::var("HOME")` so the kernel stays hermetic —
2366    /// a hermetic embedder (empty `initial_vars`) gets `None`, and `~` is left
2367    /// unexpanded rather than leaking the host home directory.
2368    async fn scope_home(&self) -> Option<String> {
2369        match self.scope.read().await.get("HOME") {
2370            Some(Value::String(s)) => Some(s.clone()),
2371            _ => None,
2372        }
2373    }
2374
2375    /// Pull `consumes` positional args after a non-bool flag and stash them
2376    /// on `tool_args.named` under the canonical param name.
2377    ///
2378    /// - `consumes == 1` keeps the historical contract: a single scalar value.
2379    /// - `consumes > 1` accumulates each occurrence as an inner
2380    ///   `serde_json::Value::Array` inside `named[canonical] =
2381    ///   Value::Json(Array(...))`, preserving invocation order. This is the
2382    ///   shape jq's `--arg NAME VAL` / `--argjson NAME VAL` land in.
2383    ///
2384    /// Errors loudly if the flag is missing required positionals — matches
2385    /// kaish's "no silent fallback" posture and mirrors real jq, which
2386    /// errors on `--arg NAME` with no value.
2387    #[allow(clippy::too_many_arguments)]
2388    async fn consume_flag_positionals(
2389        &self,
2390        args: &[Arg],
2391        flag_name: &str,
2392        canonical: &str,
2393        consumes: usize,
2394        positional_indices: &[usize],
2395        consumed: &mut std::collections::HashSet<usize>,
2396        current_idx: usize,
2397        tool_args: &mut ToolArgs,
2398    ) -> Result<()> {
2399        let home = self.scope_home().await;
2400        let mut collected: Vec<Value> = Vec::with_capacity(consumes.max(1));
2401        for _ in 0..consumes.max(1) {
2402            let next_pos = positional_indices
2403                .iter()
2404                .find(|idx| **idx > current_idx && !consumed.contains(idx))
2405                .copied();
2406            match next_pos {
2407                Some(pos_idx) => {
2408                    if let Arg::Positional(expr) = &args[pos_idx] {
2409                        let value = self.eval_expr_async(expr).await?;
2410                        let value = apply_tilde_expansion(value, home.as_deref());
2411                        collected.push(value);
2412                        consumed.insert(pos_idx);
2413                    }
2414                }
2415                None => {
2416                    if consumes <= 1 && collected.is_empty() {
2417                        // Back-compat: a flag with no follow-up positional
2418                        // becomes a bare flag. `--path` with nothing after
2419                        // lands in `flags`, same as before this refactor.
2420                        tool_args.flags.insert(flag_name.to_string());
2421                        return Ok(());
2422                    }
2423                    anyhow::bail!(
2424                        "--{flag_name} requires {consumes} argument{}, got {}",
2425                        if consumes == 1 { "" } else { "s" },
2426                        collected.len()
2427                    );
2428                }
2429            }
2430        }
2431
2432        if consumes <= 1 {
2433            if let Some(v) = collected.pop() {
2434                tool_args.named.insert(canonical.to_string(), v);
2435            }
2436            return Ok(());
2437        }
2438
2439        // Multi-consume: accumulate under named[canonical] as array-of-arrays.
2440        let occ: Vec<serde_json::Value> = collected
2441            .into_iter()
2442            .map(|v| crate::interpreter::value_to_json(&v))
2443            .collect();
2444        let entry = tool_args
2445            .named
2446            .entry(canonical.to_string())
2447            .or_insert_with(|| Value::Json(serde_json::Value::Array(Vec::new())));
2448        if let Value::Json(serde_json::Value::Array(outer)) = entry {
2449            outer.push(serde_json::Value::Array(occ));
2450        } else {
2451            anyhow::bail!(
2452                "--{flag_name}: named[{canonical}] already holds a non-array value"
2453            );
2454        }
2455        Ok(())
2456    }
2457
2458    /// Build tool arguments from AST args.
2459    ///
2460    /// Uses async evaluation to support command substitution in arguments.
2461    async fn build_args_async(&self, args: &[Arg], schema: Option<&crate::tools::ToolSchema>) -> Result<ToolArgs> {
2462        let mut tool_args = ToolArgs::new();
2463        let home = self.scope_home().await;
2464        // Subcommand-aware tools (e.g. `kj context list`) expose a tree of
2465        // schemas; pick the leaf the leading positionals route to and bind
2466        // flags against *its* params. Flat tools return the root. select_leaf
2467        // errors (fail loud) if a computed positional sits where a subcommand
2468        // selector is required.
2469        let leaf = match schema {
2470            Some(s) => Some(select_leaf(s, args)?),
2471            None => None,
2472        };
2473        // Bind against the leaf's params, but MERGE the root schema's params on
2474        // top as "global" flags: a value-flag declared at the tool's top level
2475        // (e.g. kj's `--confirm <nonce>`) must bind at every leaf, including when
2476        // it trails the subcommand path (`kj context retag a b --confirm <n>`).
2477        // The leaf wins on name conflicts. For a flat tool, leaf == root, so the
2478        // merge is a harmless no-op.
2479        let mut param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
2480        if let Some(l) = leaf {
2481            param_lookup.extend(schema_param_lookup(l));
2482        }
2483        // accepts_word_assign keys off the root tool name (the WORD_ASSIGN list),
2484        // not the leaf — it's a property of the command, not the subcommand.
2485        let accepts_word_assign = schema
2486            .map(|s| crate::tools::accepts_word_assign(s.name.as_str()))
2487            .unwrap_or(false);
2488
2489        // Track which positional indices have been consumed as flag values
2490        let mut consumed: std::collections::HashSet<usize> = std::collections::HashSet::new();
2491        let mut past_double_dash = false;
2492
2493        // Find positional arg indices for flag value consumption
2494        let positional_indices: Vec<usize> = args.iter().enumerate()
2495            .filter_map(|(i, a)| matches!(a, Arg::Positional(_)).then_some(i))
2496            .collect();
2497
2498        let mut i = 0;
2499        while i < args.len() {
2500            match &args[i] {
2501                Arg::DoubleDash => {
2502                    past_double_dash = true;
2503                }
2504                Arg::Positional(expr) => {
2505                    if !consumed.contains(&i) {
2506                        // Glob expansion: bare glob patterns expand to matching files
2507                        if let Expr::GlobPattern(pattern) = expr {
2508                            let glob_enabled = {
2509                                let scope = self.scope.read().await;
2510                                scope.glob_enabled()
2511                            };
2512                            if glob_enabled {
2513                                let (paths, cwd) = {
2514                                    let ctx = self.exec_ctx.read().await;
2515                                    let paths = ctx.expand_glob(pattern).await
2516                                        .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2517                                    let cwd = ctx.resolve_path(".");
2518                                    (paths, cwd)
2519                                };
2520                                if paths.is_empty() {
2521                                    return Err(anyhow::anyhow!("no matches: {}", pattern));
2522                                }
2523                                for path in paths {
2524                                    let display = if !pattern.starts_with('/') {
2525                                        path.strip_prefix(&cwd)
2526                                            .unwrap_or(&path)
2527                                            .to_string_lossy().into_owned()
2528                                    } else {
2529                                        path.to_string_lossy().into_owned()
2530                                    };
2531                                    tool_args.positional.push(Value::String(display));
2532                                }
2533                                i += 1;
2534                                continue;
2535                            }
2536                        }
2537                        let value = self.eval_expr_async(expr).await?;
2538                        let value = apply_tilde_expansion(value, home.as_deref());
2539                        tool_args.positional.push(value);
2540                    }
2541                }
2542                Arg::Named { key, value } => {
2543                    let val = self.eval_expr_async(value).await?;
2544                    let val = apply_tilde_expansion(val, home.as_deref());
2545                    tool_args.named.insert(key.clone(), val);
2546                }
2547                Arg::WordAssign { key, value } => {
2548                    let val = self.eval_expr_async(value).await?;
2549                    let val = apply_tilde_expansion(val, home.as_deref());
2550                    if accepts_word_assign {
2551                        tool_args.named.insert(key.clone(), val);
2552                    } else {
2553                        // Stringify "key=value" and pass as a positional.
2554                        // Matches bash: `cat foo=bar` opens a file named `foo=bar`.
2555                        let val_str = crate::interpreter::value_to_string(&val);
2556                        tool_args.positional.push(Value::String(format!("{key}={val_str}")));
2557                    }
2558                }
2559                Arg::ShortFlag(name) => {
2560                    if past_double_dash {
2561                        tool_args.positional.push(Value::String(format!("-{name}")));
2562                    } else if name.len() == 1 {
2563                        let flag_name = name.as_str();
2564                        let lookup = param_lookup.get(flag_name);
2565                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
2566
2567                        if is_bool {
2568                            tool_args.flags.insert(flag_name.to_string());
2569                        } else {
2570                            // Non-bool: consume `consumes` positionals as value(s)
2571                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
2572                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
2573                            self.consume_flag_positionals(
2574                                args,
2575                                name,
2576                                canonical,
2577                                consumes,
2578                                &positional_indices,
2579                                &mut consumed,
2580                                i,
2581                                &mut tool_args,
2582                            )
2583                            .await?;
2584                        }
2585                    } else if let Some(&(canonical, typ, consumes)) = param_lookup.get(name.as_str()) {
2586                        // Multi-char short flag matches a schema param (POSIX style: -name value)
2587                        if is_bool_type(typ) {
2588                            tool_args.flags.insert(canonical.to_string());
2589                        } else {
2590                            self.consume_flag_positionals(
2591                                args,
2592                                name,
2593                                canonical,
2594                                consumes,
2595                                &positional_indices,
2596                                &mut consumed,
2597                                i,
2598                                &mut tool_args,
2599                            )
2600                            .await?;
2601                        }
2602                    } else {
2603                        // Multi-char combined flags like -la: always boolean
2604                        for c in name.chars() {
2605                            tool_args.flags.insert(c.to_string());
2606                        }
2607                    }
2608                }
2609                Arg::LongFlag(name) => {
2610                    if past_double_dash {
2611                        tool_args.positional.push(Value::String(format!("--{name}")));
2612                    } else {
2613                        let lookup = param_lookup.get(name.as_str());
2614                        // An *undeclared* long flag under a `map_positionals`
2615                        // (backend/MCP) schema that is immediately followed by an
2616                        // unconsumed positional is ambiguous: kaish can't tell the
2617                        // space-form value (`--type explorer`) from a bool flag
2618                        // before a real positional (`--force file.txt`). Defaulting
2619                        // to bool here silently divorces the value and misroutes it
2620                        // — a privilege-escalation-by-typo against deny-by-default
2621                        // embedders (docs/issues.md). Fail loud instead of guessing.
2622                        let ambiguous_value = (lookup.is_none()
2623                            && leaf.is_some_and(|s| s.map_positionals)
2624                            && !consumed.contains(&(i + 1)))
2625                            .then(|| match args.get(i + 1) {
2626                                // Echo a concrete value for a copy-pasteable fix
2627                                // when it's a plain literal; fall back to VALUE.
2628                                Some(Arg::Positional(Expr::Literal(Value::String(s)))) => {
2629                                    Some(s.clone())
2630                                }
2631                                Some(Arg::Positional(_)) => Some("VALUE".to_string()),
2632                                _ => None,
2633                            })
2634                            .flatten();
2635                        if let Some(val) = ambiguous_value {
2636                            let tool = leaf.map(|s| s.name.as_str()).unwrap_or("command");
2637                            anyhow::bail!(
2638                                "{tool}: --{name} is not a declared flag, so the \
2639                                 space-separated value would be silently dropped. \
2640                                 Use --{name}={val}, or have {tool} declare --{name} \
2641                                 in its schema."
2642                            );
2643                        }
2644                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
2645
2646                        if is_bool {
2647                            tool_args.flags.insert(name.clone());
2648                        } else {
2649                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
2650                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
2651                            self.consume_flag_positionals(
2652                                args,
2653                                name,
2654                                canonical,
2655                                consumes,
2656                                &positional_indices,
2657                                &mut consumed,
2658                                i,
2659                                &mut tool_args,
2660                            )
2661                            .await?;
2662                        }
2663                    }
2664                }
2665            }
2666            i += 1;
2667        }
2668
2669        // Map remaining positionals to unfilled non-bool schema params (in order).
2670        // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
2671        // Positionals that appeared after `--` are never mapped (they're raw data).
2672        // Only for backend/external tools (map_positionals=true). Builtins handle their own positionals.
2673        // Keyed off the routed leaf so a subcommand tool maps against the active
2674        // leaf's params (kj leaves keep map_positionals=false → block skipped).
2675        if let Some(schema) = leaf.filter(|s| s.map_positionals) {
2676            let pre_dash_count = if past_double_dash {
2677                let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
2678                positional_indices.iter()
2679                    .filter(|idx| **idx < dash_pos && !consumed.contains(idx))
2680                    .count()
2681            } else {
2682                tool_args.positional.len()
2683            };
2684
2685            let mut remaining = Vec::new();
2686            let mut positional_iter = tool_args.positional.drain(..).enumerate();
2687
2688            for param in &schema.params {
2689                if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
2690                    continue;
2691                }
2692                if is_bool_type(&param.param_type) {
2693                    continue;
2694                }
2695                loop {
2696                    match positional_iter.next() {
2697                        Some((idx, val)) if idx < pre_dash_count => {
2698                            tool_args.named.insert(param.name.clone(), val);
2699                            break;
2700                        }
2701                        Some((_, val)) => {
2702                            remaining.push(val);
2703                        }
2704                        None => break,
2705                    }
2706                }
2707            }
2708
2709            remaining.extend(positional_iter.map(|(_, v)| v));
2710            tool_args.positional = remaining;
2711        }
2712
2713        Ok(tool_args)
2714    }
2715
2716    /// Build arguments as flat string list for external commands.
2717    ///
2718    /// Unlike `build_args_async` which separates flags into a HashSet (for schema-aware builtins),
2719    /// this preserves the original flag format as strings for external commands:
2720    /// - `-l` stays as `-l`
2721    /// - `--verbose` stays as `--verbose`
2722    /// - `key=value` stays as `key=value`
2723    ///
2724    /// This is what external commands expect in their argv.
2725    #[cfg(feature = "subprocess")]
2726    async fn build_args_flat(&self, args: &[Arg]) -> Result<Vec<String>> {
2727        let mut argv = Vec::new();
2728        let home = self.scope_home().await;
2729        for arg in args {
2730            match arg {
2731                Arg::Positional(expr) => {
2732                    // Glob expansion for external commands
2733                    if let Expr::GlobPattern(pattern) = expr {
2734                        let glob_enabled = {
2735                            let scope = self.scope.read().await;
2736                            scope.glob_enabled()
2737                        };
2738                        if glob_enabled {
2739                            let (paths, cwd) = {
2740                                let ctx = self.exec_ctx.read().await;
2741                                let paths = ctx.expand_glob(pattern).await
2742                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2743                                let cwd = ctx.resolve_path(".");
2744                                (paths, cwd)
2745                            };
2746                            if paths.is_empty() {
2747                                return Err(anyhow::anyhow!("no matches: {}", pattern));
2748                            }
2749                            for path in paths {
2750                                let display = if !pattern.starts_with('/') {
2751                                    path.strip_prefix(&cwd)
2752                                        .unwrap_or(&path)
2753                                        .to_string_lossy().into_owned()
2754                                } else {
2755                                    path.to_string_lossy().into_owned()
2756                                };
2757                                argv.push(display);
2758                            }
2759                            continue;
2760                        }
2761                    }
2762                    let value = self.eval_expr_async(expr).await?;
2763                    let value = apply_tilde_expansion(value, home.as_deref());
2764                    argv.push(value_to_string(&value));
2765                }
2766                Arg::Named { key, value } => {
2767                    let val = self.eval_expr_async(value).await?;
2768                    let val = apply_tilde_expansion(val, home.as_deref());
2769                    argv.push(format!("--{}={}", key, value_to_string(&val)));
2770                }
2771                Arg::WordAssign { key, value } => {
2772                    let val = self.eval_expr_async(value).await?;
2773                    let val = apply_tilde_expansion(val, home.as_deref());
2774                    argv.push(format!("{}={}", key, value_to_string(&val)));
2775                }
2776                Arg::ShortFlag(name) => {
2777                    // Preserve original format: -l, -la (combined flags)
2778                    argv.push(format!("-{}", name));
2779                }
2780                Arg::LongFlag(name) => {
2781                    // Preserve original format: --verbose
2782                    argv.push(format!("--{}", name));
2783                }
2784                Arg::DoubleDash => {
2785                    // Preserve the -- marker
2786                    argv.push("--".to_string());
2787                }
2788            }
2789        }
2790        Ok(argv)
2791    }
2792
2793    /// Async expression evaluator that supports command substitution.
2794    ///
2795    /// This is used for contexts where expressions may contain `$(...)` command
2796    /// substitution. Unlike the sync `eval_expr`, this can execute pipelines.
2797    fn eval_expr_async<'a>(&'a self, expr: &'a Expr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + 'a>> {
2798        Box::pin(async move {
2799        match expr {
2800            Expr::Literal(value) => Ok(value.clone()),
2801            Expr::VarRef(path) => {
2802                let scope = self.scope.read().await;
2803                scope.resolve_path(path)
2804                    .ok_or_else(|| anyhow::anyhow!("undefined variable"))
2805            }
2806            Expr::Interpolated(parts) => {
2807                let mut result = String::new();
2808                for part in parts {
2809                    result.push_str(&self.eval_string_part_async(part).await?);
2810                }
2811                Ok(Value::String(result))
2812            }
2813            Expr::HereDocBody { parts, strip_tabs } => {
2814                let mut result = String::new();
2815                for sp in parts {
2816                    result.push_str(&self.eval_string_part_async(&sp.part).await?);
2817                }
2818                if *strip_tabs {
2819                    Ok(Value::String(crate::interpreter::strip_leading_tabs(&result)))
2820                } else {
2821                    Ok(Value::String(result))
2822                }
2823            }
2824            Expr::BinaryOp { left, op, right } => match op {
2825                BinaryOp::And => {
2826                    let left_val = self.eval_expr_async(left).await?;
2827                    if !is_truthy(&left_val) {
2828                        return Ok(left_val);
2829                    }
2830                    self.eval_expr_async(right).await
2831                }
2832                BinaryOp::Or => {
2833                    let left_val = self.eval_expr_async(left).await?;
2834                    if is_truthy(&left_val) {
2835                        return Ok(left_val);
2836                    }
2837                    self.eval_expr_async(right).await
2838                }
2839            },
2840            Expr::CommandSubst(pipeline) => {
2841                // Snapshot scope+cwd before running — only output escapes,
2842                // not side effects like `cd` or variable assignments.
2843                let saved_scope = { self.scope.read().await.clone() };
2844                let saved_cwd = {
2845                    let ec = self.exec_ctx.read().await;
2846                    (ec.cwd.clone(), ec.prev_cwd.clone())
2847                };
2848
2849                // Capture result without `?` — restore state unconditionally
2850                let run_result = self.execute_pipeline(pipeline).await;
2851
2852                // Restore scope and cwd regardless of success/failure
2853                {
2854                    let mut scope = self.scope.write().await;
2855                    *scope = saved_scope;
2856                    if let Ok(ref r) = run_result {
2857                        scope.set_last_result(r.clone());
2858                    }
2859                }
2860                {
2861                    let mut ec = self.exec_ctx.write().await;
2862                    ec.cwd = saved_cwd.0;
2863                    ec.prev_cwd = saved_cwd.1;
2864                }
2865
2866                // Now propagate the error
2867                let result = run_result?;
2868
2869                // Prefer structured data (enables `for i in $(cmd)` iteration)
2870                if let Some(data) = &result.data {
2871                    Ok(data.clone())
2872                } else if let Some(output) = result.output() {
2873                    // Flat non-text node lists (glob, ls, tree) → iterable array
2874                    if output.is_flat() && !output.is_simple_text() && !output.root.is_empty() {
2875                        let items: Vec<serde_json::Value> = output.root.iter()
2876                            .map(|n| serde_json::Value::String(n.display_name().to_string()))
2877                            .collect();
2878                        Ok(Value::Json(serde_json::Value::Array(items)))
2879                    } else {
2880                        Ok(Value::String(result.text_out().trim_end().to_string()))
2881                    }
2882                } else {
2883                    // Otherwise return stdout as single string (NO implicit splitting)
2884                    Ok(Value::String(result.text_out().trim_end().to_string()))
2885                }
2886            }
2887            Expr::Test(test_expr) => {
2888                Ok(Value::Bool(self.eval_test_async(test_expr).await?))
2889            }
2890            Expr::Positional(n) => {
2891                let scope = self.scope.read().await;
2892                match scope.get_positional(*n) {
2893                    Some(s) => Ok(Value::String(s.to_string())),
2894                    None => Ok(Value::String(String::new())),
2895                }
2896            }
2897            Expr::AllArgs => {
2898                let scope = self.scope.read().await;
2899                Ok(Value::String(scope.all_args().join(" ")))
2900            }
2901            Expr::ArgCount => {
2902                let scope = self.scope.read().await;
2903                Ok(Value::Int(scope.arg_count() as i64))
2904            }
2905            Expr::VarLength(name) => {
2906                let scope = self.scope.read().await;
2907                match scope.get(name) {
2908                    Some(value) => Ok(Value::Int(value_to_string(value).len() as i64)),
2909                    None => Ok(Value::Int(0)),
2910                }
2911            }
2912            Expr::VarWithDefault { name, default } => {
2913                let scope = self.scope.read().await;
2914                let use_default = match scope.get(name) {
2915                    Some(value) => value_to_string(value).is_empty(),
2916                    None => true,
2917                };
2918                drop(scope); // Release the lock before recursive evaluation
2919                if use_default {
2920                    // Evaluate the default parts (supports nested expansions)
2921                    self.eval_string_parts_async(default).await.map(Value::String)
2922                } else {
2923                    let scope = self.scope.read().await;
2924                    scope.get(name).cloned().ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))
2925                }
2926            }
2927            Expr::Arithmetic(expr_str) => {
2928                let scope = self.scope.read().await;
2929                crate::arithmetic::eval_arithmetic(expr_str, &scope)
2930                    .map(Value::Int)
2931                    .map_err(|e| anyhow::anyhow!("arithmetic error: {}", e))
2932            }
2933            Expr::Command(cmd) => {
2934                // Execute command and return boolean based on exit code
2935                let result = self.execute_command(&cmd.name, &cmd.args).await?;
2936                Ok(Value::Bool(result.code == 0))
2937            }
2938            Expr::LastExitCode => {
2939                let scope = self.scope.read().await;
2940                Ok(Value::Int(scope.last_result().code))
2941            }
2942            Expr::CurrentPid => {
2943                let scope = self.scope.read().await;
2944                Ok(Value::Int(scope.pid() as i64))
2945            }
2946            Expr::GlobPattern(s) => Ok(Value::String(s.clone())),
2947        }
2948        })
2949    }
2950
2951    /// Async helper to evaluate multiple StringParts into a single string.
2952    fn eval_string_parts_async<'a>(&'a self, parts: &'a [StringPart]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
2953        Box::pin(async move {
2954            let mut result = String::new();
2955            for part in parts {
2956                result.push_str(&self.eval_string_part_async(part).await?);
2957            }
2958            Ok(result)
2959        })
2960    }
2961
2962    /// Async helper to evaluate a StringPart.
2963    /// Evaluate a `[[ ]]` test expression asynchronously, routing file tests
2964    /// through the VFS backend instead of using raw `std::path`.
2965    fn eval_test_async<'a>(&'a self, test_expr: &'a TestExpr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
2966        Box::pin(async move {
2967            match test_expr {
2968                TestExpr::FileTest { op, path } => {
2969                    let path_value = self.eval_expr_async(path).await?;
2970                    let path_str = value_to_string(&path_value);
2971                    let backend = self.exec_ctx.read().await.backend.clone();
2972                    let entry = backend.stat(std::path::Path::new(&path_str)).await.ok();
2973                    Ok(match op {
2974                        FileTestOp::Exists => entry.is_some(),
2975                        FileTestOp::IsFile => entry.as_ref().is_some_and(|e| e.is_file()),
2976                        FileTestOp::IsDir => entry.as_ref().is_some_and(|e| e.is_dir()),
2977                        FileTestOp::Readable => entry.is_some(),
2978                        FileTestOp::Writable => entry.as_ref().is_some_and(|e| {
2979                            e.permissions.is_none_or(|p| p & 0o222 != 0)
2980                        }),
2981                        FileTestOp::Executable => entry.as_ref().is_some_and(|e| {
2982                            e.permissions.is_some_and(|p| p & 0o111 != 0)
2983                        }),
2984                    })
2985                }
2986                TestExpr::StringTest { op, value } => {
2987                    let val = self.eval_expr_async(value).await?;
2988                    let s = value_to_string(&val);
2989                    Ok(match op {
2990                        crate::ast::StringTestOp::IsEmpty => s.is_empty(),
2991                        crate::ast::StringTestOp::IsNonEmpty => !s.is_empty(),
2992                    })
2993                }
2994                TestExpr::Comparison { left, op, right } => {
2995                    // Evaluate operands async (handles $(cmd)), then compare sync
2996                    let left_val = self.eval_expr_async(left).await?;
2997                    let right_val = self.eval_expr_async(right).await?;
2998                    let resolved = TestExpr::Comparison {
2999                        left: Box::new(Expr::Literal(left_val)),
3000                        op: *op,
3001                        right: Box::new(Expr::Literal(right_val)),
3002                    };
3003                    let expr = Expr::Test(Box::new(resolved));
3004                    let mut scope = self.scope.write().await;
3005                    let value = eval_expr(&expr, &mut scope)
3006                        .map_err(|e| anyhow::anyhow!("{}", e))?;
3007                    Ok(value_to_bool(&value))
3008                }
3009                TestExpr::And { left, right } => {
3010                    if !self.eval_test_async(left).await? {
3011                        Ok(false)
3012                    } else {
3013                        self.eval_test_async(right).await
3014                    }
3015                }
3016                TestExpr::Or { left, right } => {
3017                    if self.eval_test_async(left).await? {
3018                        Ok(true)
3019                    } else {
3020                        self.eval_test_async(right).await
3021                    }
3022                }
3023                TestExpr::Not { expr } => {
3024                    Ok(!self.eval_test_async(expr).await?)
3025                }
3026            }
3027        })
3028    }
3029
3030    fn eval_string_part_async<'a>(&'a self, part: &'a StringPart) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
3031        Box::pin(async move {
3032            match part {
3033                StringPart::Literal(s) => Ok(s.clone()),
3034                StringPart::Var(path) => {
3035                    let scope = self.scope.read().await;
3036                    match scope.resolve_path(path) {
3037                        Some(value) => Ok(value_to_string(&value)),
3038                        None => Ok(String::new()), // Unset vars expand to empty
3039                    }
3040                }
3041                StringPart::VarWithDefault { name, default } => {
3042                    let scope = self.scope.read().await;
3043                    let use_default = match scope.get(name) {
3044                        Some(value) => value_to_string(value).is_empty(),
3045                        None => true,
3046                    };
3047                    drop(scope); // Release lock before recursive evaluation
3048                    if use_default {
3049                        // Evaluate the default parts (supports nested expansions)
3050                        self.eval_string_parts_async(default).await
3051                    } else {
3052                        let scope = self.scope.read().await;
3053                        Ok(value_to_string(scope.get(name).ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))?))
3054                    }
3055                }
3056            StringPart::VarLength(name) => {
3057                let scope = self.scope.read().await;
3058                match scope.get(name) {
3059                    Some(value) => Ok(value_to_string(value).len().to_string()),
3060                    None => Ok("0".to_string()),
3061                }
3062            }
3063            StringPart::Positional(n) => {
3064                let scope = self.scope.read().await;
3065                match scope.get_positional(*n) {
3066                    Some(s) => Ok(s.to_string()),
3067                    None => Ok(String::new()),
3068                }
3069            }
3070            StringPart::AllArgs => {
3071                let scope = self.scope.read().await;
3072                Ok(scope.all_args().join(" "))
3073            }
3074            StringPart::ArgCount => {
3075                let scope = self.scope.read().await;
3076                Ok(scope.arg_count().to_string())
3077            }
3078            StringPart::Arithmetic(expr) => {
3079                let scope = self.scope.read().await;
3080                match crate::arithmetic::eval_arithmetic(expr, &scope) {
3081                    Ok(value) => Ok(value.to_string()),
3082                    Err(_) => Ok(String::new()),
3083                }
3084            }
3085            StringPart::CommandSubst(pipeline) => {
3086                // Snapshot scope+cwd — command substitution in strings must
3087                // not leak side effects (e.g., `"dir: $(cd /; pwd)"` must not change cwd).
3088                let saved_scope = { self.scope.read().await.clone() };
3089                let saved_cwd = {
3090                    let ec = self.exec_ctx.read().await;
3091                    (ec.cwd.clone(), ec.prev_cwd.clone())
3092                };
3093
3094                // Capture result without `?` — restore state unconditionally
3095                let run_result = self.execute_pipeline(pipeline).await;
3096
3097                // Restore scope and cwd regardless of success/failure
3098                {
3099                    let mut scope = self.scope.write().await;
3100                    *scope = saved_scope;
3101                    if let Ok(ref r) = run_result {
3102                        scope.set_last_result(r.clone());
3103                    }
3104                }
3105                {
3106                    let mut ec = self.exec_ctx.write().await;
3107                    ec.cwd = saved_cwd.0;
3108                    ec.prev_cwd = saved_cwd.1;
3109                }
3110
3111                // Now propagate the error
3112                let result = run_result?;
3113
3114                Ok(result.text_out().trim_end_matches('\n').to_string())
3115            }
3116            StringPart::LastExitCode => {
3117                let scope = self.scope.read().await;
3118                Ok(scope.last_result().code.to_string())
3119            }
3120            StringPart::CurrentPid => {
3121                let scope = self.scope.read().await;
3122                Ok(scope.pid().to_string())
3123            }
3124        }
3125        })
3126    }
3127
3128    /// Update the last result in scope.
3129    async fn update_last_result(&self, result: &ExecResult) {
3130        let mut scope = self.scope.write().await;
3131        scope.set_last_result(result.clone());
3132    }
3133
3134    /// Drain accumulated pipeline stderr into a result.
3135    ///
3136    /// Called after each sub-statement inside control structures (`if`, `for`,
3137    /// `while`, `case`, `&&`, `||`) so that stderr appears incrementally rather
3138    /// than batching until the entire structure finishes.
3139    async fn drain_stderr_into(&self, result: &mut ExecResult) {
3140        let drained = {
3141            let mut receiver = self.stderr_receiver.lock().await;
3142            receiver.drain_lossy()
3143        };
3144        if !drained.is_empty() {
3145            if !result.err.is_empty() && !result.err.ends_with('\n') {
3146                result.err.push('\n');
3147            }
3148            result.err.push_str(&drained);
3149        }
3150    }
3151
3152    /// Execute a user-defined function with local variable scoping.
3153    ///
3154    /// Functions push a new scope frame for local variables. Variables declared
3155    /// with `local` are scoped to the function; other assignments modify outer
3156    /// scopes (or create in root if new).
3157    async fn execute_user_tool(&self, def: ToolDef, args: &[Arg]) -> Result<ExecResult> {
3158        // 1. Build function args from AST args (async to support command substitution)
3159        let tool_args = self.build_args_async(args, None).await?;
3160
3161        // 2. Push a new scope frame for local variables
3162        {
3163            let mut scope = self.scope.write().await;
3164            scope.push_frame();
3165        }
3166
3167        // 3. Save current positional parameters and set new ones for this function
3168        let saved_positional = {
3169            let mut scope = self.scope.write().await;
3170            let saved = scope.save_positional();
3171
3172            // Set up new positional parameters ($0 = function name, $1, $2, ... = args)
3173            let positional_args: Vec<String> = tool_args.positional
3174                .iter()
3175                .map(value_to_string)
3176                .collect();
3177            scope.set_positional(&def.name, positional_args);
3178
3179            saved
3180        };
3181
3182        // 3. Execute body statements with control flow handling
3183        // Accumulate output across statements (like sh)
3184        let mut accumulated_out = String::new();
3185        let mut accumulated_err = String::new();
3186        let mut last_code = 0i64;
3187        let mut last_data: Option<Value> = None;
3188
3189        // Track execution error for propagation after cleanup
3190        let mut exec_error: Option<anyhow::Error> = None;
3191        let mut exit_code: Option<i64> = None;
3192
3193        for stmt in &def.body {
3194            match self.execute_stmt_flow(stmt).await {
3195                Ok(flow) => {
3196                    // Drain pipeline stderr after each sub-statement.
3197                    let drained = {
3198                        let mut receiver = self.stderr_receiver.lock().await;
3199                        receiver.drain_lossy()
3200                    };
3201                    if !drained.is_empty() {
3202                        accumulated_err.push_str(&drained);
3203                    }
3204
3205                    match flow {
3206                        ControlFlow::Normal(r) => {
3207                            accumulated_out.push_str(&r.text_out());
3208                            accumulated_err.push_str(&r.err);
3209                            last_code = r.code;
3210                            last_data = r.data;
3211                        }
3212                        ControlFlow::Return { value } => {
3213                            accumulated_out.push_str(&value.text_out());
3214                            accumulated_err.push_str(&value.err);
3215                            last_code = value.code;
3216                            last_data = value.data;
3217                            break;
3218                        }
3219                        ControlFlow::Exit { code } => {
3220                            exit_code = Some(code);
3221                            break;
3222                        }
3223                        ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3224                            accumulated_out.push_str(&r.text_out());
3225                            accumulated_err.push_str(&r.err);
3226                            last_code = r.code;
3227                            last_data = r.data;
3228                        }
3229                    }
3230                }
3231                Err(e) => {
3232                    exec_error = Some(e);
3233                    break;
3234                }
3235            }
3236        }
3237
3238        // 4. Pop scope frame and restore original positional parameters (unconditionally)
3239        {
3240            let mut scope = self.scope.write().await;
3241            scope.pop_frame();
3242            scope.set_positional(saved_positional.0, saved_positional.1);
3243        }
3244
3245        // 5. Propagate error or exit after cleanup
3246        if let Some(e) = exec_error {
3247            return Err(e);
3248        }
3249        if let Some(code) = exit_code {
3250            return Ok(ExecResult::from_parts(code, accumulated_out, accumulated_err, last_data));
3251        }
3252
3253        Ok(ExecResult::from_parts(last_code, accumulated_out, accumulated_err, last_data))
3254    }
3255
3256    /// Execute the `source` / `.` command to include and run a script.
3257    ///
3258    /// Unlike regular tool execution, `source` executes in the CURRENT scope,
3259    /// allowing the sourced script to set variables and modify shell state.
3260    async fn execute_source(&self, args: &[Arg]) -> Result<ExecResult> {
3261        // Get the file path from the first positional argument
3262        let tool_args = self.build_args_async(args, None).await?;
3263        let path = match tool_args.positional.first() {
3264            Some(Value::String(s)) => s.clone(),
3265            Some(v) => value_to_string(v),
3266            None => {
3267                return Ok(ExecResult::failure(1, "source: missing filename"));
3268            }
3269        };
3270
3271        // Resolve path relative to cwd
3272        let full_path = {
3273            let ctx = self.exec_ctx.read().await;
3274            if path.starts_with('/') {
3275                std::path::PathBuf::from(&path)
3276            } else {
3277                ctx.cwd.join(&path)
3278            }
3279        };
3280
3281        // Read file content via backend
3282        let content = {
3283            let ctx = self.exec_ctx.read().await;
3284            match ctx.backend.read(&full_path, None).await {
3285                Ok(bytes) => {
3286                    String::from_utf8(bytes).map_err(|e| {
3287                        anyhow::anyhow!("source: {}: invalid UTF-8: {}", path, e)
3288                    })?
3289                }
3290                Err(e) => {
3291                    return Ok(ExecResult::failure(
3292                        1,
3293                        format!("source: {}: {}", path, e),
3294                    ));
3295                }
3296            }
3297        };
3298
3299        // Parse the content
3300        let program = match crate::parser::parse(&content) {
3301            Ok(p) => p,
3302            Err(errors) => {
3303                let msg = errors
3304                    .iter()
3305                    .map(|e| format!("{}:{}: {}", path, e.span.start, e.message))
3306                    .collect::<Vec<_>>()
3307                    .join("\n");
3308                return Ok(ExecResult::failure(1, format!("source: {}", msg)));
3309            }
3310        };
3311
3312        // Execute each statement in the CURRENT scope (not isolated)
3313        let mut result = ExecResult::success("");
3314        for stmt in program.statements {
3315            if matches!(stmt, crate::ast::Stmt::Empty) {
3316                continue;
3317            }
3318
3319            match self.execute_stmt_flow(&stmt).await {
3320                Ok(flow) => {
3321                    self.drain_stderr_into(&mut result).await;
3322                    match flow {
3323                        ControlFlow::Normal(r) => {
3324                            result = r.clone();
3325                            self.update_last_result(&r).await;
3326                        }
3327                        ControlFlow::Break { .. } | ControlFlow::Continue { .. } => {
3328                            return Err(anyhow::anyhow!(
3329                                "source: {}: unexpected break/continue outside loop",
3330                                path
3331                            ));
3332                        }
3333                        ControlFlow::Return { value } => {
3334                            return Ok(value);
3335                        }
3336                        ControlFlow::Exit { code } => {
3337                            result.code = code;
3338                            return Ok(result);
3339                        }
3340                    }
3341                }
3342                Err(e) => {
3343                    return Err(e.context(format!("source: {}", path)));
3344                }
3345            }
3346        }
3347
3348        Ok(result)
3349    }
3350
3351    /// Try to execute a script from PATH directories.
3352    ///
3353    /// Searches PATH for `{name}.kai` files and executes them in isolated scope
3354    /// (like user-defined tools). Returns None if no script is found.
3355    async fn try_execute_script(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3356        // Get PATH from scope (default to "/bin")
3357        let path_value = {
3358            let scope = self.scope.read().await;
3359            scope
3360                .get("PATH")
3361                .map(value_to_string)
3362                .unwrap_or_else(|| "/bin".to_string())
3363        };
3364
3365        // Search PATH directories for script
3366        for dir in path_value.split(':') {
3367            if dir.is_empty() {
3368                continue;
3369            }
3370
3371            // Build script path: {dir}/{name}.kai
3372            let script_path = PathBuf::from(dir).join(format!("{}.kai", name));
3373
3374            // Check if script exists
3375            let exists = {
3376                let ctx = self.exec_ctx.read().await;
3377                ctx.backend.exists(&script_path).await
3378            };
3379
3380            if !exists {
3381                continue;
3382            }
3383
3384            // Read script content
3385            let content = {
3386                let ctx = self.exec_ctx.read().await;
3387                match ctx.backend.read(&script_path, None).await {
3388                    Ok(bytes) => match String::from_utf8(bytes) {
3389                        Ok(s) => s,
3390                        Err(e) => {
3391                            return Ok(Some(ExecResult::failure(
3392                                1,
3393                                format!("{}: invalid UTF-8: {}", script_path.display(), e),
3394                            )));
3395                        }
3396                    },
3397                    Err(e) => {
3398                        return Ok(Some(ExecResult::failure(
3399                            1,
3400                            format!("{}: {}", script_path.display(), e),
3401                        )));
3402                    }
3403                }
3404            };
3405
3406            // Parse the script
3407            let program = match crate::parser::parse(&content) {
3408                Ok(p) => p,
3409                Err(errors) => {
3410                    let msg = errors
3411                        .iter()
3412                        .map(|e| format!("{}:{}: {}", script_path.display(), e.span.start, e.message))
3413                        .collect::<Vec<_>>()
3414                        .join("\n");
3415                    return Ok(Some(ExecResult::failure(1, msg)));
3416                }
3417            };
3418
3419            // Build tool_args from args (async for command substitution support)
3420            let tool_args = self.build_args_async(args, None).await?;
3421
3422            // Create isolated scope (like user tools)
3423            let mut isolated_scope = Scope::new();
3424
3425            // Set up positional parameters ($0 = script name, $1, $2, ... = args)
3426            let positional_args: Vec<String> = tool_args.positional
3427                .iter()
3428                .map(value_to_string)
3429                .collect();
3430            isolated_scope.set_positional(name, positional_args);
3431
3432            // Save current scope and swap with isolated scope
3433            let original_scope = {
3434                let mut scope = self.scope.write().await;
3435                std::mem::replace(&mut *scope, isolated_scope)
3436            };
3437
3438            // Execute script statements — track outcome for cleanup
3439            let mut result = ExecResult::success("");
3440            let mut exec_error: Option<anyhow::Error> = None;
3441            let mut exit_code: Option<i64> = None;
3442
3443            for stmt in program.statements {
3444                if matches!(stmt, crate::ast::Stmt::Empty) {
3445                    continue;
3446                }
3447
3448                match self.execute_stmt_flow(&stmt).await {
3449                    Ok(flow) => {
3450                        match flow {
3451                            ControlFlow::Normal(r) => result = r,
3452                            ControlFlow::Return { value } => {
3453                                result = value;
3454                                break;
3455                            }
3456                            ControlFlow::Exit { code } => {
3457                                exit_code = Some(code);
3458                                break;
3459                            }
3460                            ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3461                                result = r;
3462                            }
3463                        }
3464                    }
3465                    Err(e) => {
3466                        exec_error = Some(e);
3467                        break;
3468                    }
3469                }
3470            }
3471
3472            // Restore original scope unconditionally
3473            {
3474                let mut scope = self.scope.write().await;
3475                *scope = original_scope;
3476            }
3477
3478            // Propagate error or exit after cleanup
3479            if let Some(e) = exec_error {
3480                return Err(e.context(format!("script: {}", script_path.display())));
3481            }
3482            if let Some(code) = exit_code {
3483                result.code = code;
3484                return Ok(Some(result));
3485            }
3486
3487            return Ok(Some(result));
3488        }
3489
3490        // No script found
3491        Ok(None)
3492    }
3493
3494    /// Try to execute an external command from PATH.
3495    ///
3496    /// This is the fallback when no builtin or user-defined tool matches.
3497    /// External commands receive a clean argv (flags preserved in their original format).
3498    ///
3499    /// # Requirements
3500    /// - Command must be found in PATH
3501    /// - Current working directory must be on a real filesystem (not virtual like /v)
3502    ///
3503    /// # Returns
3504    /// - `Ok(Some(result))` if command was found and executed
3505    /// - `Ok(None)` if command was not found in PATH
3506    /// - `Err` on execution errors
3507    #[cfg(not(feature = "subprocess"))]
3508    async fn try_execute_external(&self, _name: &str, _args: &[Arg]) -> Result<Option<ExecResult>> {
3509        Ok(None)
3510    }
3511
3512    /// Try to execute an external command from PATH.
3513    #[cfg(feature = "subprocess")]
3514    #[tracing::instrument(level = "debug", skip(self, args), fields(command = %name))]
3515    async fn try_execute_external(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3516        // Read the cancel token from `self.exec_ctx`, which `dispatch_command`
3517        // populates from the inbound ctx.cancel on every dispatch. This is
3518        // what makes the `timeout` builtin's swapped child token reach the
3519        // wait_or_kill discipline below — reading `self.cancel_token` would
3520        // give the kernel-wide token and miss the timeout's child cascade.
3521        let cancel = {
3522            let ec = self.exec_ctx.read().await;
3523            ec.cancel.clone()
3524        };
3525        let kill_grace = self.kill_grace;
3526        if !self.allow_external_commands {
3527            return Ok(None);
3528        }
3529
3530        // Get real working directory for relative path resolution and child cwd.
3531        // If the CWD is virtual (no real filesystem path), skip external command
3532        // execution entirely — return None so the dispatch can fall through to
3533        // backend-registered tools.
3534        let real_cwd = {
3535            let ctx = self.exec_ctx.read().await;
3536            match ctx.backend.resolve_real_path(&ctx.cwd) {
3537                Some(p) => p,
3538                None => return Ok(None),
3539            }
3540        };
3541
3542        let executable = if name.contains('/') {
3543            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
3544            let resolved = if std::path::Path::new(name).is_absolute() {
3545                std::path::PathBuf::from(name)
3546            } else {
3547                real_cwd.join(name)
3548            };
3549            if !resolved.exists() {
3550                return Ok(Some(ExecResult::failure(
3551                    127,
3552                    format!("{}: No such file or directory", name),
3553                )));
3554            }
3555            if !resolved.is_file() {
3556                return Ok(Some(ExecResult::failure(
3557                    126,
3558                    format!("{}: Is a directory", name),
3559                )));
3560            }
3561            #[cfg(unix)]
3562            {
3563                use std::os::unix::fs::PermissionsExt;
3564                let mode = std::fs::metadata(&resolved)
3565                    .map(|m| m.permissions().mode())
3566                    .unwrap_or(0);
3567                if mode & 0o111 == 0 {
3568                    return Ok(Some(ExecResult::failure(
3569                        126,
3570                        format!("{}: Permission denied", name),
3571                    )));
3572                }
3573            }
3574            resolved.to_string_lossy().into_owned()
3575        } else {
3576            // Get PATH from scope or environment
3577            let path_var = {
3578                let scope = self.scope.read().await;
3579                scope
3580                    .get("PATH")
3581                    .map(value_to_string)
3582                    .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default())
3583            };
3584
3585            // Resolve command in PATH
3586            match resolve_in_path(name, &path_var) {
3587                Some(path) => path,
3588                None => return Ok(None), // Not found - let caller handle error
3589            }
3590        };
3591
3592        tracing::debug!(executable = %executable, "resolved external command");
3593
3594        // Build flat argv (preserves flag format)
3595        let argv = self.build_args_flat(args).await?;
3596
3597        // Get stdin if available
3598        let stdin_data = {
3599            let mut ctx = self.exec_ctx.write().await;
3600            ctx.take_stdin()
3601        };
3602
3603        // Build and spawn the command
3604        use tokio::process::Command;
3605
3606        let mut cmd = Command::new(&executable);
3607        cmd.args(&argv);
3608        cmd.current_dir(&real_cwd);
3609
3610        // Hermetic env: child sees only kaish's exported vars, not the kaish
3611        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
3612        // populate it via KernelConfig::initial_vars at construction.
3613        cmd.env_clear();
3614        {
3615            let scope = self.scope.read().await;
3616            for (var_name, value) in scope.exported_vars() {
3617                cmd.env(var_name, value_to_string(&value));
3618            }
3619        }
3620
3621        // Handle stdin
3622        cmd.stdin(if stdin_data.is_some() {
3623            std::process::Stdio::piped()
3624        } else if self.interactive {
3625            std::process::Stdio::inherit()
3626        } else {
3627            std::process::Stdio::null()
3628        });
3629
3630        // In interactive mode, standalone or last-in-pipeline commands inherit
3631        // the terminal's stdout/stderr so output streams in real-time.
3632        // First/middle commands must capture stdout for the pipe — same as bash.
3633        let pipeline_position = {
3634            let ctx = self.exec_ctx.read().await;
3635            ctx.pipeline_position
3636        };
3637        let inherit_output = self.interactive
3638            && matches!(pipeline_position, PipelinePosition::Only | PipelinePosition::Last);
3639
3640        if inherit_output {
3641            cmd.stdout(std::process::Stdio::inherit());
3642            cmd.stderr(std::process::Stdio::inherit());
3643        } else {
3644            cmd.stdout(std::process::Stdio::piped());
3645            cmd.stderr(std::process::Stdio::piped());
3646        }
3647
3648        // On Unix, always put the child in its own process group so cancellation
3649        // can `killpg` the whole tree (the child plus any grandchildren).
3650        // Restoring default tty-related signal handlers stays gated on
3651        // job-control mode — those only matter when the child has a controlling
3652        // terminal.
3653        #[cfg(unix)]
3654        {
3655            let restore_jc_signals = self.terminal_state.is_some() && inherit_output;
3656            // SAFETY: setpgid and sigaction(SIG_DFL) are async-signal-safe per POSIX
3657            #[allow(unsafe_code)]
3658            unsafe {
3659                cmd.pre_exec(move || {
3660                    // Own process group — for kill scope.
3661                    nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
3662                        .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
3663                    if restore_jc_signals {
3664                        use nix::libc::{sigaction, SIGTSTP, SIGTTOU, SIGTTIN, SIGINT, SIG_DFL};
3665                        let mut sa: nix::libc::sigaction = std::mem::zeroed();
3666                        sa.sa_sigaction = SIG_DFL;
3667                        if sigaction(SIGTSTP, &sa, std::ptr::null_mut()) != 0 {
3668                            return Err(std::io::Error::last_os_error());
3669                        }
3670                        if sigaction(SIGTTOU, &sa, std::ptr::null_mut()) != 0 {
3671                            return Err(std::io::Error::last_os_error());
3672                        }
3673                        if sigaction(SIGTTIN, &sa, std::ptr::null_mut()) != 0 {
3674                            return Err(std::io::Error::last_os_error());
3675                        }
3676                        if sigaction(SIGINT, &sa, std::ptr::null_mut()) != 0 {
3677                            return Err(std::io::Error::last_os_error());
3678                        }
3679                    }
3680                    Ok(())
3681                });
3682            }
3683        }
3684
3685        // Backstop for kill on drop in case our explicit kill path is bypassed
3686        // (panic, early return, etc) on the **capture** wait path. We do NOT
3687        // set this on the JC inherit path: that uses sync `waitpid` outside
3688        // tokio's view of the child, so on drop tokio would try to kill an
3689        // already-reaped (possibly-reused) PID. The JC path has its own
3690        // cancel handling via the side-task watcher.
3691        let in_jc_inherit_path = inherit_output && self.terminal_state.is_some();
3692        if !in_jc_inherit_path {
3693            cmd.kill_on_drop(true);
3694        }
3695
3696        // Spawn the process. Capture a `KillTarget` immediately so cancel/
3697        // timeout paths can deliver signals via pidfd (Linux ≥ 5.3) — bound
3698        // to this process's generation, immune to PID reuse if the OS reaps
3699        // the child before our kill syscalls fire.
3700        let mut child = match cmd.spawn() {
3701            Ok(child) => child,
3702            Err(e) => {
3703                return Ok(Some(ExecResult::failure(
3704                    127,
3705                    format!("{}: {}", name, e),
3706                )));
3707            }
3708        };
3709        let kill_target = crate::pidfd::KillTarget::from_child(&child);
3710
3711        // Write stdin if present
3712        if let Some(data) = stdin_data
3713            && let Some(mut stdin) = child.stdin.take()
3714        {
3715            use tokio::io::AsyncWriteExt;
3716            if let Err(e) = stdin.write_all(data.as_bytes()).await {
3717                return Ok(Some(ExecResult::failure(
3718                    1,
3719                    format!("{}: failed to write stdin: {}", name, e),
3720                )));
3721            }
3722            // Drop stdin to signal EOF
3723        }
3724
3725        if inherit_output {
3726            // Job control path: use waitpid with WUNTRACED for Ctrl-Z support
3727            #[cfg(unix)]
3728            if let Some(ref term) = self.terminal_state {
3729                let child_id = child.id().unwrap_or(0);
3730                let pid = nix::unistd::Pid::from_raw(child_id as i32);
3731                let pgid = pid; // child is its own pgid leader
3732
3733                // Give the terminal to the child's process group
3734                if let Err(e) = term.give_terminal_to(pgid) {
3735                    tracing::warn!("failed to give terminal to child: {}", e);
3736                }
3737
3738                let term_clone = term.clone();
3739                let cmd_name = name.to_string();
3740                let cmd_display = format!("{} {}", name, argv.join(" "));
3741                let jobs = self.jobs.clone();
3742
3743                // Side task that watches for cancellation while the blocking
3744                // waitpid runs. On cancel, it SIGTERMs the process group, waits
3745                // the grace period, then SIGKILLs. The blocking waitpid returns
3746                // when the child dies. AbortOnDrop guard cancels the watcher
3747                // on the success path so it doesn't keep running after wait
3748                // returns naturally.
3749                //
3750                // `wait_complete` shrinks the PID-reuse race: the watcher
3751                // checks it before each kill syscall and bails out if
3752                // wait_for_foreground has already reaped the child. This
3753                // doesn't fully eliminate the race (atomic load + kill is
3754                // not atomic with the OS reap+reuse), but narrows the window
3755                // to nanoseconds — enough to be ignorable in practice.
3756                let wait_complete = std::sync::Arc::new(
3757                    std::sync::atomic::AtomicBool::new(false)
3758                );
3759                let cancel_watcher = {
3760                    let cancel = cancel.clone();
3761                    let wc = wait_complete.clone();
3762                    // Ownership transfer: the JC path's sync wait inside
3763                    // block_in_place owns the child's reaping, so the
3764                    // cancel_watcher drives the kill side via KillTarget
3765                    // (pidfd-bound on Linux). When kill_target is None
3766                    // (older kernel + open failure, or non-Linux), falls
3767                    // through to the older PID-based path the closure
3768                    // captures from `pid`.
3769                    let target = kill_target.as_ref().map(|t| {
3770                        // Re-borrow the components we need into Owned-ish form
3771                        // so the spawned task is 'static. We can't move
3772                        // KillTarget directly because try_execute_external
3773                        // still uses it after the spawn — but on the JC path
3774                        // there is no further use after the watcher spawn,
3775                        // so a clone-of-pid + owned None pidfd is safe.
3776                        // Simpler: signal via the existing target by cloning
3777                        // a fresh pidfd; the original keeps its handle.
3778                        // Pidfd is just an OwnedFd — not Clone — so do it
3779                        // by re-opening from the pid. Fall back if reopen
3780                        // fails (race already reaped → best-effort kill).
3781                        crate::pidfd::KillTarget::from_pid(t.pid())
3782                    });
3783                    tokio::spawn(async move {
3784                        cancel.cancelled().await;
3785                        if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
3786                        use nix::sys::signal::Signal;
3787                        if let Some(t) = &target {
3788                            t.signal(Signal::SIGTERM);
3789                            t.signal_pg(Signal::SIGTERM);
3790                        } else {
3791                            let _ = nix::sys::signal::kill(pid, Signal::SIGTERM);
3792                            let _ = nix::sys::signal::killpg(pid, Signal::SIGTERM);
3793                        }
3794                        if kill_grace > Duration::ZERO {
3795                            tokio::time::sleep(kill_grace).await;
3796                            if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
3797                        }
3798                        if let Some(t) = &target {
3799                            t.signal(Signal::SIGKILL);
3800                            t.signal_pg(Signal::SIGKILL);
3801                        } else {
3802                            let _ = nix::sys::signal::kill(pid, Signal::SIGKILL);
3803                            let _ = nix::sys::signal::killpg(pid, Signal::SIGKILL);
3804                        }
3805                    })
3806                };
3807                struct AbortOnDrop(tokio::task::JoinHandle<()>);
3808                impl Drop for AbortOnDrop {
3809                    fn drop(&mut self) {
3810                        self.0.abort();
3811                    }
3812                }
3813                let _watcher_guard = AbortOnDrop(cancel_watcher);
3814
3815                let wait_complete_setter = wait_complete.clone();
3816                let code = tokio::task::block_in_place(move || {
3817                    let result = term_clone.wait_for_foreground(pid);
3818                    // Mark wait done before the watcher might fire.
3819                    wait_complete_setter.store(true, std::sync::atomic::Ordering::SeqCst);
3820
3821                    // Always reclaim the terminal
3822                    if let Err(e) = term_clone.reclaim_terminal() {
3823                        tracing::warn!("failed to reclaim terminal: {}", e);
3824                    }
3825
3826                    match result {
3827                        crate::terminal::WaitResult::Exited(code) => code as i64,
3828                        crate::terminal::WaitResult::Signaled(sig) => 128 + sig as i64,
3829                        crate::terminal::WaitResult::Stopped(_sig) => {
3830                            // Register as a stopped job
3831                            let rt = tokio::runtime::Handle::current();
3832                            let job_id = rt.block_on(jobs.register_stopped(
3833                                cmd_display,
3834                                child_id,
3835                                child_id, // pgid = pid for group leader
3836                            ));
3837                            eprintln!("\n[{}]+ Stopped\t{}", job_id, cmd_name);
3838                            148 // 128 + SIGTSTP(20) on most systems, but we use a fixed value
3839                        }
3840                    }
3841                });
3842
3843                return Ok(Some(ExecResult::from_output(code, String::new(), String::new())));
3844            }
3845
3846            // Non-job-control path with inherited stdio.
3847            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
3848                Ok(s) => s,
3849                Err(e) => {
3850                    return Ok(Some(ExecResult::failure(
3851                        1,
3852                        format!("{}: failed to wait: {}", name, e),
3853                    )));
3854                }
3855            };
3856
3857            let code = status.code().unwrap_or_else(|| {
3858                #[cfg(unix)]
3859                {
3860                    use std::os::unix::process::ExitStatusExt;
3861                    128 + status.signal().unwrap_or(0)
3862                }
3863                #[cfg(not(unix))]
3864                {
3865                    -1
3866                }
3867            }) as i64;
3868
3869            // stdout/stderr already went to the terminal
3870            Ok(Some(ExecResult::from_output(code, String::new(), String::new())))
3871        } else {
3872            // Capture output via bounded streams
3873            let stdout_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
3874            let stderr_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
3875
3876            let stdout_pipe = child.stdout.take();
3877            let stderr_pipe = child.stderr.take();
3878
3879            let stdout_clone = stdout_stream.clone();
3880            let stderr_clone = stderr_stream.clone();
3881
3882            let stdout_task = stdout_pipe.map(|pipe| {
3883                tokio::spawn(async move {
3884                    drain_to_stream(pipe, stdout_clone).await;
3885                })
3886            });
3887
3888            let stderr_task = stderr_pipe.map(|pipe| {
3889                tokio::spawn(async move {
3890                    drain_to_stream(pipe, stderr_clone).await;
3891                })
3892            });
3893
3894            let cancelled_before_wait = cancel.is_cancelled();
3895            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
3896                Ok(s) => s,
3897                Err(e) => {
3898                    if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
3899                    if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
3900                    return Ok(Some(ExecResult::failure(
3901                        1,
3902                        format!("{}: failed to wait: {}", name, e),
3903                    )));
3904                }
3905            };
3906
3907            // On cancel, abort the drain tasks (the child's pipes are gone;
3908            // late output is lost but predictable death beats partial capture).
3909            // On normal exit, await drains so we don't lose buffered output.
3910            if cancelled_before_wait || cancel.is_cancelled() {
3911                if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
3912                if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
3913            } else {
3914                if let Some(task) = stdout_task {
3915                    // Ignore join error — the drain task logs its own errors
3916                    let _ = task.await;
3917                }
3918                if let Some(task) = stderr_task {
3919                    let _ = task.await;
3920                }
3921            }
3922
3923            let code = status.code().unwrap_or_else(|| {
3924                #[cfg(unix)]
3925                {
3926                    use std::os::unix::process::ExitStatusExt;
3927                    128 + status.signal().unwrap_or(0)
3928                }
3929                #[cfg(not(unix))]
3930                {
3931                    -1
3932                }
3933            }) as i64;
3934
3935            let stdout = stdout_stream.read_string().await;
3936            let stderr = stderr_stream.read_string().await;
3937
3938            Ok(Some(ExecResult::from_output(code, stdout, stderr)))
3939        }
3940    }
3941
3942    // --- Variable Access ---
3943
3944    /// Get a variable value.
3945    pub async fn get_var(&self, name: &str) -> Option<Value> {
3946        let scope = self.scope.read().await;
3947        scope.get(name).cloned()
3948    }
3949
3950    /// Check if error-exit mode is enabled (for testing).
3951    #[cfg(test)]
3952    pub async fn error_exit_enabled(&self) -> bool {
3953        let scope = self.scope.read().await;
3954        scope.error_exit_enabled()
3955    }
3956
3957    /// Set a variable value.
3958    pub async fn set_var(&self, name: &str, value: Value) {
3959        let mut scope = self.scope.write().await;
3960        scope.set(name.to_string(), value);
3961    }
3962
3963    /// Set positional parameters ($0 script name and $1-$9 args).
3964    pub async fn set_positional(&self, script_name: impl Into<String>, args: Vec<String>) {
3965        let mut scope = self.scope.write().await;
3966        scope.set_positional(script_name, args);
3967    }
3968
3969    /// List all variables.
3970    pub async fn list_vars(&self) -> Vec<(String, Value)> {
3971        let scope = self.scope.read().await;
3972        scope.all()
3973    }
3974
3975    /// List exported variables (name, value), sorted by name. These are the
3976    /// vars a child process would see (see `dispatch`'s hermetic env build).
3977    pub async fn exported_vars(&self) -> Vec<(String, Value)> {
3978        let scope = self.scope.read().await;
3979        scope.exported_vars()
3980    }
3981
3982    // --- CWD ---
3983
3984    /// Get current working directory.
3985    pub async fn cwd(&self) -> PathBuf {
3986        self.exec_ctx.read().await.cwd.clone()
3987    }
3988
3989    /// Set current working directory.
3990    pub async fn set_cwd(&self, path: PathBuf) {
3991        let mut ctx = self.exec_ctx.write().await;
3992        ctx.set_cwd(path);
3993    }
3994
3995    /// Set the working directory only if `path` resolves to a directory in the
3996    /// kernel's backend — the same namespace `cd` validates against. Unlike a
3997    /// raw host-FS `is_dir()` check, this correctly accepts virtual mounts
3998    /// (`/v/docs`, in-memory scratch, …) and rejects real paths that have since
3999    /// disappeared. Returns whether the cwd was changed.
4000    pub async fn try_set_cwd(&self, path: PathBuf) -> bool {
4001        // Clone the backend Arc out before the stat so we never hold the
4002        // exec_ctx lock across the await.
4003        let backend = self.exec_ctx.read().await.backend.clone();
4004        let is_dir = matches!(backend.stat(&path).await, Ok(entry) if entry.is_dir());
4005        if is_dir {
4006            self.exec_ctx.write().await.set_cwd(path);
4007        }
4008        is_dir
4009    }
4010
4011    // --- Last Result ---
4012
4013    /// Get the last result ($?).
4014    pub async fn last_result(&self) -> ExecResult {
4015        let scope = self.scope.read().await;
4016        scope.last_result().clone()
4017    }
4018
4019    // --- Tools ---
4020
4021    /// Check if a user-defined function exists.
4022    pub async fn has_function(&self, name: &str) -> bool {
4023        self.user_tools.read().await.contains_key(name)
4024    }
4025
4026    /// Get available tool schemas.
4027    pub fn tool_schemas(&self) -> Vec<crate::tools::ToolSchema> {
4028        self.tools.schemas()
4029    }
4030
4031    // --- Jobs ---
4032
4033    /// Get job manager.
4034    pub fn jobs(&self) -> Arc<JobManager> {
4035        self.jobs.clone()
4036    }
4037
4038    // --- VFS ---
4039
4040    /// Get VFS router.
4041    pub fn vfs(&self) -> Arc<VfsRouter> {
4042        self.vfs.clone()
4043    }
4044
4045    // --- State ---
4046
4047    /// Reset kernel to initial state.
4048    ///
4049    /// Clears in-memory variables and resets cwd to root.
4050    /// History is not cleared (it persists across resets).
4051    pub async fn reset(&self) -> Result<()> {
4052        {
4053            let mut scope = self.scope.write().await;
4054            *scope = Scope::new();
4055        }
4056        {
4057            let mut ctx = self.exec_ctx.write().await;
4058            ctx.cwd = PathBuf::from("/");
4059        }
4060        Ok(())
4061    }
4062
    /// Shutdown the kernel.
    ///
    /// Consumes the kernel and waits for every background job tracked by the
    /// job manager before returning. Always returns `Ok(())`.
    pub async fn shutdown(self) -> Result<()> {
        // Wait for all background jobs
        self.jobs.wait_all().await;
        Ok(())
    }
4069
4070    /// Dispatch a single command using the full resolution chain.
4071    ///
4072    /// This is the core of `CommandDispatcher` — it syncs state between the
4073    /// passed-in `ExecContext` and kernel-internal state (scope, exec_ctx),
4074    /// then delegates to `execute_command` for the actual dispatch.
4075    ///
4076    /// State flow:
4077    /// 1. ctx → self: sync scope, cwd, stdin so internal methods see current state
4078    /// 2. execute_command: full dispatch chain (user tools, builtins, scripts, external, backend)
4079    /// 3. self → ctx: sync scope, cwd changes back so the pipeline runner sees them
4080    async fn dispatch_command(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4081        // Ensure nested dispatch (e.g. the `timeout` builtin re-dispatching
4082        // its inner command via ctx.dispatcher) routes through THIS kernel,
4083        // not a stale parent. Critical for forks: the fork's builtins must
4084        // use the fork's dispatcher, not the parent's.
4085        if let Some(d) = self.dispatcher() {
4086            ctx.dispatcher = Some(d);
4087        }
4088
4089        // 1. Sync ctx → self internals
4090        {
4091            let mut scope = self.scope.write().await;
4092            *scope = ctx.scope.clone();
4093        }
4094        {
4095            let mut ec = self.exec_ctx.write().await;
4096            ec.cwd = ctx.cwd.clone();
4097            ec.prev_cwd = ctx.prev_cwd.clone();
4098            ec.stdin = ctx.stdin.take();
4099            ec.stdin_data = ctx.stdin_data.take();
4100            // Streaming pipe endpoints and kernel stderr must flow to the
4101            // tool via self.exec_ctx — execute_command reads that, not the
4102            // passed-in ctx. Without moving these, concurrent pipeline
4103            // stages dispatched via a fork get pipe_stdin = None and
4104            // silently read nothing.
4105            ec.pipe_stdin = ctx.pipe_stdin.take();
4106            ec.pipe_stdout = ctx.pipe_stdout.take();
4107            if let Some(stderr) = ctx.stderr.clone() {
4108                ec.stderr = Some(stderr);
4109            }
4110            ec.aliases = ctx.aliases.clone();
4111            ec.ignore_config = ctx.ignore_config.clone();
4112            ec.output_limit = ctx.output_limit.clone();
4113            ec.pipeline_position = ctx.pipeline_position;
4114            // Sync the cancel token from ctx → ec. Builtins like `timeout`
4115            // swap ctx.cancel to a derived child token before re-dispatching;
4116            // execute_command's snapshot reads ec.cancel (kept aligned by
4117            // this sync), so try_execute_external sees the right token.
4118            ec.cancel = ctx.cancel.clone();
4119        }
4120
4121        // 2. Execute via the full dispatch chain
4122        let result = self.execute_command(&cmd.name, &cmd.args).await?;
4123
4124        // 3. Sync self → ctx
4125        {
4126            let scope = self.scope.read().await;
4127            ctx.scope = scope.clone();
4128        }
4129        {
4130            let mut ec = self.exec_ctx.write().await;
4131            ctx.cwd = ec.cwd.clone();
4132            ctx.prev_cwd = ec.prev_cwd.clone();
4133            ctx.aliases = ec.aliases.clone();
4134            ctx.ignore_config = ec.ignore_config.clone();
4135            ctx.output_limit = ec.output_limit.clone();
4136            // Return any pipe endpoints that the tool didn't consume.
4137            // `take()` here keeps the fork's exec_ctx in a clean state for
4138            // the next dispatch — these are per-command and shouldn't leak
4139            // between calls.
4140            ctx.pipe_stdin = ec.pipe_stdin.take();
4141            ctx.pipe_stdout = ec.pipe_stdout.take();
4142        }
4143
4144        Ok(result)
4145    }
4146}
4147
4148#[async_trait]
4149impl CommandDispatcher for Kernel {
4150    /// Dispatch a command through the Kernel's full resolution chain.
4151    ///
4152    /// This is the single path for all command execution when called from
4153    /// the pipeline runner. It provides the full dispatch chain:
4154    /// user tools → builtins → .kai scripts → external commands → backend tools.
4155    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
4156        self.dispatch_command(cmd, ctx).await
4157    }
4158
4159    /// Evaluate an expression through the kernel's async chain, including
4160    /// command substitution. Delegates to `eval_expr_async`, which snapshots
4161    /// the kernel's scope/cwd and restores them after any `$(...)` runs, so
4162    /// only command output escapes. The `ctx` is unused here because the
4163    /// kernel evaluates against its own session state (a fork carries the
4164    /// pipeline stage's snapshot); var refs resolve against that scope.
4165    async fn eval_expr(&self, expr: &Expr, _ctx: &ExecContext) -> Result<Value> {
4166        self.eval_expr_async(expr).await
4167    }
4168
4169    /// Produce a forked dispatcher with independent mutable state (detached).
4170    ///
4171    /// Calls the inherent `Kernel::fork` method (note the UFCS to avoid
4172    /// recursing into the trait method we're defining) and coerces the
4173    /// returned `Arc<Kernel>` to `Arc<dyn CommandDispatcher>`.
4174    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
4175        let fork: Arc<Kernel> = Kernel::fork(self).await;
4176        fork
4177    }
4178
4179    /// Produce a forked dispatcher with cancellation cascading from this kernel.
4180    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
4181        let fork: Arc<Kernel> = Kernel::fork_attached(self).await;
4182        fork
4183    }
4184}
4185
4186/// Apply the requested output format to a builtin's result, unless the tool
4187/// owns its own output.
4188///
4189/// `format` is `ctx.output_format` (set from `--json`). When `owns_output` is
4190/// true the tool already rendered its bytes (bespoke JSON envelope), so the
4191/// kernel leaves the result untouched rather than re-formatting its
4192/// `OutputData`. Otherwise the kernel renders the typed `OutputData` uniformly.
4193fn finalize_output(
4194    result: ExecResult,
4195    format: Option<crate::interpreter::OutputFormat>,
4196    owns_output: bool,
4197) -> ExecResult {
4198    match format {
4199        Some(_) if owns_output => result,
4200        Some(format) => apply_output_format(result, format),
4201        None => result,
4202    }
4203}
4204
4205/// Accumulate output from one result into another.
4206///
4207/// This appends stdout and stderr (with newlines as separators) and updates
4208/// the exit code to match the new result. Used to preserve output from
4209/// multiple statements, loop iterations, and command chains.
4210fn accumulate_result(accumulated: &mut ExecResult, new: &ExecResult) {
4211    // Materialize lazy OutputData into .out before accumulating.
4212    // Without this, the first command's output stays in .output while
4213    // the second's text gets appended to .out, losing the first.
4214    accumulated.materialize();
4215    let new_text = new.text_out();
4216    if !accumulated.text_out().is_empty() && !new_text.is_empty() && !accumulated.text_out().ends_with('\n') {
4217        accumulated.push_out("\n");
4218    }
4219    accumulated.push_out(&new_text);
4220    if !accumulated.err.is_empty() && !new.err.is_empty() && !accumulated.err.ends_with('\n') {
4221        accumulated.err.push('\n');
4222    }
4223    accumulated.err.push_str(&new.err);
4224    accumulated.code = new.code;
4225    accumulated.data = new.data.clone();
4226    accumulated.did_spill = new.did_spill;
4227    accumulated.original_code = new.original_code;
4228    accumulated.content_type = new.content_type.clone();
4229    accumulated.baggage.clone_from(&new.baggage);
4230}
4231
4232/// Check if a value is truthy.
4233fn is_truthy(value: &Value) -> bool {
4234    match value {
4235        Value::Null => false,
4236        Value::Bool(b) => *b,
4237        Value::Int(i) => *i != 0,
4238        Value::Float(f) => *f != 0.0,
4239        Value::String(s) => !s.is_empty(),
4240        Value::Json(json) => match json {
4241            serde_json::Value::Null => false,
4242            serde_json::Value::Array(arr) => !arr.is_empty(),
4243            serde_json::Value::Object(obj) => !obj.is_empty(),
4244            serde_json::Value::Bool(b) => *b,
4245            serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
4246            serde_json::Value::String(s) => !s.is_empty(),
4247        },
4248        Value::Blob(_) => true, // Blob references are always truthy
4249    }
4250}
4251
4252/// Apply tilde expansion to a value.
4253///
4254/// Only string values starting with `~` are expanded. `home` is the session
4255/// `HOME` from the kernel scope (the kernel is hermetic and never reads the
4256/// host env); `None` leaves `~`/`~/path` unexpanded. See [`expand_tilde`].
4257fn apply_tilde_expansion(value: Value, home: Option<&str>) -> Value {
4258    match value {
4259        Value::String(s) if s.starts_with('~') => Value::String(expand_tilde(&s, home)),
4260        _ => value,
4261    }
4262}
4263
/// Wait for a child to exit, killing it if `cancel` fires first.
///
/// `target` carries a Linux pidfd (when available) for race-free direct-child
/// kill; fall-through to PID-based kill otherwise. On non-unix targets the
/// parameter is ignored and we use tokio's cross-platform `start_kill`.
///
/// Returns the child's exit status: from the plain wait when the child
/// finishes first, or from `kill_with_grace` (given `grace`) when
/// cancellation wins.
#[cfg(all(unix, feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    target: Option<&crate::pidfd::KillTarget>,
    cancel: &tokio_util::sync::CancellationToken,
    grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // `biased` polls the branches in the order written, so a child that
        // has already exited is observed before the cancel token is checked.
        biased;
        status = child.wait() => status,
        // Cancellation won the race: hand over to the terminate-then-kill path.
        _ = cancel.cancelled() => kill_with_grace(child, target, grace).await,
    }
}
4282
/// Wait for a child to exit, killing it if `cancel` fires first (non-unix
/// build).
///
/// `_target` and `_grace` are unused here; they keep the signature parallel
/// to the unix variant. On cancellation the child is killed via tokio's
/// `start_kill` and then waited on.
#[cfg(all(not(unix), feature = "subprocess"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    _target: Option<&()>,
    cancel: &tokio_util::sync::CancellationToken,
    _grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        // `biased` polls the branches in the order written, so a child that
        // has already exited is observed before the cancel token is checked.
        biased;
        status = child.wait() => status,
        _ = cancel.cancelled() => {
            // Best effort: a failure to start the kill is ignored and we
            // still wait for the child to exit.
            let _ = child.start_kill();
            child.wait().await
        }
    }
}
4299
4300/// Send SIGTERM to the child and its process group; wait `grace`; then SIGKILL.
4301///
4302/// Direct-child kill goes through `target.signal()`, which on Linux uses a
4303/// pidfd (immune to PID reuse). Process-group kill uses `killpg` — there is
4304/// no PGID-equivalent of pidfd, so grandchildren retain a small reuse window.
4305#[cfg(all(unix, feature = "subprocess"))]
4306pub(crate) async fn kill_with_grace(
4307    child: &mut tokio::process::Child,
4308    target: Option<&crate::pidfd::KillTarget>,
4309    grace: Duration,
4310) -> std::io::Result<std::process::ExitStatus> {
4311    use nix::sys::signal::Signal;
4312
4313    if let Some(t) = target {
4314        t.signal(Signal::SIGTERM);
4315        t.signal_pg(Signal::SIGTERM);
4316        if grace > Duration::ZERO
4317            && let Ok(status) = tokio::time::timeout(grace, child.wait()).await
4318        {
4319            return status;
4320        }
4321        t.signal(Signal::SIGKILL);
4322        t.signal_pg(Signal::SIGKILL);
4323    }
4324    child.wait().await
4325}
4326
4327#[cfg(all(test, feature = "subprocess"))]
4328mod tests {
4329    use super::*;
4330
4331    #[tokio::test]
4332    async fn test_kernel_transient() {
4333        let kernel = Kernel::transient().expect("failed to create kernel");
4334        assert_eq!(kernel.name(), "transient");
4335    }
4336
4337    #[tokio::test]
4338    async fn test_kernel_execute_echo() {
4339        let kernel = Kernel::transient().expect("failed to create kernel");
4340        let result = kernel.execute("echo hello").await.expect("execution failed");
4341        assert!(result.ok());
4342        assert_eq!(result.text_out().trim(), "hello");
4343    }
4344
4345    #[tokio::test]
4346    async fn test_multiple_statements_accumulate_output() {
4347        let kernel = Kernel::transient().expect("failed to create kernel");
4348        let result = kernel
4349            .execute("echo one\necho two\necho three")
4350            .await
4351            .expect("execution failed");
4352        assert!(result.ok());
4353        // Should have all three outputs separated by newlines
4354        assert!(result.text_out().contains("one"), "missing 'one': {}", result.text_out());
4355        assert!(result.text_out().contains("two"), "missing 'two': {}", result.text_out());
4356        assert!(result.text_out().contains("three"), "missing 'three': {}", result.text_out());
4357    }
4358
4359    #[tokio::test]
4360    async fn test_and_chain_accumulates_output() {
4361        let kernel = Kernel::transient().expect("failed to create kernel");
4362        let result = kernel
4363            .execute("echo first && echo second")
4364            .await
4365            .expect("execution failed");
4366        assert!(result.ok());
4367        assert!(result.text_out().contains("first"), "missing 'first': {}", result.text_out());
4368        assert!(result.text_out().contains("second"), "missing 'second': {}", result.text_out());
4369    }
4370
4371    #[tokio::test]
4372    async fn test_for_loop_accumulates_output() {
4373        let kernel = Kernel::transient().expect("failed to create kernel");
4374        let result = kernel
4375            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
4376            .await
4377            .expect("execution failed");
4378        assert!(result.ok());
4379        assert!(result.text_out().contains("item: a"), "missing 'item: a': {}", result.text_out());
4380        assert!(result.text_out().contains("item: b"), "missing 'item: b': {}", result.text_out());
4381        assert!(result.text_out().contains("item: c"), "missing 'item: c': {}", result.text_out());
4382    }
4383
4384    #[tokio::test]
4385    async fn test_while_loop_accumulates_output() {
4386        let kernel = Kernel::transient().expect("failed to create kernel");
4387        let result = kernel
4388            .execute(r#"
4389                N=3
4390                while [[ ${N} -gt 0 ]]; do
4391                    echo "N=${N}"
4392                    N=$((N - 1))
4393                done
4394            "#)
4395            .await
4396            .expect("execution failed");
4397        assert!(result.ok());
4398        assert!(result.text_out().contains("N=3"), "missing 'N=3': {}", result.text_out());
4399        assert!(result.text_out().contains("N=2"), "missing 'N=2': {}", result.text_out());
4400        assert!(result.text_out().contains("N=1"), "missing 'N=1': {}", result.text_out());
4401    }
4402
4403    #[tokio::test]
4404    async fn test_kernel_set_var() {
4405        let kernel = Kernel::transient().expect("failed to create kernel");
4406
4407        kernel.execute("X=42").await.expect("set failed");
4408
4409        let value = kernel.get_var("X").await;
4410        assert_eq!(value, Some(Value::Int(42)));
4411    }
4412
4413    #[tokio::test]
4414    async fn test_kernel_var_expansion() {
4415        let kernel = Kernel::transient().expect("failed to create kernel");
4416
4417        kernel.execute("NAME=\"world\"").await.expect("set failed");
4418        let result = kernel.execute("echo \"hello ${NAME}\"").await.expect("echo failed");
4419
4420        assert!(result.ok());
4421        assert_eq!(result.text_out().trim(), "hello world");
4422    }
4423
4424    #[tokio::test]
4425    async fn test_kernel_last_result() {
4426        let kernel = Kernel::transient().expect("failed to create kernel");
4427
4428        kernel.execute("echo test").await.expect("echo failed");
4429
4430        let last = kernel.last_result().await;
4431        assert!(last.ok());
4432        assert_eq!(last.text_out().trim(), "test");
4433    }
4434
4435    #[tokio::test]
4436    async fn test_kernel_tool_not_found() {
4437        let kernel = Kernel::transient().expect("failed to create kernel");
4438
4439        let result = kernel.execute("nonexistent_tool").await.expect("execution failed");
4440        assert!(!result.ok());
4441        assert_eq!(result.code, 127);
4442        assert!(result.err.contains("command not found"));
4443    }
4444
4445    #[tokio::test]
4446    async fn test_external_command_true() {
4447        // Use REPL config for passthrough filesystem access
4448        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4449
4450        // /bin/true should be available on any Unix system
4451        let result = kernel.execute("true").await.expect("execution failed");
4452        // This should use the builtin true, which returns 0
4453        assert!(result.ok(), "true should succeed: {:?}", result);
4454    }
4455
4456    #[tokio::test]
4457    async fn test_external_command_basic() {
4458        // Use REPL config for passthrough filesystem access
4459        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4460
4461        // Test with /bin/echo which is external
4462        // Note: kaish has a builtin echo, so this will use the builtin
4463        // Let's test with a command that's not a builtin
4464        // Actually, let's just test that PATH resolution works by checking the PATH var
4465        let path_var = std::env::var("PATH").unwrap_or_default();
4466        eprintln!("System PATH: {}", path_var);
4467
4468        // Set PATH in kernel to ensure it's available
4469        kernel.execute(&format!(r#"PATH="{}""#, path_var)).await.expect("set PATH failed");
4470
4471        // Now try an external command like /usr/bin/env
4472        // But env is also a builtin... let's try uname
4473        let result = kernel.execute("uname").await.expect("execution failed");
4474        eprintln!("uname result: {:?}", result);
4475        // uname should succeed if external commands work
4476        assert!(result.ok() || result.code == 127, "uname: {:?}", result);
4477    }
4478
4479    #[tokio::test]
4480    async fn test_kernel_reset() {
4481        let kernel = Kernel::transient().expect("failed to create kernel");
4482
4483        kernel.execute("X=1").await.expect("set failed");
4484        assert!(kernel.get_var("X").await.is_some());
4485
4486        kernel.reset().await.expect("reset failed");
4487        assert!(kernel.get_var("X").await.is_none());
4488    }
4489
4490    #[tokio::test]
4491    async fn test_kernel_cwd() {
4492        let kernel = Kernel::transient().expect("failed to create kernel");
4493
4494        // Transient kernel uses sandboxed mode with cwd=$HOME
4495        let cwd = kernel.cwd().await;
4496        let home = std::env::var("HOME")
4497            .map(PathBuf::from)
4498            .unwrap_or_else(|_| PathBuf::from("/"));
4499        assert_eq!(cwd, home);
4500
4501        kernel.set_cwd(PathBuf::from("/tmp")).await;
4502        assert_eq!(kernel.cwd().await, PathBuf::from("/tmp"));
4503    }
4504
4505    #[tokio::test]
4506    async fn test_kernel_list_vars() {
4507        let kernel = Kernel::transient().expect("failed to create kernel");
4508
4509        kernel.execute("A=1").await.ok();
4510        kernel.execute("B=2").await.ok();
4511
4512        let vars = kernel.list_vars().await;
4513        assert!(vars.iter().any(|(n, v)| n == "A" && *v == Value::Int(1)));
4514        assert!(vars.iter().any(|(n, v)| n == "B" && *v == Value::Int(2)));
4515    }
4516
4517    #[tokio::test]
4518    async fn test_is_truthy() {
4519        assert!(!is_truthy(&Value::Null));
4520        assert!(!is_truthy(&Value::Bool(false)));
4521        assert!(is_truthy(&Value::Bool(true)));
4522        assert!(!is_truthy(&Value::Int(0)));
4523        assert!(is_truthy(&Value::Int(1)));
4524        assert!(!is_truthy(&Value::String("".into())));
4525        assert!(is_truthy(&Value::String("x".into())));
4526    }
4527
4528    #[tokio::test]
4529    async fn test_jq_in_pipeline() {
4530        let kernel = Kernel::transient().expect("failed to create kernel");
4531        // kaish uses double quotes only; escape inner quotes
4532        let result = kernel
4533            .execute(r#"echo "{\"name\": \"Alice\"}" | jq ".name" -r"#)
4534            .await
4535            .expect("execution failed");
4536        assert!(result.ok(), "jq pipeline failed: {}", result.err);
4537        assert_eq!(result.text_out().trim(), "Alice");
4538    }
4539
4540    #[tokio::test]
4541    async fn test_user_defined_tool() {
4542        let kernel = Kernel::transient().expect("failed to create kernel");
4543
4544        // Define a function
4545        kernel
4546            .execute(r#"greet() { echo "Hello, $1!" }"#)
4547            .await
4548            .expect("function definition failed");
4549
4550        // Call the function
4551        let result = kernel
4552            .execute(r#"greet "World""#)
4553            .await
4554            .expect("function call failed");
4555
4556        assert!(result.ok(), "greet failed: {}", result.err);
4557        assert_eq!(result.text_out().trim(), "Hello, World!");
4558    }
4559
4560    #[tokio::test]
4561    async fn test_user_tool_positional_args() {
4562        let kernel = Kernel::transient().expect("failed to create kernel");
4563
4564        // Define a function with positional param
4565        kernel
4566            .execute(r#"greet() { echo "Hi $1" }"#)
4567            .await
4568            .expect("function definition failed");
4569
4570        // Call with positional argument
4571        let result = kernel
4572            .execute(r#"greet "Amy""#)
4573            .await
4574            .expect("function call failed");
4575
4576        assert!(result.ok(), "greet failed: {}", result.err);
4577        assert_eq!(result.text_out().trim(), "Hi Amy");
4578    }
4579
4580    #[tokio::test]
4581    async fn test_function_shared_scope() {
4582        let kernel = Kernel::transient().expect("failed to create kernel");
4583
4584        // Set a variable in parent scope
4585        kernel
4586            .execute(r#"SECRET="hidden""#)
4587            .await
4588            .expect("set failed");
4589
4590        // Define a function that accesses and modifies parent variable
4591        kernel
4592            .execute(r#"access_parent() {
4593                echo "${SECRET}"
4594                SECRET="modified"
4595            }"#)
4596            .await
4597            .expect("function definition failed");
4598
4599        // Call the function - it SHOULD see SECRET (shared scope like sh)
4600        let result = kernel.execute("access_parent").await.expect("function call failed");
4601
4602        // Function should have access to parent scope
4603        assert!(
4604            result.text_out().contains("hidden"),
4605            "Function should access parent scope, got: {}",
4606            result.text_out()
4607        );
4608
4609        // Function should have modified the parent variable
4610        let secret = kernel.get_var("SECRET").await;
4611        assert_eq!(
4612            secret,
4613            Some(Value::String("modified".into())),
4614            "Function should modify parent scope"
4615        );
4616    }
4617
4618    #[tokio::test]
4619    #[ignore = "exec replaces the test binary via CommandExt::exec, hangs libtest; cannot be run under cargo test"]
4620    async fn test_exec_builtin() {
4621        let kernel = Kernel::transient().expect("failed to create kernel");
4622        // argv is now a space-separated string or JSON array string
4623        let result = kernel
4624            .execute(r#"exec command="/bin/echo" argv="hello world""#)
4625            .await
4626            .expect("exec failed");
4627
4628        assert!(result.ok(), "exec failed: {}", result.err);
4629        assert_eq!(result.text_out().trim(), "hello world");
4630    }
4631
4632    #[tokio::test]
4633    async fn test_while_false_never_runs() {
4634        let kernel = Kernel::transient().expect("failed to create kernel");
4635
4636        // A while loop with false condition should never run
4637        let result = kernel
4638            .execute(r#"
4639                while false; do
4640                    echo "should not run"
4641                done
4642            "#)
4643            .await
4644            .expect("while false failed");
4645
4646        assert!(result.ok());
4647        assert!(result.text_out().is_empty(), "while false should not execute body: {}", result.text_out());
4648    }
4649
4650    #[tokio::test]
4651    async fn test_while_string_comparison() {
4652        let kernel = Kernel::transient().expect("failed to create kernel");
4653
4654        // Set a flag
4655        kernel.execute(r#"FLAG="go""#).await.expect("set failed");
4656
4657        // Use string comparison as condition (shell-compatible [[ ]] syntax)
4658        // Note: Put echo last so we can check the output
4659        let result = kernel
4660            .execute(r#"
4661                while [[ ${FLAG} == "go" ]]; do
4662                    FLAG="stop"
4663                    echo "running"
4664                done
4665            "#)
4666            .await
4667            .expect("while with string cmp failed");
4668
4669        assert!(result.ok());
4670        assert!(result.text_out().contains("running"), "should have run once: {}", result.text_out());
4671
4672        // Verify flag was changed
4673        let flag = kernel.get_var("FLAG").await;
4674        assert_eq!(flag, Some(Value::String("stop".into())));
4675    }
4676
4677    #[tokio::test]
4678    async fn test_while_numeric_comparison() {
4679        let kernel = Kernel::transient().expect("failed to create kernel");
4680
4681        // Test > comparison (shell-compatible [[ ]] with -gt)
4682        kernel.execute("N=5").await.expect("set failed");
4683
4684        // Note: Put echo last so we can check the output
4685        let result = kernel
4686            .execute(r#"
4687                while [[ ${N} -gt 3 ]]; do
4688                    N=3
4689                    echo "N was greater"
4690                done
4691            "#)
4692            .await
4693            .expect("while with > failed");
4694
4695        assert!(result.ok());
4696        assert!(result.text_out().contains("N was greater"), "should have run once: {}", result.text_out());
4697    }
4698
4699    #[tokio::test]
4700    async fn test_break_in_while_loop() {
4701        let kernel = Kernel::transient().expect("failed to create kernel");
4702
4703        let result = kernel
4704            .execute(r#"
4705                I=0
4706                while true; do
4707                    I=1
4708                    echo "before break"
4709                    break
4710                    echo "after break"
4711                done
4712            "#)
4713            .await
4714            .expect("while with break failed");
4715
4716        assert!(result.ok());
4717        assert!(result.text_out().contains("before break"), "should see before break: {}", result.text_out());
4718        assert!(!result.text_out().contains("after break"), "should not see after break: {}", result.text_out());
4719
4720        // Verify we exited the loop
4721        let i = kernel.get_var("I").await;
4722        assert_eq!(i, Some(Value::Int(1)));
4723    }
4724
4725    #[tokio::test]
4726    async fn test_continue_in_while_loop() {
4727        let kernel = Kernel::transient().expect("failed to create kernel");
4728
4729        // Test continue in a while loop where variables persist
4730        // We use string state transition: "start" -> "middle" -> "end"
4731        // continue on "middle" should skip to next iteration
4732        // Shell-compatible: use [[ ]] for comparisons
4733        let result = kernel
4734            .execute(r#"
4735                STATE="start"
4736                AFTER_CONTINUE="no"
4737                while [[ ${STATE} != "done" ]]; do
4738                    if [[ ${STATE} == "start" ]]; then
4739                        STATE="middle"
4740                        continue
4741                        AFTER_CONTINUE="yes"
4742                    fi
4743                    if [[ ${STATE} == "middle" ]]; then
4744                        STATE="done"
4745                    fi
4746                done
4747            "#)
4748            .await
4749            .expect("while with continue failed");
4750
4751        assert!(result.ok());
4752
4753        // STATE should be "done" (we completed the loop)
4754        let state = kernel.get_var("STATE").await;
4755        assert_eq!(state, Some(Value::String("done".into())));
4756
4757        // AFTER_CONTINUE should still be "no" (continue skipped the assignment)
4758        let after = kernel.get_var("AFTER_CONTINUE").await;
4759        assert_eq!(after, Some(Value::String("no".into())));
4760    }
4761
4762    #[tokio::test]
4763    async fn test_break_with_level() {
4764        let kernel = Kernel::transient().expect("failed to create kernel");
4765
4766        // Nested loop with break 2 to exit both loops
4767        // We verify by checking OUTER value:
4768        // - If break 2 works, OUTER stays at 1 (set before for loop)
4769        // - If break 2 fails, OUTER becomes 2 (set after for loop)
4770        let result = kernel
4771            .execute(r#"
4772                OUTER=0
4773                while true; do
4774                    OUTER=1
4775                    for X in "1 2"; do
4776                        break 2
4777                    done
4778                    OUTER=2
4779                done
4780            "#)
4781            .await
4782            .expect("nested break failed");
4783
4784        assert!(result.ok());
4785
4786        // OUTER should be 1 (set before for loop), not 2 (would be set after for loop)
4787        let outer = kernel.get_var("OUTER").await;
4788        assert_eq!(outer, Some(Value::Int(1)), "break 2 should have skipped OUTER=2");
4789    }
4790
4791    #[tokio::test]
4792    async fn test_return_from_tool() {
4793        let kernel = Kernel::transient().expect("failed to create kernel");
4794
4795        // Define a function that returns early
4796        kernel
4797            .execute(r#"early_return() {
4798                if [[ $1 == 1 ]]; then
4799                    return 42
4800                fi
4801                echo "not returned"
4802            }"#)
4803            .await
4804            .expect("function definition failed");
4805
4806        // Call with arg=1 should return with exit code 42
4807        // (POSIX shell behavior: return N sets exit code, doesn't output N)
4808        let result = kernel
4809            .execute("early_return 1")
4810            .await
4811            .expect("function call failed");
4812
4813        // Exit code should be 42 (non-zero, so not ok())
4814        assert_eq!(result.code, 42);
4815        // Output should be empty (we returned before echo)
4816        assert!(result.text_out().is_empty());
4817    }
4818
4819    #[tokio::test]
4820    async fn test_return_without_value() {
4821        let kernel = Kernel::transient().expect("failed to create kernel");
4822
4823        // Define a function that returns without a value
4824        kernel
4825            .execute(r#"early_exit() {
4826                if [[ $1 == "stop" ]]; then
4827                    return
4828                fi
4829                echo "continued"
4830            }"#)
4831            .await
4832            .expect("function definition failed");
4833
4834        // Call with arg="stop" should return early
4835        let result = kernel
4836            .execute(r#"early_exit "stop""#)
4837            .await
4838            .expect("function call failed");
4839
4840        assert!(result.ok());
4841        assert!(result.text_out().is_empty() || result.text_out().trim().is_empty());
4842    }
4843
4844    #[tokio::test]
4845    async fn test_exit_stops_execution() {
4846        let kernel = Kernel::transient().expect("failed to create kernel");
4847
4848        // exit should stop further execution
4849        kernel
4850            .execute(r#"
4851                BEFORE="yes"
4852                exit 0
4853                AFTER="yes"
4854            "#)
4855            .await
4856            .expect("execution failed");
4857
4858        // BEFORE should be set, AFTER should not
4859        let before = kernel.get_var("BEFORE").await;
4860        assert_eq!(before, Some(Value::String("yes".into())));
4861
4862        let after = kernel.get_var("AFTER").await;
4863        assert!(after.is_none(), "AFTER should not be set after exit");
4864    }
4865
4866    #[tokio::test]
4867    async fn test_exit_with_code() {
4868        let kernel = Kernel::transient().expect("failed to create kernel");
4869
4870        // exit with code should propagate the exit code
4871        let result = kernel
4872            .execute("exit 42")
4873            .await
4874            .expect("exit failed");
4875
4876        assert_eq!(result.code, 42);
4877        assert!(result.text_out().is_empty(), "exit should not produce stdout");
4878    }
4879
4880    #[tokio::test]
4881    async fn test_set_e_stops_on_failure() {
4882        let kernel = Kernel::transient().expect("failed to create kernel");
4883
4884        // Enable error-exit mode
4885        kernel.execute("set -e").await.expect("set -e failed");
4886
4887        // Run a sequence where the middle command fails
4888        kernel
4889            .execute(r#"
4890                STEP1="done"
4891                false
4892                STEP2="done"
4893            "#)
4894            .await
4895            .expect("execution failed");
4896
4897        // STEP1 should be set, but STEP2 should NOT be set (exit on false)
4898        let step1 = kernel.get_var("STEP1").await;
4899        assert_eq!(step1, Some(Value::String("done".into())));
4900
4901        let step2 = kernel.get_var("STEP2").await;
4902        assert!(step2.is_none(), "STEP2 should not be set after false with set -e");
4903    }
4904
4905    #[tokio::test]
4906    async fn test_set_plus_e_disables_error_exit() {
4907        let kernel = Kernel::transient().expect("failed to create kernel");
4908
4909        // Enable then disable error-exit mode
4910        kernel.execute("set -e").await.expect("set -e failed");
4911        kernel.execute("set +e").await.expect("set +e failed");
4912
4913        // Now failure should NOT stop execution
4914        kernel
4915            .execute(r#"
4916                STEP1="done"
4917                false
4918                STEP2="done"
4919            "#)
4920            .await
4921            .expect("execution failed");
4922
4923        // Both should be set since +e disables error exit
4924        let step1 = kernel.get_var("STEP1").await;
4925        assert_eq!(step1, Some(Value::String("done".into())));
4926
4927        let step2 = kernel.get_var("STEP2").await;
4928        assert_eq!(step2, Some(Value::String("done".into())));
4929    }
4930
4931    #[tokio::test]
4932    async fn test_set_ignores_unknown_options() {
4933        let kernel = Kernel::transient().expect("failed to create kernel");
4934
4935        // Bash idiom: set -euo pipefail (we support -e, ignore the rest)
4936        let result = kernel
4937            .execute("set -e -u -o pipefail")
4938            .await
4939            .expect("set with unknown options failed");
4940
4941        assert!(result.ok(), "set should succeed with unknown options");
4942
4943        // -e should still be enabled
4944        kernel
4945            .execute(r#"
4946                BEFORE="yes"
4947                false
4948                AFTER="yes"
4949            "#)
4950            .await
4951            .ok();
4952
4953        let after = kernel.get_var("AFTER").await;
4954        assert!(after.is_none(), "-e should be enabled despite unknown options");
4955    }
4956
4957    #[tokio::test]
4958    async fn test_set_no_args_shows_settings() {
4959        let kernel = Kernel::transient().expect("failed to create kernel");
4960
4961        // Enable -e
4962        kernel.execute("set -e").await.expect("set -e failed");
4963
4964        // Call set with no args to see settings
4965        let result = kernel.execute("set").await.expect("set failed");
4966
4967        assert!(result.ok());
4968        assert!(result.text_out().contains("set -e"), "should show -e is enabled: {}", result.text_out());
4969    }
4970
4971    #[tokio::test]
4972    async fn test_set_e_in_pipeline() {
4973        let kernel = Kernel::transient().expect("failed to create kernel");
4974
4975        kernel.execute("set -e").await.expect("set -e failed");
4976
4977        // Pipeline failure should trigger exit
4978        kernel
4979            .execute(r#"
4980                BEFORE="yes"
4981                false | cat
4982                AFTER="yes"
4983            "#)
4984            .await
4985            .ok();
4986
4987        let before = kernel.get_var("BEFORE").await;
4988        assert_eq!(before, Some(Value::String("yes".into())));
4989
4990        // AFTER should not be set if pipeline failure triggers exit
4991        // Note: The exit code of a pipeline is the exit code of the last command
4992        // So `false | cat` returns 0 (cat succeeds). This is bash-compatible behavior.
4993        // To test pipeline failure, we need the last command to fail.
4994    }
4995
4996    #[tokio::test]
4997    async fn test_set_e_with_and_chain() {
4998        let kernel = Kernel::transient().expect("failed to create kernel");
4999
5000        kernel.execute("set -e").await.expect("set -e failed");
5001
5002        // Commands in && chain should not trigger -e on the first failure
5003        // because && explicitly handles the error
5004        kernel
5005            .execute(r#"
5006                RESULT="initial"
5007                false && RESULT="chained"
5008                RESULT="continued"
5009            "#)
5010            .await
5011            .ok();
5012
5013        // In bash, commands in && don't trigger -e. The chain handles the failure.
5014        // Our implementation may differ - let's verify current behavior.
5015        let result = kernel.get_var("RESULT").await;
5016        // If we follow bash semantics, RESULT should be "continued"
5017        // If we trigger -e on the false, RESULT stays "initial"
5018        assert!(result.is_some(), "RESULT should be set");
5019    }
5020
5021    #[tokio::test]
5022    async fn test_set_e_exits_in_for_loop() {
5023        let kernel = Kernel::transient().expect("failed to create kernel");
5024
5025        kernel.execute("set -e").await.expect("set -e failed");
5026
5027        kernel
5028            .execute(r#"
5029                REACHED="no"
5030                for x in 1 2 3; do
5031                    false
5032                    REACHED="yes"
5033                done
5034            "#)
5035            .await
5036            .ok();
5037
5038        // With set -e, false should trigger exit; REACHED should remain "no"
5039        let reached = kernel.get_var("REACHED").await;
5040        assert_eq!(reached, Some(Value::String("no".into())),
5041            "set -e should exit on failure in for loop body");
5042    }
5043
5044    #[tokio::test]
5045    async fn test_for_loop_continues_without_set_e() {
5046        let kernel = Kernel::transient().expect("failed to create kernel");
5047
5048        // Without set -e, for loop should continue normally
5049        kernel
5050            .execute(r#"
5051                COUNT=0
5052                for x in 1 2 3; do
5053                    false
5054                    COUNT=$((COUNT + 1))
5055                done
5056            "#)
5057            .await
5058            .ok();
5059
5060        let count = kernel.get_var("COUNT").await;
5061        // Arithmetic produces Int values; accept either Int or String representation
5062        let count_val = match &count {
5063            Some(Value::Int(n)) => *n,
5064            Some(Value::String(s)) => s.parse().unwrap_or(-1),
5065            _ => -1,
5066        };
5067        assert_eq!(count_val, 3,
5068            "without set -e, loop should complete all iterations (got {:?})", count);
5069    }
5070
5071    // ═══════════════════════════════════════════════════════════════════════════
5072    // Source Tests
5073    // ═══════════════════════════════════════════════════════════════════════════
5074
5075    #[tokio::test]
5076    async fn test_source_sets_variables() {
5077        let kernel = Kernel::transient().expect("failed to create kernel");
5078
5079        // Write a script to the VFS
5080        kernel
5081            .execute(r#"write "/test.kai" 'FOO="bar"'"#)
5082            .await
5083            .expect("write failed");
5084
5085        // Source the script
5086        let result = kernel
5087            .execute(r#"source "/test.kai""#)
5088            .await
5089            .expect("source failed");
5090
5091        assert!(result.ok(), "source should succeed");
5092
5093        // Variable should be set in current scope
5094        let foo = kernel.get_var("FOO").await;
5095        assert_eq!(foo, Some(Value::String("bar".into())));
5096    }
5097
5098    #[tokio::test]
5099    async fn test_source_with_dot_alias() {
5100        let kernel = Kernel::transient().expect("failed to create kernel");
5101
5102        // Write a script to the VFS
5103        kernel
5104            .execute(r#"write "/vars.kai" 'X=42'"#)
5105            .await
5106            .expect("write failed");
5107
5108        // Source using . alias
5109        let result = kernel
5110            .execute(r#". "/vars.kai""#)
5111            .await
5112            .expect(". failed");
5113
5114        assert!(result.ok(), ". should succeed");
5115
5116        // Variable should be set in current scope
5117        let x = kernel.get_var("X").await;
5118        assert_eq!(x, Some(Value::Int(42)));
5119    }
5120
5121    #[tokio::test]
5122    async fn test_source_not_found() {
5123        let kernel = Kernel::transient().expect("failed to create kernel");
5124
5125        // Try to source a non-existent file
5126        let result = kernel
5127            .execute(r#"source "/nonexistent.kai""#)
5128            .await
5129            .expect("source should not fail with error");
5130
5131        assert!(!result.ok(), "source of non-existent file should fail");
5132        assert!(result.err.contains("nonexistent.kai"), "error should mention filename");
5133    }
5134
5135    #[tokio::test]
5136    async fn test_source_missing_filename() {
5137        let kernel = Kernel::transient().expect("failed to create kernel");
5138
5139        // Call source with no arguments
5140        let result = kernel
5141            .execute("source")
5142            .await
5143            .expect("source should not fail with error");
5144
5145        assert!(!result.ok(), "source without filename should fail");
5146        assert!(result.err.contains("missing filename"), "error should mention missing filename");
5147    }
5148
5149    #[tokio::test]
5150    async fn test_source_executes_multiple_statements() {
5151        let kernel = Kernel::transient().expect("failed to create kernel");
5152
5153        // Write a script with multiple statements
5154        kernel
5155            .execute(r#"write "/multi.kai" 'A=1
5156B=2
5157C=3'"#)
5158            .await
5159            .expect("write failed");
5160
5161        // Source it
5162        kernel
5163            .execute(r#"source "/multi.kai""#)
5164            .await
5165            .expect("source failed");
5166
5167        // All variables should be set
5168        assert_eq!(kernel.get_var("A").await, Some(Value::Int(1)));
5169        assert_eq!(kernel.get_var("B").await, Some(Value::Int(2)));
5170        assert_eq!(kernel.get_var("C").await, Some(Value::Int(3)));
5171    }
5172
5173    #[tokio::test]
5174    async fn test_source_can_define_functions() {
5175        let kernel = Kernel::transient().expect("failed to create kernel");
5176
5177        // Write a script that defines a function
5178        kernel
5179            .execute(r#"write "/functions.kai" 'greet() {
5180    echo "Hello, $1!"
5181}'"#)
5182            .await
5183            .expect("write failed");
5184
5185        // Source it
5186        kernel
5187            .execute(r#"source "/functions.kai""#)
5188            .await
5189            .expect("source failed");
5190
5191        // Use the defined function
5192        let result = kernel
5193            .execute(r#"greet "World""#)
5194            .await
5195            .expect("greet failed");
5196
5197        assert!(result.ok());
5198        assert!(result.text_out().contains("Hello, World!"));
5199    }
5200
5201    #[tokio::test]
5202    async fn test_source_inherits_error_exit() {
5203        let kernel = Kernel::transient().expect("failed to create kernel");
5204
5205        // Enable error exit
5206        kernel.execute("set -e").await.expect("set -e failed");
5207
5208        // Write a script that has a failure
5209        kernel
5210            .execute(r#"write "/fail.kai" 'BEFORE="yes"
5211false
5212AFTER="yes"'"#)
5213            .await
5214            .expect("write failed");
5215
5216        // Source it (should exit on false due to set -e)
5217        kernel
5218            .execute(r#"source "/fail.kai""#)
5219            .await
5220            .ok();
5221
5222        // BEFORE should be set, AFTER should NOT be set due to error exit
5223        let before = kernel.get_var("BEFORE").await;
5224        assert_eq!(before, Some(Value::String("yes".into())));
5225
5226        // Note: This test depends on whether error exit is checked within source
5227        // Currently our implementation checks per-statement in the main kernel
5228    }
5229
5230    // ═══════════════════════════════════════════════════════════════════════════
5231    // set -e with && / || chains
5232    // ═══════════════════════════════════════════════════════════════════════════
5233
5234    #[tokio::test]
5235    async fn test_set_e_and_chain_left_fails() {
5236        // set -e; false && echo hi; REACHED=1 → REACHED should be set
5237        let kernel = Kernel::transient().expect("failed to create kernel");
5238        kernel.execute("set -e").await.expect("set -e failed");
5239
5240        kernel
5241            .execute("false && echo hi; REACHED=1")
5242            .await
5243            .expect("execution failed");
5244
5245        let reached = kernel.get_var("REACHED").await;
5246        assert_eq!(
5247            reached,
5248            Some(Value::Int(1)),
5249            "set -e should not trigger on left side of &&"
5250        );
5251    }
5252
5253    #[tokio::test]
5254    async fn test_set_e_and_chain_right_fails() {
5255        // set -e; true && false; REACHED=1 → REACHED should NOT be set
5256        let kernel = Kernel::transient().expect("failed to create kernel");
5257        kernel.execute("set -e").await.expect("set -e failed");
5258
5259        kernel
5260            .execute("true && false; REACHED=1")
5261            .await
5262            .expect("execution failed");
5263
5264        let reached = kernel.get_var("REACHED").await;
5265        assert!(
5266            reached.is_none(),
5267            "set -e should trigger when right side of && fails"
5268        );
5269    }
5270
5271    #[tokio::test]
5272    async fn test_set_e_or_chain_recovers() {
5273        // set -e; false || echo recovered; REACHED=1 → REACHED should be set
5274        let kernel = Kernel::transient().expect("failed to create kernel");
5275        kernel.execute("set -e").await.expect("set -e failed");
5276
5277        kernel
5278            .execute("false || echo recovered; REACHED=1")
5279            .await
5280            .expect("execution failed");
5281
5282        let reached = kernel.get_var("REACHED").await;
5283        assert_eq!(
5284            reached,
5285            Some(Value::Int(1)),
5286            "set -e should not trigger when || recovers the failure"
5287        );
5288    }
5289
5290    #[tokio::test]
5291    async fn test_set_e_or_chain_both_fail() {
5292        // set -e; false || false; REACHED=1 → REACHED should NOT be set
5293        let kernel = Kernel::transient().expect("failed to create kernel");
5294        kernel.execute("set -e").await.expect("set -e failed");
5295
5296        kernel
5297            .execute("false || false; REACHED=1")
5298            .await
5299            .expect("execution failed");
5300
5301        let reached = kernel.get_var("REACHED").await;
5302        assert!(
5303            reached.is_none(),
5304            "set -e should trigger when || chain ultimately fails"
5305        );
5306    }
5307
5308    // ═══════════════════════════════════════════════════════════════════════════
5309    // Cancellation Tests
5310    // ═══════════════════════════════════════════════════════════════════════════
5311
5312    /// Helper: schedule a cancel after a delay from a background thread.
5313    /// Uses std::thread because cancel() is sync and Kernel is not Send.
5314    fn schedule_cancel(kernel: &Arc<Kernel>, delay: std::time::Duration) {
5315        let k = Arc::clone(kernel);
5316        std::thread::spawn(move || {
5317            std::thread::sleep(delay);
5318            k.cancel();
5319        });
5320    }
5321
5322    #[tokio::test]
5323    async fn test_cancel_interrupts_for_loop() {
5324        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5325
5326        // Schedule cancel after a short delay from a background OS thread
5327        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5328
5329        let result = kernel
5330            .execute("for i in $(seq 1 100000); do X=$i; done")
5331            .await
5332            .expect("execute failed");
5333
5334        assert_eq!(result.code, 130, "cancelled execution should exit with code 130");
5335
5336        // The loop variable should be set to something < 100000
5337        let x = kernel.get_var("X").await;
5338        if let Some(Value::Int(n)) = x {
5339            assert!(n < 100000, "loop should have been interrupted before finishing, got X={n}");
5340        }
5341    }
5342
5343    #[tokio::test]
5344    async fn test_cancel_interrupts_while_loop() {
5345        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5346        kernel.execute("COUNT=0").await.expect("init failed");
5347
5348        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5349
5350        let result = kernel
5351            .execute("while true; do COUNT=$((COUNT + 1)); done")
5352            .await
5353            .expect("execute failed");
5354
5355        assert_eq!(result.code, 130);
5356
5357        let count = kernel.get_var("COUNT").await;
5358        if let Some(Value::Int(n)) = count {
5359            assert!(n > 0, "loop should have run at least once");
5360        }
5361    }
5362
5363    #[tokio::test]
5364    async fn test_reset_after_cancel() {
5365        // After cancellation, the next execute() should work normally
5366        let kernel = Kernel::transient().expect("failed to create kernel");
5367        kernel.cancel(); // cancel with nothing running
5368
5369        let result = kernel.execute("echo hello").await.expect("execute failed");
5370        assert!(result.ok(), "execute after cancel should succeed");
5371        assert_eq!(result.text_out().trim(), "hello");
5372    }
5373
5374    #[tokio::test]
5375    async fn test_cancel_interrupts_statement_sequence() {
5376        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5377
5378        // Schedule cancel after the first statement runs but before sleep finishes
5379        schedule_cancel(&kernel, std::time::Duration::from_millis(50));
5380
5381        let result = kernel
5382            .execute("STEP=1; sleep 5; STEP=2; sleep 5; STEP=3")
5383            .await
5384            .expect("execute failed");
5385
5386        assert_eq!(result.code, 130);
5387
5388        // STEP should be 1 (set before sleep), not 2 or 3
5389        let step = kernel.get_var("STEP").await;
5390        assert_eq!(step, Some(Value::Int(1)), "cancel should stop before STEP=2");
5391    }
5392
5393    // ═══════════════════════════════════════════════════════════════════════════
5394    // Case Statement Tests
5395    // ═══════════════════════════════════════════════════════════════════════════
5396
5397    #[tokio::test]
5398    async fn test_case_simple_match() {
5399        let kernel = Kernel::transient().expect("failed to create kernel");
5400
5401        let result = kernel
5402            .execute(r#"
5403                case "hello" in
5404                    hello) echo "matched hello" ;;
5405                    world) echo "matched world" ;;
5406                esac
5407            "#)
5408            .await
5409            .expect("case failed");
5410
5411        assert!(result.ok());
5412        assert_eq!(result.text_out().trim(), "matched hello");
5413    }
5414
5415    #[tokio::test]
5416    async fn test_case_wildcard_match() {
5417        let kernel = Kernel::transient().expect("failed to create kernel");
5418
5419        let result = kernel
5420            .execute(r#"
5421                case "main.rs" in
5422                    *.py) echo "Python" ;;
5423                    *.rs) echo "Rust" ;;
5424                    *) echo "Unknown" ;;
5425                esac
5426            "#)
5427            .await
5428            .expect("case failed");
5429
5430        assert!(result.ok());
5431        assert_eq!(result.text_out().trim(), "Rust");
5432    }
5433
5434    #[tokio::test]
5435    async fn test_case_default_match() {
5436        let kernel = Kernel::transient().expect("failed to create kernel");
5437
5438        let result = kernel
5439            .execute(r#"
5440                case "unknown.xyz" in
5441                    *.py) echo "Python" ;;
5442                    *.rs) echo "Rust" ;;
5443                    *) echo "Default" ;;
5444                esac
5445            "#)
5446            .await
5447            .expect("case failed");
5448
5449        assert!(result.ok());
5450        assert_eq!(result.text_out().trim(), "Default");
5451    }
5452
5453    #[tokio::test]
5454    async fn test_case_no_match() {
5455        let kernel = Kernel::transient().expect("failed to create kernel");
5456
5457        // Case with no default branch and no match
5458        let result = kernel
5459            .execute(r#"
5460                case "nope" in
5461                    "yes") echo "yes" ;;
5462                    "no") echo "no" ;;
5463                esac
5464            "#)
5465            .await
5466            .expect("case failed");
5467
5468        assert!(result.ok());
5469        assert!(result.text_out().is_empty(), "no match should produce empty output");
5470    }
5471
5472    #[tokio::test]
5473    async fn test_case_with_variable() {
5474        let kernel = Kernel::transient().expect("failed to create kernel");
5475
5476        kernel.execute(r#"LANG="rust""#).await.expect("set failed");
5477
5478        let result = kernel
5479            .execute(r#"
5480                case ${LANG} in
5481                    python) echo "snake" ;;
5482                    rust) echo "crab" ;;
5483                    go) echo "gopher" ;;
5484                esac
5485            "#)
5486            .await
5487            .expect("case failed");
5488
5489        assert!(result.ok());
5490        assert_eq!(result.text_out().trim(), "crab");
5491    }
5492
5493    #[tokio::test]
5494    async fn test_case_multiple_patterns() {
5495        let kernel = Kernel::transient().expect("failed to create kernel");
5496
5497        let result = kernel
5498            .execute(r#"
5499                case "yes" in
5500                    "y"|"yes"|"Y"|"YES") echo "affirmative" ;;
5501                    "n"|"no"|"N"|"NO") echo "negative" ;;
5502                esac
5503            "#)
5504            .await
5505            .expect("case failed");
5506
5507        assert!(result.ok());
5508        assert_eq!(result.text_out().trim(), "affirmative");
5509    }
5510
5511    #[tokio::test]
5512    async fn test_case_glob_question_mark() {
5513        let kernel = Kernel::transient().expect("failed to create kernel");
5514
5515        let result = kernel
5516            .execute(r#"
5517                case "test1" in
5518                    test?) echo "matched test?" ;;
5519                    *) echo "default" ;;
5520                esac
5521            "#)
5522            .await
5523            .expect("case failed");
5524
5525        assert!(result.ok());
5526        assert_eq!(result.text_out().trim(), "matched test?");
5527    }
5528
5529    #[tokio::test]
5530    async fn test_case_char_class() {
5531        let kernel = Kernel::transient().expect("failed to create kernel");
5532
5533        let result = kernel
5534            .execute(r#"
5535                case "Yes" in
5536                    [Yy]*) echo "yes-like" ;;
5537                    [Nn]*) echo "no-like" ;;
5538                esac
5539            "#)
5540            .await
5541            .expect("case failed");
5542
5543        assert!(result.ok());
5544        assert_eq!(result.text_out().trim(), "yes-like");
5545    }
5546
5547    // ═══════════════════════════════════════════════════════════════════════════
5548    // Cat Stdin Tests
5549    // ═══════════════════════════════════════════════════════════════════════════
5550
5551    #[tokio::test]
5552    async fn test_cat_from_pipeline() {
5553        let kernel = Kernel::transient().expect("failed to create kernel");
5554
5555        let result = kernel
5556            .execute(r#"echo "piped text" | cat"#)
5557            .await
5558            .expect("cat pipeline failed");
5559
5560        assert!(result.ok(), "cat failed: {}", result.err);
5561        assert_eq!(result.text_out().trim(), "piped text");
5562    }
5563
5564    #[tokio::test]
5565    async fn test_cat_from_pipeline_multiline() {
5566        let kernel = Kernel::transient().expect("failed to create kernel");
5567
5568        let result = kernel
5569            .execute(r#"echo "line1\nline2" | cat -n"#)
5570            .await
5571            .expect("cat pipeline failed");
5572
5573        assert!(result.ok(), "cat failed: {}", result.err);
5574        assert!(result.text_out().contains("1\t"), "output: {}", result.text_out());
5575    }
5576
5577    // ═══════════════════════════════════════════════════════════════════════════
5578    // Heredoc Tests
5579    // ═══════════════════════════════════════════════════════════════════════════
5580
5581    #[tokio::test]
5582    async fn test_heredoc_basic() {
5583        let kernel = Kernel::transient().expect("failed to create kernel");
5584
5585        let result = kernel
5586            .execute("cat <<EOF\nhello\nEOF")
5587            .await
5588            .expect("heredoc failed");
5589
5590        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
5591        assert_eq!(result.text_out().trim(), "hello");
5592    }
5593
5594    #[tokio::test]
5595    async fn test_arithmetic_in_string() {
5596        let kernel = Kernel::transient().expect("failed to create kernel");
5597
5598        let result = kernel
5599            .execute(r#"echo "result: $((1 + 2))""#)
5600            .await
5601            .expect("arithmetic in string failed");
5602
5603        assert!(result.ok(), "echo failed: {}", result.err);
5604        assert_eq!(result.text_out().trim(), "result: 3");
5605    }
5606
5607    #[tokio::test]
5608    async fn test_heredoc_multiline() {
5609        let kernel = Kernel::transient().expect("failed to create kernel");
5610
5611        let result = kernel
5612            .execute("cat <<EOF\nline1\nline2\nline3\nEOF")
5613            .await
5614            .expect("heredoc failed");
5615
5616        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
5617        assert!(result.text_out().contains("line1"), "output: {}", result.text_out());
5618        assert!(result.text_out().contains("line2"), "output: {}", result.text_out());
5619        assert!(result.text_out().contains("line3"), "output: {}", result.text_out());
5620    }
5621
5622    #[tokio::test]
5623    async fn test_heredoc_variable_expansion() {
5624        // Bug N: unquoted heredoc should expand variables
5625        let kernel = Kernel::transient().expect("failed to create kernel");
5626
5627        kernel.execute("GREETING=hello").await.expect("set var");
5628
5629        let result = kernel
5630            .execute("cat <<EOF\n$GREETING world\nEOF")
5631            .await
5632            .expect("heredoc expansion failed");
5633
5634        assert!(result.ok(), "heredoc expansion failed: {}", result.err);
5635        assert_eq!(result.text_out().trim(), "hello world");
5636    }
5637
5638    #[tokio::test]
5639    async fn test_heredoc_quoted_no_expansion() {
5640        // Bug N: quoted heredoc (<<'EOF') should NOT expand variables
5641        let kernel = Kernel::transient().expect("failed to create kernel");
5642
5643        kernel.execute("GREETING=hello").await.expect("set var");
5644
5645        let result = kernel
5646            .execute("cat <<'EOF'\n$GREETING world\nEOF")
5647            .await
5648            .expect("quoted heredoc failed");
5649
5650        assert!(result.ok(), "quoted heredoc failed: {}", result.err);
5651        assert_eq!(result.text_out().trim(), "$GREETING world");
5652    }
5653
5654    #[tokio::test]
5655    async fn test_heredoc_default_value_expansion() {
5656        // Bug N: ${VAR:-default} should expand in unquoted heredocs
5657        let kernel = Kernel::transient().expect("failed to create kernel");
5658
5659        let result = kernel
5660            .execute("cat <<EOF\n${UNSET:-fallback}\nEOF")
5661            .await
5662            .expect("heredoc default expansion failed");
5663
5664        assert!(result.ok(), "heredoc default expansion failed: {}", result.err);
5665        assert_eq!(result.text_out().trim(), "fallback");
5666    }
5667
5668    // ═══════════════════════════════════════════════════════════════════════════
5669    // Read Builtin Tests
5670    // ═══════════════════════════════════════════════════════════════════════════
5671
5672    #[tokio::test]
5673    async fn test_read_from_pipeline() {
5674        let kernel = Kernel::transient().expect("failed to create kernel");
5675
5676        // Pipe input to read
5677        let result = kernel
5678            .execute(r#"echo "Alice" | read NAME; echo "Hello, ${NAME}""#)
5679            .await
5680            .expect("read pipeline failed");
5681
5682        assert!(result.ok(), "read failed: {}", result.err);
5683        assert!(result.text_out().contains("Hello, Alice"), "output: {}", result.text_out());
5684    }
5685
5686    #[tokio::test]
5687    async fn test_read_multiple_vars_from_pipeline() {
5688        let kernel = Kernel::transient().expect("failed to create kernel");
5689
5690        let result = kernel
5691            .execute(r#"echo "John Doe 42" | read FIRST LAST AGE; echo "${FIRST} is ${AGE}""#)
5692            .await
5693            .expect("read pipeline failed");
5694
5695        assert!(result.ok(), "read failed: {}", result.err);
5696        assert!(result.text_out().contains("John is 42"), "output: {}", result.text_out());
5697    }
5698
5699    // ═══════════════════════════════════════════════════════════════════════════
5700    // Shell-Style Function Tests
5701    // ═══════════════════════════════════════════════════════════════════════════
5702
5703    #[tokio::test]
5704    async fn test_posix_function_with_positional_params() {
5705        let kernel = Kernel::transient().expect("failed to create kernel");
5706
5707        // Define POSIX-style function
5708        kernel
5709            .execute(r#"greet() { echo "Hello, $1!" }"#)
5710            .await
5711            .expect("function definition failed");
5712
5713        // Call the function
5714        let result = kernel
5715            .execute(r#"greet "Amy""#)
5716            .await
5717            .expect("function call failed");
5718
5719        assert!(result.ok(), "greet failed: {}", result.err);
5720        assert_eq!(result.text_out().trim(), "Hello, Amy!");
5721    }
5722
5723    #[tokio::test]
5724    async fn test_posix_function_multiple_args() {
5725        let kernel = Kernel::transient().expect("failed to create kernel");
5726
5727        // Define function using $1 and $2
5728        kernel
5729            .execute(r#"add_greeting() { echo "$1 $2!" }"#)
5730            .await
5731            .expect("function definition failed");
5732
5733        // Call the function
5734        let result = kernel
5735            .execute(r#"add_greeting "Hello" "World""#)
5736            .await
5737            .expect("function call failed");
5738
5739        assert!(result.ok(), "function failed: {}", result.err);
5740        assert_eq!(result.text_out().trim(), "Hello World!");
5741    }
5742
5743    #[tokio::test]
5744    async fn test_bash_function_with_positional_params() {
5745        let kernel = Kernel::transient().expect("failed to create kernel");
5746
5747        // Define bash-style function (function keyword, no parens)
5748        kernel
5749            .execute(r#"function greet { echo "Hi $1" }"#)
5750            .await
5751            .expect("function definition failed");
5752
5753        // Call the function
5754        let result = kernel
5755            .execute(r#"greet "Bob""#)
5756            .await
5757            .expect("function call failed");
5758
5759        assert!(result.ok(), "greet failed: {}", result.err);
5760        assert_eq!(result.text_out().trim(), "Hi Bob");
5761    }
5762
5763    #[tokio::test]
5764    async fn test_shell_function_with_all_args() {
5765        let kernel = Kernel::transient().expect("failed to create kernel");
5766
5767        // Define function using $@ (all args)
5768        kernel
5769            .execute(r#"echo_all() { echo "args: $@" }"#)
5770            .await
5771            .expect("function definition failed");
5772
5773        // Call with multiple args
5774        let result = kernel
5775            .execute(r#"echo_all "a" "b" "c""#)
5776            .await
5777            .expect("function call failed");
5778
5779        assert!(result.ok(), "function failed: {}", result.err);
5780        assert_eq!(result.text_out().trim(), "args: a b c");
5781    }
5782
5783    #[tokio::test]
5784    async fn test_shell_function_with_arg_count() {
5785        let kernel = Kernel::transient().expect("failed to create kernel");
5786
5787        // Define function using $# (arg count)
5788        kernel
5789            .execute(r#"count_args() { echo "count: $#" }"#)
5790            .await
5791            .expect("function definition failed");
5792
5793        // Call with three args
5794        let result = kernel
5795            .execute(r#"count_args "x" "y" "z""#)
5796            .await
5797            .expect("function call failed");
5798
5799        assert!(result.ok(), "function failed: {}", result.err);
5800        assert_eq!(result.text_out().trim(), "count: 3");
5801    }
5802
5803    #[tokio::test]
5804    async fn test_shell_function_shared_scope() {
5805        let kernel = Kernel::transient().expect("failed to create kernel");
5806
5807        // Set a variable in parent scope
5808        kernel
5809            .execute(r#"PARENT_VAR="visible""#)
5810            .await
5811            .expect("set failed");
5812
5813        // Define shell function that reads and writes parent variable
5814        kernel
5815            .execute(r#"modify_parent() {
5816                echo "saw: ${PARENT_VAR}"
5817                PARENT_VAR="changed by function"
5818            }"#)
5819            .await
5820            .expect("function definition failed");
5821
5822        // Call the function - it SHOULD see PARENT_VAR (bash-compatible shared scope)
5823        let result = kernel.execute("modify_parent").await.expect("function failed");
5824
5825        assert!(
5826            result.text_out().contains("visible"),
5827            "Shell function should access parent scope, got: {}",
5828            result.text_out()
5829        );
5830
5831        // Parent variable should be modified
5832        let var = kernel.get_var("PARENT_VAR").await;
5833        assert_eq!(
5834            var,
5835            Some(Value::String("changed by function".into())),
5836            "Shell function should modify parent scope"
5837        );
5838    }
5839
5840    // ═══════════════════════════════════════════════════════════════════════════
5841    // Script Execution via PATH Tests
5842    // ═══════════════════════════════════════════════════════════════════════════
5843
5844    #[tokio::test]
5845    async fn test_script_execution_from_path() {
5846        let kernel = Kernel::transient().expect("failed to create kernel");
5847
5848        // Create /bin directory and script
5849        kernel.execute(r#"mkdir "/bin""#).await.ok();
5850        kernel
5851            .execute(r#"write "/bin/hello.kai" 'echo "Hello from script!"'"#)
5852            .await
5853            .expect("write script failed");
5854
5855        // Set PATH to /bin
5856        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
5857
5858        // Call script by name (without .kai extension)
5859        let result = kernel
5860            .execute("hello")
5861            .await
5862            .expect("script execution failed");
5863
5864        assert!(result.ok(), "script failed: {}", result.err);
5865        assert_eq!(result.text_out().trim(), "Hello from script!");
5866    }
5867
5868    #[tokio::test]
5869    async fn test_script_with_args() {
5870        let kernel = Kernel::transient().expect("failed to create kernel");
5871
5872        // Create script that uses positional params
5873        kernel.execute(r#"mkdir "/bin""#).await.ok();
5874        kernel
5875            .execute(r#"write "/bin/greet.kai" 'echo "Hello, $1!"'"#)
5876            .await
5877            .expect("write script failed");
5878
5879        // Set PATH
5880        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
5881
5882        // Call script with arg
5883        let result = kernel
5884            .execute(r#"greet "World""#)
5885            .await
5886            .expect("script execution failed");
5887
5888        assert!(result.ok(), "script failed: {}", result.err);
5889        assert_eq!(result.text_out().trim(), "Hello, World!");
5890    }
5891
5892    #[tokio::test]
5893    async fn test_script_not_found() {
5894        let kernel = Kernel::transient().expect("failed to create kernel");
5895
5896        // Set empty PATH
5897        kernel.execute(r#"PATH="/nonexistent""#).await.expect("set PATH failed");
5898
5899        // Call non-existent script
5900        let result = kernel
5901            .execute("noscript")
5902            .await
5903            .expect("execution failed");
5904
5905        assert!(!result.ok(), "should fail with command not found");
5906        assert_eq!(result.code, 127);
5907        assert!(result.err.contains("command not found"));
5908    }
5909
5910    #[tokio::test]
5911    async fn test_script_path_search_order() {
5912        let kernel = Kernel::transient().expect("failed to create kernel");
5913
5914        // Create two directories with same-named script
5915        // Note: using "myscript" not "test" to avoid conflict with test builtin
5916        kernel.execute(r#"mkdir "/first""#).await.ok();
5917        kernel.execute(r#"mkdir "/second""#).await.ok();
5918        kernel
5919            .execute(r#"write "/first/myscript.kai" 'echo "from first"'"#)
5920            .await
5921            .expect("write failed");
5922        kernel
5923            .execute(r#"write "/second/myscript.kai" 'echo "from second"'"#)
5924            .await
5925            .expect("write failed");
5926
5927        // Set PATH with first before second
5928        kernel.execute(r#"PATH="/first:/second""#).await.expect("set PATH failed");
5929
5930        // Should find first one
5931        let result = kernel
5932            .execute("myscript")
5933            .await
5934            .expect("script execution failed");
5935
5936        assert!(result.ok(), "script failed: {}", result.err);
5937        assert_eq!(result.text_out().trim(), "from first");
5938    }
5939
5940    // ═══════════════════════════════════════════════════════════════════════════
5941    // Special Variable Tests ($?, $$, unset vars)
5942    // ═══════════════════════════════════════════════════════════════════════════
5943
5944    #[tokio::test]
5945    async fn test_last_exit_code_success() {
5946        let kernel = Kernel::transient().expect("failed to create kernel");
5947
5948        // true exits with 0
5949        let result = kernel.execute("true; echo $?").await.expect("execution failed");
5950        assert!(result.text_out().contains("0"), "expected 0, got: {}", result.text_out());
5951    }
5952
5953    #[tokio::test]
5954    async fn test_last_exit_code_failure() {
5955        let kernel = Kernel::transient().expect("failed to create kernel");
5956
5957        // false exits with 1
5958        let result = kernel.execute("false; echo $?").await.expect("execution failed");
5959        assert!(result.text_out().contains("1"), "expected 1, got: {}", result.text_out());
5960    }
5961
5962    #[tokio::test]
5963    async fn test_current_pid() {
5964        let kernel = Kernel::transient().expect("failed to create kernel");
5965
5966        let result = kernel.execute("echo $$").await.expect("execution failed");
5967        // PID should be a positive number
5968        let pid: u32 = result.text_out().trim().parse().expect("PID should be a number");
5969        assert!(pid > 0, "PID should be positive");
5970    }
5971
5972    #[tokio::test]
5973    async fn test_unset_variable_expands_to_empty() {
5974        let kernel = Kernel::transient().expect("failed to create kernel");
5975
5976        // Unset variable in interpolation should be empty
5977        let result = kernel.execute(r#"echo "prefix:${UNSET_VAR}:suffix""#).await.expect("execution failed");
5978        assert_eq!(result.text_out().trim(), "prefix::suffix");
5979    }
5980
5981    #[tokio::test]
5982    async fn test_eq_ne_operators() {
5983        let kernel = Kernel::transient().expect("failed to create kernel");
5984
5985        // Test -eq operator
5986        let result = kernel.execute(r#"if [[ 5 -eq 5 ]]; then echo "eq works"; fi"#).await.expect("execution failed");
5987        assert_eq!(result.text_out().trim(), "eq works");
5988
5989        // Test -ne operator
5990        let result = kernel.execute(r#"if [[ 5 -ne 3 ]]; then echo "ne works"; fi"#).await.expect("execution failed");
5991        assert_eq!(result.text_out().trim(), "ne works");
5992
5993        // Test -eq with different values
5994        let result = kernel.execute(r#"if [[ 5 -eq 3 ]]; then echo "wrong"; else echo "correct"; fi"#).await.expect("execution failed");
5995        assert_eq!(result.text_out().trim(), "correct");
5996    }
5997
5998    #[tokio::test]
5999    async fn test_escaped_dollar_in_string() {
6000        let kernel = Kernel::transient().expect("failed to create kernel");
6001
6002        // \$ should produce literal $
6003        let result = kernel.execute(r#"echo "\$100""#).await.expect("execution failed");
6004        assert_eq!(result.text_out().trim(), "$100");
6005    }
6006
6007    #[tokio::test]
6008    async fn test_special_vars_in_interpolation() {
6009        let kernel = Kernel::transient().expect("failed to create kernel");
6010
6011        // Test $? in string interpolation
6012        let result = kernel.execute(r#"true; echo "exit: $?""#).await.expect("execution failed");
6013        assert_eq!(result.text_out().trim(), "exit: 0");
6014
6015        // Test $$ in string interpolation
6016        let result = kernel.execute(r#"echo "pid: $$""#).await.expect("execution failed");
6017        assert!(result.text_out().starts_with("pid: "), "unexpected output: {}", result.text_out());
6018        let text = result.text_out();
6019        let pid_part = text.trim().strip_prefix("pid: ").unwrap();
6020        let _pid: u32 = pid_part.parse().expect("PID in string should be a number");
6021    }
6022
6023    // ═══════════════════════════════════════════════════════════════════════════
6024    // Command Substitution Tests
6025    // ═══════════════════════════════════════════════════════════════════════════
6026
6027    #[tokio::test]
6028    async fn test_command_subst_assignment() {
6029        let kernel = Kernel::transient().expect("failed to create kernel");
6030
6031        // Command substitution in assignment
6032        let result = kernel.execute(r#"X=$(echo hello); echo "$X""#).await.expect("execution failed");
6033        assert_eq!(result.text_out().trim(), "hello");
6034    }
6035
6036    #[tokio::test]
6037    async fn test_command_subst_with_args() {
6038        let kernel = Kernel::transient().expect("failed to create kernel");
6039
6040        // Command substitution with string argument
6041        let result = kernel.execute(r#"X=$(echo "a b c"); echo "$X""#).await.expect("execution failed");
6042        assert_eq!(result.text_out().trim(), "a b c");
6043    }
6044
6045    #[tokio::test]
6046    async fn test_command_subst_nested_vars() {
6047        let kernel = Kernel::transient().expect("failed to create kernel");
6048
6049        // Variables inside command substitution
6050        let result = kernel.execute(r#"Y=world; X=$(echo "hello $Y"); echo "$X""#).await.expect("execution failed");
6051        assert_eq!(result.text_out().trim(), "hello world");
6052    }
6053
6054    #[tokio::test]
6055    async fn test_background_job_basic() {
6056        use std::time::Duration;
6057
6058        let kernel = Kernel::new(KernelConfig::isolated()).expect("failed to create kernel");
6059
6060        // Run a simple background command
6061        let result = kernel.execute("echo hello &").await.expect("execution failed");
6062        assert!(result.ok(), "background command should succeed: {}", result.err);
6063        assert!(result.text_out().contains("[1]"), "should return job ID: {}", result.text_out());
6064
6065        // Give the job time to complete
6066        tokio::time::sleep(Duration::from_millis(100)).await;
6067
6068        // Check job status
6069        let status = kernel.execute("cat /v/jobs/1/status").await.expect("status check failed");
6070        assert!(status.ok(), "status should succeed: {}", status.err);
6071        assert!(
6072            status.text_out().contains("done:") || status.text_out().contains("running"),
6073            "should have valid status: {}",
6074            status.text_out()
6075        );
6076
6077        // Check stdout
6078        let stdout = kernel.execute("cat /v/jobs/1/stdout").await.expect("stdout check failed");
6079        assert!(stdout.ok());
6080        assert!(stdout.text_out().contains("hello"));
6081    }
6082
6083    #[tokio::test]
6084    async fn test_heredoc_piped_to_command() {
6085        // Bug 4: heredoc content should pipe through to next command
6086        let kernel = Kernel::transient().expect("kernel");
6087        let result = kernel.execute("cat <<EOF | cat\nhello world\nEOF").await.expect("exec");
6088        assert!(result.ok(), "heredoc | cat failed: {}", result.err);
6089        assert_eq!(result.text_out().trim(), "hello world");
6090    }
6091
6092    #[tokio::test]
6093    async fn test_for_loop_glob_iterates() {
6094        // Bug 1: for F in $(glob ...) should iterate per file, not once
6095        let kernel = Kernel::transient().expect("kernel");
6096        let dir = format!("/tmp/kaish_test_glob_{}", std::process::id());
6097        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6098        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6099        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6100        let result = kernel.execute(&format!(r#"
6101            N=0
6102            for F in $(glob "{dir}/*.txt"); do
6103                N=$((N + 1))
6104            done
6105            echo $N
6106        "#)).await.unwrap();
6107        assert!(result.ok(), "for glob failed: {}", result.err);
6108        assert_eq!(result.text_out().trim(), "2", "Should iterate 2 files, got: {}", result.text_out());
6109        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6110        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
6111    }
6112
6113    #[tokio::test]
6114    async fn test_bare_glob_expansion_echo() {
6115        let kernel = Kernel::transient().expect("kernel");
6116        let dir = format!("/tmp/kaish_test_bareglob_{}", std::process::id());
6117        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6118        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6119        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6120        kernel.execute(&format!("echo c > {dir}/c.rs")).await.unwrap();
6121        kernel.execute(&format!("cd {dir}")).await.unwrap();
6122        let result = kernel.execute("echo *.txt").await.unwrap();
6123        assert!(result.ok(), "echo *.txt failed: {}", result.err);
6124        let out = result.text_out();
6125        let out = out.trim();
6126        // Should contain both .txt files (order may vary)
6127        assert!(out.contains("a.txt"), "missing a.txt in: {}", out);
6128        assert!(out.contains("b.txt"), "missing b.txt in: {}", out);
6129        assert!(!out.contains("c.rs"), "should not contain c.rs in: {}", out);
6130        // cleanup
6131        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6132        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
6133        kernel.execute(&format!("rm {dir}/c.rs")).await.unwrap();
6134    }
6135
6136    #[tokio::test]
6137    async fn test_bare_glob_no_matches_errors() {
6138        let kernel = Kernel::transient().expect("kernel");
6139        let dir = format!("/tmp/kaish_test_bareglob_nomatch_{}", std::process::id());
6140        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6141        kernel.execute(&format!("cd {dir}")).await.unwrap();
6142        let result = kernel.execute("echo *.nonexistent").await;
6143        match &result {
6144            Ok(exec) => {
6145                // No-match glob should produce a non-zero exit code
6146                assert!(!exec.ok(), "expected failure, got success: out={}, err={}", exec.text_out(), exec.err);
6147                assert!(exec.err.contains("no matches"), "error should say no matches: {}", exec.err);
6148            }
6149            Err(e) => {
6150                assert!(e.to_string().contains("no matches"), "error should say no matches: {}", e);
6151            }
6152        }
6153    }
6154
6155    #[tokio::test]
6156    async fn test_bare_glob_disabled_with_set() {
6157        let kernel = Kernel::transient().expect("kernel");
6158        let dir = format!("/tmp/kaish_test_bareglob_noglob_{}", std::process::id());
6159        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6160        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6161        kernel.execute(&format!("cd {dir}")).await.unwrap();
6162        // Disable glob expansion
6163        kernel.execute("set +o glob").await.unwrap();
6164        let result = kernel.execute("echo *.txt").await.unwrap();
6165        // With glob disabled, *.txt should be passed as literal string
6166        assert!(result.ok(), "echo should succeed: {}", result.err);
6167        assert_eq!(result.text_out().trim(), "*.txt", "should be literal: {}", result.text_out());
6168        // cleanup
6169        kernel.execute("set -o glob").await.unwrap();
6170        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6171    }
6172
6173    #[tokio::test]
6174    async fn test_bare_glob_quoted_not_expanded() {
6175        let kernel = Kernel::transient().expect("kernel");
6176        let dir = format!("/tmp/kaish_test_bareglob_quoted_{}", std::process::id());
6177        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6178        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6179        kernel.execute(&format!("cd {dir}")).await.unwrap();
6180        // Quoted globs should NOT expand
6181        let result = kernel.execute("echo \"*.txt\"").await.unwrap();
6182        assert!(result.ok(), "echo should succeed: {}", result.err);
6183        assert_eq!(result.text_out().trim(), "*.txt", "quoted should be literal: {}", result.text_out());
6184        // cleanup
6185        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6186    }
6187
6188    #[tokio::test]
6189    async fn test_bare_glob_for_loop() {
6190        let kernel = Kernel::transient().expect("kernel");
6191        let dir = format!("/tmp/kaish_test_bareglob_forloop_{}", std::process::id());
6192        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
6193        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
6194        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
6195        kernel.execute(&format!("cd {dir}")).await.unwrap();
6196        let result = kernel.execute(r#"
6197            N=0
6198            for f in *.txt; do
6199                N=$((N + 1))
6200            done
6201            echo $N
6202        "#).await.unwrap();
6203        assert!(result.ok(), "for loop failed: {}", result.err);
6204        assert_eq!(result.text_out().trim(), "2", "should iterate 2 files: {}", result.text_out());
6205        // cleanup
6206        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
6207        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
6208    }
6209
6210    #[tokio::test]
6211    async fn test_glob_in_assignment_is_literal() {
6212        let kernel = Kernel::transient().expect("kernel");
6213        let result = kernel.execute("X=*.txt; echo $X").await.unwrap();
6214        assert!(result.ok());
6215        assert_eq!(result.text_out().trim(), "*.txt", "glob in assignment should be literal");
6216    }
6217
6218    #[tokio::test]
6219    async fn test_glob_in_test_expr_is_literal() {
6220        let kernel = Kernel::transient().expect("kernel");
6221        let result = kernel.execute(r#"
6222            if [[ *.txt == "*.txt" ]]; then
6223                echo "match"
6224            else
6225                echo "no"
6226            fi
6227        "#).await.unwrap();
6228        assert!(result.ok());
6229        assert_eq!(result.text_out().trim(), "match", "glob in test expr should be literal");
6230    }
6231
6232    #[tokio::test]
6233    async fn test_command_subst_echo_not_iterable() {
6234        // Regression guard: $(echo "a b c") must remain a single string
6235        let kernel = Kernel::transient().expect("kernel");
6236        let result = kernel.execute(r#"
6237            N=0
6238            for X in $(echo "a b c"); do N=$((N + 1)); done
6239            echo $N
6240        "#).await.unwrap();
6241        assert!(result.ok());
6242        assert_eq!(result.text_out().trim(), "1", "echo should be one item: {}", result.text_out());
6243    }
6244
6245    // -- accumulate_result / newline tests --
6246
6247    #[test]
6248    fn test_accumulate_no_double_newlines() {
6249        // When output already ends with \n, accumulate should not add another
6250        let mut acc = ExecResult::success("line1\n");
6251        let new = ExecResult::success("line2\n");
6252        accumulate_result(&mut acc, &new);
6253        assert_eq!(&*acc.text_out(), "line1\nline2\n");
6254        assert!(!acc.text_out().contains("\n\n"), "should not have double newlines: {:?}", acc.text_out());
6255    }
6256
6257    #[test]
6258    fn test_accumulate_adds_separator_when_needed() {
6259        // When output does NOT end with \n, accumulate adds one
6260        let mut acc = ExecResult::success("line1");
6261        let new = ExecResult::success("line2");
6262        accumulate_result(&mut acc, &new);
6263        assert_eq!(&*acc.text_out(), "line1\nline2");
6264    }
6265
6266    #[test]
6267    fn test_accumulate_empty_into_nonempty() {
6268        let mut acc = ExecResult::success("");
6269        let new = ExecResult::success("hello\n");
6270        accumulate_result(&mut acc, &new);
6271        assert_eq!(&*acc.text_out(), "hello\n");
6272    }
6273
6274    #[test]
6275    fn test_accumulate_nonempty_into_empty() {
6276        let mut acc = ExecResult::success("hello\n");
6277        let new = ExecResult::success("");
6278        accumulate_result(&mut acc, &new);
6279        assert_eq!(&*acc.text_out(), "hello\n");
6280    }
6281
6282    #[test]
6283    fn test_accumulate_stderr_no_double_newlines() {
6284        let mut acc = ExecResult::failure(1, "err1\n");
6285        let new = ExecResult::failure(1, "err2\n");
6286        accumulate_result(&mut acc, &new);
6287        assert!(!acc.err.contains("\n\n"), "stderr should not have double newlines: {:?}", acc.err);
6288    }
6289
6290    #[tokio::test]
6291    async fn test_multiple_echo_no_blank_lines() {
6292        let kernel = Kernel::transient().expect("kernel");
6293        let result = kernel
6294            .execute("echo one\necho two\necho three")
6295            .await
6296            .expect("execution failed");
6297        assert!(result.ok());
6298        assert_eq!(&*result.text_out(), "one\ntwo\nthree\n");
6299    }
6300
6301    #[tokio::test]
6302    async fn test_for_loop_no_blank_lines() {
6303        let kernel = Kernel::transient().expect("kernel");
6304        let result = kernel
6305            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
6306            .await
6307            .expect("execution failed");
6308        assert!(result.ok());
6309        assert_eq!(&*result.text_out(), "item: a\nitem: b\nitem: c\n");
6310    }
6311
6312    #[tokio::test]
6313    async fn test_for_command_subst_no_blank_lines() {
6314        let kernel = Kernel::transient().expect("kernel");
6315        let result = kernel
6316            .execute(r#"for N in $(seq 1 3); do echo "n=${N}"; done"#)
6317            .await
6318            .expect("execution failed");
6319        assert!(result.ok());
6320        assert_eq!(&*result.text_out(), "n=1\nn=2\nn=3\n");
6321    }
6322
6323    // ------------------------------------------------------------------
6324    // build_args_async: multi-consume flags (jq --arg NAME VALUE pattern)
6325    // ------------------------------------------------------------------
6326
6327    /// Helper: a throwaway schema with one `--pair` param declared as
6328    /// consuming two positionals per occurrence. Modelled after what
6329    /// jq_native will declare for `--arg` / `--argjson`.
6330    fn multi_consume_schema() -> crate::tools::ToolSchema {
6331        use crate::tools::{ParamSchema, ToolSchema};
6332        ToolSchema::new("test", "multi-consume smoke")
6333            .param(
6334                ParamSchema::optional("pair", "array", Value::Null, "name+value pair")
6335                    .consumes(2),
6336            )
6337    }
6338
6339    fn pos(s: &str) -> Arg {
6340        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
6341    }
6342
6343    #[tokio::test]
6344    async fn build_args_multi_consume_single_occurrence() {
6345        let kernel = Kernel::transient().expect("kernel");
6346        let schema = multi_consume_schema();
6347        // Simulates:  test --pair NAME VALUE filter
6348        let args = vec![
6349            Arg::LongFlag("pair".into()),
6350            pos("NAME"),
6351            pos("VALUE"),
6352            pos("filter"),
6353        ];
6354        let built = kernel
6355            .build_args_async(&args, Some(&schema))
6356            .await
6357            .expect("build_args should succeed");
6358
6359        // `--pair` + its two positionals are consumed into named["pair"],
6360        // which becomes an outer array of one inner 2-element array.
6361        let pair = built.named.get("pair").expect("named[pair] missing");
6362        match pair {
6363            Value::Json(serde_json::Value::Array(occurrences)) => {
6364                assert_eq!(occurrences.len(), 1, "expected one occurrence");
6365                match &occurrences[0] {
6366                    serde_json::Value::Array(values) => {
6367                        assert_eq!(values.len(), 2, "pair must have 2 values");
6368                        assert_eq!(values[0], serde_json::Value::String("NAME".into()));
6369                        assert_eq!(values[1], serde_json::Value::String("VALUE".into()));
6370                    }
6371                    other => panic!("expected inner array, got {other:?}"),
6372                }
6373            }
6374            other => panic!("expected Json(Array(...)) for named[pair], got {other:?}"),
6375        }
6376
6377        // The un-consumed positional ("filter") remains in `positional`.
6378        assert_eq!(built.positional.len(), 1);
6379        assert_eq!(built.positional[0], Value::String("filter".into()));
6380    }
6381    #[tokio::test]
6382    async fn build_args_multi_consume_two_occurrences_accumulate() {
6383        let kernel = Kernel::transient().expect("kernel");
6384        let schema = multi_consume_schema();
6385        // Simulates:  test --pair A 1 --pair B 2 filter
6386        let args = vec![
6387            Arg::LongFlag("pair".into()),
6388            pos("A"),
6389            pos("1"),
6390            Arg::LongFlag("pair".into()),
6391            pos("B"),
6392            pos("2"),
6393            pos("filter"),
6394        ];
6395        let built = kernel
6396            .build_args_async(&args, Some(&schema))
6397            .await
6398            .expect("build_args should succeed");
6399
6400        let pair = built.named.get("pair").expect("named[pair] missing");
6401        match pair {
6402            Value::Json(serde_json::Value::Array(occurrences)) => {
6403                assert_eq!(occurrences.len(), 2, "expected two occurrences");
6404                // Preserved in invocation order.
6405                match &occurrences[0] {
6406                    serde_json::Value::Array(values) => {
6407                        assert_eq!(values[0], serde_json::Value::String("A".into()));
6408                        assert_eq!(values[1], serde_json::Value::String("1".into()));
6409                    }
6410                    other => panic!("expected inner array, got {other:?}"),
6411                }
6412                match &occurrences[1] {
6413                    serde_json::Value::Array(values) => {
6414                        assert_eq!(values[0], serde_json::Value::String("B".into()));
6415                        assert_eq!(values[1], serde_json::Value::String("2".into()));
6416                    }
6417                    other => panic!("expected inner array, got {other:?}"),
6418                }
6419            }
6420            other => panic!("expected Json(Array(...)), got {other:?}"),
6421        }
6422    }
6423
6424    // ── undeclared space-form flag under map_positionals (kj --type val) ──
6425    //
6426    // A backend/MCP tool whose schema does NOT declare a flag must not let
6427    // `--flag value` (space form) silently divorce the value: that was a
6428    // privilege-escalation-by-typo against kaijutsu (see docs/issues.md).
6429    // kaish fails loud rather than guessing.
6430
6431    use crate::tools::{ParamSchema, ToolSchema};
6432
6433    /// Backend-style schema (map_positionals) declaring only a `name`
6434    /// positional — `--type` is intentionally undeclared.
6435    fn kj_like_schema() -> ToolSchema {
6436        ToolSchema::new("kj", "incomplete backend schema")
6437            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
6438            .with_positional_mapping()
6439    }
6440
6441    #[tokio::test]
6442    async fn build_args_undeclared_space_flag_errors_under_map_positionals() {
6443        let kernel = Kernel::transient().expect("kernel");
6444        let schema = kj_like_schema();
6445        // kj context create exp --type explorer
6446        let args = vec![
6447            pos("context"),
6448            pos("create"),
6449            pos("exp"),
6450            Arg::LongFlag("type".into()),
6451            pos("explorer"),
6452        ];
6453        let err = kernel
6454            .build_args_async(&args, Some(&schema))
6455            .await
6456            .expect_err("undeclared --type with a space value must fail loud");
6457        let msg = err.to_string();
6458        assert!(msg.contains("--type"), "message should name the flag: {msg}");
6459        assert!(msg.contains("--type=explorer"), "message should suggest the = form: {msg}");
6460        assert!(msg.contains("kj"), "message should name the tool: {msg}");
6461    }
6462
6463    #[tokio::test]
6464    async fn build_args_declared_space_flag_still_binds() {
6465        let kernel = Kernel::transient().expect("kernel");
6466        // Same tool, but now the schema DECLARES --type as a string param.
6467        let schema = ToolSchema::new("kj", "complete schema")
6468            .param(ParamSchema::optional("name", "string", Value::Null, "context name"))
6469            .param(ParamSchema::optional("type", "string", Value::Null, "role type"))
6470            .with_positional_mapping();
6471        let args = vec![
6472            pos("exp"),
6473            Arg::LongFlag("type".into()),
6474            pos("explorer"),
6475        ];
6476        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6477        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6478    }
6479
6480    #[tokio::test]
6481    async fn build_args_equals_form_binds_for_undeclared_flag() {
6482        let kernel = Kernel::transient().expect("kernel");
6483        let schema = kj_like_schema();
6484        // The unambiguous `=` form must keep working even when undeclared.
6485        let args = vec![
6486            pos("exp"),
6487            Arg::Named { key: "type".into(), value: Expr::Literal(Value::String("explorer".into())) },
6488        ];
6489        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6490        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6491    }
6492
6493    #[tokio::test]
6494    async fn build_args_undeclared_bool_flag_at_end_is_ok() {
6495        let kernel = Kernel::transient().expect("kernel");
6496        let schema = kj_like_schema();
6497        // No positional follows --force → unambiguously a bare flag.
6498        let args = vec![pos("exp"), Arg::LongFlag("force".into())];
6499        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6500        assert!(built.flags.contains("force"));
6501    }
6502
6503    #[tokio::test]
6504    async fn build_args_undeclared_flag_before_another_flag_is_ok() {
6505        let kernel = Kernel::transient().expect("kernel");
6506        let schema = kj_like_schema();
6507        // --verbose is followed by a flag, not a positional → not ambiguous.
6508        let args = vec![
6509            Arg::LongFlag("verbose".into()),
6510            Arg::Named { key: "name".into(), value: Expr::Literal(Value::String("x".into())) },
6511        ];
6512        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6513        assert!(built.flags.contains("verbose"));
6514    }
6515
6516    #[tokio::test]
6517    async fn build_args_undeclared_space_flag_ok_for_builtin_schema() {
6518        let kernel = Kernel::transient().expect("kernel");
6519        // Builtins set map_positionals=false; the ambiguity guard must not
6520        // fire there (clap validates their flags separately).
6521        let schema = ToolSchema::new("frobnicate", "builtin-style")
6522            .param(ParamSchema::optional("name", "string", Value::Null, "name"));
6523        let args = vec![Arg::LongFlag("frob".into()), pos("value")];
6524        let built = kernel.build_args_async(&args, Some(&schema)).await.unwrap();
6525        assert!(built.flags.contains("frob"));
6526    }
6527
6528    // ── subcommand-aware binding (select_leaf wired into build_args_async) ──
6529    //
6530    // A tool exposing a subcommand tree binds flags against the *routed leaf's*
6531    // params, not the root's. The subcommand-path positionals stay positional
6532    // (kj re-parses them with its own clap), and a value flag declared only on
6533    // a deep leaf still binds in space form.
6534
6535    /// kj → context (alias ctx) → create{--type value, --force bool}.
6536    /// map_positionals defaults false on every node (builtin/kj style).
6537    fn kj_tree_schema() -> ToolSchema {
6538        ToolSchema::new("kj", "subcommand tool").subcommand(
6539            ToolSchema::new("context", "context ops")
6540                .with_command_aliases(["ctx"])
6541                .subcommand(
6542                    ToolSchema::new("create", "create context")
6543                        .param(ParamSchema::new("type", "string").with_aliases(["t"]))
6544                        .param(ParamSchema::new("force", "bool")),
6545                ),
6546        )
6547    }
6548
6549    #[tokio::test]
6550    async fn build_args_binds_deep_leaf_value_flag_space_form() {
6551        let kernel = Kernel::transient().expect("kernel");
6552        let schema = kj_tree_schema();
6553        // kj context create --type explorer
6554        let args = vec![
6555            pos("context"),
6556            pos("create"),
6557            Arg::LongFlag("type".into()),
6558            pos("explorer"),
6559        ];
6560        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
6561        // --type (declared only on the create leaf) binds in space form.
6562        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6563        // The subcommand path survives as positionals for kj to re-parse.
6564        let positionals: Vec<&str> = built
6565            .positional
6566            .iter()
6567            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
6568            .collect();
6569        assert_eq!(positionals, vec!["context", "create"]);
6570    }
6571
6572    #[tokio::test]
6573    async fn build_args_leaf_bool_flag_does_not_swallow_positional() {
6574        let kernel = Kernel::transient().expect("kernel");
6575        let schema = kj_tree_schema();
6576        // kj context create --force somearg  → --force is a leaf bool flag,
6577        // it must NOT consume `somearg`.
6578        let args = vec![
6579            pos("context"),
6580            pos("create"),
6581            Arg::LongFlag("force".into()),
6582            pos("somearg"),
6583        ];
6584        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
6585        assert!(built.flags.contains("force"), "force should be a bare flag");
6586        let positionals: Vec<&str> = built
6587            .positional
6588            .iter()
6589            .filter_map(|v| if let Value::String(s) = v { Some(s.as_str()) } else { None })
6590            .collect();
6591        assert_eq!(positionals, vec!["context", "create", "somearg"]);
6592    }
6593
6594    #[tokio::test]
6595    async fn build_args_alias_routed_leaf_binds_value_flag() {
6596        let kernel = Kernel::transient().expect("kernel");
6597        let schema = kj_tree_schema();
6598        // kj ctx create -t explorer  → command alias + short flag alias.
6599        let args = vec![
6600            pos("ctx"),
6601            pos("create"),
6602            Arg::ShortFlag("t".into()),
6603            pos("explorer"),
6604        ];
6605        let built = kernel.build_args_async(&args, Some(&schema)).await.expect("build_args");
6606        assert_eq!(built.named.get("type"), Some(&Value::String("explorer".into())));
6607    }
6608
6609    #[tokio::test]
6610    async fn build_args_computed_subcommand_selector_fails_loud() {
6611        let kernel = Kernel::transient().expect("kernel");
6612        let schema = kj_tree_schema();
6613        // kj $(echo context) — routing can't see the value; fail loud.
6614        let args = vec![Arg::Positional(Expr::CommandSubst(Box::new(
6615            crate::ast::Pipeline { commands: vec![], background: false },
6616        )))];
6617        let err = kernel
6618            .build_args_async(&args, Some(&schema))
6619            .await
6620            .expect_err("computed subcommand selector must error");
6621        assert!(
6622            err.to_string().contains("subcommand name is required"),
6623            "got: {err}"
6624        );
6625    }
6626
6627    // ── finalize_output: --json rendering vs. owns_output opt-out ───────────
6628
6629    #[test]
6630    fn finalize_output_renders_when_kernel_owns_it() {
6631        use crate::interpreter::{OutputData, OutputFormat};
6632        let r = ExecResult::with_output(OutputData::text("RAW"));
6633        let out = finalize_output(r, Some(OutputFormat::Json), false);
6634        // Kernel renders the typed OutputData → JSON; text is no longer bare.
6635        assert_ne!(out.text_out(), "RAW", "kernel should reformat to JSON");
6636    }
6637
6638    #[test]
6639    fn finalize_output_skips_when_tool_owns_output() {
6640        use crate::interpreter::{OutputData, OutputFormat};
6641        let r = ExecResult::with_output(OutputData::text("RAW"));
6642        let out = finalize_output(r, Some(OutputFormat::Json), true);
6643        // owns_output: the tool already rendered; kernel leaves bytes untouched.
6644        assert_eq!(out.text_out(), "RAW", "owned output must be left as-is");
6645    }
6646
6647    #[test]
6648    fn finalize_output_no_format_is_noop() {
6649        use crate::interpreter::OutputData;
6650        let r = ExecResult::with_output(OutputData::text("RAW"));
6651        let out = finalize_output(r, None, false);
6652        assert_eq!(out.text_out(), "RAW");
6653    }
6654
6655    // ── initial_vars + execute_with_vars + hermetic env ───────────────────
6656
6657    #[tokio::test]
6658    async fn test_initial_vars_set_and_exported() {
6659        let config = KernelConfig::transient()
6660            .with_var("INIT_FOO", Value::String("bar".into()));
6661        let kernel = Kernel::new(config).expect("failed to create kernel");
6662
6663        assert_eq!(
6664            kernel.get_var("INIT_FOO").await,
6665            Some(Value::String("bar".into()))
6666        );
6667        assert!(
6668            kernel.scope.read().await.is_exported("INIT_FOO"),
6669            "initial_vars entries must be marked exported"
6670        );
6671    }
6672
6673    #[tokio::test]
6674    async fn test_execute_with_vars_overlay_visible() {
6675        let kernel = Kernel::transient().expect("failed to create kernel");
6676        let mut overlay = HashMap::new();
6677        overlay.insert("OVERLAY_X".to_string(), Value::String("yes".into()));
6678
6679        let result = kernel
6680            .execute_with_options(r#"echo "${OVERLAY_X}""#, ExecuteOptions::new().with_vars(overlay))
6681            .await
6682            .expect("execute failed");
6683
6684        assert!(result.ok());
6685        assert_eq!(result.text_out().trim(), "yes");
6686    }
6687
6688    #[tokio::test]
6689    async fn test_execute_with_vars_overlay_cleanup() {
6690        let kernel = Kernel::transient().expect("failed to create kernel");
6691        let mut overlay = HashMap::new();
6692        overlay.insert("EPHEMERAL".to_string(), Value::String("transient".into()));
6693
6694        kernel
6695            .execute_with_options("echo ignored", ExecuteOptions::new().with_vars(overlay))
6696            .await
6697            .expect("execute failed");
6698
6699        assert_eq!(kernel.get_var("EPHEMERAL").await, None);
6700        assert!(
6701            !kernel.scope.read().await.is_exported("EPHEMERAL"),
6702            "overlay-only export must be cleared on return"
6703        );
6704    }
6705
6706    #[tokio::test]
6707    async fn test_execute_with_vars_does_not_clobber_existing_export() {
6708        let kernel = Kernel::transient().expect("failed to create kernel");
6709        kernel
6710            .execute("export OUTER=outer")
6711            .await
6712            .expect("export failed");
6713
6714        let mut overlay = HashMap::new();
6715        overlay.insert("OUTER".to_string(), Value::String("inner".into()));
6716        let result = kernel
6717            .execute_with_options(r#"echo "${OUTER}""#, ExecuteOptions::new().with_vars(overlay))
6718            .await
6719            .expect("execute failed");
6720        assert_eq!(result.text_out().trim(), "inner");
6721
6722        assert_eq!(
6723            kernel.get_var("OUTER").await,
6724            Some(Value::String("outer".into())),
6725            "outer value must reappear after pop"
6726        );
6727        assert!(
6728            kernel.scope.read().await.is_exported("OUTER"),
6729            "outer export must survive overlay"
6730        );
6731    }
6732
6733    #[tokio::test]
6734    async fn test_execute_with_vars_inner_assignment_is_local() {
6735        let kernel = Kernel::transient().expect("failed to create kernel");
6736        let mut overlay = HashMap::new();
6737        overlay.insert("LOCAL_FOO".to_string(), Value::String("from-overlay".into()));
6738
6739        // Variable assignment inside a single statement uses set() (innermost
6740        // frame), not set_global() — this matches bash function-local semantics.
6741        // We explicitly use `local FOO=...` style by relying on the pushed
6742        // frame; the assignment in the script body modifies the same frame.
6743        let result = kernel
6744            .execute_with_options(
6745                r#"LOCAL_FOO="reassigned"; echo "${LOCAL_FOO}""#,
6746                ExecuteOptions::new().with_vars(overlay),
6747            )
6748            .await
6749            .expect("execute failed");
6750        assert!(result.ok());
6751
6752        // After the call the frame is popped, so LOCAL_FOO is gone regardless
6753        // of how the script reassigned it.
6754        assert_eq!(kernel.get_var("LOCAL_FOO").await, None);
6755    }
6756
6757    #[tokio::test]
6758    async fn test_external_command_sees_exported_var() {
6759        let kernel = Kernel::transient().expect("failed to create kernel");
6760        let result = kernel
6761            .execute("export EXT_FOO=bar; printenv EXT_FOO")
6762            .await
6763            .expect("execute failed");
6764
6765        assert!(result.ok(), "printenv should succeed: stderr={}", result.err);
6766        assert_eq!(result.text_out().trim(), "bar");
6767    }
6768
6769    #[tokio::test]
6770    async fn test_external_command_does_not_see_unexported_var() {
6771        let kernel = Kernel::transient().expect("failed to create kernel");
6772
6773        // Set without exporting; printenv must not see it (exit code != 0,
6774        // empty stdout per printenv semantics).
6775        let result = kernel
6776            .execute("EXT_BAR=hidden; printenv EXT_BAR")
6777            .await
6778            .expect("execute failed");
6779
6780        assert!(!result.ok(), "printenv should fail when var is unexported");
6781        assert!(
6782            result.text_out().trim().is_empty(),
6783            "no stdout when var is missing, got: {}",
6784            result.text_out()
6785        );
6786    }
6787
6788    #[tokio::test]
6789    async fn test_external_command_does_not_see_os_env() {
6790        // The kernel is hermetic: it never reads std::env::vars() and only
6791        // exports what it has been told to export. Cargo always sets PATH for
6792        // tests, so PATH is reliably present in the OS env — but a transient
6793        // kernel doesn't seed it into initial_vars, so `printenv PATH` from
6794        // inside the kernel must fail.
6795        assert!(
6796            std::env::var_os("PATH").is_some(),
6797            "test precondition: cargo should set PATH"
6798        );
6799
6800        let kernel = Kernel::transient().expect("failed to create kernel");
6801        let result = kernel
6802            .execute("printenv PATH")
6803            .await
6804            .expect("execute failed");
6805
6806        assert!(
6807            !result.ok(),
6808            "printenv PATH must fail in hermetic kernel, got stdout={:?}",
6809            result.text_out()
6810        );
6811        assert!(
6812            result.text_out().trim().is_empty(),
6813            "no PATH in subprocess env, got stdout={:?}",
6814            result.text_out()
6815        );
6816    }
6817
6818    #[tokio::test]
6819    async fn test_execute_with_vars_overlay_reaches_subprocess() {
6820        let kernel = Kernel::transient().expect("failed to create kernel");
6821        let mut overlay = HashMap::new();
6822        overlay.insert("SUB_FOO".to_string(), Value::String("subproc".into()));
6823
6824        let result = kernel
6825            .execute_with_options("printenv SUB_FOO", ExecuteOptions::new().with_vars(overlay))
6826            .await
6827            .expect("execute failed");
6828
6829        assert!(
6830            result.ok(),
6831            "printenv should succeed: code={} stdout={:?} stderr={:?}",
6832            result.code,
6833            result.text_out(),
6834            result.err
6835        );
6836        assert_eq!(result.text_out().trim(), "subproc");
6837    }
6838}