kaish_kernel/dispatch.rs
1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
10//! ```text
11//! Stmt::Command ──┐
12//! ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
13//! Stmt::Pipeline ──┘ │
14//! for each command:
15//! dispatcher.dispatch(cmd, ctx)
16//! │
17//! ┌─────┼──────────────┐
18//! │ │ │
19//! user_tools builtins .kai scripts
20//! external cmds
21//! backend tools
22//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::{Arg, Command, Expr, Value};
30use crate::backend::BackendError;
31use crate::interpreter::{apply_output_format, ExecResult};
32use crate::scheduler::build_tool_args;
33use crate::tools::{extract_output_format, resolve_in_path, ExecContext, ToolRegistry};
34
35/// Position of a command within a pipeline.
36///
37/// Used by external command execution to decide stdio inheritance:
38/// - `Only` or `Last` in interactive mode → inherit terminal
39/// - `First` or `Middle` → always capture
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
41pub enum PipelinePosition {
42 /// Single command, no pipe.
43 #[default]
44 Only,
45 /// First command in a pipeline (no stdin from pipe).
46 First,
47 /// Middle of a pipeline (piped stdin, piped stdout).
48 Middle,
49 /// Last command in a pipeline (piped stdin, final output).
50 Last,
51}
52
53/// Trait for dispatching a single command through the full resolution chain.
54///
55/// Implementations handle argument parsing, tool lookup, and execution.
56/// The pipeline runner handles I/O routing (stdin, redirects, piping).
57#[async_trait]
58pub trait CommandDispatcher: Send + Sync {
59 /// Dispatch a single command for execution.
60 ///
61 /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
62 /// Implementations should handle schema-aware argument parsing and
63 /// output format extraction internally.
64 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
65}
66
67/// Fallback dispatcher that routes through `backend.call_tool()`.
68///
69/// This provides the same behavior as the old `PipelineRunner` — it dispatches
70/// to builtins via the backend's tool registry. Used for background jobs and
71/// scatter/gather workers until full `Arc<Kernel>` dispatch is wired up.
72///
73/// Limitations compared to the Kernel dispatcher:
74/// - No user-defined tools
75/// - No .kai script resolution
76/// - No external command execution
77/// - No async argument evaluation (command substitution in args won't work)
78pub struct BackendDispatcher {
79 tools: Arc<ToolRegistry>,
80}
81
82impl BackendDispatcher {
83 /// Create a new backend dispatcher with the given tool registry.
84 pub fn new(tools: Arc<ToolRegistry>) -> Self {
85 Self { tools }
86 }
87
88 /// Try to execute an external command (PATH lookup + process spawn).
89 ///
90 /// Used as fallback when no builtin/backend tool matches. Returns None if
91 /// the command is not found in PATH. Always captures stdout/stderr (never
92 /// inherits terminal — pipeline stages don't need interactive I/O).
93 async fn try_external(
94 &self,
95 name: &str,
96 args: &[Arg],
97 ctx: &mut ExecContext,
98 ) -> Option<ExecResult> {
99 // Get real working directory (needed for relative path resolution and child cwd)
100 let real_cwd = ctx.backend.resolve_real_path(&ctx.cwd)
101 .unwrap_or_else(|| std::path::PathBuf::from("/"));
102
103 // Resolve command: absolute/relative path or PATH lookup
104 let executable = if name.contains('/') {
105 // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
106 let resolved = if std::path::Path::new(name).is_absolute() {
107 std::path::PathBuf::from(name)
108 } else {
109 real_cwd.join(name)
110 };
111 if resolved.exists() {
112 resolved.to_string_lossy().into_owned()
113 } else {
114 return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
115 }
116 } else {
117 let path_var = ctx.scope.get("PATH")
118 .map(crate::interpreter::value_to_string)
119 .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
120 resolve_in_path(name, &path_var)?
121 };
122
123 // Build flat argv from args
124 let argv: Vec<String> = args.iter().filter_map(|arg| {
125 match arg {
126 Arg::Positional(expr) => match expr {
127 Expr::Literal(Value::String(s)) => Some(s.clone()),
128 Expr::Literal(Value::Int(i)) => Some(i.to_string()),
129 Expr::Literal(Value::Float(f)) => Some(f.to_string()),
130 Expr::VarRef(path) => ctx.scope.resolve_path(path).map(|v| crate::interpreter::value_to_string(&v)),
131 _ => None,
132 },
133 Arg::ShortFlag(f) => Some(format!("-{f}")),
134 Arg::LongFlag(f) => Some(format!("--{f}")),
135 Arg::Named { key, value } => match value {
136 Expr::Literal(Value::String(s)) => Some(format!("{key}={s}")),
137 _ => Some(format!("{key}=")),
138 },
139 Arg::DoubleDash => Some("--".to_string()),
140 }
141 }).collect();
142
143 // Check for streaming pipes
144 let has_pipe_stdin = ctx.pipe_stdin.is_some();
145 // pipe_stdout checked later when deciding buffered vs streaming output
146 let has_buffered_stdin = ctx.stdin.is_some();
147
148 // Spawn process
149 use tokio::process::Command;
150 use tokio::io::{AsyncReadExt, AsyncWriteExt};
151
152 let mut cmd = Command::new(&executable);
153 cmd.args(&argv);
154 cmd.current_dir(&real_cwd);
155
156 // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
157 cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
158 std::process::Stdio::piped()
159 } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
160 std::process::Stdio::inherit()
161 } else {
162 std::process::Stdio::null()
163 });
164 cmd.stdout(std::process::Stdio::piped());
165 cmd.stderr(std::process::Stdio::piped());
166
167 let mut child = match cmd.spawn() {
168 Ok(c) => c,
169 Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
170 };
171
172 // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
173 let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
174 child.stdin.take().map(|mut child_stdin| {
175 tokio::spawn(async move {
176 let mut buf = [0u8; 8192];
177 loop {
178 match pipe_in.read(&mut buf).await {
179 Ok(0) => break, // EOF
180 Ok(n) => {
181 if child_stdin.write_all(&buf[..n]).await.is_err() {
182 break; // child closed stdin
183 }
184 }
185 Err(_) => break,
186 }
187 }
188 // Drop child_stdin signals EOF to child
189 })
190 })
191 } else if let Some(data) = ctx.stdin.take() {
192 // Buffered string stdin
193 if let Some(mut child_stdin) = child.stdin.take() {
194 let _ = child_stdin.write_all(data.as_bytes()).await;
195 // Drop child_stdin signals EOF
196 }
197 None
198 } else {
199 None
200 };
201
202 // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
203 if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
204 // Safety: stdout/stderr were set to piped() above, so take() always returns Some
205 let Some(mut child_stdout) = child.stdout.take() else {
206 return Some(ExecResult::failure(1, "internal: stdout not available"));
207 };
208 let Some(mut child_stderr_reader) = child.stderr.take() else {
209 return Some(ExecResult::failure(1, "internal: stderr not available"));
210 };
211 // Stream stderr to the kernel's stderr stream (if available) for
212 // real-time delivery. Otherwise buffer with a cap.
213 let stderr_stream_handle = ctx.stderr.clone();
214 let stderr_task = tokio::spawn(async move {
215 let mut buf = Vec::new();
216 let mut chunk = [0u8; 8192];
217 loop {
218 match child_stderr_reader.read(&mut chunk).await {
219 Ok(0) => break,
220 Ok(n) => {
221 if let Some(ref stream) = stderr_stream_handle {
222 // Stream raw bytes — no decode here, lossy decode at drain site
223 stream.write(&chunk[..n]);
224 } else {
225 buf.extend_from_slice(&chunk[..n]);
226 }
227 }
228 Err(_) => break,
229 }
230 }
231 if stderr_stream_handle.is_some() {
232 // Already streamed — return empty
233 String::new()
234 } else {
235 String::from_utf8_lossy(&buf).into_owned()
236 }
237 });
238
239 // Copy child stdout → pipe_stdout in chunks
240 let mut buf = [0u8; 8192];
241 loop {
242 match child_stdout.read(&mut buf).await {
243 Ok(0) => break,
244 Ok(n) => {
245 if pipe_out.write_all(&buf[..n]).await.is_err() {
246 break; // next stage dropped its reader (broken pipe)
247 }
248 }
249 Err(_) => break,
250 }
251 }
252 let _ = pipe_out.shutdown().await;
253 drop(pipe_out);
254 let status = child.wait().await;
255 // Abort stdin copier if child exited (it may be blocked on pipe_in.read)
256 if let Some(task) = stdin_task { task.abort(); }
257 let stderr = stderr_task.await.unwrap_or_default();
258 let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
259 // Output was streamed to pipe, so result.out is empty
260 Some(ExecResult::from_output(code, String::new(), stderr))
261 } else {
262 // No pipe_stdout — buffer output as before (last stage or non-pipeline)
263 let result = match child.wait_with_output().await {
264 Ok(output) => {
265 let code = output.status.code().unwrap_or(1) as i64;
266 let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
267 let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
268 Some(ExecResult::from_output(code, stdout, stderr))
269 }
270 Err(e) => Some(ExecResult::failure(1, format!("{}: {}", name, e))),
271 };
272 if let Some(task) = stdin_task { task.abort(); }
273 result
274 }
275 }
276}
277
278#[async_trait]
279impl CommandDispatcher for BackendDispatcher {
280 async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
281 // Handle built-in true/false
282 match cmd.name.as_str() {
283 "true" => return Ok(ExecResult::success("")),
284 "false" => return Ok(ExecResult::failure(1, "")),
285 _ => {}
286 }
287
288 // Build tool args with schema-aware parsing (sync — no command substitution)
289 let schema = self.tools.get(&cmd.name).map(|t| t.schema());
290 let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
291 let output_format = extract_output_format(&mut tool_args, schema.as_ref());
292
293 // Execute via backend
294 let backend = ctx.backend.clone();
295 let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
296 Ok(tool_result) => {
297 let mut exec = ExecResult::from_output(
298 tool_result.code as i64,
299 tool_result.stdout,
300 tool_result.stderr,
301 );
302 exec.output = tool_result.output;
303 // Restore structured data from ToolResult (preserved through backend roundtrip)
304 if let Some(json_data) = tool_result.data {
305 exec.data = Some(Value::Json(json_data));
306 }
307 exec
308 }
309 Err(BackendError::ToolNotFound(_)) => {
310 // Fall back to external command execution
311 match self.try_external(&cmd.name, &cmd.args, ctx).await {
312 Some(result) => result,
313 None => ExecResult::failure(127, format!("command not found: {}", cmd.name)),
314 }
315 }
316 Err(e) => ExecResult::failure(127, e.to_string()),
317 };
318
319 // Apply output format transform
320 let result = match output_format {
321 Some(format) => apply_output_format(result, format),
322 None => result,
323 };
324
325 Ok(result)
326 }
327}