1use std::path::PathBuf;
21
22use crate::interpreter::ExecResult;
23#[cfg(feature = "localfs")]
24use crate::paths;
25
/// Output cap, in bytes, used by [`OutputLimitConfig::mcp`] (8 KiB).
const DEFAULT_MCP_LIMIT: usize = 8 << 10;

/// Default number of leading bytes kept when output is truncated (1 KiB).
const DEFAULT_HEAD_BYTES: usize = 1 << 10;

/// Default number of trailing bytes kept when output is truncated (512 bytes).
const DEFAULT_TAIL_BYTES: usize = 1 << 9;
34
/// Selects what happens to output that exceeds the configured limit.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SpillMode {
    /// Write the full output to a spill file (the default mode).
    #[default]
    Disk,
    /// Truncate in memory only; no spill file is written.
    Memory,
}
57
58#[derive(Debug, Clone)]
63pub struct OutputLimitConfig {
64 max_bytes: Option<usize>,
65 head_bytes: usize,
66 tail_bytes: usize,
67 spill_mode: SpillMode,
68}
69
70impl OutputLimitConfig {
71 pub fn none() -> Self {
73 Self {
74 max_bytes: None,
75 head_bytes: DEFAULT_HEAD_BYTES,
76 tail_bytes: DEFAULT_TAIL_BYTES,
77 spill_mode: SpillMode::Disk,
78 }
79 }
80
81 pub fn default_limit() -> usize {
83 DEFAULT_MCP_LIMIT
84 }
85
86 pub fn mcp() -> Self {
88 Self {
89 max_bytes: Some(DEFAULT_MCP_LIMIT),
90 head_bytes: DEFAULT_HEAD_BYTES,
91 tail_bytes: DEFAULT_TAIL_BYTES,
92 spill_mode: SpillMode::Disk,
93 }
94 }
95
96 pub fn in_memory(mut self) -> Self {
104 self.spill_mode = SpillMode::Memory;
105 self
106 }
107
108 pub fn is_enabled(&self) -> bool {
110 self.max_bytes.is_some()
111 }
112
113 pub fn spill_mode(&self) -> SpillMode {
115 self.spill_mode
116 }
117
118 pub fn set_spill_mode(&mut self, mode: SpillMode) {
120 self.spill_mode = mode;
121 }
122
123 pub fn max_bytes(&self) -> Option<usize> {
125 self.max_bytes
126 }
127
128 pub fn head_bytes(&self) -> usize {
130 self.head_bytes
131 }
132
133 pub fn tail_bytes(&self) -> usize {
135 self.tail_bytes
136 }
137
138 pub fn set_limit(&mut self, max: Option<usize>) {
140 self.max_bytes = max;
141 }
142
143 pub fn set_head_bytes(&mut self, bytes: usize) {
145 self.head_bytes = bytes;
146 }
147
148 pub fn set_tail_bytes(&mut self, bytes: usize) {
150 self.tail_bytes = bytes;
151 }
152}
153
/// Describes a spill file that was written for oversized output.
///
/// Derives `Debug` and `Clone` so callers can log it, use it in assertion
/// messages, and keep a copy alongside the result.
#[derive(Debug, Clone)]
pub struct SpillResult {
    /// Location of the spill file.
    pub path: PathBuf,
    /// Number of bytes written to the spill file.
    pub total_bytes: usize,
}
159
160pub async fn spill_if_needed(
172 result: &mut ExecResult,
173 config: &OutputLimitConfig,
174) -> Option<SpillResult> {
175 let max = config.max_bytes?;
176
177 if let Some(total) = result.out_bytes().map(|b| b.len()) {
181 if total <= max {
182 return None;
183 }
184 #[cfg(feature = "localfs")]
185 if config.spill_mode == SpillMode::Disk {
186 let bytes = result.out_bytes().unwrap_or_default().to_vec();
187 return match write_spill_file(&bytes).await {
188 Ok((path, written)) => {
189 result.set_out(format!(
190 "[binary output: {total} bytes spilled to {} — read it with `cat {}`]",
191 path.display(),
192 path.display()
193 ));
194 result.did_spill = true;
195 Some(SpillResult { path, total_bytes: written })
196 }
197 Err(e) => {
198 tracing::error!("binary output spill failed: {}", e);
199 *result = ExecResult::failure(
200 1,
201 format!(
202 "binary output exceeded {max} byte limit ({total} bytes) and spill \
203 to disk failed: {e}"
204 ),
205 );
206 None
207 }
208 };
209 }
210 let bytes = result.out_bytes().unwrap_or_default().to_vec();
213 let head_n = config.head_bytes.min(bytes.len());
214 let tail_n = config.tail_bytes.min(bytes.len().saturating_sub(head_n));
215 let mut truncated = bytes[..head_n].to_vec();
216 truncated.extend_from_slice(&bytes[bytes.len() - tail_n..]);
217 result.set_out_bytes(truncated);
218 result.did_spill = true;
219 return None;
220 }
221
222 #[cfg(feature = "localfs")]
225 if config.spill_mode == SpillMode::Disk {
226 if !result.text_out().is_empty() && !result.has_output() {
228 let total = result.text_out().len();
229 if total <= max {
230 return None;
231 }
232 return spill_string(result, config, max).await;
233 }
234
235 if let Some(output) = result.output() {
237 let estimate = output.estimated_byte_size();
238 if estimate <= max {
239 result.materialize();
241 if result.text_out().len() <= max {
243 return None;
244 }
245 return spill_string(result, config, max).await;
246 }
247
248 return spill_output_data(result, config, max).await;
250 }
251
252 return None;
253 }
254
255 truncate_in_memory(result, config, max)
257}
258
259fn truncate_in_memory(
272 result: &mut ExecResult,
273 config: &OutputLimitConfig,
274 max: usize,
275) -> Option<SpillResult> {
276 if let Some(output) = result.output() {
280 let estimate = output.estimated_byte_size();
281 if estimate > max {
282 let mut buf = Vec::with_capacity(config.head_bytes + 64);
284 let _ = output.write_canonical(&mut buf, Some(config.head_bytes));
286 let s = String::from_utf8_lossy(&buf);
287 let head = truncate_to_char_boundary(&s, config.head_bytes);
288 let truncated = format!(
289 "{}\n...\n[output truncated in memory: ~{} bytes (exceeds {} byte limit) — head only, no spill file]",
290 head, estimate, max
291 );
292 result.set_out(truncated);
293 result.did_spill = true;
294 return None;
295 }
296 result.materialize();
298 }
299
300 let total = result.text_out().len();
301 if total <= max {
302 return None;
303 }
304
305 let text = result.text_out().into_owned();
308 let head = truncate_to_char_boundary(&text, config.head_bytes);
309 let tail = tail_from_str(&text, config.tail_bytes);
310 let truncated = format!(
311 "{}\n...\n{}\n[output truncated in memory: {} bytes total — no spill file]",
312 head, tail, total
313 );
314 result.set_out(truncated);
315 result.did_spill = true;
316 None
317}
318
/// Writes the text output of `result` to a spill file and replaces it with a
/// head/tail summary that names the file.
///
/// On a write failure `result` is replaced by `ExecResult::failure(1, …)` and
/// `None` is returned.
#[cfg(feature = "localfs")]
async fn spill_string(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
    max: usize,
) -> Option<SpillResult> {
    let total = result.text_out().len();
    let write_outcome = write_spill_file(result.text_out().as_bytes()).await;

    let (path, total_bytes) = match write_outcome {
        Ok(written) => written,
        Err(e) => {
            tracing::error!("output spill failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
                max, total, e
            ));
            return None;
        }
    };

    let summary = build_truncated_output(&result.text_out(), config, &path, total);
    result.set_out(summary);
    result.did_spill = true;
    Some(SpillResult { path, total_bytes })
}
347
/// Writes structured output to a spill file via `write_canonical`, then
/// replaces the output of `result` with a head/tail summary read back from
/// that file.
///
/// Returns `None` when `result` has no structured output. If creating the
/// directory or file, writing, or flushing fails, `result` is replaced by
/// `ExecResult::failure(1, …)` and `None` is returned.
#[cfg(feature = "localfs")]
async fn spill_output_data(
    result: &mut ExecResult,
    config: &OutputLimitConfig,
    max: usize,
) -> Option<SpillResult> {
    let output = result.output()?;

    let dir = paths::spill_dir();
    if let Err(e) = tokio::fs::create_dir_all(&dir).await {
        tracing::error!("output spill dir creation failed: {}", e);
        *result = ExecResult::failure(1, format!(
            "output exceeded {} byte limit and spill dir creation failed: {}", max, e
        ));
        return None;
    }

    let filename = generate_spill_filename();
    let path = dir.join(&filename);

    // NOTE(review): this is blocking std::fs I/O inside an async fn, because
    // `write_canonical` is handed a synchronous writer here.
    let file = match std::fs::File::create(&path) {
        Ok(file) => file,
        Err(e) => {
            tracing::error!("output spill file creation failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit and spill to disk failed: {}", max, e
            ));
            return None;
        }
    };

    // Buffer the writes so `write_canonical` does not hit the file directly
    // on every call.
    let mut writer = std::io::BufWriter::new(file);
    let total = match output.write_canonical(&mut writer, None) {
        Ok(n) => n,
        Err(e) => {
            tracing::error!("output spill write failed: {}", e);
            *result = ExecResult::failure(1, format!(
                "output exceeded {} byte limit and spill to disk failed: {}", max, e
            ));
            return None;
        }
    };
    // Flush explicitly: dropping a BufWriter swallows flush errors.
    if let Err(e) = std::io::Write::flush(&mut writer) {
        tracing::error!("output spill write failed: {}", e);
        *result = ExecResult::failure(1, format!(
            "output exceeded {} byte limit and spill to disk failed: {}", max, e
        ));
        return None;
    }
    // Close the file before reading the head and tail back.
    drop(writer);

    let head = read_head_from_file(&path, config.head_bytes).await.unwrap_or_default();
    let tail = read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default();
    let path_str = path.to_string_lossy();

    result.set_out(format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head, tail, total, path_str
    ));
    result.did_spill = true;

    Some(SpillResult {
        path,
        total_bytes: total,
    })
}
408
/// Collects a child's stdout and stderr concurrently, applying the output
/// limit to stdout.
///
/// Stderr is drained on a spawned task while stdout is read here.
///
/// # Returns
///
/// `(stdout_bytes, stderr_text, did_spill)`. `stderr_text` is empty if the
/// stderr task failed to join.
#[cfg(feature = "subprocess")]
pub async fn spill_aware_collect(
    mut stdout: tokio::process::ChildStdout,
    mut stderr_reader: tokio::process::ChildStderr,
    stderr_stream: Option<crate::scheduler::StderrStream>,
    config: &OutputLimitConfig,
) -> (Vec<u8>, String, bool) {
    let stderr_task = tokio::spawn(async move {
        collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
    });

    // No configured limit means "never overflow".
    let limit = config.max_bytes.unwrap_or(usize::MAX);
    let (stdout_bytes, did_spill) = collect_stdout_with_spill(&mut stdout, limit, config).await;

    let stderr_text = match stderr_task.await {
        Ok(text) => text,
        Err(_) => String::new(),
    };
    (stdout_bytes, stderr_text, did_spill)
}
442
/// Drains `reader` until EOF or the first read error.
///
/// When `stream` is given, each chunk is forwarded to it and the returned
/// string is empty. Otherwise the bytes are accumulated and returned lossily
/// decoded as UTF-8.
#[cfg(feature = "subprocess")]
async fn collect_stderr(
    reader: &mut tokio::process::ChildStderr,
    stream: Option<&crate::scheduler::StderrStream>,
) -> String {
    use tokio::io::AsyncReadExt;

    let mut captured = Vec::new();
    let mut scratch = [0u8; 8192];

    // A read error ends collection exactly like EOF does.
    while let Ok(n) = reader.read(&mut scratch).await {
        if n == 0 {
            break;
        }
        match stream {
            Some(sink) => {
                sink.write(&scratch[..n]);
            }
            None => {
                captured.extend_from_slice(&scratch[..n]);
            }
        }
    }

    match stream {
        Some(_) => String::new(),
        None => String::from_utf8_lossy(&captured).into_owned(),
    }
}
472
/// Reads `stdout` to the end, handing over to `handle_overflow` as soon as
/// more than `max_bytes` bytes have been buffered.
///
/// The first phase races each read against a one-second timer. If the timer
/// fires before an overflow, reading simply continues without it. EOF or a
/// read error ends collection and returns what was buffered.
///
/// # Returns
///
/// `(bytes, did_spill)`: the raw buffered output with `false`, or the bytes of
/// the message from `handle_overflow` together with its flag.
#[cfg(feature = "subprocess")]
async fn collect_stdout_with_spill<R: tokio::io::AsyncRead + Unpin>(
    stdout: &mut R,
    max_bytes: usize,
    config: &OutputLimitConfig,
) -> (Vec<u8>, bool) {
    use tokio::io::AsyncReadExt;
    use tokio::time::{sleep, Duration};

    let mut collected: Vec<u8> = Vec::new();
    let mut scratch = [0u8; 8192];
    let timer = sleep(Duration::from_secs(1));
    tokio::pin!(timer);

    // Phase 1: read until overflow, end of stream, or the timer firing.
    loop {
        tokio::select! {
            biased;
            read = stdout.read(&mut scratch) => {
                let n = match read {
                    Ok(n) if n > 0 => n,
                    // EOF or read error: return whatever was buffered.
                    _ => return (collected, false),
                };
                collected.extend_from_slice(&scratch[..n]);
                if collected.len() > max_bytes {
                    break;
                }
            }
            () = &mut timer => {
                break;
            }
        }
    }

    // Phase 2: the timer fired before any overflow, so keep reading without it.
    if collected.len() <= max_bytes {
        loop {
            match stdout.read(&mut scratch).await {
                Ok(n) if n > 0 => {
                    collected.extend_from_slice(&scratch[..n]);
                    if collected.len() > max_bytes {
                        break;
                    }
                }
                _ => return (collected, false),
            }
        }
    }

    // Reaching this point means the buffer grew past the limit.
    let (msg, spilled) = handle_overflow(&collected, stdout, config, max_bytes).await;
    (msg.into_bytes(), spilled)
}
549
/// Deals with stdout that has already exceeded the limit.
///
/// In memory mode the rest of the stream is drained and summarised in memory.
/// In disk mode it is streamed to a spill file.
///
/// # Returns
///
/// `(message, did_spill)`. When the disk spill fails, the message is an
/// `ERROR: …` line and the flag is `false`.
#[cfg(feature = "subprocess")]
async fn handle_overflow<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
    max_bytes: usize,
) -> (String, bool) {
    match config.spill_mode {
        SpillMode::Memory => (drain_in_memory(buffer, stdout, config).await, true),
        SpillMode::Disk => match stream_to_spill(buffer, stdout, config).await {
            Ok(summary) => (summary, true),
            Err(e) => {
                tracing::error!("streaming spill failed: {}", e);
                let message = format!(
                    "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
                    max_bytes,
                    buffer.len(),
                    e
                );
                (message, false)
            }
        },
    }
}
584
/// Consumes the rest of `stdout`, keeping only a head (taken from `buffer`)
/// and a rolling tail of at most `config.tail_bytes` bytes.
///
/// Returns the head, the tail and a notice with the total byte count and the
/// number of bytes discarded. A read error ends draining like EOF.
#[cfg(feature = "subprocess")]
async fn drain_in_memory<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
) -> String {
    use tokio::io::AsyncReadExt;

    let head =
        truncate_to_char_boundary(&String::from_utf8_lossy(buffer), config.head_bytes).to_string();

    // Rolling window over the last `tail_cap` bytes seen so far.
    let tail_cap = config.tail_bytes;
    let mut ring = std::collections::VecDeque::<u8>::with_capacity(tail_cap + 1);
    extend_ring(&mut ring, buffer, tail_cap);
    let mut total = buffer.len();

    let mut scratch = [0u8; 8192];
    while let Ok(n) = stdout.read(&mut scratch).await {
        if n == 0 {
            break;
        }
        total += n;
        extend_ring(&mut ring, &scratch[..n], tail_cap);
    }

    let tail_bytes = Vec::from(ring);
    let tail_str = String::from_utf8_lossy(&tail_bytes);
    let dropped = total.saturating_sub(head.len() + tail_bytes.len());
    format!(
        "{}\n...\n{}\n[output truncated in memory: {} bytes total, {} discarded — no spill file]",
        head, tail_str, total, dropped
    )
}
629
630#[cfg(feature = "subprocess")]
/// Appends `bytes` to `ring`, evicting from the front so the ring holds at
/// most `cap` bytes. A `cap` of zero leaves the ring untouched.
fn extend_ring(ring: &mut std::collections::VecDeque<u8>, bytes: &[u8], cap: usize) {
    if cap == 0 {
        return;
    }
    // Only the last `cap` incoming bytes can survive, so skip the rest.
    let skipped = bytes.len().saturating_sub(cap);
    for &byte in bytes.iter().skip(skipped) {
        if ring.len() == cap {
            ring.pop_front();
        }
        ring.push_back(byte);
    }
}
645
/// Streams the already-buffered bytes plus the rest of `stdout` into a new
/// spill file and returns a head/tail summary that names the file.
///
/// A read error on `stdout` ends streaming like EOF. Errors from creating the
/// directory or file, writing, or flushing are returned.
#[cfg(feature = "subprocess")]
async fn stream_to_spill<R: tokio::io::AsyncRead + Unpin>(
    buffer: &[u8],
    stdout: &mut R,
    config: &OutputLimitConfig,
) -> Result<String, std::io::Error> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let dir = paths::spill_dir();
    tokio::fs::create_dir_all(&dir).await?;

    let path = dir.join(generate_spill_filename());
    let mut spill = tokio::fs::File::create(&path).await?;

    spill.write_all(buffer).await?;
    let mut total = buffer.len();

    let mut scratch = [0u8; 8192];
    while let Ok(n) = stdout.read(&mut scratch).await {
        if n == 0 {
            break;
        }
        spill.write_all(&scratch[..n]).await?;
        total += n;
    }
    spill.flush().await?;

    // The head always comes from the bytes buffered before the spill began.
    let buffered = String::from_utf8_lossy(buffer);
    let head = truncate_to_char_boundary(&buffered, config.head_bytes);

    // If more was streamed, the real tail is only in the file.
    let tail: String = if total > buffer.len() {
        read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
    } else {
        tail_from_str(&buffered, config.tail_bytes).to_string()
    };

    Ok(format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head,
        tail,
        total,
        path.to_string_lossy()
    ))
}
701
/// Writes `data` to a freshly named file in the spill directory, creating the
/// directory if needed.
///
/// Returns the file path and the number of bytes written (`data.len()`).
#[cfg(feature = "localfs")]
async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
    let spill_dir = paths::spill_dir();
    tokio::fs::create_dir_all(&spill_dir).await?;

    let target = spill_dir.join(generate_spill_filename());
    let byte_count = data.len();
    tokio::fs::write(&target, data).await?;
    Ok((target, byte_count))
}
713
/// Builds the replacement text for spilled output: head, `...`, tail, and a
/// notice with the total size and spill file path.
///
/// The tail is taken from the text *after* the head, so the two never overlap
/// (and never duplicate text) even when `head_bytes + tail_bytes` is larger
/// than `full`.
#[cfg(feature = "localfs")]
fn build_truncated_output(
    full: &str,
    config: &OutputLimitConfig,
    spill_path: &std::path::Path,
    total_bytes: usize,
) -> String {
    let head = truncate_to_char_boundary(full, config.head_bytes);
    // `head` ends on a char boundary, so slicing at `head.len()` is valid.
    let tail = tail_from_str(&full[head.len()..], config.tail_bytes);
    let path_str = spill_path.to_string_lossy();
    format!(
        "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
        head, tail, total_bytes, path_str
    )
}
730
/// Returns the longest prefix of `s` that is at most `max_bytes` long and ends
/// on a UTF-8 character boundary.
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Walk back from the byte budget to the nearest boundary; index 0 is
    // always one, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
743
/// Returns the longest suffix of `s` that is at most `max_bytes` long and
/// starts on a UTF-8 character boundary.
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Move forward from the earliest allowed start to the next boundary.
    let earliest = s.len() - max_bytes;
    let start = (earliest..s.len())
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(s.len());
    &s[start..]
}
756
/// Reads up to `max_bytes` bytes from the start of `path` and returns them as
/// lossily decoded text, trimmed back to a char boundary within `max_bytes`.
///
/// Reads through `take(..).read_to_end(..)` rather than a single `read`, which
/// may return fewer bytes than requested. The buffer grows as needed, so a
/// very large `max_bytes` does not force a large up-front allocation.
///
/// # Errors
///
/// Returns the I/O error from opening or reading the file.
#[cfg(feature = "localfs")]
async fn read_head_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
    use tokio::io::AsyncReadExt;

    let file = tokio::fs::File::open(path).await?;
    let limit = u64::try_from(max_bytes).unwrap_or(u64::MAX);

    let mut buf = Vec::new();
    file.take(limit).read_to_end(&mut buf).await?;

    // Lossy decoding can expand a cut-off trailing character into U+FFFD;
    // trimming to the byte budget drops it again.
    let text = String::from_utf8_lossy(&buf);
    Ok(truncate_to_char_boundary(&text, max_bytes).to_string())
}
772
/// Reads up to `max_bytes` bytes from the end of `path` and returns them as
/// lossily decoded text.
///
/// Changes from a single seek-and-`read`:
/// * reads with `take(..).read_to_end(..)`, because one `read` may return
///   fewer bytes than requested;
/// * does the length arithmetic in `u64`, avoiding a truncating cast;
/// * when the tail starts mid-file, skips leading UTF-8 continuation bytes so
///   it does not begin with replacement characters from a split character.
///
/// # Errors
///
/// Returns the I/O error from opening, stat-ing, seeking or reading the file.
#[cfg(feature = "localfs")]
async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let mut file = tokio::fs::File::open(path).await?;
    let len = file.metadata().await?.len();
    let want = u64::try_from(max_bytes).unwrap_or(u64::MAX);

    // Offset 0 means the whole file fits in the tail budget.
    let offset = len.saturating_sub(want);
    if offset > 0 {
        file.seek(std::io::SeekFrom::Start(offset)).await?;
    }

    let mut buf = Vec::new();
    file.take(want).read_to_end(&mut buf).await?;

    // A UTF-8 character has at most three continuation bytes (0b10xxxxxx).
    // Skip any the seek landed on so the tail starts on a character.
    let skip = if offset > 0 {
        buf.iter()
            .take(3)
            .take_while(|&&b| (b & 0xC0) == 0x80)
            .count()
    } else {
        0
    };

    Ok(String::from_utf8_lossy(&buf[skip..]).into_owned())
}
798
799#[cfg(feature = "localfs")]
/// Builds a spill file name of the form
/// `spill-<secs>.<nanos>-<pid>-<seq>.txt`, where `<seq>` is a per-process
/// counter that increases on every call.
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicUsize = AtomicUsize::new(0);

    // A clock set before the epoch falls back to a zero duration.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!(
        "spill-{}.{}-{}-{}.txt",
        since_epoch.as_secs(),
        since_epoch.subsec_nanos(),
        std::process::id(),
        seq
    )
}
813
/// Parses a human-friendly byte size such as `"65536"`, `"64K"`, `"1m"` or
/// `"2G"`.
///
/// Suffixes are case-insensitive and binary (`K` = 1024, `M` = 1024²,
/// `G` = 1024³). Surrounding whitespace, and whitespace between the number
/// and its suffix, is ignored.
///
/// # Errors
///
/// Returns a message when the input is empty, the numeric part is not a valid
/// unsigned integer, or the result does not fit in `usize`.
pub fn parse_size(s: &str) -> Result<usize, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier): (&str, usize) = match s.char_indices().last() {
        Some((idx, 'K' | 'k')) => (&s[..idx], 1024),
        Some((idx, 'M' | 'm')) => (&s[..idx], 1024 * 1024),
        Some((idx, 'G' | 'g')) => (&s[..idx], 1024 * 1024 * 1024),
        _ => (s, 1),
    };

    let num: usize = num_str
        .trim_end()
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    // A plain multiplication would panic in debug builds and wrap in release.
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
837
838#[cfg(all(test, feature = "localfs"))]
839mod tests {
840 use super::*;
841
842 #[test]
843 fn test_none_is_disabled() {
844 let config = OutputLimitConfig::none();
845 assert!(!config.is_enabled());
846 assert_eq!(config.max_bytes(), None);
847 }
848
849 #[test]
850 fn test_mcp_is_enabled() {
851 let config = OutputLimitConfig::mcp();
852 assert!(config.is_enabled());
853 assert_eq!(config.max_bytes(), Some(8 * 1024));
854 assert_eq!(config.head_bytes(), 1024);
855 assert_eq!(config.tail_bytes(), 512);
856 }
857
858 #[test]
859 fn test_set_limit() {
860 let mut config = OutputLimitConfig::none();
861 assert!(!config.is_enabled());
862
863 config.set_limit(Some(1024));
864 assert!(config.is_enabled());
865 assert_eq!(config.max_bytes(), Some(1024));
866
867 config.set_limit(None);
868 assert!(!config.is_enabled());
869 }
870
871 #[test]
872 fn test_set_head_tail() {
873 let mut config = OutputLimitConfig::mcp();
874 config.set_head_bytes(2048);
875 config.set_tail_bytes(1024);
876 assert_eq!(config.head_bytes(), 2048);
877 assert_eq!(config.tail_bytes(), 1024);
878 }
879
880 #[test]
881 fn test_parse_size() {
882 assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
883 assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
884 assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
885 assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
886 assert_eq!(parse_size("65536").unwrap(), 65536);
887 assert!(parse_size("").is_err());
888 assert!(parse_size("abc").is_err());
889 }
890
891 #[test]
892 fn test_truncate_to_char_boundary() {
893 assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
894 assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
895 assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
897 assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
898 assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
899 }
900
901 #[test]
902 fn test_tail_from_str() {
903 assert_eq!(tail_from_str("hello", 10), "hello");
904 assert_eq!(tail_from_str("hello", 3), "llo");
905 assert_eq!(tail_from_str("日本語", 3), "語");
907 assert_eq!(tail_from_str("日本語", 6), "本語");
908 }
909
910 #[test]
911 fn test_generate_spill_filename() {
912 let name = generate_spill_filename();
913 assert!(name.starts_with("spill-"));
914 assert!(name.ends_with(".txt"));
915 }
916
917 #[tokio::test]
918 async fn test_spill_if_needed_under_limit() {
919 let config = OutputLimitConfig::mcp();
920 let mut result = ExecResult::success("short output");
921 let spill = spill_if_needed(&mut result, &config).await;
922 assert!(spill.is_none());
923 assert_eq!(&*result.text_out(), "short output");
924 assert!(!result.did_spill);
925 }
926
927 #[tokio::test]
928 async fn test_spill_if_needed_over_limit() {
929 let config = OutputLimitConfig {
930 max_bytes: Some(100),
931 head_bytes: 20,
932 tail_bytes: 10,
933 spill_mode: SpillMode::Disk,
934 };
935 let big_output = "x".repeat(200);
936 let mut result = ExecResult::success(big_output);
937 let spill = spill_if_needed(&mut result, &config).await;
938 assert!(spill.is_some());
939 assert!(result.did_spill);
940
941 let spill = spill.unwrap();
942 assert_eq!(spill.total_bytes, 200);
943 assert!(spill.path.exists());
944
945 assert!(result.text_out().contains("..."));
947 assert!(result.text_out().contains("[output truncated: 200 bytes total"));
948 assert!(result.text_out().contains(&spill.path.to_string_lossy().to_string()));
949
950 assert!(result.text_out().starts_with(&"x".repeat(20)));
952
953 let spill_content = tokio::fs::read_to_string(&spill.path).await.unwrap();
955 assert_eq!(spill_content.len(), 200);
956
957 let _ = tokio::fs::remove_file(&spill.path).await;
959 }
960
961 #[tokio::test]
962 async fn test_spill_if_needed_disabled() {
963 let config = OutputLimitConfig::none();
964 let big_output = "x".repeat(200);
965 let mut result = ExecResult::success(big_output.clone());
966 let spill = spill_if_needed(&mut result, &config).await;
967 assert!(spill.is_none());
968 assert_eq!(&*result.text_out(), big_output);
969 assert!(!result.did_spill);
970 }
971
972 #[test]
973 fn test_build_truncated_output() {
974 let config = OutputLimitConfig {
975 max_bytes: Some(100),
976 head_bytes: 5,
977 tail_bytes: 3,
978 spill_mode: SpillMode::Disk,
979 };
980 let full = "abcdefghijklmnop";
981 let path = PathBuf::from("/tmp/test-spill.txt");
982 let result = build_truncated_output(full, &config, &path, 16);
983 assert!(result.starts_with("abcde"));
984 assert!(result.contains("..."));
985 assert!(result.contains("nop"));
986 assert!(result.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
987 }
988
989 #[tokio::test]
990 async fn test_kernel_mcp_truncates_large_output() {
991 use crate::kernel::{Kernel, KernelConfig};
992
993 let config = KernelConfig::mcp()
995 .with_output_limit(OutputLimitConfig {
996 max_bytes: Some(200),
997 head_bytes: 50,
998 tail_bytes: 30,
999 spill_mode: SpillMode::Disk,
1000 });
1001 let kernel = Kernel::new(config).expect("kernel creation");
1002
1003 let result = kernel.execute("seq 1 10000").await.expect("execute");
1005 assert!(result.text_out().contains("[output truncated:"));
1006 assert!(result.text_out().contains("full output at"));
1007 assert!(result.text_out().starts_with("1\n"));
1009 }
1010
1011 #[tokio::test]
1012 async fn test_spill_exits_3() {
1013 use crate::kernel::{Kernel, KernelConfig};
1014
1015 let config = KernelConfig::mcp()
1016 .with_output_limit(OutputLimitConfig {
1017 max_bytes: Some(100),
1018 head_bytes: 30,
1019 tail_bytes: 20,
1020 spill_mode: SpillMode::Disk,
1021 });
1022 let kernel = Kernel::new(config).expect("kernel creation");
1023
1024 let big = "x".repeat(200);
1025 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1026 assert_eq!(result.code, 3, "spill should always exit 3");
1027 assert_eq!(result.original_code, Some(0), "original command exit code preserved");
1028 assert!(result.text_out().contains("[output truncated:"));
1029 }
1030
1031 #[tokio::test]
1032 async fn test_kernel_repl_no_truncation() {
1033 use crate::kernel::{Kernel, KernelConfig};
1034
1035 let config = KernelConfig::repl();
1037 let kernel = Kernel::new(config).expect("kernel creation");
1038
1039 let result = kernel.execute("seq 1 100").await.expect("execute");
1040 assert!(!result.text_out().contains("[output truncated:"));
1041 assert!(result.text_out().contains("100"));
1042 }
1043
1044 #[tokio::test]
1045 async fn test_kernel_builtin_truncation() {
1046 use crate::kernel::{Kernel, KernelConfig};
1047
1048 let config = KernelConfig::mcp()
1050 .with_output_limit(OutputLimitConfig {
1051 max_bytes: Some(100),
1052 head_bytes: 30,
1053 tail_bytes: 20,
1054 spill_mode: SpillMode::Disk,
1055 });
1056 let kernel = Kernel::new(config).expect("kernel creation");
1057
1058 let big = "x".repeat(200);
1060 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1061 assert!(result.text_out().contains("[output truncated:"));
1062 }
1063
1064 #[test]
1067 fn test_estimated_byte_size_text() {
1068 use crate::interpreter::OutputData;
1069 let data = OutputData::text("hello world");
1070 assert_eq!(data.estimated_byte_size(), 11);
1071 }
1072
1073 #[test]
1074 fn test_estimated_byte_size_table() {
1075 use crate::interpreter::{OutputData, OutputNode};
1076 let data = OutputData::table(
1077 vec!["NAME".into(), "SIZE".into()],
1078 vec![
1079 OutputNode::new("foo").with_cells(vec!["123".into()]),
1080 OutputNode::new("bar").with_cells(vec!["456".into()]),
1081 ],
1082 );
1083 assert_eq!(data.estimated_byte_size(), 15);
1085 }
1086
1087 #[test]
1088 fn test_estimated_byte_size_tree() {
1089 use crate::interpreter::{OutputData, OutputNode};
1090 let data = OutputData::nodes(vec![
1091 OutputNode::new("src").with_children(vec![
1092 OutputNode::new("main.rs"),
1093 OutputNode::new("lib.rs"),
1094 ]),
1095 ]);
1096 assert_eq!(data.estimated_byte_size(), 20);
1098 }
1099
1100 #[test]
1101 fn test_write_canonical_matches_to_canonical_string() {
1102 use crate::interpreter::{OutputData, OutputNode};
1103
1104 let cases: Vec<OutputData> = vec![
1105 OutputData::text("hello world"),
1106 OutputData::nodes(vec![
1107 OutputNode::new("file1"),
1108 OutputNode::new("file2"),
1109 ]),
1110 OutputData::table(
1111 vec!["NAME".into(), "SIZE".into()],
1112 vec![
1113 OutputNode::new("foo").with_cells(vec!["123".into()]),
1114 OutputNode::new("bar").with_cells(vec!["456".into()]),
1115 ],
1116 ),
1117 OutputData::nodes(vec![
1118 OutputNode::new("src").with_children(vec![
1119 OutputNode::new("main.rs"),
1120 OutputNode::new("lib.rs"),
1121 ]),
1122 ]),
1123 ];
1124
1125 for data in cases {
1126 let expected = data.to_canonical_string();
1127 let mut buf = Vec::new();
1128 let written = data.write_canonical(&mut buf, None).unwrap();
1129 let got = String::from_utf8(buf).unwrap();
1130 assert_eq!(got, expected, "write_canonical mismatch for {:?}", data);
1131 assert_eq!(written, expected.len(), "byte count mismatch");
1132 }
1133 }
1134
1135 #[test]
1136 fn test_write_canonical_budget_stops_early() {
1137 use crate::interpreter::{OutputData, OutputNode};
1138
1139 let data = OutputData::nodes(
1140 (0..1000).map(|i| OutputNode::new(format!("file_{:04}", i))).collect()
1141 );
1142 let mut buf = Vec::new();
1143 let written = data.write_canonical(&mut buf, Some(100)).unwrap();
1144 assert!(written > 100, "should exceed budget slightly");
1146 assert!(written < 500, "should stop soon after budget: got {}", written);
1147 }
1148
1149 #[tokio::test]
1150 async fn test_spill_if_needed_large_output_data_no_oom() {
1151 use crate::interpreter::{OutputData, OutputNode};
1152
1153 let config = OutputLimitConfig {
1154 max_bytes: Some(1024),
1155 head_bytes: 100,
1156 tail_bytes: 50,
1157 spill_mode: SpillMode::Disk,
1158 };
1159
1160 let nodes: Vec<OutputNode> = (0..100_000)
1163 .map(|i| OutputNode::new(format!("node_{:06}", i)))
1164 .collect();
1165 let data = OutputData::nodes(nodes);
1166 let mut result = ExecResult::with_output(data);
1167
1168 let spill = spill_if_needed(&mut result, &config).await;
1169 assert!(spill.is_some(), "should have spilled");
1170 assert!(result.did_spill);
1171 assert!(result.text_out().contains("[output truncated:"));
1172
1173 if let Some(s) = spill {
1175 let _ = tokio::fs::remove_file(&s.path).await;
1176 }
1177 }
1178
1179 #[cfg(feature = "subprocess")]
1184 #[tokio::test]
1185 async fn test_collect_small_output_no_spill() {
1186 let (mut writer, reader) = tokio::io::duplex(1024);
1187 let config = OutputLimitConfig {
1188 max_bytes: Some(1024),
1189 head_bytes: 100,
1190 tail_bytes: 50,
1191 spill_mode: SpillMode::Disk,
1192 };
1193
1194 use tokio::io::AsyncWriteExt;
1196 writer.write_all(b"hello world").await.unwrap();
1197 drop(writer); let mut reader = reader;
1200 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
1201 let result = String::from_utf8(result).expect("test output is valid UTF-8");
1202 assert_eq!(result, "hello world");
1203 assert!(!did_spill);
1204 }
1205
1206 #[cfg(feature = "subprocess")]
1207 #[tokio::test]
1208 async fn test_collect_large_output_spills() {
1209 let (mut writer, reader) = tokio::io::duplex(64 * 1024);
1210 let config = OutputLimitConfig {
1211 max_bytes: Some(100),
1212 head_bytes: 20,
1213 tail_bytes: 10,
1214 spill_mode: SpillMode::Disk,
1215 };
1216
1217 use tokio::io::AsyncWriteExt;
1219 let data = "x".repeat(500);
1220 writer.write_all(data.as_bytes()).await.unwrap();
1221 drop(writer); let mut reader = reader;
1224 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
1225 let result = String::from_utf8(result).expect("test output is valid UTF-8");
1226 assert!(did_spill, "should have spilled");
1227 assert!(result.contains("[output truncated:"));
1228 assert!(result.contains("full output at"));
1229 }
1230
1231 #[cfg(feature = "subprocess")]
1232 #[tokio::test]
1233 async fn test_collect_exact_boundary_no_spill() {
1234 let (mut writer, reader) = tokio::io::duplex(1024);
1235 let config = OutputLimitConfig {
1236 max_bytes: Some(100),
1237 head_bytes: 20,
1238 tail_bytes: 10,
1239 spill_mode: SpillMode::Disk,
1240 };
1241
1242 use tokio::io::AsyncWriteExt;
1244 let data = "x".repeat(100);
1245 writer.write_all(data.as_bytes()).await.unwrap();
1246 drop(writer); let mut reader = reader;
1249 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
1250 let result = String::from_utf8(result).expect("test output is valid UTF-8");
1251 assert!(!did_spill, "exact boundary should not spill");
1253 assert_eq!(result.len(), 100);
1254 }
1255
1256 #[cfg(feature = "subprocess")]
1257 #[tokio::test]
1258 async fn test_collect_broken_pipe() {
1259 let (writer, reader) = tokio::io::duplex(1024);
1260 let config = OutputLimitConfig {
1261 max_bytes: Some(1024),
1262 head_bytes: 100,
1263 tail_bytes: 50,
1264 spill_mode: SpillMode::Disk,
1265 };
1266
1267 use tokio::io::AsyncWriteExt;
1269 let mut writer = writer;
1270 writer.write_all(b"partial data").await.unwrap();
1271 drop(writer); let mut reader = reader;
1274 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
1275 let result = String::from_utf8(result).expect("test output is valid UTF-8");
1276 assert_eq!(result, "partial data");
1277 assert!(!did_spill);
1278 }
1279
1280 #[test]
1283 fn test_in_memory_builder_and_default() {
1284 assert_eq!(OutputLimitConfig::mcp().spill_mode(), SpillMode::Disk);
1285 assert_eq!(OutputLimitConfig::mcp().in_memory().spill_mode(), SpillMode::Memory);
1286
1287 let mut config = OutputLimitConfig::none();
1288 config.set_spill_mode(SpillMode::Memory);
1289 assert_eq!(config.spill_mode(), SpillMode::Memory);
1290 }
1291
1292 #[tokio::test]
1293 async fn test_memory_mode_truncates_string_without_disk() {
1294 let config = OutputLimitConfig {
1295 max_bytes: Some(100),
1296 head_bytes: 20,
1297 tail_bytes: 10,
1298 spill_mode: SpillMode::Memory,
1299 };
1300 let mut result = ExecResult::success("x".repeat(200));
1301 let spill = spill_if_needed(&mut result, &config).await;
1302
1303 assert!(spill.is_none(), "memory mode must not write a spill file");
1305 assert!(result.did_spill, "memory truncation must set did_spill for the exit-3 remap");
1306
1307 let out = result.text_out();
1308 assert!(out.contains("truncated in memory"), "got: {}", out);
1309 assert!(out.contains("200 bytes total"), "got: {}", out);
1310 assert!(!out.contains("full output at"), "memory mode must not point at a file: {}", out);
1311 assert!(out.starts_with(&"x".repeat(20)), "head preserved");
1312 }
1313
1314 #[tokio::test]
1315 async fn test_memory_mode_under_limit_untouched() {
1316 let config = OutputLimitConfig {
1317 max_bytes: Some(100),
1318 head_bytes: 20,
1319 tail_bytes: 10,
1320 spill_mode: SpillMode::Memory,
1321 };
1322 let mut result = ExecResult::success("short");
1323 let spill = spill_if_needed(&mut result, &config).await;
1324 assert!(spill.is_none());
1325 assert!(!result.did_spill);
1326 assert_eq!(&*result.text_out(), "short");
1327 }
1328
1329 #[tokio::test]
1330 async fn test_memory_mode_large_output_data_bounded() {
1331 use crate::interpreter::{OutputData, OutputNode};
1332
1333 let config = OutputLimitConfig {
1334 max_bytes: Some(1024),
1335 head_bytes: 100,
1336 tail_bytes: 50,
1337 spill_mode: SpillMode::Memory,
1338 };
1339
1340 let nodes: Vec<OutputNode> = (0..100_000)
1342 .map(|i| OutputNode::new(format!("node_{:06}", i)))
1343 .collect();
1344 let mut result = ExecResult::with_output(OutputData::nodes(nodes));
1345
1346 let spill = spill_if_needed(&mut result, &config).await;
1347 assert!(spill.is_none(), "memory mode writes no file");
1348 assert!(result.did_spill);
1349 let out = result.text_out();
1350 assert!(out.contains("truncated in memory"), "got: {}", out);
1351 assert!(out.starts_with("node_000000"), "head rendered: {}", out);
1352 assert!(out.contains("head only"), "got: {}", out);
1354 }
1355
1356 #[tokio::test]
1357 async fn test_kernel_memory_mode_exits_3_preserves_original() {
1358 use crate::kernel::{Kernel, KernelConfig};
1359
1360 let config = KernelConfig::mcp().with_output_limit(OutputLimitConfig {
1361 max_bytes: Some(100),
1362 head_bytes: 30,
1363 tail_bytes: 20,
1364 spill_mode: SpillMode::Memory,
1365 });
1366 let kernel = Kernel::new(config).expect("kernel creation");
1367
1368 let big = "x".repeat(200);
1369 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1370 assert_eq!(result.code, 3, "memory truncation still signals via exit 3");
1371 assert_eq!(result.original_code, Some(0), "original exit code preserved");
1372 assert!(result.text_out().contains("truncated in memory"));
1373 assert!(!result.text_out().contains("full output at"));
1374 }
1375
1376 #[tokio::test]
1377 async fn test_nolocal_kernel_forces_memory_spill() {
1378 use crate::kernel::{Kernel, KernelConfig, VfsMountMode};
1379
1380 let config = KernelConfig::mcp()
1384 .with_vfs_mode(VfsMountMode::NoLocal)
1385 .with_output_limit(OutputLimitConfig {
1386 max_bytes: Some(100),
1387 head_bytes: 30,
1388 tail_bytes: 20,
1389 spill_mode: SpillMode::Disk,
1390 });
1391 let kernel = Kernel::new(config).expect("kernel creation");
1392
1393 let big = "x".repeat(200);
1394 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1395 assert_eq!(result.code, 3, "still signals truncation via exit 3");
1396 assert!(result.text_out().contains("truncated in memory"), "got: {}", result.text_out());
1397 assert!(
1398 !result.text_out().contains("full output at"),
1399 "NoLocal kernel must not write a host spill file: {}",
1400 result.text_out()
1401 );
1402 }
1403
/// Streaming collection in memory mode: a 530-byte stream against a 100-byte
/// limit must be flagged as truncated, keep the 20-byte head and 10-byte
/// tail, report the true total, and never mention a spill file.
#[cfg(feature = "subprocess")]
#[tokio::test]
async fn test_collect_memory_mode_drains_without_disk() {
    use tokio::io::AsyncWriteExt;

    let limits = OutputLimitConfig {
        max_bytes: Some(100),
        head_bytes: 20,
        tail_bytes: 10,
        spill_mode: SpillMode::Memory,
    };

    // Distinct letters for head / middle / tail make it obvious which part
    // of the stream survived: 20 + 500 + 10 = 530 bytes.
    let head = "a".repeat(20);
    let tail = "c".repeat(10);
    let payload = format!("{}{}{}", head, "b".repeat(500), tail);

    // The pipe buffer (64 KiB) is larger than the payload, so the whole
    // write completes before anything is read.
    let (mut tx, mut rx) = tokio::io::duplex(64 * 1024);
    tx.write_all(payload.as_bytes()).await.unwrap();
    drop(tx);

    let (bytes, did_spill) = collect_stdout_with_spill(&mut rx, 100, &limits).await;
    let text = String::from_utf8(bytes).expect("test output is valid UTF-8");

    assert!(did_spill, "drain flags truncation for the exit-3 remap");
    assert!(text.contains("truncated in memory"), "got: {}", text);
    assert!(!text.contains("full output at"), "no disk file in memory mode");
    assert!(text.starts_with(&head), "head preserved: {}", text);
    assert!(text.contains(&tail), "tail preserved: {}", text);
    assert!(text.contains("530 bytes total"), "honest total: {}", text);
}
1431
/// `extend_ring` must retain only the last `cap` bytes across successive
/// calls, and must leave the ring empty when `cap` is zero.
#[cfg(feature = "subprocess")]
#[test]
fn test_extend_ring_keeps_last_cap_bytes() {
    use std::collections::VecDeque;

    // Copies the ring's current contents, front to back, without mutating it.
    let contents = |ring: &VecDeque<u8>| ring.iter().copied().collect::<Vec<u8>>();

    // Six bytes into a 3-byte ring: only the last three remain.
    let mut ring = VecDeque::new();
    super::extend_ring(&mut ring, b"abcdef", 3);
    assert_eq!(contents(&ring), b"def");

    // Two more bytes push out the two oldest.
    super::extend_ring(&mut ring, b"gh", 3);
    assert_eq!(contents(&ring), b"fgh");

    // A zero-capacity ring keeps nothing.
    let mut empty = VecDeque::new();
    super::extend_ring(&mut empty, b"xyz", 0);
    assert!(empty.is_empty());
}
1446}