kaish-kernel 0.7.0

Core kernel for kaish: lexer, parser, interpreter, and runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
//! Command dispatch — the single execution path for all commands.
//!
//! The `CommandDispatcher` trait defines how a single command is resolved and
//! executed. The Kernel implements this trait with the full dispatch chain:
//! user tools → builtins → .kai scripts → external commands → backend tools.
//!
//! `PipelineRunner` calls `dispatcher.dispatch()` for each command in a
//! pipeline, handling I/O routing (stdin piping, redirects) around each call.
//!
//! ```text
//! Stmt::Command ──┐
//!                  ├──▶ execute_pipeline() ──▶ PipelineRunner::run(dispatcher, commands, ctx)
//! Stmt::Pipeline ──┘                                  │
//!                                               for each command:
//!                                                 dispatcher.dispatch(cmd, ctx)
//!                                                     │
//!                                               ┌─────┼──────────────┐
//!                                               │     │              │
//!                                          user_tools builtins  .kai scripts
//!                                                                external cmds
//!                                                                backend tools
//! ```

use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;

use crate::ast::Command;
use crate::interpreter::ExecResult;
use crate::tools::ExecContext;

// The following imports are only used by the test-only `BackendDispatcher`.
#[cfg(test)]
use crate::ast::{Arg, Value};
#[cfg(all(test, feature = "native"))]
use crate::ast::Expr;
#[cfg(test)]
use crate::backend::BackendError;
#[cfg(test)]
use crate::interpreter::apply_output_format;
#[cfg(test)]
use crate::scheduler::build_tool_args;
#[cfg(test)]
use crate::tools::{extract_output_format, ToolRegistry};
#[cfg(all(test, feature = "native"))]
use crate::tools::resolve_in_path;

/// Where a command sits inside the pipeline that is running it.
///
/// External command execution consults this to decide stdio inheritance:
/// - `Only` or `Last` in interactive mode → inherit terminal
/// - `First` or `Middle` → always capture
///
/// The default is [`PipelinePosition::Only`], i.e. a command that is not
/// part of any pipe.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PipelinePosition {
    /// A lone command — there is no pipe on either side.
    #[default]
    Only,
    /// Head of a pipeline: its stdin does not come from a pipe.
    First,
    /// Interior stage: stdin is piped in and stdout is piped out.
    Middle,
    /// Tail of a pipeline: stdin is piped in, stdout is the final output.
    Last,
}

/// Trait for dispatching a single command through the full resolution chain.
///
/// Implementations handle argument parsing, tool lookup, and execution.
/// The pipeline runner handles I/O routing (stdin, redirects, piping).
///
/// Dispatchers are handed around as `Arc<dyn CommandDispatcher>` (see
/// [`Self::fork`] and [`Self::fork_attached`]), hence the `Send + Sync`
/// supertrait bound.
#[async_trait]
pub trait CommandDispatcher: Send + Sync {
    /// Dispatch a single command for execution.
    ///
    /// The `ctx` provides stdin (from pipe or redirect), scope, and backend.
    /// Implementations should handle schema-aware argument parsing and
    /// output format extraction internally.
    ///
    /// NOTE(review): the test-only implementation in this module reports
    /// command failures (including "command not found") through the returned
    /// `ExecResult` and never yields `Err`; confirm what other implementations
    /// reserve the `Err` variant for.
    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult>;

    /// Fork the dispatcher for concurrent execution (detached).
    ///
    /// Returns a subsidiary dispatcher with independent mutable state, safe
    /// to run concurrently with the parent and other forks without data
    /// races on shared scope/cwd/aliases. Used by background `&` jobs,
    /// where the fork must survive parent cancellation.
    ///
    /// For stateful dispatchers (e.g. Kernel) this snapshots per-session
    /// state into a fresh instance. Stateless dispatchers may clone.
    async fn fork(&self) -> Arc<dyn CommandDispatcher>;

    /// Fork the dispatcher for concurrent execution (attached to parent cancel).
    ///
    /// Like [`Self::fork`] but the fork's cancellation token is a *child* of
    /// the parent's. Cancelling the parent (timeout, Ctrl-C, embedder
    /// `Kernel::cancel`) cascades into the fork, which then kills its own
    /// external children via the usual SIGTERM/SIGKILL discipline.
    ///
    /// Used for foreground concurrency: scatter workers, concurrent pipeline
    /// stages, command substitution. Default implementation delegates to
    /// [`Self::fork`] for stateless dispatchers that don't track cancellation.
    async fn fork_attached(&self) -> Arc<dyn CommandDispatcher> {
        // Provided method: implementors that track cancellation override this;
        // everyone else gets plain `fork` behaviour.
        self.fork().await
    }
}

/// Minimal stateless dispatcher used by pipeline/runner unit tests.
///
/// Production code uses `Kernel` (via `Kernel::fork` for concurrent contexts).
/// This test-only dispatcher routes directly through `backend.call_tool()` so
/// the pipeline runner can be exercised without spinning up a full Kernel.
/// When the backend reports the tool as not found, it falls back to spawning
/// an external command (only when built with the `native` feature).
///
/// Limitations (intentional — these are test-only constraints):
/// - No user-defined tools
/// - No .kai script resolution
/// - No async argument evaluation (command substitution in args won't work)
#[cfg(test)]
pub(crate) struct BackendDispatcher {
    /// Tool registry; consulted for each command's schema so arguments can
    /// be parsed schema-aware before the backend call.
    tools: Arc<ToolRegistry>,
}

#[cfg(test)]
impl BackendDispatcher {
    /// Upper bound on how long we keep draining a child's stderr once the
    /// child itself has been waited on. A well-behaved child's stderr hits EOF
    /// immediately; the bound only matters when some other process still holds
    /// the write end of the pipe open.
    #[cfg(feature = "native")]
    const STDERR_DRAIN_GRACE: std::time::Duration = std::time::Duration::from_millis(250);

    /// Create a new backend dispatcher with the given tool registry.
    pub(crate) fn new(tools: Arc<ToolRegistry>) -> Self {
        Self { tools }
    }

    /// External-command fallback when the `native` feature is disabled.
    ///
    /// Without `native` there is no PATH lookup or process spawning, so this
    /// stub ignores its arguments and always returns `None`; the caller then
    /// reports the command as not found.
    #[cfg(not(feature = "native"))]
    async fn try_external(
        &self,
        _name: &str,
        _args: &[Arg],
        _ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        None
    }

    /// Render one argument expression as the string passed to the child.
    ///
    /// String, integer and float literals are stringified directly and
    /// variable references are resolved against `ctx.scope`. Any other
    /// expression kind, or a variable that does not resolve, yields `None`.
    #[cfg(feature = "native")]
    fn expr_to_argv(expr: &Expr, ctx: &ExecContext) -> Option<String> {
        match expr {
            Expr::Literal(Value::String(s)) => Some(s.clone()),
            Expr::Literal(Value::Int(i)) => Some(i.to_string()),
            Expr::Literal(Value::Float(f)) => Some(f.to_string()),
            Expr::VarRef(path) => ctx
                .scope
                .resolve_path(path)
                .map(|v| crate::interpreter::value_to_string(&v)),
            _ => None,
        }
    }

    /// Try to execute an external command (PATH lookup + process spawn).
    ///
    /// Used as fallback when no builtin/backend tool matches. stdout and
    /// stderr are always captured (never inherited from the terminal); stdin
    /// is fed from `ctx.pipe_stdin` or `ctx.stdin` when present, inherited for
    /// an interactive `First`/`Only` command, and null otherwise.
    ///
    /// # Returns
    /// - `None` when external commands are disabled, when the current
    ///   directory has no real filesystem path, or when `name` (without a
    ///   `/`) is not found on `PATH`.
    /// - `Some(failure 127)` when `name` contains a `/` but does not exist, or
    ///   when spawning the process fails.
    /// - `Some(result)` otherwise. If `ctx.pipe_stdout` is set the child's
    ///   stdout is streamed into it and the result's output is empty;
    ///   otherwise stdout/stderr are collected via `spill_aware_collect`.
    #[cfg(feature = "native")]
    async fn try_external(
        &self,
        name: &str,
        args: &[Arg],
        ctx: &mut ExecContext,
    ) -> Option<ExecResult> {
        if !ctx.allow_external_commands {
            return None;
        }

        // Get real working directory (needed for relative path resolution and child cwd).
        // If the CWD is virtual (no real path), skip external execution entirely.
        let real_cwd = ctx.backend.resolve_real_path(&ctx.cwd)?;

        // Resolve command: absolute/relative path or PATH lookup
        let executable = if name.contains('/') {
            // Resolve relative paths (./script, ../bin/tool) against the shell's cwd
            let resolved = if std::path::Path::new(name).is_absolute() {
                std::path::PathBuf::from(name)
            } else {
                real_cwd.join(name)
            };
            if resolved.exists() {
                resolved.to_string_lossy().into_owned()
            } else {
                return Some(ExecResult::failure(127, format!("{}: No such file or directory", name)));
            }
        } else {
            // Prefer the shell's own PATH variable; fall back to the process env.
            let path_var = ctx.scope.get("PATH")
                .map(crate::interpreter::value_to_string)
                .unwrap_or_else(|| std::env::var("PATH").unwrap_or_default());
            resolve_in_path(name, &path_var)?
        };

        // Build flat argv from args. Named arguments go through the same
        // expression rendering as positionals, so `key=42` and `key=$var`
        // keep their value instead of collapsing to `key=`.
        let argv: Vec<String> = {
            let shared_ctx: &ExecContext = &*ctx;
            args.iter()
                .filter_map(|arg| match arg {
                    Arg::Positional(expr) => Self::expr_to_argv(expr, shared_ctx),
                    Arg::ShortFlag(f) => Some(format!("-{f}")),
                    Arg::LongFlag(f) => Some(format!("--{f}")),
                    Arg::Named { key, value } => {
                        // Unrenderable values still produce `key=` (empty value).
                        let rendered = Self::expr_to_argv(value, shared_ctx).unwrap_or_default();
                        Some(format!("{key}={rendered}"))
                    }
                    Arg::DoubleDash => Some("--".to_string()),
                })
                .collect()
        };

        // Check for streaming pipes
        let has_pipe_stdin = ctx.pipe_stdin.is_some();
        // pipe_stdout checked later when deciding buffered vs streaming output
        let has_buffered_stdin = ctx.stdin.is_some();

        // Spawn process
        use tokio::process::Command;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        let mut cmd = Command::new(&executable);
        cmd.args(&argv);
        cmd.current_dir(&real_cwd);
        cmd.kill_on_drop(true);

        // Hermetic env: child sees only kaish's exported vars, not the kaish
        // process's OS env. Frontends that want OS-env passthrough (REPL, MCP)
        // populate it via KernelConfig::initial_vars at construction.
        cmd.env_clear();
        for (var_name, value) in ctx.scope.exported_vars() {
            cmd.env(var_name, crate::interpreter::value_to_string(&value));
        }

        // Stdin: pipe_stdin or buffered string or inherit (interactive) or null
        cmd.stdin(if has_pipe_stdin || has_buffered_stdin {
            std::process::Stdio::piped()
        } else if ctx.interactive && matches!(ctx.pipeline_position, PipelinePosition::First | PipelinePosition::Only) {
            std::process::Stdio::inherit()
        } else {
            std::process::Stdio::null()
        });
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());

        let mut child = match cmd.spawn() {
            Ok(c) => c,
            Err(e) => return Some(ExecResult::failure(127, format!("{}: {}", name, e))),
        };
        // Open a pidfd (Linux) for race-free direct-child kill via wait_or_kill.
        let kill_target = crate::pidfd::KillTarget::from_child(&child);

        // Feed stdin from a background task in both cases, so that we are
        // already reading the child's output while it consumes its input.
        // Writing a large buffered stdin inline would deadlock as soon as the
        // child blocks on a full stdout/stderr pipe that nobody is draining.
        let stdin_task: Option<tokio::task::JoinHandle<()>> = if let Some(mut pipe_in) = ctx.pipe_stdin.take() {
            // Stream stdin: copy pipe_stdin → child stdin in chunks (bounded memory)
            child.stdin.take().map(|mut child_stdin| {
                tokio::spawn(async move {
                    let mut buf = [0u8; 8192];
                    loop {
                        match pipe_in.read(&mut buf).await {
                            Ok(0) => break, // EOF
                            Ok(n) => {
                                if child_stdin.write_all(&buf[..n]).await.is_err() {
                                    break; // child closed stdin
                                }
                            }
                            Err(_) => break,
                        }
                    }
                    // Drop child_stdin signals EOF to child
                })
            })
        } else if let Some(data) = ctx.stdin.take() {
            // Buffered string stdin
            child.stdin.take().map(|mut child_stdin| {
                tokio::spawn(async move {
                    // Best effort: the child may exit or close stdin early.
                    let _ = child_stdin.write_all(data.as_bytes()).await;
                    // Drop child_stdin signals EOF
                })
            })
        } else {
            None
        };

        // Stream stdout: copy child stdout → pipe_stdout in chunks (bounded memory)
        if let Some(mut pipe_out) = ctx.pipe_stdout.take() {
            // Safety: stdout/stderr were set to piped() above, so take() always returns Some
            let Some(mut child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(mut child_stderr_reader) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };
            // Stream stderr to the kernel's stderr stream (if available) for
            // real-time delivery. Otherwise buffer it.
            let stderr_stream_handle = ctx.stderr.clone();
            let mut stderr_task = tokio::spawn(async move {
                let mut buf = Vec::new();
                let mut chunk = [0u8; 8192];
                loop {
                    match child_stderr_reader.read(&mut chunk).await {
                        Ok(0) => break,
                        Ok(n) => {
                            if let Some(ref stream) = stderr_stream_handle {
                                // Stream raw bytes — no decode here, lossy decode at drain site
                                stream.write(&chunk[..n]);
                            } else {
                                buf.extend_from_slice(&chunk[..n]);
                            }
                        }
                        Err(_) => break,
                    }
                }
                if stderr_stream_handle.is_some() {
                    // Already streamed — return empty
                    String::new()
                } else {
                    String::from_utf8_lossy(&buf).into_owned()
                }
            });

            // Copy child stdout → pipe_stdout in chunks
            let mut buf = [0u8; 8192];
            loop {
                match child_stdout.read(&mut buf).await {
                    Ok(0) => break,
                    Ok(n) => {
                        if pipe_out.write_all(&buf[..n]).await.is_err() {
                            break; // next stage dropped its reader (broken pipe)
                        }
                    }
                    Err(_) => break,
                }
            }
            // Close our read end before waiting. If we stopped copying because
            // the downstream stage went away, the child must see a closed pipe
            // on its next write; otherwise it blocks on a full pipe forever
            // and the wait below never returns.
            drop(child_stdout);
            let _ = pipe_out.shutdown().await;
            drop(pipe_out);
            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            ).await;
            // Child has exited (naturally or via kill); nothing left to feed it.
            if let Some(task) = stdin_task { task.abort(); }
            // Give the stderr reader a bounded window to reach EOF so captured
            // output is not discarded by racing the abort. Only abort if the
            // pipe is still being held open by something else.
            let stderr = match tokio::time::timeout(Self::STDERR_DRAIN_GRACE, &mut stderr_task).await {
                Ok(joined) => joined.unwrap_or_default(),
                Err(_) => {
                    stderr_task.abort();
                    String::new()
                }
            };
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            // Output was streamed to pipe, so result.out is empty
            Some(ExecResult::from_output(code, String::new(), stderr))
        } else {
            // No pipe_stdout — last stage or non-pipeline.
            // Use spill-aware collection if output limits are configured.
            let Some(child_stdout) = child.stdout.take() else {
                return Some(ExecResult::failure(1, "internal: stdout not available"));
            };
            let Some(child_stderr) = child.stderr.take() else {
                return Some(ExecResult::failure(1, "internal: stderr not available"));
            };

            // Always use spill_aware_collect — it handles both limited and
            // unlimited modes, and correctly streams stderr to ctx.stderr.
            // (wait_with_output would bypass stderr streaming.)
            let (stdout, stderr, did_spill) = crate::output_limit::spill_aware_collect(
                child_stdout,
                child_stderr,
                ctx.stderr.clone(),
                &ctx.output_limit,
            ).await;

            let cancel = ctx.cancel.clone();
            let status = crate::kernel::wait_or_kill(
                &mut child,
                kill_target.as_ref(),
                &cancel,
                std::time::Duration::from_secs(2),
            ).await;
            if let Some(task) = stdin_task { task.abort(); }
            let code = status.map(|s| s.code().unwrap_or(1) as i64).unwrap_or(1);
            let mut result = ExecResult::from_output(code, stdout, stderr);
            result.did_spill = did_spill;
            Some(result)
        }
    }
}

#[cfg(test)]
#[async_trait]
impl CommandDispatcher for BackendDispatcher {
    /// Run one command: `true`/`false` inline, everything else via the backend.
    ///
    /// Arguments are parsed against the tool's schema (when the registry knows
    /// the tool) and any output-format request is split off before the call.
    /// A backend "tool not found" falls back to `try_external`; if that also
    /// declines, the result is exit code 127 with a "command not found"
    /// message. Any other backend error becomes exit code 127 carrying the
    /// error text. The requested output format, if any, is applied last.
    async fn dispatch(&self, cmd: &Command, ctx: &mut ExecContext) -> Result<ExecResult> {
        // `true` and `false` never reach the backend.
        let name = cmd.name.as_str();
        if name == "true" {
            return Ok(ExecResult::success(""));
        }
        if name == "false" {
            return Ok(ExecResult::failure(1, ""));
        }

        // Schema-aware argument parsing (sync — no command substitution).
        let tool_schema = self.tools.get(&cmd.name).map(|tool| tool.schema());
        let mut parsed_args = build_tool_args(&cmd.args, ctx, tool_schema.as_ref());
        let requested_format = extract_output_format(&mut parsed_args, tool_schema.as_ref());

        // Hand the call to the backend and translate its outcome.
        let backend = ctx.backend.clone();
        let outcome = backend.call_tool(&cmd.name, parsed_args, ctx).await;
        let raw = match outcome {
            Err(BackendError::ToolNotFound(_)) => {
                // Unknown to the backend: try it as an external command.
                self.try_external(&cmd.name, &cmd.args, ctx)
                    .await
                    .unwrap_or_else(|| {
                        ExecResult::failure(127, format!("command not found: {}", cmd.name))
                    })
            }
            Err(other) => ExecResult::failure(127, other.to_string()),
            Ok(tool_result) => {
                let mut exec = ExecResult::from_output(
                    tool_result.code as i64,
                    tool_result.stdout,
                    tool_result.stderr,
                );
                exec.set_output(tool_result.output);
                exec.content_type = tool_result.content_type;
                exec.baggage = tool_result.baggage;
                // Structured data survives the backend roundtrip; carry it over
                // only when the tool actually produced some.
                if let Some(json) = tool_result.data {
                    exec.data = Some(Value::Json(json));
                }
                exec
            }
        };

        // Apply the output format transform, if one was requested.
        let formatted = if let Some(format) = requested_format {
            apply_output_format(raw, format)
        } else {
            raw
        };

        Ok(formatted)
    }

    /// BackendDispatcher holds no mutable state, so forking just shares the
    /// same tool registry behind a new handle.
    async fn fork(&self) -> Arc<dyn CommandDispatcher> {
        let shared_tools = Arc::clone(&self.tools);
        Arc::new(BackendDispatcher { tools: shared_tools })
    }
}