Skip to main content

kaish_kernel/
output_limit.rs

1//! Configurable output size limits for agent safety.
2//!
3//! When output exceeds the threshold, the full output is written to a
4//! spill file on the real filesystem and `ExecResult.out` is truncated
5//! with a pointer message. The agent can then selectively read the file.
6//!
7//! Per-mode defaults: MCP kernels get a 64KB limit, REPL/test kernels
8//! are unlimited. Runtime-switchable via the `kaish-output-limit` builtin.
9
10use std::path::PathBuf;
11
12use crate::interpreter::ExecResult;
13use crate::paths;
14
/// Output cap used by [`OutputLimitConfig::mcp`]: 64 KiB.
const DEFAULT_MCP_LIMIT: usize = 64 << 10;

/// Default number of leading output bytes kept when output is truncated (1 KiB).
const DEFAULT_HEAD_BYTES: usize = 1 << 10;

/// Default number of trailing output bytes kept when output is truncated.
const DEFAULT_TAIL_BYTES: usize = 512;

/// Output size limit together with the head/tail preview sizes used on truncation.
///
/// Passed from `KernelConfig` through `ExecContext` into kernel pipeline
/// execution, and adjustable at runtime through the `kaish-output-limit` builtin.
#[derive(Debug, Clone)]
pub struct OutputLimitConfig {
    /// Output larger than this many bytes is spilled; `None` disables limiting.
    max_bytes: Option<usize>,
    /// Bytes from the start of the output kept in the truncated preview.
    head_bytes: usize,
    /// Bytes from the end of the output kept in the truncated preview.
    tail_bytes: usize,
}

impl OutputLimitConfig {
    /// Build a config with the given limit and the default preview sizes.
    fn with_default_previews(max_bytes: Option<usize>) -> Self {
        Self {
            max_bytes,
            head_bytes: DEFAULT_HEAD_BYTES,
            tail_bytes: DEFAULT_TAIL_BYTES,
        }
    }

    /// Unlimited output — the default for REPL, embedded, and test kernels.
    pub fn none() -> Self {
        Self::with_default_previews(None)
    }

    /// Limits suited to MCP: 64 KiB cap, 1 KiB head, 512 B tail.
    pub fn mcp() -> Self {
        Self::with_default_previews(Some(DEFAULT_MCP_LIMIT))
    }

    /// `true` when a byte limit is configured.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes.is_some()
    }

    /// Configured byte limit, or `None` when output is unlimited.
    pub fn max_bytes(&self) -> Option<usize> {
        self.max_bytes
    }

    /// Number of leading bytes kept in a truncated preview.
    pub fn head_bytes(&self) -> usize {
        self.head_bytes
    }

    /// Number of trailing bytes kept in a truncated preview.
    pub fn tail_bytes(&self) -> usize {
        self.tail_bytes
    }

    /// Replace the byte limit; pass `None` to turn limiting off.
    pub fn set_limit(&mut self, max: Option<usize>) {
        self.max_bytes = max;
    }

    /// Change how many leading bytes a truncated preview keeps.
    pub fn set_head_bytes(&mut self, bytes: usize) {
        self.head_bytes = bytes;
    }

    /// Change how many trailing bytes a truncated preview keeps.
    pub fn set_tail_bytes(&mut self, bytes: usize) {
        self.tail_bytes = bytes;
    }
}
89
/// Outcome of a successful spill: where the full output went and how large it was.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpillResult {
    /// Location of the spill file holding the complete output.
    pub path: PathBuf,
    /// Number of bytes written to `path`.
    pub total_bytes: usize,
}
95
96/// Check if the result output exceeds the limit and spill to disk if so.
97///
98/// Mutates `result.out` in place: replaces with head+tail+pointer message.
99/// Returns `Some(SpillResult)` if a spill file was written, `None` otherwise.
100///
101/// If the filesystem write fails, the result is replaced with an error.
102/// Fail fast: truncating output silently could corrupt structured data
103/// that an agent acts on. An explicit error is safer.
104pub async fn spill_if_needed(
105    result: &mut ExecResult,
106    config: &OutputLimitConfig,
107) -> Option<SpillResult> {
108    let max = config.max_bytes?;
109    let total = result.out.len();
110    if total <= max {
111        return None;
112    }
113
114    match write_spill_file(result.out.as_bytes()).await {
115        Ok((path, written)) => {
116            result.out = build_truncated_output(&result.out, config, &path, total);
117            Some(SpillResult {
118                path,
119                total_bytes: written,
120            })
121        }
122        Err(e) => {
123            // Spill failed — replace with error rather than sending truncated/corrupt data
124            tracing::error!("output spill failed: {}", e);
125            *result = ExecResult::failure(1, format!(
126                "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
127                max, total, e
128            ));
129            None
130        }
131    }
132}
133
134/// Collect stdout from a child process with spill-aware size limiting.
135///
136/// Two-phase approach:
137/// 1. Detection window (up to 1s): accumulate in memory
138/// 2. If still running after 1s and over limit: stream to spill file
139///
140/// Returns `(stdout_string, stderr_string)`.
141pub async fn spill_aware_collect(
142    mut stdout: tokio::process::ChildStdout,
143    mut stderr_reader: tokio::process::ChildStderr,
144    stderr_stream: Option<crate::scheduler::StderrStream>,
145    config: &OutputLimitConfig,
146) -> (String, String) {
147    let max = config.max_bytes.unwrap_or(usize::MAX);
148
149    // Spawn stderr collection
150    let stderr_task = tokio::spawn(async move {
151        collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
152    });
153
154    let stdout_result = collect_stdout_with_spill(&mut stdout, max, config).await;
155
156    let stderr = stderr_task.await.unwrap_or_default();
157    (stdout_result, stderr)
158}
159
160/// Collect stderr (same pattern as existing dispatch code).
161async fn collect_stderr(
162    reader: &mut tokio::process::ChildStderr,
163    stream: Option<&crate::scheduler::StderrStream>,
164) -> String {
165    use tokio::io::AsyncReadExt;
166
167    let mut buf = Vec::new();
168    let mut chunk = [0u8; 8192];
169    loop {
170        match reader.read(&mut chunk).await {
171            Ok(0) => break,
172            Ok(n) => {
173                if let Some(s) = stream {
174                    s.write(&chunk[..n]);
175                } else {
176                    buf.extend_from_slice(&chunk[..n]);
177                }
178            }
179            Err(_) => break,
180        }
181    }
182    if stream.is_some() {
183        String::new()
184    } else {
185        String::from_utf8_lossy(&buf).into_owned()
186    }
187}
188
189/// Collect stdout with two-phase spill detection.
190async fn collect_stdout_with_spill(
191    stdout: &mut tokio::process::ChildStdout,
192    max_bytes: usize,
193    config: &OutputLimitConfig,
194) -> String {
195    use tokio::io::AsyncReadExt;
196    use tokio::time::{sleep, Duration};
197
198    let mut buffer = Vec::new();
199    let mut chunk = [0u8; 8192];
200    let deadline = sleep(Duration::from_secs(1));
201    tokio::pin!(deadline);
202
203    // Phase 1: Detection window (up to 1s)
204    loop {
205        tokio::select! {
206            biased;
207            result = stdout.read(&mut chunk) => {
208                match result {
209                    Ok(0) => {
210                        // EOF — command finished within detection window.
211                        // Post-hoc spill check happens in the caller.
212                        return String::from_utf8_lossy(&buffer).into_owned();
213                    }
214                    Ok(n) => {
215                        buffer.extend_from_slice(&chunk[..n]);
216                        // Break early if already over limit — don't OOM during detection window
217                        if buffer.len() > max_bytes {
218                            break;
219                        }
220                    }
221                    Err(_) => {
222                        return String::from_utf8_lossy(&buffer).into_owned();
223                    }
224                }
225            }
226            () = &mut deadline => {
227                // 1s elapsed, command still running
228                break;
229            }
230        }
231    }
232
233    // Phase 2: Check if we should switch to spill mode
234    if buffer.len() > max_bytes {
235        // Already over limit — spill what we have and stream the rest to disk
236        match stream_to_spill(&buffer, stdout, config).await {
237            Ok(result) => return result,
238            Err(e) => {
239                // Spill failed — return error. Don't continue accumulating.
240                // Dropping stdout closes the pipe, which sends SIGPIPE to the child.
241                tracing::error!("streaming spill failed: {}", e);
242                let size = buffer.len();
243                drop(buffer);
244                return format!(
245                    "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
246                    max_bytes, size, e
247                );
248            }
249        }
250    }
251
252    // Continue collecting (under limit so far)
253    // Check size after each chunk
254    loop {
255        match stdout.read(&mut chunk).await {
256            Ok(0) => break,
257            Ok(n) => {
258                buffer.extend_from_slice(&chunk[..n]);
259                // Check if we've exceeded limit mid-stream
260                if buffer.len() > max_bytes {
261                    match stream_to_spill(&buffer, stdout, config).await {
262                        Ok(result) => return result,
263                        Err(e) => {
264                            tracing::error!("streaming spill failed: {}", e);
265                            let size = buffer.len();
266                            drop(buffer);
267                            return format!(
268                                "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
269                                max_bytes, size, e
270                            );
271                        }
272                    }
273                }
274            }
275            Err(_) => break,
276        }
277    }
278
279    String::from_utf8_lossy(&buffer).into_owned()
280}
281
282/// Write buffered data + remaining stdout to a spill file, return truncated result.
283async fn stream_to_spill(
284    buffer: &[u8],
285    stdout: &mut tokio::process::ChildStdout,
286    config: &OutputLimitConfig,
287) -> Result<String, std::io::Error> {
288    use tokio::io::AsyncReadExt;
289
290    let spill_dir = paths::spill_dir();
291    tokio::fs::create_dir_all(&spill_dir).await?;
292
293    let filename = generate_spill_filename();
294    let path = spill_dir.join(&filename);
295    let mut file = tokio::fs::File::create(&path).await?;
296
297    // Write buffered data
298    use tokio::io::AsyncWriteExt;
299    file.write_all(buffer).await?;
300    let mut total = buffer.len();
301
302    // Stream remaining chunks directly to file
303    let mut chunk = [0u8; 8192];
304    loop {
305        match stdout.read(&mut chunk).await {
306            Ok(0) => break,
307            Ok(n) => {
308                file.write_all(&chunk[..n]).await?;
309                total += n;
310            }
311            Err(_) => break,
312        }
313    }
314    file.flush().await?;
315
316    // Read head + tail for the truncated message
317    let full = String::from_utf8_lossy(buffer);
318    let head = truncate_to_char_boundary(&full, config.head_bytes);
319
320    // For tail, read from the spill file if buffer doesn't cover the end
321    let tail: String = if total <= buffer.len() {
322        let full_str = String::from_utf8_lossy(buffer);
323        tail_from_str(&full_str, config.tail_bytes).to_string()
324    } else {
325        read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
326    };
327
328    let path_str = path.to_string_lossy();
329    Ok(format!(
330        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
331        head, tail, total, path_str
332    ))
333}
334
335/// Write output bytes to a new spill file. Returns (path, bytes_written).
336async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
337    let dir = paths::spill_dir();
338    tokio::fs::create_dir_all(&dir).await?;
339
340    let filename = generate_spill_filename();
341    let path = dir.join(filename);
342    tokio::fs::write(&path, data).await?;
343    Ok((path, data.len()))
344}
345
346/// Build the truncated output string with head, tail, and pointer.
347fn build_truncated_output(
348    full: &str,
349    config: &OutputLimitConfig,
350    spill_path: &std::path::Path,
351    total_bytes: usize,
352) -> String {
353    let head = truncate_to_char_boundary(full, config.head_bytes);
354    let tail = tail_from_str(full, config.tail_bytes);
355    let path_str = spill_path.to_string_lossy();
356    format!(
357        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
358        head, tail, total_bytes, path_str
359    )
360}
361
/// Return the longest prefix of `s` that is at most `max_bytes` long and ends
/// on a UTF-8 character boundary.
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(0);
    &s[..cut]
}
374
/// Return the longest suffix of `s` that is at most `max_bytes` long and starts
/// on a UTF-8 character boundary.
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    match s.len().checked_sub(max_bytes) {
        // Whole string already fits.
        None | Some(0) => s,
        Some(from) => {
            // Walk forward past any bytes in the middle of a multi-byte char.
            let begin = (from..s.len())
                .find(|&i| s.is_char_boundary(i))
                .unwrap_or(s.len());
            &s[begin..]
        }
    }
}
387
388/// Read the last N bytes from a file for tail preview.
389async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
390    use tokio::io::{AsyncReadExt, AsyncSeekExt};
391
392    let mut file = tokio::fs::File::open(path).await?;
393    let metadata = file.metadata().await?;
394    let len = metadata.len() as usize;
395
396    if len <= max_bytes {
397        let mut buf = Vec::new();
398        file.read_to_end(&mut buf).await?;
399        return Ok(String::from_utf8_lossy(&buf).into_owned());
400    }
401
402    let offset = len - max_bytes;
403    file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
404    let mut buf = vec![0u8; max_bytes];
405    let n = file.read(&mut buf).await?;
406    buf.truncate(n);
407
408    // Adjust to char boundary
409    let s = String::from_utf8_lossy(&buf);
410    Ok(s.into_owned())
411}
412
/// Build a spill file name of the form `spill-<secs>.<nanos>-<pid>-<seq>.txt`.
///
/// `<secs>.<nanos>` is the wall-clock time since the Unix epoch (zero if the
/// clock reads earlier than the epoch), `<pid>` is the current process id, and
/// `<seq>` is a per-process counter bumped on every call. Together they keep
/// names unique within this process and make cross-process clashes unlikely.
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicUsize = AtomicUsize::new(0);

    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    format!(
        "spill-{secs}.{nanos}-{pid}-{seq}.txt",
        secs = now.as_secs(),
        nanos = now.subsec_nanos(),
        pid = std::process::id(),
        seq = seq,
    )
}
426
/// Parse a size string with an optional binary `K`/`M` suffix into bytes.
///
/// Accepts `"64K"`, `"64k"`, `"1M"`, `"1m"`, or a plain byte count such as
/// `"65536"`. `K`/`k` multiply by 1024 and `M`/`m` by 1024 * 1024. Surrounding
/// whitespace is ignored.
///
/// # Errors
///
/// Returns a human-readable message when the input is empty, when the numeric
/// part is not a valid `usize`, or when the scaled value does not fit in `usize`.
pub fn parse_size(s: &str) -> Result<usize, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier): (&str, usize) =
        if let Some(n) = s.strip_suffix('K').or_else(|| s.strip_suffix('k')) {
            (n, 1024)
        } else if let Some(n) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
            (n, 1024 * 1024)
        } else {
            (s, 1)
        };

    let num: usize = num_str
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    // The size can come from agent/user input (the `kaish-output-limit` builtin).
    // A large count with a suffix must be rejected rather than overflow, which
    // would panic in debug builds and silently wrap to a bogus limit in release.
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
450
#[cfg(test)]
mod tests {
    use super::*;

    /// Best-effort removal of the spill file referenced by a truncation notice.
    ///
    /// Kernel-level tests only get `result.out` back, so the path is recovered
    /// from the trailing `full output at <path>]` pointer. Missing markers and
    /// filesystem errors are ignored.
    async fn remove_spill_file_named_in(out: &str) {
        const MARKER: &str = "full output at ";
        if let Some(pos) = out.rfind(MARKER) {
            let rest = &out[pos + MARKER.len()..];
            if let Some(end) = rest.rfind(']') {
                let _ = tokio::fs::remove_file(&rest[..end]).await;
            }
        }
    }

    #[test]
    fn test_none_is_disabled() {
        let config = OutputLimitConfig::none();
        assert!(!config.is_enabled());
        assert_eq!(config.max_bytes(), None);
    }

    #[test]
    fn test_mcp_is_enabled() {
        let config = OutputLimitConfig::mcp();
        assert!(config.is_enabled());
        assert_eq!(config.max_bytes(), Some(64 * 1024));
        assert_eq!(config.head_bytes(), 1024);
        assert_eq!(config.tail_bytes(), 512);
    }

    #[test]
    fn test_set_limit() {
        let mut config = OutputLimitConfig::none();
        assert!(!config.is_enabled());

        config.set_limit(Some(1024));
        assert!(config.is_enabled());
        assert_eq!(config.max_bytes(), Some(1024));

        config.set_limit(None);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_set_head_tail() {
        let mut config = OutputLimitConfig::mcp();
        config.set_head_bytes(2048);
        config.set_tail_bytes(1024);
        assert_eq!(config.head_bytes(), 2048);
        assert_eq!(config.tail_bytes(), 1024);
    }

    #[test]
    fn test_parse_size() {
        assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
        assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
        assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
        assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
        assert_eq!(parse_size("65536").unwrap(), 65536);
        assert!(parse_size("").is_err());
        assert!(parse_size("abc").is_err());
    }

    #[test]
    fn test_truncate_to_char_boundary() {
        assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
        assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
        // Multi-byte: "日" is 3 bytes
        assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
        assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
        assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
    }

    #[test]
    fn test_tail_from_str() {
        assert_eq!(tail_from_str("hello", 10), "hello");
        assert_eq!(tail_from_str("hello", 3), "llo");
        // Multi-byte
        assert_eq!(tail_from_str("日本語", 3), "語");
        assert_eq!(tail_from_str("日本語", 6), "本語");
    }

    #[test]
    fn test_generate_spill_filename() {
        let name = generate_spill_filename();
        assert!(name.starts_with("spill-"));
        assert!(name.ends_with(".txt"));
    }

    #[tokio::test]
    async fn test_spill_if_needed_under_limit() {
        let config = OutputLimitConfig::mcp();
        let mut result = ExecResult::success("short output");
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert_eq!(result.out, "short output");
    }

    #[tokio::test]
    async fn test_spill_if_needed_over_limit() {
        let config = OutputLimitConfig {
            max_bytes: Some(100),
            head_bytes: 20,
            tail_bytes: 10,
        };
        let big_output = "x".repeat(200);
        let mut result = ExecResult::success(big_output);
        let spill = spill_if_needed(&mut result, &config)
            .await
            .expect("output over the limit should spill");

        // Capture what we need from the spill file, then delete it before
        // asserting so a failing assertion does not leave the file behind.
        let existed = spill.path.exists();
        let spill_content = tokio::fs::read_to_string(&spill.path).await;
        let _ = tokio::fs::remove_file(&spill.path).await;

        assert!(existed);
        assert_eq!(spill.total_bytes, 200);

        // Verify truncated output
        assert!(result.out.contains("..."));
        assert!(result.out.contains("[output truncated: 200 bytes total"));
        assert!(result.out.contains(&spill.path.to_string_lossy().to_string()));

        // Verify head (first 20 bytes)
        assert!(result.out.starts_with(&"x".repeat(20)));

        // Verify spill file has full content
        assert_eq!(
            spill_content.expect("spill file should be readable").len(),
            200
        );
    }

    #[tokio::test]
    async fn test_spill_if_needed_disabled() {
        let config = OutputLimitConfig::none();
        let big_output = "x".repeat(200);
        let mut result = ExecResult::success(big_output.clone());
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert_eq!(result.out, big_output);
    }

    #[test]
    fn test_build_truncated_output() {
        let config = OutputLimitConfig {
            max_bytes: Some(100),
            head_bytes: 5,
            tail_bytes: 3,
        };
        let full = "abcdefghijklmnop";
        let path = PathBuf::from("/tmp/test-spill.txt");
        let result = build_truncated_output(full, &config, &path, 16);
        assert!(result.starts_with("abcde"));
        assert!(result.contains("..."));
        assert!(result.contains("nop"));
        assert!(result.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
    }

    #[tokio::test]
    async fn test_kernel_mcp_truncates_large_output() {
        use crate::kernel::{Kernel, KernelConfig};

        // MCP config has 64K limit by default — use a smaller limit for testing
        let config = KernelConfig::mcp()
            .with_output_limit(OutputLimitConfig {
                max_bytes: Some(200),
                head_bytes: 50,
                tail_bytes: 30,
            });
        let kernel = Kernel::new(config).expect("kernel creation");

        // seq 1 10000 produces lots of output
        let result = kernel.execute("seq 1 10000").await.expect("execute");
        // Remove the spill file first so a failing assertion does not leak it.
        remove_spill_file_named_in(&result.out).await;

        assert!(result.out.contains("[output truncated:"));
        assert!(result.out.contains("full output at"));
        // Head should contain the first numbers
        assert!(result.out.starts_with("1\n"));
    }

    #[tokio::test]
    async fn test_kernel_repl_no_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        // REPL has no limit
        let config = KernelConfig::repl();
        let kernel = Kernel::new(config).expect("kernel creation");

        let result = kernel.execute("seq 1 100").await.expect("execute");
        assert!(!result.out.contains("[output truncated:"));
        assert!(result.out.contains("100"));
    }

    #[tokio::test]
    async fn test_kernel_builtin_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        // Builtins go through post-hoc spill check
        let config = KernelConfig::mcp()
            .with_output_limit(OutputLimitConfig {
                max_bytes: Some(100),
                head_bytes: 30,
                tail_bytes: 20,
            });
        let kernel = Kernel::new(config).expect("kernel creation");

        // echo with a large string
        let big = "x".repeat(200);
        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
        // Remove the spill file first so a failing assertion does not leak it.
        remove_spill_file_named_in(&result.out).await;

        assert!(result.out.contains("[output truncated:"));
    }
}