Skip to main content

kaish_kernel/
dispatch.rs

1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
10//! ```text
11//! Stmt::Command ──┐
12//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
13//! Stmt::Pipeline ──┘                                  │
14//!                                               for each command:
15//!                                                 dispatcher.dispatch(cmd, ctx)
16//!                                                     │
17//!                                               ┌─────┼──────────────┐
18//!                                               │     │              │
19//!                                          user_tools builtins  .kai scripts
20//!                                                                external cmds
21//!                                                                backend tools
22//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::Command;
30use crate::interpreter::ExecResult;
31use crate::tools::ExecContext;
32
33// The following imports are only used by the test-only `BackendDispatcher`.
34#[cfg(test)]
35use crate::ast::{Arg, Value};
36#[cfg(all(test, feature = "native"))]
37use crate::ast::Expr;
38#[cfg(test)]
39use crate::backend::BackendError;
40#[cfg(test)]
41use crate::interpreter::apply_output_format;
42#[cfg(test)]
43use crate::scheduler::build_tool_args;
44#[cfg(test)]
45use crate::tools::{extract_output_format, ToolRegistry};
46#[cfg(all(test, feature = "native"))]
47use crate::tools::resolve_in_path;
48
49/// Position of a command within a pipeline.
50///
51/// Used by external command execution to decide stdio inheritance:
52/// - `Only` or `Last` in interactive mode → inherit terminal
53/// - `First` or `Middle` → always capture
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
55pub enum PipelinePosition {
56    /// Single command, no pipe.
57    #[default]
58    Only,
59    /// First command in a pipeline (no stdin from pipe).
60    First,
61    /// Middle of a pipeline (piped stdin, piped stdout).
62    Middle,
63    /// Last command in a pipeline (piped stdin, final output).
64    Last,
65}
66
67/// Trait for dispatching a single command through the full resolution chain.
68///
69/// Implementations handle argument parsing, tool lookup, and execution.
70/// The pipeline runner handles I/O routing (stdin, redirects, piping).
71#[async_trait]
72pub trait CommandDispatcher: Send + Sync {
73    /// Dispatch a single command for execution.
74    ///
75    /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
76    /// Implementations should handle schema-aware argument parsing and
77    /// output format extraction internally.
78    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
79
80    /// Fork the dispatcher for concurrent execution (detached).
81    ///
82    /// Returns a subsidiary dispatcher with independent mutable state, safe
83    /// to run concurrently with the parent and other forks without data
84    /// races on shared scope/cwd/aliases. Used by background `&` jobs,
85    /// where the fork must survive parent cancellation.
86    ///
87    /// For stateful dispatchers (e.g. Kernel) this snapshots per-session
88    /// state into a fresh instance. Stateless dispatchers may clone.
89    async fn fork(&self) -> Arc<dyn CommandDispatcher>;
90
91    /// Fork the dispatcher for concurrent execution (attached to parent cancel).
92    ///
93    /// Like [`Self::fork`] but the fork's cancellation token is a *child* of
94    /// the parent's. Cancelling the parent (timeout, Ctrl-C, embedder
95    /// `Kernel::cancel`) cascades into the fork, which then kills its own
96    /// external children via the usual SIGTERM/SIGKILL discipline.
97    ///
98    /// Used for foreground concurrency: scatter workers, concurrent pipeline
99    /// stages, command substitution. Default implementation delegates to
100    /// [`Self::fork`] for stateless dispatchers that don't track cancellation.
101    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
102        self.fork().await
103    }
104}
105
106/// Minimal stateless dispatcher used by pipeline/runner unit tests.
107///
108/// Production code uses `Kernel` (via `Kernel::fork` for concurrent contexts).
109/// This test-only dispatcher routes directly through `backend.call_tool()` so
110/// the pipeline runner can be exercised without spinning up a full Kernel.
111///
112/// Limitations (intentional — these are test-only constraints):
113/// - No user-defined tools
114/// - No .kai script resolution
115/// - No async argument evaluation (command substitution in args won't work)
116#[cfg(test)]
117pub(crate) struct BackendDispatcher {
118    tools: Arc<ToolRegistry>,
119}
120
121#[cfg(test)]
122impl BackendDispatcher {
123    /// Create a new backend dispatcher with the given tool registry.
124    pub(crate) fn new(tools: Arc<ToolRegistry>) -> Self {
125        Self { tools }
126    }
127
128    /// Try to execute an external command (PATH lookup + process spawn).
129    ///
130    /// Used as fallback when no builtin/backend tool matches. Returns None if
131    /// the command is not found in PATH. Always captures stdout/stderr (never
132    /// inherits terminal — pipeline stages don't need interactive I/O).
133    #[cfg(not(feature = "native"))]
134    async fn try_external(
135        &self,
136        _name: &str,
137        _args: &[Arg],
138        _ctx: &mut ExecContext,
139    ) -> Option<ExecResult> {
140        None
141    }
142
143    /// Try to execute an external command (PATH lookup + process spawn).
144    #[cfg(feature = "native")]
145    async fn try_external(
146        &self,
147        name: &str,
148        args: &[Arg],
149        ctx: &mut ExecContext,
150    ) -> Option<ExecResult> {
151        if !ctx.allow_external_commands {
152            return None;
153        }
154
155        // Get real working directory (needed for relative path resolution and child cwd).
156        // If the CWD is virtual (no real path), skip external execution entirely.
157        let real_cwd = match ctx.backend.resolve_real_path(&ctx.cwd) {
158            Some(p) => p,
159            None => return None,
160        };
161
162        // Resolve command: absolute/relative path or PATH lookup
163        let executable = if name.contains('/') {
164            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
165            let resolved = if std::path::Path::new(name).is_absolute() {
166                std::path::PathBuf::from(name)
167            } else {
168                real_cwd.join(name)
169            };
170            if resolved.exists() {
171                resolved.to_string_lossy().into_owned()
172            } else {
173                return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
174            }
175        } else {
176            let path_var = ctx.scope.get("PATH")
177                .map(crate::interpreter::value_to_string)
178                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
179            resolve_in_path(name, &path_var)?
180        };
181
182        // Build flat argv from args
183        let argv: Vec<String> = args.iter().filter_map(|arg| {
184            match arg {
185                Arg::Positional(expr) => match expr {
186                    Expr::Literal(Value::String(s)) => Some(s.clone()),
187                    Expr::Literal(Value::Int(i)) => Some(i.to_string()),
188                    Expr::Literal(Value::Float(f)) => Some(f.to_string()),
189                    Expr::VarRef(path) => ctx.scope.resolve_path(path).map(|v| crate::interpreter::value_to_string(&v)),
190                    _ => None,
191                },
192                Arg::ShortFlag(f) => Some(format!("-{f}")),
193                Arg::LongFlag(f) => Some(format!("--{f}")),
194                Arg::Named { key, value } => match value {
195                    Expr::Literal(Value::String(s)) => Some(format!("{key}={s}")),
196                    _ => Some(format!("{key}=")),
197                },
198                Arg::DoubleDash => Some("--".to_string()),
199            }
200        }).collect();
201
202        // Check for streaming pipes
203        let has_pipe_stdin = ctx.pipe_stdin.is_some();
204        // pipe_stdout checked later when deciding buffered vs streaming output
205        let has_buffered_stdin = ctx.stdin.is_some();
206
207        // Spawn process
208        use tokio::process::Command;
209        use tokio::io::{AsyncReadExt, AsyncWriteExt};
210
211        let mut cmd = Command::new(&executable);
212        cmd.args(&argv);
213        cmd.current_dir(&real_cwd);
214        cmd.kill_on_drop(true);
215
216        // Hermetic env: child sees only kaish's exported vars, not the kaish
217        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
218        // populate it via KernelConfig::initial_vars at construction.
219        cmd.env_clear();
220        for (var_name, value) in ctx.scope.exported_vars() {
221            cmd.env(var_name, crate::interpreter::value_to_string(&value));
222        }
223
224        // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
225        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
226            std::process::Stdio::piped()
227        } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
228            std::process::Stdio::inherit()
229        } else {
230            std::process::Stdio::null()
231        });
232        cmd.stdout(std::process::Stdio::piped());
233        cmd.stderr(std::process::Stdio::piped());
234
235        let mut child = match cmd.spawn() {
236            Ok(c) => c,
237            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
238        };
239        // Open a pidfd (Linux) for race-free direct-child kill via wait_or_kill.
240        let kill_target = crate::pidfd::KillTarget::from_child(&child);
241
242        // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
243        let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
244            child.stdin.take().map(|mut child_stdin| {
245                tokio::spawn(async move {
246                    let mut buf = [0u8; 8192];
247                    loop {
248                        match pipe_in.read(&mut buf).await {
249                            Ok(0) => break, // EOF
250                            Ok(n) => {
251                                if child_stdin.write_all(&buf[..n]).await.is_err() {
252                                    break; // child closed stdin
253                                }
254                            }
255                            Err(_) => break,
256                        }
257                    }
258                    // Drop child_stdin signals EOF to child
259                })
260            })
261        } else if let Some(data) = ctx.stdin.take() {
262            // Buffered string stdin
263            if let Some(mut child_stdin) = child.stdin.take() {
264                let _ = child_stdin.write_all(data.as_bytes()).await;
265                // Drop child_stdin signals EOF
266            }
267            None
268        } else {
269            None
270        };
271
272        // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
273        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
274            // Safety: stdout/stderr were set to piped() above, so take() always returns Some
275            let Some(mut child_stdout) = child.stdout.take() else {
276                return Some(ExecResult::failure(1, "internal: stdout not available"));
277            };
278            let Some(mut child_stderr_reader) = child.stderr.take() else {
279                return Some(ExecResult::failure(1, "internal: stderr not available"));
280            };
281            // Stream stderr to the kernel's stderr stream (if available) for
282            // real-time delivery. Otherwise buffer with a cap.
283            let stderr_stream_handle = ctx.stderr.clone();
284            let stderr_task = tokio::spawn(async move {
285                let mut buf = Vec::new();
286                let mut chunk = [0u8; 8192];
287                loop {
288                    match child_stderr_reader.read(&mut chunk).await {
289                        Ok(0) => break,
290                        Ok(n) => {
291                            if let Some(ref stream) = stderr_stream_handle {
292                                // Stream raw bytes — no decode here, lossy decode at drain site
293                                stream.write(&chunk[..n]);
294                            } else {
295                                buf.extend_from_slice(&chunk[..n]);
296                            }
297                        }
298                        Err(_) => break,
299                    }
300                }
301                if stderr_stream_handle.is_some() {
302                    // Already streamed — return empty
303                    String::new()
304                } else {
305                    String::from_utf8_lossy(&buf).into_owned()
306                }
307            });
308
309            // Copy child stdout → pipe_stdout in chunks
310            let mut buf = [0u8; 8192];
311            loop {
312                match child_stdout.read(&mut buf).await {
313                    Ok(0) => break,
314                    Ok(n) => {
315                        if pipe_out.write_all(&buf[..n]).await.is_err() {
316                            break; // next stage dropped its reader (broken pipe)
317                        }
318                    }
319                    Err(_) => break,
320                }
321            }
322            let _ = pipe_out.shutdown().await;
323            drop(pipe_out);
324            let cancel = ctx.cancel.clone();
325            let status = crate::kernel::wait_or_kill(
326                &mut child,
327                kill_target.as_ref(),
328                &cancel,
329                std::time::Duration::from_secs(2),
330            ).await;
331            // Child has exited (naturally or via kill). Abort stdin/stderr drain tasks.
332            if let Some(task) = stdin_task { task.abort(); }
333            stderr_task.abort();
334            let stderr = stderr_task.await.unwrap_or_default();
335            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
336            // Output was streamed to pipe, so result.out is empty
337            Some(ExecResult::from_output(code, String::new(), stderr))
338        } else {
339            // No pipe_stdout — last stage or non-pipeline.
340            // Use spill-aware collection if output limits are configured.
341            let Some(child_stdout) = child.stdout.take() else {
342                return Some(ExecResult::failure(1, "internal: stdout not available"));
343            };
344            let Some(child_stderr) = child.stderr.take() else {
345                return Some(ExecResult::failure(1, "internal: stderr not available"));
346            };
347
348            // Always use spill_aware_collect — it handles both limited and
349            // unlimited modes, and correctly streams stderr to ctx.stderr.
350            // (wait_with_output would bypass stderr streaming.)
351            let (stdout, stderr, did_spill) = crate::output_limit::spill_aware_collect(
352                child_stdout,
353                child_stderr,
354                ctx.stderr.clone(),
355                &ctx.output_limit,
356            ).await;
357
358            let cancel = ctx.cancel.clone();
359            let status = crate::kernel::wait_or_kill(
360                &mut child,
361                kill_target.as_ref(),
362                &cancel,
363                std::time::Duration::from_secs(2),
364            ).await;
365            if let Some(task) = stdin_task { task.abort(); }
366            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
367            let mut result = ExecResult::from_output(code, stdout, stderr);
368            result.did_spill = did_spill;
369            Some(result)
370        }
371    }
372}
373
374#[cfg(test)]
375#[async_trait]
376impl CommandDispatcher for BackendDispatcher {
377    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
378        // Handle built-in true/false
379        match cmd.name.as_str() {
380            "true" => return Ok(ExecResult::success("")),
381            "false" => return Ok(ExecResult::failure(1, "")),
382            _ => {}
383        }
384
385        // Build tool args with schema-aware parsing (sync — no command substitution)
386        let schema = self.tools.get(&cmd.name).map(|t| t.schema());
387        let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
388        let output_format = extract_output_format(&mut tool_args, schema.as_ref());
389
390        // Execute via backend
391        let backend = ctx.backend.clone();
392        let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
393            Ok(tool_result) => {
394                let mut exec = ExecResult::from_output(
395                    tool_result.code as i64,
396                    tool_result.stdout,
397                    tool_result.stderr,
398                );
399                exec.set_output(tool_result.output);
400                exec.content_type = tool_result.content_type;
401                exec.baggage = tool_result.baggage;
402                // Restore structured data from ToolResult (preserved through backend roundtrip)
403                if let Some(json_data) = tool_result.data {
404                    exec.data = Some(Value::Json(json_data));
405                }
406                exec
407            }
408            Err(BackendError::ToolNotFound(_)) => {
409                // Fall back to external command execution
410                match self.try_external(&cmd.name, &cmd.args, ctx).await {
411                    Some(result) => result,
412                    None => ExecResult::failure(127, format!("command not found: {}", cmd.name)),
413                }
414            }
415            Err(e) => ExecResult::failure(127, e.to_string()),
416        };
417
418        // Apply output format transform
419        let result = match output_format {
420            Some(format) => apply_output_format(result, format),
421            None => result,
422        };
423
424        Ok(result)
425    }
426
427    /// BackendDispatcher is stateless, so a fork is just a clone.
428    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
429        Arc::new(Self { tools: Arc::clone(&self.tools) })
430    }
431}