Skip to main content

kaish_kernel/
output_limit.rs

1//! Configurable output size limits for agent safety.
2//!
3//! When output exceeds the threshold, the full output is written to a
4//! spill file on the real filesystem and `ExecResult.out` is truncated
5//! with a pointer message. The agent can then selectively read the file.
6//!
7//! Per-mode defaults: MCP kernels get an 8KB limit, REPL/test kernels
8//! are unlimited. Runtime-switchable via the `kaish-output-limit` builtin.
9
10use std::path::PathBuf;
11
12use crate::interpreter::ExecResult;
13#[cfg(feature = "native")]
14use crate::paths;
15
/// Output cap applied to MCP kernels by default: 8 KiB.
const DEFAULT_MCP_LIMIT: usize = 8192;

/// How many bytes from the start of oversized output are kept in the preview.
const DEFAULT_HEAD_BYTES: usize = 1024;

/// How many bytes from the end of oversized output are kept in the preview.
const DEFAULT_TAIL_BYTES: usize = 512;

/// Configurable output size limit.
///
/// Threaded through `KernelConfig` → `ExecContext` → kernel pipeline execution.
/// Runtime-mutable via the `kaish-output-limit` builtin.
#[derive(Debug, Clone)]
pub struct OutputLimitConfig {
    /// Byte ceiling for a command's output; `None` disables limiting.
    max_bytes: Option<usize>,
    /// Leading bytes kept in a truncated preview.
    head_bytes: usize,
    /// Trailing bytes kept in a truncated preview.
    tail_bytes: usize,
}

impl OutputLimitConfig {
    /// MCP-safe defaults: 8KB limit, 1KB head, 512B tail.
    pub fn mcp() -> Self {
        OutputLimitConfig {
            max_bytes: Some(DEFAULT_MCP_LIMIT),
            head_bytes: DEFAULT_HEAD_BYTES,
            tail_bytes: DEFAULT_TAIL_BYTES,
        }
    }

    /// No limiting — REPL/embedded/test default.
    ///
    /// Preview sizes still carry the default head/tail values.
    pub fn none() -> Self {
        OutputLimitConfig {
            max_bytes: None,
            ..Self::mcp()
        }
    }

    /// Default limit used by `on` subcommand and `set -o output-limit`.
    pub fn default_limit() -> usize {
        DEFAULT_MCP_LIMIT
    }

    /// True when a byte ceiling is configured.
    pub fn is_enabled(&self) -> bool {
        matches!(self.max_bytes, Some(_))
    }

    /// The configured byte ceiling, or `None` when limiting is off.
    pub fn max_bytes(&self) -> Option<usize> {
        self.max_bytes
    }

    /// Number of leading bytes kept in a truncated result.
    pub fn head_bytes(&self) -> usize {
        self.head_bytes
    }

    /// Number of trailing bytes kept in a truncated result.
    pub fn tail_bytes(&self) -> usize {
        self.tail_bytes
    }

    /// Replace the byte ceiling; passing `None` turns limiting off.
    pub fn set_limit(&mut self, max: Option<usize>) {
        self.max_bytes = max;
    }

    /// Replace the head preview size.
    pub fn set_head_bytes(&mut self, bytes: usize) {
        self.head_bytes = bytes;
    }

    /// Replace the tail preview size.
    pub fn set_tail_bytes(&mut self, bytes: usize) {
        self.tail_bytes = bytes;
    }
}
95
/// Result of a spill operation.
///
/// Derives `Debug` (and cheap value traits) so callers can log, clone and
/// compare spill outcomes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpillResult {
    /// Location of the spill file that holds the complete output.
    pub path: PathBuf,
    /// Number of bytes written to the spill file.
    pub total_bytes: usize,
}
101
102/// Check if the result output exceeds the limit and spill to disk if so.
103///
104/// Mutates `result.out` in place: replaces with head+tail+pointer message.
105/// Returns `Some(SpillResult)` if a spill file was written, `None` otherwise.
106///
107/// If the filesystem write fails, the result is replaced with an error.
108/// Fail fast: truncating output silently could corrupt structured data
109/// that an agent acts on. An explicit error is safer.
110///
111/// Without the `native` feature, performs in-memory head+tail truncation
112/// (no disk I/O).
113pub async fn spill_if_needed(
114    result: &mut ExecResult,
115    config: &OutputLimitConfig,
116) -> Option<SpillResult> {
117    let max = config.max_bytes?;
118
119    #[cfg(feature = "native")]
120    {
121        // If result.out is already populated (external commands), check it directly
122        if !result.text_out().is_empty() && !result.has_output() {
123            let total = result.text_out().len();
124            if total <= max {
125                return None;
126            }
127            return spill_string(result, config, max).await;
128        }
129
130        // If we have structured OutputData, estimate size before materializing
131        if let Some(output) = result.output() {
132            let estimate = output.estimated_byte_size();
133            if estimate <= max {
134                // Small enough — materialize normally
135                result.materialize();
136                // Re-check actual size (estimate is a lower bound)
137                if result.text_out().len() <= max {
138                    return None;
139                }
140                return spill_string(result, config, max).await;
141            }
142
143            // Large — stream directly to spill file, never holding full String
144            return spill_output_data(result, config, max).await;
145        }
146
147        None
148    }
149
150    // Without native: in-memory head+tail truncation (no disk I/O)
151    #[cfg(not(feature = "native"))]
152    {
153        // Materialize structured OutputData if present
154        if result.has_output() {
155            result.materialize();
156        }
157
158        let total = result.text_out().len();
159        if total <= max {
160            return None;
161        }
162
163        let text = result.text_out().into_owned();
164        let head = truncate_to_char_boundary(&text, config.head_bytes);
165        let tail = tail_from_str(&text, config.tail_bytes);
166        let truncated = format!(
167            "{}\n...\n{}\n[output truncated: {} bytes total — spill not available in sandbox mode]",
168            head, tail, total
169        );
170        result.set_out(truncated);
171        None
172    }
173}
174
/// Spill text that is already materialized in `result.out` to a new spill file.
///
/// On success the output is replaced by a head/tail preview that points at
/// the file, `did_spill` is set, and the path plus byte count are returned.
/// If writing fails, `result` is replaced wholesale by a failure (exit code 1)
/// naming the limit, the output size and the I/O error, and `None` is returned.
#[cfg(feature = "native")]
async fn spill_string(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
    max: usize,
) -> Option<SpillResult> {
    let total = result.text_out().len();

    let (path, written) = match write_spill_file(result.text_out().as_bytes()).await {
        Ok(spilled) => spilled,
        Err(e) => {
            tracing::error!("output spill failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
                max, total, e
            ));
            return None;
        }
    };

    let preview = build_truncated_output(&result.text_out(), config, &path, total);
    result.set_out(preview);
    result.did_spill = true;

    Some(SpillResult {
        path,
        total_bytes: written,
    })
}
203
/// Stream OutputData directly to a spill file without materializing the full String.
///
/// Used when the estimated size of the structured output already exceeds
/// `max`. The canonical rendering is written through a `BufWriter` (so
/// per-node writes are batched instead of hitting the file one by one), then
/// the head and tail previews are read back from the file.
///
/// On any I/O failure `result` is replaced by a failure (exit code 1), the
/// partially written spill file is removed on a best-effort basis, and
/// `None` is returned.
///
/// NOTE(review): the file write uses blocking `std::fs` I/O on the async
/// task because `output` is borrowed from `result` and cannot be moved into
/// `spawn_blocking`; consider restructuring if spills get very large.
#[cfg(feature = "native")]
async fn spill_output_data(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
    max: usize,
) -> Option<SpillResult> {
    use std::io::Write;

    let output = result.output()?;

    let dir = paths::spill_dir();
    if let Err(e) = tokio::fs::create_dir_all(&dir).await {
        tracing::error!("output spill dir creation failed: {}", e);
        *result = ExecResult::failure(1, format!(
            "output exceeded {} byte limit and spill dir creation failed: {}", max, e
        ));
        return None;
    }

    let path = dir.join(generate_spill_filename());

    let file = match std::fs::File::create(&path) {
        Ok(file) => file,
        Err(e) => {
            tracing::error!("output spill file creation failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit and spill to disk failed: {}", max, e
            ));
            return None;
        }
    };

    // Batch the many small writes `write_canonical` may issue.
    let mut writer = std::io::BufWriter::new(file);
    let total = match output.write_canonical(&mut writer, None) {
        Ok(n) => n,
        Err(e) => {
            tracing::error!("output spill write failed: {}", e);
            drop(writer);
            // Don't leave a truncated file that looks like the full output.
            let _ = std::fs::remove_file(&path);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit and spill to disk failed: {}", max, e
            ));
            return None;
        }
    };
    // BufWriter's Drop ignores errors from the final write; flush explicitly
    // so a short spill file is reported instead of silently accepted.
    if let Err(e) = writer.flush() {
        tracing::error!("output spill write failed: {}", e);
        drop(writer);
        let _ = std::fs::remove_file(&path);
        *result = ExecResult::failure(1, format!(
            "output exceeded {} byte limit and spill to disk failed: {}", max, e
        ));
        return None;
    }
    drop(writer);

    // Read head and tail from the spill file for the truncated preview.
    let head = read_head_from_file(&path, config.head_bytes).await.unwrap_or_default();
    // Cap the tail so it never re-shows bytes already in the head.
    let tail_budget = config.tail_bytes.min(total.saturating_sub(head.len()));
    let tail = read_tail_from_file(&path, tail_budget).await.unwrap_or_default();

    result.set_out(format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head,
        tail,
        total,
        path.to_string_lossy()
    ));
    result.did_spill = true;

    Some(SpillResult {
        path,
        total_bytes: total,
    })
}
264
/// Collect stdout from a child process with spill-aware size limiting.
///
/// Two-phase approach:
/// 1. Detection window (up to 1s): accumulate in memory
/// 2. If still running after 1s and over limit: stream to spill file
///
/// Stderr is drained concurrently on a spawned task (forwarded to
/// `stderr_stream` when given, otherwise buffered without a size limit).
///
/// Returns `(stdout_string, stderr_string, did_spill)`.
#[cfg(feature = "native")]
pub async fn spill_aware_collect(
    mut stdout: tokio::process::ChildStdout,
    mut stderr_reader: tokio::process::ChildStderr,
    stderr_stream: Option<crate::scheduler::StderrStream>,
    config: &OutputLimitConfig,
) -> (String, String, bool) {
    let max = config.max_bytes.unwrap_or(usize::MAX);

    // Spawn stderr collection
    let stderr_task = tokio::spawn(async move {
        collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
    });

    let (stdout_result, did_spill) = collect_stdout_with_spill(&mut stdout, max, config).await;

    // Close our end of stdout BEFORE waiting on stderr. Stdout collection can
    // stop early (spill failure, read error); a child still writing stdout
    // would then block on a full pipe and never close stderr, hanging the
    // await below. Dropping the reader makes further child writes fail
    // (EPIPE/SIGPIPE) so the child can exit.
    drop(stdout);

    let stderr = stderr_task.await.unwrap_or_default();
    (stdout_result, stderr, did_spill)
}
291
/// Drain a child's stderr until EOF or the first read error.
///
/// When `stream` is provided each chunk is forwarded to it as it arrives and
/// an empty string is returned; otherwise the bytes are accumulated and
/// returned as lossily-decoded UTF-8 (same pattern as existing dispatch code).
#[cfg(feature = "native")]
async fn collect_stderr(
    reader: &mut tokio::process::ChildStderr,
    stream: Option<&crate::scheduler::StderrStream>,
) -> String {
    use tokio::io::AsyncReadExt;

    let mut collected: Vec<u8> = Vec::new();
    let mut scratch = [0u8; 8192];

    // A read error ends collection just like EOF does.
    while let Ok(count) = reader.read(&mut scratch).await {
        if count == 0 {
            break;
        }
        let piece = &scratch[..count];
        match stream {
            Some(sink) => {
                sink.write(piece);
            }
            None => collected.extend_from_slice(piece),
        }
    }

    match stream {
        Some(_) => String::new(),
        None => String::from_utf8_lossy(&collected).into_owned(),
    }
}
321
/// Collect stdout with two-phase spill detection.
///
/// Generic over `AsyncRead + Unpin` — works with `ChildStdout` in production
/// and `DuplexStream` in tests.
///
/// Phase 1 reads inside a 1s `select!` window; phase 2 keeps reading without
/// a timer. In both phases, as soon as the buffer exceeds `max_bytes`, the
/// buffer and the remainder of `stdout` are handed to [`spill_or_report`].
/// The buffer therefore never grows past `max_bytes` plus one 8 KiB chunk.
///
/// Returns `(stdout_string, did_spill)`. A read error ends collection and
/// returns what was buffered so far. If spilling fails, the string is an
/// `ERROR: ...` message and `did_spill` is `false`.
#[cfg(feature = "native")]
async fn collect_stdout_with_spill<R: tokio::io::AsyncRead + Unpin>(
    stdout: &mut R,
    max_bytes: usize,
    config: &OutputLimitConfig,
) -> (String, bool) {
    use tokio::io::AsyncReadExt;
    use tokio::time::{sleep, Duration};

    let mut buffer = Vec::new();
    let mut chunk = [0u8; 8192];
    let deadline = sleep(Duration::from_secs(1));
    tokio::pin!(deadline);

    // Phase 1: Detection window (up to 1s)
    loop {
        tokio::select! {
            biased;
            result = stdout.read(&mut chunk) => {
                match result {
                    Ok(0) => {
                        // EOF — command finished within detection window.
                        // Post-hoc spill check happens in the caller.
                        return (String::from_utf8_lossy(&buffer).into_owned(), false);
                    }
                    Ok(n) => {
                        buffer.extend_from_slice(&chunk[..n]);
                        // Break early if already over limit — don't OOM during detection window
                        if buffer.len() > max_bytes {
                            break;
                        }
                    }
                    Err(_) => {
                        return (String::from_utf8_lossy(&buffer).into_owned(), false);
                    }
                }
            }
            () = &mut deadline => {
                // 1s elapsed, command still running
                break;
            }
        }
    }

    // Phase 1 may have stopped because the buffer is already over the limit.
    if buffer.len() > max_bytes {
        return spill_or_report(&buffer, stdout, max_bytes, config).await;
    }

    // Phase 2: keep collecting, spilling as soon as the limit is crossed.
    loop {
        match stdout.read(&mut chunk).await {
            Ok(0) => break,
            Ok(n) => {
                buffer.extend_from_slice(&chunk[..n]);
                if buffer.len() > max_bytes {
                    return spill_or_report(&buffer, stdout, max_bytes, config).await;
                }
            }
            Err(_) => break,
        }
    }

    (String::from_utf8_lossy(&buffer).into_owned(), false)
}

/// Hand `buffer` plus the rest of `stdout` to [`stream_to_spill`].
///
/// Returns `(preview, true)` on success. On failure the error is logged and
/// `(message, false)` is returned, where the message starts with `ERROR:` and
/// names the limit, the bytes buffered so far and the I/O error. In that case
/// `stdout` may not have been read to EOF; the owner must close it (e.g. by
/// dropping it) so a still-running child does not block on a full pipe.
#[cfg(feature = "native")]
async fn spill_or_report<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    max_bytes: usize,
    config: &OutputLimitConfig,
) -> (String, bool) {
    match stream_to_spill(buffer, stdout, config).await {
        Ok(preview) => (preview, true),
        Err(e) => {
            tracing::error!("streaming spill failed: {}", e);
            (
                format!(
                    "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
                    max_bytes,
                    buffer.len(),
                    e
                ),
                false,
            )
        }
    }
}
420
/// Write buffered data + remaining stdout to a spill file, return truncated result.
///
/// Generic over `AsyncRead + Unpin` for testability.
///
/// The head preview comes from `buffer`. The tail comes from the bytes after
/// the head when nothing followed the buffer; otherwise it is read back from
/// the spill file with a budget capped so it never overlaps the head. A read
/// error on `stdout` stops streaming (logged as a warning) and keeps what was
/// captured. Write/create errors are returned to the caller.
#[cfg(feature = "native")]
async fn stream_to_spill<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
) -> Result<String, std::io::Error> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let spill_dir = paths::spill_dir();
    tokio::fs::create_dir_all(&spill_dir).await?;

    let path = spill_dir.join(generate_spill_filename());
    let mut file = tokio::fs::File::create(&path).await?;

    // Write buffered data first, then copy the rest of the stream
    // chunk-by-chunk so it is never held in memory.
    file.write_all(buffer).await?;
    let mut total = buffer.len();

    let mut chunk = [0u8; 8192];
    loop {
        match stdout.read(&mut chunk).await {
            Ok(0) => break,
            Ok(n) => {
                file.write_all(&chunk[..n]).await?;
                total += n;
            }
            Err(e) => {
                // Best effort: keep what was captured, but record that the
                // spill file may not hold the complete output.
                tracing::warn!("stdout read failed while spilling; spill file may be incomplete: {}", e);
                break;
            }
        }
    }
    file.flush().await?;

    // Decode the buffer once and reuse it for both previews.
    let decoded = String::from_utf8_lossy(buffer);
    let head = truncate_to_char_boundary(&decoded, config.head_bytes);

    let tail: String = if total <= buffer.len() {
        // Nothing arrived after the buffer: take the tail from what follows
        // the head so the two previews never overlap.
        tail_from_str(&decoded[head.len()..], config.tail_bytes).to_string()
    } else {
        let tail_budget = config.tail_bytes.min(total.saturating_sub(head.len()));
        read_tail_from_file(&path, tail_budget).await.unwrap_or_default()
    };

    Ok(format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head,
        tail,
        total,
        path.to_string_lossy()
    ))
}
476
/// Create the spill directory if needed and write `data` to a freshly named
/// file inside it.
///
/// Returns the file's path together with the number of bytes written
/// (always `data.len()` on success).
#[cfg(feature = "native")]
async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
    let spill_dir = paths::spill_dir();
    tokio::fs::create_dir_all(&spill_dir).await?;

    let target = spill_dir.join(generate_spill_filename());
    tokio::fs::write(&target, data).await?;

    let written = data.len();
    Ok((target, written))
}
488
/// Build the truncated output string with head, tail, and pointer.
///
/// `head` is the first `config.head_bytes` bytes of `full`. `tail` is the last
/// `config.tail_bytes` bytes of whatever follows the head. Both are snapped
/// to UTF-8 character boundaries. Taking the tail from the remainder means
/// the preview never repeats bytes when `head_bytes + tail_bytes` exceeds
/// `full.len()` (possible once limits are changed at runtime); in that case
/// the tail is shorter or empty.
#[cfg(feature = "native")]
fn build_truncated_output(
    full: &str,
    config: &OutputLimitConfig,
    spill_path: &std::path::Path,
    total_bytes: usize,
) -> String {
    let head = truncate_to_char_boundary(full, config.head_bytes);
    // `head` ends on a char boundary, so this slice is always valid.
    let rest = &full[head.len()..];
    let tail = tail_from_str(rest, config.tail_bytes);
    format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head,
        tail,
        total_bytes,
        spill_path.to_string_lossy()
    )
}
505
/// Return the longest prefix of `s` that is at most `max_bytes` long and ends
/// on a UTF-8 character boundary.
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Index 0 is always a char boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
518
/// Return the longest suffix of `s` that is at most `max_bytes` long and
/// starts on a UTF-8 character boundary.
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    let len = s.len();
    if max_bytes >= len {
        return s;
    }
    // `len` itself is always a char boundary, so the search always succeeds.
    let start = (len - max_bytes..=len)
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(len);
    &s[start..]
}
531
/// Read the first N bytes from a file for head preview.
///
/// Reads until `max_bytes` bytes or EOF, so a short `read` cannot shrink the
/// preview. If the byte limit cuts a multi-byte UTF-8 character in half, the
/// partial character is dropped instead of being rendered as U+FFFD. Other
/// invalid UTF-8 is decoded lossily, and the result is capped at `max_bytes`
/// on a char boundary.
#[cfg(feature = "native")]
async fn read_head_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
    use tokio::io::AsyncReadExt;

    let file = tokio::fs::File::open(path).await?;
    let limit = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let mut buf = Vec::new();
    let mut limited = file.take(limit);
    limited.read_to_end(&mut buf).await?;

    // `error_len() == None` means the (first) invalid sequence is an
    // incomplete character at the very end of the buffer: trim it.
    if let Err(err) = std::str::from_utf8(&buf) {
        if err.error_len().is_none() {
            buf.truncate(err.valid_up_to());
        }
    }

    let decoded = String::from_utf8_lossy(&buf);
    // Lossy replacement can lengthen invalid input; keep the byte cap.
    Ok(truncate_to_char_boundary(&decoded, max_bytes).to_string())
}
547
/// Read the last N bytes from a file for tail preview.
///
/// When the file is longer than `max_bytes`, reading starts at
/// `len - max_bytes`. That offset can land inside a multi-byte UTF-8
/// character, so up to three leading continuation bytes are skipped instead
/// of being rendered as U+FFFD. Reads run to EOF (bounded by `max_bytes`), so
/// a short `read` cannot shrink the preview. Remaining invalid UTF-8 is
/// decoded lossily.
#[cfg(feature = "native")]
async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let mut file = tokio::fs::File::open(path).await?;
    let len = file.metadata().await?.len();
    let limit = u64::try_from(max_bytes).unwrap_or(u64::MAX);

    let seeked = len > limit;
    if seeked {
        file.seek(std::io::SeekFrom::Start(len - limit)).await?;
    }

    let mut buf = Vec::new();
    let mut limited = file.take(limit);
    limited.read_to_end(&mut buf).await?;

    // Adjust to char boundary: a UTF-8 character has at most three
    // continuation bytes (0b10xx_xxxx). Only needed when we seeked into the
    // middle of the file.
    let skip = if seeked {
        buf.iter().take(3).take_while(|&&b| b & 0xC0 == 0x80).count()
    } else {
        0
    };

    Ok(String::from_utf8_lossy(&buf[skip..]).into_owned())
}
573
/// Build a spill filename of the form `spill-<secs>.<nanos>-<pid>-<seq>.txt`.
///
/// The wall-clock timestamp, process id and a process-wide counter together
/// keep names distinct across processes and across calls within a process.
#[cfg(feature = "native")]
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicUsize = AtomicUsize::new(0);

    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    // A clock before the epoch collapses to 0 rather than failing.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let (secs, nanos) = (since_epoch.as_secs(), since_epoch.subsec_nanos());
    let pid = std::process::id();

    format!("spill-{}.{}-{}-{}.txt", secs, nanos, pid, seq)
}
588
/// Parse a size string with optional K/M suffix into bytes.
///
/// Accepts: "64K", "64k", "1M", "1m", "65536" (raw bytes). Surrounding
/// whitespace is ignored. Suffixes are binary (K = 1024, M = 1024²).
///
/// # Errors
///
/// Returns a message for an empty string, a non-numeric value, or a size
/// that does not fit in `usize` after applying the suffix (previously this
/// overflowed: a panic in debug builds, a silent wrap in release builds).
pub fn parse_size(s: &str) -> Result<usize, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier) = if let Some(n) = s.strip_suffix('K').or_else(|| s.strip_suffix('k')) {
        (n, 1024)
    } else if let Some(n) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
        (n, 1024 * 1024)
    } else {
        (s, 1)
    };

    let num: usize = num_str
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
612
613#[cfg(all(test, feature = "native"))]
614mod tests {
615    use super::*;
616
617    #[test]
618    fn test_none_is_disabled() {
619        let config = OutputLimitConfig::none();
620        assert!(!config.is_enabled());
621        assert_eq!(config.max_bytes(), None);
622    }
623
624    #[test]
625    fn test_mcp_is_enabled() {
626        let config = OutputLimitConfig::mcp();
627        assert!(config.is_enabled());
628        assert_eq!(config.max_bytes(), Some(8 * 1024));
629        assert_eq!(config.head_bytes(), 1024);
630        assert_eq!(config.tail_bytes(), 512);
631    }
632
633    #[test]
634    fn test_set_limit() {
635        let mut config = OutputLimitConfig::none();
636        assert!(!config.is_enabled());
637
638        config.set_limit(Some(1024));
639        assert!(config.is_enabled());
640        assert_eq!(config.max_bytes(), Some(1024));
641
642        config.set_limit(None);
643        assert!(!config.is_enabled());
644    }
645
646    #[test]
647    fn test_set_head_tail() {
648        let mut config = OutputLimitConfig::mcp();
649        config.set_head_bytes(2048);
650        config.set_tail_bytes(1024);
651        assert_eq!(config.head_bytes(), 2048);
652        assert_eq!(config.tail_bytes(), 1024);
653    }
654
655    #[test]
656    fn test_parse_size() {
657        assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
658        assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
659        assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
660        assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
661        assert_eq!(parse_size("65536").unwrap(), 65536);
662        assert!(parse_size("").is_err());
663        assert!(parse_size("abc").is_err());
664    }
665
666    #[test]
667    fn test_truncate_to_char_boundary() {
668        assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
669        assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
670        // Multi-byte: "日" is 3 bytes
671        assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
672        assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
673        assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
674    }
675
676    #[test]
677    fn test_tail_from_str() {
678        assert_eq!(tail_from_str("hello", 10), "hello");
679        assert_eq!(tail_from_str("hello", 3), "llo");
680        // Multi-byte
681        assert_eq!(tail_from_str("日本語", 3), "語");
682        assert_eq!(tail_from_str("日本語", 6), "本語");
683    }
684
685    #[test]
686    fn test_generate_spill_filename() {
687        let name = generate_spill_filename();
688        assert!(name.starts_with("spill-"));
689        assert!(name.ends_with(".txt"));
690    }
691
692    #[tokio::test]
693    async fn test_spill_if_needed_under_limit() {
694        let config = OutputLimitConfig::mcp();
695        let mut result = ExecResult::success("short output");
696        let spill = spill_if_needed(&mut result, &config).await;
697        assert!(spill.is_none());
698        assert_eq!(&*result.text_out(), "short output");
699        assert!(!result.did_spill);
700    }
701
702    #[tokio::test]
703    async fn test_spill_if_needed_over_limit() {
704        let config = OutputLimitConfig {
705            max_bytes: Some(100),
706            head_bytes: 20,
707            tail_bytes: 10,
708        };
709        let big_output = "x".repeat(200);
710        let mut result = ExecResult::success(big_output);
711        let spill = spill_if_needed(&mut result, &config).await;
712        assert!(spill.is_some());
713        assert!(result.did_spill);
714
715        let spill = spill.unwrap();
716        assert_eq!(spill.total_bytes, 200);
717        assert!(spill.path.exists());
718
719        // Verify truncated output
720        assert!(result.text_out().contains("..."));
721        assert!(result.text_out().contains("[output truncated: 200 bytes total"));
722        assert!(result.text_out().contains(&spill.path.to_string_lossy().to_string()));
723
724        // Verify head (first 20 bytes)
725        assert!(result.text_out().starts_with(&"x".repeat(20)));
726
727        // Verify spill file has full content
728        let spill_content = tokio::fs::read_to_string(&spill.path).await.unwrap();
729        assert_eq!(spill_content.len(), 200);
730
731        // Clean up
732        let _ = tokio::fs::remove_file(&spill.path).await;
733    }
734
735    #[tokio::test]
736    async fn test_spill_if_needed_disabled() {
737        let config = OutputLimitConfig::none();
738        let big_output = "x".repeat(200);
739        let mut result = ExecResult::success(big_output.clone());
740        let spill = spill_if_needed(&mut result, &config).await;
741        assert!(spill.is_none());
742        assert_eq!(&*result.text_out(), big_output);
743        assert!(!result.did_spill);
744    }
745
746    #[test]
747    fn test_build_truncated_output() {
748        let config = OutputLimitConfig {
749            max_bytes: Some(100),
750            head_bytes: 5,
751            tail_bytes: 3,
752        };
753        let full = "abcdefghijklmnop";
754        let path = PathBuf::from("/tmp/test-spill.txt");
755        let result = build_truncated_output(full, &config, &path, 16);
756        assert!(result.starts_with("abcde"));
757        assert!(result.contains("..."));
758        assert!(result.contains("nop"));
759        assert!(result.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
760    }
761
762    #[tokio::test]
763    async fn test_kernel_mcp_truncates_large_output() {
764        use crate::kernel::{Kernel, KernelConfig};
765
766        // MCP config has 8K limit by default — use a smaller limit for testing
767        let config = KernelConfig::mcp()
768            .with_output_limit(OutputLimitConfig {
769                max_bytes: Some(200),
770                head_bytes: 50,
771                tail_bytes: 30,
772            });
773        let kernel = Kernel::new(config).expect("kernel creation");
774
775        // seq 1 10000 produces lots of output
776        let result = kernel.execute("seq 1 10000").await.expect("execute");
777        assert!(result.text_out().contains("[output truncated:"));
778        assert!(result.text_out().contains("full output at"));
779        // Head should contain the first numbers
780        assert!(result.text_out().starts_with("1\n"));
781    }
782
783    #[tokio::test]
784    async fn test_spill_exits_3() {
785        use crate::kernel::{Kernel, KernelConfig};
786
787        let config = KernelConfig::mcp()
788            .with_output_limit(OutputLimitConfig {
789                max_bytes: Some(100),
790                head_bytes: 30,
791                tail_bytes: 20,
792            });
793        let kernel = Kernel::new(config).expect("kernel creation");
794
795        let big = "x".repeat(200);
796        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
797        assert_eq!(result.code, 3, "spill should always exit 3");
798        assert_eq!(result.original_code, Some(0), "original command exit code preserved");
799        assert!(result.text_out().contains("[output truncated:"));
800    }
801
802    #[tokio::test]
803    async fn test_kernel_repl_no_truncation() {
804        use crate::kernel::{Kernel, KernelConfig};
805
806        // REPL has no limit
807        let config = KernelConfig::repl();
808        let kernel = Kernel::new(config).expect("kernel creation");
809
810        let result = kernel.execute("seq 1 100").await.expect("execute");
811        assert!(!result.text_out().contains("[output truncated:"));
812        assert!(result.text_out().contains("100"));
813    }
814
815    #[tokio::test]
816    async fn test_kernel_builtin_truncation() {
817        use crate::kernel::{Kernel, KernelConfig};
818
819        // Builtins go through post-hoc spill check
820        let config = KernelConfig::mcp()
821            .with_output_limit(OutputLimitConfig {
822                max_bytes: Some(100),
823                head_bytes: 30,
824                tail_bytes: 20,
825            });
826        let kernel = Kernel::new(config).expect("kernel creation");
827
828        // echo with a large string
829        let big = "x".repeat(200);
830        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
831        assert!(result.text_out().contains("[output truncated:"));
832    }
833
834    // ── OutputData estimation and streaming tests ──
835
836    #[test]
837    fn test_estimated_byte_size_text() {
838        use crate::interpreter::OutputData;
839        let data = OutputData::text("hello world");
840        assert_eq!(data.estimated_byte_size(), 11);
841    }
842
843    #[test]
844    fn test_estimated_byte_size_table() {
845        use crate::interpreter::{OutputData, OutputNode};
846        let data = OutputData::table(
847            vec!["NAME".into(), "SIZE".into()],
848            vec![
849                OutputNode::new("foo").with_cells(vec!["123".into()]),
850                OutputNode::new("bar").with_cells(vec!["456".into()]),
851            ],
852        );
853        // "foo\t123\nbar\t456" = 3+1+3 + 1 + 3+1+3 = 15
854        assert_eq!(data.estimated_byte_size(), 15);
855    }
856
857    #[test]
858    fn test_estimated_byte_size_tree() {
859        use crate::interpreter::{OutputData, OutputNode};
860        let data = OutputData::nodes(vec![
861            OutputNode::new("src").with_children(vec![
862                OutputNode::new("main.rs"),
863                OutputNode::new("lib.rs"),
864            ]),
865        ]);
866        // "src/{main.rs,lib.rs}" = 3 + 2 + 7 + 1 + 6 + 1 = 20
867        assert_eq!(data.estimated_byte_size(), 20);
868    }
869
870    #[test]
871    fn test_write_canonical_matches_to_canonical_string() {
872        use crate::interpreter::{OutputData, OutputNode};
873
874        let cases: Vec<OutputData> = vec![
875            OutputData::text("hello world"),
876            OutputData::nodes(vec![
877                OutputNode::new("file1"),
878                OutputNode::new("file2"),
879            ]),
880            OutputData::table(
881                vec!["NAME".into(), "SIZE".into()],
882                vec![
883                    OutputNode::new("foo").with_cells(vec!["123".into()]),
884                    OutputNode::new("bar").with_cells(vec!["456".into()]),
885                ],
886            ),
887            OutputData::nodes(vec![
888                OutputNode::new("src").with_children(vec![
889                    OutputNode::new("main.rs"),
890                    OutputNode::new("lib.rs"),
891                ]),
892            ]),
893        ];
894
895        for data in cases {
896            let expected = data.to_canonical_string();
897            let mut buf = Vec::new();
898            let written = data.write_canonical(&mut buf, None).unwrap();
899            let got = String::from_utf8(buf).unwrap();
900            assert_eq!(got, expected, "write_canonical mismatch for {:?}", data);
901            assert_eq!(written, expected.len(), "byte count mismatch");
902        }
903    }
904
905    #[test]
906    fn test_write_canonical_budget_stops_early() {
907        use crate::interpreter::{OutputData, OutputNode};
908
909        let data = OutputData::nodes(
910            (0..1000).map(|i| OutputNode::new(format!("file_{:04}", i))).collect()
911        );
912        let mut buf = Vec::new();
913        let written = data.write_canonical(&mut buf, Some(100)).unwrap();
914        // Should have stopped shortly after 100 bytes
915        assert!(written > 100, "should exceed budget slightly");
916        assert!(written < 500, "should stop soon after budget: got {}", written);
917    }
918
919    #[tokio::test]
920    async fn test_spill_if_needed_large_output_data_no_oom() {
921        use crate::interpreter::{OutputData, OutputNode};
922
923        let config = OutputLimitConfig {
924            max_bytes: Some(1024),
925            head_bytes: 100,
926            tail_bytes: 50,
927        };
928
929        // 100K nodes — large enough to detect OOM if materialized carelessly,
930        // but small enough to not slow down the test
931        let nodes: Vec<OutputNode> = (0..100_000)
932            .map(|i| OutputNode::new(format!("node_{:06}", i)))
933            .collect();
934        let data = OutputData::nodes(nodes);
935        let mut result = ExecResult::with_output(data);
936
937        let spill = spill_if_needed(&mut result, &config).await;
938        assert!(spill.is_some(), "should have spilled");
939        assert!(result.did_spill);
940        assert!(result.text_out().contains("[output truncated:"));
941
942        // Clean up
943        if let Some(s) = spill {
944            let _ = tokio::fs::remove_file(&s.path).await;
945        }
946    }
947
948    // ── Streaming collector tests (using tokio::io::duplex) ──
949
950    #[tokio::test]
951    async fn test_collect_small_output_no_spill() {
952        let (mut writer, reader) = tokio::io::duplex(1024);
953        let config = OutputLimitConfig {
954            max_bytes: Some(1024),
955            head_bytes: 100,
956            tail_bytes: 50,
957        };
958
959        // Write small data and close
960        use tokio::io::AsyncWriteExt;
961        writer.write_all(b"hello world").await.unwrap();
962        drop(writer); // EOF
963
964        let mut reader = reader;
965        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
966        assert_eq!(result, "hello world");
967        assert!(!did_spill);
968    }
969
970    #[tokio::test]
971    async fn test_collect_large_output_spills() {
972        let (mut writer, reader) = tokio::io::duplex(64 * 1024);
973        let config = OutputLimitConfig {
974            max_bytes: Some(100),
975            head_bytes: 20,
976            tail_bytes: 10,
977        };
978
979        // Write data exceeding limit and close
980        use tokio::io::AsyncWriteExt;
981        let data = "x".repeat(500);
982        writer.write_all(data.as_bytes()).await.unwrap();
983        drop(writer); // EOF
984
985        let mut reader = reader;
986        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
987        assert!(did_spill, "should have spilled");
988        assert!(result.contains("[output truncated:"));
989        assert!(result.contains("full output at"));
990    }
991
992    #[tokio::test]
993    async fn test_collect_exact_boundary_no_spill() {
994        let (mut writer, reader) = tokio::io::duplex(1024);
995        let config = OutputLimitConfig {
996            max_bytes: Some(100),
997            head_bytes: 20,
998            tail_bytes: 10,
999        };
1000
1001        // Write exactly max_bytes
1002        use tokio::io::AsyncWriteExt;
1003        let data = "x".repeat(100);
1004        writer.write_all(data.as_bytes()).await.unwrap();
1005        drop(writer); // EOF
1006
1007        let mut reader = reader;
1008        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
1009        // Exactly at limit — should not spill (<=)
1010        assert!(!did_spill, "exact boundary should not spill");
1011        assert_eq!(result.len(), 100);
1012    }
1013
1014    #[tokio::test]
1015    async fn test_collect_broken_pipe() {
1016        let (writer, reader) = tokio::io::duplex(1024);
1017        let config = OutputLimitConfig {
1018            max_bytes: Some(1024),
1019            head_bytes: 100,
1020            tail_bytes: 50,
1021        };
1022
1023        // Write some data, then drop writer mid-stream
1024        use tokio::io::AsyncWriteExt;
1025        let mut writer = writer;
1026        writer.write_all(b"partial data").await.unwrap();
1027        drop(writer); // Simulate broken pipe
1028
1029        let mut reader = reader;
1030        let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
1031        assert_eq!(result, "partial data");
1032        assert!(!did_spill);
1033    }
1034}