Skip to main content

kaish_kernel/
output_limit.rs

1//! Configurable output size limits for agent safety.
2//!
3//! When output exceeds the threshold, the full output is written to a
4//! spill file on the real filesystem and `ExecResult.out` is truncated
5//! with a pointer message. The agent can then selectively read the file.
6//!
7//! Per-mode defaults: MCP kernels get an 8KB limit, REPL/test kernels
8//! are unlimited. Runtime-switchable via the `kaish-output-limit` builtin.
9
10use std::path::PathBuf;
11
12use crate::interpreter::ExecResult;
13use crate::paths;
14
/// Byte limit that MCP kernels start with (8 KiB).
const DEFAULT_MCP_LIMIT: usize = 8 * 1024;

/// Number of bytes from the start of the output kept in a truncated preview.
const DEFAULT_HEAD_BYTES: usize = 1024;

/// Number of bytes from the end of the output kept in a truncated preview.
const DEFAULT_TAIL_BYTES: usize = 512;

/// Output size limit settings.
///
/// Threaded through `KernelConfig` → `ExecContext` → kernel pipeline execution.
/// Runtime-mutable via the `kaish-output-limit` builtin.
#[derive(Debug, Clone)]
pub struct OutputLimitConfig {
    /// Spill threshold in bytes; `None` means limiting is switched off.
    max_bytes: Option<usize>,
    /// Size of the leading preview kept after a spill.
    head_bytes: usize,
    /// Size of the trailing preview kept after a spill.
    tail_bytes: usize,
}

impl OutputLimitConfig {
    /// Configuration with limiting switched off (REPL/embedded/test default).
    ///
    /// The preview sizes still carry their defaults so that enabling a limit
    /// later via [`set_limit`](Self::set_limit) works without further setup.
    pub fn none() -> Self {
        Self {
            max_bytes: None,
            head_bytes: DEFAULT_HEAD_BYTES,
            tail_bytes: DEFAULT_TAIL_BYTES,
        }
    }

    /// The limit applied by the `on` subcommand and `set -o output-limit`.
    pub fn default_limit() -> usize {
        DEFAULT_MCP_LIMIT
    }

    /// MCP-safe configuration: 8KB limit, 1KB head, 512B tail.
    pub fn mcp() -> Self {
        Self {
            max_bytes: Some(DEFAULT_MCP_LIMIT),
            ..Self::none()
        }
    }

    /// `true` when a byte limit is currently set.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes.is_some()
    }

    /// The current byte limit, or `None` when limiting is off.
    pub fn max_bytes(&self) -> Option<usize> {
        self.max_bytes
    }

    /// How many leading bytes a truncated result keeps.
    pub fn head_bytes(&self) -> usize {
        self.head_bytes
    }

    /// How many trailing bytes a truncated result keeps.
    pub fn tail_bytes(&self) -> usize {
        self.tail_bytes
    }

    /// Replace the byte limit; passing `None` turns limiting off.
    pub fn set_limit(&mut self, max: Option<usize>) {
        self.max_bytes = max;
    }

    /// Replace the leading preview size.
    pub fn set_head_bytes(&mut self, bytes: usize) {
        self.head_bytes = bytes;
    }

    /// Replace the trailing preview size.
    pub fn set_tail_bytes(&mut self, bytes: usize) {
        self.tail_bytes = bytes;
    }
}
94
/// Outcome of a successful spill operation.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy
/// and compare spill outcomes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpillResult {
    /// Location of the spill file that holds the complete output.
    pub path: PathBuf,
    /// Size of the complete output in bytes, as reported by the writer.
    pub total_bytes: usize,
}
100
101/// Check if the result output exceeds the limit and spill to disk if so.
102///
103/// Mutates `result.out` in place: replaces with head+tail+pointer message.
104/// Returns `Some(SpillResult)` if a spill file was written, `None` otherwise.
105///
106/// If the filesystem write fails, the result is replaced with an error.
107/// Fail fast: truncating output silently could corrupt structured data
108/// that an agent acts on. An explicit error is safer.
109pub async fn spill_if_needed(
110    result: &mut ExecResult,
111    config: &OutputLimitConfig,
112) -> Option<SpillResult> {
113    let max = config.max_bytes?;
114
115    // If result.out is already populated (external commands), check it directly
116    if !result.out.is_empty() {
117        let total = result.out.len();
118        if total <= max {
119            return None;
120        }
121        return spill_string(result, config, max).await;
122    }
123
124    // If we have structured OutputData, estimate size before materializing
125    if let Some(ref output) = result.output {
126        let estimate = output.estimated_byte_size();
127        if estimate <= max {
128            // Small enough — materialize normally
129            result.out = output.to_canonical_string();
130            // Re-check actual size (estimate is a lower bound)
131            if result.out.len() <= max {
132                return None;
133            }
134            return spill_string(result, config, max).await;
135        }
136
137        // Large — stream directly to spill file, never holding full String
138        return spill_output_data(result, config, max).await;
139    }
140
141    None
142}
143
144/// Spill an already-materialized string in result.out.
145async fn spill_string(
146    result: &mut ExecResult,
147    config: &OutputLimitConfig,
148    max: usize,
149) -> Option<SpillResult> {
150    let total = result.out.len();
151    match write_spill_file(result.out.as_bytes()).await {
152        Ok((path, written)) => {
153            result.out = build_truncated_output(&result.out, config, &path, total);
154            result.did_spill = true;
155            Some(SpillResult {
156                path,
157                total_bytes: written,
158            })
159        }
160        Err(e) => {
161            tracing::error!("output spill failed: {}", e);
162            *result = ExecResult::failure(1, format!(
163                "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
164                max, total, e
165            ));
166            None
167        }
168    }
169}
170
171/// Stream OutputData directly to a spill file without materializing the full String.
172async fn spill_output_data(
173    result: &mut ExecResult,
174    config: &OutputLimitConfig,
175    max: usize,
176) -> Option<SpillResult> {
177    let output = result.output.as_ref()?;
178
179    let dir = paths::spill_dir();
180    if let Err(e) = tokio::fs::create_dir_all(&dir).await {
181        tracing::error!("output spill dir creation failed: {}", e);
182        *result = ExecResult::failure(1, format!(
183            "output exceeded {} byte limit and spill dir creation failed: {}", max, e
184        ));
185        return None;
186    }
187
188    let filename = generate_spill_filename();
189    let path = dir.join(&filename);
190
191    // Write OutputData directly to file via write_canonical
192    let total = match std::fs::File::create(&path) {
193        Ok(mut file) => {
194            match output.write_canonical(&mut file, None) {
195                Ok(n) => n,
196                Err(e) => {
197                    tracing::error!("output spill write failed: {}", e);
198                    *result = ExecResult::failure(1, format!(
199                        "output exceeded {} byte limit and spill to disk failed: {}", max, e
200                    ));
201                    return None;
202                }
203            }
204        }
205        Err(e) => {
206            tracing::error!("output spill file creation failed: {}", e);
207            *result = ExecResult::failure(1, format!(
208                "output exceeded {} byte limit and spill to disk failed: {}", max, e
209            ));
210            return None;
211        }
212    };
213
214    // Read head and tail from the spill file for the truncated preview
215    let head = read_head_from_file(&path, config.head_bytes).await.unwrap_or_default();
216    let tail = read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default();
217    let path_str = path.to_string_lossy();
218
219    result.out = format!(
220        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
221        head, tail, total, path_str
222    );
223    result.did_spill = true;
224
225    Some(SpillResult {
226        path,
227        total_bytes: total,
228    })
229}
230
231/// Collect stdout from a child process with spill-aware size limiting.
232///
233/// Two-phase approach:
234/// 1. Detection window (up to 1s): accumulate in memory
235/// 2. If still running after 1s and over limit: stream to spill file
236///
237/// Returns `(stdout_string, stderr_string, did_spill)`.
238pub async fn spill_aware_collect(
239    mut stdout: tokio::process::ChildStdout,
240    mut stderr_reader: tokio::process::ChildStderr,
241    stderr_stream: Option<crate::scheduler::StderrStream>,
242    config: &OutputLimitConfig,
243) -> (String, String, bool) {
244    let max = config.max_bytes.unwrap_or(usize::MAX);
245
246    // Spawn stderr collection
247    let stderr_task = tokio::spawn(async move {
248        collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
249    });
250
251    let (stdout_result, did_spill) = collect_stdout_with_spill(&mut stdout, max, config).await;
252
253    let stderr = stderr_task.await.unwrap_or_default();
254    (stdout_result, stderr, did_spill)
255}
256
257/// Collect stderr (same pattern as existing dispatch code).
258async fn collect_stderr(
259    reader: &mut tokio::process::ChildStderr,
260    stream: Option<&crate::scheduler::StderrStream>,
261) -> String {
262    use tokio::io::AsyncReadExt;
263
264    let mut buf = Vec::new();
265    let mut chunk = [0u8; 8192];
266    loop {
267        match reader.read(&mut chunk).await {
268            Ok(0) => break,
269            Ok(n) => {
270                if let Some(s) = stream {
271                    s.write(&chunk[..n]);
272                } else {
273                    buf.extend_from_slice(&chunk[..n]);
274                }
275            }
276            Err(_) => break,
277        }
278    }
279    if stream.is_some() {
280        String::new()
281    } else {
282        String::from_utf8_lossy(&buf).into_owned()
283    }
284}
285
286/// Collect stdout with two-phase spill detection.
287///
288/// Generic over `AsyncRead + Unpin` — works with `ChildStdout` in production
289/// and `DuplexStream` in tests.
290///
291/// Returns `(stdout_string, did_spill)`.
292async fn collect_stdout_with_spill<R: tokio::io::AsyncRead + Unpin>(
293    stdout: &mut R,
294    max_bytes: usize,
295    config: &OutputLimitConfig,
296) -> (String, bool) {
297    use tokio::io::AsyncReadExt;
298    use tokio::time::{sleep, Duration};
299
300    let mut buffer = Vec::new();
301    let mut chunk = [0u8; 8192];
302    let deadline = sleep(Duration::from_secs(1));
303    tokio::pin!(deadline);
304
305    // Phase 1: Detection window (up to 1s)
306    loop {
307        tokio::select! {
308            biased;
309            result = stdout.read(&mut chunk) => {
310                match result {
311                    Ok(0) => {
312                        // EOF — command finished within detection window.
313                        // Post-hoc spill check happens in the caller.
314                        return (String::from_utf8_lossy(&buffer).into_owned(), false);
315                    }
316                    Ok(n) => {
317                        buffer.extend_from_slice(&chunk[..n]);
318                        // Break early if already over limit — don't OOM during detection window
319                        if buffer.len() > max_bytes {
320                            break;
321                        }
322                    }
323                    Err(_) => {
324                        return (String::from_utf8_lossy(&buffer).into_owned(), false);
325                    }
326                }
327            }
328            () = &mut deadline => {
329                // 1s elapsed, command still running
330                break;
331            }
332        }
333    }
334
335    // Phase 2: Check if we should switch to spill mode
336    if buffer.len() > max_bytes {
337        // Already over limit — spill what we have and stream the rest to disk
338        match stream_to_spill(&buffer, stdout, config).await {
339            Ok(result) => return (result, true),
340            Err(e) => {
341                // Spill failed — return error. Don't continue accumulating.
342                // Dropping stdout closes the pipe, which sends SIGPIPE to the child.
343                tracing::error!("streaming spill failed: {}", e);
344                let size = buffer.len();
345                drop(buffer);
346                return (format!(
347                    "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
348                    max_bytes, size, e
349                ), false);
350            }
351        }
352    }
353
354    // Continue collecting (under limit so far)
355    // Check size after each chunk
356    loop {
357        match stdout.read(&mut chunk).await {
358            Ok(0) => break,
359            Ok(n) => {
360                buffer.extend_from_slice(&chunk[..n]);
361                // Check if we've exceeded limit mid-stream
362                if buffer.len() > max_bytes {
363                    match stream_to_spill(&buffer, stdout, config).await {
364                        Ok(result) => return (result, true),
365                        Err(e) => {
366                            tracing::error!("streaming spill failed: {}", e);
367                            let size = buffer.len();
368                            drop(buffer);
369                            return (format!(
370                                "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
371                                max_bytes, size, e
372                            ), false);
373                        }
374                    }
375                }
376            }
377            Err(_) => break,
378        }
379    }
380
381    (String::from_utf8_lossy(&buffer).into_owned(), false)
382}
383
384/// Write buffered data + remaining stdout to a spill file, return truncated result.
385///
386/// Generic over `AsyncRead + Unpin` for testability.
387async fn stream_to_spill<R: tokio::io::AsyncRead + Unpin>(
388    buffer: &[u8],
389    stdout: &mut R,
390    config: &OutputLimitConfig,
391) -> Result<String, std::io::Error> {
392    use tokio::io::AsyncReadExt;
393
394    let spill_dir = paths::spill_dir();
395    tokio::fs::create_dir_all(&spill_dir).await?;
396
397    let filename = generate_spill_filename();
398    let path = spill_dir.join(&filename);
399    let mut file = tokio::fs::File::create(&path).await?;
400
401    // Write buffered data
402    use tokio::io::AsyncWriteExt;
403    file.write_all(buffer).await?;
404    let mut total = buffer.len();
405
406    // Stream remaining chunks directly to file
407    let mut chunk = [0u8; 8192];
408    loop {
409        match stdout.read(&mut chunk).await {
410            Ok(0) => break,
411            Ok(n) => {
412                file.write_all(&chunk[..n]).await?;
413                total += n;
414            }
415            Err(_) => break,
416        }
417    }
418    file.flush().await?;
419
420    // Read head + tail for the truncated message
421    let full = String::from_utf8_lossy(buffer);
422    let head = truncate_to_char_boundary(&full, config.head_bytes);
423
424    // For tail, read from the spill file if buffer doesn't cover the end
425    let tail: String = if total <= buffer.len() {
426        let full_str = String::from_utf8_lossy(buffer);
427        tail_from_str(&full_str, config.tail_bytes).to_string()
428    } else {
429        read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
430    };
431
432    let path_str = path.to_string_lossy();
433    Ok(format!(
434        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
435        head, tail, total, path_str
436    ))
437}
438
439/// Write output bytes to a new spill file. Returns (path, bytes_written).
440async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
441    let dir = paths::spill_dir();
442    tokio::fs::create_dir_all(&dir).await?;
443
444    let filename = generate_spill_filename();
445    let path = dir.join(filename);
446    tokio::fs::write(&path, data).await?;
447    Ok((path, data.len()))
448}
449
450/// Build the truncated output string with head, tail, and pointer.
451fn build_truncated_output(
452    full: &str,
453    config: &OutputLimitConfig,
454    spill_path: &std::path::Path,
455    total_bytes: usize,
456) -> String {
457    let head = truncate_to_char_boundary(full, config.head_bytes);
458    let tail = tail_from_str(full, config.tail_bytes);
459    let path_str = spill_path.to_string_lossy();
460    format!(
461        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
462        head, tail, total_bytes, path_str
463    )
464}
465
/// Return the longest prefix of `s` that is at most `max_bytes` long and does
/// not split a UTF-8 character.
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Walk back from the requested cut to the nearest boundary. Offset 0 is
    // always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
478
/// Return the longest suffix of `s` that is at most `max_bytes` long and does
/// not split a UTF-8 character.
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    let len = s.len();
    if max_bytes >= len {
        return s;
    }
    // Move forward from the requested start to the nearest boundary. The end
    // of the string is always a boundary, so the search cannot come up empty.
    let begin = (len - max_bytes..=len)
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(len);
    &s[begin..]
}
491
492/// Read the first N bytes from a file for head preview.
493async fn read_head_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
494    use tokio::io::AsyncReadExt;
495
496    let mut file = tokio::fs::File::open(path).await?;
497    let mut buf = vec![0u8; max_bytes];
498    let n = file.read(&mut buf).await?;
499    buf.truncate(n);
500
501    let s = String::from_utf8_lossy(&buf);
502    // Truncate to char boundary
503    let result = truncate_to_char_boundary(&s, max_bytes);
504    Ok(result.to_string())
505}
506
507/// Read the last N bytes from a file for tail preview.
508async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
509    use tokio::io::{AsyncReadExt, AsyncSeekExt};
510
511    let mut file = tokio::fs::File::open(path).await?;
512    let metadata = file.metadata().await?;
513    let len = metadata.len() as usize;
514
515    if len <= max_bytes {
516        let mut buf = Vec::new();
517        file.read_to_end(&mut buf).await?;
518        return Ok(String::from_utf8_lossy(&buf).into_owned());
519    }
520
521    let offset = len - max_bytes;
522    file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
523    let mut buf = vec![0u8; max_bytes];
524    let n = file.read(&mut buf).await?;
525    buf.truncate(n);
526
527    // Adjust to char boundary
528    let s = String::from_utf8_lossy(&buf);
529    Ok(s.into_owned())
530}
531
/// Produce a spill file name of the form
/// `spill-<secs>.<nanos>-<pid>-<seq>.txt`.
///
/// The name combines the current wall-clock time, the process id and a
/// process-wide counter that advances on every call, so successive calls in
/// one process never return the same name. A clock set before the Unix epoch
/// contributes a zero timestamp.
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::SystemTime;

    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!(
        "spill-{}.{}-{}-{}.txt",
        since_epoch.as_secs(),
        since_epoch.subsec_nanos(),
        std::process::id(),
        seq
    )
}
545
/// Parse a size string with optional K/M suffix into bytes.
///
/// Accepts: "64K", "64k", "1M", "1m", "65536" (raw bytes). Surrounding
/// whitespace is ignored. `K` multiplies by 1024 and `M` by 1024 * 1024.
///
/// # Errors
///
/// Returns a message when the input is empty, when the numeric part is not a
/// valid unsigned integer, or when the resulting byte count does not fit in
/// `usize` (instead of overflowing).
pub fn parse_size(s: &str) -> Result<usize, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier): (&str, usize) =
        if let Some(n) = s.strip_suffix('K').or_else(|| s.strip_suffix('k')) {
            (n, 1024)
        } else if let Some(n) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
            (n, 1024 * 1024)
        } else {
            (s, 1)
        };

    let num: usize = num_str
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    // A plain `*` would panic in debug builds and silently wrap in release.
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
569
570#[cfg(test)]
571mod tests {
572    use super::*;
573
574    #[test]
575    fn test_none_is_disabled() {
576        let config = OutputLimitConfig::none();
577        assert!(!config.is_enabled());
578        assert_eq!(config.max_bytes(), None);
579    }
580
581    #[test]
582    fn test_mcp_is_enabled() {
583        let config = OutputLimitConfig::mcp();
584        assert!(config.is_enabled());
585        assert_eq!(config.max_bytes(), Some(8 * 1024));
586        assert_eq!(config.head_bytes(), 1024);
587        assert_eq!(config.tail_bytes(), 512);
588    }
589
590    #[test]
591    fn test_set_limit() {
592        let mut config = OutputLimitConfig::none();
593        assert!(!config.is_enabled());
594
595        config.set_limit(Some(1024));
596        assert!(config.is_enabled());
597        assert_eq!(config.max_bytes(), Some(1024));
598
599        config.set_limit(None);
600        assert!(!config.is_enabled());
601    }
602
603    #[test]
604    fn test_set_head_tail() {
605        let mut config = OutputLimitConfig::mcp();
606        config.set_head_bytes(2048);
607        config.set_tail_bytes(1024);
608        assert_eq!(config.head_bytes(), 2048);
609        assert_eq!(config.tail_bytes(), 1024);
610    }
611
612    #[test]
613    fn test_parse_size() {
614        assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
615        assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
616        assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
617        assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
618        assert_eq!(parse_size("65536").unwrap(), 65536);
619        assert!(parse_size("").is_err());
620        assert!(parse_size("abc").is_err());
621    }
622
623    #[test]
624    fn test_truncate_to_char_boundary() {
625        assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
626        assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
627        // Multi-byte: "日" is 3 bytes
628        assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
629        assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
630        assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
631    }
632
633    #[test]
634    fn test_tail_from_str() {
635        assert_eq!(tail_from_str("hello", 10), "hello");
636        assert_eq!(tail_from_str("hello", 3), "llo");
637        // Multi-byte
638        assert_eq!(tail_from_str("日本語", 3), "語");
639        assert_eq!(tail_from_str("日本語", 6), "本語");
640    }
641
642    #[test]
643    fn test_generate_spill_filename() {
644        let name = generate_spill_filename();
645        assert!(name.starts_with("spill-"));
646        assert!(name.ends_with(".txt"));
647    }
648
649    #[tokio::test]
650    async fn test_spill_if_needed_under_limit() {
651        let config = OutputLimitConfig::mcp();
652        let mut result = ExecResult::success("short output");
653        let spill = spill_if_needed(&mut result, &config).await;
654        assert!(spill.is_none());
655        assert_eq!(result.out, "short output");
656        assert!(!result.did_spill);
657    }
658
659    #[tokio::test]
660    async fn test_spill_if_needed_over_limit() {
661        let config = OutputLimitConfig {
662            max_bytes: Some(100),
663            head_bytes: 20,
664            tail_bytes: 10,
665        };
666        let big_output = "x".repeat(200);
667        let mut result = ExecResult::success(big_output);
668        let spill = spill_if_needed(&mut result, &config).await;
669        assert!(spill.is_some());
670        assert!(result.did_spill);
671
672        let spill = spill.unwrap();
673        assert_eq!(spill.total_bytes, 200);
674        assert!(spill.path.exists());
675
676        // Verify truncated output
677        assert!(result.out.contains("..."));
678        assert!(result.out.contains("[output truncated: 200 bytes total"));
679        assert!(result.out.contains(&spill.path.to_string_lossy().to_string()));
680
681        // Verify head (first 20 bytes)
682        assert!(result.out.starts_with(&"x".repeat(20)));
683
684        // Verify spill file has full content
685        let spill_content = tokio::fs::read_to_string(&spill.path).await.unwrap();
686        assert_eq!(spill_content.len(), 200);
687
688        // Clean up
689        let _ = tokio::fs::remove_file(&spill.path).await;
690    }
691
692    #[tokio::test]
693    async fn test_spill_if_needed_disabled() {
694        let config = OutputLimitConfig::none();
695        let big_output = "x".repeat(200);
696        let mut result = ExecResult::success(big_output.clone());
697        let spill = spill_if_needed(&mut result, &config).await;
698        assert!(spill.is_none());
699        assert_eq!(result.out, big_output);
700        assert!(!result.did_spill);
701    }
702
703    #[test]
704    fn test_build_truncated_output() {
705        let config = OutputLimitConfig {
706            max_bytes: Some(100),
707            head_bytes: 5,
708            tail_bytes: 3,
709        };
710        let full = "abcdefghijklmnop";
711        let path = PathBuf::from("/tmp/test-spill.txt");
712        let result = build_truncated_output(full, &config, &path, 16);
713        assert!(result.starts_with("abcde"));
714        assert!(result.contains("..."));
715        assert!(result.contains("nop"));
716        assert!(result.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
717    }
718
719    #[tokio::test]
720    async fn test_kernel_mcp_truncates_large_output() {
721        use crate::kernel::{Kernel, KernelConfig};
722
723        // MCP config has 8K limit by default — use a smaller limit for testing
724        let config = KernelConfig::mcp()
725            .with_output_limit(OutputLimitConfig {
726                max_bytes: Some(200),
727                head_bytes: 50,
728                tail_bytes: 30,
729            });
730        let kernel = Kernel::new(config).expect("kernel creation");
731
732        // seq 1 10000 produces lots of output
733        let result = kernel.execute("seq 1 10000").await.expect("execute");
734        assert!(result.out.contains("[output truncated:"));
735        assert!(result.out.contains("full output at"));
736        // Head should contain the first numbers
737        assert!(result.out.starts_with("1\n"));
738    }
739
740    #[tokio::test]
741    async fn test_spill_exits_3() {
742        use crate::kernel::{Kernel, KernelConfig};
743
744        let config = KernelConfig::mcp()
745            .with_output_limit(OutputLimitConfig {
746                max_bytes: Some(100),
747                head_bytes: 30,
748                tail_bytes: 20,
749            });
750        let kernel = Kernel::new(config).expect("kernel creation");
751
752        let big = "x".repeat(200);
753        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
754        assert_eq!(result.code, 3, "spill should always exit 3");
755        assert_eq!(result.original_code, Some(0), "original command exit code preserved");
756        assert!(result.out.contains("[output truncated:"));
757    }
758
759    #[tokio::test]
760    async fn test_kernel_repl_no_truncation() {
761        use crate::kernel::{Kernel, KernelConfig};
762
763        // REPL has no limit
764        let config = KernelConfig::repl();
765        let kernel = Kernel::new(config).expect("kernel creation");
766
767        let result = kernel.execute("seq 1 100").await.expect("execute");
768        assert!(!result.out.contains("[output truncated:"));
769        assert!(result.out.contains("100"));
770    }
771
772    #[tokio::test]
773    async fn test_kernel_builtin_truncation() {
774        use crate::kernel::{Kernel, KernelConfig};
775
776        // Builtins go through post-hoc spill check
777        let config = KernelConfig::mcp()
778            .with_output_limit(OutputLimitConfig {
779                max_bytes: Some(100),
780                head_bytes: 30,
781                tail_bytes: 20,
782            });
783        let kernel = Kernel::new(config).expect("kernel creation");
784
785        // echo with a large string
786        let big = "x".repeat(200);
787        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
788        assert!(result.out.contains("[output truncated:"));
789    }
790
791    // ── OutputData estimation and streaming tests ──
792
793    #[test]
794    fn test_estimated_byte_size_text() {
795        use crate::interpreter::OutputData;
796        let data = OutputData::text("hello world");
797        assert_eq!(data.estimated_byte_size(), 11);
798    }
799
800    #[test]
801    fn test_estimated_byte_size_table() {
802        use crate::interpreter::{OutputData, OutputNode};
803        let data = OutputData::table(
804            vec!["NAME".into(), "SIZE".into()],
805            vec![
806                OutputNode::new("foo").with_cells(vec!["123".into()]),
807                OutputNode::new("bar").with_cells(vec!["456".into()]),
808            ],
809        );
810        // "foo\t123\nbar\t456" = 3+1+3 + 1 + 3+1+3 = 15
811        assert_eq!(data.estimated_byte_size(), 15);
812    }
813
814    #[test]
815    fn test_estimated_byte_size_tree() {
816        use crate::interpreter::{OutputData, OutputNode};
817        let data = OutputData::nodes(vec![
818            OutputNode::new("src").with_children(vec![
819                OutputNode::new("main.rs"),
820                OutputNode::new("lib.rs"),
821            ]),
822        ]);
823        // "src/{main.rs,lib.rs}" = 3 + 2 + 7 + 1 + 6 + 1 = 20
824        assert_eq!(data.estimated_byte_size(), 20);
825    }
826
827    #[test]
828    fn test_write_canonical_matches_to_canonical_string() {
829        use crate::interpreter::{OutputData, OutputNode};
830
831        let cases: Vec<OutputData> = vec![
832            OutputData::text("hello world"),
833            OutputData::nodes(vec![
834                OutputNode::new("file1"),
835                OutputNode::new("file2"),
836            ]),
837            OutputData::table(
838                vec!["NAME".into(), "SIZE".into()],
839                vec![
840                    OutputNode::new("foo").with_cells(vec!["123".into()]),
841                    OutputNode::new("bar").with_cells(vec!["456".into()]),
842                ],
843            ),
844            OutputData::nodes(vec![
845                OutputNode::new("src").with_children(vec![
846                    OutputNode::new("main.rs"),
847                    OutputNode::new("lib.rs"),
848                ]),
849            ]),
850        ];
851
852        for data in cases {
853            let expected = data.to_canonical_string();
854            let mut buf = Vec::new();
855            let written = data.write_canonical(&mut buf, None).unwrap();
856            let got = String::from_utf8(buf).unwrap();
857            assert_eq!(got, expected, "write_canonical mismatch for {:?}", data);
858            assert_eq!(written, expected.len(), "byte count mismatch");
859        }
860    }
861
862    #[test]
863    fn test_write_canonical_budget_stops_early() {
864        use crate::interpreter::{OutputData, OutputNode};
865
866        let data = OutputData::nodes(
867            (0..1000).map(|i| OutputNode::new(format!("file_{:04}", i))).collect()
868        );
869        let mut buf = Vec::new();
870        let written = data.write_canonical(&mut buf, Some(100)).unwrap();
871        // Should have stopped shortly after 100 bytes
872        assert!(written > 100, "should exceed budget slightly");
873        assert!(written < 500, "should stop soon after budget: got {}", written);
874    }
875
876    #[tokio::test]
877    async fn test_spill_if_needed_large_output_data_no_oom() {
878        use crate::interpreter::{OutputData, OutputNode};
879
880        let config = OutputLimitConfig {
881            max_bytes: Some(1024),
882            head_bytes: 100,
883            tail_bytes: 50,
884        };
885
886        // 100K nodes — large enough to detect OOM if materialized carelessly,
887        // but small enough to not slow down the test
888        let nodes: Vec<OutputNode> = (0..100_000)
889            .map(|i| OutputNode::new(format!("node_{:06}", i)))
890            .collect();
891        let data = OutputData::nodes(nodes);
892        let mut result = ExecResult::with_output(data);
893
894        let spill = spill_if_needed(&mut result, &config).await;
895        assert!(spill.is_some(), "should have spilled");
896        assert!(result.did_spill);
897        assert!(result.out.contains("[output truncated:"));
898
899        // Clean up
900        if let Some(s) = spill {
901            let _ = tokio::fs::remove_file(&s.path).await;
902        }
903    }
904
905    // ── Streaming collector tests (using tokio::io::duplex) ──
906
907    #[tokio::test]
908    async fn test_collect_small_output_no_spill() {
909        let (mut writer, reader) = tokio::io::duplex(1024);
910        let config = OutputLimitConfig {
911            max_bytes: Some(1024),
912            head_bytes: 100,
913            tail_bytes: 50,
914        };
915
916        // Write small data and close
917        use tokio::io::AsyncWriteExt;
918        writer.write_all(b"hello world").await.unwrap();
919        drop(writer); // EOF
920
921        let mut reader = reader;
922        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
923        assert_eq!(result, "hello world");
924        assert!(!did_spill);
925    }
926
927    #[tokio::test]
928    async fn test_collect_large_output_spills() {
929        let (mut writer, reader) = tokio::io::duplex(64 * 1024);
930        let config = OutputLimitConfig {
931            max_bytes: Some(100),
932            head_bytes: 20,
933            tail_bytes: 10,
934        };
935
936        // Write data exceeding limit and close
937        use tokio::io::AsyncWriteExt;
938        let data = "x".repeat(500);
939        writer.write_all(data.as_bytes()).await.unwrap();
940        drop(writer); // EOF
941
942        let mut reader = reader;
943        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
944        assert!(did_spill, "should have spilled");
945        assert!(result.contains("[output truncated:"));
946        assert!(result.contains("full output at"));
947    }
948
949    #[tokio::test]
950    async fn test_collect_exact_boundary_no_spill() {
951        let (mut writer, reader) = tokio::io::duplex(1024);
952        let config = OutputLimitConfig {
953            max_bytes: Some(100),
954            head_bytes: 20,
955            tail_bytes: 10,
956        };
957
958        // Write exactly max_bytes
959        use tokio::io::AsyncWriteExt;
960        let data = "x".repeat(100);
961        writer.write_all(data.as_bytes()).await.unwrap();
962        drop(writer); // EOF
963
964        let mut reader = reader;
965        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
966        // Exactly at limit — should not spill (<=)
967        assert!(!did_spill, "exact boundary should not spill");
968        assert_eq!(result.len(), 100);
969    }
970
971    #[tokio::test]
972    async fn test_collect_broken_pipe() {
973        let (writer, reader) = tokio::io::duplex(1024);
974        let config = OutputLimitConfig {
975            max_bytes: Some(1024),
976            head_bytes: 100,
977            tail_bytes: 50,
978        };
979
980        // Write some data, then drop writer mid-stream
981        use tokio::io::AsyncWriteExt;
982        let mut writer = writer;
983        writer.write_all(b"partial data").await.unwrap();
984        drop(writer); // Simulate broken pipe
985
986        let mut reader = reader;
987        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
988        assert_eq!(result, "partial data");
989        assert!(!did_spill);
990    }
991}