kaish_kernel/dispatch.rs
1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
//! ```text
//! Stmt::Command  ──┐
//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
//! Stmt::Pipeline ──┘                                     │
//!                                                for each command:
//!                                          dispatcher.dispatch(cmd, ctx)
//!                                                        │
//!                                          ┌─────────────┼─────────────┐
//!                                          │             │             │
//!                                     user_tools     builtins     .kai scripts
//!                                                                 external cmds
//!                                                                 backend tools
//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::Command;
30use crate::interpreter::ExecResult;
31use crate::tools::ExecContext;
32
33// The following imports are only used by the test-only `BackendDispatcher`.
34#[cfg(test)]
35use crate::ast::{Arg, Value};
36#[cfg(all(test, feature = "native"))]
37use crate::ast::Expr;
38#[cfg(test)]
39use crate::backend::BackendError;
40#[cfg(test)]
41use crate::interpreter::apply_output_format;
42#[cfg(test)]
43use crate::scheduler::build_tool_args;
44#[cfg(test)]
45use crate::tools::{extract_output_format, ToolRegistry};
46#[cfg(all(test, feature = "native"))]
47use crate::tools::resolve_in_path;
48
/// Position of a command within a pipeline.
///
/// External command execution uses this to decide which stdio streams may be
/// attached to the terminal in interactive mode:
/// - stdin can only be inherited by a stage with no upstream pipe
///   (`Only` or `First`); piped or buffered input always takes precedence.
/// - output can only be inherited by a stage with no downstream pipe
///   (`Only` or `Last`); `First` and `Middle` always have their output captured.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PipelinePosition {
    /// Single command, no pipe on either side.
    #[default]
    Only,
    /// First command in a pipeline (no stdin from a pipe, stdout piped onward).
    First,
    /// Middle of a pipeline (piped stdin, piped stdout).
    Middle,
    /// Last command in a pipeline (piped stdin, produces the final output).
    Last,
}
65}
66
67/// Trait for dispatching a single command through the full resolution chain.
68///
69/// Implementations handle argument parsing, tool lookup, and execution.
70/// The pipeline runner handles I/O routing (stdin, redirects, piping).
71#[async_trait]
72pub trait CommandDispatcher: Send + Sync {
73 /// Dispatch a single command for execution.
74 ///
75 /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
76 /// Implementations should handle schema-aware argument parsing and
77 /// output format extraction internally.
78 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
79
80 /// Fork the dispatcher for concurrent execution (detached).
81 ///
82 /// Returns a subsidiary dispatcher with independent mutable state, safe
83 /// to run concurrently with the parent and other forks without data
84 /// races on shared scope/cwd/aliases. Used by background `&` jobs,
85 /// where the fork must survive parent cancellation.
86 ///
87 /// For stateful dispatchers (e.g. Kernel) this snapshots per-session
88 /// state into a fresh instance. Stateless dispatchers may clone.
89 async fn fork(&self) -> Arc<dyn CommandDispatcher>;
90
91 /// Fork the dispatcher for concurrent execution (attached to parent cancel).
92 ///
93 /// Like [`Self::fork`] but the fork's cancellation token is a *child* of
94 /// the parent's. Cancelling the parent (timeout, Ctrl-C, embedder
95 /// `Kernel::cancel`) cascades into the fork, which then kills its own
96 /// external children via the usual SIGTERM/SIGKILL discipline.
97 ///
98 /// Used for foreground concurrency: scatter workers, concurrent pipeline
99 /// stages, command substitution. Default implementation delegates to
100 /// [`Self::fork`] for stateless dispatchers that don't track cancellation.
101 async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
102 self.fork().await
103 }
104}
105
106/// Minimal stateless dispatcher used by pipeline/runner unit tests.
107///
108/// Production code uses `Kernel` (via `Kernel::fork` for concurrent contexts).
109/// This test-only dispatcher routes directly through `backend.call_tool()` so
110/// the pipeline runner can be exercised without spinning up a full Kernel.
111///
112/// Limitations (intentional — these are test-only constraints):
113/// - No user-defined tools
114/// - No .kai script resolution
115/// - No async argument evaluation (command substitution in args won't work)
116#[cfg(test)]
117pub(crate) struct BackendDispatcher {
118 tools: Arc<ToolRegistry>,
119}
120
121#[cfg(test)]
122impl BackendDispatcher {
123 /// Create a new backend dispatcher with the given tool registry.
124 pub(crate) fn new(tools: Arc<ToolRegistry>) -> Self {
125 Self { tools }
126 }
127
128 /// Try to execute an external command (PATH lookup + process spawn).
129 ///
130 /// Used as fallback when no builtin/backend tool matches. Returns None if
131 /// the command is not found in PATH. Always captures stdout/stderr (never
132 /// inherits terminal — pipeline stages don't need interactive I/O).
133 #[cfg(not(feature = "native"))]
134 async fn try_external(
135 &self,
136 _name: &str,
137 _args: &[Arg],
138 _ctx: &mut ExecContext,
139 ) -> Option<ExecResult> {
140 None
141 }
142
143 /// Try to execute an external command (PATH lookup + process spawn).
144 #[cfg(feature = "native")]
145 async fn try_external(
146 &self,
147 name: &str,
148 args: &[Arg],
149 ctx: &mut ExecContext,
150 ) -> Option<ExecResult> {
151 if !ctx.allow_external_commands {
152 return None;
153 }
154
155 // Get real working directory (needed for relative path resolution and child cwd).
156 // If the CWD is virtual (no real path), skip external execution entirely.
157 let real_cwd = match ctx.backend.resolve_real_path(&ctx.cwd) {
158 Some(p) => p,
159 None => return None,
160 };
161
162 // Resolve command: absolute/relative path or PATH lookup
163 let executable = if name.contains('/') {
164 // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
165 let resolved = if std::path::Path::new(name).is_absolute() {
166 std::path::PathBuf::from(name)
167 } else {
168 real_cwd.join(name)
169 };
170 if resolved.exists() {
171 resolved.to_string_lossy().into_owned()
172 } else {
173 return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
174 }
175 } else {
176 let path_var = ctx.scope.get("PATH")
177 .map(crate::interpreter::value_to_string)
178 .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
179 resolve_in_path(name, &path_var)?
180 };
181
182 // Build flat argv from args
183 let argv: Vec<String> = args.iter().filter_map(|arg| {
184 match arg {
185 Arg::Positional(expr) => match expr {
186 Expr::Literal(Value::String(s)) => Some(s.clone()),
187 Expr::Literal(Value::Int(i)) => Some(i.to_string()),
188 Expr::Literal(Value::Float(f)) => Some(f.to_string()),
189 Expr::VarRef(path) => ctx.scope.resolve_path(path).map(|v| crate::interpreter::value_to_string(&v)),
190 _ => None,
191 },
192 Arg::ShortFlag(f) => Some(format!("-{f}")),
193 Arg::LongFlag(f) => Some(format!("--{f}")),
194 Arg::Named { key, value } => match value {
195 Expr::Literal(Value::String(s)) => Some(format!("{key}={s}")),
196 _ => Some(format!("{key}=")),
197 },
198 Arg::DoubleDash => Some("--".to_string()),
199 }
200 }).collect();
201
202 // Check for streaming pipes
203 let has_pipe_stdin = ctx.pipe_stdin.is_some();
204 // pipe_stdout checked later when deciding buffered vs streaming output
205 let has_buffered_stdin = ctx.stdin.is_some();
206
207 // Spawn process
208 use tokio::process::Command;
209 use tokio::io::{AsyncReadExt, AsyncWriteExt};
210
211 let mut cmd = Command::new(&executable);
212 cmd.args(&argv);
213 cmd.current_dir(&real_cwd);
214 cmd.kill_on_drop(true);
215
216 // Hermetic env: child sees only kaish's exported vars, not the kaish
217 // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
218 // populate it via KernelConfig::initial_vars at construction.
219 cmd.env_clear();
220 for (var_name, value) in ctx.scope.exported_vars() {
221 cmd.env(var_name, crate::interpreter::value_to_string(&value));
222 }
223
224 // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
225 cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
226 std::process::Stdio::piped()
227 } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
228 std::process::Stdio::inherit()
229 } else {
230 std::process::Stdio::null()
231 });
232 cmd.stdout(std::process::Stdio::piped());
233 cmd.stderr(std::process::Stdio::piped());
234
235 let mut child = match cmd.spawn() {
236 Ok(c) => c,
237 Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
238 };
239 // Open a pidfd (Linux) for race-free direct-child kill via wait_or_kill.
240 let kill_target = crate::pidfd::KillTarget::from_child(&child);
241
242 // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
243 let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
244 child.stdin.take().map(|mut child_stdin| {
245 tokio::spawn(async move {
246 let mut buf = [0u8; 8192];
247 loop {
248 match pipe_in.read(&mut buf).await {
249 Ok(0) => break, // EOF
250 Ok(n) => {
251 if child_stdin.write_all(&buf[..n]).await.is_err() {
252 break; // child closed stdin
253 }
254 }
255 Err(_) => break,
256 }
257 }
258 // Drop child_stdin signals EOF to child
259 })
260 })
261 } else if let Some(data) = ctx.stdin.take() {
262 // Buffered string stdin
263 if let Some(mut child_stdin) = child.stdin.take() {
264 let _ = child_stdin.write_all(data.as_bytes()).await;
265 // Drop child_stdin signals EOF
266 }
267 None
268 } else {
269 None
270 };
271
272 // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
273 if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
274 // Safety: stdout/stderr were set to piped() above, so take() always returns Some
275 let Some(mut child_stdout) = child.stdout.take() else {
276 return Some(ExecResult::failure(1, "internal: stdout not available"));
277 };
278 let Some(mut child_stderr_reader) = child.stderr.take() else {
279 return Some(ExecResult::failure(1, "internal: stderr not available"));
280 };
281 // Stream stderr to the kernel's stderr stream (if available) for
282 // real-time delivery. Otherwise buffer with a cap.
283 let stderr_stream_handle = ctx.stderr.clone();
284 let stderr_task = tokio::spawn(async move {
285 let mut buf = Vec::new();
286 let mut chunk = [0u8; 8192];
287 loop {
288 match child_stderr_reader.read(&mut chunk).await {
289 Ok(0) => break,
290 Ok(n) => {
291 if let Some(ref stream) = stderr_stream_handle {
292 // Stream raw bytes — no decode here, lossy decode at drain site
293 stream.write(&chunk[..n]);
294 } else {
295 buf.extend_from_slice(&chunk[..n]);
296 }
297 }
298 Err(_) => break,
299 }
300 }
301 if stderr_stream_handle.is_some() {
302 // Already streamed — return empty
303 String::new()
304 } else {
305 String::from_utf8_lossy(&buf).into_owned()
306 }
307 });
308
309 // Copy child stdout → pipe_stdout in chunks
310 let mut buf = [0u8; 8192];
311 loop {
312 match child_stdout.read(&mut buf).await {
313 Ok(0) => break,
314 Ok(n) => {
315 if pipe_out.write_all(&buf[..n]).await.is_err() {
316 break; // next stage dropped its reader (broken pipe)
317 }
318 }
319 Err(_) => break,
320 }
321 }
322 let _ = pipe_out.shutdown().await;
323 drop(pipe_out);
324 let cancel = ctx.cancel.clone();
325 let status = crate::kernel::wait_or_kill(
326 &mut child,
327 kill_target.as_ref(),
328 &cancel,
329 std::time::Duration::from_secs(2),
330 ).await;
331 // Child has exited (naturally or via kill). Abort stdin/stderr drain tasks.
332 if let Some(task) = stdin_task { task.abort(); }
333 stderr_task.abort();
334 let stderr = stderr_task.await.unwrap_or_default();
335 let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
336 // Output was streamed to pipe, so result.out is empty
337 Some(ExecResult::from_output(code, String::new(), stderr))
338 } else {
339 // No pipe_stdout — last stage or non-pipeline.
340 // Use spill-aware collection if output limits are configured.
341 let Some(child_stdout) = child.stdout.take() else {
342 return Some(ExecResult::failure(1, "internal: stdout not available"));
343 };
344 let Some(child_stderr) = child.stderr.take() else {
345 return Some(ExecResult::failure(1, "internal: stderr not available"));
346 };
347
348 // Always use spill_aware_collect — it handles both limited and
349 // unlimited modes, and correctly streams stderr to ctx.stderr.
350 // (wait_with_output would bypass stderr streaming.)
351 let (stdout, stderr, did_spill) = crate::output_limit::spill_aware_collect(
352 child_stdout,
353 child_stderr,
354 ctx.stderr.clone(),
355 &ctx.output_limit,
356 ).await;
357
358 let cancel = ctx.cancel.clone();
359 let status = crate::kernel::wait_or_kill(
360 &mut child,
361 kill_target.as_ref(),
362 &cancel,
363 std::time::Duration::from_secs(2),
364 ).await;
365 if let Some(task) = stdin_task { task.abort(); }
366 let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
367 let mut result = ExecResult::from_output(code, stdout, stderr);
368 result.did_spill = did_spill;
369 Some(result)
370 }
371 }
372}
373
374#[cfg(test)]
375#[async_trait]
376impl CommandDispatcher for BackendDispatcher {
377 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
378 // Handle built-in true/false
379 match cmd.name.as_str() {
380 "true" => return Ok(ExecResult::success("")),
381 "false" => return Ok(ExecResult::failure(1, "")),
382 _ => {}
383 }
384
385 // Build tool args with schema-aware parsing (sync — no command substitution)
386 let schema = self.tools.get(&cmd.name).map(|t| t.schema());
387 let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
388 let output_format = extract_output_format(&mut tool_args, schema.as_ref());
389
390 // Execute via backend
391 let backend = ctx.backend.clone();
392 let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
393 Ok(tool_result) => {
394 let mut exec = ExecResult::from_output(
395 tool_result.code as i64,
396 tool_result.stdout,
397 tool_result.stderr,
398 );
399 exec.set_output(tool_result.output);
400 exec.content_type = tool_result.content_type;
401 exec.baggage = tool_result.baggage;
402 // Restore structured data from ToolResult (preserved through backend roundtrip)
403 if let Some(json_data) = tool_result.data {
404 exec.data = Some(Value::Json(json_data));
405 }
406 exec
407 }
408 Err(BackendError::ToolNotFound(_)) => {
409 // Fall back to external command execution
410 match self.try_external(&cmd.name, &cmd.args, ctx).await {
411 Some(result) => result,
412 None => ExecResult::failure(127, format!("command not found: {}", cmd.name)),
413 }
414 }
415 Err(e) => ExecResult::failure(127, e.to_string()),
416 };
417
418 // Apply output format transform
419 let result = match output_format {
420 Some(format) => apply_output_format(result, format),
421 None => result,
422 };
423
424 Ok(result)
425 }
426
427 /// BackendDispatcher is stateless, so a fork is just a clone.
428 async fn fork(&self) -> Arc<dyn CommandDispatcher> {
429 Arc::new(Self { tools: Arc::clone(&self.tools) })
430 }
431}