Skip to main content

kaish_kernel/
output_limit.rs

1//! Configurable output size limits for agent safety.
2//!
3//! When output exceeds the threshold the result is capped and `ExecResult.out`
4//! is replaced with a head+tail preview. Two strategies, selected at runtime by
5//! [`SpillMode`]:
6//! - [`SpillMode::Disk`] (default): the full output is written to a spill file
7//!   on the real filesystem and the preview points at it. The agent can then
8//!   selectively read the file.
9//! - [`SpillMode::Memory`]: the output is truncated in memory only — no disk
10//!   I/O, no recoverable file. For runtime read-only kernels (e.g. kaibo) that
11//!   must not touch the host filesystem even when `localfs` is compiled in.
12//!   Memory stays bounded regardless of how much the command produces.
13//!
14//! Either way the exit code is remapped to 3 (`did_spill`) so callers can tell
15//! the output was capped.
16//!
17//! Per-mode defaults: MCP kernels get an 8KB limit, REPL/test kernels
18//! are unlimited. Runtime-switchable via the `kaish-output-limit` builtin.
19
20use std::path::PathBuf;
21
22use crate::interpreter::ExecResult;
23#[cfg(feature = "localfs")]
24use crate::paths;
25
/// Default output limit for MCP mode (8KB).
///
/// Used as the limit by [`OutputLimitConfig::mcp`] and returned by
/// [`OutputLimitConfig::default_limit`].
const DEFAULT_MCP_LIMIT: usize = 8 * 1024;

/// Default head preview size (bytes of output start to keep).
///
/// Applied by both [`OutputLimitConfig::none`] and [`OutputLimitConfig::mcp`].
const DEFAULT_HEAD_BYTES: usize = 1024;

/// Default tail preview size (bytes of output end to keep).
///
/// Applied by both [`OutputLimitConfig::none`] and [`OutputLimitConfig::mcp`].
const DEFAULT_TAIL_BYTES: usize = 512;
34
/// Where overflow output goes when it exceeds the limit.
///
/// This is a *runtime* choice, distinct from the compile-time `localfs`
/// feature: a `localfs`-built kernel can still be told to truncate in memory.
/// A build without `localfs` always behaves as [`SpillMode::Memory`] regardless
/// of this setting, since disk I/O is unavailable.
///
/// Stored on [`OutputLimitConfig`]; read it with
/// [`OutputLimitConfig::spill_mode`] and change it with
/// [`OutputLimitConfig::set_spill_mode`] or [`OutputLimitConfig::in_memory`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SpillMode {
    /// Write overflow to a disk spill file under `paths::spill_dir()` and keep a
    /// head+tail preview in the result (the message carries the file path).
    /// Requires the `localfs` feature. This is the default.
    ///
    /// Auto-overridden to [`Memory`](Self::Memory) at kernel construction when
    /// the VFS mount is `NoLocal` (memory-only) — such a kernel has no host
    /// filesystem to spill to. See `Kernel::assemble`.
    #[default]
    Disk,
    /// Truncate in memory to head+tail only — no disk I/O, no recoverable file.
    /// For runtime read-only kernels (e.g. kaibo) that must never touch the host
    /// filesystem even when `localfs` is compiled in.
    Memory,
}
57
/// Configurable output size limit.
///
/// Threaded through `KernelConfig` → `ExecContext` → kernel pipeline execution.
/// Runtime-mutable via the `kaish-output-limit` builtin.
///
/// Fields are private; use the constructors ([`none`](Self::none),
/// [`mcp`](Self::mcp)) and the accessor/setter methods on the `impl`.
#[derive(Debug, Clone)]
pub struct OutputLimitConfig {
    // Maximum output size in bytes; `None` disables limiting.
    max_bytes: Option<usize>,
    // Bytes of output head to preserve in a truncated result.
    head_bytes: usize,
    // Bytes of output tail to preserve in a truncated result.
    tail_bytes: usize,
    // Whether overflow is spilled to disk or truncated in memory.
    spill_mode: SpillMode,
}
69
70impl OutputLimitConfig {
71    /// No limiting — REPL/embedded/test default.
72    pub fn none() -> Self {
73        Self {
74            max_bytes: None,
75            head_bytes: DEFAULT_HEAD_BYTES,
76            tail_bytes: DEFAULT_TAIL_BYTES,
77            spill_mode: SpillMode::Disk,
78        }
79    }
80
81    /// Default limit used by `on` subcommand and `set -o output-limit`.
82    pub fn default_limit() -> usize {
83        DEFAULT_MCP_LIMIT
84    }
85
86    /// MCP-safe defaults: 8KB limit, 1KB head, 512B tail, disk spill.
87    pub fn mcp() -> Self {
88        Self {
89            max_bytes: Some(DEFAULT_MCP_LIMIT),
90            head_bytes: DEFAULT_HEAD_BYTES,
91            tail_bytes: DEFAULT_TAIL_BYTES,
92            spill_mode: SpillMode::Disk,
93        }
94    }
95
96    /// Switch to in-memory truncation — no disk spill, no host filesystem
97    /// writes. For runtime read-only kernels (e.g. kaibo). Builder form of
98    /// [`set_spill_mode`](Self::set_spill_mode).
99    ///
100    /// Note: a `NoLocal` VFS mount forces this mode automatically at kernel
101    /// construction, so an embedder only needs this for a `localfs`-mounted
102    /// kernel it nonetheless wants to keep off the host disk.
103    pub fn in_memory(mut self) -> Self {
104        self.spill_mode = SpillMode::Memory;
105        self
106    }
107
108    /// Whether output limiting is enabled.
109    pub fn is_enabled(&self) -> bool {
110        self.max_bytes.is_some()
111    }
112
113    /// The spill mode (disk vs in-memory truncation).
114    pub fn spill_mode(&self) -> SpillMode {
115        self.spill_mode
116    }
117
118    /// Set the spill mode.
119    pub fn set_spill_mode(&mut self, mode: SpillMode) {
120        self.spill_mode = mode;
121    }
122
123    /// The maximum output size in bytes, if set.
124    pub fn max_bytes(&self) -> Option<usize> {
125        self.max_bytes
126    }
127
128    /// Bytes of output head to preserve in truncated result.
129    pub fn head_bytes(&self) -> usize {
130        self.head_bytes
131    }
132
133    /// Bytes of output tail to preserve in truncated result.
134    pub fn tail_bytes(&self) -> usize {
135        self.tail_bytes
136    }
137
138    /// Set the output limit. `None` disables limiting.
139    pub fn set_limit(&mut self, max: Option<usize>) {
140        self.max_bytes = max;
141    }
142
143    /// Set the head preview size.
144    pub fn set_head_bytes(&mut self, bytes: usize) {
145        self.head_bytes = bytes;
146    }
147
148    /// Set the tail preview size.
149    pub fn set_tail_bytes(&mut self, bytes: usize) {
150        self.tail_bytes = bytes;
151    }
152}
153
/// Result of a spill operation.
///
/// Returned by the spill functions only when a spill file was actually
/// written; in-memory truncation reports `None` instead.
#[derive(Debug)]
pub struct SpillResult {
    /// Location of the spill file holding the full output.
    pub path: PathBuf,
    /// Number of bytes written to the spill file.
    pub total_bytes: usize,
}
159
/// Check if the result output exceeds the limit and spill to disk if so.
///
/// Returns `None` immediately when no limit is configured (`max_bytes` is
/// `None`) or when the output fits within the limit.
///
/// Mutates `result.out` in place: replaces with head+tail+pointer message.
/// Returns `Some(SpillResult)` if a spill file was written, `None` otherwise.
/// Whenever the output was capped (on disk or in memory) `result.did_spill`
/// is set to `true`, so a `None` return alone does not mean "untouched".
///
/// If the filesystem write fails, the result is replaced with an error.
/// Fail fast: truncating output silently could corrupt structured data
/// that an agent acts on. An explicit error is safer.
///
/// In [`SpillMode::Memory`], or in any build without the `localfs` feature,
/// performs in-memory head+tail truncation (no disk I/O) instead.
pub async fn spill_if_needed(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
) -> Option<SpillResult> {
    // No limit configured: nothing to do.
    let max = config.max_bytes?;

    // Binary payloads are measured and spilled by RAW bytes. text_out() would
    // lossy-decode a Bytes result — corrupting the spill file and mis-measuring
    // the size (U+FFFD is 3 bytes per invalid byte). Handle them up front.
    if let Some(total) = result.out_bytes().map(|b| b.len()) {
        if total <= max {
            return None;
        }
        #[cfg(feature = "localfs")]
        if config.spill_mode == SpillMode::Disk {
            let bytes = result.out_bytes().unwrap_or_default().to_vec();
            return match write_spill_file(&bytes).await {
                Ok((path, written)) => {
                    // The binary payload is replaced by a short text pointer
                    // to the spill file.
                    result.set_out(format!(
                        "[binary output: {total} bytes spilled to {} — read it with `cat {}`]",
                        path.display(),
                        path.display()
                    ));
                    result.did_spill = true;
                    Some(SpillResult { path, total_bytes: written })
                }
                Err(e) => {
                    tracing::error!("binary output spill failed: {}", e);
                    // Replace the whole result with an explicit failure
                    // (exit code 1) rather than passing on partial data.
                    *result = ExecResult::failure(
                        1,
                        format!(
                            "binary output exceeded {max} byte limit ({total} bytes) and spill \
                             to disk failed: {e}"
                        ),
                    );
                    None
                }
            };
        }
        // Memory mode (or no localfs): bounded head+tail of the raw bytes. The
        // result stays binary, just truncated.
        let bytes = result.out_bytes().unwrap_or_default().to_vec();
        let head_n = config.head_bytes.min(bytes.len());
        // The tail is clamped to what remains after the head, so head and
        // tail never overlap and the slice below cannot go out of bounds.
        let tail_n = config.tail_bytes.min(bytes.len().saturating_sub(head_n));
        let mut truncated = bytes[..head_n].to_vec();
        truncated.extend_from_slice(&bytes[bytes.len() - tail_n..]);
        result.set_out_bytes(truncated);
        result.did_spill = true;
        return None;
    }

    // Disk spill requires `localfs` AND the caller selecting it. Memory mode
    // (or a build without `localfs`) falls through to in-memory truncation.
    #[cfg(feature = "localfs")]
    if config.spill_mode == SpillMode::Disk {
        // If result.out is already populated (external commands), check it directly
        if !result.text_out().is_empty() && !result.has_output() {
            let total = result.text_out().len();
            if total <= max {
                return None;
            }
            return spill_string(result, config, max).await;
        }

        // If we have structured OutputData, estimate size before materializing
        if let Some(output) = result.output() {
            let estimate = output.estimated_byte_size();
            if estimate <= max {
                // Small enough — materialize normally
                result.materialize();
                // Re-check actual size (estimate is a lower bound)
                if result.text_out().len() <= max {
                    return None;
                }
                return spill_string(result, config, max).await;
            }

            // Large — stream directly to spill file, never holding full String
            return spill_output_data(result, config, max).await;
        }

        // Neither text nor structured output present: nothing to limit.
        return None;
    }

    // In-memory head+tail truncation (Memory mode or no `localfs`): no disk I/O.
    truncate_in_memory(result, config, max)
}
258
259/// Truncate output in memory to head+tail, with no disk I/O.
260///
261/// Sets `did_spill = true` so the kernel remaps the exit code to 3 — the same
262/// "output was capped" signal as a disk spill — but the message carries no file
263/// path because there is no recoverable file. Returns `None` (no `SpillResult`,
264/// since nothing was written); the caller distinguishes truncation via
265/// `result.did_spill`.
266///
267/// Memory is bounded: large structured `OutputData` is streamed through a byte
268/// budget rather than materialized into a full `String`, so a builtin emitting
269/// a huge tree (e.g. a recursive `ls` of a giant directory) cannot OOM a
270/// read-only kernel.
271fn truncate_in_memory(
272    result: &mut ExecResult,
273    config: &OutputLimitConfig,
274    max: usize,
275) -> Option<SpillResult> {
276    // Structured OutputData: estimate first. If it would clearly overflow,
277    // render only a bounded head prefix via `write_canonical` rather than
278    // materializing the whole thing.
279    if let Some(output) = result.output() {
280        let estimate = output.estimated_byte_size();
281        if estimate > max {
282            // Render a bounded head prefix only — no full materialization.
283            let mut buf = Vec::with_capacity(config.head_bytes + 64);
284            // write_canonical stops shortly after the budget; ignore the count.
285            let _ = output.write_canonical(&mut buf, Some(config.head_bytes));
286            let s = String::from_utf8_lossy(&buf);
287            let head = truncate_to_char_boundary(&s, config.head_bytes);
288            let truncated = format!(
289                "{}\n...\n[output truncated in memory: ~{} bytes (exceeds {} byte limit) — head only, no spill file]",
290                head, estimate, max
291            );
292            result.set_out(truncated);
293            result.did_spill = true;
294            return None;
295        }
296        // Small enough to materialize safely.
297        result.materialize();
298    }
299
300    let total = result.text_out().len();
301    if total <= max {
302        return None;
303    }
304
305    // Already-materialized text fits in memory (it was produced into RAM
306    // regardless) — give a precise head+tail+total.
307    let text = result.text_out().into_owned();
308    let head = truncate_to_char_boundary(&text, config.head_bytes);
309    let tail = tail_from_str(&text, config.tail_bytes);
310    let truncated = format!(
311        "{}\n...\n{}\n[output truncated in memory: {} bytes total — no spill file]",
312        head, tail, total
313    );
314    result.set_out(truncated);
315    result.did_spill = true;
316    None
317}
318
/// Write the already-materialized text in `result` to a spill file and swap
/// it for a head+tail preview that points at the file.
///
/// On success sets `result.did_spill` and returns the spill location and
/// byte count. On a write failure the whole result is replaced with an
/// exit-code-1 failure describing the problem, and `None` is returned.
#[cfg(feature = "localfs")]
async fn spill_string(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
    max: usize,
) -> Option<SpillResult> {
    let total = result.text_out().len();
    let outcome = write_spill_file(result.text_out().as_bytes()).await;

    let (path, written) = match outcome {
        Ok(pair) => pair,
        Err(e) => {
            tracing::error!("output spill failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
                max, total, e
            ));
            return None;
        }
    };

    // Build the preview from the full text before overwriting it.
    let preview = build_truncated_output(&result.text_out(), config, &path, total);
    result.set_out(preview);
    result.did_spill = true;
    Some(SpillResult {
        path,
        total_bytes: written,
    })
}
347
/// Stream OutputData directly to a spill file without materializing the full String.
///
/// Returns `None` if `result` carries no structured output. Otherwise the
/// canonical rendering is written to a fresh file under `paths::spill_dir()`
/// through a buffered writer, and `result.out` is replaced with a head+tail
/// preview read back from that file plus a pointer to it.
///
/// Any failure (directory creation, file creation, write, flush) replaces the
/// whole result with an exit-code-1 failure and returns `None`. Failing to
/// read the preview back is tolerated: the affected part is left empty.
///
/// NOTE(review): the file is created and written with synchronous `std::fs`
/// calls inside an async fn — confirm this is acceptable on the runtime, or
/// move it to a blocking task.
#[cfg(feature = "localfs")]
async fn spill_output_data(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
    max: usize,
) -> Option<SpillResult> {
    use std::io::Write as _;

    let output = result.output()?;

    let dir = paths::spill_dir();
    if let Err(e) = tokio::fs::create_dir_all(&dir).await {
        tracing::error!("output spill dir creation failed: {}", e);
        *result = ExecResult::failure(1, format!(
            "output exceeded {} byte limit and spill dir creation failed: {}", max, e
        ));
        return None;
    }

    let filename = generate_spill_filename();
    let path = dir.join(&filename);

    // Write OutputData to the file via write_canonical. The writer is buffered
    // so many small writes from the renderer do not each become a syscall.
    let total = match std::fs::File::create(&path) {
        Ok(file) => {
            let mut writer = std::io::BufWriter::new(file);
            let written = match output.write_canonical(&mut writer, None) {
                Ok(n) => n,
                Err(e) => {
                    tracing::error!("output spill write failed: {}", e);
                    *result = ExecResult::failure(1, format!(
                        "output exceeded {} byte limit and spill to disk failed: {}", max, e
                    ));
                    return None;
                }
            };
            // Flush explicitly: dropping a BufWriter swallows write errors,
            // which would leave a silently incomplete spill file.
            if let Err(e) = writer.flush() {
                tracing::error!("output spill write failed: {}", e);
                *result = ExecResult::failure(1, format!(
                    "output exceeded {} byte limit and spill to disk failed: {}", max, e
                ));
                return None;
            }
            written
        }
        Err(e) => {
            tracing::error!("output spill file creation failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit and spill to disk failed: {}", max, e
            ));
            return None;
        }
    };

    // Read head and tail from the spill file for the truncated preview
    // (best effort: a read failure yields an empty part).
    let head = read_head_from_file(&path, config.head_bytes).await.unwrap_or_default();
    let tail = read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default();
    let path_str = path.to_string_lossy();

    result.set_out(format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head, tail, total, path_str
    ));
    result.did_spill = true;

    Some(SpillResult {
        path,
        total_bytes: total,
    })
}
408
/// Collect a child process's stdout and stderr, applying the output limit to
/// stdout as it streams in.
///
/// stderr is drained on a separately spawned task while stdout is collected
/// by `collect_stdout_with_spill`. With no limit configured the stdout limit
/// is `usize::MAX`.
///
/// Returns `(stdout_bytes, stderr_text, did_spill)`:
/// - `stdout_bytes` is RAW bytes when the output stayed under the limit, so a
///   binary-producing command (curl, openssl, gzip) is not lossy-decoded; the
///   caller applies `success_text_or_bytes`. When the limit was exceeded it is
///   the UTF-8 spill/truncation notice instead.
/// - `stderr_text` is empty if the stderr task failed to join.
///
/// Handles child-process stdio, so it lives on the `subprocess` axis (which
/// implies `localfs`); the pure disk-spill helpers below stay on `localfs`.
#[cfg(feature = "subprocess")]
pub async fn spill_aware_collect(
    mut stdout: tokio::process::ChildStdout,
    mut stderr_reader: tokio::process::ChildStderr,
    stderr_stream: Option<crate::scheduler::StderrStream>,
    config: &OutputLimitConfig,
) -> (Vec<u8>, String, bool) {
    // Drain stderr concurrently so it is consumed while we read stdout.
    let stderr_handle = tokio::spawn(async move {
        collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
    });

    let limit = config.max_bytes.unwrap_or(usize::MAX);
    let (out_bytes, spilled) = collect_stdout_with_spill(&mut stdout, limit, config).await;

    let err_text = stderr_handle.await.unwrap_or_default();
    (out_bytes, err_text, spilled)
}
442
/// Drain a child's stderr until EOF or the first read error.
///
/// If `stream` is provided, every chunk is forwarded to it and the returned
/// string is empty. Otherwise the chunks are accumulated and returned as
/// lossily-decoded UTF-8.
#[cfg(feature = "subprocess")]
async fn collect_stderr(
    reader: &mut tokio::process::ChildStderr,
    stream: Option<&crate::scheduler::StderrStream>,
) -> String {
    use tokio::io::AsyncReadExt;

    let mut captured = Vec::new();
    let mut scratch = [0u8; 8192];

    // A read error ends collection the same way EOF does.
    while let Ok(n) = reader.read(&mut scratch).await {
        if n == 0 {
            break;
        }
        let data = &scratch[..n];
        match stream {
            Some(sink) => {
                sink.write(data);
            }
            None => {
                captured.extend_from_slice(data);
            }
        }
    }

    match stream {
        Some(_) => String::new(),
        None => String::from_utf8_lossy(&captured).into_owned(),
    }
}
472
/// Collect stdout with two-phase spill detection.
///
/// Generic over `AsyncRead + Unpin` — works with `ChildStdout` in production
/// and `DuplexStream` in tests.
///
/// Returns `(stdout_bytes, did_spill)`. While the output stays within
/// `max_bytes` the first element is the raw bytes read and the flag is
/// `false`. Once the limit is exceeded, the remainder of the stream is handed
/// to `handle_overflow` and the first element is the UTF-8 bytes of the
/// message it returns, with the flag it reports.
///
/// A read error ends collection and returns whatever was buffered so far,
/// with `did_spill == false`.
#[cfg(feature = "subprocess")]
async fn collect_stdout_with_spill<R: tokio::io::AsyncRead + Unpin>(
    stdout: &mut R,
    max_bytes: usize,
    config: &OutputLimitConfig,
) -> (Vec<u8>, bool) {
    use tokio::io::AsyncReadExt;
    use tokio::time::{sleep, Duration};

    let mut buffer = Vec::new();
    let mut chunk = [0u8; 8192];
    let deadline = sleep(Duration::from_secs(1));
    tokio::pin!(deadline);

    // Phase 1: Detection window (up to 1s)
    loop {
        tokio::select! {
            // `biased` polls the branches in order, so a ready read is
            // always taken before the deadline branch.
            biased;
            result = stdout.read(&mut chunk) => {
                match result {
                    Ok(0) => {
                        // EOF — command finished within detection window.
                        // Post-hoc spill check happens in the caller.
                        return (buffer, false);
                    }
                    Ok(n) => {
                        buffer.extend_from_slice(&chunk[..n]);
                        // Break early if already over limit — don't OOM during detection window
                        if buffer.len() > max_bytes {
                            break;
                        }
                    }
                    Err(_) => {
                        return (buffer, false);
                    }
                }
            }
            () = &mut deadline => {
                // 1s elapsed, command still running
                break;
            }
        }
    }

    // Phase 2: Check if we should switch to spill mode
    if buffer.len() > max_bytes {
        // Already over limit — hand off to the disk-spill or in-memory-drain path
        let (msg, spilled) = handle_overflow(&buffer, stdout, config, max_bytes).await;
        return (msg.into_bytes(), spilled);
    }

    // Continue collecting (under limit so far)
    // Check size after each chunk
    loop {
        match stdout.read(&mut chunk).await {
            Ok(0) => break,
            Ok(n) => {
                buffer.extend_from_slice(&chunk[..n]);
                // Check if we've exceeded limit mid-stream
                if buffer.len() > max_bytes {
                    let (msg, spilled) = handle_overflow(&buffer, stdout, config, max_bytes).await;
        return (msg.into_bytes(), spilled);
                }
            }
            Err(_) => break,
        }
    }

    // EOF (or read error) without ever exceeding the limit.
    (buffer, false)
}
549
/// Route stdout that has already exceeded the limit to the configured
/// overflow strategy and return `(message, did_spill)`.
///
/// - [`SpillMode::Memory`]: the rest of the stream is drained with a bounded
///   head+tail buffer; no disk is touched. Always reports `did_spill = true`.
/// - [`SpillMode::Disk`]: the buffered prefix plus the rest of the stream go
///   to a spill file. If that fails, an `ERROR:` message is returned with
///   `did_spill = false` and no further output is accumulated.
#[cfg(feature = "subprocess")]
async fn handle_overflow<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
    max_bytes: usize,
) -> (String, bool) {
    match config.spill_mode {
        // Bounded drain: an unbounded child cannot exhaust memory.
        SpillMode::Memory => (drain_in_memory(buffer, stdout, config).await, true),
        SpillMode::Disk => match stream_to_spill(buffer, stdout, config).await {
            Ok(notice) => (notice, true),
            Err(e) => {
                // Report the failure rather than keep buffering.
                tracing::error!("streaming spill failed: {}", e);
                let message = format!(
                    "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
                    max_bytes,
                    buffer.len(),
                    e
                );
                (message, false)
            }
        },
    }
}
584
/// Read stdout to EOF (or the first read error) while retaining only a
/// bounded head and tail; everything in between is discarded.
///
/// The head is taken from the already-buffered prefix; the tail is a ring of
/// at most `tail_bytes` bytes fed with the prefix and every later chunk. The
/// true byte total is counted so the truncation marker reports how much was
/// seen and how much was dropped.
#[cfg(feature = "subprocess")]
async fn drain_in_memory<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
) -> String {
    use std::collections::VecDeque;
    use tokio::io::AsyncReadExt;

    // Head never changes once the prefix is known.
    let head = truncate_to_char_boundary(&String::from_utf8_lossy(buffer), config.head_bytes)
        .to_string();

    // Tail ring covers the whole stream, starting with the prefix.
    let ring_cap = config.tail_bytes;
    let mut ring: VecDeque<u8> = VecDeque::with_capacity(ring_cap + 1);
    extend_ring(&mut ring, buffer, ring_cap);

    let mut seen = buffer.len();
    let mut scratch = [0u8; 8192];
    // A read error stops the drain just like EOF.
    while let Ok(n) = stdout.read(&mut scratch).await {
        if n == 0 {
            break;
        }
        seen += n;
        extend_ring(&mut ring, &scratch[..n], ring_cap);
    }

    let tail_raw = Vec::from(ring);
    let discarded = seen.saturating_sub(head.len() + tail_raw.len());
    format!(
        "{}\n...\n{}\n[output truncated in memory: {} bytes total, {} discarded — no spill file]",
        head,
        String::from_utf8_lossy(&tail_raw),
        seen,
        discarded
    )
}
629
630/// Append `bytes` to a tail ring buffer bounded to `cap` bytes, evicting from
631/// the front. If `bytes` alone exceeds `cap`, only its last `cap` bytes are kept.
632#[cfg(feature = "subprocess")]
fn extend_ring(ring: &mut std::collections::VecDeque<u8>, bytes: &[u8], cap: usize) {
    // A zero-capacity ring keeps nothing.
    if cap == 0 {
        return;
    }

    // Only the last `cap` bytes of the input can survive.
    let incoming = &bytes[bytes.len().saturating_sub(cap)..];

    // Evict just enough from the front to make room. A ring that is already
    // longer than `cap` is never trimmed; it only grows.
    let evict = if ring.len() <= cap {
        (ring.len() + incoming.len()).saturating_sub(cap)
    } else {
        0
    };
    ring.drain(..evict);
    ring.extend(incoming.iter().copied());
}
645
/// Persist the buffered prefix and then the rest of `stdout` to a new spill
/// file, returning the head+tail preview message that points at it.
///
/// A read error on `stdout` ends streaming like EOF; a filesystem error
/// (directory, create, write, flush) is returned to the caller. The head is
/// always taken from the in-memory prefix; the tail is read back from the
/// file whenever more data arrived after the prefix (best effort — empty if
/// that read fails).
///
/// Generic over `AsyncRead + Unpin` for testability.
#[cfg(feature = "subprocess")]
async fn stream_to_spill<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
) -> Result<String, std::io::Error> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let dir = paths::spill_dir();
    tokio::fs::create_dir_all(&dir).await?;

    let path = dir.join(generate_spill_filename());
    let mut file = tokio::fs::File::create(&path).await?;

    // Prefix first, then everything that is still to come.
    file.write_all(buffer).await?;
    let mut total = buffer.len();

    let mut scratch = [0u8; 8192];
    while let Ok(n) = stdout.read(&mut scratch).await {
        if n == 0 {
            break;
        }
        file.write_all(&scratch[..n]).await?;
        total += n;
    }
    file.flush().await?;

    // Preview pieces.
    let prefix = String::from_utf8_lossy(buffer);
    let head = truncate_to_char_boundary(&prefix, config.head_bytes);
    let tail: String = if total > buffer.len() {
        // The end of the output is only on disk.
        read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
    } else {
        tail_from_str(&prefix, config.tail_bytes).to_string()
    };

    Ok(format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head,
        tail,
        total,
        path.to_string_lossy()
    ))
}
701
/// Create the spill directory if needed and write `data` to a freshly named
/// file inside it.
///
/// Returns `(path, bytes_written)`, where the byte count is `data.len()`.
#[cfg(feature = "localfs")]
async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
    let spill_dir = paths::spill_dir();
    tokio::fs::create_dir_all(&spill_dir).await?;

    let target = spill_dir.join(generate_spill_filename());
    tokio::fs::write(&target, data).await?;

    let written = data.len();
    Ok((target, written))
}
713
/// Compose the preview that replaces over-limit output: head of `full`, an
/// ellipsis line, tail of `full`, and a trailer naming the total size and
/// the spill file holding the complete output.
#[cfg(feature = "localfs")]
fn build_truncated_output(
    full: &str,
    config: &OutputLimitConfig,
    spill_path: &std::path::Path,
    total_bytes: usize,
) -> String {
    format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        truncate_to_char_boundary(full, config.head_bytes),
        tail_from_str(full, config.tail_bytes),
        total_bytes,
        spill_path.to_string_lossy()
    )
}
730
/// Return the longest prefix of `s` that is at most `max_bytes` long and
/// ends on a UTF-8 character boundary.
///
/// A string that already fits is returned unchanged.
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Walk back from the limit to the nearest boundary; index 0 always is one.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
743
/// Return the longest suffix of `s` that is at most `max_bytes` long and
/// starts on a UTF-8 character boundary.
///
/// A string that already fits is returned unchanged.
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    let len = s.len();
    if len <= max_bytes {
        return s;
    }
    // Walk forward from the ideal start to the nearest boundary; `len` always is one.
    let begin = (len - max_bytes..=len)
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(len);
    &s[begin..]
}
756
/// Read the first N bytes from a file for head preview.
///
/// Reads up to `max_bytes` from the start of `path` and returns them as
/// lossily-decoded UTF-8, trimmed to a character boundary so the result is
/// never longer than `max_bytes`.
///
/// The read loops until the limit or EOF is reached: a single `read` call is
/// allowed to return fewer bytes than requested, which would silently
/// shorten the preview. If the byte limit lands inside a multi-byte
/// character of otherwise valid UTF-8, the dangling partial character is
/// dropped instead of being rendered as U+FFFD.
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened or read.
#[cfg(feature = "localfs")]
async fn read_head_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
    use tokio::io::AsyncReadExt;

    let file = tokio::fs::File::open(path).await?;
    let limit = u64::try_from(max_bytes).unwrap_or(u64::MAX);

    // `take` bounds the read; `read_to_end` handles short reads and grows the
    // buffer on demand, so a small file never costs a `max_bytes` allocation.
    let mut buf = Vec::new();
    file.take(limit).read_to_end(&mut buf).await?;

    // Only when the limit was actually hit can a character have been cut.
    // `error_len() == None` means "input ended mid-sequence".
    if buf.len() >= max_bytes {
        let cut = std::str::from_utf8(&buf)
            .err()
            .filter(|e| e.error_len().is_none())
            .map(|e| e.valid_up_to());
        if let Some(valid) = cut {
            buf.truncate(valid);
        }
    }

    let s = String::from_utf8_lossy(&buf);
    // Lossy decoding can lengthen the text (each invalid byte becomes a
    // 3-byte U+FFFD), so enforce the limit again on the decoded string.
    Ok(truncate_to_char_boundary(&s, max_bytes).to_string())
}
772
/// Read the last N bytes from a file for tail preview.
///
/// Returns at most the final `max_bytes` bytes of `path` as lossily-decoded
/// UTF-8. A file no longer than `max_bytes` is returned whole.
///
/// When the file is longer, the read starts `max_bytes` before the end. That
/// offset may fall inside a multi-byte character, so up to three leading
/// UTF-8 continuation bytes are skipped to start on a character boundary
/// (mirroring `tail_from_str`) instead of emitting a U+FFFD.
///
/// The read loops until EOF or the limit: a single `read` call is allowed to
/// return fewer bytes than requested. Offsets are computed in `u64`, so
/// large files are handled correctly on 32-bit targets too.
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened, inspected,
/// seeked or read.
#[cfg(feature = "localfs")]
async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let mut file = tokio::fs::File::open(path).await?;
    let len = file.metadata().await?.len();
    let limit = u64::try_from(max_bytes).unwrap_or(u64::MAX);

    // Skip ahead only when the file is longer than the requested tail.
    let truncated = len > limit;
    if truncated {
        file.seek(std::io::SeekFrom::Start(len - limit)).await?;
    }

    // Bounded by `limit`, so the buffer stays small.
    let mut buf = Vec::new();
    file.take(limit).read_to_end(&mut buf).await?;

    // Adjust to char boundary: continuation bytes look like 0b10xx_xxxx and a
    // character has at most three of them.
    let start = if truncated {
        buf.iter()
            .take(3)
            .take_while(|&&b| (b & 0xC0) == 0x80)
            .count()
    } else {
        0
    };

    Ok(String::from_utf8_lossy(&buf[start..]).into_owned())
}
798
799/// Generate a unique spill filename using timestamp, PID, and monotonic counter.
800#[cfg(feature = "localfs")]
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Process-wide sequence number: keeps names distinct even when two calls
    // observe the same clock reading.
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    // A clock set before the epoch falls back to a zero duration.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    // Layout: spill-<secs>.<nanos>-<pid>-<seq>.txt
    format!(
        "spill-{}.{}-{}-{}.txt",
        since_epoch.as_secs(),
        since_epoch.subsec_nanos(),
        std::process::id(),
        seq
    )
}
813
/// Parse a size string with optional K/M suffix into bytes.
///
/// Accepts: "64K", "64k", "1M", "1m", "65536" (raw bytes). Surrounding
/// whitespace is ignored. `K` multiplies by 1024 and `M` by 1024 * 1024.
///
/// # Errors
/// Returns a message when the input is empty, when the numeric part is not a
/// valid `usize`, or when the value multiplied by its suffix does not fit in
/// a `usize` (reported instead of overflowing).
pub fn parse_size(s: &str) -> Result<usize, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier): (&str, usize) =
        if let Some(n) = s.strip_suffix('K').or_else(|| s.strip_suffix('k')) {
            (n, 1024)
        } else if let Some(n) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
            (n, 1024 * 1024)
        } else {
            (s, 1)
        };

    let num: usize = num_str
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    // Unchecked multiplication would panic in debug builds and silently wrap
    // in release builds, yielding a tiny limit for a huge request.
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
837
#[cfg(all(test, feature = "localfs"))]
mod tests {
    use super::*;

    // ── Shared fixtures ──

    /// Build an enabled limit config so each test only spells out the knobs
    /// it actually varies.
    fn limits(
        max_bytes: usize,
        head_bytes: usize,
        tail_bytes: usize,
        spill_mode: SpillMode,
    ) -> OutputLimitConfig {
        OutputLimitConfig {
            max_bytes: Some(max_bytes),
            head_bytes,
            tail_bytes,
            spill_mode,
        }
    }

    /// Extract the spill path named by a disk-mode preview footer
    /// (`… full output at <path>]`), if the preview has one.
    fn spill_path_in(preview: &str) -> Option<PathBuf> {
        const MARKER: &str = "full output at ";
        let after_marker = &preview[preview.rfind(MARKER)? + MARKER.len()..];
        let end = after_marker.rfind(']')?;
        Some(PathBuf::from(after_marker[..end].trim()))
    }

    /// Best-effort removal of the spill file a preview points at, so tests
    /// that spill through the kernel or the stream collector do not leave
    /// files behind on the host. Failures are deliberately ignored.
    async fn remove_spill_named_in(preview: &str) {
        if let Some(path) = spill_path_in(preview) {
            let _ = tokio::fs::remove_file(path).await;
        }
    }

    // ── Config accessors and small helpers ──

    #[test]
    fn test_none_is_disabled() {
        let unlimited = OutputLimitConfig::none();
        assert!(!unlimited.is_enabled());
        assert_eq!(unlimited.max_bytes(), None);
    }

    #[test]
    fn test_mcp_is_enabled() {
        let mcp = OutputLimitConfig::mcp();
        assert!(mcp.is_enabled());
        assert_eq!(mcp.max_bytes(), Some(8 * 1024));
        assert_eq!(mcp.head_bytes(), 1024);
        assert_eq!(mcp.tail_bytes(), 512);
    }

    #[test]
    fn test_set_limit() {
        let mut config = OutputLimitConfig::none();
        assert!(!config.is_enabled());

        // Turning the limit on...
        config.set_limit(Some(1024));
        assert!(config.is_enabled());
        assert_eq!(config.max_bytes(), Some(1024));

        // ...and back off again.
        config.set_limit(None);
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_set_head_tail() {
        let mut config = OutputLimitConfig::mcp();
        config.set_head_bytes(2048);
        config.set_tail_bytes(1024);
        assert_eq!(config.head_bytes(), 2048);
        assert_eq!(config.tail_bytes(), 1024);
    }

    #[test]
    fn test_parse_size() {
        assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
        assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
        assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
        assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
        assert_eq!(parse_size("65536").unwrap(), 65536);
        assert!(parse_size("").is_err());
        assert!(parse_size("abc").is_err());
    }

    #[test]
    fn test_truncate_to_char_boundary() {
        assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
        assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
        // Multi-byte: "日" is 3 bytes, so a cut inside a character rounds down.
        assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
        assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
        assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
    }

    #[test]
    fn test_tail_from_str() {
        assert_eq!(tail_from_str("hello", 10), "hello");
        assert_eq!(tail_from_str("hello", 3), "llo");
        // Multi-byte
        assert_eq!(tail_from_str("日本語", 3), "語");
        assert_eq!(tail_from_str("日本語", 6), "本語");
    }

    #[test]
    fn test_generate_spill_filename() {
        let name = generate_spill_filename();
        assert!(name.starts_with("spill-"));
        assert!(name.ends_with(".txt"));
        // The monotonic counter must keep back-to-back names distinct.
        assert_ne!(name, generate_spill_filename());
    }

    // ── spill_if_needed (post-hoc check on a finished result) ──

    #[tokio::test]
    async fn test_spill_if_needed_under_limit() {
        let config = OutputLimitConfig::mcp();
        let mut result = ExecResult::success("short output");
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert_eq!(&*result.text_out(), "short output");
        assert!(!result.did_spill);
    }

    #[tokio::test]
    async fn test_spill_if_needed_over_limit() {
        let config = limits(100, 20, 10, SpillMode::Disk);
        let mut result = ExecResult::success("x".repeat(200));
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_some());
        assert!(result.did_spill);
        let spill = spill.unwrap();

        // Capture everything that needs the file, then delete it before
        // asserting so a failed assertion cannot leave it behind.
        let file_existed = spill.path.exists();
        let spill_content = tokio::fs::read_to_string(&spill.path).await;
        let _ = tokio::fs::remove_file(&spill.path).await;

        assert_eq!(spill.total_bytes, 200);
        assert!(file_existed);

        // Verify truncated output
        let preview = result.text_out();
        assert!(preview.contains("..."));
        assert!(preview.contains("[output truncated: 200 bytes total"));
        assert!(preview.contains(&*spill.path.to_string_lossy()));

        // Verify head (first 20 bytes)
        assert!(preview.starts_with(&"x".repeat(20)));

        // Verify spill file has full content
        assert_eq!(spill_content.unwrap().len(), 200);
    }

    #[tokio::test]
    async fn test_spill_if_needed_disabled() {
        let config = OutputLimitConfig::none();
        let big_output = "x".repeat(200);
        let mut result = ExecResult::success(big_output.clone());
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert_eq!(&*result.text_out(), big_output);
        assert!(!result.did_spill);
    }

    #[test]
    fn test_build_truncated_output() {
        let config = limits(100, 5, 3, SpillMode::Disk);
        let spill_path = PathBuf::from("/tmp/test-spill.txt");
        let preview = build_truncated_output("abcdefghijklmnop", &config, &spill_path, 16);
        assert!(preview.starts_with("abcde"));
        assert!(preview.contains("..."));
        assert!(preview.contains("nop"));
        assert!(preview.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
    }

    // ── End-to-end through the kernel ──

    #[tokio::test]
    async fn test_kernel_mcp_truncates_large_output() {
        use crate::kernel::{Kernel, KernelConfig};

        // MCP config has 8K limit by default — use a smaller limit for testing
        let config = KernelConfig::mcp().with_output_limit(limits(200, 50, 30, SpillMode::Disk));
        let kernel = Kernel::new(config).expect("kernel creation");

        // seq 1 10000 produces lots of output
        let result = kernel.execute("seq 1 10000").await.expect("execute");
        let preview = result.text_out().to_string();
        remove_spill_named_in(&preview).await;

        assert!(preview.contains("[output truncated:"));
        assert!(preview.contains("full output at"));
        // Head should contain the first numbers
        assert!(preview.starts_with("1\n"));
    }

    #[tokio::test]
    async fn test_spill_exits_3() {
        use crate::kernel::{Kernel, KernelConfig};

        let config = KernelConfig::mcp().with_output_limit(limits(100, 30, 20, SpillMode::Disk));
        let kernel = Kernel::new(config).expect("kernel creation");

        let big = "x".repeat(200);
        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
        let preview = result.text_out().to_string();
        remove_spill_named_in(&preview).await;

        assert_eq!(result.code, 3, "spill should always exit 3");
        assert_eq!(result.original_code, Some(0), "original command exit code preserved");
        assert!(preview.contains("[output truncated:"));
    }

    #[tokio::test]
    async fn test_kernel_repl_no_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        // REPL has no limit
        let kernel = Kernel::new(KernelConfig::repl()).expect("kernel creation");

        let result = kernel.execute("seq 1 100").await.expect("execute");
        let out = result.text_out();
        assert!(!out.contains("[output truncated:"));
        assert!(out.contains("100"));
    }

    #[tokio::test]
    async fn test_kernel_builtin_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        // Builtins go through post-hoc spill check
        let config = KernelConfig::mcp().with_output_limit(limits(100, 30, 20, SpillMode::Disk));
        let kernel = Kernel::new(config).expect("kernel creation");

        // echo with a large string
        let big = "x".repeat(200);
        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
        let preview = result.text_out().to_string();
        remove_spill_named_in(&preview).await;

        assert!(preview.contains("[output truncated:"));
    }

    // ── OutputData estimation and streaming tests ──

    #[test]
    fn test_estimated_byte_size_text() {
        use crate::interpreter::OutputData;

        assert_eq!(OutputData::text("hello world").estimated_byte_size(), 11);
    }

    #[test]
    fn test_estimated_byte_size_table() {
        use crate::interpreter::{OutputData, OutputNode};

        let headers = vec!["NAME".into(), "SIZE".into()];
        let rows = vec![
            OutputNode::new("foo").with_cells(vec!["123".into()]),
            OutputNode::new("bar").with_cells(vec!["456".into()]),
        ];
        let table = OutputData::table(headers, rows);
        // "foo\t123\nbar\t456" = 3+1+3 + 1 + 3+1+3 = 15
        assert_eq!(table.estimated_byte_size(), 15);
    }

    #[test]
    fn test_estimated_byte_size_tree() {
        use crate::interpreter::{OutputData, OutputNode};

        let src_dir = OutputNode::new("src").with_children(vec![
            OutputNode::new("main.rs"),
            OutputNode::new("lib.rs"),
        ]);
        let tree = OutputData::nodes(vec![src_dir]);
        // "src/{main.rs,lib.rs}" = 3 + 2 + 7 + 1 + 6 + 1 = 20
        assert_eq!(tree.estimated_byte_size(), 20);
    }

    #[test]
    fn test_write_canonical_matches_to_canonical_string() {
        use crate::interpreter::{OutputData, OutputNode};

        // One sample per output shape: plain text, flat nodes, table, tree.
        let cases: Vec<OutputData> = vec![
            OutputData::text("hello world"),
            OutputData::nodes(vec![
                OutputNode::new("file1"),
                OutputNode::new("file2"),
            ]),
            OutputData::table(
                vec!["NAME".into(), "SIZE".into()],
                vec![
                    OutputNode::new("foo").with_cells(vec!["123".into()]),
                    OutputNode::new("bar").with_cells(vec!["456".into()]),
                ],
            ),
            OutputData::nodes(vec![
                OutputNode::new("src").with_children(vec![
                    OutputNode::new("main.rs"),
                    OutputNode::new("lib.rs"),
                ]),
            ]),
        ];

        for data in cases {
            let expected = data.to_canonical_string();
            let mut sink = Vec::new();
            let written = data.write_canonical(&mut sink, None).unwrap();
            let got = String::from_utf8(sink).unwrap();
            assert_eq!(got, expected, "write_canonical mismatch for {:?}", data);
            assert_eq!(written, expected.len(), "byte count mismatch");
        }
    }

    #[test]
    fn test_write_canonical_budget_stops_early() {
        use crate::interpreter::{OutputData, OutputNode};

        let data = OutputData::nodes(
            (0..1000).map(|i| OutputNode::new(format!("file_{:04}", i))).collect()
        );
        let mut sink = Vec::new();
        let written = data.write_canonical(&mut sink, Some(100)).unwrap();
        // Should have stopped shortly after 100 bytes
        assert!(written > 100, "should exceed budget slightly");
        assert!(written < 500, "should stop soon after budget: got {}", written);
    }

    #[tokio::test]
    async fn test_spill_if_needed_large_output_data_no_oom() {
        use crate::interpreter::{OutputData, OutputNode};

        let config = limits(1024, 100, 50, SpillMode::Disk);

        // 100K nodes — large enough to detect OOM if materialized carelessly,
        // but small enough to not slow down the test
        let nodes: Vec<OutputNode> = (0..100_000)
            .map(|i| OutputNode::new(format!("node_{:06}", i)))
            .collect();
        let mut result = ExecResult::with_output(OutputData::nodes(nodes));

        let spill = spill_if_needed(&mut result, &config).await;

        // Clean up first so a failed assertion below cannot leak the file.
        let spilled = spill.is_some();
        if let Some(s) = spill {
            let _ = tokio::fs::remove_file(&s.path).await;
        }

        assert!(spilled, "should have spilled");
        assert!(result.did_spill);
        assert!(result.text_out().contains("[output truncated:"));
    }

    // ── Streaming collector tests (using tokio::io::duplex) ──
    // These exercise `collect_stdout_with_spill`, the child-stdout path, so
    // they live on the `subprocess` axis.

    #[cfg(feature = "subprocess")]
    #[tokio::test]
    async fn test_collect_small_output_no_spill() {
        use tokio::io::AsyncWriteExt;

        let (mut writer, mut reader) = tokio::io::duplex(1024);
        let config = limits(1024, 100, 50, SpillMode::Disk);

        // Write small data and close
        writer.write_all(b"hello world").await.unwrap();
        drop(writer); // EOF

        let (bytes, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
        let text = String::from_utf8(bytes).expect("test output is valid UTF-8");
        assert_eq!(text, "hello world");
        assert!(!did_spill);
    }

    #[cfg(feature = "subprocess")]
    #[tokio::test]
    async fn test_collect_large_output_spills() {
        use tokio::io::AsyncWriteExt;

        let (mut writer, mut reader) = tokio::io::duplex(64 * 1024);
        let config = limits(100, 20, 10, SpillMode::Disk);

        // Write data exceeding limit and close
        let data = "x".repeat(500);
        writer.write_all(data.as_bytes()).await.unwrap();
        drop(writer); // EOF

        let (bytes, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
        let text = String::from_utf8(bytes).expect("test output is valid UTF-8");
        remove_spill_named_in(&text).await;

        assert!(did_spill, "should have spilled");
        assert!(text.contains("[output truncated:"));
        assert!(text.contains("full output at"));
    }

    #[cfg(feature = "subprocess")]
    #[tokio::test]
    async fn test_collect_exact_boundary_no_spill() {
        use tokio::io::AsyncWriteExt;

        let (mut writer, mut reader) = tokio::io::duplex(1024);
        let config = limits(100, 20, 10, SpillMode::Disk);

        // Write exactly max_bytes
        let data = "x".repeat(100);
        writer.write_all(data.as_bytes()).await.unwrap();
        drop(writer); // EOF

        let (bytes, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
        let text = String::from_utf8(bytes).expect("test output is valid UTF-8");
        // Exactly at limit — should not spill (<=)
        assert!(!did_spill, "exact boundary should not spill");
        assert_eq!(text.len(), 100);
    }

    #[cfg(feature = "subprocess")]
    #[tokio::test]
    async fn test_collect_broken_pipe() {
        use tokio::io::AsyncWriteExt;

        let (mut writer, mut reader) = tokio::io::duplex(1024);
        let config = limits(1024, 100, 50, SpillMode::Disk);

        // Write some data, then drop writer mid-stream
        writer.write_all(b"partial data").await.unwrap();
        drop(writer); // Simulate broken pipe

        let (bytes, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
        let text = String::from_utf8(bytes).expect("test output is valid UTF-8");
        assert_eq!(text, "partial data");
        assert!(!did_spill);
    }

    // ── In-memory spill mode (SpillMode::Memory) ──

    #[test]
    fn test_in_memory_builder_and_default() {
        assert_eq!(OutputLimitConfig::mcp().spill_mode(), SpillMode::Disk);
        assert_eq!(OutputLimitConfig::mcp().in_memory().spill_mode(), SpillMode::Memory);

        let mut config = OutputLimitConfig::none();
        config.set_spill_mode(SpillMode::Memory);
        assert_eq!(config.spill_mode(), SpillMode::Memory);
    }

    #[tokio::test]
    async fn test_memory_mode_truncates_string_without_disk() {
        let config = limits(100, 20, 10, SpillMode::Memory);
        let mut result = ExecResult::success("x".repeat(200));
        let spill = spill_if_needed(&mut result, &config).await;

        // No SpillResult (no file written) but did_spill flags the truncation.
        assert!(spill.is_none(), "memory mode must not write a spill file");
        assert!(result.did_spill, "memory truncation must set did_spill for the exit-3 remap");

        let out = result.text_out();
        assert!(out.contains("truncated in memory"), "got: {}", out);
        assert!(out.contains("200 bytes total"), "got: {}", out);
        assert!(!out.contains("full output at"), "memory mode must not point at a file: {}", out);
        assert!(out.starts_with(&"x".repeat(20)), "head preserved");
    }

    #[tokio::test]
    async fn test_memory_mode_under_limit_untouched() {
        let config = limits(100, 20, 10, SpillMode::Memory);
        let mut result = ExecResult::success("short");
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert!(!result.did_spill);
        assert_eq!(&*result.text_out(), "short");
    }

    #[tokio::test]
    async fn test_memory_mode_large_output_data_bounded() {
        use crate::interpreter::{OutputData, OutputNode};

        let config = limits(1024, 100, 50, SpillMode::Memory);

        // 100K nodes — would be a huge String if fully materialized.
        let nodes: Vec<OutputNode> = (0..100_000)
            .map(|i| OutputNode::new(format!("node_{:06}", i)))
            .collect();
        let mut result = ExecResult::with_output(OutputData::nodes(nodes));

        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none(), "memory mode writes no file");
        assert!(result.did_spill);
        let out = result.text_out();
        assert!(out.contains("truncated in memory"), "got: {}", out);
        assert!(out.starts_with("node_000000"), "head rendered: {}", out);
        // Head-only path for oversized structured data: no tail section echoed.
        assert!(out.contains("head only"), "got: {}", out);
    }

    #[tokio::test]
    async fn test_kernel_memory_mode_exits_3_preserves_original() {
        use crate::kernel::{Kernel, KernelConfig};

        let config = KernelConfig::mcp().with_output_limit(limits(100, 30, 20, SpillMode::Memory));
        let kernel = Kernel::new(config).expect("kernel creation");

        let big = "x".repeat(200);
        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
        assert_eq!(result.code, 3, "memory truncation still signals via exit 3");
        assert_eq!(result.original_code, Some(0), "original exit code preserved");
        assert!(result.text_out().contains("truncated in memory"));
        assert!(!result.text_out().contains("full output at"));
    }

    #[tokio::test]
    async fn test_nolocal_kernel_forces_memory_spill() {
        use crate::kernel::{Kernel, KernelConfig, VfsMountMode};

        // NoLocal mount + an explicit Disk spill mode: the kernel must override
        // to Memory so nothing is written to a host spill file, even though
        // `localfs` is compiled in.
        let config = KernelConfig::mcp()
            .with_vfs_mode(VfsMountMode::NoLocal)
            .with_output_limit(limits(100, 30, 20, SpillMode::Disk));
        let kernel = Kernel::new(config).expect("kernel creation");

        let big = "x".repeat(200);
        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
        assert_eq!(result.code, 3, "still signals truncation via exit 3");
        assert!(result.text_out().contains("truncated in memory"), "got: {}", result.text_out());
        assert!(
            !result.text_out().contains("full output at"),
            "NoLocal kernel must not write a host spill file: {}",
            result.text_out()
        );
    }

    #[cfg(feature = "subprocess")]
    #[tokio::test]
    async fn test_collect_memory_mode_drains_without_disk() {
        use tokio::io::AsyncWriteExt;

        let (mut writer, mut reader) = tokio::io::duplex(64 * 1024);
        let config = limits(100, 20, 10, SpillMode::Memory);

        // head 'a's, filler 'b's, tail 'c's so we can check head+tail survive.
        let data = format!("{}{}{}", "a".repeat(20), "b".repeat(500), "c".repeat(10));
        writer.write_all(data.as_bytes()).await.unwrap();
        drop(writer);

        let (bytes, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
        let text = String::from_utf8(bytes).expect("test output is valid UTF-8");
        assert!(did_spill, "drain flags truncation for the exit-3 remap");
        assert!(text.contains("truncated in memory"), "got: {}", text);
        assert!(!text.contains("full output at"), "no disk file in memory mode");
        assert!(text.starts_with(&"a".repeat(20)), "head preserved: {}", text);
        assert!(text.contains(&"c".repeat(10)), "tail preserved: {}", text);
        assert!(text.contains("530 bytes total"), "honest total: {}", text);
    }

    #[cfg(feature = "subprocess")]
    #[test]
    fn test_extend_ring_keeps_last_cap_bytes() {
        let mut ring = std::collections::VecDeque::new();
        super::extend_ring(&mut ring, b"abcdef", 3);
        assert_eq!(ring.iter().copied().collect::<Vec<u8>>(), b"def");
        // Subsequent pushes keep evicting from the front.
        super::extend_ring(&mut ring, b"gh", 3);
        assert_eq!(ring.iter().copied().collect::<Vec<u8>>(), b"fgh");
        // cap 0 retains nothing.
        let mut empty = std::collections::VecDeque::new();
        super::extend_ring(&mut empty, b"xyz", 0);
        assert!(empty.is_empty());
    }
}