1use std::path::PathBuf;
11
12use crate::interpreter::ExecResult;
13use crate::paths;
14
/// Output size limit used by [`OutputLimitConfig::mcp`] and reported by
/// [`OutputLimitConfig::default_limit`]: 8 KiB.
const DEFAULT_MCP_LIMIT: usize = 8 * 1024;

/// Default number of leading bytes kept in a truncated preview.
const DEFAULT_HEAD_BYTES: usize = 1024;

/// Default number of trailing bytes kept in a truncated preview.
const DEFAULT_TAIL_BYTES: usize = 512;

/// Settings controlling when output is spilled to disk and how much of it
/// is kept inline as a head/tail preview.
#[derive(Debug, Clone)]
pub struct OutputLimitConfig {
    /// Maximum inline output size in bytes; `None` disables limiting.
    max_bytes: Option<usize>,
    /// Bytes of the beginning of the output kept in the preview.
    head_bytes: usize,
    /// Bytes of the end of the output kept in the preview.
    tail_bytes: usize,
}

impl OutputLimitConfig {
    // ---- constructors -------------------------------------------------

    /// Creates a configuration with limiting disabled and default
    /// head/tail preview sizes.
    pub fn none() -> Self {
        Self {
            max_bytes: None,
            head_bytes: DEFAULT_HEAD_BYTES,
            tail_bytes: DEFAULT_TAIL_BYTES,
        }
    }

    /// Returns the limit, in bytes, that [`OutputLimitConfig::mcp`] applies.
    pub fn default_limit() -> usize {
        DEFAULT_MCP_LIMIT
    }

    /// Creates a configuration with the default limit enabled and default
    /// head/tail preview sizes.
    pub fn mcp() -> Self {
        Self {
            max_bytes: Some(DEFAULT_MCP_LIMIT),
            ..Self::none()
        }
    }

    // ---- accessors ----------------------------------------------------

    /// Returns `true` when a byte limit is set.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes.is_some()
    }

    /// Returns the byte limit, or `None` when limiting is disabled.
    pub fn max_bytes(&self) -> Option<usize> {
        self.max_bytes
    }

    /// Returns the number of leading bytes kept in a truncated preview.
    pub fn head_bytes(&self) -> usize {
        self.head_bytes
    }

    /// Returns the number of trailing bytes kept in a truncated preview.
    pub fn tail_bytes(&self) -> usize {
        self.tail_bytes
    }

    // ---- mutators -----------------------------------------------------

    /// Sets the byte limit; `None` disables limiting.
    pub fn set_limit(&mut self, max: Option<usize>) {
        self.max_bytes = max;
    }

    /// Sets the number of leading bytes kept in a truncated preview.
    pub fn set_head_bytes(&mut self, bytes: usize) {
        self.head_bytes = bytes;
    }

    /// Sets the number of trailing bytes kept in a truncated preview.
    pub fn set_tail_bytes(&mut self, bytes: usize) {
        self.tail_bytes = bytes;
    }
}
94
/// Describes a spill file that was written because output exceeded the limit.
#[derive(Debug, Clone)]
pub struct SpillResult {
    /// Location of the file holding the complete output.
    pub path: PathBuf,
    /// Number of bytes written to the spill file.
    pub total_bytes: usize,
}
100
101pub async fn spill_if_needed(
110 result: &mut ExecResult,
111 config: &OutputLimitConfig,
112) -> Option<SpillResult> {
113 let max = config.max_bytes?;
114
115 if !result.out.is_empty() {
117 let total = result.out.len();
118 if total <= max {
119 return None;
120 }
121 return spill_string(result, config, max).await;
122 }
123
124 if let Some(ref output) = result.output {
126 let estimate = output.estimated_byte_size();
127 if estimate <= max {
128 result.out = output.to_canonical_string();
130 if result.out.len() <= max {
132 return None;
133 }
134 return spill_string(result, config, max).await;
135 }
136
137 return spill_output_data(result, config, max).await;
139 }
140
141 None
142}
143
144async fn spill_string(
146 result: &mut ExecResult,
147 config: &OutputLimitConfig,
148 max: usize,
149) -> Option<SpillResult> {
150 let total = result.out.len();
151 match write_spill_file(result.out.as_bytes()).await {
152 Ok((path, written)) => {
153 result.out = build_truncated_output(&result.out, config, &path, total);
154 result.did_spill = true;
155 Some(SpillResult {
156 path,
157 total_bytes: written,
158 })
159 }
160 Err(e) => {
161 tracing::error!("output spill failed: {}", e);
162 *result = ExecResult::failure(1, format!(
163 "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
164 max, total, e
165 ));
166 None
167 }
168 }
169}
170
171async fn spill_output_data(
173 result: &mut ExecResult,
174 config: &OutputLimitConfig,
175 max: usize,
176) -> Option<SpillResult> {
177 let output = result.output.as_ref()?;
178
179 let dir = paths::spill_dir();
180 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
181 tracing::error!("output spill dir creation failed: {}", e);
182 *result = ExecResult::failure(1, format!(
183 "output exceeded {} byte limit and spill dir creation failed: {}", max, e
184 ));
185 return None;
186 }
187
188 let filename = generate_spill_filename();
189 let path = dir.join(&filename);
190
191 let total = match std::fs::File::create(&path) {
193 Ok(mut file) => {
194 match output.write_canonical(&mut file, None) {
195 Ok(n) => n,
196 Err(e) => {
197 tracing::error!("output spill write failed: {}", e);
198 *result = ExecResult::failure(1, format!(
199 "output exceeded {} byte limit and spill to disk failed: {}", max, e
200 ));
201 return None;
202 }
203 }
204 }
205 Err(e) => {
206 tracing::error!("output spill file creation failed: {}", e);
207 *result = ExecResult::failure(1, format!(
208 "output exceeded {} byte limit and spill to disk failed: {}", max, e
209 ));
210 return None;
211 }
212 };
213
214 let head = read_head_from_file(&path, config.head_bytes).await.unwrap_or_default();
216 let tail = read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default();
217 let path_str = path.to_string_lossy();
218
219 result.out = format!(
220 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
221 head, tail, total, path_str
222 );
223 result.did_spill = true;
224
225 Some(SpillResult {
226 path,
227 total_bytes: total,
228 })
229}
230
231pub async fn spill_aware_collect(
239 mut stdout: tokio::process::ChildStdout,
240 mut stderr_reader: tokio::process::ChildStderr,
241 stderr_stream: Option<crate::scheduler::StderrStream>,
242 config: &OutputLimitConfig,
243) -> (String, String, bool) {
244 let max = config.max_bytes.unwrap_or(usize::MAX);
245
246 let stderr_task = tokio::spawn(async move {
248 collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
249 });
250
251 let (stdout_result, did_spill) = collect_stdout_with_spill(&mut stdout, max, config).await;
252
253 let stderr = stderr_task.await.unwrap_or_default();
254 (stdout_result, stderr, did_spill)
255}
256
257async fn collect_stderr(
259 reader: &mut tokio::process::ChildStderr,
260 stream: Option<&crate::scheduler::StderrStream>,
261) -> String {
262 use tokio::io::AsyncReadExt;
263
264 let mut buf = Vec::new();
265 let mut chunk = [0u8; 8192];
266 loop {
267 match reader.read(&mut chunk).await {
268 Ok(0) => break,
269 Ok(n) => {
270 if let Some(s) = stream {
271 s.write(&chunk[..n]);
272 } else {
273 buf.extend_from_slice(&chunk[..n]);
274 }
275 }
276 Err(_) => break,
277 }
278 }
279 if stream.is_some() {
280 String::new()
281 } else {
282 String::from_utf8_lossy(&buf).into_owned()
283 }
284}
285
286async fn collect_stdout_with_spill<R: tokio::io::AsyncRead + Unpin>(
293 stdout: &mut R,
294 max_bytes: usize,
295 config: &OutputLimitConfig,
296) -> (String, bool) {
297 use tokio::io::AsyncReadExt;
298 use tokio::time::{sleep, Duration};
299
300 let mut buffer = Vec::new();
301 let mut chunk = [0u8; 8192];
302 let deadline = sleep(Duration::from_secs(1));
303 tokio::pin!(deadline);
304
305 loop {
307 tokio::select! {
308 biased;
309 result = stdout.read(&mut chunk) => {
310 match result {
311 Ok(0) => {
312 return (String::from_utf8_lossy(&buffer).into_owned(), false);
315 }
316 Ok(n) => {
317 buffer.extend_from_slice(&chunk[..n]);
318 if buffer.len() > max_bytes {
320 break;
321 }
322 }
323 Err(_) => {
324 return (String::from_utf8_lossy(&buffer).into_owned(), false);
325 }
326 }
327 }
328 () = &mut deadline => {
329 break;
331 }
332 }
333 }
334
335 if buffer.len() > max_bytes {
337 match stream_to_spill(&buffer, stdout, config).await {
339 Ok(result) => return (result, true),
340 Err(e) => {
341 tracing::error!("streaming spill failed: {}", e);
344 let size = buffer.len();
345 drop(buffer);
346 return (format!(
347 "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
348 max_bytes, size, e
349 ), false);
350 }
351 }
352 }
353
354 loop {
357 match stdout.read(&mut chunk).await {
358 Ok(0) => break,
359 Ok(n) => {
360 buffer.extend_from_slice(&chunk[..n]);
361 if buffer.len() > max_bytes {
363 match stream_to_spill(&buffer, stdout, config).await {
364 Ok(result) => return (result, true),
365 Err(e) => {
366 tracing::error!("streaming spill failed: {}", e);
367 let size = buffer.len();
368 drop(buffer);
369 return (format!(
370 "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
371 max_bytes, size, e
372 ), false);
373 }
374 }
375 }
376 }
377 Err(_) => break,
378 }
379 }
380
381 (String::from_utf8_lossy(&buffer).into_owned(), false)
382}
383
384async fn stream_to_spill<R: tokio::io::AsyncRead + Unpin>(
388 buffer: &[u8],
389 stdout: &mut R,
390 config: &OutputLimitConfig,
391) -> Result<String, std::io::Error> {
392 use tokio::io::AsyncReadExt;
393
394 let spill_dir = paths::spill_dir();
395 tokio::fs::create_dir_all(&spill_dir).await?;
396
397 let filename = generate_spill_filename();
398 let path = spill_dir.join(&filename);
399 let mut file = tokio::fs::File::create(&path).await?;
400
401 use tokio::io::AsyncWriteExt;
403 file.write_all(buffer).await?;
404 let mut total = buffer.len();
405
406 let mut chunk = [0u8; 8192];
408 loop {
409 match stdout.read(&mut chunk).await {
410 Ok(0) => break,
411 Ok(n) => {
412 file.write_all(&chunk[..n]).await?;
413 total += n;
414 }
415 Err(_) => break,
416 }
417 }
418 file.flush().await?;
419
420 let full = String::from_utf8_lossy(buffer);
422 let head = truncate_to_char_boundary(&full, config.head_bytes);
423
424 let tail: String = if total <= buffer.len() {
426 let full_str = String::from_utf8_lossy(buffer);
427 tail_from_str(&full_str, config.tail_bytes).to_string()
428 } else {
429 read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
430 };
431
432 let path_str = path.to_string_lossy();
433 Ok(format!(
434 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
435 head, tail, total, path_str
436 ))
437}
438
439async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
441 let dir = paths::spill_dir();
442 tokio::fs::create_dir_all(&dir).await?;
443
444 let filename = generate_spill_filename();
445 let path = dir.join(filename);
446 tokio::fs::write(&path, data).await?;
447 Ok((path, data.len()))
448}
449
450fn build_truncated_output(
452 full: &str,
453 config: &OutputLimitConfig,
454 spill_path: &std::path::Path,
455 total_bytes: usize,
456) -> String {
457 let head = truncate_to_char_boundary(full, config.head_bytes);
458 let tail = tail_from_str(full, config.tail_bytes);
459 let path_str = spill_path.to_string_lossy();
460 format!(
461 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
462 head, tail, total_bytes, path_str
463 )
464}
465
/// Returns the longest prefix of `s` that is at most `max_bytes` long and
/// ends on a UTF-8 character boundary.
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }

    // Walk back from the byte limit to the nearest boundary; offset 0 is
    // always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    &s[..cut]
}
478
/// Returns the longest suffix of `s` that is at most `max_bytes` long and
/// starts on a UTF-8 character boundary.
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    let len = s.len();
    if max_bytes >= len {
        return s;
    }

    // Walk forward from the ideal start to the nearest boundary; `len` is
    // always a boundary, so the search cannot come up empty.
    let begin = (len - max_bytes..=len)
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(len);

    &s[begin..]
}
491
492async fn read_head_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
494 use tokio::io::AsyncReadExt;
495
496 let mut file = tokio::fs::File::open(path).await?;
497 let mut buf = vec![0u8; max_bytes];
498 let n = file.read(&mut buf).await?;
499 buf.truncate(n);
500
501 let s = String::from_utf8_lossy(&buf);
502 let result = truncate_to_char_boundary(&s, max_bytes);
504 Ok(result.to_string())
505}
506
507async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
509 use tokio::io::{AsyncReadExt, AsyncSeekExt};
510
511 let mut file = tokio::fs::File::open(path).await?;
512 let metadata = file.metadata().await?;
513 let len = metadata.len() as usize;
514
515 if len <= max_bytes {
516 let mut buf = Vec::new();
517 file.read_to_end(&mut buf).await?;
518 return Ok(String::from_utf8_lossy(&buf).into_owned());
519 }
520
521 let offset = len - max_bytes;
522 file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
523 let mut buf = vec![0u8; max_bytes];
524 let n = file.read(&mut buf).await?;
525 buf.truncate(n);
526
527 let s = String::from_utf8_lossy(&buf);
529 Ok(s.into_owned())
530}
531
/// Produces a spill file name of the form
/// `spill-<secs>.<nanos>-<pid>-<seq>.txt`.
///
/// The timestamp is the time since the Unix epoch (zero if the clock reads
/// earlier than the epoch), `<pid>` is the current process id, and `<seq>`
/// is a process-wide counter incremented on every call.
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::SystemTime;

    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    let (secs, nanos) = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => (elapsed.as_secs(), elapsed.subsec_nanos()),
        Err(_) => (0, 0),
    };
    let process = std::process::id();
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("spill-{}.{}-{}-{}.txt", secs, nanos, process, sequence)
}
545
/// Parses a byte size such as `"65536"`, `"64K"` or `"1M"`.
///
/// Surrounding whitespace is ignored. A trailing `K`/`k` multiplies by 1024
/// and a trailing `M`/`m` by 1024 * 1024; without a suffix the number is
/// taken as bytes.
///
/// # Errors
/// Returns a message when the input is empty, the numeric part is not a
/// valid `usize`, or the scaled value does not fit in `usize`.
pub fn parse_size(s: &str) -> Result<usize, String> {
    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier): (&str, usize) =
        if let Some(n) = s.strip_suffix('K').or_else(|| s.strip_suffix('k')) {
            (n, 1024)
        } else if let Some(n) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
            (n, 1024 * 1024)
        } else {
            (s, 1)
        };

    let num: usize = num_str
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    // Plain `*` would panic in debug builds and wrap in release builds for
    // oversized inputs; report the overflow as an error instead.
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
569
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an enabled config with explicit limit and preview sizes.
    fn limits(max: usize, head: usize, tail: usize) -> OutputLimitConfig {
        OutputLimitConfig {
            max_bytes: Some(max),
            head_bytes: head,
            tail_bytes: tail,
        }
    }

    /// Creates an in-memory pipe of the given capacity, writes `payload`
    /// into it, closes the write side and returns the read side.
    async fn closed_pipe_with(capacity: usize, payload: &[u8]) -> tokio::io::DuplexStream {
        use tokio::io::AsyncWriteExt;

        let (mut writer, reader) = tokio::io::duplex(capacity);
        writer.write_all(payload).await.unwrap();
        drop(writer);
        reader
    }

    // ---- OutputLimitConfig ---------------------------------------------

    #[test]
    fn test_none_is_disabled() {
        let cfg = OutputLimitConfig::none();
        assert!(!cfg.is_enabled());
        assert_eq!(cfg.max_bytes(), None);
    }

    #[test]
    fn test_mcp_is_enabled() {
        let cfg = OutputLimitConfig::mcp();
        assert!(cfg.is_enabled());
        assert_eq!(cfg.max_bytes(), Some(8 * 1024));
        assert_eq!(cfg.head_bytes(), 1024);
        assert_eq!(cfg.tail_bytes(), 512);
    }

    #[test]
    fn test_set_limit() {
        let mut cfg = OutputLimitConfig::none();
        assert!(!cfg.is_enabled());

        cfg.set_limit(Some(1024));
        assert!(cfg.is_enabled());
        assert_eq!(cfg.max_bytes(), Some(1024));

        cfg.set_limit(None);
        assert!(!cfg.is_enabled());
    }

    #[test]
    fn test_set_head_tail() {
        let mut cfg = OutputLimitConfig::mcp();
        cfg.set_head_bytes(2048);
        cfg.set_tail_bytes(1024);
        assert_eq!(cfg.head_bytes(), 2048);
        assert_eq!(cfg.tail_bytes(), 1024);
    }

    // ---- pure helpers --------------------------------------------------

    #[test]
    fn test_parse_size() {
        let accepted = [
            ("64K", 64 * 1024),
            ("64k", 64 * 1024),
            ("1M", 1024 * 1024),
            ("1m", 1024 * 1024),
            ("65536", 65536),
        ];
        for (input, expected) in accepted {
            assert_eq!(parse_size(input).unwrap(), expected);
        }

        for rejected in ["", "abc"] {
            assert!(parse_size(rejected).is_err());
        }
    }

    #[test]
    fn test_truncate_to_char_boundary() {
        // Each of the three characters in the last rows is 3 bytes long.
        let cases = [
            ("hello", 10, "hello"),
            ("hello", 3, "hel"),
            ("日本語", 3, "日"),
            ("日本語", 4, "日"),
            ("日本語", 6, "日本"),
        ];
        for (input, budget, expected) in cases {
            assert_eq!(truncate_to_char_boundary(input, budget), expected);
        }
    }

    #[test]
    fn test_tail_from_str() {
        let cases = [
            ("hello", 10, "hello"),
            ("hello", 3, "llo"),
            ("日本語", 3, "語"),
            ("日本語", 6, "本語"),
        ];
        for (input, budget, expected) in cases {
            assert_eq!(tail_from_str(input, budget), expected);
        }
    }

    #[test]
    fn test_generate_spill_filename() {
        let generated = generate_spill_filename();
        assert!(generated.starts_with("spill-"));
        assert!(generated.ends_with(".txt"));
    }

    // ---- spill_if_needed -----------------------------------------------

    #[tokio::test]
    async fn test_spill_if_needed_under_limit() {
        let config = OutputLimitConfig::mcp();
        let mut result = ExecResult::success("short output");

        let outcome = spill_if_needed(&mut result, &config).await;

        assert!(outcome.is_none());
        assert_eq!(result.out, "short output");
        assert!(!result.did_spill);
    }

    #[tokio::test]
    async fn test_spill_if_needed_over_limit() {
        let config = limits(100, 20, 10);
        let mut result = ExecResult::success("x".repeat(200));

        let outcome = spill_if_needed(&mut result, &config).await;
        assert!(outcome.is_some());
        assert!(result.did_spill);

        let spill = outcome.unwrap();
        assert_eq!(spill.total_bytes, 200);
        assert!(spill.path.exists());

        // The inline text is a preview with a footer pointing at the file.
        let shown_path = spill.path.to_string_lossy();
        assert!(result.out.contains("..."));
        assert!(result.out.contains("[output truncated: 200 bytes total"));
        assert!(result.out.contains(&*shown_path));

        // The preview opens with the configured head of the original text.
        assert!(result.out.starts_with("x".repeat(20).as_str()));

        // The spill file holds the complete output.
        let on_disk = tokio::fs::read_to_string(&spill.path).await.unwrap();
        assert_eq!(on_disk.len(), 200);

        // Best-effort cleanup.
        let _ = tokio::fs::remove_file(&spill.path).await;
    }

    #[tokio::test]
    async fn test_spill_if_needed_disabled() {
        let config = OutputLimitConfig::none();
        let payload = "x".repeat(200);
        let mut result = ExecResult::success(payload.clone());

        let outcome = spill_if_needed(&mut result, &config).await;

        assert!(outcome.is_none());
        assert_eq!(result.out, payload);
        assert!(!result.did_spill);
    }

    #[test]
    fn test_build_truncated_output() {
        let config = limits(100, 5, 3);
        let spill_path = PathBuf::from("/tmp/test-spill.txt");

        let rendered = build_truncated_output("abcdefghijklmnop", &config, &spill_path, 16);

        assert!(rendered.starts_with("abcde"));
        assert!(rendered.contains("..."));
        assert!(rendered.contains("nop"));
        assert!(rendered.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
    }

    // ---- kernel integration --------------------------------------------

    #[tokio::test]
    async fn test_kernel_mcp_truncates_large_output() {
        use crate::kernel::{Kernel, KernelConfig};

        let kernel_config = KernelConfig::mcp().with_output_limit(limits(200, 50, 30));
        let kernel = Kernel::new(kernel_config).expect("kernel creation");

        let result = kernel.execute("seq 1 10000").await.expect("execute");

        assert!(result.out.contains("[output truncated:"));
        assert!(result.out.contains("full output at"));
        assert!(result.out.starts_with("1\n"));
    }

    #[tokio::test]
    async fn test_spill_exits_3() {
        use crate::kernel::{Kernel, KernelConfig};

        let kernel_config = KernelConfig::mcp().with_output_limit(limits(100, 30, 20));
        let kernel = Kernel::new(kernel_config).expect("kernel creation");

        let command = format!("echo '{}'", "x".repeat(200));
        let result = kernel.execute(&command).await.expect("execute");

        assert_eq!(result.code, 3, "spill should always exit 3");
        assert_eq!(result.original_code, Some(0), "original command exit code preserved");
        assert!(result.out.contains("[output truncated:"));
    }

    #[tokio::test]
    async fn test_kernel_repl_no_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        let kernel = Kernel::new(KernelConfig::repl()).expect("kernel creation");

        let result = kernel.execute("seq 1 100").await.expect("execute");

        assert!(!result.out.contains("[output truncated:"));
        assert!(result.out.contains("100"));
    }

    #[tokio::test]
    async fn test_kernel_builtin_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        let kernel_config = KernelConfig::mcp().with_output_limit(limits(100, 30, 20));
        let kernel = Kernel::new(kernel_config).expect("kernel creation");

        let command = format!("echo '{}'", "x".repeat(200));
        let result = kernel.execute(&command).await.expect("execute");

        assert!(result.out.contains("[output truncated:"));
    }

    // ---- OutputData sizing and serialization ---------------------------

    #[test]
    fn test_estimated_byte_size_text() {
        use crate::interpreter::OutputData;

        let text = OutputData::text("hello world");
        assert_eq!(text.estimated_byte_size(), 11);
    }

    #[test]
    fn test_estimated_byte_size_table() {
        use crate::interpreter::{OutputData, OutputNode};

        let rows = vec![
            OutputNode::new("foo").with_cells(vec!["123".into()]),
            OutputNode::new("bar").with_cells(vec!["456".into()]),
        ];
        let table = OutputData::table(vec!["NAME".into(), "SIZE".into()], rows);

        assert_eq!(table.estimated_byte_size(), 15);
    }

    #[test]
    fn test_estimated_byte_size_tree() {
        use crate::interpreter::{OutputData, OutputNode};

        let root = OutputNode::new("src").with_children(vec![
            OutputNode::new("main.rs"),
            OutputNode::new("lib.rs"),
        ]);
        let tree = OutputData::nodes(vec![root]);

        assert_eq!(tree.estimated_byte_size(), 20);
    }

    #[test]
    fn test_write_canonical_matches_to_canonical_string() {
        use crate::interpreter::{OutputData, OutputNode};

        let samples: Vec<OutputData> = vec![
            // plain text
            OutputData::text("hello world"),
            // flat node list
            OutputData::nodes(vec![
                OutputNode::new("file1"),
                OutputNode::new("file2"),
            ]),
            // table with headers and cells
            OutputData::table(
                vec!["NAME".into(), "SIZE".into()],
                vec![
                    OutputNode::new("foo").with_cells(vec!["123".into()]),
                    OutputNode::new("bar").with_cells(vec!["456".into()]),
                ],
            ),
            // nested nodes
            OutputData::nodes(vec![
                OutputNode::new("src").with_children(vec![
                    OutputNode::new("main.rs"),
                    OutputNode::new("lib.rs"),
                ]),
            ]),
        ];

        for data in samples {
            let expected = data.to_canonical_string();

            let mut sink = Vec::new();
            let written = data.write_canonical(&mut sink, None).unwrap();
            let got = String::from_utf8(sink).unwrap();

            assert_eq!(got, expected, "write_canonical mismatch for {:?}", data);
            assert_eq!(written, expected.len(), "byte count mismatch");
        }
    }

    #[test]
    fn test_write_canonical_budget_stops_early() {
        use crate::interpreter::{OutputData, OutputNode};

        let entries = (0..1000)
            .map(|i| OutputNode::new(format!("file_{:04}", i)))
            .collect();
        let data = OutputData::nodes(entries);

        let mut sink = Vec::new();
        let written = data.write_canonical(&mut sink, Some(100)).unwrap();

        assert!(written > 100, "should exceed budget slightly");
        assert!(written < 500, "should stop soon after budget: got {}", written);
    }

    #[tokio::test]
    async fn test_spill_if_needed_large_output_data_no_oom() {
        use crate::interpreter::{OutputData, OutputNode};

        let config = limits(1024, 100, 50);

        let nodes: Vec<OutputNode> = (0..100_000)
            .map(|i| OutputNode::new(format!("node_{:06}", i)))
            .collect();
        let mut result = ExecResult::with_output(OutputData::nodes(nodes));

        let outcome = spill_if_needed(&mut result, &config).await;

        assert!(outcome.is_some(), "should have spilled");
        assert!(result.did_spill);
        assert!(result.out.contains("[output truncated:"));

        // Best-effort cleanup.
        if let Some(spill) = outcome {
            let _ = tokio::fs::remove_file(&spill.path).await;
        }
    }

    // ---- collect_stdout_with_spill -------------------------------------

    #[tokio::test]
    async fn test_collect_small_output_no_spill() {
        let config = limits(1024, 100, 50);
        let mut reader = closed_pipe_with(1024, b"hello world").await;

        let (text, spilled) = collect_stdout_with_spill(&mut reader, 1024, &config).await;

        assert_eq!(text, "hello world");
        assert!(!spilled);
    }

    #[tokio::test]
    async fn test_collect_large_output_spills() {
        let config = limits(100, 20, 10);
        let payload = "x".repeat(500);
        let mut reader = closed_pipe_with(64 * 1024, payload.as_bytes()).await;

        let (text, spilled) = collect_stdout_with_spill(&mut reader, 100, &config).await;

        assert!(spilled, "should have spilled");
        assert!(text.contains("[output truncated:"));
        assert!(text.contains("full output at"));
    }

    #[tokio::test]
    async fn test_collect_exact_boundary_no_spill() {
        let config = limits(100, 20, 10);
        let payload = "x".repeat(100);
        let mut reader = closed_pipe_with(1024, payload.as_bytes()).await;

        let (text, spilled) = collect_stdout_with_spill(&mut reader, 100, &config).await;

        assert!(!spilled, "exact boundary should not spill");
        assert_eq!(text.len(), 100);
    }

    #[tokio::test]
    async fn test_collect_broken_pipe() {
        let config = limits(1024, 100, 50);
        // The writer is dropped after a partial payload has been sent.
        let mut reader = closed_pipe_with(1024, b"partial data").await;

        let (text, spilled) = collect_stdout_with_spill(&mut reader, 1024, &config).await;

        assert_eq!(text, "partial data");
        assert!(!spilled);
    }
}