Skip to main content

kaish_kernel/
dispatch.rs

1//! Command dispatch — the single execution path for all commands.
2//!
3//! The `CommandDispatcher` trait defines how a single command is resolved and
4//! executed. The Kernel implements this trait with the full dispatch chain:
5//! user tools → builtins → .kai scripts → external commands → backend tools.
6//!
7//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
8//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
9//!
10//! ```text
11//! Stmt::Command ──┐
12//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
13//! Stmt::Pipeline ──┘                                  │
14//!                                               for each command:
15//!                                                 dispatcher.dispatch(cmd, ctx)
16//!                                                     │
17//!                                               ┌─────┼──────────────┐
18//!                                               │     │              │
19//!                                          user_tools builtins  .kai scripts
20//!                                                                external cmds
21//!                                                                backend tools
22//! ```
23
24use std::sync::Arc;
25
26use anyhow::Result;
27use async_trait::async_trait;
28
29use crate::ast::{Arg, Command, Expr, Value};
30use crate::backend::BackendError;
31use crate::interpreter::{apply_output_format, ExecResult};
32use crate::scheduler::build_tool_args;
33use crate::tools::{extract_output_format, resolve_in_path, ExecContext, ToolRegistry};
34
/// Where a command sits inside a pipeline.
///
/// External command execution consults this to choose stdio inheritance:
/// - `Only` or `Last` in interactive mode → inherit terminal
/// - `First` or `Middle` → always capture
///
/// The default is [`PipelinePosition::Only`], i.e. a command that is not
/// part of any pipe.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PipelinePosition {
    /// A lone command; nothing is piped in or out.
    #[default]
    Only,
    /// Head of a pipeline: stdin does not come from a pipe.
    First,
    /// Interior stage: stdin and stdout are both pipes.
    Middle,
    /// Tail of a pipeline: stdin is a pipe, stdout is the final output.
    Last,
}
52
/// Trait for dispatching a single command through the full resolution chain.
///
/// Implementations handle argument parsing, tool lookup, and execution.
/// The pipeline runner handles I/O routing (stdin, redirects, piping).
///
/// The `Send + Sync` supertraits let a dispatcher be shared across tasks
/// (for example behind an `Arc`).
#[async_trait]
pub trait CommandDispatcher: Send + Sync {
    /// Dispatch a single command for execution.
    ///
    /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
    /// Implementations should handle schema-aware argument parsing and
    /// output format extraction internally.
    ///
    /// `ctx` is taken mutably: an implementation may consume per-command
    /// state from it (e.g. take the pending stdin) while running `cmd`.
    ///
    /// # Errors
    ///
    /// The `Err` variant is available for dispatcher-level failures.
    /// NOTE(review): the `BackendDispatcher` implementation in this file
    /// always returns `Ok` and reports command failures through the exit
    /// code of the `ExecResult`; confirm what other implementors do.
    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;
}
66
/// Fallback dispatcher that routes through `backend.call_tool()`.
///
/// This provides the same behavior as the old `PipelineRunner` — it dispatches
/// to builtins via the backend's tool registry. Used for background jobs and
/// scatter/gather workers until full `Arc<Kernel>` dispatch is wired up.
///
/// Limitations compared to the Kernel dispatcher:
/// - No user-defined tools
/// - No .kai script resolution
/// - External commands run only as a fallback, when the backend reports the
///   tool as not found, and always with stdout/stderr captured
/// - No async argument evaluation (command substitution in args won't work)
pub struct BackendDispatcher {
    /// Tool registry, consulted for each command's schema before dispatch.
    tools: Arc<ToolRegistry>,
}
81
82impl BackendDispatcher {
83    /// Create a new backend dispatcher with the given tool registry.
84    pub fn new(tools: Arc<ToolRegistry>) -> Self {
85        Self { tools }
86    }
87
88    /// Try to execute an external command (PATH lookup + process spawn).
89    ///
90    /// Used as fallback when no builtin/backend tool matches. Returns None if
91    /// the command is not found in PATH. Always captures stdout/stderr (never
92    /// inherits terminal — pipeline stages don't need interactive I/O).
93    async fn try_external(
94        &self,
95        name: &str,
96        args: &[Arg],
97        ctx: &mut ExecContext,
98    ) -> Option<ExecResult> {
99        // Get real working directory (needed for relative path resolution and child cwd)
100        let real_cwd = ctx.backend.resolve_real_path(&ctx.cwd)
101            .unwrap_or_else(|| std::path::PathBuf::from("/"));
102
103        // Resolve command: absolute/relative path or PATH lookup
104        let executable = if name.contains('/') {
105            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
106            let resolved = if std::path::Path::new(name).is_absolute() {
107                std::path::PathBuf::from(name)
108            } else {
109                real_cwd.join(name)
110            };
111            if resolved.exists() {
112                resolved.to_string_lossy().into_owned()
113            } else {
114                return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
115            }
116        } else {
117            let path_var = ctx.scope.get("PATH")
118                .map(crate::interpreter::value_to_string)
119                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
120            resolve_in_path(name, &path_var)?
121        };
122
123        // Build flat argv from args
124        let argv: Vec<String> = args.iter().filter_map(|arg| {
125            match arg {
126                Arg::Positional(expr) => match expr {
127                    Expr::Literal(Value::String(s)) => Some(s.clone()),
128                    Expr::Literal(Value::Int(i)) => Some(i.to_string()),
129                    Expr::Literal(Value::Float(f)) => Some(f.to_string()),
130                    Expr::VarRef(path) => ctx.scope.resolve_path(path).map(|v| crate::interpreter::value_to_string(&v)),
131                    _ => None,
132                },
133                Arg::ShortFlag(f) => Some(format!("-{f}")),
134                Arg::LongFlag(f) => Some(format!("--{f}")),
135                Arg::Named { key, value } => match value {
136                    Expr::Literal(Value::String(s)) => Some(format!("{key}={s}")),
137                    _ => Some(format!("{key}=")),
138                },
139                Arg::DoubleDash => Some("--".to_string()),
140            }
141        }).collect();
142
143        // Check for streaming pipes
144        let has_pipe_stdin = ctx.pipe_stdin.is_some();
145        // pipe_stdout checked later when deciding buffered vs streaming output
146        let has_buffered_stdin = ctx.stdin.is_some();
147
148        // Spawn process
149        use tokio::process::Command;
150        use tokio::io::{AsyncReadExt, AsyncWriteExt};
151
152        let mut cmd = Command::new(&executable);
153        cmd.args(&argv);
154        cmd.current_dir(&real_cwd);
155
156        // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
157        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
158            std::process::Stdio::piped()
159        } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
160            std::process::Stdio::inherit()
161        } else {
162            std::process::Stdio::null()
163        });
164        cmd.stdout(std::process::Stdio::piped());
165        cmd.stderr(std::process::Stdio::piped());
166
167        let mut child = match cmd.spawn() {
168            Ok(c) => c,
169            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
170        };
171
172        // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
173        let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
174            child.stdin.take().map(|mut child_stdin| {
175                tokio::spawn(async move {
176                    let mut buf = [0u8; 8192];
177                    loop {
178                        match pipe_in.read(&mut buf).await {
179                            Ok(0) => break, // EOF
180                            Ok(n) => {
181                                if child_stdin.write_all(&buf[..n]).await.is_err() {
182                                    break; // child closed stdin
183                                }
184                            }
185                            Err(_) => break,
186                        }
187                    }
188                    // Drop child_stdin signals EOF to child
189                })
190            })
191        } else if let Some(data) = ctx.stdin.take() {
192            // Buffered string stdin
193            if let Some(mut child_stdin) = child.stdin.take() {
194                let _ = child_stdin.write_all(data.as_bytes()).await;
195                // Drop child_stdin signals EOF
196            }
197            None
198        } else {
199            None
200        };
201
202        // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
203        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
204            // Safety: stdout/stderr were set to piped() above, so take() always returns Some
205            let Some(mut child_stdout) = child.stdout.take() else {
206                return Some(ExecResult::failure(1, "internal: stdout not available"));
207            };
208            let Some(mut child_stderr_reader) = child.stderr.take() else {
209                return Some(ExecResult::failure(1, "internal: stderr not available"));
210            };
211            // Stream stderr to the kernel's stderr stream (if available) for
212            // real-time delivery. Otherwise buffer with a cap.
213            let stderr_stream_handle = ctx.stderr.clone();
214            let stderr_task = tokio::spawn(async move {
215                let mut buf = Vec::new();
216                let mut chunk = [0u8; 8192];
217                loop {
218                    match child_stderr_reader.read(&mut chunk).await {
219                        Ok(0) => break,
220                        Ok(n) => {
221                            if let Some(ref stream) = stderr_stream_handle {
222                                // Stream raw bytes — no decode here, lossy decode at drain site
223                                stream.write(&chunk[..n]);
224                            } else {
225                                buf.extend_from_slice(&chunk[..n]);
226                            }
227                        }
228                        Err(_) => break,
229                    }
230                }
231                if stderr_stream_handle.is_some() {
232                    // Already streamed — return empty
233                    String::new()
234                } else {
235                    String::from_utf8_lossy(&buf).into_owned()
236                }
237            });
238
239            // Copy child stdout → pipe_stdout in chunks
240            let mut buf = [0u8; 8192];
241            loop {
242                match child_stdout.read(&mut buf).await {
243                    Ok(0) => break,
244                    Ok(n) => {
245                        if pipe_out.write_all(&buf[..n]).await.is_err() {
246                            break; // next stage dropped its reader (broken pipe)
247                        }
248                    }
249                    Err(_) => break,
250                }
251            }
252            let _ = pipe_out.shutdown().await;
253            drop(pipe_out);
254            let status = child.wait().await;
255            // Abort stdin copier if child exited (it may be blocked on pipe_in.read)
256            if let Some(task) = stdin_task { task.abort(); }
257            let stderr = stderr_task.await.unwrap_or_default();
258            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
259            // Output was streamed to pipe, so result.out is empty
260            Some(ExecResult::from_output(code, String::new(), stderr))
261        } else {
262            // No pipe_stdout — buffer output as before (last stage or non-pipeline)
263            let result = match child.wait_with_output().await {
264                Ok(output) => {
265                    let code = output.status.code().unwrap_or(1) as i64;
266                    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
267                    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
268                    Some(ExecResult::from_output(code, stdout, stderr))
269                }
270                Err(e) => Some(ExecResult::failure(1, format!("{}: {}", name, e))),
271            };
272            if let Some(task) = stdin_task { task.abort(); }
273            result
274        }
275    }
276}
277
278#[async_trait]
279impl CommandDispatcher for BackendDispatcher {
280    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
281        // Handle built-in true/false
282        match cmd.name.as_str() {
283            "true" => return Ok(ExecResult::success("")),
284            "false" => return Ok(ExecResult::failure(1, "")),
285            _ => {}
286        }
287
288        // Build tool args with schema-aware parsing (sync — no command substitution)
289        let schema = self.tools.get(&cmd.name).map(|t| t.schema());
290        let mut tool_args = build_tool_args(&cmd.args, ctx, schema.as_ref());
291        let output_format = extract_output_format(&mut tool_args, schema.as_ref());
292
293        // Execute via backend
294        let backend = ctx.backend.clone();
295        let result = match backend.call_tool(&cmd.name, tool_args, ctx).await {
296            Ok(tool_result) => {
297                let mut exec = ExecResult::from_output(
298                    tool_result.code as i64,
299                    tool_result.stdout,
300                    tool_result.stderr,
301                );
302                exec.output = tool_result.output;
303                // Restore structured data from ToolResult (preserved through backend roundtrip)
304                if let Some(json_data) = tool_result.data {
305                    exec.data = Some(Value::Json(json_data));
306                }
307                exec
308            }
309            Err(BackendError::ToolNotFound(_)) => {
310                // Fall back to external command execution
311                match self.try_external(&cmd.name, &cmd.args, ctx).await {
312                    Some(result) => result,
313                    None => ExecResult::failure(127, format!("command not found: {}", cmd.name)),
314                }
315            }
316            Err(e) => ExecResult::failure(127, e.to_string()),
317        };
318
319        // Apply output format transform
320        let result = match output_format {
321            Some(format) => apply_output_format(result, format),
322            None => result,
323        };
324
325        Ok(result)
326    }
327}