kaish_kernel/dispatch.rs
1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
//! ```text
//! Stmt::Command  ──┐
//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
//! Stmt::Pipeline ──┘                                 │
//!                                            for each command:
//!                                      dispatcher.dispatch(cmd, ctx)
//!                                                    │
//!                                              ┌─────┼──────────────┐
//!                                              │     │              │
//!                                         user_tools builtins  .kai scripts
//!                                                              external cmds
//!                                                              backend tools
//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::{Command, Expr, Value};
30use crate::interpreter::ExecResult;
31use crate::tools::ExecContext;
32
33// The following imports are only used by the test-only `BackendDispatcher`.
34#[cfg(test)]
35use crate::ast::Arg;
36#[cfg(test)]
37use crate::backend::BackendError;
38#[cfg(test)]
39use crate::interpreter::apply_output_format;
40#[cfg(test)]
41use crate::scheduler::build_tool_args;
42#[cfg(test)]
43use crate::tools::{GlobalFlags, ToolRegistry};
44#[cfg(all(test, feature = "subprocess"))]
45use crate::tools::resolve_in_path;
46
/// Where a command sits inside the pipeline that is running it.
///
/// External command execution consults this to choose between inheriting
/// the terminal and capturing stdio:
/// - `Only` / `Last` while interactive → the terminal may be inherited
/// - `First` / `Middle` → output is always captured
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PipelinePosition {
    /// A lone command; no pipe on either side.
    #[default]
    Only,
    /// Head of a pipeline: stdin does not come from a pipe.
    First,
    /// Interior stage: stdin and stdout are both piped.
    Middle,
    /// Tail of a pipeline: stdin is piped, stdout is the final output.
    Last,
}
64
65/// Trait for dispatching a single command through the full resolution chain.
66///
67/// Implementations handle argument parsing, tool lookup, and execution.
68/// The pipeline runner handles I/O routing (stdin, redirects, piping).
69#[async_trait]
70pub trait CommandDispatcher: Send + Sync {
71 /// Dispatch a single command for execution.
72 ///
73 /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
74 /// Implementations should handle schema-aware argument parsing and
75 /// output format extraction internally.
76 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
77
78 /// Evaluate an expression through the full async chain.
79 ///
80 /// Unlike the runner's sync `eval_simple_expr`, this can run command
81 /// substitution (`$(...)`) because it has access to pipeline execution.
82 /// Used for redirect targets and heredoc bodies so `cat < $(cmd)`,
83 /// `echo x > $(cmd)`, and `$(...)` inside heredoc bodies work. The `ctx`
84 /// carries scope/cwd/backend for dispatchers that evaluate against it;
85 /// stateful dispatchers (Kernel) snapshot their own session state and
86 /// only let command output escape (side effects like `cd` do not).
87 async fn eval_expr(&self, expr: &Expr, ctx: &ExecContext) -> Result<Value>;
88
89 /// Fork the dispatcher for concurrent execution (detached).
90 ///
91 /// Returns a subsidiary dispatcher with independent mutable state, safe
92 /// to run concurrently with the parent and other forks without data
93 /// races on shared scope/cwd/aliases. Used by background `&` jobs,
94 /// where the fork must survive parent cancellation.
95 ///
96 /// For stateful dispatchers (e.g. Kernel) this snapshots per-session
97 /// state into a fresh instance. Stateless dispatchers may clone.
98 async fn fork(&self) -> Arc<dyn CommandDispatcher>;
99
100 /// Fork the dispatcher for concurrent execution (attached to parent cancel).
101 ///
102 /// Like [`Self::fork`] but the fork's cancellation token is a *child* of
103 /// the parent's. Cancelling the parent (timeout, Ctrl-C, embedder
104 /// `Kernel::cancel`) cascades into the fork, which then kills its own
105 /// external children via the usual SIGTERM/SIGKILL discipline.
106 ///
107 /// Used for foreground concurrency: scatter workers, concurrent pipeline
108 /// stages, command substitution. Default implementation delegates to
109 /// [`Self::fork`] for stateless dispatchers that don't track cancellation.
110 async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
111 self.fork().await
112 }
113}
114
/// Minimal stateless dispatcher used by pipeline/runner unit tests.
///
/// Production code uses `Kernel` (via `Kernel::fork` for concurrent contexts).
/// This test-only dispatcher routes directly through `backend.call_tool()` so
/// the pipeline runner can be exercised without spinning up a full Kernel.
///
/// Limitations (intentional — these are test-only constraints):
/// - No user-defined tools
/// - No .kai script resolution
/// - No async argument evaluation (command substitution in args won't work)
#[cfg(test)]
pub(crate) struct BackendDispatcher {
    /// Registry consulted for tool schemas when building tool arguments.
    tools: Arc<ToolRegistry>,
}

#[cfg(test)]
impl BackendDispatcher {
    /// How long to keep draining a streamed child's stderr after the child
    /// has exited before giving up and aborting the drain task.
    ///
    /// A bounded wait lets the drain task deliver bytes the child wrote just
    /// before exiting, while still not hanging forever if some other process
    /// keeps the stderr pipe open.
    #[cfg(feature = "subprocess")]
    const STDERR_DRAIN_GRACE: std::time::Duration = std::time::Duration::from_millis(250);

    /// Create a new backend dispatcher with the given tool registry.
    pub(crate) fn new(tools: Arc<ToolRegistry>) -> Self {
        Self { tools }
    }

    /// Try to execute an external command (PATH lookup + process spawn).
    ///
    /// This is the build without the `subprocess` feature: external
    /// execution is unavailable, so the answer is always `None` and the
    /// caller reports "command not found".
    #[cfg(not(feature = "subprocess"))]
    async fn try_external(
        &self,
        _name: &str,
        _args: &[Arg],
        _ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        None
    }

    /// Render an argument expression as text for an external command's argv.
    ///
    /// Only synchronous forms are supported: string/int/float literals and
    /// variable references resolved against `ctx.scope`. Anything else
    /// (including an unset variable) yields `None`.
    #[cfg(feature = "subprocess")]
    fn expr_to_arg_text(expr: &Expr, ctx: &ExecContext) -> Option<String> {
        match expr {
            Expr::Literal(Value::String(s)) => Some(s.clone()),
            Expr::Literal(Value::Int(i)) => Some(i.to_string()),
            Expr::Literal(Value::Float(f)) => Some(f.to_string()),
            Expr::VarRef(path) => ctx
                .scope
                .resolve_path(path)
                .map(|v| crate::interpreter::value_to_string(&v)),
            _ => None,
        }
    }

    /// Flatten parsed arguments into the argv handed to the child process.
    ///
    /// Positional arguments that cannot be rendered are dropped. Named
    /// (`--key=value`) and word-assign (`key=value`) arguments are always
    /// emitted; their value goes through the same rendering as positionals,
    /// so `--count=5` and `--dir=$HOME` keep their values instead of
    /// collapsing to `--count=` / `--dir=`. An unrenderable value becomes the
    /// empty string.
    #[cfg(feature = "subprocess")]
    fn build_argv(args: &[Arg], ctx: &ExecContext) -> Vec<String> {
        args.iter()
            .filter_map(|arg| match arg {
                Arg::Positional(expr) => Self::expr_to_arg_text(expr, ctx),
                Arg::ShortFlag(flag) => Some(format!("-{flag}")),
                Arg::LongFlag(flag) => Some(format!("--{flag}")),
                Arg::Named { key, value } => {
                    let text = Self::expr_to_arg_text(value, ctx).unwrap_or_default();
                    Some(format!("--{key}={text}"))
                }
                Arg::WordAssign { key, value } => {
                    let text = Self::expr_to_arg_text(value, ctx).unwrap_or_default();
                    Some(format!("{key}={text}"))
                }
                Arg::DoubleDash => Some("--".to_string()),
            })
            .collect()
    }

    /// Try to execute an external command (PATH lookup + process spawn).
    ///
    /// Used as fallback when no builtin/backend tool matches.
    ///
    /// Returns `None` when external commands are disabled on `ctx`, when the
    /// current directory has no real filesystem path, or when `name` cannot
    /// be found in `PATH`. Returns `Some` with exit code 127 when a
    /// slash-containing path does not exist or the spawn itself fails.
    /// Otherwise returns the child's result.
    ///
    /// stdout and stderr are always captured (never inherited). stdin is
    /// piped when the context carries piped or buffered input, inherited only
    /// for an interactive `First`/`Only` stage, and null otherwise. When
    /// `ctx.pipe_stdout` is set the child's stdout is streamed into it and
    /// the returned `out` is empty.
    #[cfg(feature = "subprocess")]
    async fn try_external(
        &self,
        name: &str,
        args: &[Arg],
        ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::process::Command;

        if !ctx.allow_external_commands {
            return None;
        }

        // Real working directory: needed for relative path resolution and as
        // the child's cwd. A virtual cwd (no real path) disables external
        // execution entirely.
        let real_cwd = ctx.backend.resolve_real_path(&ctx.cwd)?;

        // Resolve the command: explicit path (absolute/relative) or PATH lookup.
        let executable = if name.contains('/') {
            // Relative paths (./script, ../bin/tool) are resolved against the shell's cwd.
            let resolved = if std::path::Path::new(name).is_absolute() {
                std::path::PathBuf::from(name)
            } else {
                real_cwd.join(name)
            };
            if !resolved.exists() {
                return Some(ExecResult::failure(
                    127,
                    format!("{}: No such file or directory", name),
                ));
            }
            resolved.to_string_lossy().into_owned()
        } else {
            // The shell's own PATH wins; fall back to the process environment.
            let path_var = ctx
                .scope
                .get("PATH")
                .map(crate::interpreter::value_to_string)
                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
            resolve_in_path(name, &path_var)?
        };

        let argv = Self::build_argv(args, ctx);

        // Decide stdin wiring before spawning; the sources are consumed below.
        let has_pipe_stdin = ctx.pipe_stdin.is_some();
        let has_buffered_stdin = ctx.stdin.is_some();

        let mut cmd = Command::new(&executable);
        cmd.args(&argv);
        cmd.current_dir(&real_cwd);
        cmd.kill_on_drop(true);

        // Hermetic env: child sees only kaish's exported vars, not the kaish
        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
        // populate it via KernelConfig::initial_vars at construction.
        cmd.env_clear();
        for (var_name, value) in ctx.scope.exported_vars() {
            cmd.env(var_name, crate::interpreter::value_to_string(&value));
        }

        // Stdin: pipe_stdin or buffered string or inherit (interactive) or null.
        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
            std::process::Stdio::piped()
        } else if ctx.interactive
            && matches!(
                ctx.pipeline_position,
                PipelinePosition::First | PipelinePosition::Only
            )
        {
            std::process::Stdio::inherit()
        } else {
            std::process::Stdio::null()
        });
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());

        let mut child = match cmd.spawn() {
            Ok(c) => c,
            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
        };
        // Open a pidfd (Linux) for race-free direct-child kill via wait_or_kill.
        let kill_target = crate::pidfd::KillTarget::from_child(&child);

        // Feed stdin from a background task in both cases. Writing inline
        // would deadlock once the payload exceeds the pipe buffer and the
        // child starts producing output before it has consumed its input:
        // we would be stuck in write_all while the child is stuck writing to
        // a stdout pipe nobody is reading yet.
        let stdin_task: Option<tokio::task::JoinHandle<()>> =
            if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
                // Streamed stdin: copy in chunks (bounded memory).
                child.stdin.take().map(|mut child_stdin| {
                    tokio::spawn(async move {
                        let mut buf = [0u8; 8192];
                        loop {
                            match pipe_in.read(&mut buf).await {
                                Ok(0) => break, // EOF
                                Ok(n) => {
                                    if child_stdin.write_all(&buf[..n]).await.is_err() {
                                        break; // child closed stdin
                                    }
                                }
                                Err(_) => break,
                            }
                        }
                        // Dropping child_stdin signals EOF to the child.
                    })
                })
            } else if let Some(data) = ctx.stdin.take() {
                // Buffered string stdin.
                child.stdin.take().map(|mut child_stdin| {
                    tokio::spawn(async move {
                        // Best effort: the child may exit without reading.
                        let _ = child_stdin.write_all(data.as_bytes()).await;
                        // Dropping child_stdin signals EOF to the child.
                    })
                })
            } else {
                None
            };

        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
            // Streaming stage: copy child stdout → pipe_stdout in chunks.
            // stdout/stderr were set to piped() above, so take() should
            // always return Some; fail softly if it somehow does not.
            let Some(mut child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(mut child_stderr_reader) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };

            // Stream stderr to the kernel's stderr stream (if available) for
            // real-time delivery. Otherwise buffer it and return it as text.
            let stderr_stream_handle = ctx.stderr.clone();
            let mut stderr_task = tokio::spawn(async move {
                let mut buf = Vec::new();
                let mut chunk = [0u8; 8192];
                loop {
                    match child_stderr_reader.read(&mut chunk).await {
                        Ok(0) => break,
                        Ok(n) => {
                            if let Some(ref stream) = stderr_stream_handle {
                                // Raw bytes — lossy decode happens at the drain site.
                                stream.write(&chunk[..n]);
                            } else {
                                buf.extend_from_slice(&chunk[..n]);
                            }
                        }
                        Err(_) => break,
                    }
                }
                if stderr_stream_handle.is_some() {
                    // Already streamed — nothing to return.
                    String::new()
                } else {
                    String::from_utf8_lossy(&buf).into_owned()
                }
            });

            let mut buf = [0u8; 8192];
            loop {
                match child_stdout.read(&mut buf).await {
                    Ok(0) => break,
                    Ok(n) => {
                        if pipe_out.write_all(&buf[..n]).await.is_err() {
                            break; // next stage dropped its reader (broken pipe)
                        }
                    }
                    Err(_) => break,
                }
            }
            let _ = pipe_out.shutdown().await;
            drop(pipe_out);
            // Close our read end before waiting. If we stopped early because
            // the next stage went away, a child that keeps writing must see
            // SIGPIPE/EPIPE; otherwise it blocks on a full pipe and the wait
            // below never finishes (think `yes | head -1`).
            drop(child_stdout);

            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            )
            .await;

            // Child has exited (naturally or via kill).
            if let Some(task) = stdin_task {
                task.abort();
            }
            // Let the stderr drain finish so output written right before exit
            // is not thrown away; abort only if the pipe is still held open
            // after the grace period.
            let stderr =
                match tokio::time::timeout(Self::STDERR_DRAIN_GRACE, &mut stderr_task).await {
                    Ok(joined) => joined.unwrap_or_default(),
                    Err(_elapsed) => {
                        stderr_task.abort();
                        String::new()
                    }
                };
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            // Output was streamed to the pipe, so result.out is empty.
            Some(ExecResult::from_output(code, String::new(), stderr))
        } else {
            // No pipe_stdout — last stage or non-pipeline.
            let Some(child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(child_stderr) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };

            // Always use spill_aware_collect — it handles both limited and
            // unlimited modes, and correctly streams stderr to ctx.stderr.
            // (wait_with_output would bypass stderr streaming.)
            let (stdout, stderr, did_spill) = crate::output_limit::spill_aware_collect(
                child_stdout,
                child_stderr,
                ctx.stderr.clone(),
                &ctx.output_limit,
            )
            .await;

            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            )
            .await;
            if let Some(task) = stdin_task {
                task.abort();
            }
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            let mut result = ExecResult::from_output(code, stdout, stderr);
            result.did_spill = did_spill;
            Some(result)
        }
    }
}
386
#[cfg(test)]
#[async_trait]
impl CommandDispatcher for BackendDispatcher {
    /// Run `cmd` through the backend, falling back to an external process.
    ///
    /// `true` and `false` are answered directly. Everything else goes to
    /// `backend.call_tool()`; a `ToolNotFound` error triggers the external
    /// command fallback, and any other backend error becomes a failure with
    /// exit code 127. If an output format ends up set on `ctx`, it is applied
    /// to the result before returning.
    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
        // Built-in true/false never reach the backend.
        let name = cmd.name.as_str();
        if name == "true" {
            return Ok(ExecResult::success(""));
        }
        if name == "false" {
            return Ok(ExecResult::failure(1, ""));
        }

        // Schema-aware argument parsing (sync — no command substitution).
        let schema = self.tools.get(&cmd.name).map(|tool| tool.schema());
        let tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());

        // Honor --json before the tool runs so a parse failure inside the
        // builtin doesn't drop the format on the floor. See kernel.rs for the
        // matching call in the production path.
        GlobalFlags::apply_from_args(&tool_args, ctx);

        // Clone the backend handle so `ctx` can be lent mutably to the call.
        let backend = ctx.backend.clone();
        let outcome = backend.call_tool(&cmd.name, tool_args, ctx).await;

        let raw = match outcome {
            Ok(tool_result) => {
                let mut exec = ExecResult::from_output(
                    tool_result.code as i64,
                    tool_result.stdout,
                    tool_result.stderr,
                );
                exec.set_output(tool_result.output);
                exec.content_type = tool_result.content_type;
                exec.baggage = tool_result.baggage;
                // Structured data survives the backend roundtrip on the
                // ToolResult; only overwrite when it is actually present.
                if let Some(json_data) = tool_result.data {
                    exec.data = Some(Value::Json(json_data));
                }
                exec
            }
            // Unknown tool: try PATH / explicit path execution instead.
            Err(BackendError::ToolNotFound(_)) => self
                .try_external(&cmd.name, &cmd.args, ctx)
                .await
                .unwrap_or_else(|| {
                    ExecResult::failure(127, format!("command not found: {}", cmd.name))
                }),
            Err(other) => ExecResult::failure(127, other.to_string()),
        };

        // Migrated builtins parse --json via the GlobalFlags flatten and
        // write ctx.output_format; all that is left is to apply it.
        let formatted = if let Some(format) = ctx.output_format {
            apply_output_format(raw, format)
        } else {
            raw
        };

        Ok(formatted)
    }

    /// Synchronous evaluation only (no command substitution), in line with
    /// this test dispatcher's documented "no async argument evaluation"
    /// limitation. Expressions the sync evaluator cannot handle are errors.
    async fn eval_expr(&self, expr: &Expr, ctx: &ExecContext) -> Result<Value> {
        match crate::scheduler::pipeline::eval_simple_expr(expr, ctx) {
            Some(value) => Ok(value),
            None => Err(anyhow::anyhow!(
                "cannot evaluate expression in test dispatcher"
            )),
        }
    }

    /// The dispatcher holds no mutable state, so forking just shares the
    /// same tool registry behind a new handle.
    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
        Arc::new(Self::new(Arc::clone(&self.tools)))
    }
}