Skip to main content

kaish_kernel/
dispatch.rs

//! Command dispatch — the single execution path for all commands.
//!
//! The `CommandDispatcher` trait defines how a single command is resolved and
//! executed. The Kernel implements this trait with the full dispatch chain:
//! user tools → builtins → .kai scripts → external commands → backend tools.
//!
//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
//!
//! ```text
//! Stmt::Command ──┐
//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
//! Stmt::Pipeline ──┘                                  │
//!                                               for each command:
//!                                                 dispatcher.dispatch(cmd, ctx)
//!                                                     │
//!                                               ┌─────┼──────────────┐
//!                                               │     │              │
//!                                          user_tools builtins  .kai scripts
//!                                                                external cmds
//!                                                                backend tools
//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::{Command, Expr, Value};
30use crate::interpreter::ExecResult;
31use crate::tools::ExecContext;
32
33// The following imports are only used by the test-only `BackendDispatcher`.
34#[cfg(test)]
35use crate::ast::Arg;
36#[cfg(test)]
37use crate::backend::BackendError;
38#[cfg(test)]
39use crate::interpreter::apply_output_format;
40#[cfg(test)]
41use crate::scheduler::build_tool_args;
42#[cfg(test)]
43use crate::tools::{GlobalFlags, ToolRegistry};
44#[cfg(all(test, feature = "subprocess"))]
45use crate::tools::resolve_in_path;
46
/// Where a command sits inside a pipeline.
///
/// External command execution consults this to decide stdio inheritance:
/// - `Only` or `Last` in interactive mode → inherit terminal
/// - `First` or `Middle` → always capture
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PipelinePosition {
    /// A lone command; there is no pipe on either side.
    #[default]
    Only,
    /// Head of a pipeline (stdin does not come from a pipe).
    First,
    /// Interior stage of a pipeline (stdin and stdout are both piped).
    Middle,
    /// Tail of a pipeline (stdin is piped, stdout is the final output).
    Last,
}
64
65/// Trait for dispatching a single command through the full resolution chain.
66///
67/// Implementations handle argument parsing, tool lookup, and execution.
68/// The pipeline runner handles I/O routing (stdin, redirects, piping).
69#[async_trait]
70pub trait CommandDispatcher: Send + Sync {
71    /// Dispatch a single command for execution.
72    ///
73    /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
74    /// Implementations should handle schema-aware argument parsing and
75    /// output format extraction internally.
76    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
77
78    /// Evaluate an expression through the full async chain.
79    ///
80    /// Unlike the runner's sync `eval_simple_expr`, this can run command
81    /// substitution (`$(...)`) because it has access to pipeline execution.
82    /// Used for redirect targets and heredoc bodies so `cat < $(cmd)`,
83    /// `echo x > $(cmd)`, and `$(...)` inside heredoc bodies work. The `ctx`
84    /// carries scope/cwd/backend for dispatchers that evaluate against it;
85    /// stateful dispatchers (Kernel) snapshot their own session state and
86    /// only let command output escape (side effects like `cd` do not).
87    async fn eval_expr(&self, expr: &Expr, ctx: &ExecContext) -> Result<Value>;
88
89    /// Fork the dispatcher for concurrent execution (detached).
90    ///
91    /// Returns a subsidiary dispatcher with independent mutable state, safe
92    /// to run concurrently with the parent and other forks without data
93    /// races on shared scope/cwd/aliases. Used by background `&` jobs,
94    /// where the fork must survive parent cancellation.
95    ///
96    /// For stateful dispatchers (e.g. Kernel) this snapshots per-session
97    /// state into a fresh instance. Stateless dispatchers may clone.
98    async fn fork(&self) -> Arc<dyn CommandDispatcher>;
99
100    /// Fork the dispatcher for concurrent execution (attached to parent cancel).
101    ///
102    /// Like [`Self::fork`] but the fork's cancellation token is a *child* of
103    /// the parent's. Cancelling the parent (timeout, Ctrl-C, embedder
104    /// `Kernel::cancel`) cascades into the fork, which then kills its own
105    /// external children via the usual SIGTERM/SIGKILL discipline.
106    ///
107    /// Used for foreground concurrency: scatter workers, concurrent pipeline
108    /// stages, command substitution. Default implementation delegates to
109    /// [`Self::fork`] for stateless dispatchers that don't track cancellation.
110    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
111        self.fork().await
112    }
113}
114
/// Small stateless dispatcher for pipeline/runner unit tests.
///
/// Production code goes through `Kernel` (and `Kernel::fork` for concurrent
/// contexts). This test-only type instead calls `backend.call_tool()`
/// directly, letting the pipeline runner be exercised without constructing
/// a full Kernel.
///
/// Intentional, test-only limitations:
/// - No user-defined tools
/// - No .kai script resolution
/// - No async argument evaluation (command substitution in args won't work)
#[cfg(test)]
pub(crate) struct BackendDispatcher {
    /// Registry consulted for tool schemas when building tool arguments.
    tools: Arc<ToolRegistry>,
}
129
#[cfg(test)]
impl BackendDispatcher {
    /// Create a new backend dispatcher with the given tool registry.
    pub(crate) fn new(tools: Arc<ToolRegistry>) -> Self {
        Self { tools }
    }

    /// Try to execute an external command (PATH lookup + process spawn).
    ///
    /// Used as fallback when no builtin/backend tool matches. This variant is
    /// compiled when the `subprocess` feature is disabled and therefore always
    /// returns `None` (no external execution is available).
    #[cfg(not(feature = "subprocess"))]
    async fn try_external(
        &self,
        _name: &str,
        _args: &[Arg],
        _ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        None
    }

    /// Render a single argument expression as an argv string.
    ///
    /// Handles string/int/float literals and variable references (resolved
    /// against `ctx.scope`). Returns `None` for every other expression kind
    /// and for variable references that do not resolve — this dispatcher does
    /// no async evaluation, so e.g. command substitution cannot be rendered.
    #[cfg(feature = "subprocess")]
    fn expr_to_arg(expr: &Expr, ctx: &ExecContext) -> Option<String> {
        match expr {
            Expr::Literal(Value::String(s)) => Some(s.clone()),
            Expr::Literal(Value::Int(i)) => Some(i.to_string()),
            Expr::Literal(Value::Float(f)) => Some(f.to_string()),
            Expr::VarRef(path) => ctx
                .scope
                .resolve_path(path)
                .map(|v| crate::interpreter::value_to_string(&v)),
            _ => None,
        }
    }

    /// Try to execute an external command (PATH lookup + process spawn).
    ///
    /// Used as fallback when no builtin/backend tool matches. Child stdout and
    /// stderr are always piped (never inherited from the terminal).
    ///
    /// Returns `None` when external commands are disabled on `ctx`, when the
    /// current directory has no real filesystem path, or when `name` cannot be
    /// found in `PATH`. Returns `Some(failure 127)` when a path-like `name`
    /// does not exist or the process cannot be spawned. Otherwise returns the
    /// child's result; when `ctx.pipe_stdout` is set, stdout is streamed into
    /// that pipe and the returned `out` is empty.
    #[cfg(feature = "subprocess")]
    async fn try_external(
        &self,
        name: &str,
        args: &[Arg],
        ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        if !ctx.allow_external_commands {
            return None;
        }

        // Get real working directory (needed for relative path resolution and child cwd).
        // If the CWD is virtual (no real path), skip external execution entirely.
        let real_cwd = ctx.backend.resolve_real_path(&ctx.cwd)?;

        // Resolve command: absolute/relative path or PATH lookup
        let executable = if name.contains('/') {
            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
            let resolved = if std::path::Path::new(name).is_absolute() {
                std::path::PathBuf::from(name)
            } else {
                real_cwd.join(name)
            };
            if resolved.exists() {
                resolved.to_string_lossy().into_owned()
            } else {
                return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
            }
        } else {
            // Prefer the shell scope's PATH; fall back to the process environment.
            let path_var = ctx.scope.get("PATH")
                .map(crate::interpreter::value_to_string)
                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
            resolve_in_path(name, &path_var)?
        };

        // Build flat argv from args. Positional, named (`--key=value`) and
        // word-assign (`key=value`) values all go through `expr_to_arg`, so
        // numeric literals and variable references are rendered consistently.
        // Positionals that cannot be rendered are dropped; named/word-assign
        // values that cannot be rendered become an empty string.
        let argv: Vec<String> = {
            let ctx_ref: &ExecContext = ctx;
            args.iter()
                .filter_map(|arg| match arg {
                    Arg::Positional(expr) => Self::expr_to_arg(expr, ctx_ref),
                    Arg::ShortFlag(f) => Some(format!("-{f}")),
                    Arg::LongFlag(f) => Some(format!("--{f}")),
                    Arg::Named { key, value } => Some(format!(
                        "--{key}={}",
                        Self::expr_to_arg(value, ctx_ref).unwrap_or_default()
                    )),
                    Arg::WordAssign { key, value } => Some(format!(
                        "{key}={}",
                        Self::expr_to_arg(value, ctx_ref).unwrap_or_default()
                    )),
                    Arg::DoubleDash => Some("--".to_string()),
                })
                .collect()
        };

        // Check for streaming pipes
        let has_pipe_stdin = ctx.pipe_stdin.is_some();
        // pipe_stdout checked later when deciding buffered vs streaming output
        let has_buffered_stdin = ctx.stdin.is_some();

        // Spawn process
        use tokio::process::Command;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        let mut cmd = Command::new(&executable);
        cmd.args(&argv);
        cmd.current_dir(&real_cwd);
        cmd.kill_on_drop(true);

        // Hermetic env: child sees only kaish's exported vars, not the kaish
        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
        // populate it via KernelConfig::initial_vars at construction.
        cmd.env_clear();
        for (var_name, value) in ctx.scope.exported_vars() {
            cmd.env(var_name, crate::interpreter::value_to_string(&value));
        }

        // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
            std::process::Stdio::piped()
        } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
            std::process::Stdio::inherit()
        } else {
            std::process::Stdio::null()
        });
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());

        let mut child = match cmd.spawn() {
            Ok(c) => c,
            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
        };
        // Open a pidfd (Linux) for race-free direct-child kill via wait_or_kill.
        let kill_target = crate::pidfd::KillTarget::from_child(&child);

        // Feed the child's stdin from a background task in both cases, so the
        // writer can never block this function before it starts draining the
        // child's stdout/stderr.
        let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
            // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
            child.stdin.take().map(|mut child_stdin| {
                tokio::spawn(async move {
                    let mut buf = [0u8; 8192];
                    loop {
                        match pipe_in.read(&mut buf).await {
                            Ok(0) => break, // EOF
                            Ok(n) => {
                                if child_stdin.write_all(&buf[..n]).await.is_err() {
                                    break; // child closed stdin
                                }
                            }
                            Err(_) => break,
                        }
                    }
                    // Drop child_stdin signals EOF to child
                })
            })
        } else if let Some(data) = ctx.stdin.take() {
            // Buffered string stdin. Writing it inline (before reading the
            // child's output) can deadlock once the payload exceeds the OS
            // pipe buffer and the child blocks on a full stdout pipe, so the
            // write happens concurrently with the output drain below.
            child.stdin.take().map(|mut child_stdin| {
                tokio::spawn(async move {
                    // Best effort: the child may close stdin early.
                    let _ = child_stdin.write_all(data.as_bytes()).await;
                    // Drop child_stdin signals EOF
                })
            })
        } else {
            None
        };

        // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
            // stdout/stderr were set to piped() above, so take() is expected to return Some
            let Some(mut child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(mut child_stderr_reader) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };
            // Stream stderr to the kernel's stderr stream (if available) for
            // real-time delivery. Otherwise buffer it in memory.
            let stderr_stream_handle = ctx.stderr.clone();
            let stderr_task = tokio::spawn(async move {
                let mut buf = Vec::new();
                let mut chunk = [0u8; 8192];
                loop {
                    match child_stderr_reader.read(&mut chunk).await {
                        Ok(0) => break,
                        Ok(n) => {
                            if let Some(ref stream) = stderr_stream_handle {
                                // Stream raw bytes — no decode here, lossy decode at drain site
                                stream.write(&chunk[..n]);
                            } else {
                                buf.extend_from_slice(&chunk[..n]);
                            }
                        }
                        Err(_) => break,
                    }
                }
                if stderr_stream_handle.is_some() {
                    // Already streamed — return empty
                    String::new()
                } else {
                    String::from_utf8_lossy(&buf).into_owned()
                }
            });

            // Copy child stdout → pipe_stdout in chunks
            let mut buf = [0u8; 8192];
            loop {
                match child_stdout.read(&mut buf).await {
                    Ok(0) => break,
                    Ok(n) => {
                        if pipe_out.write_all(&buf[..n]).await.is_err() {
                            break; // next stage dropped its reader (broken pipe)
                        }
                    }
                    Err(_) => break,
                }
            }
            // Close our end so the next stage observes EOF.
            let _ = pipe_out.shutdown().await;
            drop(pipe_out);
            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            ).await;
            // Child has exited (naturally or via kill). Abort stdin/stderr drain tasks.
            if let Some(task) = stdin_task { task.abort(); }
            // NOTE(review): if the drain task has not yet observed EOF when it
            // is aborted, the await below yields a cancellation error and any
            // buffered stderr is replaced by an empty string. Presumably the
            // abort guards against waiting on a pipe still held open by other
            // processes — confirm this trade-off is intended.
            stderr_task.abort();
            let stderr = stderr_task.await.unwrap_or_default();
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            // Output was streamed to pipe, so result.out is empty
            Some(ExecResult::from_output(code, String::new(), stderr))
        } else {
            // No pipe_stdout — last stage or non-pipeline.
            // Use spill-aware collection if output limits are configured.
            let Some(child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(child_stderr) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };

            // Always use spill_aware_collect — it handles both limited and
            // unlimited modes, and correctly streams stderr to ctx.stderr.
            // (wait_with_output would bypass stderr streaming.)
            let (stdout, stderr, did_spill) = crate::output_limit::spill_aware_collect(
                child_stdout,
                child_stderr,
                ctx.stderr.clone(),
                &ctx.output_limit,
            ).await;

            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            ).await;
            // The child is gone; any unfinished stdin feed is no longer useful.
            if let Some(task) = stdin_task { task.abort(); }
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            let mut result = ExecResult::from_output(code, stdout, stderr);
            result.did_spill = did_spill;
            Some(result)
        }
    }
}
386
#[cfg(test)]
#[async_trait]
impl CommandDispatcher for BackendDispatcher {
    /// Run `cmd` through the backend, falling back to an external process
    /// when the backend reports that the tool does not exist.
    ///
    /// `true` and `false` are answered inline. Any `ctx.output_format` set
    /// during the call is applied to the result before it is returned.
    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
        // Built-in true/false short-circuit before any backend work.
        let name = cmd.name.as_str();
        if name == "true" {
            return Ok(ExecResult::success(""));
        }
        if name == "false" {
            return Ok(ExecResult::failure(1, ""));
        }

        // Schema-aware argument construction (sync — no command substitution).
        let schema = self.tools.get(&cmd.name).map(|tool| tool.schema());
        let tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());

        // Apply --json up front so the format survives a parse failure
        // inside the builtin. See kernel.rs for the matching call in the
        // production path.
        GlobalFlags::apply_from_args(&tool_args, ctx);

        // Hand the call to the backend.
        let backend = ctx.backend.clone();
        let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
            Ok(tool_result) => {
                let mut converted = ExecResult::from_output(
                    tool_result.code as i64,
                    tool_result.stdout,
                    tool_result.stderr,
                );
                converted.set_output(tool_result.output);
                converted.content_type = tool_result.content_type;
                converted.baggage = tool_result.baggage;
                // Carry structured data over from the ToolResult (it is
                // preserved through the backend roundtrip).
                if let Some(json_data) = tool_result.data {
                    converted.data = Some(Value::Json(json_data));
                }
                converted
            }
            Err(BackendError::ToolNotFound(_)) => {
                // Unknown to the backend: try it as an external command.
                self.try_external(&cmd.name, &cmd.args, ctx)
                    .await
                    .unwrap_or_else(|| {
                        ExecResult::failure(127, format!("command not found: {}", cmd.name))
                    })
            }
            Err(other) => ExecResult::failure(127, other.to_string()),
        };

        // Migrated builtins parse --json via the GlobalFlags flatten and
        // write ctx.output_format; here it is simply applied.
        if let Some(format) = ctx.output_format {
            Ok(apply_output_format(result, format))
        } else {
            Ok(result)
        }
    }

    /// Synchronous-only evaluation (no command substitution), in line with
    /// this test dispatcher's documented "no async argument evaluation" limit.
    async fn eval_expr(&self, expr: &Expr, ctx: &ExecContext) -> Result<Value> {
        match crate::scheduler::pipeline::eval_simple_expr(expr, ctx) {
            Some(value) => Ok(value),
            None => Err(anyhow::anyhow!("cannot evaluate expression in test dispatcher")),
        }
    }

    /// The dispatcher holds no mutable state, so forking just shares the
    /// same tool registry in a new instance.
    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
        let tools = Arc::clone(&self.tools);
        Arc::new(Self { tools })
    }
}
456}