Skip to main content

kaish_kernel/
dispatch.rs

1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
10//! ```text
11//! Stmt::Command ──┐
12//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
13//! Stmt::Pipeline ──┘                                  │
14//!                                               for each command:
15//!                                                 dispatcher.dispatch(cmd, ctx)
16//!                                                     │
17//!                                               ┌─────┼──────────────┐
18//!                                               │     │              │
19//!                                          user_tools builtins  .kai scripts
20//!                                                                external cmds
21//!                                                                backend tools
22//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::{Arg, Command, Expr, Value};
30use crate::backend::BackendError;
31use crate::interpreter::{apply_output_format, ExecResult};
32use crate::scheduler::build_tool_args;
33use crate::tools::{extract_output_format, resolve_in_path, ExecContext, ToolRegistry};
34
/// Where a command sits inside the pipeline that is being run.
///
/// External command execution consults this to choose stdio wiring:
/// - `Only` / `Last` while interactive → the terminal may be inherited
/// - `First` / `Middle` → output is always captured
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PipelinePosition {
    /// A lone command; nothing is piped in or out.
    #[default]
    Only,
    /// Head of a pipeline: stdin does not come from a pipe.
    First,
    /// Interior stage: both stdin and stdout are pipes.
    Middle,
    /// Tail of a pipeline: stdin is a pipe, stdout is the final output.
    Last,
}
52
53/// Trait for dispatching a single command through the full resolution chain.
54///
55/// Implementations handle argument parsing, tool lookup, and execution.
56/// The pipeline runner handles I/O routing (stdin, redirects, piping).
57#[async_trait]
58pub trait CommandDispatcher: Send + Sync {
59    /// Dispatch a single command for execution.
60    ///
61    /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
62    /// Implementations should handle schema-aware argument parsing and
63    /// output format extraction internally.
64    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
65}
66
67/// Fallback dispatcher that routes through `backend.call_tool()`.
68///
69/// This provides the same behavior as the old `PipelineRunner` — it dispatches
70/// to builtins via the backend's tool registry. Used for background jobs and
71/// scatter/gather workers until full `Arc<Kernel>` dispatch is wired up.
72///
73/// Limitations compared to the Kernel dispatcher:
74/// - No user-defined tools
75/// - No .kai script resolution
76/// - No async argument evaluation (command substitution in args won't work)
77pub struct BackendDispatcher {
78    tools: Arc<ToolRegistry>,
79}
80
81impl BackendDispatcher {
82    /// Create a new backend dispatcher with the given tool registry.
83    pub fn new(tools: Arc<ToolRegistry>) -> Self {
84        Self { tools }
85    }
86
87    /// Try to execute an external command (PATH lookup + process spawn).
88    ///
89    /// Used as fallback when no builtin/backend tool matches. Returns None if
90    /// the command is not found in PATH. Always captures stdout/stderr (never
91    /// inherits terminal — pipeline stages don't need interactive I/O).
92    async fn try_external(
93        &self,
94        name: &str,
95        args: &[Arg],
96        ctx: &mut ExecContext,
97    ) -> Option<ExecResult> {
98        if !ctx.allow_external_commands {
99            return None;
100        }
101
102        // Get real working directory (needed for relative path resolution and child cwd).
103        // If the CWD is virtual (no real path), skip external execution entirely.
104        let real_cwd = match ctx.backend.resolve_real_path(&ctx.cwd) {
105            Some(p) => p,
106            None => return None,
107        };
108
109        // Resolve command: absolute/relative path or PATH lookup
110        let executable = if name.contains('/') {
111            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
112            let resolved = if std::path::Path::new(name).is_absolute() {
113                std::path::PathBuf::from(name)
114            } else {
115                real_cwd.join(name)
116            };
117            if resolved.exists() {
118                resolved.to_string_lossy().into_owned()
119            } else {
120                return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
121            }
122        } else {
123            let path_var = ctx.scope.get("PATH")
124                .map(crate::interpreter::value_to_string)
125                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
126            resolve_in_path(name, &path_var)?
127        };
128
129        // Build flat argv from args
130        let argv: Vec<String> = args.iter().filter_map(|arg| {
131            match arg {
132                Arg::Positional(expr) => match expr {
133                    Expr::Literal(Value::String(s)) => Some(s.clone()),
134                    Expr::Literal(Value::Int(i)) => Some(i.to_string()),
135                    Expr::Literal(Value::Float(f)) => Some(f.to_string()),
136                    Expr::VarRef(path) => ctx.scope.resolve_path(path).map(|v| crate::interpreter::value_to_string(&v)),
137                    _ => None,
138                },
139                Arg::ShortFlag(f) => Some(format!("-{f}")),
140                Arg::LongFlag(f) => Some(format!("--{f}")),
141                Arg::Named { key, value } => match value {
142                    Expr::Literal(Value::String(s)) => Some(format!("{key}={s}")),
143                    _ => Some(format!("{key}=")),
144                },
145                Arg::DoubleDash => Some("--".to_string()),
146            }
147        }).collect();
148
149        // Check for streaming pipes
150        let has_pipe_stdin = ctx.pipe_stdin.is_some();
151        // pipe_stdout checked later when deciding buffered vs streaming output
152        let has_buffered_stdin = ctx.stdin.is_some();
153
154        // Spawn process
155        use tokio::process::Command;
156        use tokio::io::{AsyncReadExt, AsyncWriteExt};
157
158        let mut cmd = Command::new(&executable);
159        cmd.args(&argv);
160        cmd.current_dir(&real_cwd);
161
162        // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
163        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
164            std::process::Stdio::piped()
165        } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
166            std::process::Stdio::inherit()
167        } else {
168            std::process::Stdio::null()
169        });
170        cmd.stdout(std::process::Stdio::piped());
171        cmd.stderr(std::process::Stdio::piped());
172
173        let mut child = match cmd.spawn() {
174            Ok(c) => c,
175            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
176        };
177
178        // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
179        let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
180            child.stdin.take().map(|mut child_stdin| {
181                tokio::spawn(async move {
182                    let mut buf = [0u8; 8192];
183                    loop {
184                        match pipe_in.read(&mut buf).await {
185                            Ok(0) => break, // EOF
186                            Ok(n) => {
187                                if child_stdin.write_all(&buf[..n]).await.is_err() {
188                                    break; // child closed stdin
189                                }
190                            }
191                            Err(_) => break,
192                        }
193                    }
194                    // Drop child_stdin signals EOF to child
195                })
196            })
197        } else if let Some(data) = ctx.stdin.take() {
198            // Buffered string stdin
199            if let Some(mut child_stdin) = child.stdin.take() {
200                let _ = child_stdin.write_all(data.as_bytes()).await;
201                // Drop child_stdin signals EOF
202            }
203            None
204        } else {
205            None
206        };
207
208        // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
209        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
210            // Safety: stdout/stderr were set to piped() above, so take() always returns Some
211            let Some(mut child_stdout) = child.stdout.take() else {
212                return Some(ExecResult::failure(1, "internal: stdout not available"));
213            };
214            let Some(mut child_stderr_reader) = child.stderr.take() else {
215                return Some(ExecResult::failure(1, "internal: stderr not available"));
216            };
217            // Stream stderr to the kernel's stderr stream (if available) for
218            // real-time delivery. Otherwise buffer with a cap.
219            let stderr_stream_handle = ctx.stderr.clone();
220            let stderr_task = tokio::spawn(async move {
221                let mut buf = Vec::new();
222                let mut chunk = [0u8; 8192];
223                loop {
224                    match child_stderr_reader.read(&mut chunk).await {
225                        Ok(0) => break,
226                        Ok(n) => {
227                            if let Some(ref stream) = stderr_stream_handle {
228                                // Stream raw bytes — no decode here, lossy decode at drain site
229                                stream.write(&chunk[..n]);
230                            } else {
231                                buf.extend_from_slice(&chunk[..n]);
232                            }
233                        }
234                        Err(_) => break,
235                    }
236                }
237                if stderr_stream_handle.is_some() {
238                    // Already streamed — return empty
239                    String::new()
240                } else {
241                    String::from_utf8_lossy(&buf).into_owned()
242                }
243            });
244
245            // Copy child stdout → pipe_stdout in chunks
246            let mut buf = [0u8; 8192];
247            loop {
248                match child_stdout.read(&mut buf).await {
249                    Ok(0) => break,
250                    Ok(n) => {
251                        if pipe_out.write_all(&buf[..n]).await.is_err() {
252                            break; // next stage dropped its reader (broken pipe)
253                        }
254                    }
255                    Err(_) => break,
256                }
257            }
258            let _ = pipe_out.shutdown().await;
259            drop(pipe_out);
260            let status = child.wait().await;
261            // Abort stdin copier if child exited (it may be blocked on pipe_in.read)
262            if let Some(task) = stdin_task { task.abort(); }
263            let stderr = stderr_task.await.unwrap_or_default();
264            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
265            // Output was streamed to pipe, so result.out is empty
266            Some(ExecResult::from_output(code, String::new(), stderr))
267        } else {
268            // No pipe_stdout — last stage or non-pipeline.
269            // Use spill-aware collection if output limits are configured.
270            let Some(child_stdout) = child.stdout.take() else {
271                return Some(ExecResult::failure(1, "internal: stdout not available"));
272            };
273            let Some(child_stderr) = child.stderr.take() else {
274                return Some(ExecResult::failure(1, "internal: stderr not available"));
275            };
276
277            // Always use spill_aware_collect — it handles both limited and
278            // unlimited modes, and correctly streams stderr to ctx.stderr.
279            // (wait_with_output would bypass stderr streaming.)
280            let (stdout, stderr) = crate::output_limit::spill_aware_collect(
281                child_stdout,
282                child_stderr,
283                ctx.stderr.clone(),
284                &ctx.output_limit,
285            ).await;
286
287            let status = child.wait().await;
288            if let Some(task) = stdin_task { task.abort(); }
289            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
290            Some(ExecResult::from_output(code, stdout, stderr))
291        }
292    }
293}
294
295#[async_trait]
296impl CommandDispatcher for BackendDispatcher {
297    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
298        // Handle built-in true/false
299        match cmd.name.as_str() {
300            "true" => return Ok(ExecResult::success("")),
301            "false" => return Ok(ExecResult::failure(1, "")),
302            _ => {}
303        }
304
305        // Build tool args with schema-aware parsing (sync — no command substitution)
306        let schema = self.tools.get(&cmd.name).map(|t| t.schema());
307        let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
308        let output_format = extract_output_format(&mut tool_args, schema.as_ref());
309
310        // Execute via backend
311        let backend = ctx.backend.clone();
312        let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
313            Ok(tool_result) => {
314                let mut exec = ExecResult::from_output(
315                    tool_result.code as i64,
316                    tool_result.stdout,
317                    tool_result.stderr,
318                );
319                exec.output = tool_result.output;
320                // Restore structured data from ToolResult (preserved through backend roundtrip)
321                if let Some(json_data) = tool_result.data {
322                    exec.data = Some(Value::Json(json_data));
323                }
324                exec
325            }
326            Err(BackendError::ToolNotFound(_)) => {
327                // Fall back to external command execution
328                match self.try_external(&cmd.name, &cmd.args, ctx).await {
329                    Some(result) => result,
330                    None => ExecResult::failure(127, format!("command not found: {}", cmd.name)),
331                }
332            }
333            Err(e) => ExecResult::failure(127, e.to_string()),
334        };
335
336        // Apply output format transform
337        let result = match output_format {
338            Some(format) => apply_output_format(result, format),
339            None => result,
340        };
341
342        Ok(result)
343    }
344}