Skip to main content

kaish_kernel/
kernel.rs

1//! The Kernel (核) — the heart of kaish.
2//!
3//! The Kernel owns and coordinates all core components:
4//! - Interpreter state (scope, $?)
5//! - Tool registry (builtins, user tools)
6//! - VFS router (mount points)
7//! - Job manager (background jobs)
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌────────────────────────────────────────────────────────────┐
13//! │                         Kernel (核)                         │
14//! │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
15//! │  │   Scope      │  │ ToolRegistry │  │  VfsRouter       │  │
16//! │  │  (variables) │  │  (builtins,  │  │  (mount points)  │  │
17//! │  │              │  │   user tools)│  │                  │  │
18//! │  └──────────────┘  └──────────────┘  └──────────────────┘  │
19//! │  ┌──────────────────────────────┐  ┌──────────────────┐    │
20//! │  │  JobManager (background)     │  │  ExecResult ($?) │    │
21//! │  └──────────────────────────────┘  └──────────────────┘    │
22//! └────────────────────────────────────────────────────────────┘
23//! ```
24
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::time::Duration;
30
31use anyhow::{Context, Result};
32use tokio::sync::RwLock;
33
/// Monotonic counter assigned to each Kernel at construction time, exposed
/// via `$$` / `${$}`. Starts at 1; each new Kernel gets the next value.
/// `Kernel::fork()` inherits the parent's value (matching bash's "subshell
/// keeps parent's $$" semantics) because forks clone the parent's Scope
/// rather than calling `set_pid` again.
///
/// The counter is process-wide: kernel assembly takes the next value with
/// `fetch_add(1, Ordering::Relaxed)` and passes it to `Scope::set_pid`.
///
/// Deliberately *not* the OS PID — kaish runs as a long-lived MCP server
/// or embedded inside other binaries (kaijutsu), where the host PID is
/// meaningless to the script. See
/// `~/.claude/projects/-home-atobey-src-kaish/memory/lang_dollar_dollar_identifier.md`
/// for the design rationale (a home-directory path, i.e. a note kept
/// outside this source tree).
static KERNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
46
47use async_trait::async_trait;
48
49use crate::ast::{Arg, Command, Expr, FileTestOp, Stmt, StringPart, TestExpr, ToolDef, Value, BinaryOp};
50pub use kaish_types::ExecuteOptions;
51use crate::backend::{BackendError, KernelBackend};
52use kaish_glob::glob_match;
53use crate::dispatch::{CommandDispatcher, PipelinePosition};
54use crate::interpreter::{apply_output_format, eval_expr, expand_tilde, json_to_value, value_to_bool, value_to_string, ControlFlow, ExecResult, Scope};
55use crate::parser::parse;
56use crate::scheduler::{is_bool_type, schema_param_lookup, stderr_stream, BoundedStream, JobManager, PipelineRunner, StderrReceiver};
57#[cfg(feature = "native")]
58use crate::scheduler::{drain_to_stream, DEFAULT_STREAM_MAX_SIZE};
59use crate::tools::{extract_output_format, register_builtins, ExecContext, ToolArgs, ToolRegistry};
60#[cfg(feature = "native")]
61use crate::tools::resolve_in_path;
62use crate::validator::{Severity, Validator};
63#[cfg(feature = "native")]
64use crate::vfs::LocalFs;
65use crate::vfs::{BuiltinFs, JobFs, MemoryFs, VfsRouter};
66
/// VFS mount mode determines how the local filesystem is exposed.
///
/// Different modes trade off convenience vs. security:
/// - `Passthrough` gives native path access (best for human REPL use)
/// - `Sandboxed` restricts access to a subtree (safer for agents)
/// - `NoLocal` provides complete isolation (tests, pure memory mode)
///
/// The mounts listed on each variant are the ones `Kernel::setup_vfs`
/// installs. In every mode `Kernel::new` additionally mounts `/v/jobs`
/// (JobFs) and kernel assembly mounts `/v/bin` (BuiltinFs).
///
/// `Passthrough` and `Sandboxed` only exist when the `native` feature is
/// enabled; without it `NoLocal` is the only variant.
#[derive(Debug, Clone)]
pub enum VfsMountMode {
    /// LocalFs at "/" — native paths work directly.
    ///
    /// Full filesystem access. Use for human-operated REPL sessions where
    /// native paths like `/home/user/project` should just work.
    ///
    /// Mounts:
    /// - `/` → LocalFs("/")
    /// - `/v` → MemoryFs (blob storage)
    #[cfg(feature = "native")]
    Passthrough,

    /// Transparent sandbox — paths look native but access is restricted.
    ///
    /// The local filesystem is mounted at its real path (e.g., `/home/user`),
    /// so `/home/user/src/project` just works. But paths outside the sandbox
    /// root are not accessible.
    ///
    /// **Note:** This only restricts VFS (builtin) operations. External commands
    /// bypass the sandbox entirely — see [`KernelConfig::allow_external_commands`].
    ///
    /// Mounts:
    /// - `/` → MemoryFs (catches paths outside sandbox)
    /// - `{root}` → LocalFs(root)  (e.g., `/home/user` → LocalFs)
    /// - `/tmp` → LocalFs("/tmp")
    /// - `/v` → MemoryFs (blob storage)
    /// - the XDG runtime directory → LocalFs, only when that directory exists
    ///   (used for spill files and socket access)
    #[cfg(feature = "native")]
    Sandboxed {
        /// Root path for local filesystem. Defaults to `$HOME`.
        /// Can be restricted further, e.g., `~/src`.
        ///
        /// `None` means "use the default"; when `$HOME` cannot be read the
        /// default falls back to `/`.
        root: Option<PathBuf>,
    },

    /// No local filesystem. Memory only.
    ///
    /// Complete isolation — no access to the host filesystem.
    /// Useful for tests or pure sandboxed execution.
    ///
    /// Mounts:
    /// - `/` → MemoryFs
    /// - `/tmp` → MemoryFs
    /// - `/v` → MemoryFs
    NoLocal,
}
118
119#[allow(clippy::derivable_impls)] // native has multiple variants; not derivable cross-feature
120impl Default for VfsMountMode {
121    fn default() -> Self {
122        #[cfg(feature = "native")]
123        { VfsMountMode::Sandboxed { root: None } }
124        #[cfg(not(feature = "native"))]
125        { VfsMountMode::NoLocal }
126    }
127}
128
/// Configuration for kernel initialization.
///
/// Build one with a preset constructor (`KernelConfig::isolated()`,
/// `KernelConfig::named(..)`, and — with the `native` feature — `repl()`,
/// `mcp()`, `mcp_with_root(..)`), then adjust individual fields with the
/// chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct KernelConfig {
    /// Name of this kernel (for identification).
    pub name: String,

    /// VFS mount mode — controls how local filesystem is exposed.
    ///
    /// Ignored by `Kernel::with_backend`, which routes non-`/v/*` paths
    /// through the caller's backend instead.
    pub vfs_mode: VfsMountMode,

    /// Initial working directory (VFS path).
    pub cwd: PathBuf,

    /// Whether to skip pre-execution validation.
    ///
    /// When false (default), scripts are validated before execution to catch
    /// errors early. Set to true to skip validation for performance or to
    /// allow dynamic/external commands.
    pub skip_validation: bool,

    /// When true, standalone external commands inherit stdio for real-time output.
    ///
    /// Set by script runner and REPL for human-visible output.
    /// Not set by MCP server (output must be captured for structured responses).
    ///
    /// Every preset constructor in this file leaves this `false`; callers opt
    /// in with `with_interactive(true)`.
    pub interactive: bool,

    /// Ignore file configuration for file-walking tools.
    pub ignore_config: crate::ignore_config::IgnoreConfig,

    /// Output size limit configuration for agent safety.
    pub output_limit: crate::output_limit::OutputLimitConfig,

    /// Whether external command execution (PATH lookup, `exec`, `spawn`) is allowed.
    ///
    /// When `true` (default), commands not found as builtins are resolved via PATH
    /// and executed as child processes. When `false`, only kaish builtins and
    /// backend-registered tools are available.
    ///
    /// `KernelConfig::isolated()` and every config built without the `native`
    /// feature start with this set to `false`.
    ///
    /// **Security:** External commands bypass the VFS sandbox entirely — they see
    /// the real filesystem, network, and environment. Set to `false` when running
    /// untrusted input.
    pub allow_external_commands: bool,

    /// Enable confirmation latch for dangerous operations (set -o latch).
    ///
    /// When enabled, destructive operations like `rm` require nonce confirmation.
    /// Can also be enabled at runtime with `set -o latch` or via `KAISH_LATCH=1`.
    ///
    /// `KAISH_LATCH` is read only by `Default::default()` (with the `native`
    /// feature), `repl()`, `mcp()` and `mcp_with_root()`; `transient()`,
    /// `named()` and `isolated()` always start with the latch off.
    pub latch_enabled: bool,

    /// Enable trash-on-delete for rm (set -o trash).
    ///
    /// When enabled, small files are moved to freedesktop.org Trash instead of
    /// being permanently deleted. Can also be enabled at runtime with `set -o trash`
    /// or via `KAISH_TRASH=1`.
    ///
    /// `KAISH_TRASH` is read by the same constructors that read `KAISH_LATCH`.
    pub trash_enabled: bool,

    /// Shared nonce store for cross-request confirmation latch.
    ///
    /// When `Some`, the kernel uses this store instead of creating a fresh one.
    /// This allows nonces issued in one MCP `execute()` call to be validated
    /// in a subsequent call. When `None` (default), a fresh store is created.
    pub nonce_store: Option<crate::nonce::NonceStore>,

    /// Variables to populate the root scope with at construction, all marked
    /// for export to child processes.
    ///
    /// The kernel itself is hermetic — it never reads `std::env::vars()` —
    /// so frontends that want OS-env passthrough (REPL, MCP) populate this
    /// from `std::env::vars()`. Embedders that want isolation pass nothing
    /// (or only the keys they curate).
    ///
    /// Kernel assembly does seed the single variable `HOME` from the process
    /// environment when it is set; these entries are applied after that.
    pub initial_vars: HashMap<String, Value>,

    /// Default per-request timeout. When `Some`, every `execute_with_options`
    /// call without an explicit `ExecuteOptions::timeout` uses this duration.
    /// When elapsed, the kernel cancels the request, kills any external
    /// children with the configured grace, and returns exit code 124.
    ///
    /// `None` means no default timeout — only explicit per-call timeouts apply.
    pub request_timeout: Option<Duration>,

    /// Grace period between SIGTERM and SIGKILL when killing an external
    /// child on cancellation or timeout.
    ///
    /// Defaults to 2 seconds. Set to `Duration::ZERO` to escalate immediately
    /// to SIGKILL. Long-shutdown processes (databases, etc.) may need more.
    pub kill_grace: Duration,
}
215
/// Resolve the default sandbox root.
///
/// Returns the value of the `HOME` environment variable as a path. If the
/// variable cannot be read (unset or not valid Unicode), the filesystem
/// root `/` is returned instead.
#[cfg(feature = "native")]
fn default_sandbox_root() -> PathBuf {
    match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from("/"),
    }
}
223
224impl Default for KernelConfig {
225    fn default() -> Self {
226        #[cfg(feature = "native")]
227        {
228            let home = default_sandbox_root();
229            Self {
230                name: "default".to_string(),
231                vfs_mode: VfsMountMode::Sandboxed { root: None },
232                cwd: home,
233                skip_validation: false,
234                interactive: false,
235                ignore_config: crate::ignore_config::IgnoreConfig::none(),
236                output_limit: crate::output_limit::OutputLimitConfig::none(),
237                allow_external_commands: true,
238                latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
239                trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
240                nonce_store: None,
241                initial_vars: HashMap::new(),
242                request_timeout: None,
243                kill_grace: Duration::from_secs(2),
244            }
245        }
246        #[cfg(not(feature = "native"))]
247        {
248            Self {
249                name: "default".to_string(),
250                vfs_mode: VfsMountMode::NoLocal,
251                cwd: PathBuf::from("/"),
252                skip_validation: false,
253                interactive: false,
254                ignore_config: crate::ignore_config::IgnoreConfig::none(),
255                output_limit: crate::output_limit::OutputLimitConfig::none(),
256                allow_external_commands: false,
257                latch_enabled: false,
258                trash_enabled: false,
259                nonce_store: None,
260                initial_vars: HashMap::new(),
261                request_timeout: None,
262                kill_grace: Duration::from_secs(2),
263            }
264        }
265    }
266}
267
268impl KernelConfig {
269    /// Create a transient kernel config (sandboxed, for temporary use).
270    #[cfg(feature = "native")]
271    pub fn transient() -> Self {
272        let home = default_sandbox_root();
273        Self {
274            name: "transient".to_string(),
275            vfs_mode: VfsMountMode::Sandboxed { root: None },
276            cwd: home,
277            skip_validation: false,
278            interactive: false,
279            ignore_config: crate::ignore_config::IgnoreConfig::none(),
280            output_limit: crate::output_limit::OutputLimitConfig::none(),
281            allow_external_commands: true,
282            latch_enabled: false,
283            trash_enabled: false,
284            nonce_store: None,
285            initial_vars: HashMap::new(),
286            request_timeout: None,
287            kill_grace: Duration::from_secs(2),
288        }
289    }
290
291    /// Create a transient kernel config (isolated, no-default-features).
292    #[cfg(not(feature = "native"))]
293    pub fn transient() -> Self {
294        Self::isolated()
295    }
296
297    /// Create a kernel config with the given name (sandboxed by default).
298    #[cfg(feature = "native")]
299    pub fn named(name: &str) -> Self {
300        let home = default_sandbox_root();
301        Self {
302            name: name.to_string(),
303            vfs_mode: VfsMountMode::Sandboxed { root: None },
304            cwd: home,
305            skip_validation: false,
306            interactive: false,
307            ignore_config: crate::ignore_config::IgnoreConfig::none(),
308            output_limit: crate::output_limit::OutputLimitConfig::none(),
309            allow_external_commands: true,
310            latch_enabled: false,
311            trash_enabled: false,
312            nonce_store: None,
313            initial_vars: HashMap::new(),
314            request_timeout: None,
315            kill_grace: Duration::from_secs(2),
316        }
317    }
318
319    /// Create a kernel config with the given name (isolated, no-default-features).
320    #[cfg(not(feature = "native"))]
321    pub fn named(name: &str) -> Self {
322        Self {
323            name: name.to_string(),
324            ..Self::isolated()
325        }
326    }
327
328    /// Create a REPL config with passthrough filesystem access.
329    ///
330    /// Native paths like `/home/user/project` work directly.
331    /// The cwd is set to the actual current working directory.
332    #[cfg(feature = "native")]
333    pub fn repl() -> Self {
334        let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"));
335        Self {
336            name: "repl".to_string(),
337            vfs_mode: VfsMountMode::Passthrough,
338            cwd,
339            skip_validation: false,
340            interactive: false,
341            ignore_config: crate::ignore_config::IgnoreConfig::none(),
342            output_limit: crate::output_limit::OutputLimitConfig::none(),
343            allow_external_commands: true,
344            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
345            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
346            nonce_store: None,
347            initial_vars: HashMap::new(),
348            request_timeout: None,
349            kill_grace: Duration::from_secs(2),
350        }
351    }
352
353    /// Create an MCP server config with sandboxed filesystem access.
354    ///
355    /// Local filesystem is accessible at its real path (e.g., `/home/user`),
356    /// but sandboxed to `$HOME`. Paths outside the sandbox are not accessible
357    /// through builtins. External commands still access the real filesystem —
358    /// use `.with_allow_external_commands(false)` to block them.
359    #[cfg(feature = "native")]
360    pub fn mcp() -> Self {
361        let home = default_sandbox_root();
362        Self {
363            name: "mcp".to_string(),
364            vfs_mode: VfsMountMode::Sandboxed { root: None },
365            cwd: home,
366            skip_validation: false,
367            interactive: false,
368            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
369            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
370            allow_external_commands: true,
371            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
372            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
373            nonce_store: None,
374            initial_vars: HashMap::new(),
375            request_timeout: None,
376            kill_grace: Duration::from_secs(2),
377        }
378    }
379
380    /// Create an MCP server config with a custom sandbox root.
381    ///
382    /// Use this to restrict access to a subdirectory like `~/src`.
383    #[cfg(feature = "native")]
384    pub fn mcp_with_root(root: PathBuf) -> Self {
385        Self {
386            name: "mcp".to_string(),
387            vfs_mode: VfsMountMode::Sandboxed { root: Some(root.clone()) },
388            cwd: root,
389            skip_validation: false,
390            interactive: false,
391            ignore_config: crate::ignore_config::IgnoreConfig::mcp(),
392            output_limit: crate::output_limit::OutputLimitConfig::mcp(),
393            allow_external_commands: true,
394            latch_enabled: std::env::var("KAISH_LATCH").is_ok_and(|v| v == "1"),
395            trash_enabled: std::env::var("KAISH_TRASH").is_ok_and(|v| v == "1"),
396            nonce_store: None,
397            initial_vars: HashMap::new(),
398            request_timeout: None,
399            kill_grace: Duration::from_secs(2),
400        }
401    }
402
403    /// Create a config with no local filesystem (memory only).
404    ///
405    /// Complete isolation: no local filesystem and external commands are disabled.
406    /// Useful for tests or pure sandboxed execution.
407    pub fn isolated() -> Self {
408        Self {
409            name: "isolated".to_string(),
410            vfs_mode: VfsMountMode::NoLocal,
411            cwd: PathBuf::from("/"),
412            skip_validation: false,
413            interactive: false,
414            ignore_config: crate::ignore_config::IgnoreConfig::none(),
415            output_limit: crate::output_limit::OutputLimitConfig::none(),
416            allow_external_commands: false,
417            latch_enabled: false,
418            trash_enabled: false,
419            nonce_store: None,
420            initial_vars: HashMap::new(),
421            request_timeout: None,
422            kill_grace: Duration::from_secs(2),
423        }
424    }
425
426    /// Set the VFS mount mode.
427    pub fn with_vfs_mode(mut self, mode: VfsMountMode) -> Self {
428        self.vfs_mode = mode;
429        self
430    }
431
432    /// Set the initial working directory.
433    pub fn with_cwd(mut self, cwd: PathBuf) -> Self {
434        self.cwd = cwd;
435        self
436    }
437
438    /// Skip pre-execution validation.
439    pub fn with_skip_validation(mut self, skip: bool) -> Self {
440        self.skip_validation = skip;
441        self
442    }
443
444    /// Enable interactive mode (external commands inherit stdio).
445    pub fn with_interactive(mut self, interactive: bool) -> Self {
446        self.interactive = interactive;
447        self
448    }
449
450    /// Set the ignore file configuration.
451    pub fn with_ignore_config(mut self, config: crate::ignore_config::IgnoreConfig) -> Self {
452        self.ignore_config = config;
453        self
454    }
455
456    /// Set the output limit configuration.
457    pub fn with_output_limit(mut self, config: crate::output_limit::OutputLimitConfig) -> Self {
458        self.output_limit = config;
459        self
460    }
461
462    /// Set whether external command execution is allowed.
463    ///
464    /// When `false`, commands not found as builtins produce "command not found"
465    /// instead of searching PATH. The `exec` and `spawn` builtins also return
466    /// errors. Use this to prevent VFS sandbox bypass via external binaries.
467    pub fn with_allow_external_commands(mut self, allow: bool) -> Self {
468        self.allow_external_commands = allow;
469        self
470    }
471
472    /// Enable or disable confirmation latch at startup.
473    pub fn with_latch(mut self, enabled: bool) -> Self {
474        self.latch_enabled = enabled;
475        self
476    }
477
478    /// Enable or disable trash-on-delete at startup.
479    pub fn with_trash(mut self, enabled: bool) -> Self {
480        self.trash_enabled = enabled;
481        self
482    }
483
484    /// Use a shared nonce store for cross-request confirmation latch.
485    ///
486    /// Pass a `NonceStore` that outlives individual kernel instances so nonces
487    /// issued in one MCP `execute()` call can be validated in subsequent calls.
488    pub fn with_nonce_store(mut self, store: crate::nonce::NonceStore) -> Self {
489        self.nonce_store = Some(store);
490        self
491    }
492
493    /// Add a single initial variable; marked exported when the kernel boots.
494    ///
495    /// Repeated calls add (last write wins on key collision).
496    pub fn with_var(mut self, name: impl Into<String>, value: Value) -> Self {
497        self.initial_vars.insert(name.into(), value);
498        self
499    }
500
501    /// Replace the entire initial-vars map. All entries are marked exported.
502    pub fn with_initial_vars(mut self, vars: HashMap<String, Value>) -> Self {
503        self.initial_vars = vars;
504        self
505    }
506
507    /// Extend the initial-vars map with the given entries (last write wins).
508    pub fn with_vars(mut self, vars: HashMap<String, Value>) -> Self {
509        self.initial_vars.extend(vars);
510        self
511    }
512
513    /// Set the default per-request timeout (kernel-wide).
514    ///
515    /// Each `execute_with_options` call without an explicit timeout uses
516    /// this. On elapsed, the kernel cancels and returns exit code 124.
517    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
518        self.request_timeout = Some(timeout);
519        self
520    }
521
522    /// Set the SIGTERM-to-SIGKILL grace period for child kills.
523    pub fn with_kill_grace(mut self, grace: Duration) -> Self {
524        self.kill_grace = grace;
525        self
526    }
527}
528
/// The Kernel (核) — executes kaish code.
///
/// This is the primary interface for running kaish commands. It owns all
/// the runtime state: variables, tools, VFS, jobs, and persistence.
///
/// Construct one with `Kernel::new`, `Kernel::transient`, or
/// `Kernel::with_backend`. The shared parts (tools, VFS, jobs) are held
/// behind `Arc`; per-kernel mutable state sits behind async `RwLock`s.
pub struct Kernel {
    /// Kernel name.
    name: String,
    /// Variable scope.
    scope: RwLock<Scope>,
    /// Tool registry.
    tools: Arc<ToolRegistry>,
    /// User-defined tools (from `tool name { body }` statements).
    user_tools: RwLock<HashMap<String, ToolDef>>,
    /// Virtual filesystem router.
    vfs: Arc<VfsRouter>,
    /// Background job manager.
    jobs: Arc<JobManager>,
    /// Pipeline runner.
    runner: PipelineRunner,
    /// Execution context (cwd, stdin, etc.).
    exec_ctx: RwLock<ExecContext>,
    /// Whether to skip pre-execution validation.
    skip_validation: bool,
    /// When true, standalone external commands inherit stdio for real-time output.
    interactive: bool,
    /// Whether external command execution is allowed.
    allow_external_commands: bool,
    /// Default per-request timeout (None = no default).
    request_timeout: Option<Duration>,
    /// SIGTERM-to-SIGKILL grace period for child kills.
    kill_grace: Duration,
    /// Receiver for the kernel stderr stream.
    ///
    /// Pipeline stages write to the corresponding `StderrStream` (set on ExecContext).
    /// The kernel drains this after each statement in `execute_streaming`.
    stderr_receiver: tokio::sync::Mutex<StderrReceiver>,
    /// Cancellation token for interrupting execution (Ctrl-C).
    ///
    /// Protected by `std::sync::Mutex` (not tokio) because the SIGINT handler
    /// needs sync access. Each `execute()` call gets a fresh child token;
    /// `cancel()` cancels the current token and replaces it.
    cancel_token: std::sync::Mutex<tokio_util::sync::CancellationToken>,
    /// Terminal state for job control (interactive mode only, Unix only).
    ///
    /// Starts as `None` when the kernel is assembled.
    #[cfg(all(unix, feature = "native"))]
    terminal_state: Option<Arc<crate::terminal::TerminalState>>,
    /// Weak self-reference for handing out `Arc<dyn CommandDispatcher>`.
    ///
    /// Set by `into_arc()`. Allows builtins to re-dispatch inner commands
    /// through the full Kernel resolution chain. Empty until then.
    self_weak: std::sync::OnceLock<std::sync::Weak<Self>>,
    /// Serializes concurrent `execute()` / `execute_streaming()` callers on
    /// this Kernel instance. Tokio's Mutex is fair (FIFO) and acts as the
    /// queue. Background jobs, scatter workers, and concurrent pipeline
    /// stages do NOT take this lock — they run against a *forked* Kernel
    /// (see [`Kernel::fork`]) so they never contend with the foreground.
    execute_lock: tokio::sync::Mutex<()>,
}
586
587impl Kernel {
588    /// Create a new kernel with the given configuration.
589    pub fn new(config: KernelConfig) -> Result<Self> {
590        let mut vfs = Self::setup_vfs(&config);
591        let jobs = Arc::new(JobManager::new());
592
593        // Mount JobFs for job observability at /v/jobs
594        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
595
596        Self::assemble(config, vfs, jobs, |_| {}, |vfs_ref, tools| {
597            ExecContext::with_vfs_and_tools(vfs_ref.clone(), tools.clone())
598        })
599    }
600
601    /// Set up VFS based on mount mode.
602    fn setup_vfs(config: &KernelConfig) -> VfsRouter {
603        let mut vfs = VfsRouter::new();
604
605        match &config.vfs_mode {
606            #[cfg(feature = "native")]
607            VfsMountMode::Passthrough => {
608                // LocalFs at "/" — native paths work directly
609                vfs.mount("/", LocalFs::new(PathBuf::from("/")));
610                // Memory for blobs
611                vfs.mount("/v", MemoryFs::new());
612            }
613            #[cfg(feature = "native")]
614            VfsMountMode::Sandboxed { root } => {
615                // Memory at root for safety (catches paths outside sandbox)
616                vfs.mount("/", MemoryFs::new());
617                vfs.mount("/v", MemoryFs::new());
618
619                // Real /tmp for interop with other processes
620                vfs.mount("/tmp", LocalFs::new(PathBuf::from("/tmp")));
621
622                // Mount XDG runtime dir for spill files and socket access
623                let runtime = crate::paths::xdg_runtime_dir();
624                if runtime.exists() {
625                    let runtime_str = runtime.to_string_lossy().to_string();
626                    vfs.mount(&runtime_str, LocalFs::new(runtime));
627                }
628
629                // Resolve the sandbox root (defaults to $HOME)
630                let local_root = root.clone().unwrap_or_else(|| {
631                    std::env::var("HOME")
632                        .map(PathBuf::from)
633                        .unwrap_or_else(|_| PathBuf::from("/"))
634                });
635
636                // Mount at the real path for transparent access
637                // e.g., /home/atobey → LocalFs("/home/atobey")
638                // so /home/atobey/src/kaish just works
639                let mount_point = local_root.to_string_lossy().to_string();
640                vfs.mount(&mount_point, LocalFs::new(local_root));
641            }
642            VfsMountMode::NoLocal => {
643                // Pure memory mode — no local filesystem
644                vfs.mount("/", MemoryFs::new());
645                vfs.mount("/tmp", MemoryFs::new());
646                vfs.mount("/v", MemoryFs::new());
647            }
648        }
649
650        vfs
651    }
652
653    /// Create a transient kernel (no persistence).
654    pub fn transient() -> Result<Self> {
655        Self::new(KernelConfig::transient())
656    }
657
658    /// Create a kernel with a custom backend and `/v/*` virtual path support.
659    ///
660    /// This is the constructor for embedding kaish in other systems that provide
661    /// their own storage backend (e.g., CRDT-backed storage in kaijutsu).
662    ///
663    /// A `VirtualOverlayBackend` routes paths automatically:
664    /// - `/v/*` → Internal VFS (JobFs at `/v/jobs`, MemoryFs at `/v/blobs`)
665    /// - Everything else → Your custom backend
666    ///
667    /// The optional `configure_vfs` closure lets you add additional virtual mounts
668    /// (e.g., `/v/docs` for CRDT blocks) after the built-in mounts are set up.
669    ///
670    /// **Note:** The config's `vfs_mode` is ignored — all non-`/v/*` path routing
671    /// is handled by your custom backend. The config is only used for `name`, `cwd`,
672    /// `skip_validation`, and `interactive`.
673    ///
674    /// # Example
675    ///
676    /// ```ignore
677    /// // Simple: default /v/* mounts only
678    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |_| {})?;
679    ///
680    /// // With custom mounts
681    /// let kernel = Kernel::with_backend(backend, config, |vfs| {
682    ///     vfs.mount_arc("/v/docs", docs_fs);
683    ///     vfs.mount_arc("/v/g", git_fs);
684    /// }, |_| {})?;
685    ///
686    /// // With custom tools
687    /// let kernel = Kernel::with_backend(backend, config, |_| {}, |tools| {
688    ///     tools.register(MyCustomTool::new());
689    /// })?;
690    /// ```
691    pub fn with_backend(
692        backend: Arc<dyn KernelBackend>,
693        config: KernelConfig,
694        configure_vfs: impl FnOnce(&mut VfsRouter),
695        configure_tools: impl FnOnce(&mut ToolRegistry),
696    ) -> Result<Self> {
697        use crate::backend::VirtualOverlayBackend;
698
699        let mut vfs = VfsRouter::new();
700        let jobs = Arc::new(JobManager::new());
701
702        vfs.mount("/v/jobs", JobFs::new(jobs.clone()));
703        vfs.mount("/v/blobs", MemoryFs::new());
704
705        // Let caller add custom mounts (e.g., /v/docs, /v/g)
706        configure_vfs(&mut vfs);
707
708        Self::assemble(config, vfs, jobs, configure_tools, |vfs_arc: &Arc<VfsRouter>, _: &Arc<ToolRegistry>| {
709            let overlay: Arc<dyn KernelBackend> =
710                Arc::new(VirtualOverlayBackend::new(backend, vfs_arc.clone()));
711            ExecContext::with_backend(overlay)
712        })
713    }
714
715    /// Shared assembly: wires up tools, runner, scope, and ExecContext.
716    ///
717    /// The `make_ctx` closure receives the VFS and tools so backends that need
718    /// them (like `LocalBackend::with_tools`) can capture them. Custom backends
719    /// that already have their own storage can ignore these parameters.
720    fn assemble(
721        config: KernelConfig,
722        mut vfs: VfsRouter,
723        jobs: Arc<JobManager>,
724        configure_tools: impl FnOnce(&mut ToolRegistry),
725        make_ctx: impl FnOnce(&Arc<VfsRouter>, &Arc<ToolRegistry>) -> ExecContext,
726    ) -> Result<Self> {
727        let KernelConfig { name, cwd, skip_validation, interactive, ignore_config, output_limit, allow_external_commands, latch_enabled, trash_enabled, nonce_store, initial_vars, request_timeout, kill_grace, .. } = config;
728
729        let mut tools = ToolRegistry::new();
730        register_builtins(&mut tools);
731        configure_tools(&mut tools);
732        let tools = Arc::new(tools);
733
734        // Mount BuiltinFs so `ls /v/bin` lists builtins
735        vfs.mount("/v/bin", BuiltinFs::new(tools.clone()));
736
737        let vfs = Arc::new(vfs);
738
739        let runner = PipelineRunner::new(tools.clone());
740
741        let (stderr_writer, stderr_receiver) = stderr_stream();
742
743        let mut exec_ctx = make_ctx(&vfs, &tools);
744        exec_ctx.set_cwd(cwd);
745        exec_ctx.set_job_manager(jobs.clone());
746        exec_ctx.set_tool_schemas(tools.schemas());
747        exec_ctx.set_tools(tools.clone());
748        #[cfg(feature = "native")]
749        exec_ctx.set_trash_backend(Arc::new(crate::trash_system::SystemTrash));
750        exec_ctx.stderr = Some(stderr_writer);
751        exec_ctx.ignore_config = ignore_config;
752        exec_ctx.output_limit = output_limit;
753        exec_ctx.allow_external_commands = allow_external_commands;
754        if let Some(store) = nonce_store {
755            exec_ctx.nonce_store = store;
756        }
757
758        Ok(Self {
759            name,
760            scope: RwLock::new({
761                let mut scope = Scope::new();
762                scope.set_pid(KERNEL_COUNTER.fetch_add(1, Ordering::Relaxed));
763                if let Ok(home) = std::env::var("HOME") {
764                    scope.set("HOME", Value::String(home));
765                }
766                // Apply caller-supplied initial variables, all marked exported.
767                // Frontends (REPL, MCP) populate this from std::env::vars()
768                // for shell-like UX; embedders that want hermetic behavior
769                // simply leave it empty.
770                for (name, value) in initial_vars {
771                    scope.set_exported(name, value);
772                }
773                scope.set_latch_enabled(latch_enabled);
774                scope.set_trash_enabled(trash_enabled);
775                scope
776            }),
777            tools,
778            user_tools: RwLock::new(HashMap::new()),
779            vfs,
780            jobs,
781            runner,
782            exec_ctx: RwLock::new(exec_ctx),
783            skip_validation,
784            interactive,
785            allow_external_commands,
786            request_timeout,
787            kill_grace,
788            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
789            cancel_token: std::sync::Mutex::new(tokio_util::sync::CancellationToken::new()),
790            #[cfg(all(unix, feature = "native"))]
791            terminal_state: None,
792            self_weak: std::sync::OnceLock::new(),
793            execute_lock: tokio::sync::Mutex::new(()),
794        })
795    }
796
797    /// Get the kernel name.
798    pub fn name(&self) -> &str {
799        &self.name
800    }
801
802    /// Wrap this Kernel in an Arc and initialize its self-reference.
803    ///
804    /// This enables the Kernel to hand out `Arc<dyn CommandDispatcher>` references
805    /// to child contexts, allowing builtins like `timeout` to dispatch inner
806    /// commands through the full resolution chain (user tools → builtins →
807    /// .kai scripts → external commands).
808    pub fn into_arc(self) -> Arc<Self> {
809        let arc = Arc::new(self);
810        let _ = arc.self_weak.set(Arc::downgrade(&arc));
811        arc
812    }
813
814    /// Fork a subsidiary kernel for concurrent execution.
815    ///
816    /// The fork is a fully-functional `Kernel` that:
817    /// - **Snapshots** per-session state from the parent: scope (COW — cheap),
818    ///   user-defined tools, cwd, aliases, ignore config, etc. Mutations on
819    ///   the fork do NOT propagate back to the parent — matching bash
820    ///   subshell / background-job semantics.
821    /// - **Shares** read-mostly resources with the parent via `Arc`: the tool
822    ///   registry, the VFS router, and the job manager. A job registered by
823    ///   the fork is visible to the parent's `jobs` builtin, and the fork
824    ///   sees the same VFS mounts.
825    /// - **Owns** its own `stderr_receiver`, `cancel_token`, and
826    ///   `execute_lock`. It is never the TTY owner, so `interactive` is
827    ///   `false` and `terminal_state` is `None`.
828    ///
829    /// The returned Arc has its `self_weak` populated (via `into_arc`), so
830    /// nested dispatch through `ctx.dispatcher` (e.g. the `timeout` builtin)
831    /// routes through the fork itself, not the parent — which is essential
832    /// for concurrency safety.
833    ///
834    /// Use this for **detached** background concurrency where the fork should
835    /// survive parent cancellation: the `&` background-job operator and any
836    /// other "fire and forget" worker. The fork gets a fresh, independent
837    /// cancellation token.
838    ///
839    /// For foreground concurrency (scatter workers, concurrent pipeline
840    /// stages, `$(...)` cmdsubs) where parent timeout/cancel must cascade
841    /// into the fork's external children, use [`Self::fork_attached`].
842    pub async fn fork(&self) -> Arc<Self> {
843        self.fork_inner(tokio_util::sync::CancellationToken::new()).await
844    }
845
846    /// Fork attached to the parent's cancellation.
847    ///
848    /// Same as [`Self::fork`] but the fork's `cancel_token` is a child of
849    /// the parent's. When the parent cancels (request timeout, embedder
850    /// `Kernel::cancel`, etc.), the fork's token also cancels, which in
851    /// turn kills any external children spawned in the fork via the
852    /// `wait_or_kill` / SIGTERM-grace-SIGKILL path.
853    pub async fn fork_attached(&self) -> Arc<Self> {
854        let child_token = {
855            #[allow(clippy::expect_used)]
856            let parent = self.cancel_token.lock().expect("cancel_token poisoned");
857            parent.child_token()
858        };
859        self.fork_inner(child_token).await
860    }
861
862    /// Shared fork implementation. Caller decides the cancellation token.
863    async fn fork_inner(&self, cancel: tokio_util::sync::CancellationToken) -> Arc<Self> {
864        let scope_snapshot = self.scope.read().await.clone();
865        let user_tools_snapshot = self.user_tools.read().await.clone();
866
867        // Snapshot exec_ctx by cloning the cloneable fields, then override
868        // the ones that should not carry over (stderr channel, dispatcher,
869        // interactive flag, terminal state, cancel — set from `cancel` arg).
870        let mut fork_ctx = {
871            let parent_ctx = self.exec_ctx.read().await;
872            parent_ctx.child_for_pipeline()
873        };
874        let (stderr_writer, stderr_receiver) = stderr_stream();
875        fork_ctx.stderr = Some(stderr_writer);
876        // Clear dispatcher; dispatch_command will repopulate it to point at
877        // the fork on the first dispatch call.
878        fork_ctx.dispatcher = None;
879        fork_ctx.interactive = false;
880        fork_ctx.cancel = cancel.clone();
881        #[cfg(all(unix, feature = "native"))]
882        {
883            fork_ctx.terminal_state = None;
884        }
885
886        let fork = Self {
887            name: format!("{}:fork", self.name),
888            scope: RwLock::new(scope_snapshot),
889            tools: Arc::clone(&self.tools),
890            user_tools: RwLock::new(user_tools_snapshot),
891            vfs: Arc::clone(&self.vfs),
892            jobs: Arc::clone(&self.jobs),
893            runner: self.runner.clone(),
894            exec_ctx: RwLock::new(fork_ctx),
895            skip_validation: self.skip_validation,
896            // Forks are never the TTY owner — they run in the background.
897            interactive: false,
898            allow_external_commands: self.allow_external_commands,
899            request_timeout: self.request_timeout,
900            kill_grace: self.kill_grace,
901            stderr_receiver: tokio::sync::Mutex::new(stderr_receiver),
902            cancel_token: std::sync::Mutex::new(cancel),
903            #[cfg(all(unix, feature = "native"))]
904            terminal_state: None,
905            self_weak: std::sync::OnceLock::new(),
906            execute_lock: tokio::sync::Mutex::new(()),
907        };
908
909        fork.into_arc()
910    }
911
912    /// Get an `Arc<dyn CommandDispatcher>` to this Kernel, if wrapped via `into_arc()`.
913    ///
914    /// Returns `None` if the Kernel was not wrapped, or if all strong references
915    /// have been dropped (the `Weak` can no longer upgrade).
916    pub fn dispatcher(&self) -> Option<Arc<dyn CommandDispatcher>> {
917        self.self_weak
918            .get()
919            .and_then(|weak| weak.upgrade())
920            .map(|arc| arc as Arc<dyn CommandDispatcher>)
921    }
922
923    /// Initialize terminal state for interactive job control.
924    ///
925    /// Call this after kernel creation when running as an interactive REPL
926    /// and stdin is a TTY. Sets up process groups and signal handling.
927    #[cfg(all(unix, feature = "native"))]
928    pub fn init_terminal(&mut self) {
929        if !self.interactive {
930            return;
931        }
932        match crate::terminal::TerminalState::init() {
933            Ok(state) => {
934                let state = Arc::new(state);
935                self.terminal_state = Some(state.clone());
936                // Set on exec_ctx so builtins (fg, bg, kill) can access it
937                self.exec_ctx.get_mut().terminal_state = Some(state);
938                tracing::debug!("terminal job control initialized");
939            }
940            Err(e) => {
941                tracing::warn!("failed to initialize terminal job control: {}", e);
942            }
943        }
944    }
945
946    /// Cancel the current execution.
947    ///
948    /// This cancels the current cancellation token, causing any execution
949    /// loop to exit at the next checkpoint with exit code 130 (SIGINT).
950    /// A fresh token is installed for the next `execute()` call.
951    pub fn cancel(&self) {
952        #[allow(clippy::expect_used)]
953        let token = self.cancel_token.lock().expect("cancel_token poisoned");
954        token.cancel();
955    }
956
957    /// Check if the current execution has been cancelled.
958    pub fn is_cancelled(&self) -> bool {
959        #[allow(clippy::expect_used)]
960        let token = self.cancel_token.lock().expect("cancel_token poisoned");
961        token.is_cancelled()
962    }
963
964    /// Reset the cancellation token (called at the start of each execute).
965    fn reset_cancel(&self) -> tokio_util::sync::CancellationToken {
966        #[allow(clippy::expect_used)]
967        let mut token = self.cancel_token.lock().expect("cancel_token poisoned");
968        if token.is_cancelled() {
969            *token = tokio_util::sync::CancellationToken::new();
970        }
971        token.clone()
972    }
973
974    /// Acquire the per-Kernel execute lock, warning on contention.
975    ///
976    /// Tokio's Mutex is fair (FIFO) so callers queue in arrival order. When
977    /// the lock is already held, emit a warning so the silent serialization
978    /// is observable in logs — if you need real parallelism, fork the kernel.
979    async fn acquire_execute_lock(&self) -> tokio::sync::MutexGuard<'_, ()> {
980        match self.execute_lock.try_lock() {
981            Ok(guard) => guard,
982            Err(_) => {
983                tracing::warn!(
984                    target: "kaish::kernel::concurrency",
985                    kernel = %self.name,
986                    "execute() contended — serializing concurrent caller; \
987                     use Kernel::fork() for parallelism instead of sharing"
988                );
989                self.execute_lock.lock().await
990            }
991        }
992    }
993
994    /// Execute kaish source code with default options.
995    ///
996    /// Equivalent to `execute_with_options(input, ExecuteOptions::default())`.
997    /// Returns the result of the last statement executed.
998    pub async fn execute(&self, input: &str) -> Result<ExecResult> {
999        self.execute_with_options_inner(input, ExecuteOptions::default(), None).await
1000    }
1001
1002    /// Execute with per-call options. The primary entry point for embedders
1003    /// that don't need per-statement output streaming.
1004    ///
1005    /// `opts` carries timeout, transient vars overlay, optional cwd override,
1006    /// and optional embedder-owned cancellation token. See [`ExecuteOptions`]
1007    /// for semantics. For streaming, use [`Self::execute_with_options_streaming`].
1008    ///
1009    /// **Cancellation:** if `opts.cancel_token` is `Some`, it is *raced*
1010    /// against the kernel's internal token. Either firing cancels and kills
1011    /// external children. The embedder's token is read-only — kernel
1012    /// timeouts do NOT propagate into it. Distinguish via the returned
1013    /// `code`: 124 = timeout, 130 = cancellation.
1014    ///
1015    /// **Timeout:** `opts.timeout` overrides `KernelConfig::request_timeout`.
1016    /// `Some(Duration::ZERO)` returns 124 immediately without spawning.
1017    ///
1018    /// Concurrent callers on the same Kernel serialize on the kernel-wide
1019    /// execute lock. For true parallelism, call [`Kernel::fork`] (detached)
1020    /// or [`Kernel::fork_attached`] (cancellation cascades from this kernel).
1021    pub async fn execute_with_options(
1022        &self,
1023        input: &str,
1024        opts: ExecuteOptions,
1025    ) -> Result<ExecResult> {
1026        self.execute_with_options_inner(input, opts, None).await
1027    }
1028
1029    /// Same as [`Self::execute_with_options`] but with a per-statement output
1030    /// callback. The callback fires after each top-level statement so the
1031    /// embedder (REPL, MCP streaming) can flush output incrementally.
1032    pub async fn execute_with_options_streaming(
1033        &self,
1034        input: &str,
1035        opts: ExecuteOptions,
1036        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1037    ) -> Result<ExecResult> {
1038        self.execute_with_options_inner(input, opts, Some(on_output)).await
1039    }
1040
1041    /// Execute kaish source code with a transient overlay of exported variables.
1042    ///
1043    /// Deprecated thin wrapper over [`Self::execute_with_options`]. New code
1044    /// should use that method directly:
1045    /// `execute_with_options(input, ExecuteOptions::new().with_vars(vars))`.
1046    #[deprecated(note = "use Kernel::execute_with_options with ExecuteOptions::with_vars")]
1047    pub async fn execute_with_vars(
1048        &self,
1049        input: &str,
1050        vars: HashMap<String, Value>,
1051    ) -> Result<ExecResult> {
1052        self.execute_with_options_inner(input, ExecuteOptions::new().with_vars(vars), None).await
1053    }
1054
1055    /// Execute kaish source code with a per-statement callback.
1056    ///
1057    /// Deprecated thin wrapper. New code should use
1058    /// [`Self::execute_with_options_streaming`].
1059    #[deprecated(note = "use Kernel::execute_with_options_streaming")]
1060    pub async fn execute_streaming(
1061        &self,
1062        input: &str,
1063        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1064    ) -> Result<ExecResult> {
1065        self.execute_with_options_inner(input, ExecuteOptions::default(), Some(on_output)).await
1066    }
1067
1068    /// Shared body for `execute`, `execute_with_options(_streaming)`, and
1069    /// the deprecated wrappers. Owns the per-call cancel token, vars overlay,
1070    /// cwd override, and timeout race.
1071    #[tracing::instrument(level = "info", skip(self, opts, on_output), fields(input_len = input.len()))]
1072    async fn execute_with_options_inner(
1073        &self,
1074        input: &str,
1075        opts: ExecuteOptions,
1076        on_output: Option<&mut (dyn FnMut(&ExecResult) + Send)>,
1077    ) -> Result<ExecResult> {
1078        let _guard = self.acquire_execute_lock().await;
1079
1080        // Always reset to a fresh internal token; this is the kernel's own
1081        // cancel surface for embedders calling `Kernel::cancel()`. The
1082        // embedder-supplied `opts.cancel_token` is a *read-only input* — it
1083        // is NOT written into `self.cancel_token`, because doing so would
1084        // (a) leak the embedder's token past this call's lifetime,
1085        // (b) re-route a later `Kernel::cancel()` into the embedder's token,
1086        // (c) extend the token's lifetime via the kernel's strong clone.
1087        let internal = self.reset_cancel();
1088        // Race the embedder token against the kernel's internal token via a
1089        // tracked watcher task. We hold the JoinHandle so we can abort the
1090        // task at function exit — otherwise it would wait forever for either
1091        // token to fire and leak per call.
1092        let (effective_cancel, watcher_handle): (
1093            tokio_util::sync::CancellationToken,
1094            Option<tokio::task::JoinHandle<()>>,
1095        ) = if let Some(ext) = opts.cancel_token {
1096            let combined = tokio_util::sync::CancellationToken::new();
1097            let combined_writer = combined.clone();
1098            let i = internal.clone();
1099            let handle = tokio::spawn(async move {
1100                tokio::select! {
1101                    _ = i.cancelled() => combined_writer.cancel(),
1102                    _ = ext.cancelled() => combined_writer.cancel(),
1103                }
1104            });
1105            (combined, Some(handle))
1106        } else {
1107            (internal, None)
1108        };
1109
1110        // Effective timeout: per-call wins over kernel-config default.
1111        let timeout = opts.timeout.or(self.request_timeout);
1112
1113        // ZERO timeout: return 124 immediately without spawning anything.
1114        if timeout == Some(Duration::ZERO) {
1115            if let Some(h) = watcher_handle {
1116                h.abort();
1117            }
1118            return Ok(ExecResult::failure(124, "timeout: timed out after 0s".to_string()));
1119        }
1120
1121        // Apply per-call vars overlay (push frame + set_exported), wrapped in
1122        // an RAII guard so a panic inside `execute_streaming_inner` still
1123        // pops the frame and unexports the temporarily-exported names.
1124        struct VarsFrameGuard<'a> {
1125            kernel: &'a Kernel,
1126            newly_exported: Vec<String>,
1127        }
1128        impl Drop for VarsFrameGuard<'_> {
1129            fn drop(&mut self) {
1130                // Best-effort cleanup using try_write. The execute_lock held
1131                // throughout execute_with_options means there is no concurrent
1132                // foreground caller; forks have their own scope and won't
1133                // block this. blocking_write would deadlock the runtime when
1134                // called from a tokio worker thread, so we explicitly do NOT
1135                // fall back to it — if try_write fails (which we've never
1136                // seen in practice), log loudly and accept the leak rather
1137                // than deadlock the entire kernel.
1138                let Ok(mut scope) = self.kernel.scope.try_write() else {
1139                    tracing::error!(
1140                        "vars frame guard: scope lock unexpectedly busy; \
1141                         skipping pop_frame to avoid runtime deadlock — \
1142                         transient vars may leak"
1143                    );
1144                    return;
1145                };
1146                scope.pop_frame();
1147                for name in self.newly_exported.drain(..) {
1148                    scope.unexport(&name);
1149                }
1150            }
1151        }
1152
1153        // Per-call cwd override: save current cwd, set the new one, restore
1154        // on Drop so the kernel's persistent cwd doesn't leak between calls.
1155        // Same RAII pattern as VarsFrameGuard, same blocking_write trade-off.
1156        struct CwdGuard<'a> {
1157            kernel: &'a Kernel,
1158            saved: PathBuf,
1159        }
1160        impl Drop for CwdGuard<'_> {
1161            fn drop(&mut self) {
1162                let Ok(mut ec) = self.kernel.exec_ctx.try_write() else {
1163                    tracing::error!(
1164                        "cwd guard: exec_ctx lock unexpectedly busy; \
1165                         skipping cwd restore — kernel cwd may be wrong for next call"
1166                    );
1167                    return;
1168                };
1169                ec.cwd = std::mem::take(&mut self.saved);
1170            }
1171        }
1172        let _cwd_guard: Option<CwdGuard<'_>> = if let Some(new_cwd) = opts.cwd {
1173            let mut ec = self.exec_ctx.write().await;
1174            let saved = std::mem::replace(&mut ec.cwd, new_cwd);
1175            drop(ec);
1176            Some(CwdGuard { kernel: self, saved })
1177        } else {
1178            None
1179        };
1180
1181        let _vars_guard: Option<VarsFrameGuard<'_>> = if !opts.vars.is_empty() {
1182            let mut scope = self.scope.write().await;
1183            scope.push_frame();
1184            let mut newly = Vec::with_capacity(opts.vars.len());
1185            for (name, value) in opts.vars {
1186                if !scope.is_exported(&name) {
1187                    newly.push(name.clone());
1188                }
1189                scope.set_exported(name, value);
1190            }
1191            drop(scope);
1192            Some(VarsFrameGuard { kernel: self, newly_exported: newly })
1193        } else {
1194            None
1195        };
1196
1197        // Sync the effective cancel into self.exec_ctx so try_execute_external
1198        // (which reads via self.cancel_token) sees cancellation. We also need
1199        // builtins to see it via ctx.cancel — handled in execute_command.
1200        // For simplicity here we mirror effective_cancel into self.cancel_token
1201        // for the duration of this call, then restore the internal token at
1202        // the end (so a later Kernel::cancel still hits our internal surface).
1203        {
1204            #[allow(clippy::expect_used)]
1205            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1206            *cur = effective_cancel.clone();
1207        }
1208
1209        // Run inner with optional timeout. The timer task cancels our token
1210        // on elapsed; the cascade fires SIGTERM/SIGKILL on any external
1211        // children via the wait_or_kill discipline in try_execute_external.
1212        let mut noop_cb: Box<dyn FnMut(&ExecResult) + Send> = Box::new(|_| {});
1213        let cb_ref: &mut (dyn FnMut(&ExecResult) + Send) = match on_output {
1214            Some(cb) => cb,
1215            None => &mut *noop_cb,
1216        };
1217
1218        let result = if let Some(d) = timeout {
1219            let elapsed = Arc::new(std::sync::atomic::AtomicBool::new(false));
1220            let elapsed_writer = elapsed.clone();
1221            let timer_token = effective_cancel.clone();
1222            let timer = tokio::spawn(async move {
1223                tokio::time::sleep(d).await;
1224                elapsed_writer.store(true, std::sync::atomic::Ordering::SeqCst);
1225                timer_token.cancel();
1226            });
1227            let r = self.execute_streaming_inner(input, cb_ref).await;
1228            timer.abort();
1229            match r {
1230                Ok(mut res) => {
1231                    if elapsed.load(std::sync::atomic::Ordering::SeqCst) {
1232                        res.code = 124;
1233                        if res.err.is_empty() {
1234                            res.err = format!("timeout: timed out after {:?}", d);
1235                        }
1236                    }
1237                    Ok(res)
1238                }
1239                Err(e) => Err(e),
1240            }
1241        } else {
1242            self.execute_streaming_inner(input, cb_ref).await
1243        };
1244
1245        // Restore self.cancel_token to a fresh, uncancelled token so the
1246        // embedder's view of `Kernel::cancel()` stays predictable on the
1247        // next call (it cancels the kernel's own token, not whatever was
1248        // left over from this call's combined token).
1249        {
1250            #[allow(clippy::expect_used)]
1251            let mut cur = self.cancel_token.lock().expect("cancel_token poisoned");
1252            *cur = tokio_util::sync::CancellationToken::new();
1253        }
1254
1255        // Tear down the embedder-token race watcher (if any). Leaving it
1256        // alive would idle forever waiting for tokens that may never fire.
1257        if let Some(h) = watcher_handle {
1258            h.abort();
1259        }
1260
1261        // VarsFrameGuard drops here on the success path and on early-return
1262        // paths above (error path included). Panic safety preserved.
1263        result
1264    }
1265
1266    /// The actual body of `execute_streaming`, run while holding the execute lock.
1267    ///
1268    /// Split out so internal kernel paths that are already under the lock can
1269    /// call this without deadlocking on re-entry. External callers must go
1270    /// through [`Self::execute_streaming`] so they acquire the lock.
1271    async fn execute_streaming_inner(
1272        &self,
1273        input: &str,
1274        on_output: &mut (dyn FnMut(&ExecResult) + Send),
1275    ) -> Result<ExecResult> {
1276        let program = parse(input).map_err(|errors| {
1277            let msg = errors
1278                .iter()
1279                .map(|e| e.format(input))
1280                .collect::<Vec<_>>()
1281                .join("\n");
1282            anyhow::anyhow!("parse error:\n{}", msg)
1283        })?;
1284
1285        // AST display mode: show AST instead of executing
1286        {
1287            let scope = self.scope.read().await;
1288            if scope.show_ast() {
1289                let output = format!("{:#?}\n", program);
1290                return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(output)));
1291            }
1292        }
1293
1294        // Pre-execution validation
1295        if !self.skip_validation {
1296            let user_tools = self.user_tools.read().await;
1297            let validator = Validator::new(&self.tools, &user_tools);
1298            let issues = validator.validate(&program);
1299
1300            // Collect errors (warnings are logged but don't prevent execution)
1301            let errors: Vec<_> = issues
1302                .iter()
1303                .filter(|i| i.severity == Severity::Error)
1304                .collect();
1305
1306            if !errors.is_empty() {
1307                let error_msg = errors
1308                    .iter()
1309                    .map(|e| e.format(input))
1310                    .collect::<Vec<_>>()
1311                    .join("\n");
1312                return Err(anyhow::anyhow!("validation failed:\n{}", error_msg));
1313            }
1314
1315            // Log warnings via tracing (trace level to avoid noise)
1316            for warning in issues.iter().filter(|i| i.severity == Severity::Warning) {
1317                tracing::trace!("validation: {}", warning.format(input));
1318            }
1319        }
1320
1321        let mut result = ExecResult::success("");
1322
1323        // Reset cancellation token for this execution.
1324        let cancel = self.reset_cancel();
1325
1326        for stmt in program.statements {
1327            if matches!(stmt, Stmt::Empty) {
1328                continue;
1329            }
1330
1331            // Cancellation checkpoint
1332            if cancel.is_cancelled() {
1333                result.code = 130;
1334                return Ok(result);
1335            }
1336
1337            let flow = self.execute_stmt_flow(&stmt).await?;
1338
1339            // Drain any stderr written by pipeline stages during this statement.
1340            // This captures stderr from intermediate pipeline stages that would
1341            // otherwise be lost (only the last stage's result is returned).
1342            let drained_stderr = {
1343                let mut receiver = self.stderr_receiver.lock().await;
1344                receiver.drain_lossy()
1345            };
1346
1347            match flow {
1348                ControlFlow::Normal(mut r) => {
1349                    if !drained_stderr.is_empty() {
1350                        if !r.err.is_empty() && !r.err.ends_with('\n') {
1351                            r.err.push('\n');
1352                        }
1353                        // Prepend pipeline stderr before the last stage's stderr
1354                        let combined = format!("{}{}", drained_stderr, r.err);
1355                        r.err = combined;
1356                    }
1357                    on_output(&r);
1358                    // Carry the last statement's structured output for MCP TOON encoding.
1359                    // Must be done here (not in accumulate_result) because accumulate_result
1360                    // is also used in loops where per-iteration output would be wrong.
1361                    let last_output = r.output().cloned();
1362                    accumulate_result(&mut result, &r);
1363                    result.set_output(last_output);
1364                }
1365                ControlFlow::Exit { code } => {
1366                    if !drained_stderr.is_empty() {
1367                        result.err.push_str(&drained_stderr);
1368                    }
1369                    result.code = code;
1370                    return Ok(result);
1371                }
1372                ControlFlow::Return { mut value } => {
1373                    if !drained_stderr.is_empty() {
1374                        value.err = format!("{}{}", drained_stderr, value.err);
1375                    }
1376                    on_output(&value);
1377                    result = value;
1378                }
1379                ControlFlow::Break { result: mut r, .. } | ControlFlow::Continue { result: mut r, .. } => {
1380                    if !drained_stderr.is_empty() {
1381                        r.err = format!("{}{}", drained_stderr, r.err);
1382                    }
1383                    on_output(&r);
1384                    result = r;
1385                }
1386            }
1387        }
1388
1389        Ok(result)
1390    }
1391
1392    /// Execute a single statement, returning control flow information.
        ///
        /// The function returns a boxed, pinned future instead of being an
        /// `async fn` because the body calls itself recursively for nested
        /// statements (`if` / `for` / `while` / `case` bodies and both sides of
        /// `&&` / `||`). The future is instrumented with a debug span that records
        /// the statement kind.
        ///
        /// Statements that complete normally yield `ControlFlow::ok(result)`.
        /// `Break` / `Continue` flows are consumed by the loops in this function
        /// when `decrement_level()` reports that they target the current loop;
        /// every other non-normal flow is returned to the caller unchanged.
        ///
        /// # Errors
        ///
        /// Propagates failures from expression evaluation, glob expansion,
        /// exit-code conversion (`return` / `exit`) and pipeline execution.
1393    fn execute_stmt_flow<'a>(
1394        &'a self,
1395        stmt: &'a Stmt,
1396    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ControlFlow>> + Send + 'a>> {
1397        use tracing::Instrument;
1398        let span = tracing::debug_span!("execute_stmt_flow", stmt_type = %stmt.kind_name());
1399        Box::pin(async move {
1400        match stmt {
1401            Stmt::Assignment(assign) => {
1402                // Use async evaluator to support command substitution
1403                let value = self.eval_expr_async(&assign.value).await
1404                    .context("failed to evaluate assignment")?;
1405                let mut scope = self.scope.write().await;
1406                if assign.local {
1407                    // local: set in innermost (current function) frame
1408                    scope.set(&assign.name, value.clone());
1409                } else {
1410                    // non-local: update existing or create in root frame
1411                    scope.set_global(&assign.name, value.clone());
1412                }
                    // Release the write lock before building the result.
1413                drop(scope);
1414
1415                // Assignments don't produce output (like sh)
1416                Ok(ControlFlow::ok(ExecResult::success("")))
1417            }
1418            Stmt::Command(cmd) => {
1419                // Route single commands through execute_pipeline for a unified path.
1420                // This ensures all commands go through the dispatcher chain.
1421                let pipeline = crate::ast::Pipeline {
1422                    commands: vec![cmd.clone()],
1423                    background: false,
1424                };
1425                let result = self.execute_pipeline(&pipeline).await?;
1426                self.update_last_result(&result).await;
1427
1428                // Check for error exit mode (set -e)
1429                if !result.ok() {
1430                    let scope = self.scope.read().await;
1431                    if scope.error_exit_enabled() {
1432                        return Ok(ControlFlow::exit_code(result.code));
1433                    }
1434                }
1435
1436                Ok(ControlFlow::ok(result))
1437            }
1438            Stmt::Pipeline(pipeline) => {
1439                let result = self.execute_pipeline(pipeline).await?;
1440                self.update_last_result(&result).await;
1441
1442                // Check for error exit mode (set -e)
1443                if !result.ok() {
1444                    let scope = self.scope.read().await;
1445                    if scope.error_exit_enabled() {
1446                        return Ok(ControlFlow::exit_code(result.code));
1447                    }
1448                }
1449
1450                Ok(ControlFlow::ok(result))
1451            }
1452            Stmt::If(if_stmt) => {
1453                // Use async evaluator to support command substitution in conditions
1454                let cond_value = self.eval_expr_async(&if_stmt.condition).await?;
1455
                    // A missing `else` is treated as an empty statement list.
1456                let branch = if is_truthy(&cond_value) {
1457                    &if_stmt.then_branch
1458                } else {
1459                    if_stmt.else_branch.as_deref().unwrap_or(&[])
1460                };
1461
1462                let mut result = ExecResult::success("");
1463                for stmt in branch {
1464                    let flow = self.execute_stmt_flow(stmt).await?;
1465                    match flow {
1466                        ControlFlow::Normal(r) => {
1467                            accumulate_result(&mut result, &r);
1468                            self.drain_stderr_into(&mut result).await;
1469                        }
                            // Any non-normal flow ends the branch and is returned as-is.
                            // NOTE(review): `result` is dropped on this path, so whatever
                            // was accumulated or drained into it is not returned — confirm
                            // this is intended.
1470                        other => {
1471                            self.drain_stderr_into(&mut result).await;
1472                            return Ok(other);
1473                        }
1474                    }
1475                }
1476                Ok(ControlFlow::ok(result))
1477            }
1478            Stmt::For(for_loop) => {
1479                // Evaluate all items and collect values for iteration
1480                // Use async evaluator to support command substitution like $(seq 1 5)
1481                let mut items: Vec<Value> = Vec::new();
1482                for item_expr in &for_loop.items {
1483                    // Glob expansion in for-loop items: `for f in *.txt`
1484                    if let Expr::GlobPattern(pattern) = item_expr {
1485                        let glob_enabled = {
1486                            let scope = self.scope.read().await;
1487                            scope.glob_enabled()
1488                        };
1489                        if glob_enabled {
1490                            let (paths, cwd) = {
1491                                let ctx = self.exec_ctx.read().await;
1492                                let paths = ctx.expand_glob(pattern).await
1493                                    .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
1494                                let cwd = ctx.resolve_path(".");
1495                                (paths, cwd)
1496                            };
                                // A pattern that matches nothing is an error rather than
                                // an empty iteration.
1497                            if paths.is_empty() {
1498                                return Err(anyhow::anyhow!("no matches: {}", pattern));
1499                            }
1500                            for path in paths {
                                    // Patterns not starting with '/' are shown relative to
                                    // the cwd when the match lies under it; absolute patterns
                                    // keep the full path.
1501                                let display = if !pattern.starts_with('/') {
1502                                    path.strip_prefix(&cwd)
1503                                        .unwrap_or(&path)
1504                                        .to_string_lossy().into_owned()
1505                                } else {
1506                                    path.to_string_lossy().into_owned()
1507                                };
1508                                items.push(Value::String(display));
1509                            }
1510                            continue;
1511                        }
                            // Globbing disabled: fall through and evaluate the
                            // pattern like any other expression.
1512                    }
1513                    // Track whether this item came from $(cmd); that's the
1514                    // only position where multi-line stdout auto-splits per
1515                    // line. Arrays still spread element-by-element; bare
1516                    // $VAR is rejected upstream by validator E012. See
1517                    // docs/plan-for-loop-newline-split.md.
1518                    let from_command_subst = matches!(item_expr, Expr::CommandSubst(_));
1519                    let item = self.eval_expr_async(item_expr).await?;
1520                    match item {
1521                        // JSON arrays iterate over elements (preferred path
1522                        // when builtins emit .data — seq, jq, cut, find, …)
1523                        Value::Json(serde_json::Value::Array(arr)) => {
1524                            for elem in arr {
1525                                items.push(json_to_value(elem));
1526                            }
1527                        }
1528                        // Strings from $(cmd): empty → 0 iterations,
1529                        // multi-line → split per line (trimming trailing
1530                        // newlines and per-line trailing \r), single-line
1531                        // → one iteration. Whitespace within a line is
1532                        // NOT split — the "$VAR with spaces just works"
1533                        // promise is preserved because this only fires
1534                        // in CommandSubst position.
1535                        Value::String(s) if from_command_subst => {
1536                            let trimmed = s.trim_end_matches(['\n', '\r']);
1537                            if trimmed.is_empty() {
1538                                continue;
1539                            }
1540                            if trimmed.contains('\n') {
1541                                for line in trimmed.split('\n') {
1542                                    let line = line.trim_end_matches('\r');
1543                                    items.push(Value::String(line.to_string()));
1544                                }
1545                            } else {
1546                                items.push(Value::String(trimmed.to_string()));
1547                            }
1548                        }
1549                        // Strings not from $(cmd) stay as one value.
1550                        other => items.push(other),
1551                    }
1552                }
1553
1554                let mut result = ExecResult::success("");
                    // Push a scope frame for the loop. Every exit path below
                    // (cancellation, error, errexit, break/continue/return/exit and
                    // normal completion) pops it again.
1555                {
1556                    let mut scope = self.scope.write().await;
1557                    scope.push_frame();
1558                }
1559
1560                'outer: for item in items {
1561                    // Cancellation checkpoint per iteration
                        // (exit code 130 — presumably mirroring the shell
                        // convention of 128 + SIGINT; confirm).
1562                    if self.is_cancelled() {
1563                        let mut scope = self.scope.write().await;
1564                        scope.pop_frame();
1565                        result.code = 130;
1566                        return Ok(ControlFlow::ok(result));
1567                    }
                        // Bind the loop variable in the current (loop) frame.
1568                    {
1569                        let mut scope = self.scope.write().await;
1570                        scope.set(&for_loop.variable, item);
1571                    }
1572                    for stmt in &for_loop.body {
1573                        let mut flow = match self.execute_stmt_flow(stmt).await {
1574                            Ok(f) => f,
1575                            Err(e) => {
1576                                let mut scope = self.scope.write().await;
1577                                scope.pop_frame();
1578                                return Err(e);
1579                            }
1580                        };
1581                        self.drain_stderr_into(&mut result).await;
1582                        match &mut flow {
1583                            ControlFlow::Normal(r) => {
1584                                accumulate_result(&mut result, r);
1585                                if !r.ok() {
1586                                    let scope = self.scope.read().await;
1587                                    if scope.error_exit_enabled() {
                                            // Release the read guard before taking the
                                            // write lock on the same RwLock.
1588                                        drop(scope);
1589                                        let mut scope = self.scope.write().await;
1590                                        scope.pop_frame();
1591                                        return Ok(ControlFlow::exit_code(r.code));
1592                                    }
1593                                }
1594                            }
                                // `decrement_level()` returning true means this loop
                                // consumes the break; otherwise the flow is handed to
                                // the enclosing construct.
1595                            ControlFlow::Break { .. } => {
1596                                if flow.decrement_level() {
1597                                    break 'outer;
1598                                }
1599                                let mut scope = self.scope.write().await;
1600                                scope.pop_frame();
1601                                return Ok(flow);
1602                            }
                                // Same protocol as `Break`, but resumes with the next item.
1603                            ControlFlow::Continue { .. } => {
1604                                if flow.decrement_level() {
1605                                    continue 'outer;
1606                                }
1607                                let mut scope = self.scope.write().await;
1608                                scope.pop_frame();
1609                                return Ok(flow);
1610                            }
1611                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
1612                                let mut scope = self.scope.write().await;
1613                                scope.pop_frame();
1614                                return Ok(flow);
1615                            }
1616                        }
1617                    }
1618                }
1619
                    // Normal completion (or a consumed `break`): pop the loop frame.
1620                {
1621                    let mut scope = self.scope.write().await;
1622                    scope.pop_frame();
1623                }
1624                Ok(ControlFlow::ok(result))
1625            }
1626            Stmt::While(while_loop) => {
                    // Unlike `for`, no scope frame is pushed here, so the exit
                    // paths below have nothing to pop.
1627                let mut result = ExecResult::success("");
1628
1629                'outer: loop {
1630                    // Cancellation checkpoint per iteration; runs before the
1631                    // condition is evaluated.
1632                    if self.is_cancelled() {
1633                        result.code = 130;
1634                        return Ok(ControlFlow::ok(result));
1635                    }
1636
                        // Evaluate condition - use async to support command substitution
1637                    let cond_value = self.eval_expr_async(&while_loop.condition).await?;
1638
1639                    if !is_truthy(&cond_value) {
1640                        break;
1641                    }
1642
1643                    // Execute body
1644                    for stmt in &while_loop.body {
1645                        let mut flow = self.execute_stmt_flow(stmt).await?;
1646                        self.drain_stderr_into(&mut result).await;
1647                        match &mut flow {
1648                            ControlFlow::Normal(r) => {
1649                                accumulate_result(&mut result, r);
1650                                if !r.ok() {
1651                                    let scope = self.scope.read().await;
1652                                    if scope.error_exit_enabled() {
1653                                        return Ok(ControlFlow::exit_code(r.code));
1654                                    }
1655                                }
1656                            }
                                // Consumed here when `decrement_level()` is true,
                                // otherwise propagated to the enclosing construct.
1657                            ControlFlow::Break { .. } => {
1658                                if flow.decrement_level() {
1659                                    break 'outer;
1660                                }
1661                                return Ok(flow);
1662                            }
1663                            ControlFlow::Continue { .. } => {
1664                                if flow.decrement_level() {
1665                                    continue 'outer;
1666                                }
1667                                return Ok(flow);
1668                            }
1669                            ControlFlow::Return { .. } | ControlFlow::Exit { .. } => {
1670                                return Ok(flow);
1671                            }
1672                        }
1673                    }
1674                }
1675
1676                Ok(ControlFlow::ok(result))
1677            }
1678            Stmt::Case(case_stmt) => {
1679                // Evaluate the expression to match against
1680                let match_value = {
1681                    let value = self.eval_expr_async(&case_stmt.expr).await?;
1682                    value_to_string(&value)
1683                };
1684
1685                // Try each branch until we find a match
                    // (only the first matching branch runs).
1686                for branch in &case_stmt.branches {
1687                    let matched = branch.patterns.iter().any(|pattern| {
1688                        glob_match(pattern, &match_value)
1689                    });
1690
1691                    if matched {
1692                        // Execute the branch body
1693                        let mut result = ExecResult::success("");
1694                        for stmt in &branch.body {
1695                            let flow = self.execute_stmt_flow(stmt).await?;
1696                            match flow {
1697                                ControlFlow::Normal(r) => {
1698                                    accumulate_result(&mut result, &r);
1699                                    self.drain_stderr_into(&mut result).await;
1700                                }
                                    // Non-normal flow is returned as-is; `result` is
                                    // dropped on this path.
1701                                other => {
1702                                    self.drain_stderr_into(&mut result).await;
1703                                    return Ok(other);
1704                                }
1705                            }
1706                        }
1707                        return Ok(ControlFlow::ok(result));
1708                    }
1709                }
1710
1711                // No match - return success with empty output (like sh)
1712                Ok(ControlFlow::ok(ExecResult::success("")))
1713            }
                // `break` / `continue` without an explicit count default to one level.
1714            Stmt::Break(levels) => {
1715                Ok(ControlFlow::break_n(levels.unwrap_or(1)))
1716            }
1717            Stmt::Continue(levels) => {
1718                Ok(ControlFlow::continue_n(levels.unwrap_or(1)))
1719            }
1720            Stmt::Return(expr) => {
1721                // return [N] - N becomes the exit code, NOT stdout
1722                // Shell semantics: return sets exit code, doesn't produce output
1723                let result = if let Some(e) = expr {
1724                    let val = self.eval_expr_async(e).await?;
1725                    let code = crate::interpreter::value_to_exit_code(&val)
1726                        .map_err(|e| anyhow::anyhow!("return: {}", e))?;
1727                    ExecResult::from_parts(code, String::new(), String::new(), None)
1728                } else {
1729                    ExecResult::success("")
1730                };
1731                Ok(ControlFlow::return_value(result))
1732            }
1733            Stmt::Exit(expr) => {
                    // `exit` without an argument exits with code 0.
1734                let code = if let Some(e) = expr {
1735                    let val = self.eval_expr_async(e).await?;
1736                    crate::interpreter::value_to_exit_code(&val)
1737                        .map_err(|e| anyhow::anyhow!("exit: {}", e))?
1738                } else {
1739                    0
1740                };
1741                Ok(ControlFlow::exit_code(code))
1742            }
1743            Stmt::ToolDef(tool_def) => {
                    // Store the definition under its name in the user-tool table.
1744                let mut user_tools = self.user_tools.write().await;
1745                user_tools.insert(tool_def.name.clone(), tool_def.clone());
1746                Ok(ControlFlow::ok(ExecResult::success("")))
1747            }
1748            Stmt::AndChain { left, right } => {
1749                // cmd1 && cmd2 - run cmd2 only if cmd1 succeeds (exit code 0)
1750                // Suppress errexit for the left side — && handles failure itself.
1751                {
1752                    let mut scope = self.scope.write().await;
1753                    scope.suppress_errexit();
1754                }
                    // The suppression is undone on both the error and the success
                    // path before anything else happens.
1755                let left_flow = match self.execute_stmt_flow(left).await {
1756                    Ok(f) => f,
1757                    Err(e) => {
1758                        let mut scope = self.scope.write().await;
1759                        scope.unsuppress_errexit();
1760                        return Err(e);
1761                    }
1762                };
1763                {
1764                    let mut scope = self.scope.write().await;
1765                    scope.unsuppress_errexit();
1766                }
1767                match left_flow {
1768                    ControlFlow::Normal(mut left_result) => {
1769                        self.drain_stderr_into(&mut left_result).await;
1770                        self.update_last_result(&left_result).await;
1771                        if left_result.ok() {
1772                            let right_flow = self.execute_stmt_flow(right).await?;
1773                            match right_flow {
1774                                ControlFlow::Normal(mut right_result) => {
1775                                    self.drain_stderr_into(&mut right_result).await;
1776                                    self.update_last_result(&right_result).await;
                                        // Fold the right side into the left result so the
                                        // chain yields a single combined result.
1777                                    let mut combined = left_result;
1778                                    accumulate_result(&mut combined, &right_result);
1779                                    Ok(ControlFlow::ok(combined))
1780                                }
1781                                other => Ok(other),
1782                            }
1783                        } else {
                                // Left side failed: the right side is skipped.
1784                            Ok(ControlFlow::ok(left_result))
1785                        }
1786                    }
1787                    _ => Ok(left_flow), // Propagate non-normal flow
1788                }
1789            }
1790            Stmt::OrChain { left, right } => {
1791                // cmd1 || cmd2 - run cmd2 only if cmd1 fails (non-zero exit code)
1792                // Suppress errexit for the left side — || handles failure itself.
1793                {
1794                    let mut scope = self.scope.write().await;
1795                    scope.suppress_errexit();
1796                }
1797                let left_flow = match self.execute_stmt_flow(left).await {
1798                    Ok(f) => f,
1799                    Err(e) => {
1800                        let mut scope = self.scope.write().await;
1801                        scope.unsuppress_errexit();
1802                        return Err(e);
1803                    }
1804                };
1805                {
1806                    let mut scope = self.scope.write().await;
1807                    scope.unsuppress_errexit();
1808                }
1809                match left_flow {
1810                    ControlFlow::Normal(mut left_result) => {
1811                        self.drain_stderr_into(&mut left_result).await;
1812                        self.update_last_result(&left_result).await;
1813                        if !left_result.ok() {
1814                            let right_flow = self.execute_stmt_flow(right).await?;
1815                            match right_flow {
1816                                ControlFlow::Normal(mut right_result) => {
1817                                    self.drain_stderr_into(&mut right_result).await;
1818                                    self.update_last_result(&right_result).await;
1819                                    let mut combined = left_result;
1820                                    accumulate_result(&mut combined, &right_result);
1821                                    Ok(ControlFlow::ok(combined))
1822                                }
1823                                other => Ok(other),
1824                            }
1825                        } else {
                                // Left side succeeded: the right side is skipped.
1826                            Ok(ControlFlow::ok(left_result))
1827                        }
1828                    }
1829                    _ => Ok(left_flow), // Propagate non-normal flow
1830                }
1831            }
1832            Stmt::Test(test_expr) => {
                    // A test yields success when true and failure with code 1 when
                    // false; neither case produces output.
1833                let is_true = self.eval_test_async(test_expr).await?;
1834                if is_true {
1835                    Ok(ControlFlow::ok(ExecResult::success("")))
1836                } else {
1837                    Ok(ControlFlow::ok(ExecResult::failure(1, "")))
1838                }
1839            }
1840            Stmt::Empty => Ok(ControlFlow::ok(ExecResult::success(""))),
1841        }
1842        }.instrument(span))
1843    }
1844
1845    /// Execute a pipeline.
1846    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(background = pipeline.background, command_count = pipeline.commands.len()))]
1847    async fn execute_pipeline(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
1848        if pipeline.commands.is_empty() {
1849            return Ok(ExecResult::success(""));
1850        }
1851
1852        // Handle background execution (`&` operator)
1853        if pipeline.background {
1854            return self.execute_background(pipeline).await;
1855        }
1856
1857        // All commands go through the runner with the Kernel as dispatcher.
1858        // This is the single execution path — no fast path for single commands.
1859        //
1860        // IMPORTANT: We snapshot exec_ctx into a local context and release the
1861        // lock before running. This prevents deadlocks when dispatch_command
1862        // is called from within the pipeline and recursively triggers another
1863        // pipeline (e.g., via user-defined tools).
1864        let mut ctx = {
1865            let ec = self.exec_ctx.read().await;
1866            let scope = self.scope.read().await;
1867            ExecContext {
1868                backend: ec.backend.clone(),
1869                scope: scope.clone(),
1870                cwd: ec.cwd.clone(),
1871                prev_cwd: ec.prev_cwd.clone(),
1872                stdin: None,
1873                stdin_data: None,
1874                pipe_stdin: None,
1875                pipe_stdout: None,
1876                stderr: ec.stderr.clone(),
1877                tool_schemas: ec.tool_schemas.clone(),
1878                tools: ec.tools.clone(),
1879                job_manager: ec.job_manager.clone(),
1880                pipeline_position: PipelinePosition::Only,
1881                interactive: self.interactive,
1882                aliases: ec.aliases.clone(),
1883                ignore_config: ec.ignore_config.clone(),
1884                output_limit: ec.output_limit.clone(),
1885                allow_external_commands: self.allow_external_commands,
1886                nonce_store: ec.nonce_store.clone(),
1887                trash_backend: ec.trash_backend.clone(),
1888                #[cfg(all(unix, feature = "native"))]
1889                terminal_state: ec.terminal_state.clone(),
1890                dispatcher: self.dispatcher(),
1891                cancel: {
1892                    #[allow(clippy::expect_used)]
1893                    let token = self.cancel_token.lock().expect("cancel_token poisoned");
1894                    token.clone()
1895                },
1896            }
1897        }; // locks released
1898
1899        let mut result = self.runner.run(&pipeline.commands, &mut ctx, self).await;
1900
1901        // Post-hoc spill check (catches builtins and fast external commands)
1902        if ctx.output_limit.is_enabled() {
1903            let _ = crate::output_limit::spill_if_needed(&mut result, &ctx.output_limit).await;
1904        }
1905
1906        // Signal spill with exit 3; agent reads the spill file directly
1907        // (use `set +o output-limit` before cat/head/tail to bypass the limit)
1908        if result.did_spill {
1909            result.original_code = Some(result.code);
1910            result.code = 3;
1911        }
1912
1913        // Sync changes back from context
1914        {
1915            let mut ec = self.exec_ctx.write().await;
1916            ec.cwd = ctx.cwd.clone();
1917            ec.prev_cwd = ctx.prev_cwd.clone();
1918            ec.aliases = ctx.aliases.clone();
1919            ec.ignore_config = ctx.ignore_config.clone();
1920            ec.output_limit = ctx.output_limit.clone();
1921        }
1922        {
1923            let mut scope = self.scope.write().await;
1924            *scope = ctx.scope.clone();
1925        }
1926
1927        Ok(result)
1928    }
1929
1930    /// Execute a pipeline in the background.
1931    ///
1932    /// The command is spawned as a tokio task, registered with the JobManager,
1933    /// and its output is captured via BoundedStreams. The job is observable via
1934    /// `/v/jobs/{id}/stdout`, `/v/jobs/{id}/stderr`, and `/v/jobs/{id}/status`.
1935    ///
1936    /// Returns immediately with a job ID like "[1]".
1937    #[tracing::instrument(level = "debug", skip(self, pipeline), fields(command_count = pipeline.commands.len()))]
1938    async fn execute_background(&self, pipeline: &crate::ast::Pipeline) -> Result<ExecResult> {
1939        use tokio::sync::oneshot;
1940
1941        // Format the command for display in /v/jobs/{id}/command
1942        let command_str = self.format_pipeline(pipeline);
1943
1944        // Create bounded streams for output capture
1945        let stdout = Arc::new(BoundedStream::default_size());
1946        let stderr = Arc::new(BoundedStream::default_size());
1947
1948        // Create channel for result notification
1949        let (tx, rx) = oneshot::channel();
1950
1951        // Register with JobManager to get job ID and create VFS entries
1952        let job_id = self.jobs.register_with_streams(
1953            command_str.clone(),
1954            rx,
1955            stdout.clone(),
1956            stderr.clone(),
1957        ).await;
1958
1959        // Fork the kernel for this background job. The fork snapshots the
1960        // parent's scope/cwd/aliases/user_tools so mutations stay isolated,
1961        // while sharing the job manager, VFS, and tool registry. The fork's
1962        // full dispatch chain (user tools, .kai scripts, `$(...)` in args)
1963        // is available here — something BackendDispatcher couldn't provide.
1964        let fork = self.fork().await;
1965        let runner = self.runner.clone();
1966        let commands = pipeline.commands.clone();
1967
1968        // Snapshot the fork's exec_ctx for the spawned task. We have to do
1969        // this before tokio::spawn because the fork's exec_ctx is behind a
1970        // tokio RwLock and we want the spawned task to own its ctx.
1971        let mut bg_ctx = {
1972            let ec = fork.exec_ctx.read().await;
1973            ec.child_for_pipeline()
1974        };
1975        bg_ctx.scope = fork.scope.read().await.clone();
1976        // The fork's dispatcher points at the fork itself; set it here so
1977        // builtins inside the background task (e.g. timeout) re-dispatch
1978        // through the fork, not the parent.
1979        bg_ctx.dispatcher = fork.dispatcher();
1980
1981        // Spawn the background task
1982        tokio::spawn(async move {
1983            // runner.run needs a &dyn CommandDispatcher; fork.as_ref()
1984            // gives us that (Kernel implements CommandDispatcher).
1985            let result = runner.run(&commands, &mut bg_ctx, fork.as_ref()).await;
1986
1987            // Write output to streams
1988            let text = result.text_out();
1989            if !text.is_empty() {
1990                stdout.write(text.as_bytes()).await;
1991            }
1992            if !result.err.is_empty() {
1993                stderr.write(result.err.as_bytes()).await;
1994            }
1995
1996            // Close streams
1997            stdout.close().await;
1998            stderr.close().await;
1999
2000            // Send result to JobManager (ignore error if receiver dropped)
2001            let _ = tx.send(result);
2002        });
2003
2004        Ok(ExecResult::success(format!("[{}]", job_id)))
2005    }
2006
2007    /// Format a pipeline as a command string for display.
2008    fn format_pipeline(&self, pipeline: &crate::ast::Pipeline) -> String {
2009        pipeline.commands
2010            .iter()
2011            .map(|cmd| {
2012                let mut parts = vec![cmd.name.clone()];
2013                for arg in &cmd.args {
2014                    match arg {
2015                        Arg::Positional(expr) => {
2016                            parts.push(self.format_expr(expr));
2017                        }
2018                        Arg::Named { key, value } => {
2019                            parts.push(format!("{}={}", key, self.format_expr(value)));
2020                        }
2021                        Arg::ShortFlag(name) => {
2022                            parts.push(format!("-{}", name));
2023                        }
2024                        Arg::LongFlag(name) => {
2025                            parts.push(format!("--{}", name));
2026                        }
2027                        Arg::DoubleDash => {
2028                            parts.push("--".to_string());
2029                        }
2030                    }
2031                }
2032                parts.join(" ")
2033            })
2034            .collect::<Vec<_>>()
2035            .join(" | ")
2036    }
2037
2038    /// Format an expression as a string for display.
2039    fn format_expr(&self, expr: &Expr) -> String {
2040        match expr {
2041            Expr::Literal(Value::String(s)) => {
2042                if s.contains(' ') || s.contains('"') {
2043                    format!("'{}'", s.replace('\'', "\\'"))
2044                } else {
2045                    s.clone()
2046                }
2047            }
2048            Expr::Literal(Value::Int(i)) => i.to_string(),
2049            Expr::Literal(Value::Float(f)) => f.to_string(),
2050            Expr::Literal(Value::Bool(b)) => b.to_string(),
2051            Expr::Literal(Value::Null) => "null".to_string(),
2052            Expr::VarRef(path) => {
2053                let name = path.segments.iter()
2054                    .map(|seg| match seg {
2055                        crate::ast::VarSegment::Field(f) => f.clone(),
2056                    })
2057                    .collect::<Vec<_>>()
2058                    .join(".");
2059                format!("${{{}}}", name)
2060            }
2061            Expr::Interpolated(_) => "\"...\"".to_string(),
2062            Expr::HereDocBody { .. } => "<<heredoc".to_string(),
2063            _ => "...".to_string(),
2064        }
2065    }
2066
2067    /// Execute a single command.
2068    async fn execute_command(&self, name: &str, args: &[Arg]) -> Result<ExecResult> {
2069        self.execute_command_depth(name, args, 0).await
2070    }
2071
2072    #[tracing::instrument(level = "info", skip(self, args, alias_depth), fields(command = %name), err)]
2073    async fn execute_command_depth(&self, name: &str, args: &[Arg], alias_depth: u8) -> Result<ExecResult> {
2074        // Special built-ins
2075        match name {
2076            "true" => return Ok(ExecResult::success("")),
2077            "false" => return Ok(ExecResult::failure(1, "")),
2078            "source" | "." => return self.execute_source(args).await,
2079            _ => {}
2080        }
2081
2082        // Alias expansion (with recursion limit)
2083        if alias_depth < 10 {
2084            let alias_value = {
2085                let ctx = self.exec_ctx.read().await;
2086                ctx.aliases.get(name).cloned()
2087            };
2088            if let Some(alias_val) = alias_value {
2089                // Split alias value into command + args
2090                let parts: Vec<&str> = alias_val.split_whitespace().collect();
2091                if let Some((alias_cmd, alias_args)) = parts.split_first() {
2092                    let mut new_args: Vec<Arg> = alias_args
2093                        .iter()
2094                        .map(|a| Arg::Positional(Expr::Literal(Value::String(a.to_string()))))
2095                        .collect();
2096                    new_args.extend_from_slice(args);
2097                    return Box::pin(self.execute_command_depth(alias_cmd, &new_args, alias_depth + 1)).await;
2098                }
2099            }
2100        }
2101
2102        // Handle /v/bin/ prefix — dispatch to builtins via virtual path
2103        if let Some(builtin_name) = name.strip_prefix("/v/bin/") {
2104            return match self.tools.get(builtin_name) {
2105                Some(_) => Box::pin(self.execute_command_depth(builtin_name, args, alias_depth)).await,
2106                None => Ok(ExecResult::failure(127, format!("command not found: {}", name))),
2107            };
2108        }
2109
2110        // Check user-defined tools first
2111        {
2112            let user_tools = self.user_tools.read().await;
2113            if let Some(tool_def) = user_tools.get(name) {
2114                let tool_def = tool_def.clone();
2115                drop(user_tools);
2116                return self.execute_user_tool(tool_def, args).await;
2117            }
2118        }
2119
2120        // Look up builtin tool
2121        let tool = match self.tools.get(name) {
2122            Some(t) => t,
2123            None => {
2124                // Try executing as .kai script from PATH
2125                if let Some(result) = self.try_execute_script(name, args).await? {
2126                    return Ok(result);
2127                }
2128                // Try executing as external command from PATH
2129                if let Some(result) = self.try_execute_external(name, args).await? {
2130                    return Ok(result);
2131                }
2132
2133                // Try backend-registered tools (embedder engines, etc.)
2134                // Look up tool schema for positional→named mapping.
2135                // Clone backend and drop read lock before awaiting (may involve network I/O).
2136                // Backend tools expect named JSON params, so enable positional mapping.
2137                let backend = self.exec_ctx.read().await.backend.clone();
2138                let tool_schema = backend.get_tool(name).await.ok().flatten().map(|t| {
2139                    let mut s = t.schema;
2140                    s.map_positionals = true;
2141                    s
2142                });
2143                let tool_args = self.build_args_async(args, tool_schema.as_ref()).await?;
2144                let mut ctx = self.exec_ctx.write().await;
2145                {
2146                    let scope = self.scope.read().await;
2147                    ctx.scope = scope.clone();
2148                }
2149                let backend = ctx.backend.clone();
2150                match backend.call_tool(name, tool_args, &mut ctx).await {
2151                    Ok(tool_result) => {
2152                        let mut scope = self.scope.write().await;
2153                        *scope = ctx.scope.clone();
2154                        let mut exec = ExecResult::from_output(
2155                            tool_result.code as i64, tool_result.stdout, tool_result.stderr,
2156                        );
2157                        exec.set_output(tool_result.output);
2158                        return Ok(exec);
2159                    }
2160                    Err(BackendError::ToolNotFound(_)) => {
2161                        // Fall through to "command not found"
2162                    }
2163                    Err(e) => {
2164                        // Backend dispatch is last-resort lookup — if it fails
2165                        // for any reason, the command simply doesn't exist.
2166                        tracing::debug!("backend error for {name}: {e}");
2167                    }
2168                }
2169
2170                return Ok(ExecResult::failure(127, format!("command not found: {}", name)));
2171            }
2172        };
2173
2174        // Build arguments (async to support command substitution, schema-aware for flag values)
2175        let schema = tool.schema();
2176        let mut tool_args = self.build_args_async(args, Some(&schema)).await?;
2177        let output_format = extract_output_format(&mut tool_args, Some(&schema));
2178
2179        // --help / -h: show help unless the tool's schema claims that flag
2180        let schema_claims = |flag: &str| -> bool {
2181            let bare = flag.trim_start_matches('-');
2182            schema.params.iter().any(|p| p.matches_flag(flag) || p.matches_flag(bare))
2183        };
2184        let wants_help =
2185            (tool_args.flags.contains("help") && !schema_claims("help"))
2186            || (tool_args.flags.contains("h") && !schema_claims("-h"));
2187        if wants_help {
2188            let help_topic = crate::help::HelpTopic::Tool(name.to_string());
2189            let ctx = self.exec_ctx.read().await;
2190            let content = crate::help::get_help(&help_topic, &ctx.tool_schemas);
2191            return Ok(ExecResult::with_output(crate::interpreter::OutputData::text(content)));
2192        }
2193
2194        // Snapshot exec_ctx into a local context and release the write lock
2195        // before calling tool.execute. Holding the write across tool execution
2196        // would deadlock any builtin that re-dispatches through ctx.dispatcher
2197        // (timeout, scatter) — the inner dispatch_command needs its own
2198        // exec_ctx.write() and would block forever.
2199        let mut ctx = {
2200            let ec = self.exec_ctx.write().await;
2201            let scope = self.scope.read().await;
2202            ExecContext {
2203                backend: ec.backend.clone(),
2204                scope: scope.clone(),
2205                cwd: ec.cwd.clone(),
2206                prev_cwd: ec.prev_cwd.clone(),
2207                stdin: ec.stdin.clone(),
2208                stdin_data: ec.stdin_data.clone(),
2209                pipe_stdin: None, // streaming pipes are per-pipeline; not snapshotted
2210                pipe_stdout: None,
2211                stderr: ec.stderr.clone(),
2212                tool_schemas: ec.tool_schemas.clone(),
2213                tools: ec.tools.clone(),
2214                job_manager: ec.job_manager.clone(),
2215                pipeline_position: ec.pipeline_position,
2216                interactive: self.interactive,
2217                aliases: ec.aliases.clone(),
2218                ignore_config: ec.ignore_config.clone(),
2219                output_limit: ec.output_limit.clone(),
2220                allow_external_commands: self.allow_external_commands,
2221                nonce_store: ec.nonce_store.clone(),
2222                trash_backend: ec.trash_backend.clone(),
2223                #[cfg(all(unix, feature = "native"))]
2224                terminal_state: ec.terminal_state.clone(),
2225                dispatcher: self.dispatcher(),
2226                // Use ec.cancel (set by dispatch_command from the runner's
2227                // ctx.cancel) so any builtin-swapped child token (e.g. timeout's
2228                // child token) reaches the spawned external via wait_or_kill.
2229                // Falls back to the kernel's own token when ec.cancel is the
2230                // default fresh token from a non-dispatch path.
2231                cancel: ec.cancel.clone(),
2232            }
2233        }; // both locks released — tool.execute can re-dispatch safely
2234
2235        // Move stdin out of self.exec_ctx into the snapshot (consumed-by-tool
2236        // semantics): take() so a later dispatch doesn't see stale stdin.
2237        // Done after the snapshot above so we hold the write briefly.
2238        {
2239            let mut ec = self.exec_ctx.write().await;
2240            ctx.stdin = ec.stdin.take();
2241            ctx.stdin_data = ec.stdin_data.take();
2242            ctx.pipe_stdin = ec.pipe_stdin.take();
2243            ctx.pipe_stdout = ec.pipe_stdout.take();
2244        }
2245
2246        let result = tool.execute(tool_args, &mut ctx).await;
2247
2248        // Sync mutations back. Tools may have changed scope (set/cd),
2249        // cwd/prev_cwd (cd), and aliases (alias). Also return any unused pipe
2250        // endpoints to self.exec_ctx so dispatch_command's post-execute sync
2251        // hands them back to the pipeline runner — the runner uses
2252        // stage_ctx.pipe_stdout to write the result to the next stage when
2253        // the tool itself didn't take and write to it.
2254        {
2255            let mut scope = self.scope.write().await;
2256            *scope = ctx.scope.clone();
2257        }
2258        {
2259            let mut ec = self.exec_ctx.write().await;
2260            ec.cwd = ctx.cwd;
2261            ec.prev_cwd = ctx.prev_cwd;
2262            ec.aliases = ctx.aliases;
2263            ec.pipe_stdin = ctx.pipe_stdin.take();
2264            ec.pipe_stdout = ctx.pipe_stdout.take();
2265        }
2266
2267        let result = match output_format {
2268            Some(format) => apply_output_format(result, format),
2269            None => result,
2270        };
2271
2272        Ok(result)
2273    }
2274
2275    /// Pull `consumes` positional args after a non-bool flag and stash them
2276    /// on `tool_args.named` under the canonical param name.
2277    ///
2278    /// - `consumes == 1` keeps the historical contract: a single scalar value.
2279    /// - `consumes > 1` accumulates each occurrence as an inner
2280    ///   `serde_json::Value::Array` inside `named[canonical] =
2281    ///   Value::Json(Array(...))`, preserving invocation order. This is the
2282    ///   shape jq's `--arg NAME VAL` / `--argjson NAME VAL` land in.
2283    ///
2284    /// Errors loudly if the flag is missing required positionals — matches
2285    /// kaish's "no silent fallback" posture and mirrors real jq, which
2286    /// errors on `--arg NAME` with no value.
2287    #[allow(clippy::too_many_arguments)]
2288    async fn consume_flag_positionals(
2289        &self,
2290        args: &[Arg],
2291        flag_name: &str,
2292        canonical: &str,
2293        consumes: usize,
2294        positional_indices: &[usize],
2295        consumed: &mut std::collections::HashSet<usize>,
2296        current_idx: usize,
2297        tool_args: &mut ToolArgs,
2298    ) -> Result<()> {
2299        let mut collected: Vec<Value> = Vec::with_capacity(consumes.max(1));
2300        for _ in 0..consumes.max(1) {
2301            let next_pos = positional_indices
2302                .iter()
2303                .find(|idx| **idx > current_idx && !consumed.contains(idx))
2304                .copied();
2305            match next_pos {
2306                Some(pos_idx) => {
2307                    if let Arg::Positional(expr) = &args[pos_idx] {
2308                        let value = self.eval_expr_async(expr).await?;
2309                        let value = apply_tilde_expansion(value);
2310                        collected.push(value);
2311                        consumed.insert(pos_idx);
2312                    }
2313                }
2314                None => {
2315                    if consumes <= 1 && collected.is_empty() {
2316                        // Back-compat: a flag with no follow-up positional
2317                        // becomes a bare flag. `--path` with nothing after
2318                        // lands in `flags`, same as before this refactor.
2319                        tool_args.flags.insert(flag_name.to_string());
2320                        return Ok(());
2321                    }
2322                    anyhow::bail!(
2323                        "--{flag_name} requires {consumes} argument{}, got {}",
2324                        if consumes == 1 { "" } else { "s" },
2325                        collected.len()
2326                    );
2327                }
2328            }
2329        }
2330
2331        if consumes <= 1 {
2332            if let Some(v) = collected.pop() {
2333                tool_args.named.insert(canonical.to_string(), v);
2334            }
2335            return Ok(());
2336        }
2337
2338        // Multi-consume: accumulate under named[canonical] as array-of-arrays.
2339        let occ: Vec<serde_json::Value> = collected
2340            .into_iter()
2341            .map(|v| crate::interpreter::value_to_json(&v))
2342            .collect();
2343        let entry = tool_args
2344            .named
2345            .entry(canonical.to_string())
2346            .or_insert_with(|| Value::Json(serde_json::Value::Array(Vec::new())));
2347        if let Value::Json(serde_json::Value::Array(outer)) = entry {
2348            outer.push(serde_json::Value::Array(occ));
2349        } else {
2350            anyhow::bail!(
2351                "--{flag_name}: named[{canonical}] already holds a non-array value"
2352            );
2353        }
2354        Ok(())
2355    }
2356
2357    /// Build tool arguments from AST args.
2358    ///
2359    /// Uses async evaluation to support command substitution in arguments.
2360    async fn build_args_async(&self, args: &[Arg], schema: Option<&crate::tools::ToolSchema>) -> Result<ToolArgs> {
2361        let mut tool_args = ToolArgs::new();
2362        let param_lookup = schema.map(schema_param_lookup).unwrap_or_default();
2363
2364        // Track which positional indices have been consumed as flag values
2365        let mut consumed: std::collections::HashSet<usize> = std::collections::HashSet::new();
2366        let mut past_double_dash = false;
2367
2368        // Find positional arg indices for flag value consumption
2369        let positional_indices: Vec<usize> = args.iter().enumerate()
2370            .filter_map(|(i, a)| matches!(a, Arg::Positional(_)).then_some(i))
2371            .collect();
2372
2373        let mut i = 0;
2374        while i < args.len() {
2375            match &args[i] {
2376                Arg::DoubleDash => {
2377                    past_double_dash = true;
2378                }
2379                Arg::Positional(expr) => {
2380                    if !consumed.contains(&i) {
2381                        // Glob expansion: bare glob patterns expand to matching files
2382                        if let Expr::GlobPattern(pattern) = expr {
2383                            let glob_enabled = {
2384                                let scope = self.scope.read().await;
2385                                scope.glob_enabled()
2386                            };
2387                            if glob_enabled {
2388                                let (paths, cwd) = {
2389                                    let ctx = self.exec_ctx.read().await;
2390                                    let paths = ctx.expand_glob(pattern).await
2391                                        .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
2392                                    let cwd = ctx.resolve_path(".");
2393                                    (paths, cwd)
2394                                };
2395                                if paths.is_empty() {
2396                                    return Err(anyhow::anyhow!("no matches: {}", pattern));
2397                                }
2398                                for path in paths {
2399                                    let display = if !pattern.starts_with('/') {
2400                                        path.strip_prefix(&cwd)
2401                                            .unwrap_or(&path)
2402                                            .to_string_lossy().into_owned()
2403                                    } else {
2404                                        path.to_string_lossy().into_owned()
2405                                    };
2406                                    tool_args.positional.push(Value::String(display));
2407                                }
2408                                i += 1;
2409                                continue;
2410                            }
2411                        }
2412                        let value = self.eval_expr_async(expr).await?;
2413                        let value = apply_tilde_expansion(value);
2414                        tool_args.positional.push(value);
2415                    }
2416                }
2417                Arg::Named { key, value } => {
2418                    let val = self.eval_expr_async(value).await?;
2419                    let val = apply_tilde_expansion(val);
2420                    tool_args.named.insert(key.clone(), val);
2421                }
2422                Arg::ShortFlag(name) => {
2423                    if past_double_dash {
2424                        tool_args.positional.push(Value::String(format!("-{name}")));
2425                    } else if name.len() == 1 {
2426                        let flag_name = name.as_str();
2427                        let lookup = param_lookup.get(flag_name);
2428                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
2429
2430                        if is_bool {
2431                            tool_args.flags.insert(flag_name.to_string());
2432                        } else {
2433                            // Non-bool: consume `consumes` positionals as value(s)
2434                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(flag_name);
2435                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
2436                            self.consume_flag_positionals(
2437                                args,
2438                                name,
2439                                canonical,
2440                                consumes,
2441                                &positional_indices,
2442                                &mut consumed,
2443                                i,
2444                                &mut tool_args,
2445                            )
2446                            .await?;
2447                        }
2448                    } else if let Some(&(canonical, typ, consumes)) = param_lookup.get(name.as_str()) {
2449                        // Multi-char short flag matches a schema param (POSIX style: -name value)
2450                        if is_bool_type(typ) {
2451                            tool_args.flags.insert(canonical.to_string());
2452                        } else {
2453                            self.consume_flag_positionals(
2454                                args,
2455                                name,
2456                                canonical,
2457                                consumes,
2458                                &positional_indices,
2459                                &mut consumed,
2460                                i,
2461                                &mut tool_args,
2462                            )
2463                            .await?;
2464                        }
2465                    } else {
2466                        // Multi-char combined flags like -la: always boolean
2467                        for c in name.chars() {
2468                            tool_args.flags.insert(c.to_string());
2469                        }
2470                    }
2471                }
2472                Arg::LongFlag(name) => {
2473                    if past_double_dash {
2474                        tool_args.positional.push(Value::String(format!("--{name}")));
2475                    } else {
2476                        let lookup = param_lookup.get(name.as_str());
2477                        let is_bool = lookup.map(|(_, typ, _)| is_bool_type(typ)).unwrap_or(true);
2478
2479                        if is_bool {
2480                            tool_args.flags.insert(name.clone());
2481                        } else {
2482                            let canonical = lookup.map(|(n, _, _)| *n).unwrap_or(name.as_str());
2483                            let consumes = lookup.map(|(_, _, c)| *c).unwrap_or(1);
2484                            self.consume_flag_positionals(
2485                                args,
2486                                name,
2487                                canonical,
2488                                consumes,
2489                                &positional_indices,
2490                                &mut consumed,
2491                                i,
2492                                &mut tool_args,
2493                            )
2494                            .await?;
2495                        }
2496                    }
2497                }
2498            }
2499            i += 1;
2500        }
2501
2502        // Map remaining positionals to unfilled non-bool schema params (in order).
2503        // This enables `drift_push "abc" "hello"` → named["target_ctx"] = "abc", named["content"] = "hello"
2504        // Positionals that appeared after `--` are never mapped (they're raw data).
2505        // Only for backend/external tools (map_positionals=true). Builtins handle their own positionals.
2506        if let Some(schema) = schema.filter(|s| s.map_positionals) {
2507            let pre_dash_count = if past_double_dash {
2508                let dash_pos = args.iter().position(|a| matches!(a, Arg::DoubleDash)).unwrap_or(args.len());
2509                positional_indices.iter()
2510                    .filter(|idx| **idx < dash_pos && !consumed.contains(idx))
2511                    .count()
2512            } else {
2513                tool_args.positional.len()
2514            };
2515
2516            let mut remaining = Vec::new();
2517            let mut positional_iter = tool_args.positional.drain(..).enumerate();
2518
2519            for param in &schema.params {
2520                if tool_args.named.contains_key(&param.name) || tool_args.flags.contains(&param.name) {
2521                    continue;
2522                }
2523                if is_bool_type(&param.param_type) {
2524                    continue;
2525                }
2526                loop {
2527                    match positional_iter.next() {
2528                        Some((idx, val)) if idx < pre_dash_count => {
2529                            tool_args.named.insert(param.name.clone(), val);
2530                            break;
2531                        }
2532                        Some((_, val)) => {
2533                            remaining.push(val);
2534                        }
2535                        None => break,
2536                    }
2537                }
2538            }
2539
2540            remaining.extend(positional_iter.map(|(_, v)| v));
2541            tool_args.positional = remaining;
2542        }
2543
2544        Ok(tool_args)
2545    }
2546
/// Flatten AST args into the plain `argv` strings an external command expects.
///
/// Whereas `build_args_async` sorts arguments into flags / named / positional
/// buckets for schema-aware builtins, this keeps every argument in its
/// original textual form and order:
/// - `-l` stays `-l` (combined flags such as `-la` are kept as written)
/// - `--verbose` stays `--verbose`
/// - `key=value` stays `key=value` (with the value evaluated)
/// - `--` is passed through
///
/// Bare glob patterns are expanded when globbing is enabled in the scope; a
/// pattern with no matches is an error.
#[cfg(feature = "native")]
async fn build_args_flat(&self, args: &[Arg]) -> Result<Vec<String>> {
    let mut flat: Vec<String> = Vec::new();

    for arg in args {
        match arg {
            // Preserve the -- marker
            Arg::DoubleDash => flat.push("--".to_string()),
            // Preserve original format: --verbose
            Arg::LongFlag(name) => flat.push(format!("--{}", name)),
            // Preserve original format: -l, -la (combined flags)
            Arg::ShortFlag(name) => flat.push(format!("-{}", name)),
            Arg::Named { key, value } => {
                let evaluated = self.eval_expr_async(value).await?;
                let evaluated = apply_tilde_expansion(evaluated);
                flat.push(format!("{}={}", key, value_to_string(&evaluated)));
            }
            Arg::Positional(expr) => {
                // Glob expansion for external commands
                if let Expr::GlobPattern(pattern) = expr {
                    let globbing = self.scope.read().await.glob_enabled();
                    if globbing {
                        let (matches, base_dir) = {
                            let ctx = self.exec_ctx.read().await;
                            let matches = ctx
                                .expand_glob(pattern)
                                .await
                                .map_err(|e| anyhow::anyhow!("glob: {}", e))?;
                            (matches, ctx.resolve_path("."))
                        };
                        if matches.is_empty() {
                            return Err(anyhow::anyhow!("no matches: {}", pattern));
                        }
                        // Relative patterns produce paths relative to the
                        // working directory; absolute ones stay absolute.
                        let absolute_pattern = pattern.starts_with('/');
                        for path in matches {
                            let shown = if absolute_pattern {
                                path.to_string_lossy().into_owned()
                            } else {
                                path.strip_prefix(&base_dir)
                                    .unwrap_or(&path)
                                    .to_string_lossy()
                                    .into_owned()
                            };
                            flat.push(shown);
                        }
                        continue;
                    }
                }
                let evaluated = self.eval_expr_async(expr).await?;
                let evaluated = apply_tilde_expansion(evaluated);
                flat.push(value_to_string(&evaluated));
            }
        }
    }

    Ok(flat)
}
2617
2618    /// Async expression evaluator that supports command substitution.
2619    ///
2620    /// This is used for contexts where expressions may contain `$(...)` command
2621    /// substitution. Unlike the sync `eval_expr`, this can execute pipelines.
2622    fn eval_expr_async<'a>(&'a self, expr: &'a Expr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + Send + 'a>> {
2623        Box::pin(async move {
2624        match expr {
2625            Expr::Literal(value) => Ok(value.clone()),
2626            Expr::VarRef(path) => {
2627                let scope = self.scope.read().await;
2628                scope.resolve_path(path)
2629                    .ok_or_else(|| anyhow::anyhow!("undefined variable"))
2630            }
2631            Expr::Interpolated(parts) => {
2632                let mut result = String::new();
2633                for part in parts {
2634                    result.push_str(&self.eval_string_part_async(part).await?);
2635                }
2636                Ok(Value::String(result))
2637            }
2638            Expr::HereDocBody { parts, strip_tabs } => {
2639                let mut result = String::new();
2640                for sp in parts {
2641                    result.push_str(&self.eval_string_part_async(&sp.part).await?);
2642                }
2643                if *strip_tabs {
2644                    Ok(Value::String(crate::interpreter::strip_leading_tabs(&result)))
2645                } else {
2646                    Ok(Value::String(result))
2647                }
2648            }
2649            Expr::BinaryOp { left, op, right } => match op {
2650                BinaryOp::And => {
2651                    let left_val = self.eval_expr_async(left).await?;
2652                    if !is_truthy(&left_val) {
2653                        return Ok(left_val);
2654                    }
2655                    self.eval_expr_async(right).await
2656                }
2657                BinaryOp::Or => {
2658                    let left_val = self.eval_expr_async(left).await?;
2659                    if is_truthy(&left_val) {
2660                        return Ok(left_val);
2661                    }
2662                    self.eval_expr_async(right).await
2663                }
2664            },
2665            Expr::CommandSubst(pipeline) => {
2666                // Snapshot scope+cwd before running — only output escapes,
2667                // not side effects like `cd` or variable assignments.
2668                let saved_scope = { self.scope.read().await.clone() };
2669                let saved_cwd = {
2670                    let ec = self.exec_ctx.read().await;
2671                    (ec.cwd.clone(), ec.prev_cwd.clone())
2672                };
2673
2674                // Capture result without `?` — restore state unconditionally
2675                let run_result = self.execute_pipeline(pipeline).await;
2676
2677                // Restore scope and cwd regardless of success/failure
2678                {
2679                    let mut scope = self.scope.write().await;
2680                    *scope = saved_scope;
2681                    if let Ok(ref r) = run_result {
2682                        scope.set_last_result(r.clone());
2683                    }
2684                }
2685                {
2686                    let mut ec = self.exec_ctx.write().await;
2687                    ec.cwd = saved_cwd.0;
2688                    ec.prev_cwd = saved_cwd.1;
2689                }
2690
2691                // Now propagate the error
2692                let result = run_result?;
2693
2694                // Prefer structured data (enables `for i in $(cmd)` iteration)
2695                if let Some(data) = &result.data {
2696                    Ok(data.clone())
2697                } else if let Some(output) = result.output() {
2698                    // Flat non-text node lists (glob, ls, tree) → iterable array
2699                    if output.is_flat() && !output.is_simple_text() && !output.root.is_empty() {
2700                        let items: Vec<serde_json::Value> = output.root.iter()
2701                            .map(|n| serde_json::Value::String(n.display_name().to_string()))
2702                            .collect();
2703                        Ok(Value::Json(serde_json::Value::Array(items)))
2704                    } else {
2705                        Ok(Value::String(result.text_out().trim_end().to_string()))
2706                    }
2707                } else {
2708                    // Otherwise return stdout as single string (NO implicit splitting)
2709                    Ok(Value::String(result.text_out().trim_end().to_string()))
2710                }
2711            }
2712            Expr::Test(test_expr) => {
2713                Ok(Value::Bool(self.eval_test_async(test_expr).await?))
2714            }
2715            Expr::Positional(n) => {
2716                let scope = self.scope.read().await;
2717                match scope.get_positional(*n) {
2718                    Some(s) => Ok(Value::String(s.to_string())),
2719                    None => Ok(Value::String(String::new())),
2720                }
2721            }
2722            Expr::AllArgs => {
2723                let scope = self.scope.read().await;
2724                Ok(Value::String(scope.all_args().join(" ")))
2725            }
2726            Expr::ArgCount => {
2727                let scope = self.scope.read().await;
2728                Ok(Value::Int(scope.arg_count() as i64))
2729            }
2730            Expr::VarLength(name) => {
2731                let scope = self.scope.read().await;
2732                match scope.get(name) {
2733                    Some(value) => Ok(Value::Int(value_to_string(value).len() as i64)),
2734                    None => Ok(Value::Int(0)),
2735                }
2736            }
2737            Expr::VarWithDefault { name, default } => {
2738                let scope = self.scope.read().await;
2739                let use_default = match scope.get(name) {
2740                    Some(value) => value_to_string(value).is_empty(),
2741                    None => true,
2742                };
2743                drop(scope); // Release the lock before recursive evaluation
2744                if use_default {
2745                    // Evaluate the default parts (supports nested expansions)
2746                    self.eval_string_parts_async(default).await.map(Value::String)
2747                } else {
2748                    let scope = self.scope.read().await;
2749                    scope.get(name).cloned().ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))
2750                }
2751            }
2752            Expr::Arithmetic(expr_str) => {
2753                let scope = self.scope.read().await;
2754                crate::arithmetic::eval_arithmetic(expr_str, &scope)
2755                    .map(Value::Int)
2756                    .map_err(|e| anyhow::anyhow!("arithmetic error: {}", e))
2757            }
2758            Expr::Command(cmd) => {
2759                // Execute command and return boolean based on exit code
2760                let result = self.execute_command(&cmd.name, &cmd.args).await?;
2761                Ok(Value::Bool(result.code == 0))
2762            }
2763            Expr::LastExitCode => {
2764                let scope = self.scope.read().await;
2765                Ok(Value::Int(scope.last_result().code))
2766            }
2767            Expr::CurrentPid => {
2768                let scope = self.scope.read().await;
2769                Ok(Value::Int(scope.pid() as i64))
2770            }
2771            Expr::GlobPattern(s) => Ok(Value::String(s.clone())),
2772        }
2773        })
2774    }
2775
2776    /// Async helper to evaluate multiple StringParts into a single string.
2777    fn eval_string_parts_async<'a>(&'a self, parts: &'a [StringPart]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
2778        Box::pin(async move {
2779            let mut result = String::new();
2780            for part in parts {
2781                result.push_str(&self.eval_string_part_async(part).await?);
2782            }
2783            Ok(result)
2784        })
2785    }
2786
2787    /// Async helper to evaluate a StringPart.
2788    /// Evaluate a `[[ ]]` test expression asynchronously, routing file tests
2789    /// through the VFS backend instead of using raw `std::path`.
2790    fn eval_test_async<'a>(&'a self, test_expr: &'a TestExpr) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + 'a>> {
2791        Box::pin(async move {
2792            match test_expr {
2793                TestExpr::FileTest { op, path } => {
2794                    let path_value = self.eval_expr_async(path).await?;
2795                    let path_str = value_to_string(&path_value);
2796                    let backend = self.exec_ctx.read().await.backend.clone();
2797                    let entry = backend.stat(std::path::Path::new(&path_str)).await.ok();
2798                    Ok(match op {
2799                        FileTestOp::Exists => entry.is_some(),
2800                        FileTestOp::IsFile => entry.as_ref().is_some_and(|e| e.is_file()),
2801                        FileTestOp::IsDir => entry.as_ref().is_some_and(|e| e.is_dir()),
2802                        FileTestOp::Readable => entry.is_some(),
2803                        FileTestOp::Writable => entry.as_ref().is_some_and(|e| {
2804                            e.permissions.is_none_or(|p| p & 0o222 != 0)
2805                        }),
2806                        FileTestOp::Executable => entry.as_ref().is_some_and(|e| {
2807                            e.permissions.is_some_and(|p| p & 0o111 != 0)
2808                        }),
2809                    })
2810                }
2811                TestExpr::StringTest { op, value } => {
2812                    let val = self.eval_expr_async(value).await?;
2813                    let s = value_to_string(&val);
2814                    Ok(match op {
2815                        crate::ast::StringTestOp::IsEmpty => s.is_empty(),
2816                        crate::ast::StringTestOp::IsNonEmpty => !s.is_empty(),
2817                    })
2818                }
2819                TestExpr::Comparison { left, op, right } => {
2820                    // Evaluate operands async (handles $(cmd)), then compare sync
2821                    let left_val = self.eval_expr_async(left).await?;
2822                    let right_val = self.eval_expr_async(right).await?;
2823                    let resolved = TestExpr::Comparison {
2824                        left: Box::new(Expr::Literal(left_val)),
2825                        op: *op,
2826                        right: Box::new(Expr::Literal(right_val)),
2827                    };
2828                    let expr = Expr::Test(Box::new(resolved));
2829                    let mut scope = self.scope.write().await;
2830                    let value = eval_expr(&expr, &mut scope)
2831                        .map_err(|e| anyhow::anyhow!("{}", e))?;
2832                    Ok(value_to_bool(&value))
2833                }
2834                TestExpr::And { left, right } => {
2835                    if !self.eval_test_async(left).await? {
2836                        Ok(false)
2837                    } else {
2838                        self.eval_test_async(right).await
2839                    }
2840                }
2841                TestExpr::Or { left, right } => {
2842                    if self.eval_test_async(left).await? {
2843                        Ok(true)
2844                    } else {
2845                        self.eval_test_async(right).await
2846                    }
2847                }
2848                TestExpr::Not { expr } => {
2849                    Ok(!self.eval_test_async(expr).await?)
2850                }
2851            }
2852        })
2853    }
2854
2855    fn eval_string_part_async<'a>(&'a self, part: &'a StringPart) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send + 'a>> {
2856        Box::pin(async move {
2857            match part {
2858                StringPart::Literal(s) => Ok(s.clone()),
2859                StringPart::Var(path) => {
2860                    let scope = self.scope.read().await;
2861                    match scope.resolve_path(path) {
2862                        Some(value) => Ok(value_to_string(&value)),
2863                        None => Ok(String::new()), // Unset vars expand to empty
2864                    }
2865                }
2866                StringPart::VarWithDefault { name, default } => {
2867                    let scope = self.scope.read().await;
2868                    let use_default = match scope.get(name) {
2869                        Some(value) => value_to_string(value).is_empty(),
2870                        None => true,
2871                    };
2872                    drop(scope); // Release lock before recursive evaluation
2873                    if use_default {
2874                        // Evaluate the default parts (supports nested expansions)
2875                        self.eval_string_parts_async(default).await
2876                    } else {
2877                        let scope = self.scope.read().await;
2878                        Ok(value_to_string(scope.get(name).ok_or_else(|| anyhow::anyhow!("variable '{}' not found", name))?))
2879                    }
2880                }
2881            StringPart::VarLength(name) => {
2882                let scope = self.scope.read().await;
2883                match scope.get(name) {
2884                    Some(value) => Ok(value_to_string(value).len().to_string()),
2885                    None => Ok("0".to_string()),
2886                }
2887            }
2888            StringPart::Positional(n) => {
2889                let scope = self.scope.read().await;
2890                match scope.get_positional(*n) {
2891                    Some(s) => Ok(s.to_string()),
2892                    None => Ok(String::new()),
2893                }
2894            }
2895            StringPart::AllArgs => {
2896                let scope = self.scope.read().await;
2897                Ok(scope.all_args().join(" "))
2898            }
2899            StringPart::ArgCount => {
2900                let scope = self.scope.read().await;
2901                Ok(scope.arg_count().to_string())
2902            }
2903            StringPart::Arithmetic(expr) => {
2904                let scope = self.scope.read().await;
2905                match crate::arithmetic::eval_arithmetic(expr, &scope) {
2906                    Ok(value) => Ok(value.to_string()),
2907                    Err(_) => Ok(String::new()),
2908                }
2909            }
2910            StringPart::CommandSubst(pipeline) => {
2911                // Snapshot scope+cwd — command substitution in strings must
2912                // not leak side effects (e.g., `"dir: $(cd /; pwd)"` must not change cwd).
2913                let saved_scope = { self.scope.read().await.clone() };
2914                let saved_cwd = {
2915                    let ec = self.exec_ctx.read().await;
2916                    (ec.cwd.clone(), ec.prev_cwd.clone())
2917                };
2918
2919                // Capture result without `?` — restore state unconditionally
2920                let run_result = self.execute_pipeline(pipeline).await;
2921
2922                // Restore scope and cwd regardless of success/failure
2923                {
2924                    let mut scope = self.scope.write().await;
2925                    *scope = saved_scope;
2926                    if let Ok(ref r) = run_result {
2927                        scope.set_last_result(r.clone());
2928                    }
2929                }
2930                {
2931                    let mut ec = self.exec_ctx.write().await;
2932                    ec.cwd = saved_cwd.0;
2933                    ec.prev_cwd = saved_cwd.1;
2934                }
2935
2936                // Now propagate the error
2937                let result = run_result?;
2938
2939                Ok(result.text_out().trim_end_matches('\n').to_string())
2940            }
2941            StringPart::LastExitCode => {
2942                let scope = self.scope.read().await;
2943                Ok(scope.last_result().code.to_string())
2944            }
2945            StringPart::CurrentPid => {
2946                let scope = self.scope.read().await;
2947                Ok(scope.pid().to_string())
2948            }
2949        }
2950        })
2951    }
2952
2953    /// Update the last result in scope.
2954    async fn update_last_result(&self, result: &ExecResult) {
2955        let mut scope = self.scope.write().await;
2956        scope.set_last_result(result.clone());
2957    }
2958
2959    /// Drain accumulated pipeline stderr into a result.
2960    ///
2961    /// Called after each sub-statement inside control structures (`if`, `for`,
2962    /// `while`, `case`, `&&`, `||`) so that stderr appears incrementally rather
2963    /// than batching until the entire structure finishes.
2964    async fn drain_stderr_into(&self, result: &mut ExecResult) {
2965        let drained = {
2966            let mut receiver = self.stderr_receiver.lock().await;
2967            receiver.drain_lossy()
2968        };
2969        if !drained.is_empty() {
2970            if !result.err.is_empty() && !result.err.ends_with('\n') {
2971                result.err.push('\n');
2972            }
2973            result.err.push_str(&drained);
2974        }
2975    }
2976
2977    /// Execute a user-defined function with local variable scoping.
2978    ///
2979    /// Functions push a new scope frame for local variables. Variables declared
2980    /// with `local` are scoped to the function; other assignments modify outer
2981    /// scopes (or create in root if new).
2982    async fn execute_user_tool(&self, def: ToolDef, args: &[Arg]) -> Result<ExecResult> {
2983        // 1. Build function args from AST args (async to support command substitution)
2984        let tool_args = self.build_args_async(args, None).await?;
2985
2986        // 2. Push a new scope frame for local variables
2987        {
2988            let mut scope = self.scope.write().await;
2989            scope.push_frame();
2990        }
2991
2992        // 3. Save current positional parameters and set new ones for this function
2993        let saved_positional = {
2994            let mut scope = self.scope.write().await;
2995            let saved = scope.save_positional();
2996
2997            // Set up new positional parameters ($0 = function name, $1, $2, ... = args)
2998            let positional_args: Vec<String> = tool_args.positional
2999                .iter()
3000                .map(value_to_string)
3001                .collect();
3002            scope.set_positional(&def.name, positional_args);
3003
3004            saved
3005        };
3006
3007        // 3. Execute body statements with control flow handling
3008        // Accumulate output across statements (like sh)
3009        let mut accumulated_out = String::new();
3010        let mut accumulated_err = String::new();
3011        let mut last_code = 0i64;
3012        let mut last_data: Option<Value> = None;
3013
3014        // Track execution error for propagation after cleanup
3015        let mut exec_error: Option<anyhow::Error> = None;
3016        let mut exit_code: Option<i64> = None;
3017
3018        for stmt in &def.body {
3019            match self.execute_stmt_flow(stmt).await {
3020                Ok(flow) => {
3021                    // Drain pipeline stderr after each sub-statement.
3022                    let drained = {
3023                        let mut receiver = self.stderr_receiver.lock().await;
3024                        receiver.drain_lossy()
3025                    };
3026                    if !drained.is_empty() {
3027                        accumulated_err.push_str(&drained);
3028                    }
3029
3030                    match flow {
3031                        ControlFlow::Normal(r) => {
3032                            accumulated_out.push_str(&r.text_out());
3033                            accumulated_err.push_str(&r.err);
3034                            last_code = r.code;
3035                            last_data = r.data;
3036                        }
3037                        ControlFlow::Return { value } => {
3038                            accumulated_out.push_str(&value.text_out());
3039                            accumulated_err.push_str(&value.err);
3040                            last_code = value.code;
3041                            last_data = value.data;
3042                            break;
3043                        }
3044                        ControlFlow::Exit { code } => {
3045                            exit_code = Some(code);
3046                            break;
3047                        }
3048                        ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3049                            accumulated_out.push_str(&r.text_out());
3050                            accumulated_err.push_str(&r.err);
3051                            last_code = r.code;
3052                            last_data = r.data;
3053                        }
3054                    }
3055                }
3056                Err(e) => {
3057                    exec_error = Some(e);
3058                    break;
3059                }
3060            }
3061        }
3062
3063        // 4. Pop scope frame and restore original positional parameters (unconditionally)
3064        {
3065            let mut scope = self.scope.write().await;
3066            scope.pop_frame();
3067            scope.set_positional(saved_positional.0, saved_positional.1);
3068        }
3069
3070        // 5. Propagate error or exit after cleanup
3071        if let Some(e) = exec_error {
3072            return Err(e);
3073        }
3074        if let Some(code) = exit_code {
3075            return Ok(ExecResult::from_parts(code, accumulated_out, accumulated_err, last_data));
3076        }
3077
3078        Ok(ExecResult::from_parts(last_code, accumulated_out, accumulated_err, last_data))
3079    }
3080
3081    /// Execute the `source` / `.` command to include and run a script.
3082    ///
3083    /// Unlike regular tool execution, `source` executes in the CURRENT scope,
3084    /// allowing the sourced script to set variables and modify shell state.
3085    async fn execute_source(&self, args: &[Arg]) -> Result<ExecResult> {
3086        // Get the file path from the first positional argument
3087        let tool_args = self.build_args_async(args, None).await?;
3088        let path = match tool_args.positional.first() {
3089            Some(Value::String(s)) => s.clone(),
3090            Some(v) => value_to_string(v),
3091            None => {
3092                return Ok(ExecResult::failure(1, "source: missing filename"));
3093            }
3094        };
3095
3096        // Resolve path relative to cwd
3097        let full_path = {
3098            let ctx = self.exec_ctx.read().await;
3099            if path.starts_with('/') {
3100                std::path::PathBuf::from(&path)
3101            } else {
3102                ctx.cwd.join(&path)
3103            }
3104        };
3105
3106        // Read file content via backend
3107        let content = {
3108            let ctx = self.exec_ctx.read().await;
3109            match ctx.backend.read(&full_path, None).await {
3110                Ok(bytes) => {
3111                    String::from_utf8(bytes).map_err(|e| {
3112                        anyhow::anyhow!("source: {}: invalid UTF-8: {}", path, e)
3113                    })?
3114                }
3115                Err(e) => {
3116                    return Ok(ExecResult::failure(
3117                        1,
3118                        format!("source: {}: {}", path, e),
3119                    ));
3120                }
3121            }
3122        };
3123
3124        // Parse the content
3125        let program = match crate::parser::parse(&content) {
3126            Ok(p) => p,
3127            Err(errors) => {
3128                let msg = errors
3129                    .iter()
3130                    .map(|e| format!("{}:{}: {}", path, e.span.start, e.message))
3131                    .collect::<Vec<_>>()
3132                    .join("\n");
3133                return Ok(ExecResult::failure(1, format!("source: {}", msg)));
3134            }
3135        };
3136
3137        // Execute each statement in the CURRENT scope (not isolated)
3138        let mut result = ExecResult::success("");
3139        for stmt in program.statements {
3140            if matches!(stmt, crate::ast::Stmt::Empty) {
3141                continue;
3142            }
3143
3144            match self.execute_stmt_flow(&stmt).await {
3145                Ok(flow) => {
3146                    self.drain_stderr_into(&mut result).await;
3147                    match flow {
3148                        ControlFlow::Normal(r) => {
3149                            result = r.clone();
3150                            self.update_last_result(&r).await;
3151                        }
3152                        ControlFlow::Break { .. } | ControlFlow::Continue { .. } => {
3153                            return Err(anyhow::anyhow!(
3154                                "source: {}: unexpected break/continue outside loop",
3155                                path
3156                            ));
3157                        }
3158                        ControlFlow::Return { value } => {
3159                            return Ok(value);
3160                        }
3161                        ControlFlow::Exit { code } => {
3162                            result.code = code;
3163                            return Ok(result);
3164                        }
3165                    }
3166                }
3167                Err(e) => {
3168                    return Err(e.context(format!("source: {}", path)));
3169                }
3170            }
3171        }
3172
3173        Ok(result)
3174    }
3175
3176    /// Try to execute a script from PATH directories.
3177    ///
3178    /// Searches PATH for `{name}.kai` files and executes them in isolated scope
3179    /// (like user-defined tools). Returns None if no script is found.
3180    async fn try_execute_script(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3181        // Get PATH from scope (default to "/bin")
3182        let path_value = {
3183            let scope = self.scope.read().await;
3184            scope
3185                .get("PATH")
3186                .map(value_to_string)
3187                .unwrap_or_else(|| "/bin".to_string())
3188        };
3189
3190        // Search PATH directories for script
3191        for dir in path_value.split(':') {
3192            if dir.is_empty() {
3193                continue;
3194            }
3195
3196            // Build script path: {dir}/{name}.kai
3197            let script_path = PathBuf::from(dir).join(format!("{}.kai", name));
3198
3199            // Check if script exists
3200            let exists = {
3201                let ctx = self.exec_ctx.read().await;
3202                ctx.backend.exists(&script_path).await
3203            };
3204
3205            if !exists {
3206                continue;
3207            }
3208
3209            // Read script content
3210            let content = {
3211                let ctx = self.exec_ctx.read().await;
3212                match ctx.backend.read(&script_path, None).await {
3213                    Ok(bytes) => match String::from_utf8(bytes) {
3214                        Ok(s) => s,
3215                        Err(e) => {
3216                            return Ok(Some(ExecResult::failure(
3217                                1,
3218                                format!("{}: invalid UTF-8: {}", script_path.display(), e),
3219                            )));
3220                        }
3221                    },
3222                    Err(e) => {
3223                        return Ok(Some(ExecResult::failure(
3224                            1,
3225                            format!("{}: {}", script_path.display(), e),
3226                        )));
3227                    }
3228                }
3229            };
3230
3231            // Parse the script
3232            let program = match crate::parser::parse(&content) {
3233                Ok(p) => p,
3234                Err(errors) => {
3235                    let msg = errors
3236                        .iter()
3237                        .map(|e| format!("{}:{}: {}", script_path.display(), e.span.start, e.message))
3238                        .collect::<Vec<_>>()
3239                        .join("\n");
3240                    return Ok(Some(ExecResult::failure(1, msg)));
3241                }
3242            };
3243
3244            // Build tool_args from args (async for command substitution support)
3245            let tool_args = self.build_args_async(args, None).await?;
3246
3247            // Create isolated scope (like user tools)
3248            let mut isolated_scope = Scope::new();
3249
3250            // Set up positional parameters ($0 = script name, $1, $2, ... = args)
3251            let positional_args: Vec<String> = tool_args.positional
3252                .iter()
3253                .map(value_to_string)
3254                .collect();
3255            isolated_scope.set_positional(name, positional_args);
3256
3257            // Save current scope and swap with isolated scope
3258            let original_scope = {
3259                let mut scope = self.scope.write().await;
3260                std::mem::replace(&mut *scope, isolated_scope)
3261            };
3262
3263            // Execute script statements — track outcome for cleanup
3264            let mut result = ExecResult::success("");
3265            let mut exec_error: Option<anyhow::Error> = None;
3266            let mut exit_code: Option<i64> = None;
3267
3268            for stmt in program.statements {
3269                if matches!(stmt, crate::ast::Stmt::Empty) {
3270                    continue;
3271                }
3272
3273                match self.execute_stmt_flow(&stmt).await {
3274                    Ok(flow) => {
3275                        match flow {
3276                            ControlFlow::Normal(r) => result = r,
3277                            ControlFlow::Return { value } => {
3278                                result = value;
3279                                break;
3280                            }
3281                            ControlFlow::Exit { code } => {
3282                                exit_code = Some(code);
3283                                break;
3284                            }
3285                            ControlFlow::Break { result: r, .. } | ControlFlow::Continue { result: r, .. } => {
3286                                result = r;
3287                            }
3288                        }
3289                    }
3290                    Err(e) => {
3291                        exec_error = Some(e);
3292                        break;
3293                    }
3294                }
3295            }
3296
3297            // Restore original scope unconditionally
3298            {
3299                let mut scope = self.scope.write().await;
3300                *scope = original_scope;
3301            }
3302
3303            // Propagate error or exit after cleanup
3304            if let Some(e) = exec_error {
3305                return Err(e.context(format!("script: {}", script_path.display())));
3306            }
3307            if let Some(code) = exit_code {
3308                result.code = code;
3309                return Ok(Some(result));
3310            }
3311
3312            return Ok(Some(result));
3313        }
3314
3315        // No script found
3316        Ok(None)
3317    }
3318
/// Try to execute an external command from PATH.
///
/// This is the fallback when no builtin or user-defined tool matches.
///
/// This variant is compiled only when the `native` feature is disabled.
/// It ignores both arguments and always returns `Ok(None)`, i.e. it reports
/// every external command as "not found" and never spawns a process. The
/// requirements and the richer return contract (PATH lookup, real-filesystem
/// cwd, execution errors) apply to the `native` variant of this method.
///
/// # Returns
/// - `Ok(None)` unconditionally
#[cfg(not(feature = "native"))]
async fn try_execute_external(&self, _name: &str, _args: &[Arg]) -> Result<Option<ExecResult>> {
    Ok(None)
}
3336
3337    /// Try to execute an external command from PATH.
3338    #[cfg(feature = "native")]
3339    #[tracing::instrument(level = "debug", skip(self, args), fields(command = %name))]
3340    async fn try_execute_external(&self, name: &str, args: &[Arg]) -> Result<Option<ExecResult>> {
3341        // Read the cancel token from `self.exec_ctx`, which `dispatch_command`
3342        // populates from the inbound ctx.cancel on every dispatch. This is
3343        // what makes the `timeout` builtin's swapped child token reach the
3344        // wait_or_kill discipline below — reading `self.cancel_token` would
3345        // give the kernel-wide token and miss the timeout's child cascade.
3346        let cancel = {
3347            let ec = self.exec_ctx.read().await;
3348            ec.cancel.clone()
3349        };
3350        let kill_grace = self.kill_grace;
3351        if !self.allow_external_commands {
3352            return Ok(None);
3353        }
3354
3355        // Get real working directory for relative path resolution and child cwd.
3356        // If the CWD is virtual (no real filesystem path), skip external command
3357        // execution entirely — return None so the dispatch can fall through to
3358        // backend-registered tools.
3359        let real_cwd = {
3360            let ctx = self.exec_ctx.read().await;
3361            match ctx.backend.resolve_real_path(&ctx.cwd) {
3362                Some(p) => p,
3363                None => return Ok(None),
3364            }
3365        };
3366
3367        let executable = if name.contains('/') {
3368            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
3369            let resolved = if std::path::Path::new(name).is_absolute() {
3370                std::path::PathBuf::from(name)
3371            } else {
3372                real_cwd.join(name)
3373            };
3374            if !resolved.exists() {
3375                return Ok(Some(ExecResult::failure(
3376                    127,
3377                    format!("{}: No such file or directory", name),
3378                )));
3379            }
3380            if !resolved.is_file() {
3381                return Ok(Some(ExecResult::failure(
3382                    126,
3383                    format!("{}: Is a directory", name),
3384                )));
3385            }
3386            #[cfg(unix)]
3387            {
3388                use std::os::unix::fs::PermissionsExt;
3389                let mode = std::fs::metadata(&resolved)
3390                    .map(|m| m.permissions().mode())
3391                    .unwrap_or(0);
3392                if mode & 0o111 == 0 {
3393                    return Ok(Some(ExecResult::failure(
3394                        126,
3395                        format!("{}: Permission denied", name),
3396                    )));
3397                }
3398            }
3399            resolved.to_string_lossy().into_owned()
3400        } else {
3401            // Get PATH from scope or environment
3402            let path_var = {
3403                let scope = self.scope.read().await;
3404                scope
3405                    .get("PATH")
3406                    .map(value_to_string)
3407                    .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default())
3408            };
3409
3410            // Resolve command in PATH
3411            match resolve_in_path(name, &path_var) {
3412                Some(path) => path,
3413                None => return Ok(None), // Not found - let caller handle error
3414            }
3415        };
3416
3417        tracing::debug!(executable = %executable, "resolved external command");
3418
3419        // Build flat argv (preserves flag format)
3420        let argv = self.build_args_flat(args).await?;
3421
3422        // Get stdin if available
3423        let stdin_data = {
3424            let mut ctx = self.exec_ctx.write().await;
3425            ctx.take_stdin()
3426        };
3427
3428        // Build and spawn the command
3429        use tokio::process::Command;
3430
3431        let mut cmd = Command::new(&executable);
3432        cmd.args(&argv);
3433        cmd.current_dir(&real_cwd);
3434
3435        // Hermetic env: child sees only kaish's exported vars, not the kaish
3436        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
3437        // populate it via KernelConfig::initial_vars at construction.
3438        cmd.env_clear();
3439        {
3440            let scope = self.scope.read().await;
3441            for (var_name, value) in scope.exported_vars() {
3442                cmd.env(var_name, value_to_string(&value));
3443            }
3444        }
3445
3446        // Handle stdin
3447        cmd.stdin(if stdin_data.is_some() {
3448            std::process::Stdio::piped()
3449        } else if self.interactive {
3450            std::process::Stdio::inherit()
3451        } else {
3452            std::process::Stdio::null()
3453        });
3454
3455        // In interactive mode, standalone or last-in-pipeline commands inherit
3456        // the terminal's stdout/stderr so output streams in real-time.
3457        // First/middle commands must capture stdout for the pipe — same as bash.
3458        let pipeline_position = {
3459            let ctx = self.exec_ctx.read().await;
3460            ctx.pipeline_position
3461        };
3462        let inherit_output = self.interactive
3463            && matches!(pipeline_position, PipelinePosition::Only | PipelinePosition::Last);
3464
3465        if inherit_output {
3466            cmd.stdout(std::process::Stdio::inherit());
3467            cmd.stderr(std::process::Stdio::inherit());
3468        } else {
3469            cmd.stdout(std::process::Stdio::piped());
3470            cmd.stderr(std::process::Stdio::piped());
3471        }
3472
3473        // On Unix, always put the child in its own process group so cancellation
3474        // can `killpg` the whole tree (the child plus any grandchildren).
3475        // Restoring default tty-related signal handlers stays gated on
3476        // job-control mode — those only matter when the child has a controlling
3477        // terminal.
3478        #[cfg(unix)]
3479        {
3480            let restore_jc_signals = self.terminal_state.is_some() && inherit_output;
3481            // SAFETY: setpgid and sigaction(SIG_DFL) are async-signal-safe per POSIX
3482            #[allow(unsafe_code)]
3483            unsafe {
3484                cmd.pre_exec(move || {
3485                    // Own process group — for kill scope.
3486                    nix::unistd::setpgid(nix::unistd::Pid::from_raw(0), nix::unistd::Pid::from_raw(0))
3487                        .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
3488                    if restore_jc_signals {
3489                        use nix::libc::{sigaction, SIGTSTP, SIGTTOU, SIGTTIN, SIGINT, SIG_DFL};
3490                        let mut sa: nix::libc::sigaction = std::mem::zeroed();
3491                        sa.sa_sigaction = SIG_DFL;
3492                        if sigaction(SIGTSTP, &sa, std::ptr::null_mut()) != 0 {
3493                            return Err(std::io::Error::last_os_error());
3494                        }
3495                        if sigaction(SIGTTOU, &sa, std::ptr::null_mut()) != 0 {
3496                            return Err(std::io::Error::last_os_error());
3497                        }
3498                        if sigaction(SIGTTIN, &sa, std::ptr::null_mut()) != 0 {
3499                            return Err(std::io::Error::last_os_error());
3500                        }
3501                        if sigaction(SIGINT, &sa, std::ptr::null_mut()) != 0 {
3502                            return Err(std::io::Error::last_os_error());
3503                        }
3504                    }
3505                    Ok(())
3506                });
3507            }
3508        }
3509
3510        // Backstop for kill on drop in case our explicit kill path is bypassed
3511        // (panic, early return, etc) on the **capture** wait path. We do NOT
3512        // set this on the JC inherit path: that uses sync `waitpid` outside
3513        // tokio's view of the child, so on drop tokio would try to kill an
3514        // already-reaped (possibly-reused) PID. The JC path has its own
3515        // cancel handling via the side-task watcher.
3516        let in_jc_inherit_path = inherit_output && self.terminal_state.is_some();
3517        if !in_jc_inherit_path {
3518            cmd.kill_on_drop(true);
3519        }
3520
3521        // Spawn the process. Capture a `KillTarget` immediately so cancel/
3522        // timeout paths can deliver signals via pidfd (Linux ≥ 5.3) — bound
3523        // to this process's generation, immune to PID reuse if the OS reaps
3524        // the child before our kill syscalls fire.
3525        let mut child = match cmd.spawn() {
3526            Ok(child) => child,
3527            Err(e) => {
3528                return Ok(Some(ExecResult::failure(
3529                    127,
3530                    format!("{}: {}", name, e),
3531                )));
3532            }
3533        };
3534        let kill_target = crate::pidfd::KillTarget::from_child(&child);
3535
3536        // Write stdin if present
3537        if let Some(data) = stdin_data
3538            && let Some(mut stdin) = child.stdin.take()
3539        {
3540            use tokio::io::AsyncWriteExt;
3541            if let Err(e) = stdin.write_all(data.as_bytes()).await {
3542                return Ok(Some(ExecResult::failure(
3543                    1,
3544                    format!("{}: failed to write stdin: {}", name, e),
3545                )));
3546            }
3547            // Drop stdin to signal EOF
3548        }
3549
3550        if inherit_output {
3551            // Job control path: use waitpid with WUNTRACED for Ctrl-Z support
3552            #[cfg(unix)]
3553            if let Some(ref term) = self.terminal_state {
3554                let child_id = child.id().unwrap_or(0);
3555                let pid = nix::unistd::Pid::from_raw(child_id as i32);
3556                let pgid = pid; // child is its own pgid leader
3557
3558                // Give the terminal to the child's process group
3559                if let Err(e) = term.give_terminal_to(pgid) {
3560                    tracing::warn!("failed to give terminal to child: {}", e);
3561                }
3562
3563                let term_clone = term.clone();
3564                let cmd_name = name.to_string();
3565                let cmd_display = format!("{} {}", name, argv.join(" "));
3566                let jobs = self.jobs.clone();
3567
3568                // Side task that watches for cancellation while the blocking
3569                // waitpid runs. On cancel, it SIGTERMs the process group, waits
3570                // the grace period, then SIGKILLs. The blocking waitpid returns
3571                // when the child dies. AbortOnDrop guard cancels the watcher
3572                // on the success path so it doesn't keep running after wait
3573                // returns naturally.
3574                //
3575                // `wait_complete` shrinks the PID-reuse race: the watcher
3576                // checks it before each kill syscall and bails out if
3577                // wait_for_foreground has already reaped the child. This
3578                // doesn't fully eliminate the race (atomic load + kill is
3579                // not atomic with the OS reap+reuse), but narrows the window
3580                // to nanoseconds — enough to be ignorable in practice.
3581                let wait_complete = std::sync::Arc::new(
3582                    std::sync::atomic::AtomicBool::new(false)
3583                );
3584                let cancel_watcher = {
3585                    let cancel = cancel.clone();
3586                    let wc = wait_complete.clone();
3587                    // Ownership transfer: the JC path's sync wait inside
3588                    // block_in_place owns the child's reaping, so the
3589                    // cancel_watcher drives the kill side via KillTarget
3590                    // (pidfd-bound on Linux). When kill_target is None
3591                    // (older kernel + open failure, or non-Linux), falls
3592                    // through to the older PID-based path the closure
3593                    // captures from `pid`.
3594                    let target = kill_target.as_ref().map(|t| {
3595                        // Re-borrow the components we need into Owned-ish form
3596                        // so the spawned task is 'static. We can't move
3597                        // KillTarget directly because try_execute_external
3598                        // still uses it after the spawn — but on the JC path
3599                        // there is no further use after the watcher spawn,
3600                        // so a clone-of-pid + owned None pidfd is safe.
3601                        // Simpler: signal via the existing target by cloning
3602                        // a fresh pidfd; the original keeps its handle.
3603                        // Pidfd is just an OwnedFd — not Clone — so do it
3604                        // by re-opening from the pid. Fall back if reopen
3605                        // fails (race already reaped → best-effort kill).
3606                        crate::pidfd::KillTarget::from_pid(t.pid())
3607                    });
3608                    tokio::spawn(async move {
3609                        cancel.cancelled().await;
3610                        if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
3611                        use nix::sys::signal::Signal;
3612                        if let Some(t) = &target {
3613                            t.signal(Signal::SIGTERM);
3614                            t.signal_pg(Signal::SIGTERM);
3615                        } else {
3616                            let _ = nix::sys::signal::kill(pid, Signal::SIGTERM);
3617                            let _ = nix::sys::signal::killpg(pid, Signal::SIGTERM);
3618                        }
3619                        if kill_grace > Duration::ZERO {
3620                            tokio::time::sleep(kill_grace).await;
3621                            if wc.load(std::sync::atomic::Ordering::SeqCst) { return; }
3622                        }
3623                        if let Some(t) = &target {
3624                            t.signal(Signal::SIGKILL);
3625                            t.signal_pg(Signal::SIGKILL);
3626                        } else {
3627                            let _ = nix::sys::signal::kill(pid, Signal::SIGKILL);
3628                            let _ = nix::sys::signal::killpg(pid, Signal::SIGKILL);
3629                        }
3630                    })
3631                };
3632                struct AbortOnDrop(tokio::task::JoinHandle<()>);
3633                impl Drop for AbortOnDrop {
3634                    fn drop(&mut self) {
3635                        self.0.abort();
3636                    }
3637                }
3638                let _watcher_guard = AbortOnDrop(cancel_watcher);
3639
3640                let wait_complete_setter = wait_complete.clone();
3641                let code = tokio::task::block_in_place(move || {
3642                    let result = term_clone.wait_for_foreground(pid);
3643                    // Mark wait done before the watcher might fire.
3644                    wait_complete_setter.store(true, std::sync::atomic::Ordering::SeqCst);
3645
3646                    // Always reclaim the terminal
3647                    if let Err(e) = term_clone.reclaim_terminal() {
3648                        tracing::warn!("failed to reclaim terminal: {}", e);
3649                    }
3650
3651                    match result {
3652                        crate::terminal::WaitResult::Exited(code) => code as i64,
3653                        crate::terminal::WaitResult::Signaled(sig) => 128 + sig as i64,
3654                        crate::terminal::WaitResult::Stopped(_sig) => {
3655                            // Register as a stopped job
3656                            let rt = tokio::runtime::Handle::current();
3657                            let job_id = rt.block_on(jobs.register_stopped(
3658                                cmd_display,
3659                                child_id,
3660                                child_id, // pgid = pid for group leader
3661                            ));
3662                            eprintln!("\n[{}]+ Stopped\t{}", job_id, cmd_name);
3663                            148 // 128 + SIGTSTP(20) on most systems, but we use a fixed value
3664                        }
3665                    }
3666                });
3667
3668                return Ok(Some(ExecResult::from_output(code, String::new(), String::new())));
3669            }
3670
3671            // Non-job-control path with inherited stdio.
3672            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
3673                Ok(s) => s,
3674                Err(e) => {
3675                    return Ok(Some(ExecResult::failure(
3676                        1,
3677                        format!("{}: failed to wait: {}", name, e),
3678                    )));
3679                }
3680            };
3681
3682            let code = status.code().unwrap_or_else(|| {
3683                #[cfg(unix)]
3684                {
3685                    use std::os::unix::process::ExitStatusExt;
3686                    128 + status.signal().unwrap_or(0)
3687                }
3688                #[cfg(not(unix))]
3689                {
3690                    -1
3691                }
3692            }) as i64;
3693
3694            // stdout/stderr already went to the terminal
3695            Ok(Some(ExecResult::from_output(code, String::new(), String::new())))
3696        } else {
3697            // Capture output via bounded streams
3698            let stdout_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
3699            let stderr_stream = Arc::new(BoundedStream::new(DEFAULT_STREAM_MAX_SIZE));
3700
3701            let stdout_pipe = child.stdout.take();
3702            let stderr_pipe = child.stderr.take();
3703
3704            let stdout_clone = stdout_stream.clone();
3705            let stderr_clone = stderr_stream.clone();
3706
3707            let stdout_task = stdout_pipe.map(|pipe| {
3708                tokio::spawn(async move {
3709                    drain_to_stream(pipe, stdout_clone).await;
3710                })
3711            });
3712
3713            let stderr_task = stderr_pipe.map(|pipe| {
3714                tokio::spawn(async move {
3715                    drain_to_stream(pipe, stderr_clone).await;
3716                })
3717            });
3718
3719            let cancelled_before_wait = cancel.is_cancelled();
3720            let status = match wait_or_kill(&mut child, kill_target.as_ref(), &cancel, kill_grace).await {
3721                Ok(s) => s,
3722                Err(e) => {
3723                    if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
3724                    if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
3725                    return Ok(Some(ExecResult::failure(
3726                        1,
3727                        format!("{}: failed to wait: {}", name, e),
3728                    )));
3729                }
3730            };
3731
3732            // On cancel, abort the drain tasks (the child's pipes are gone;
3733            // late output is lost but predictable death beats partial capture).
3734            // On normal exit, await drains so we don't lose buffered output.
3735            if cancelled_before_wait || cancel.is_cancelled() {
3736                if let Some(task) = stdout_task { task.abort(); let _ = task.await; }
3737                if let Some(task) = stderr_task { task.abort(); let _ = task.await; }
3738            } else {
3739                if let Some(task) = stdout_task {
3740                    // Ignore join error — the drain task logs its own errors
3741                    let _ = task.await;
3742                }
3743                if let Some(task) = stderr_task {
3744                    let _ = task.await;
3745                }
3746            }
3747
3748            let code = status.code().unwrap_or_else(|| {
3749                #[cfg(unix)]
3750                {
3751                    use std::os::unix::process::ExitStatusExt;
3752                    128 + status.signal().unwrap_or(0)
3753                }
3754                #[cfg(not(unix))]
3755                {
3756                    -1
3757                }
3758            }) as i64;
3759
3760            let stdout = stdout_stream.read_string().await;
3761            let stderr = stderr_stream.read_string().await;
3762
3763            Ok(Some(ExecResult::from_output(code, stdout, stderr)))
3764        }
3765    }
3766
3767    // --- Variable Access ---
3768
3769    /// Get a variable value.
3770    pub async fn get_var(&self, name: &str) -> Option<Value> {
3771        let scope = self.scope.read().await;
3772        scope.get(name).cloned()
3773    }
3774
3775    /// Check if error-exit mode is enabled (for testing).
3776    #[cfg(test)]
3777    pub async fn error_exit_enabled(&self) -> bool {
3778        let scope = self.scope.read().await;
3779        scope.error_exit_enabled()
3780    }
3781
3782    /// Set a variable value.
3783    pub async fn set_var(&self, name: &str, value: Value) {
3784        let mut scope = self.scope.write().await;
3785        scope.set(name.to_string(), value);
3786    }
3787
3788    /// Set positional parameters ($0 script name and $1-$9 args).
3789    pub async fn set_positional(&self, script_name: impl Into<String>, args: Vec<String>) {
3790        let mut scope = self.scope.write().await;
3791        scope.set_positional(script_name, args);
3792    }
3793
3794    /// List all variables.
3795    pub async fn list_vars(&self) -> Vec<(String, Value)> {
3796        let scope = self.scope.read().await;
3797        scope.all()
3798    }
3799
3800    // --- CWD ---
3801
3802    /// Get current working directory.
3803    pub async fn cwd(&self) -> PathBuf {
3804        self.exec_ctx.read().await.cwd.clone()
3805    }
3806
3807    /// Set current working directory.
3808    pub async fn set_cwd(&self, path: PathBuf) {
3809        let mut ctx = self.exec_ctx.write().await;
3810        ctx.set_cwd(path);
3811    }
3812
3813    // --- Last Result ---
3814
3815    /// Get the last result ($?).
3816    pub async fn last_result(&self) -> ExecResult {
3817        let scope = self.scope.read().await;
3818        scope.last_result().clone()
3819    }
3820
3821    // --- Tools ---
3822
3823    /// Check if a user-defined function exists.
3824    pub async fn has_function(&self, name: &str) -> bool {
3825        self.user_tools.read().await.contains_key(name)
3826    }
3827
3828    /// Get available tool schemas.
3829    pub fn tool_schemas(&self) -> Vec<crate::tools::ToolSchema> {
3830        self.tools.schemas()
3831    }
3832
3833    // --- Jobs ---
3834
3835    /// Get job manager.
3836    pub fn jobs(&self) -> Arc<JobManager> {
3837        self.jobs.clone()
3838    }
3839
3840    // --- VFS ---
3841
3842    /// Get VFS router.
3843    pub fn vfs(&self) -> Arc<VfsRouter> {
3844        self.vfs.clone()
3845    }
3846
3847    // --- State ---
3848
3849    /// Reset kernel to initial state.
3850    ///
3851    /// Clears in-memory variables and resets cwd to root.
3852    /// History is not cleared (it persists across resets).
3853    pub async fn reset(&self) -> Result<()> {
3854        {
3855            let mut scope = self.scope.write().await;
3856            *scope = Scope::new();
3857        }
3858        {
3859            let mut ctx = self.exec_ctx.write().await;
3860            ctx.cwd = PathBuf::from("/");
3861        }
3862        Ok(())
3863    }
3864
3865    /// Shutdown the kernel.
3866    pub async fn shutdown(self) -> Result<()> {
3867        // Wait for all background jobs
3868        self.jobs.wait_all().await;
3869        Ok(())
3870    }
3871
3872    /// Dispatch a single command using the full resolution chain.
3873    ///
3874    /// This is the core of `CommandDispatcher` — it syncs state between the
3875    /// passed-in `ExecContext` and kernel-internal state (scope, exec_ctx),
3876    /// then delegates to `execute_command` for the actual dispatch.
3877    ///
3878    /// State flow:
3879    /// 1. ctx → self: sync scope, cwd, stdin so internal methods see current state
3880    /// 2. execute_command: full dispatch chain (user tools, builtins, scripts, external, backend)
3881    /// 3. self → ctx: sync scope, cwd changes back so the pipeline runner sees them
3882    async fn dispatch_command(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
3883        // Ensure nested dispatch (e.g. the `timeout` builtin re-dispatching
3884        // its inner command via ctx.dispatcher) routes through THIS kernel,
3885        // not a stale parent. Critical for forks: the fork's builtins must
3886        // use the fork's dispatcher, not the parent's.
3887        if let Some(d) = self.dispatcher() {
3888            ctx.dispatcher = Some(d);
3889        }
3890
3891        // 1. Sync ctx → self internals
3892        {
3893            let mut scope = self.scope.write().await;
3894            *scope = ctx.scope.clone();
3895        }
3896        {
3897            let mut ec = self.exec_ctx.write().await;
3898            ec.cwd = ctx.cwd.clone();
3899            ec.prev_cwd = ctx.prev_cwd.clone();
3900            ec.stdin = ctx.stdin.take();
3901            ec.stdin_data = ctx.stdin_data.take();
3902            // Streaming pipe endpoints and kernel stderr must flow to the
3903            // tool via self.exec_ctx — execute_command reads that, not the
3904            // passed-in ctx. Without moving these, concurrent pipeline
3905            // stages dispatched via a fork get pipe_stdin = None and
3906            // silently read nothing.
3907            ec.pipe_stdin = ctx.pipe_stdin.take();
3908            ec.pipe_stdout = ctx.pipe_stdout.take();
3909            if let Some(stderr) = ctx.stderr.clone() {
3910                ec.stderr = Some(stderr);
3911            }
3912            ec.aliases = ctx.aliases.clone();
3913            ec.ignore_config = ctx.ignore_config.clone();
3914            ec.output_limit = ctx.output_limit.clone();
3915            ec.pipeline_position = ctx.pipeline_position;
3916            // Sync the cancel token from ctx → ec. Builtins like `timeout`
3917            // swap ctx.cancel to a derived child token before re-dispatching;
3918            // execute_command's snapshot reads ec.cancel (kept aligned by
3919            // this sync), so try_execute_external sees the right token.
3920            ec.cancel = ctx.cancel.clone();
3921        }
3922
3923        // 2. Execute via the full dispatch chain
3924        let result = self.execute_command(&cmd.name, &cmd.args).await?;
3925
3926        // 3. Sync self → ctx
3927        {
3928            let scope = self.scope.read().await;
3929            ctx.scope = scope.clone();
3930        }
3931        {
3932            let mut ec = self.exec_ctx.write().await;
3933            ctx.cwd = ec.cwd.clone();
3934            ctx.prev_cwd = ec.prev_cwd.clone();
3935            ctx.aliases = ec.aliases.clone();
3936            ctx.ignore_config = ec.ignore_config.clone();
3937            ctx.output_limit = ec.output_limit.clone();
3938            // Return any pipe endpoints that the tool didn't consume.
3939            // `take()` here keeps the fork's exec_ctx in a clean state for
3940            // the next dispatch — these are per-command and shouldn't leak
3941            // between calls.
3942            ctx.pipe_stdin = ec.pipe_stdin.take();
3943            ctx.pipe_stdout = ec.pipe_stdout.take();
3944        }
3945
3946        Ok(result)
3947    }
3948}
3949
3950#[async_trait]
3951impl CommandDispatcher for Kernel {
3952    /// Dispatch a command through the Kernel's full resolution chain.
3953    ///
3954    /// This is the single path for all command execution when called from
3955    /// the pipeline runner. It provides the full dispatch chain:
3956    /// user tools → builtins → .kai scripts → external commands → backend tools.
3957    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
3958        self.dispatch_command(cmd, ctx).await
3959    }
3960
3961    /// Produce a forked dispatcher with independent mutable state (detached).
3962    ///
3963    /// Calls the inherent `Kernel::fork` method (note the UFCS to avoid
3964    /// recursing into the trait method we're defining) and coerces the
3965    /// returned `Arc<Kernel>` to `Arc<dyn CommandDispatcher>`.
3966    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
3967        let fork: Arc<Kernel> = Kernel::fork(self).await;
3968        fork
3969    }
3970
3971    /// Produce a forked dispatcher with cancellation cascading from this kernel.
3972    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
3973        let fork: Arc<Kernel> = Kernel::fork_attached(self).await;
3974        fork
3975    }
3976}
3977
3978/// Accumulate output from one result into another.
3979///
3980/// This appends stdout and stderr (with newlines as separators) and updates
3981/// the exit code to match the new result. Used to preserve output from
3982/// multiple statements, loop iterations, and command chains.
3983fn accumulate_result(accumulated: &mut ExecResult, new: &ExecResult) {
3984    // Materialize lazy OutputData into .out before accumulating.
3985    // Without this, the first command's output stays in .output while
3986    // the second's text gets appended to .out, losing the first.
3987    accumulated.materialize();
3988    let new_text = new.text_out();
3989    if !accumulated.text_out().is_empty() && !new_text.is_empty() && !accumulated.text_out().ends_with('\n') {
3990        accumulated.push_out("\n");
3991    }
3992    accumulated.push_out(&new_text);
3993    if !accumulated.err.is_empty() && !new.err.is_empty() && !accumulated.err.ends_with('\n') {
3994        accumulated.err.push('\n');
3995    }
3996    accumulated.err.push_str(&new.err);
3997    accumulated.code = new.code;
3998    accumulated.data = new.data.clone();
3999    accumulated.did_spill = new.did_spill;
4000    accumulated.original_code = new.original_code;
4001    accumulated.content_type = new.content_type.clone();
4002    accumulated.baggage.clone_from(&new.baggage);
4003}
4004
4005/// Check if a value is truthy.
4006fn is_truthy(value: &Value) -> bool {
4007    match value {
4008        Value::Null => false,
4009        Value::Bool(b) => *b,
4010        Value::Int(i) => *i != 0,
4011        Value::Float(f) => *f != 0.0,
4012        Value::String(s) => !s.is_empty(),
4013        Value::Json(json) => match json {
4014            serde_json::Value::Null => false,
4015            serde_json::Value::Array(arr) => !arr.is_empty(),
4016            serde_json::Value::Object(obj) => !obj.is_empty(),
4017            serde_json::Value::Bool(b) => *b,
4018            serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
4019            serde_json::Value::String(s) => !s.is_empty(),
4020        },
4021        Value::Blob(_) => true, // Blob references are always truthy
4022    }
4023}
4024
4025/// Apply tilde expansion to a value.
4026///
4027/// Only string values starting with `~` are expanded.
4028fn apply_tilde_expansion(value: Value) -> Value {
4029    match value {
4030        Value::String(s) if s.starts_with('~') => Value::String(expand_tilde(&s)),
4031        _ => value,
4032    }
4033}
4034
/// Await the child's exit, switching to a graceful kill if `cancel` fires first.
///
/// The select is `biased` with the wait branch listed first, so a child that
/// has already exited is reported normally even when the token is also
/// cancelled. On cancellation the work is handed to `kill_with_grace`, which
/// receives `target` and `grace` unchanged.
///
/// `target` carries a Linux pidfd (when available) for race-free kill of the
/// direct child. On non-unix targets a separate definition of this function
/// ignores that parameter and relies on tokio's cross-platform `start_kill`.
#[cfg(all(unix, feature = "native"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    target: Option<&crate::pidfd::KillTarget>,
    cancel: &tokio_util::sync::CancellationToken,
    grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        biased;
        exit = child.wait() => exit,
        () = cancel.cancelled() => {
            kill_with_grace(child, target, grace).await
        }
    }
}
4053
/// Non-unix counterpart of `wait_or_kill`.
///
/// Awaits the child's exit; if `cancel` fires first, asks tokio to kill the
/// child (`start_kill`, result ignored) and then waits for it. `_target` and
/// `_grace` are accepted only to keep the signature parallel with the unix
/// version and are not used.
#[cfg(all(not(unix), feature = "native"))]
pub(crate) async fn wait_or_kill(
    child: &mut tokio::process::Child,
    _target: Option<&()>,
    cancel: &tokio_util::sync::CancellationToken,
    _grace: Duration,
) -> std::io::Result<std::process::ExitStatus> {
    tokio::select! {
        biased;
        exit = child.wait() => exit,
        () = cancel.cancelled() => {
            // Best effort: a failed kill request still falls through to wait.
            let _ = child.start_kill();
            child.wait().await
        }
    }
}
4070
4071/// Send SIGTERM to the child and its process group; wait `grace`; then SIGKILL.
4072///
4073/// Direct-child kill goes through `target.signal()`, which on Linux uses a
4074/// pidfd (immune to PID reuse). Process-group kill uses `killpg` — there is
4075/// no PGID-equivalent of pidfd, so grandchildren retain a small reuse window.
4076#[cfg(all(unix, feature = "native"))]
4077pub(crate) async fn kill_with_grace(
4078    child: &mut tokio::process::Child,
4079    target: Option<&crate::pidfd::KillTarget>,
4080    grace: Duration,
4081) -> std::io::Result<std::process::ExitStatus> {
4082    use nix::sys::signal::Signal;
4083
4084    if let Some(t) = target {
4085        t.signal(Signal::SIGTERM);
4086        t.signal_pg(Signal::SIGTERM);
4087        if grace > Duration::ZERO
4088            && let Ok(status) = tokio::time::timeout(grace, child.wait()).await
4089        {
4090            return status;
4091        }
4092        t.signal(Signal::SIGKILL);
4093        t.signal_pg(Signal::SIGKILL);
4094    }
4095    child.wait().await
4096}
4097
4098#[cfg(all(test, feature = "native"))]
4099mod tests {
4100    use super::*;
4101
4102    #[tokio::test]
4103    async fn test_kernel_transient() {
4104        let kernel = Kernel::transient().expect("failed to create kernel");
4105        assert_eq!(kernel.name(), "transient");
4106    }
4107
4108    #[tokio::test]
4109    async fn test_kernel_execute_echo() {
4110        let kernel = Kernel::transient().expect("failed to create kernel");
4111        let result = kernel.execute("echo hello").await.expect("execution failed");
4112        assert!(result.ok());
4113        assert_eq!(result.text_out().trim(), "hello");
4114    }
4115
4116    #[tokio::test]
4117    async fn test_multiple_statements_accumulate_output() {
4118        let kernel = Kernel::transient().expect("failed to create kernel");
4119        let result = kernel
4120            .execute("echo one\necho two\necho three")
4121            .await
4122            .expect("execution failed");
4123        assert!(result.ok());
4124        // Should have all three outputs separated by newlines
4125        assert!(result.text_out().contains("one"), "missing 'one': {}", result.text_out());
4126        assert!(result.text_out().contains("two"), "missing 'two': {}", result.text_out());
4127        assert!(result.text_out().contains("three"), "missing 'three': {}", result.text_out());
4128    }
4129
4130    #[tokio::test]
4131    async fn test_and_chain_accumulates_output() {
4132        let kernel = Kernel::transient().expect("failed to create kernel");
4133        let result = kernel
4134            .execute("echo first && echo second")
4135            .await
4136            .expect("execution failed");
4137        assert!(result.ok());
4138        assert!(result.text_out().contains("first"), "missing 'first': {}", result.text_out());
4139        assert!(result.text_out().contains("second"), "missing 'second': {}", result.text_out());
4140    }
4141
4142    #[tokio::test]
4143    async fn test_for_loop_accumulates_output() {
4144        let kernel = Kernel::transient().expect("failed to create kernel");
4145        let result = kernel
4146            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
4147            .await
4148            .expect("execution failed");
4149        assert!(result.ok());
4150        assert!(result.text_out().contains("item: a"), "missing 'item: a': {}", result.text_out());
4151        assert!(result.text_out().contains("item: b"), "missing 'item: b': {}", result.text_out());
4152        assert!(result.text_out().contains("item: c"), "missing 'item: c': {}", result.text_out());
4153    }
4154
4155    #[tokio::test]
4156    async fn test_while_loop_accumulates_output() {
4157        let kernel = Kernel::transient().expect("failed to create kernel");
4158        let result = kernel
4159            .execute(r#"
4160                N=3
4161                while [[ ${N} -gt 0 ]]; do
4162                    echo "N=${N}"
4163                    N=$((N - 1))
4164                done
4165            "#)
4166            .await
4167            .expect("execution failed");
4168        assert!(result.ok());
4169        assert!(result.text_out().contains("N=3"), "missing 'N=3': {}", result.text_out());
4170        assert!(result.text_out().contains("N=2"), "missing 'N=2': {}", result.text_out());
4171        assert!(result.text_out().contains("N=1"), "missing 'N=1': {}", result.text_out());
4172    }
4173
4174    #[tokio::test]
4175    async fn test_kernel_set_var() {
4176        let kernel = Kernel::transient().expect("failed to create kernel");
4177
4178        kernel.execute("X=42").await.expect("set failed");
4179
4180        let value = kernel.get_var("X").await;
4181        assert_eq!(value, Some(Value::Int(42)));
4182    }
4183
4184    #[tokio::test]
4185    async fn test_kernel_var_expansion() {
4186        let kernel = Kernel::transient().expect("failed to create kernel");
4187
4188        kernel.execute("NAME=\"world\"").await.expect("set failed");
4189        let result = kernel.execute("echo \"hello ${NAME}\"").await.expect("echo failed");
4190
4191        assert!(result.ok());
4192        assert_eq!(result.text_out().trim(), "hello world");
4193    }
4194
4195    #[tokio::test]
4196    async fn test_kernel_last_result() {
4197        let kernel = Kernel::transient().expect("failed to create kernel");
4198
4199        kernel.execute("echo test").await.expect("echo failed");
4200
4201        let last = kernel.last_result().await;
4202        assert!(last.ok());
4203        assert_eq!(last.text_out().trim(), "test");
4204    }
4205
4206    #[tokio::test]
4207    async fn test_kernel_tool_not_found() {
4208        let kernel = Kernel::transient().expect("failed to create kernel");
4209
4210        let result = kernel.execute("nonexistent_tool").await.expect("execution failed");
4211        assert!(!result.ok());
4212        assert_eq!(result.code, 127);
4213        assert!(result.err.contains("command not found"));
4214    }
4215
4216    #[tokio::test]
4217    async fn test_external_command_true() {
4218        // Use REPL config for passthrough filesystem access
4219        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4220
4221        // /bin/true should be available on any Unix system
4222        let result = kernel.execute("true").await.expect("execution failed");
4223        // This should use the builtin true, which returns 0
4224        assert!(result.ok(), "true should succeed: {:?}", result);
4225    }
4226
4227    #[tokio::test]
4228    async fn test_external_command_basic() {
4229        // Use REPL config for passthrough filesystem access
4230        let kernel = Kernel::new(KernelConfig::repl()).expect("failed to create kernel");
4231
4232        // Test with /bin/echo which is external
4233        // Note: kaish has a builtin echo, so this will use the builtin
4234        // Let's test with a command that's not a builtin
4235        // Actually, let's just test that PATH resolution works by checking the PATH var
4236        let path_var = std::env::var("PATH").unwrap_or_default();
4237        eprintln!("System PATH: {}", path_var);
4238
4239        // Set PATH in kernel to ensure it's available
4240        kernel.execute(&format!(r#"PATH="{}""#, path_var)).await.expect("set PATH failed");
4241
4242        // Now try an external command like /usr/bin/env
4243        // But env is also a builtin... let's try uname
4244        let result = kernel.execute("uname").await.expect("execution failed");
4245        eprintln!("uname result: {:?}", result);
4246        // uname should succeed if external commands work
4247        assert!(result.ok() || result.code == 127, "uname: {:?}", result);
4248    }
4249
4250    #[tokio::test]
4251    async fn test_kernel_reset() {
4252        let kernel = Kernel::transient().expect("failed to create kernel");
4253
4254        kernel.execute("X=1").await.expect("set failed");
4255        assert!(kernel.get_var("X").await.is_some());
4256
4257        kernel.reset().await.expect("reset failed");
4258        assert!(kernel.get_var("X").await.is_none());
4259    }
4260
4261    #[tokio::test]
4262    async fn test_kernel_cwd() {
4263        let kernel = Kernel::transient().expect("failed to create kernel");
4264
4265        // Transient kernel uses sandboxed mode with cwd=$HOME
4266        let cwd = kernel.cwd().await;
4267        let home = std::env::var("HOME")
4268            .map(PathBuf::from)
4269            .unwrap_or_else(|_| PathBuf::from("/"));
4270        assert_eq!(cwd, home);
4271
4272        kernel.set_cwd(PathBuf::from("/tmp")).await;
4273        assert_eq!(kernel.cwd().await, PathBuf::from("/tmp"));
4274    }
4275
4276    #[tokio::test]
4277    async fn test_kernel_list_vars() {
4278        let kernel = Kernel::transient().expect("failed to create kernel");
4279
4280        kernel.execute("A=1").await.ok();
4281        kernel.execute("B=2").await.ok();
4282
4283        let vars = kernel.list_vars().await;
4284        assert!(vars.iter().any(|(n, v)| n == "A" && *v == Value::Int(1)));
4285        assert!(vars.iter().any(|(n, v)| n == "B" && *v == Value::Int(2)));
4286    }
4287
4288    #[tokio::test]
4289    async fn test_is_truthy() {
4290        assert!(!is_truthy(&Value::Null));
4291        assert!(!is_truthy(&Value::Bool(false)));
4292        assert!(is_truthy(&Value::Bool(true)));
4293        assert!(!is_truthy(&Value::Int(0)));
4294        assert!(is_truthy(&Value::Int(1)));
4295        assert!(!is_truthy(&Value::String("".into())));
4296        assert!(is_truthy(&Value::String("x".into())));
4297    }
4298
4299    #[tokio::test]
4300    async fn test_jq_in_pipeline() {
4301        let kernel = Kernel::transient().expect("failed to create kernel");
4302        // kaish uses double quotes only; escape inner quotes
4303        let result = kernel
4304            .execute(r#"echo "{\"name\": \"Alice\"}" | jq ".name" -r"#)
4305            .await
4306            .expect("execution failed");
4307        assert!(result.ok(), "jq pipeline failed: {}", result.err);
4308        assert_eq!(result.text_out().trim(), "Alice");
4309    }
4310
4311    #[tokio::test]
4312    async fn test_user_defined_tool() {
4313        let kernel = Kernel::transient().expect("failed to create kernel");
4314
4315        // Define a function
4316        kernel
4317            .execute(r#"greet() { echo "Hello, $1!" }"#)
4318            .await
4319            .expect("function definition failed");
4320
4321        // Call the function
4322        let result = kernel
4323            .execute(r#"greet "World""#)
4324            .await
4325            .expect("function call failed");
4326
4327        assert!(result.ok(), "greet failed: {}", result.err);
4328        assert_eq!(result.text_out().trim(), "Hello, World!");
4329    }
4330
4331    #[tokio::test]
4332    async fn test_user_tool_positional_args() {
4333        let kernel = Kernel::transient().expect("failed to create kernel");
4334
4335        // Define a function with positional param
4336        kernel
4337            .execute(r#"greet() { echo "Hi $1" }"#)
4338            .await
4339            .expect("function definition failed");
4340
4341        // Call with positional argument
4342        let result = kernel
4343            .execute(r#"greet "Amy""#)
4344            .await
4345            .expect("function call failed");
4346
4347        assert!(result.ok(), "greet failed: {}", result.err);
4348        assert_eq!(result.text_out().trim(), "Hi Amy");
4349    }
4350
4351    #[tokio::test]
4352    async fn test_function_shared_scope() {
4353        let kernel = Kernel::transient().expect("failed to create kernel");
4354
4355        // Set a variable in parent scope
4356        kernel
4357            .execute(r#"SECRET="hidden""#)
4358            .await
4359            .expect("set failed");
4360
4361        // Define a function that accesses and modifies parent variable
4362        kernel
4363            .execute(r#"access_parent() {
4364                echo "${SECRET}"
4365                SECRET="modified"
4366            }"#)
4367            .await
4368            .expect("function definition failed");
4369
4370        // Call the function - it SHOULD see SECRET (shared scope like sh)
4371        let result = kernel.execute("access_parent").await.expect("function call failed");
4372
4373        // Function should have access to parent scope
4374        assert!(
4375            result.text_out().contains("hidden"),
4376            "Function should access parent scope, got: {}",
4377            result.text_out()
4378        );
4379
4380        // Function should have modified the parent variable
4381        let secret = kernel.get_var("SECRET").await;
4382        assert_eq!(
4383            secret,
4384            Some(Value::String("modified".into())),
4385            "Function should modify parent scope"
4386        );
4387    }
4388
4389    #[tokio::test]
4390    #[ignore = "exec replaces the test binary via CommandExt::exec, hangs libtest; cannot be run under cargo test"]
4391    async fn test_exec_builtin() {
4392        let kernel = Kernel::transient().expect("failed to create kernel");
4393        // argv is now a space-separated string or JSON array string
4394        let result = kernel
4395            .execute(r#"exec command="/bin/echo" argv="hello world""#)
4396            .await
4397            .expect("exec failed");
4398
4399        assert!(result.ok(), "exec failed: {}", result.err);
4400        assert_eq!(result.text_out().trim(), "hello world");
4401    }
4402
4403    #[tokio::test]
4404    async fn test_while_false_never_runs() {
4405        let kernel = Kernel::transient().expect("failed to create kernel");
4406
4407        // A while loop with false condition should never run
4408        let result = kernel
4409            .execute(r#"
4410                while false; do
4411                    echo "should not run"
4412                done
4413            "#)
4414            .await
4415            .expect("while false failed");
4416
4417        assert!(result.ok());
4418        assert!(result.text_out().is_empty(), "while false should not execute body: {}", result.text_out());
4419    }
4420
4421    #[tokio::test]
4422    async fn test_while_string_comparison() {
4423        let kernel = Kernel::transient().expect("failed to create kernel");
4424
4425        // Set a flag
4426        kernel.execute(r#"FLAG="go""#).await.expect("set failed");
4427
4428        // Use string comparison as condition (shell-compatible [[ ]] syntax)
4429        // Note: Put echo last so we can check the output
4430        let result = kernel
4431            .execute(r#"
4432                while [[ ${FLAG} == "go" ]]; do
4433                    FLAG="stop"
4434                    echo "running"
4435                done
4436            "#)
4437            .await
4438            .expect("while with string cmp failed");
4439
4440        assert!(result.ok());
4441        assert!(result.text_out().contains("running"), "should have run once: {}", result.text_out());
4442
4443        // Verify flag was changed
4444        let flag = kernel.get_var("FLAG").await;
4445        assert_eq!(flag, Some(Value::String("stop".into())));
4446    }
4447
4448    #[tokio::test]
4449    async fn test_while_numeric_comparison() {
4450        let kernel = Kernel::transient().expect("failed to create kernel");
4451
4452        // Test > comparison (shell-compatible [[ ]] with -gt)
4453        kernel.execute("N=5").await.expect("set failed");
4454
4455        // Note: Put echo last so we can check the output
4456        let result = kernel
4457            .execute(r#"
4458                while [[ ${N} -gt 3 ]]; do
4459                    N=3
4460                    echo "N was greater"
4461                done
4462            "#)
4463            .await
4464            .expect("while with > failed");
4465
4466        assert!(result.ok());
4467        assert!(result.text_out().contains("N was greater"), "should have run once: {}", result.text_out());
4468    }
4469
4470    #[tokio::test]
4471    async fn test_break_in_while_loop() {
4472        let kernel = Kernel::transient().expect("failed to create kernel");
4473
4474        let result = kernel
4475            .execute(r#"
4476                I=0
4477                while true; do
4478                    I=1
4479                    echo "before break"
4480                    break
4481                    echo "after break"
4482                done
4483            "#)
4484            .await
4485            .expect("while with break failed");
4486
4487        assert!(result.ok());
4488        assert!(result.text_out().contains("before break"), "should see before break: {}", result.text_out());
4489        assert!(!result.text_out().contains("after break"), "should not see after break: {}", result.text_out());
4490
4491        // Verify we exited the loop
4492        let i = kernel.get_var("I").await;
4493        assert_eq!(i, Some(Value::Int(1)));
4494    }
4495
4496    #[tokio::test]
4497    async fn test_continue_in_while_loop() {
4498        let kernel = Kernel::transient().expect("failed to create kernel");
4499
4500        // Test continue in a while loop where variables persist
4501        // We use string state transition: "start" -> "middle" -> "end"
4502        // continue on "middle" should skip to next iteration
4503        // Shell-compatible: use [[ ]] for comparisons
4504        let result = kernel
4505            .execute(r#"
4506                STATE="start"
4507                AFTER_CONTINUE="no"
4508                while [[ ${STATE} != "done" ]]; do
4509                    if [[ ${STATE} == "start" ]]; then
4510                        STATE="middle"
4511                        continue
4512                        AFTER_CONTINUE="yes"
4513                    fi
4514                    if [[ ${STATE} == "middle" ]]; then
4515                        STATE="done"
4516                    fi
4517                done
4518            "#)
4519            .await
4520            .expect("while with continue failed");
4521
4522        assert!(result.ok());
4523
4524        // STATE should be "done" (we completed the loop)
4525        let state = kernel.get_var("STATE").await;
4526        assert_eq!(state, Some(Value::String("done".into())));
4527
4528        // AFTER_CONTINUE should still be "no" (continue skipped the assignment)
4529        let after = kernel.get_var("AFTER_CONTINUE").await;
4530        assert_eq!(after, Some(Value::String("no".into())));
4531    }
4532
4533    #[tokio::test]
4534    async fn test_break_with_level() {
4535        let kernel = Kernel::transient().expect("failed to create kernel");
4536
4537        // Nested loop with break 2 to exit both loops
4538        // We verify by checking OUTER value:
4539        // - If break 2 works, OUTER stays at 1 (set before for loop)
4540        // - If break 2 fails, OUTER becomes 2 (set after for loop)
4541        let result = kernel
4542            .execute(r#"
4543                OUTER=0
4544                while true; do
4545                    OUTER=1
4546                    for X in "1 2"; do
4547                        break 2
4548                    done
4549                    OUTER=2
4550                done
4551            "#)
4552            .await
4553            .expect("nested break failed");
4554
4555        assert!(result.ok());
4556
4557        // OUTER should be 1 (set before for loop), not 2 (would be set after for loop)
4558        let outer = kernel.get_var("OUTER").await;
4559        assert_eq!(outer, Some(Value::Int(1)), "break 2 should have skipped OUTER=2");
4560    }
4561
4562    #[tokio::test]
4563    async fn test_return_from_tool() {
4564        let kernel = Kernel::transient().expect("failed to create kernel");
4565
4566        // Define a function that returns early
4567        kernel
4568            .execute(r#"early_return() {
4569                if [[ $1 == 1 ]]; then
4570                    return 42
4571                fi
4572                echo "not returned"
4573            }"#)
4574            .await
4575            .expect("function definition failed");
4576
4577        // Call with arg=1 should return with exit code 42
4578        // (POSIX shell behavior: return N sets exit code, doesn't output N)
4579        let result = kernel
4580            .execute("early_return 1")
4581            .await
4582            .expect("function call failed");
4583
4584        // Exit code should be 42 (non-zero, so not ok())
4585        assert_eq!(result.code, 42);
4586        // Output should be empty (we returned before echo)
4587        assert!(result.text_out().is_empty());
4588    }
4589
4590    #[tokio::test]
4591    async fn test_return_without_value() {
4592        let kernel = Kernel::transient().expect("failed to create kernel");
4593
4594        // Define a function that returns without a value
4595        kernel
4596            .execute(r#"early_exit() {
4597                if [[ $1 == "stop" ]]; then
4598                    return
4599                fi
4600                echo "continued"
4601            }"#)
4602            .await
4603            .expect("function definition failed");
4604
4605        // Call with arg="stop" should return early
4606        let result = kernel
4607            .execute(r#"early_exit "stop""#)
4608            .await
4609            .expect("function call failed");
4610
4611        assert!(result.ok());
4612        assert!(result.text_out().is_empty() || result.text_out().trim().is_empty());
4613    }
4614
4615    #[tokio::test]
4616    async fn test_exit_stops_execution() {
4617        let kernel = Kernel::transient().expect("failed to create kernel");
4618
4619        // exit should stop further execution
4620        kernel
4621            .execute(r#"
4622                BEFORE="yes"
4623                exit 0
4624                AFTER="yes"
4625            "#)
4626            .await
4627            .expect("execution failed");
4628
4629        // BEFORE should be set, AFTER should not
4630        let before = kernel.get_var("BEFORE").await;
4631        assert_eq!(before, Some(Value::String("yes".into())));
4632
4633        let after = kernel.get_var("AFTER").await;
4634        assert!(after.is_none(), "AFTER should not be set after exit");
4635    }
4636
4637    #[tokio::test]
4638    async fn test_exit_with_code() {
4639        let kernel = Kernel::transient().expect("failed to create kernel");
4640
4641        // exit with code should propagate the exit code
4642        let result = kernel
4643            .execute("exit 42")
4644            .await
4645            .expect("exit failed");
4646
4647        assert_eq!(result.code, 42);
4648        assert!(result.text_out().is_empty(), "exit should not produce stdout");
4649    }
4650
4651    #[tokio::test]
4652    async fn test_set_e_stops_on_failure() {
4653        let kernel = Kernel::transient().expect("failed to create kernel");
4654
4655        // Enable error-exit mode
4656        kernel.execute("set -e").await.expect("set -e failed");
4657
4658        // Run a sequence where the middle command fails
4659        kernel
4660            .execute(r#"
4661                STEP1="done"
4662                false
4663                STEP2="done"
4664            "#)
4665            .await
4666            .expect("execution failed");
4667
4668        // STEP1 should be set, but STEP2 should NOT be set (exit on false)
4669        let step1 = kernel.get_var("STEP1").await;
4670        assert_eq!(step1, Some(Value::String("done".into())));
4671
4672        let step2 = kernel.get_var("STEP2").await;
4673        assert!(step2.is_none(), "STEP2 should not be set after false with set -e");
4674    }
4675
4676    #[tokio::test]
4677    async fn test_set_plus_e_disables_error_exit() {
4678        let kernel = Kernel::transient().expect("failed to create kernel");
4679
4680        // Enable then disable error-exit mode
4681        kernel.execute("set -e").await.expect("set -e failed");
4682        kernel.execute("set +e").await.expect("set +e failed");
4683
4684        // Now failure should NOT stop execution
4685        kernel
4686            .execute(r#"
4687                STEP1="done"
4688                false
4689                STEP2="done"
4690            "#)
4691            .await
4692            .expect("execution failed");
4693
4694        // Both should be set since +e disables error exit
4695        let step1 = kernel.get_var("STEP1").await;
4696        assert_eq!(step1, Some(Value::String("done".into())));
4697
4698        let step2 = kernel.get_var("STEP2").await;
4699        assert_eq!(step2, Some(Value::String("done".into())));
4700    }
4701
4702    #[tokio::test]
4703    async fn test_set_ignores_unknown_options() {
4704        let kernel = Kernel::transient().expect("failed to create kernel");
4705
4706        // Bash idiom: set -euo pipefail (we support -e, ignore the rest)
4707        let result = kernel
4708            .execute("set -e -u -o pipefail")
4709            .await
4710            .expect("set with unknown options failed");
4711
4712        assert!(result.ok(), "set should succeed with unknown options");
4713
4714        // -e should still be enabled
4715        kernel
4716            .execute(r#"
4717                BEFORE="yes"
4718                false
4719                AFTER="yes"
4720            "#)
4721            .await
4722            .ok();
4723
4724        let after = kernel.get_var("AFTER").await;
4725        assert!(after.is_none(), "-e should be enabled despite unknown options");
4726    }
4727
4728    #[tokio::test]
4729    async fn test_set_no_args_shows_settings() {
4730        let kernel = Kernel::transient().expect("failed to create kernel");
4731
4732        // Enable -e
4733        kernel.execute("set -e").await.expect("set -e failed");
4734
4735        // Call set with no args to see settings
4736        let result = kernel.execute("set").await.expect("set failed");
4737
4738        assert!(result.ok());
4739        assert!(result.text_out().contains("set -e"), "should show -e is enabled: {}", result.text_out());
4740    }
4741
4742    #[tokio::test]
4743    async fn test_set_e_in_pipeline() {
4744        let kernel = Kernel::transient().expect("failed to create kernel");
4745
4746        kernel.execute("set -e").await.expect("set -e failed");
4747
4748        // Pipeline failure should trigger exit
4749        kernel
4750            .execute(r#"
4751                BEFORE="yes"
4752                false | cat
4753                AFTER="yes"
4754            "#)
4755            .await
4756            .ok();
4757
4758        let before = kernel.get_var("BEFORE").await;
4759        assert_eq!(before, Some(Value::String("yes".into())));
4760
4761        // AFTER should not be set if pipeline failure triggers exit
4762        // Note: The exit code of a pipeline is the exit code of the last command
4763        // So `false | cat` returns 0 (cat succeeds). This is bash-compatible behavior.
4764        // To test pipeline failure, we need the last command to fail.
4765    }
4766
4767    #[tokio::test]
4768    async fn test_set_e_with_and_chain() {
4769        let kernel = Kernel::transient().expect("failed to create kernel");
4770
4771        kernel.execute("set -e").await.expect("set -e failed");
4772
4773        // Commands in && chain should not trigger -e on the first failure
4774        // because && explicitly handles the error
4775        kernel
4776            .execute(r#"
4777                RESULT="initial"
4778                false && RESULT="chained"
4779                RESULT="continued"
4780            "#)
4781            .await
4782            .ok();
4783
4784        // In bash, commands in && don't trigger -e. The chain handles the failure.
4785        // Our implementation may differ - let's verify current behavior.
4786        let result = kernel.get_var("RESULT").await;
4787        // If we follow bash semantics, RESULT should be "continued"
4788        // If we trigger -e on the false, RESULT stays "initial"
4789        assert!(result.is_some(), "RESULT should be set");
4790    }
4791
4792    #[tokio::test]
4793    async fn test_set_e_exits_in_for_loop() {
4794        let kernel = Kernel::transient().expect("failed to create kernel");
4795
4796        kernel.execute("set -e").await.expect("set -e failed");
4797
4798        kernel
4799            .execute(r#"
4800                REACHED="no"
4801                for x in 1 2 3; do
4802                    false
4803                    REACHED="yes"
4804                done
4805            "#)
4806            .await
4807            .ok();
4808
4809        // With set -e, false should trigger exit; REACHED should remain "no"
4810        let reached = kernel.get_var("REACHED").await;
4811        assert_eq!(reached, Some(Value::String("no".into())),
4812            "set -e should exit on failure in for loop body");
4813    }
4814
4815    #[tokio::test]
4816    async fn test_for_loop_continues_without_set_e() {
4817        let kernel = Kernel::transient().expect("failed to create kernel");
4818
4819        // Without set -e, for loop should continue normally
4820        kernel
4821            .execute(r#"
4822                COUNT=0
4823                for x in 1 2 3; do
4824                    false
4825                    COUNT=$((COUNT + 1))
4826                done
4827            "#)
4828            .await
4829            .ok();
4830
4831        let count = kernel.get_var("COUNT").await;
4832        // Arithmetic produces Int values; accept either Int or String representation
4833        let count_val = match &count {
4834            Some(Value::Int(n)) => *n,
4835            Some(Value::String(s)) => s.parse().unwrap_or(-1),
4836            _ => -1,
4837        };
4838        assert_eq!(count_val, 3,
4839            "without set -e, loop should complete all iterations (got {:?})", count);
4840    }
4841
4842    // ═══════════════════════════════════════════════════════════════════════════
4843    // Source Tests
4844    // ═══════════════════════════════════════════════════════════════════════════
4845
4846    #[tokio::test]
4847    async fn test_source_sets_variables() {
4848        let kernel = Kernel::transient().expect("failed to create kernel");
4849
4850        // Write a script to the VFS
4851        kernel
4852            .execute(r#"write "/test.kai" 'FOO="bar"'"#)
4853            .await
4854            .expect("write failed");
4855
4856        // Source the script
4857        let result = kernel
4858            .execute(r#"source "/test.kai""#)
4859            .await
4860            .expect("source failed");
4861
4862        assert!(result.ok(), "source should succeed");
4863
4864        // Variable should be set in current scope
4865        let foo = kernel.get_var("FOO").await;
4866        assert_eq!(foo, Some(Value::String("bar".into())));
4867    }
4868
4869    #[tokio::test]
4870    async fn test_source_with_dot_alias() {
4871        let kernel = Kernel::transient().expect("failed to create kernel");
4872
4873        // Write a script to the VFS
4874        kernel
4875            .execute(r#"write "/vars.kai" 'X=42'"#)
4876            .await
4877            .expect("write failed");
4878
4879        // Source using . alias
4880        let result = kernel
4881            .execute(r#". "/vars.kai""#)
4882            .await
4883            .expect(". failed");
4884
4885        assert!(result.ok(), ". should succeed");
4886
4887        // Variable should be set in current scope
4888        let x = kernel.get_var("X").await;
4889        assert_eq!(x, Some(Value::Int(42)));
4890    }
4891
4892    #[tokio::test]
4893    async fn test_source_not_found() {
4894        let kernel = Kernel::transient().expect("failed to create kernel");
4895
4896        // Try to source a non-existent file
4897        let result = kernel
4898            .execute(r#"source "/nonexistent.kai""#)
4899            .await
4900            .expect("source should not fail with error");
4901
4902        assert!(!result.ok(), "source of non-existent file should fail");
4903        assert!(result.err.contains("nonexistent.kai"), "error should mention filename");
4904    }
4905
4906    #[tokio::test]
4907    async fn test_source_missing_filename() {
4908        let kernel = Kernel::transient().expect("failed to create kernel");
4909
4910        // Call source with no arguments
4911        let result = kernel
4912            .execute("source")
4913            .await
4914            .expect("source should not fail with error");
4915
4916        assert!(!result.ok(), "source without filename should fail");
4917        assert!(result.err.contains("missing filename"), "error should mention missing filename");
4918    }
4919
4920    #[tokio::test]
4921    async fn test_source_executes_multiple_statements() {
4922        let kernel = Kernel::transient().expect("failed to create kernel");
4923
4924        // Write a script with multiple statements
4925        kernel
4926            .execute(r#"write "/multi.kai" 'A=1
4927B=2
4928C=3'"#)
4929            .await
4930            .expect("write failed");
4931
4932        // Source it
4933        kernel
4934            .execute(r#"source "/multi.kai""#)
4935            .await
4936            .expect("source failed");
4937
4938        // All variables should be set
4939        assert_eq!(kernel.get_var("A").await, Some(Value::Int(1)));
4940        assert_eq!(kernel.get_var("B").await, Some(Value::Int(2)));
4941        assert_eq!(kernel.get_var("C").await, Some(Value::Int(3)));
4942    }
4943
4944    #[tokio::test]
4945    async fn test_source_can_define_functions() {
4946        let kernel = Kernel::transient().expect("failed to create kernel");
4947
4948        // Write a script that defines a function
4949        kernel
4950            .execute(r#"write "/functions.kai" 'greet() {
4951    echo "Hello, $1!"
4952}'"#)
4953            .await
4954            .expect("write failed");
4955
4956        // Source it
4957        kernel
4958            .execute(r#"source "/functions.kai""#)
4959            .await
4960            .expect("source failed");
4961
4962        // Use the defined function
4963        let result = kernel
4964            .execute(r#"greet "World""#)
4965            .await
4966            .expect("greet failed");
4967
4968        assert!(result.ok());
4969        assert!(result.text_out().contains("Hello, World!"));
4970    }
4971
4972    #[tokio::test]
4973    async fn test_source_inherits_error_exit() {
4974        let kernel = Kernel::transient().expect("failed to create kernel");
4975
4976        // Enable error exit
4977        kernel.execute("set -e").await.expect("set -e failed");
4978
4979        // Write a script that has a failure
4980        kernel
4981            .execute(r#"write "/fail.kai" 'BEFORE="yes"
4982false
4983AFTER="yes"'"#)
4984            .await
4985            .expect("write failed");
4986
4987        // Source it (should exit on false due to set -e)
4988        kernel
4989            .execute(r#"source "/fail.kai""#)
4990            .await
4991            .ok();
4992
4993        // BEFORE should be set, AFTER should NOT be set due to error exit
4994        let before = kernel.get_var("BEFORE").await;
4995        assert_eq!(before, Some(Value::String("yes".into())));
4996
4997        // Note: This test depends on whether error exit is checked within source
4998        // Currently our implementation checks per-statement in the main kernel
4999    }
5000
5001    // ═══════════════════════════════════════════════════════════════════════════
5002    // set -e with && / || chains
5003    // ═══════════════════════════════════════════════════════════════════════════
5004
5005    #[tokio::test]
5006    async fn test_set_e_and_chain_left_fails() {
5007        // set -e; false && echo hi; REACHED=1 → REACHED should be set
5008        let kernel = Kernel::transient().expect("failed to create kernel");
5009        kernel.execute("set -e").await.expect("set -e failed");
5010
5011        kernel
5012            .execute("false && echo hi; REACHED=1")
5013            .await
5014            .expect("execution failed");
5015
5016        let reached = kernel.get_var("REACHED").await;
5017        assert_eq!(
5018            reached,
5019            Some(Value::Int(1)),
5020            "set -e should not trigger on left side of &&"
5021        );
5022    }
5023
5024    #[tokio::test]
5025    async fn test_set_e_and_chain_right_fails() {
5026        // set -e; true && false; REACHED=1 → REACHED should NOT be set
5027        let kernel = Kernel::transient().expect("failed to create kernel");
5028        kernel.execute("set -e").await.expect("set -e failed");
5029
5030        kernel
5031            .execute("true && false; REACHED=1")
5032            .await
5033            .expect("execution failed");
5034
5035        let reached = kernel.get_var("REACHED").await;
5036        assert!(
5037            reached.is_none(),
5038            "set -e should trigger when right side of && fails"
5039        );
5040    }
5041
5042    #[tokio::test]
5043    async fn test_set_e_or_chain_recovers() {
5044        // set -e; false || echo recovered; REACHED=1 → REACHED should be set
5045        let kernel = Kernel::transient().expect("failed to create kernel");
5046        kernel.execute("set -e").await.expect("set -e failed");
5047
5048        kernel
5049            .execute("false || echo recovered; REACHED=1")
5050            .await
5051            .expect("execution failed");
5052
5053        let reached = kernel.get_var("REACHED").await;
5054        assert_eq!(
5055            reached,
5056            Some(Value::Int(1)),
5057            "set -e should not trigger when || recovers the failure"
5058        );
5059    }
5060
5061    #[tokio::test]
5062    async fn test_set_e_or_chain_both_fail() {
5063        // set -e; false || false; REACHED=1 → REACHED should NOT be set
5064        let kernel = Kernel::transient().expect("failed to create kernel");
5065        kernel.execute("set -e").await.expect("set -e failed");
5066
5067        kernel
5068            .execute("false || false; REACHED=1")
5069            .await
5070            .expect("execution failed");
5071
5072        let reached = kernel.get_var("REACHED").await;
5073        assert!(
5074            reached.is_none(),
5075            "set -e should trigger when || chain ultimately fails"
5076        );
5077    }
5078
5079    // ═══════════════════════════════════════════════════════════════════════════
5080    // Cancellation Tests
5081    // ═══════════════════════════════════════════════════════════════════════════
5082
5083    /// Helper: schedule a cancel after a delay from a background thread.
5084    /// Uses std::thread because cancel() is sync and Kernel is not Send.
5085    fn schedule_cancel(kernel: &Arc<Kernel>, delay: std::time::Duration) {
5086        let k = Arc::clone(kernel);
5087        std::thread::spawn(move || {
5088            std::thread::sleep(delay);
5089            k.cancel();
5090        });
5091    }
5092
5093    #[tokio::test]
5094    async fn test_cancel_interrupts_for_loop() {
5095        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5096
5097        // Schedule cancel after a short delay from a background OS thread
5098        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5099
5100        let result = kernel
5101            .execute("for i in $(seq 1 100000); do X=$i; done")
5102            .await
5103            .expect("execute failed");
5104
5105        assert_eq!(result.code, 130, "cancelled execution should exit with code 130");
5106
5107        // The loop variable should be set to something < 100000
5108        let x = kernel.get_var("X").await;
5109        if let Some(Value::Int(n)) = x {
5110            assert!(n < 100000, "loop should have been interrupted before finishing, got X={n}");
5111        }
5112    }
5113
5114    #[tokio::test]
5115    async fn test_cancel_interrupts_while_loop() {
5116        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5117        kernel.execute("COUNT=0").await.expect("init failed");
5118
5119        schedule_cancel(&kernel, std::time::Duration::from_millis(10));
5120
5121        let result = kernel
5122            .execute("while true; do COUNT=$((COUNT + 1)); done")
5123            .await
5124            .expect("execute failed");
5125
5126        assert_eq!(result.code, 130);
5127
5128        let count = kernel.get_var("COUNT").await;
5129        if let Some(Value::Int(n)) = count {
5130            assert!(n > 0, "loop should have run at least once");
5131        }
5132    }
5133
5134    #[tokio::test]
5135    async fn test_reset_after_cancel() {
5136        // After cancellation, the next execute() should work normally
5137        let kernel = Kernel::transient().expect("failed to create kernel");
5138        kernel.cancel(); // cancel with nothing running
5139
5140        let result = kernel.execute("echo hello").await.expect("execute failed");
5141        assert!(result.ok(), "execute after cancel should succeed");
5142        assert_eq!(result.text_out().trim(), "hello");
5143    }
5144
5145    #[tokio::test]
5146    async fn test_cancel_interrupts_statement_sequence() {
5147        let kernel = Arc::new(Kernel::transient().expect("failed to create kernel"));
5148
5149        // Schedule cancel after the first statement runs but before sleep finishes
5150        schedule_cancel(&kernel, std::time::Duration::from_millis(50));
5151
5152        let result = kernel
5153            .execute("STEP=1; sleep 5; STEP=2; sleep 5; STEP=3")
5154            .await
5155            .expect("execute failed");
5156
5157        assert_eq!(result.code, 130);
5158
5159        // STEP should be 1 (set before sleep), not 2 or 3
5160        let step = kernel.get_var("STEP").await;
5161        assert_eq!(step, Some(Value::Int(1)), "cancel should stop before STEP=2");
5162    }
5163
5164    // ═══════════════════════════════════════════════════════════════════════════
5165    // Case Statement Tests
5166    // ═══════════════════════════════════════════════════════════════════════════
5167
5168    #[tokio::test]
5169    async fn test_case_simple_match() {
5170        let kernel = Kernel::transient().expect("failed to create kernel");
5171
5172        let result = kernel
5173            .execute(r#"
5174                case "hello" in
5175                    hello) echo "matched hello" ;;
5176                    world) echo "matched world" ;;
5177                esac
5178            "#)
5179            .await
5180            .expect("case failed");
5181
5182        assert!(result.ok());
5183        assert_eq!(result.text_out().trim(), "matched hello");
5184    }
5185
5186    #[tokio::test]
5187    async fn test_case_wildcard_match() {
5188        let kernel = Kernel::transient().expect("failed to create kernel");
5189
5190        let result = kernel
5191            .execute(r#"
5192                case "main.rs" in
5193                    *.py) echo "Python" ;;
5194                    *.rs) echo "Rust" ;;
5195                    *) echo "Unknown" ;;
5196                esac
5197            "#)
5198            .await
5199            .expect("case failed");
5200
5201        assert!(result.ok());
5202        assert_eq!(result.text_out().trim(), "Rust");
5203    }
5204
5205    #[tokio::test]
5206    async fn test_case_default_match() {
5207        let kernel = Kernel::transient().expect("failed to create kernel");
5208
5209        let result = kernel
5210            .execute(r#"
5211                case "unknown.xyz" in
5212                    *.py) echo "Python" ;;
5213                    *.rs) echo "Rust" ;;
5214                    *) echo "Default" ;;
5215                esac
5216            "#)
5217            .await
5218            .expect("case failed");
5219
5220        assert!(result.ok());
5221        assert_eq!(result.text_out().trim(), "Default");
5222    }
5223
5224    #[tokio::test]
5225    async fn test_case_no_match() {
5226        let kernel = Kernel::transient().expect("failed to create kernel");
5227
5228        // Case with no default branch and no match
5229        let result = kernel
5230            .execute(r#"
5231                case "nope" in
5232                    "yes") echo "yes" ;;
5233                    "no") echo "no" ;;
5234                esac
5235            "#)
5236            .await
5237            .expect("case failed");
5238
5239        assert!(result.ok());
5240        assert!(result.text_out().is_empty(), "no match should produce empty output");
5241    }
5242
5243    #[tokio::test]
5244    async fn test_case_with_variable() {
5245        let kernel = Kernel::transient().expect("failed to create kernel");
5246
5247        kernel.execute(r#"LANG="rust""#).await.expect("set failed");
5248
5249        let result = kernel
5250            .execute(r#"
5251                case ${LANG} in
5252                    python) echo "snake" ;;
5253                    rust) echo "crab" ;;
5254                    go) echo "gopher" ;;
5255                esac
5256            "#)
5257            .await
5258            .expect("case failed");
5259
5260        assert!(result.ok());
5261        assert_eq!(result.text_out().trim(), "crab");
5262    }
5263
5264    #[tokio::test]
5265    async fn test_case_multiple_patterns() {
5266        let kernel = Kernel::transient().expect("failed to create kernel");
5267
5268        let result = kernel
5269            .execute(r#"
5270                case "yes" in
5271                    "y"|"yes"|"Y"|"YES") echo "affirmative" ;;
5272                    "n"|"no"|"N"|"NO") echo "negative" ;;
5273                esac
5274            "#)
5275            .await
5276            .expect("case failed");
5277
5278        assert!(result.ok());
5279        assert_eq!(result.text_out().trim(), "affirmative");
5280    }
5281
5282    #[tokio::test]
5283    async fn test_case_glob_question_mark() {
5284        let kernel = Kernel::transient().expect("failed to create kernel");
5285
5286        let result = kernel
5287            .execute(r#"
5288                case "test1" in
5289                    test?) echo "matched test?" ;;
5290                    *) echo "default" ;;
5291                esac
5292            "#)
5293            .await
5294            .expect("case failed");
5295
5296        assert!(result.ok());
5297        assert_eq!(result.text_out().trim(), "matched test?");
5298    }
5299
5300    #[tokio::test]
5301    async fn test_case_char_class() {
5302        let kernel = Kernel::transient().expect("failed to create kernel");
5303
5304        let result = kernel
5305            .execute(r#"
5306                case "Yes" in
5307                    [Yy]*) echo "yes-like" ;;
5308                    [Nn]*) echo "no-like" ;;
5309                esac
5310            "#)
5311            .await
5312            .expect("case failed");
5313
5314        assert!(result.ok());
5315        assert_eq!(result.text_out().trim(), "yes-like");
5316    }
5317
5318    // ═══════════════════════════════════════════════════════════════════════════
5319    // Cat Stdin Tests
5320    // ═══════════════════════════════════════════════════════════════════════════
5321
5322    #[tokio::test]
5323    async fn test_cat_from_pipeline() {
5324        let kernel = Kernel::transient().expect("failed to create kernel");
5325
5326        let result = kernel
5327            .execute(r#"echo "piped text" | cat"#)
5328            .await
5329            .expect("cat pipeline failed");
5330
5331        assert!(result.ok(), "cat failed: {}", result.err);
5332        assert_eq!(result.text_out().trim(), "piped text");
5333    }
5334
5335    #[tokio::test]
5336    async fn test_cat_from_pipeline_multiline() {
5337        let kernel = Kernel::transient().expect("failed to create kernel");
5338
5339        let result = kernel
5340            .execute(r#"echo "line1\nline2" | cat -n"#)
5341            .await
5342            .expect("cat pipeline failed");
5343
5344        assert!(result.ok(), "cat failed: {}", result.err);
5345        assert!(result.text_out().contains("1\t"), "output: {}", result.text_out());
5346    }
5347
5348    // ═══════════════════════════════════════════════════════════════════════════
5349    // Heredoc Tests
5350    // ═══════════════════════════════════════════════════════════════════════════
5351
5352    #[tokio::test]
5353    async fn test_heredoc_basic() {
5354        let kernel = Kernel::transient().expect("failed to create kernel");
5355
5356        let result = kernel
5357            .execute("cat <<EOF\nhello\nEOF")
5358            .await
5359            .expect("heredoc failed");
5360
5361        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
5362        assert_eq!(result.text_out().trim(), "hello");
5363    }
5364
5365    #[tokio::test]
5366    async fn test_arithmetic_in_string() {
5367        let kernel = Kernel::transient().expect("failed to create kernel");
5368
5369        let result = kernel
5370            .execute(r#"echo "result: $((1 + 2))""#)
5371            .await
5372            .expect("arithmetic in string failed");
5373
5374        assert!(result.ok(), "echo failed: {}", result.err);
5375        assert_eq!(result.text_out().trim(), "result: 3");
5376    }
5377
5378    #[tokio::test]
5379    async fn test_heredoc_multiline() {
5380        let kernel = Kernel::transient().expect("failed to create kernel");
5381
5382        let result = kernel
5383            .execute("cat <<EOF\nline1\nline2\nline3\nEOF")
5384            .await
5385            .expect("heredoc failed");
5386
5387        assert!(result.ok(), "cat with heredoc failed: {}", result.err);
5388        assert!(result.text_out().contains("line1"), "output: {}", result.text_out());
5389        assert!(result.text_out().contains("line2"), "output: {}", result.text_out());
5390        assert!(result.text_out().contains("line3"), "output: {}", result.text_out());
5391    }
5392
5393    #[tokio::test]
5394    async fn test_heredoc_variable_expansion() {
5395        // Bug N: unquoted heredoc should expand variables
5396        let kernel = Kernel::transient().expect("failed to create kernel");
5397
5398        kernel.execute("GREETING=hello").await.expect("set var");
5399
5400        let result = kernel
5401            .execute("cat <<EOF\n$GREETING world\nEOF")
5402            .await
5403            .expect("heredoc expansion failed");
5404
5405        assert!(result.ok(), "heredoc expansion failed: {}", result.err);
5406        assert_eq!(result.text_out().trim(), "hello world");
5407    }
5408
5409    #[tokio::test]
5410    async fn test_heredoc_quoted_no_expansion() {
5411        // Bug N: quoted heredoc (<<'EOF') should NOT expand variables
5412        let kernel = Kernel::transient().expect("failed to create kernel");
5413
5414        kernel.execute("GREETING=hello").await.expect("set var");
5415
5416        let result = kernel
5417            .execute("cat <<'EOF'\n$GREETING world\nEOF")
5418            .await
5419            .expect("quoted heredoc failed");
5420
5421        assert!(result.ok(), "quoted heredoc failed: {}", result.err);
5422        assert_eq!(result.text_out().trim(), "$GREETING world");
5423    }
5424
5425    #[tokio::test]
5426    async fn test_heredoc_default_value_expansion() {
5427        // Bug N: ${VAR:-default} should expand in unquoted heredocs
5428        let kernel = Kernel::transient().expect("failed to create kernel");
5429
5430        let result = kernel
5431            .execute("cat <<EOF\n${UNSET:-fallback}\nEOF")
5432            .await
5433            .expect("heredoc default expansion failed");
5434
5435        assert!(result.ok(), "heredoc default expansion failed: {}", result.err);
5436        assert_eq!(result.text_out().trim(), "fallback");
5437    }
5438
5439    // ═══════════════════════════════════════════════════════════════════════════
5440    // Read Builtin Tests
5441    // ═══════════════════════════════════════════════════════════════════════════
5442
5443    #[tokio::test]
5444    async fn test_read_from_pipeline() {
5445        let kernel = Kernel::transient().expect("failed to create kernel");
5446
5447        // Pipe input to read
5448        let result = kernel
5449            .execute(r#"echo "Alice" | read NAME; echo "Hello, ${NAME}""#)
5450            .await
5451            .expect("read pipeline failed");
5452
5453        assert!(result.ok(), "read failed: {}", result.err);
5454        assert!(result.text_out().contains("Hello, Alice"), "output: {}", result.text_out());
5455    }
5456
5457    #[tokio::test]
5458    async fn test_read_multiple_vars_from_pipeline() {
5459        let kernel = Kernel::transient().expect("failed to create kernel");
5460
5461        let result = kernel
5462            .execute(r#"echo "John Doe 42" | read FIRST LAST AGE; echo "${FIRST} is ${AGE}""#)
5463            .await
5464            .expect("read pipeline failed");
5465
5466        assert!(result.ok(), "read failed: {}", result.err);
5467        assert!(result.text_out().contains("John is 42"), "output: {}", result.text_out());
5468    }
5469
5470    // ═══════════════════════════════════════════════════════════════════════════
5471    // Shell-Style Function Tests
5472    // ═══════════════════════════════════════════════════════════════════════════
5473
5474    #[tokio::test]
5475    async fn test_posix_function_with_positional_params() {
5476        let kernel = Kernel::transient().expect("failed to create kernel");
5477
5478        // Define POSIX-style function
5479        kernel
5480            .execute(r#"greet() { echo "Hello, $1!" }"#)
5481            .await
5482            .expect("function definition failed");
5483
5484        // Call the function
5485        let result = kernel
5486            .execute(r#"greet "Amy""#)
5487            .await
5488            .expect("function call failed");
5489
5490        assert!(result.ok(), "greet failed: {}", result.err);
5491        assert_eq!(result.text_out().trim(), "Hello, Amy!");
5492    }
5493
5494    #[tokio::test]
5495    async fn test_posix_function_multiple_args() {
5496        let kernel = Kernel::transient().expect("failed to create kernel");
5497
5498        // Define function using $1 and $2
5499        kernel
5500            .execute(r#"add_greeting() { echo "$1 $2!" }"#)
5501            .await
5502            .expect("function definition failed");
5503
5504        // Call the function
5505        let result = kernel
5506            .execute(r#"add_greeting "Hello" "World""#)
5507            .await
5508            .expect("function call failed");
5509
5510        assert!(result.ok(), "function failed: {}", result.err);
5511        assert_eq!(result.text_out().trim(), "Hello World!");
5512    }
5513
5514    #[tokio::test]
5515    async fn test_bash_function_with_positional_params() {
5516        let kernel = Kernel::transient().expect("failed to create kernel");
5517
5518        // Define bash-style function (function keyword, no parens)
5519        kernel
5520            .execute(r#"function greet { echo "Hi $1" }"#)
5521            .await
5522            .expect("function definition failed");
5523
5524        // Call the function
5525        let result = kernel
5526            .execute(r#"greet "Bob""#)
5527            .await
5528            .expect("function call failed");
5529
5530        assert!(result.ok(), "greet failed: {}", result.err);
5531        assert_eq!(result.text_out().trim(), "Hi Bob");
5532    }
5533
5534    #[tokio::test]
5535    async fn test_shell_function_with_all_args() {
5536        let kernel = Kernel::transient().expect("failed to create kernel");
5537
5538        // Define function using $@ (all args)
5539        kernel
5540            .execute(r#"echo_all() { echo "args: $@" }"#)
5541            .await
5542            .expect("function definition failed");
5543
5544        // Call with multiple args
5545        let result = kernel
5546            .execute(r#"echo_all "a" "b" "c""#)
5547            .await
5548            .expect("function call failed");
5549
5550        assert!(result.ok(), "function failed: {}", result.err);
5551        assert_eq!(result.text_out().trim(), "args: a b c");
5552    }
5553
5554    #[tokio::test]
5555    async fn test_shell_function_with_arg_count() {
5556        let kernel = Kernel::transient().expect("failed to create kernel");
5557
5558        // Define function using $# (arg count)
5559        kernel
5560            .execute(r#"count_args() { echo "count: $#" }"#)
5561            .await
5562            .expect("function definition failed");
5563
5564        // Call with three args
5565        let result = kernel
5566            .execute(r#"count_args "x" "y" "z""#)
5567            .await
5568            .expect("function call failed");
5569
5570        assert!(result.ok(), "function failed: {}", result.err);
5571        assert_eq!(result.text_out().trim(), "count: 3");
5572    }
5573
5574    #[tokio::test]
5575    async fn test_shell_function_shared_scope() {
5576        let kernel = Kernel::transient().expect("failed to create kernel");
5577
5578        // Set a variable in parent scope
5579        kernel
5580            .execute(r#"PARENT_VAR="visible""#)
5581            .await
5582            .expect("set failed");
5583
5584        // Define shell function that reads and writes parent variable
5585        kernel
5586            .execute(r#"modify_parent() {
5587                echo "saw: ${PARENT_VAR}"
5588                PARENT_VAR="changed by function"
5589            }"#)
5590            .await
5591            .expect("function definition failed");
5592
5593        // Call the function - it SHOULD see PARENT_VAR (bash-compatible shared scope)
5594        let result = kernel.execute("modify_parent").await.expect("function failed");
5595
5596        assert!(
5597            result.text_out().contains("visible"),
5598            "Shell function should access parent scope, got: {}",
5599            result.text_out()
5600        );
5601
5602        // Parent variable should be modified
5603        let var = kernel.get_var("PARENT_VAR").await;
5604        assert_eq!(
5605            var,
5606            Some(Value::String("changed by function".into())),
5607            "Shell function should modify parent scope"
5608        );
5609    }
5610
5611    // ═══════════════════════════════════════════════════════════════════════════
5612    // Script Execution via PATH Tests
5613    // ═══════════════════════════════════════════════════════════════════════════
5614
5615    #[tokio::test]
5616    async fn test_script_execution_from_path() {
5617        let kernel = Kernel::transient().expect("failed to create kernel");
5618
5619        // Create /bin directory and script
5620        kernel.execute(r#"mkdir "/bin""#).await.ok();
5621        kernel
5622            .execute(r#"write "/bin/hello.kai" 'echo "Hello from script!"'"#)
5623            .await
5624            .expect("write script failed");
5625
5626        // Set PATH to /bin
5627        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
5628
5629        // Call script by name (without .kai extension)
5630        let result = kernel
5631            .execute("hello")
5632            .await
5633            .expect("script execution failed");
5634
5635        assert!(result.ok(), "script failed: {}", result.err);
5636        assert_eq!(result.text_out().trim(), "Hello from script!");
5637    }
5638
5639    #[tokio::test]
5640    async fn test_script_with_args() {
5641        let kernel = Kernel::transient().expect("failed to create kernel");
5642
5643        // Create script that uses positional params
5644        kernel.execute(r#"mkdir "/bin""#).await.ok();
5645        kernel
5646            .execute(r#"write "/bin/greet.kai" 'echo "Hello, $1!"'"#)
5647            .await
5648            .expect("write script failed");
5649
5650        // Set PATH
5651        kernel.execute(r#"PATH="/bin""#).await.expect("set PATH failed");
5652
5653        // Call script with arg
5654        let result = kernel
5655            .execute(r#"greet "World""#)
5656            .await
5657            .expect("script execution failed");
5658
5659        assert!(result.ok(), "script failed: {}", result.err);
5660        assert_eq!(result.text_out().trim(), "Hello, World!");
5661    }
5662
5663    #[tokio::test]
5664    async fn test_script_not_found() {
5665        let kernel = Kernel::transient().expect("failed to create kernel");
5666
5667        // Set empty PATH
5668        kernel.execute(r#"PATH="/nonexistent""#).await.expect("set PATH failed");
5669
5670        // Call non-existent script
5671        let result = kernel
5672            .execute("noscript")
5673            .await
5674            .expect("execution failed");
5675
5676        assert!(!result.ok(), "should fail with command not found");
5677        assert_eq!(result.code, 127);
5678        assert!(result.err.contains("command not found"));
5679    }
5680
5681    #[tokio::test]
5682    async fn test_script_path_search_order() {
5683        let kernel = Kernel::transient().expect("failed to create kernel");
5684
5685        // Create two directories with same-named script
5686        // Note: using "myscript" not "test" to avoid conflict with test builtin
5687        kernel.execute(r#"mkdir "/first""#).await.ok();
5688        kernel.execute(r#"mkdir "/second""#).await.ok();
5689        kernel
5690            .execute(r#"write "/first/myscript.kai" 'echo "from first"'"#)
5691            .await
5692            .expect("write failed");
5693        kernel
5694            .execute(r#"write "/second/myscript.kai" 'echo "from second"'"#)
5695            .await
5696            .expect("write failed");
5697
5698        // Set PATH with first before second
5699        kernel.execute(r#"PATH="/first:/second""#).await.expect("set PATH failed");
5700
5701        // Should find first one
5702        let result = kernel
5703            .execute("myscript")
5704            .await
5705            .expect("script execution failed");
5706
5707        assert!(result.ok(), "script failed: {}", result.err);
5708        assert_eq!(result.text_out().trim(), "from first");
5709    }
5710
5711    // ═══════════════════════════════════════════════════════════════════════════
5712    // Special Variable Tests ($?, $$, unset vars)
5713    // ═══════════════════════════════════════════════════════════════════════════
5714
5715    #[tokio::test]
5716    async fn test_last_exit_code_success() {
5717        let kernel = Kernel::transient().expect("failed to create kernel");
5718
5719        // true exits with 0
5720        let result = kernel.execute("true; echo $?").await.expect("execution failed");
5721        assert!(result.text_out().contains("0"), "expected 0, got: {}", result.text_out());
5722    }
5723
5724    #[tokio::test]
5725    async fn test_last_exit_code_failure() {
5726        let kernel = Kernel::transient().expect("failed to create kernel");
5727
5728        // false exits with 1
5729        let result = kernel.execute("false; echo $?").await.expect("execution failed");
5730        assert!(result.text_out().contains("1"), "expected 1, got: {}", result.text_out());
5731    }
5732
5733    #[tokio::test]
5734    async fn test_current_pid() {
5735        let kernel = Kernel::transient().expect("failed to create kernel");
5736
5737        let result = kernel.execute("echo $$").await.expect("execution failed");
5738        // PID should be a positive number
5739        let pid: u32 = result.text_out().trim().parse().expect("PID should be a number");
5740        assert!(pid > 0, "PID should be positive");
5741    }
5742
5743    #[tokio::test]
5744    async fn test_unset_variable_expands_to_empty() {
5745        let kernel = Kernel::transient().expect("failed to create kernel");
5746
5747        // Unset variable in interpolation should be empty
5748        let result = kernel.execute(r#"echo "prefix:${UNSET_VAR}:suffix""#).await.expect("execution failed");
5749        assert_eq!(result.text_out().trim(), "prefix::suffix");
5750    }
5751
5752    #[tokio::test]
5753    async fn test_eq_ne_operators() {
5754        let kernel = Kernel::transient().expect("failed to create kernel");
5755
5756        // Test -eq operator
5757        let result = kernel.execute(r#"if [[ 5 -eq 5 ]]; then echo "eq works"; fi"#).await.expect("execution failed");
5758        assert_eq!(result.text_out().trim(), "eq works");
5759
5760        // Test -ne operator
5761        let result = kernel.execute(r#"if [[ 5 -ne 3 ]]; then echo "ne works"; fi"#).await.expect("execution failed");
5762        assert_eq!(result.text_out().trim(), "ne works");
5763
5764        // Test -eq with different values
5765        let result = kernel.execute(r#"if [[ 5 -eq 3 ]]; then echo "wrong"; else echo "correct"; fi"#).await.expect("execution failed");
5766        assert_eq!(result.text_out().trim(), "correct");
5767    }
5768
5769    #[tokio::test]
5770    async fn test_escaped_dollar_in_string() {
5771        let kernel = Kernel::transient().expect("failed to create kernel");
5772
5773        // \$ should produce literal $
5774        let result = kernel.execute(r#"echo "\$100""#).await.expect("execution failed");
5775        assert_eq!(result.text_out().trim(), "$100");
5776    }
5777
5778    #[tokio::test]
5779    async fn test_special_vars_in_interpolation() {
5780        let kernel = Kernel::transient().expect("failed to create kernel");
5781
5782        // Test $? in string interpolation
5783        let result = kernel.execute(r#"true; echo "exit: $?""#).await.expect("execution failed");
5784        assert_eq!(result.text_out().trim(), "exit: 0");
5785
5786        // Test $$ in string interpolation
5787        let result = kernel.execute(r#"echo "pid: $$""#).await.expect("execution failed");
5788        assert!(result.text_out().starts_with("pid: "), "unexpected output: {}", result.text_out());
5789        let text = result.text_out();
5790        let pid_part = text.trim().strip_prefix("pid: ").unwrap();
5791        let _pid: u32 = pid_part.parse().expect("PID in string should be a number");
5792    }
5793
5794    // ═══════════════════════════════════════════════════════════════════════════
5795    // Command Substitution Tests
5796    // ═══════════════════════════════════════════════════════════════════════════
5797
5798    #[tokio::test]
5799    async fn test_command_subst_assignment() {
5800        let kernel = Kernel::transient().expect("failed to create kernel");
5801
5802        // Command substitution in assignment
5803        let result = kernel.execute(r#"X=$(echo hello); echo "$X""#).await.expect("execution failed");
5804        assert_eq!(result.text_out().trim(), "hello");
5805    }
5806
5807    #[tokio::test]
5808    async fn test_command_subst_with_args() {
5809        let kernel = Kernel::transient().expect("failed to create kernel");
5810
5811        // Command substitution with string argument
5812        let result = kernel.execute(r#"X=$(echo "a b c"); echo "$X""#).await.expect("execution failed");
5813        assert_eq!(result.text_out().trim(), "a b c");
5814    }
5815
5816    #[tokio::test]
5817    async fn test_command_subst_nested_vars() {
5818        let kernel = Kernel::transient().expect("failed to create kernel");
5819
5820        // Variables inside command substitution
5821        let result = kernel.execute(r#"Y=world; X=$(echo "hello $Y"); echo "$X""#).await.expect("execution failed");
5822        assert_eq!(result.text_out().trim(), "hello world");
5823    }
5824
5825    #[tokio::test]
5826    async fn test_background_job_basic() {
5827        use std::time::Duration;
5828
5829        let kernel = Kernel::new(KernelConfig::isolated()).expect("failed to create kernel");
5830
5831        // Run a simple background command
5832        let result = kernel.execute("echo hello &").await.expect("execution failed");
5833        assert!(result.ok(), "background command should succeed: {}", result.err);
5834        assert!(result.text_out().contains("[1]"), "should return job ID: {}", result.text_out());
5835
5836        // Give the job time to complete
5837        tokio::time::sleep(Duration::from_millis(100)).await;
5838
5839        // Check job status
5840        let status = kernel.execute("cat /v/jobs/1/status").await.expect("status check failed");
5841        assert!(status.ok(), "status should succeed: {}", status.err);
5842        assert!(
5843            status.text_out().contains("done:") || status.text_out().contains("running"),
5844            "should have valid status: {}",
5845            status.text_out()
5846        );
5847
5848        // Check stdout
5849        let stdout = kernel.execute("cat /v/jobs/1/stdout").await.expect("stdout check failed");
5850        assert!(stdout.ok());
5851        assert!(stdout.text_out().contains("hello"));
5852    }
5853
5854    #[tokio::test]
5855    async fn test_heredoc_piped_to_command() {
5856        // Bug 4: heredoc content should pipe through to next command
5857        let kernel = Kernel::transient().expect("kernel");
5858        let result = kernel.execute("cat <<EOF | cat\nhello world\nEOF").await.expect("exec");
5859        assert!(result.ok(), "heredoc | cat failed: {}", result.err);
5860        assert_eq!(result.text_out().trim(), "hello world");
5861    }
5862
5863    #[tokio::test]
5864    async fn test_for_loop_glob_iterates() {
5865        // Bug 1: for F in $(glob ...) should iterate per file, not once
5866        let kernel = Kernel::transient().expect("kernel");
5867        let dir = format!("/tmp/kaish_test_glob_{}", std::process::id());
5868        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
5869        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
5870        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
5871        let result = kernel.execute(&format!(r#"
5872            N=0
5873            for F in $(glob "{dir}/*.txt"); do
5874                N=$((N + 1))
5875            done
5876            echo $N
5877        "#)).await.unwrap();
5878        assert!(result.ok(), "for glob failed: {}", result.err);
5879        assert_eq!(result.text_out().trim(), "2", "Should iterate 2 files, got: {}", result.text_out());
5880        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
5881        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
5882    }
5883
5884    #[tokio::test]
5885    async fn test_bare_glob_expansion_echo() {
5886        let kernel = Kernel::transient().expect("kernel");
5887        let dir = format!("/tmp/kaish_test_bareglob_{}", std::process::id());
5888        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
5889        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
5890        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
5891        kernel.execute(&format!("echo c > {dir}/c.rs")).await.unwrap();
5892        kernel.execute(&format!("cd {dir}")).await.unwrap();
5893        let result = kernel.execute("echo *.txt").await.unwrap();
5894        assert!(result.ok(), "echo *.txt failed: {}", result.err);
5895        let out = result.text_out();
5896        let out = out.trim();
5897        // Should contain both .txt files (order may vary)
5898        assert!(out.contains("a.txt"), "missing a.txt in: {}", out);
5899        assert!(out.contains("b.txt"), "missing b.txt in: {}", out);
5900        assert!(!out.contains("c.rs"), "should not contain c.rs in: {}", out);
5901        // cleanup
5902        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
5903        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
5904        kernel.execute(&format!("rm {dir}/c.rs")).await.unwrap();
5905    }
5906
5907    #[tokio::test]
5908    async fn test_bare_glob_no_matches_errors() {
5909        let kernel = Kernel::transient().expect("kernel");
5910        let dir = format!("/tmp/kaish_test_bareglob_nomatch_{}", std::process::id());
5911        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
5912        kernel.execute(&format!("cd {dir}")).await.unwrap();
5913        let result = kernel.execute("echo *.nonexistent").await;
5914        match &result {
5915            Ok(exec) => {
5916                // No-match glob should produce a non-zero exit code
5917                assert!(!exec.ok(), "expected failure, got success: out={}, err={}", exec.text_out(), exec.err);
5918                assert!(exec.err.contains("no matches"), "error should say no matches: {}", exec.err);
5919            }
5920            Err(e) => {
5921                assert!(e.to_string().contains("no matches"), "error should say no matches: {}", e);
5922            }
5923        }
5924    }
5925
5926    #[tokio::test]
5927    async fn test_bare_glob_disabled_with_set() {
5928        let kernel = Kernel::transient().expect("kernel");
5929        let dir = format!("/tmp/kaish_test_bareglob_noglob_{}", std::process::id());
5930        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
5931        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
5932        kernel.execute(&format!("cd {dir}")).await.unwrap();
5933        // Disable glob expansion
5934        kernel.execute("set +o glob").await.unwrap();
5935        let result = kernel.execute("echo *.txt").await.unwrap();
5936        // With glob disabled, *.txt should be passed as literal string
5937        assert!(result.ok(), "echo should succeed: {}", result.err);
5938        assert_eq!(result.text_out().trim(), "*.txt", "should be literal: {}", result.text_out());
5939        // cleanup
5940        kernel.execute("set -o glob").await.unwrap();
5941        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
5942    }
5943
5944    #[tokio::test]
5945    async fn test_bare_glob_quoted_not_expanded() {
5946        let kernel = Kernel::transient().expect("kernel");
5947        let dir = format!("/tmp/kaish_test_bareglob_quoted_{}", std::process::id());
5948        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
5949        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
5950        kernel.execute(&format!("cd {dir}")).await.unwrap();
5951        // Quoted globs should NOT expand
5952        let result = kernel.execute("echo \"*.txt\"").await.unwrap();
5953        assert!(result.ok(), "echo should succeed: {}", result.err);
5954        assert_eq!(result.text_out().trim(), "*.txt", "quoted should be literal: {}", result.text_out());
5955        // cleanup
5956        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
5957    }
5958
5959    #[tokio::test]
5960    async fn test_bare_glob_for_loop() {
5961        let kernel = Kernel::transient().expect("kernel");
5962        let dir = format!("/tmp/kaish_test_bareglob_forloop_{}", std::process::id());
5963        kernel.execute(&format!("mkdir -p {dir}")).await.unwrap();
5964        kernel.execute(&format!("echo a > {dir}/a.txt")).await.unwrap();
5965        kernel.execute(&format!("echo b > {dir}/b.txt")).await.unwrap();
5966        kernel.execute(&format!("cd {dir}")).await.unwrap();
5967        let result = kernel.execute(r#"
5968            N=0
5969            for f in *.txt; do
5970                N=$((N + 1))
5971            done
5972            echo $N
5973        "#).await.unwrap();
5974        assert!(result.ok(), "for loop failed: {}", result.err);
5975        assert_eq!(result.text_out().trim(), "2", "should iterate 2 files: {}", result.text_out());
5976        // cleanup
5977        kernel.execute(&format!("rm {dir}/a.txt")).await.unwrap();
5978        kernel.execute(&format!("rm {dir}/b.txt")).await.unwrap();
5979    }
5980
5981    #[tokio::test]
5982    async fn test_glob_in_assignment_is_literal() {
5983        let kernel = Kernel::transient().expect("kernel");
5984        let result = kernel.execute("X=*.txt; echo $X").await.unwrap();
5985        assert!(result.ok());
5986        assert_eq!(result.text_out().trim(), "*.txt", "glob in assignment should be literal");
5987    }
5988
5989    #[tokio::test]
5990    async fn test_glob_in_test_expr_is_literal() {
5991        let kernel = Kernel::transient().expect("kernel");
5992        let result = kernel.execute(r#"
5993            if [[ *.txt == "*.txt" ]]; then
5994                echo "match"
5995            else
5996                echo "no"
5997            fi
5998        "#).await.unwrap();
5999        assert!(result.ok());
6000        assert_eq!(result.text_out().trim(), "match", "glob in test expr should be literal");
6001    }
6002
6003    #[tokio::test]
6004    async fn test_command_subst_echo_not_iterable() {
6005        // Regression guard: $(echo "a b c") must remain a single string
6006        let kernel = Kernel::transient().expect("kernel");
6007        let result = kernel.execute(r#"
6008            N=0
6009            for X in $(echo "a b c"); do N=$((N + 1)); done
6010            echo $N
6011        "#).await.unwrap();
6012        assert!(result.ok());
6013        assert_eq!(result.text_out().trim(), "1", "echo should be one item: {}", result.text_out());
6014    }
6015
6016    // -- accumulate_result / newline tests --
6017
6018    #[test]
6019    fn test_accumulate_no_double_newlines() {
6020        // When output already ends with \n, accumulate should not add another
6021        let mut acc = ExecResult::success("line1\n");
6022        let new = ExecResult::success("line2\n");
6023        accumulate_result(&mut acc, &new);
6024        assert_eq!(&*acc.text_out(), "line1\nline2\n");
6025        assert!(!acc.text_out().contains("\n\n"), "should not have double newlines: {:?}", acc.text_out());
6026    }
6027
6028    #[test]
6029    fn test_accumulate_adds_separator_when_needed() {
6030        // When output does NOT end with \n, accumulate adds one
6031        let mut acc = ExecResult::success("line1");
6032        let new = ExecResult::success("line2");
6033        accumulate_result(&mut acc, &new);
6034        assert_eq!(&*acc.text_out(), "line1\nline2");
6035    }
6036
6037    #[test]
6038    fn test_accumulate_empty_into_nonempty() {
6039        let mut acc = ExecResult::success("");
6040        let new = ExecResult::success("hello\n");
6041        accumulate_result(&mut acc, &new);
6042        assert_eq!(&*acc.text_out(), "hello\n");
6043    }
6044
6045    #[test]
6046    fn test_accumulate_nonempty_into_empty() {
6047        let mut acc = ExecResult::success("hello\n");
6048        let new = ExecResult::success("");
6049        accumulate_result(&mut acc, &new);
6050        assert_eq!(&*acc.text_out(), "hello\n");
6051    }
6052
6053    #[test]
6054    fn test_accumulate_stderr_no_double_newlines() {
6055        let mut acc = ExecResult::failure(1, "err1\n");
6056        let new = ExecResult::failure(1, "err2\n");
6057        accumulate_result(&mut acc, &new);
6058        assert!(!acc.err.contains("\n\n"), "stderr should not have double newlines: {:?}", acc.err);
6059    }
6060
6061    #[tokio::test]
6062    async fn test_multiple_echo_no_blank_lines() {
6063        let kernel = Kernel::transient().expect("kernel");
6064        let result = kernel
6065            .execute("echo one\necho two\necho three")
6066            .await
6067            .expect("execution failed");
6068        assert!(result.ok());
6069        assert_eq!(&*result.text_out(), "one\ntwo\nthree\n");
6070    }
6071
6072    #[tokio::test]
6073    async fn test_for_loop_no_blank_lines() {
6074        let kernel = Kernel::transient().expect("kernel");
6075        let result = kernel
6076            .execute(r#"for X in a b c; do echo "item: ${X}"; done"#)
6077            .await
6078            .expect("execution failed");
6079        assert!(result.ok());
6080        assert_eq!(&*result.text_out(), "item: a\nitem: b\nitem: c\n");
6081    }
6082
6083    #[tokio::test]
6084    async fn test_for_command_subst_no_blank_lines() {
6085        let kernel = Kernel::transient().expect("kernel");
6086        let result = kernel
6087            .execute(r#"for N in $(seq 1 3); do echo "n=${N}"; done"#)
6088            .await
6089            .expect("execution failed");
6090        assert!(result.ok());
6091        assert_eq!(&*result.text_out(), "n=1\nn=2\nn=3\n");
6092    }
6093
6094    // ------------------------------------------------------------------
6095    // build_args_async: multi-consume flags (jq --arg NAME VALUE pattern)
6096    // ------------------------------------------------------------------
6097
6098    /// Helper: a throwaway schema with one `--pair` param declared as
6099    /// consuming two positionals per occurrence. Modelled after what
6100    /// jq_native will declare for `--arg` / `--argjson`.
6101    fn multi_consume_schema() -> crate::tools::ToolSchema {
6102        use crate::tools::{ParamSchema, ToolSchema};
6103        ToolSchema::new("test", "multi-consume smoke")
6104            .param(
6105                ParamSchema::optional("pair", "array", Value::Null, "name+value pair")
6106                    .consumes(2),
6107            )
6108    }
6109
6110    fn pos(s: &str) -> Arg {
6111        Arg::Positional(Expr::Literal(Value::String(s.to_string())))
6112    }
6113
6114    #[tokio::test]
6115    async fn build_args_multi_consume_single_occurrence() {
6116        let kernel = Kernel::transient().expect("kernel");
6117        let schema = multi_consume_schema();
6118        // Simulates:  test --pair NAME VALUE filter
6119        let args = vec![
6120            Arg::LongFlag("pair".into()),
6121            pos("NAME"),
6122            pos("VALUE"),
6123            pos("filter"),
6124        ];
6125        let built = kernel
6126            .build_args_async(&args, Some(&schema))
6127            .await
6128            .expect("build_args should succeed");
6129
6130        // `--pair` + its two positionals are consumed into named["pair"],
6131        // which becomes an outer array of one inner 2-element array.
6132        let pair = built.named.get("pair").expect("named[pair] missing");
6133        match pair {
6134            Value::Json(serde_json::Value::Array(occurrences)) => {
6135                assert_eq!(occurrences.len(), 1, "expected one occurrence");
6136                match &occurrences[0] {
6137                    serde_json::Value::Array(values) => {
6138                        assert_eq!(values.len(), 2, "pair must have 2 values");
6139                        assert_eq!(values[0], serde_json::Value::String("NAME".into()));
6140                        assert_eq!(values[1], serde_json::Value::String("VALUE".into()));
6141                    }
6142                    other => panic!("expected inner array, got {other:?}"),
6143                }
6144            }
6145            other => panic!("expected Json(Array(...)) for named[pair], got {other:?}"),
6146        }
6147
6148        // The un-consumed positional ("filter") remains in `positional`.
6149        assert_eq!(built.positional.len(), 1);
6150        assert_eq!(built.positional[0], Value::String("filter".into()));
6151    }
6152
6153    #[tokio::test]
6154    async fn build_args_multi_consume_two_occurrences_accumulate() {
6155        let kernel = Kernel::transient().expect("kernel");
6156        let schema = multi_consume_schema();
6157        // Simulates:  test --pair A 1 --pair B 2 filter
6158        let args = vec![
6159            Arg::LongFlag("pair".into()),
6160            pos("A"),
6161            pos("1"),
6162            Arg::LongFlag("pair".into()),
6163            pos("B"),
6164            pos("2"),
6165            pos("filter"),
6166        ];
6167        let built = kernel
6168            .build_args_async(&args, Some(&schema))
6169            .await
6170            .expect("build_args should succeed");
6171
6172        let pair = built.named.get("pair").expect("named[pair] missing");
6173        match pair {
6174            Value::Json(serde_json::Value::Array(occurrences)) => {
6175                assert_eq!(occurrences.len(), 2, "expected two occurrences");
6176                // Preserved in invocation order.
6177                match &occurrences[0] {
6178                    serde_json::Value::Array(values) => {
6179                        assert_eq!(values[0], serde_json::Value::String("A".into()));
6180                        assert_eq!(values[1], serde_json::Value::String("1".into()));
6181                    }
6182                    other => panic!("expected inner array, got {other:?}"),
6183                }
6184                match &occurrences[1] {
6185                    serde_json::Value::Array(values) => {
6186                        assert_eq!(values[0], serde_json::Value::String("B".into()));
6187                        assert_eq!(values[1], serde_json::Value::String("2".into()));
6188                    }
6189                    other => panic!("expected inner array, got {other:?}"),
6190                }
6191            }
6192            other => panic!("expected Json(Array(...)), got {other:?}"),
6193        }
6194    }
6195
6196    // ── initial_vars + execute_with_vars + hermetic env ───────────────────
6197
6198    #[tokio::test]
6199    async fn test_initial_vars_set_and_exported() {
6200        let config = KernelConfig::transient()
6201            .with_var("INIT_FOO", Value::String("bar".into()));
6202        let kernel = Kernel::new(config).expect("failed to create kernel");
6203
6204        assert_eq!(
6205            kernel.get_var("INIT_FOO").await,
6206            Some(Value::String("bar".into()))
6207        );
6208        assert!(
6209            kernel.scope.read().await.is_exported("INIT_FOO"),
6210            "initial_vars entries must be marked exported"
6211        );
6212    }
6213
6214    #[tokio::test]
6215    async fn test_execute_with_vars_overlay_visible() {
6216        let kernel = Kernel::transient().expect("failed to create kernel");
6217        let mut overlay = HashMap::new();
6218        overlay.insert("OVERLAY_X".to_string(), Value::String("yes".into()));
6219
6220        let result = kernel
6221            .execute_with_options(r#"echo "${OVERLAY_X}""#, ExecuteOptions::new().with_vars(overlay))
6222            .await
6223            .expect("execute failed");
6224
6225        assert!(result.ok());
6226        assert_eq!(result.text_out().trim(), "yes");
6227    }
6228
6229    #[tokio::test]
6230    async fn test_execute_with_vars_overlay_cleanup() {
6231        let kernel = Kernel::transient().expect("failed to create kernel");
6232        let mut overlay = HashMap::new();
6233        overlay.insert("EPHEMERAL".to_string(), Value::String("transient".into()));
6234
6235        kernel
6236            .execute_with_options("echo ignored", ExecuteOptions::new().with_vars(overlay))
6237            .await
6238            .expect("execute failed");
6239
6240        assert_eq!(kernel.get_var("EPHEMERAL").await, None);
6241        assert!(
6242            !kernel.scope.read().await.is_exported("EPHEMERAL"),
6243            "overlay-only export must be cleared on return"
6244        );
6245    }
6246
6247    #[tokio::test]
6248    async fn test_execute_with_vars_does_not_clobber_existing_export() {
6249        let kernel = Kernel::transient().expect("failed to create kernel");
6250        kernel
6251            .execute("export OUTER=outer")
6252            .await
6253            .expect("export failed");
6254
6255        let mut overlay = HashMap::new();
6256        overlay.insert("OUTER".to_string(), Value::String("inner".into()));
6257        let result = kernel
6258            .execute_with_options(r#"echo "${OUTER}""#, ExecuteOptions::new().with_vars(overlay))
6259            .await
6260            .expect("execute failed");
6261        assert_eq!(result.text_out().trim(), "inner");
6262
6263        assert_eq!(
6264            kernel.get_var("OUTER").await,
6265            Some(Value::String("outer".into())),
6266            "outer value must reappear after pop"
6267        );
6268        assert!(
6269            kernel.scope.read().await.is_exported("OUTER"),
6270            "outer export must survive overlay"
6271        );
6272    }
6273
6274    #[tokio::test]
6275    async fn test_execute_with_vars_inner_assignment_is_local() {
6276        let kernel = Kernel::transient().expect("failed to create kernel");
6277        let mut overlay = HashMap::new();
6278        overlay.insert("LOCAL_FOO".to_string(), Value::String("from-overlay".into()));
6279
6280        // Variable assignment inside a single statement uses set() (innermost
6281        // frame), not set_global() — this matches bash function-local semantics.
6282        // We explicitly use `local FOO=...` style by relying on the pushed
6283        // frame; the assignment in the script body modifies the same frame.
6284        let result = kernel
6285            .execute_with_options(
6286                r#"LOCAL_FOO="reassigned"; echo "${LOCAL_FOO}""#,
6287                ExecuteOptions::new().with_vars(overlay),
6288            )
6289            .await
6290            .expect("execute failed");
6291        assert!(result.ok());
6292
6293        // After the call the frame is popped, so LOCAL_FOO is gone regardless
6294        // of how the script reassigned it.
6295        assert_eq!(kernel.get_var("LOCAL_FOO").await, None);
6296    }
6297
6298    #[tokio::test]
6299    async fn test_external_command_sees_exported_var() {
6300        let kernel = Kernel::transient().expect("failed to create kernel");
6301        let result = kernel
6302            .execute("export EXT_FOO=bar; printenv EXT_FOO")
6303            .await
6304            .expect("execute failed");
6305
6306        assert!(result.ok(), "printenv should succeed: stderr={}", result.err);
6307        assert_eq!(result.text_out().trim(), "bar");
6308    }
6309
6310    #[tokio::test]
6311    async fn test_external_command_does_not_see_unexported_var() {
6312        let kernel = Kernel::transient().expect("failed to create kernel");
6313
6314        // Set without exporting; printenv must not see it (exit code != 0,
6315        // empty stdout per printenv semantics).
6316        let result = kernel
6317            .execute("EXT_BAR=hidden; printenv EXT_BAR")
6318            .await
6319            .expect("execute failed");
6320
6321        assert!(!result.ok(), "printenv should fail when var is unexported");
6322        assert!(
6323            result.text_out().trim().is_empty(),
6324            "no stdout when var is missing, got: {}",
6325            result.text_out()
6326        );
6327    }
6328
6329    #[tokio::test]
6330    async fn test_external_command_does_not_see_os_env() {
6331        // The kernel is hermetic: it never reads std::env::vars() and only
6332        // exports what it has been told to export. Cargo always sets PATH for
6333        // tests, so PATH is reliably present in the OS env — but a transient
6334        // kernel doesn't seed it into initial_vars, so `printenv PATH` from
6335        // inside the kernel must fail.
6336        assert!(
6337            std::env::var_os("PATH").is_some(),
6338            "test precondition: cargo should set PATH"
6339        );
6340
6341        let kernel = Kernel::transient().expect("failed to create kernel");
6342        let result = kernel
6343            .execute("printenv PATH")
6344            .await
6345            .expect("execute failed");
6346
6347        assert!(
6348            !result.ok(),
6349            "printenv PATH must fail in hermetic kernel, got stdout={:?}",
6350            result.text_out()
6351        );
6352        assert!(
6353            result.text_out().trim().is_empty(),
6354            "no PATH in subprocess env, got stdout={:?}",
6355            result.text_out()
6356        );
6357    }
6358
6359    #[tokio::test]
6360    async fn test_execute_with_vars_overlay_reaches_subprocess() {
6361        let kernel = Kernel::transient().expect("failed to create kernel");
6362        let mut overlay = HashMap::new();
6363        overlay.insert("SUB_FOO".to_string(), Value::String("subproc".into()));
6364
6365        let result = kernel
6366            .execute_with_options("printenv SUB_FOO", ExecuteOptions::new().with_vars(overlay))
6367            .await
6368            .expect("execute failed");
6369
6370        assert!(
6371            result.ok(),
6372            "printenv should succeed: code={} stdout={:?} stderr={:?}",
6373            result.code,
6374            result.text_out(),
6375            result.err
6376        );
6377        assert_eq!(result.text_out().trim(), "subproc");
6378    }
6379}