kaish_kernel/dispatch.rs
1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
//! ```text
//!   Stmt::Command  ──┐
//!                    ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
//!   Stmt::Pipeline ──┘                                    │
//!                                                for each command:
//!                                                  dispatcher.dispatch(cmd, ctx)
//!                                                         │
//!                                                   ┌─────┼──────────────┐
//!                                                   │     │              │
//!                                              user_tools builtins  .kai scripts
//!                                                                   external cmds
//!                                                                   backend tools
//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::{Arg, Command, Expr, Value};
30use crate::backend::BackendError;
31use crate::interpreter::{apply_output_format, ExecResult};
32use crate::scheduler::build_tool_args;
33use crate::tools::{extract_output_format, resolve_in_path, ExecContext, ToolRegistry};
34
/// Where a command sits inside a pipeline.
///
/// External command execution consults this to decide stdio inheritance:
/// - `Only` or `Last` in interactive mode → inherit terminal
/// - `First` or `Middle` → always capture
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PipelinePosition {
    /// A lone command with no pipe on either side.
    #[default]
    Only,
    /// Head of a pipeline (no stdin from pipe).
    First,
    /// Interior stage of a pipeline (piped stdin, piped stdout).
    Middle,
    /// Tail of a pipeline (piped stdin, final output).
    Last,
}
52
53/// Trait for dispatching a single command through the full resolution chain.
54///
55/// Implementations handle argument parsing, tool lookup, and execution.
56/// The pipeline runner handles I/O routing (stdin, redirects, piping).
57#[async_trait]
58pub trait CommandDispatcher: Send + Sync {
59 /// Dispatch a single command for execution.
60 ///
61 /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
62 /// Implementations should handle schema-aware argument parsing and
63 /// output format extraction internally.
64 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
65}
66
67/// Fallback dispatcher that routes through `backend.call_tool()`.
68///
69/// This provides the same behavior as the old `PipelineRunner` — it dispatches
70/// to builtins via the backend's tool registry. Used for background jobs and
71/// scatter/gather workers until full `Arc<Kernel>` dispatch is wired up.
72///
73/// Limitations compared to the Kernel dispatcher:
74/// - No user-defined tools
75/// - No .kai script resolution
76/// - No async argument evaluation (command substitution in args won't work)
77pub struct BackendDispatcher {
78 tools: Arc<ToolRegistry>,
79}
80
81impl BackendDispatcher {
82 /// Create a new backend dispatcher with the given tool registry.
83 pub fn new(tools: Arc<ToolRegistry>) -> Self {
84 Self { tools }
85 }
86
87 /// Try to execute an external command (PATH lookup + process spawn).
88 ///
89 /// Used as fallback when no builtin/backend tool matches. Returns None if
90 /// the command is not found in PATH. Always captures stdout/stderr (never
91 /// inherits terminal — pipeline stages don't need interactive I/O).
92 async fn try_external(
93 &self,
94 name: &str,
95 args: &[Arg],
96 ctx: &mut ExecContext,
97 ) -> Option<ExecResult> {
98 if !ctx.allow_external_commands {
99 return None;
100 }
101
102 // Get real working directory (needed for relative path resolution and child cwd).
103 // If the CWD is virtual (no real path), skip external execution entirely.
104 let real_cwd = match ctx.backend.resolve_real_path(&ctx.cwd) {
105 Some(p) => p,
106 None => return None,
107 };
108
109 // Resolve command: absolute/relative path or PATH lookup
110 let executable = if name.contains('/') {
111 // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
112 let resolved = if std::path::Path::new(name).is_absolute() {
113 std::path::PathBuf::from(name)
114 } else {
115 real_cwd.join(name)
116 };
117 if resolved.exists() {
118 resolved.to_string_lossy().into_owned()
119 } else {
120 return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
121 }
122 } else {
123 let path_var = ctx.scope.get("PATH")
124 .map(crate::interpreter::value_to_string)
125 .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
126 resolve_in_path(name, &path_var)?
127 };
128
129 // Build flat argv from args
130 let argv: Vec<String> = args.iter().filter_map(|arg| {
131 match arg {
132 Arg::Positional(expr) => match expr {
133 Expr::Literal(Value::String(s)) => Some(s.clone()),
134 Expr::Literal(Value::Int(i)) => Some(i.to_string()),
135 Expr::Literal(Value::Float(f)) => Some(f.to_string()),
136 Expr::VarRef(path) => ctx.scope.resolve_path(path).map(|v| crate::interpreter::value_to_string(&v)),
137 _ => None,
138 },
139 Arg::ShortFlag(f) => Some(format!("-{f}")),
140 Arg::LongFlag(f) => Some(format!("--{f}")),
141 Arg::Named { key, value } => match value {
142 Expr::Literal(Value::String(s)) => Some(format!("{key}={s}")),
143 _ => Some(format!("{key}=")),
144 },
145 Arg::DoubleDash => Some("--".to_string()),
146 }
147 }).collect();
148
149 // Check for streaming pipes
150 let has_pipe_stdin = ctx.pipe_stdin.is_some();
151 // pipe_stdout checked later when deciding buffered vs streaming output
152 let has_buffered_stdin = ctx.stdin.is_some();
153
154 // Spawn process
155 use tokio::process::Command;
156 use tokio::io::{AsyncReadExt, AsyncWriteExt};
157
158 let mut cmd = Command::new(&executable);
159 cmd.args(&argv);
160 cmd.current_dir(&real_cwd);
161
162 // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
163 cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
164 std::process::Stdio::piped()
165 } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
166 std::process::Stdio::inherit()
167 } else {
168 std::process::Stdio::null()
169 });
170 cmd.stdout(std::process::Stdio::piped());
171 cmd.stderr(std::process::Stdio::piped());
172
173 let mut child = match cmd.spawn() {
174 Ok(c) => c,
175 Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
176 };
177
178 // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
179 let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
180 child.stdin.take().map(|mut child_stdin| {
181 tokio::spawn(async move {
182 let mut buf = [0u8; 8192];
183 loop {
184 match pipe_in.read(&mut buf).await {
185 Ok(0) => break, // EOF
186 Ok(n) => {
187 if child_stdin.write_all(&buf[..n]).await.is_err() {
188 break; // child closed stdin
189 }
190 }
191 Err(_) => break,
192 }
193 }
194 // Drop child_stdin signals EOF to child
195 })
196 })
197 } else if let Some(data) = ctx.stdin.take() {
198 // Buffered string stdin
199 if let Some(mut child_stdin) = child.stdin.take() {
200 let _ = child_stdin.write_all(data.as_bytes()).await;
201 // Drop child_stdin signals EOF
202 }
203 None
204 } else {
205 None
206 };
207
208 // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
209 if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
210 // Safety: stdout/stderr were set to piped() above, so take() always returns Some
211 let Some(mut child_stdout) = child.stdout.take() else {
212 return Some(ExecResult::failure(1, "internal: stdout not available"));
213 };
214 let Some(mut child_stderr_reader) = child.stderr.take() else {
215 return Some(ExecResult::failure(1, "internal: stderr not available"));
216 };
217 // Stream stderr to the kernel's stderr stream (if available) for
218 // real-time delivery. Otherwise buffer with a cap.
219 let stderr_stream_handle = ctx.stderr.clone();
220 let stderr_task = tokio::spawn(async move {
221 let mut buf = Vec::new();
222 let mut chunk = [0u8; 8192];
223 loop {
224 match child_stderr_reader.read(&mut chunk).await {
225 Ok(0) => break,
226 Ok(n) => {
227 if let Some(ref stream) = stderr_stream_handle {
228 // Stream raw bytes — no decode here, lossy decode at drain site
229 stream.write(&chunk[..n]);
230 } else {
231 buf.extend_from_slice(&chunk[..n]);
232 }
233 }
234 Err(_) => break,
235 }
236 }
237 if stderr_stream_handle.is_some() {
238 // Already streamed — return empty
239 String::new()
240 } else {
241 String::from_utf8_lossy(&buf).into_owned()
242 }
243 });
244
245 // Copy child stdout → pipe_stdout in chunks
246 let mut buf = [0u8; 8192];
247 loop {
248 match child_stdout.read(&mut buf).await {
249 Ok(0) => break,
250 Ok(n) => {
251 if pipe_out.write_all(&buf[..n]).await.is_err() {
252 break; // next stage dropped its reader (broken pipe)
253 }
254 }
255 Err(_) => break,
256 }
257 }
258 let _ = pipe_out.shutdown().await;
259 drop(pipe_out);
260 let status = child.wait().await;
261 // Abort stdin copier if child exited (it may be blocked on pipe_in.read)
262 if let Some(task) = stdin_task { task.abort(); }
263 let stderr = stderr_task.await.unwrap_or_default();
264 let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
265 // Output was streamed to pipe, so result.out is empty
266 Some(ExecResult::from_output(code, String::new(), stderr))
267 } else {
268 // No pipe_stdout — last stage or non-pipeline.
269 // Use spill-aware collection if output limits are configured.
270 let Some(child_stdout) = child.stdout.take() else {
271 return Some(ExecResult::failure(1, "internal: stdout not available"));
272 };
273 let Some(child_stderr) = child.stderr.take() else {
274 return Some(ExecResult::failure(1, "internal: stderr not available"));
275 };
276
277 // Always use spill_aware_collect — it handles both limited and
278 // unlimited modes, and correctly streams stderr to ctx.stderr.
279 // (wait_with_output would bypass stderr streaming.)
280 let (stdout, stderr) = crate::output_limit::spill_aware_collect(
281 child_stdout,
282 child_stderr,
283 ctx.stderr.clone(),
284 &ctx.output_limit,
285 ).await;
286
287 let status = child.wait().await;
288 if let Some(task) = stdin_task { task.abort(); }
289 let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
290 Some(ExecResult::from_output(code, stdout, stderr))
291 }
292 }
293}
294
295#[async_trait]
296impl CommandDispatcher for BackendDispatcher {
297 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
298 // Handle built-in true/false
299 match cmd.name.as_str() {
300 "true" => return Ok(ExecResult::success("")),
301 "false" => return Ok(ExecResult::failure(1, "")),
302 _ => {}
303 }
304
305 // Build tool args with schema-aware parsing (sync — no command substitution)
306 let schema = self.tools.get(&cmd.name).map(|t| t.schema());
307 let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
308 let output_format = extract_output_format(&mut tool_args, schema.as_ref());
309
310 // Execute via backend
311 let backend = ctx.backend.clone();
312 let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
313 Ok(tool_result) => {
314 let mut exec = ExecResult::from_output(
315 tool_result.code as i64,
316 tool_result.stdout,
317 tool_result.stderr,
318 );
319 exec.output = tool_result.output;
320 // Restore structured data from ToolResult (preserved through backend roundtrip)
321 if let Some(json_data) = tool_result.data {
322 exec.data = Some(Value::Json(json_data));
323 }
324 exec
325 }
326 Err(BackendError::ToolNotFound(_)) => {
327 // Fall back to external command execution
328 match self.try_external(&cmd.name, &cmd.args, ctx).await {
329 Some(result) => result,
330 None => ExecResult::failure(127, format!("command not found: {}", cmd.name)),
331 }
332 }
333 Err(e) => ExecResult::failure(127, e.to_string()),
334 };
335
336 // Apply output format transform
337 let result = match output_format {
338 Some(format) => apply_output_format(result, format),
339 None => result,
340 };
341
342 Ok(result)
343 }
344}