1use std::path::PathBuf;
21
22use crate::interpreter::ExecResult;
23#[cfg(feature = "localfs")]
24use crate::paths;
25
26const DEFAULT_MCP_LIMIT: usize = 8 * 1024;
28
29const DEFAULT_HEAD_BYTES: usize = 1024;
31
32const DEFAULT_TAIL_BYTES: usize = 512;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
42pub enum SpillMode {
43 #[default]
51 Disk,
52 Memory,
56}
57
58#[derive(Debug, Clone)]
63pub struct OutputLimitConfig {
64 max_bytes: Option<usize>,
65 head_bytes: usize,
66 tail_bytes: usize,
67 spill_mode: SpillMode,
68}
69
70impl OutputLimitConfig {
71 pub fn none() -> Self {
73 Self {
74 max_bytes: None,
75 head_bytes: DEFAULT_HEAD_BYTES,
76 tail_bytes: DEFAULT_TAIL_BYTES,
77 spill_mode: SpillMode::Disk,
78 }
79 }
80
81 pub fn default_limit() -> usize {
83 DEFAULT_MCP_LIMIT
84 }
85
86 pub fn mcp() -> Self {
88 Self {
89 max_bytes: Some(DEFAULT_MCP_LIMIT),
90 head_bytes: DEFAULT_HEAD_BYTES,
91 tail_bytes: DEFAULT_TAIL_BYTES,
92 spill_mode: SpillMode::Disk,
93 }
94 }
95
96 pub fn in_memory(mut self) -> Self {
104 self.spill_mode = SpillMode::Memory;
105 self
106 }
107
108 pub fn is_enabled(&self) -> bool {
110 self.max_bytes.is_some()
111 }
112
113 pub fn spill_mode(&self) -> SpillMode {
115 self.spill_mode
116 }
117
118 pub fn set_spill_mode(&mut self, mode: SpillMode) {
120 self.spill_mode = mode;
121 }
122
123 pub fn max_bytes(&self) -> Option<usize> {
125 self.max_bytes
126 }
127
128 pub fn head_bytes(&self) -> usize {
130 self.head_bytes
131 }
132
133 pub fn tail_bytes(&self) -> usize {
135 self.tail_bytes
136 }
137
138 pub fn set_limit(&mut self, max: Option<usize>) {
140 self.max_bytes = max;
141 }
142
143 pub fn set_head_bytes(&mut self, bytes: usize) {
145 self.head_bytes = bytes;
146 }
147
148 pub fn set_tail_bytes(&mut self, bytes: usize) {
150 self.tail_bytes = bytes;
151 }
152}
153
154pub struct SpillResult {
156 pub path: PathBuf,
157 pub total_bytes: usize,
158}
159
160pub async fn spill_if_needed(
172 result: &mut ExecResult,
173 config: &OutputLimitConfig,
174) -> Option<SpillResult> {
175 let max = config.max_bytes?;
176
177 #[cfg(feature = "localfs")]
180 if config.spill_mode == SpillMode::Disk {
181 if !result.text_out().is_empty() && !result.has_output() {
183 let total = result.text_out().len();
184 if total <= max {
185 return None;
186 }
187 return spill_string(result, config, max).await;
188 }
189
190 if let Some(output) = result.output() {
192 let estimate = output.estimated_byte_size();
193 if estimate <= max {
194 result.materialize();
196 if result.text_out().len() <= max {
198 return None;
199 }
200 return spill_string(result, config, max).await;
201 }
202
203 return spill_output_data(result, config, max).await;
205 }
206
207 return None;
208 }
209
210 truncate_in_memory(result, config, max)
212}
213
214fn truncate_in_memory(
227 result: &mut ExecResult,
228 config: &OutputLimitConfig,
229 max: usize,
230) -> Option<SpillResult> {
231 if let Some(output) = result.output() {
235 let estimate = output.estimated_byte_size();
236 if estimate > max {
237 let mut buf = Vec::with_capacity(config.head_bytes + 64);
239 let _ = output.write_canonical(&mut buf, Some(config.head_bytes));
241 let s = String::from_utf8_lossy(&buf);
242 let head = truncate_to_char_boundary(&s, config.head_bytes);
243 let truncated = format!(
244 "{}\n...\n[output truncated in memory: ~{} bytes (exceeds {} byte limit) — head only, no spill file]",
245 head, estimate, max
246 );
247 result.set_out(truncated);
248 result.did_spill = true;
249 return None;
250 }
251 result.materialize();
253 }
254
255 let total = result.text_out().len();
256 if total <= max {
257 return None;
258 }
259
260 let text = result.text_out().into_owned();
263 let head = truncate_to_char_boundary(&text, config.head_bytes);
264 let tail = tail_from_str(&text, config.tail_bytes);
265 let truncated = format!(
266 "{}\n...\n{}\n[output truncated in memory: {} bytes total — no spill file]",
267 head, tail, total
268 );
269 result.set_out(truncated);
270 result.did_spill = true;
271 None
272}
273
274#[cfg(feature = "localfs")]
276async fn spill_string(
277 result: &mut ExecResult,
278 config: &OutputLimitConfig,
279 max: usize,
280) -> Option<SpillResult> {
281 let total = result.text_out().len();
282 match write_spill_file(result.text_out().as_bytes()).await {
283 Ok((path, written)) => {
284 let truncated = build_truncated_output(&result.text_out(), config, &path, total);
285 result.set_out(truncated);
286 result.did_spill = true;
287 Some(SpillResult {
288 path,
289 total_bytes: written,
290 })
291 }
292 Err(e) => {
293 tracing::error!("output spill failed: {}", e);
294 *result = ExecResult::failure(1, format!(
295 "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
296 max, total, e
297 ));
298 None
299 }
300 }
301}
302
303#[cfg(feature = "localfs")]
305async fn spill_output_data(
306 result: &mut ExecResult,
307 config: &OutputLimitConfig,
308 max: usize,
309) -> Option<SpillResult> {
310 let output = result.output()?;
311
312 let dir = paths::spill_dir();
313 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
314 tracing::error!("output spill dir creation failed: {}", e);
315 *result = ExecResult::failure(1, format!(
316 "output exceeded {} byte limit and spill dir creation failed: {}", max, e
317 ));
318 return None;
319 }
320
321 let filename = generate_spill_filename();
322 let path = dir.join(&filename);
323
324 let total = match std::fs::File::create(&path) {
326 Ok(mut file) => {
327 match output.write_canonical(&mut file, None) {
328 Ok(n) => n,
329 Err(e) => {
330 tracing::error!("output spill write failed: {}", e);
331 *result = ExecResult::failure(1, format!(
332 "output exceeded {} byte limit and spill to disk failed: {}", max, e
333 ));
334 return None;
335 }
336 }
337 }
338 Err(e) => {
339 tracing::error!("output spill file creation failed: {}", e);
340 *result = ExecResult::failure(1, format!(
341 "output exceeded {} byte limit and spill to disk failed: {}", max, e
342 ));
343 return None;
344 }
345 };
346
347 let head = read_head_from_file(&path, config.head_bytes).await.unwrap_or_default();
349 let tail = read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default();
350 let path_str = path.to_string_lossy();
351
352 result.set_out(format!(
353 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
354 head, tail, total, path_str
355 ));
356 result.did_spill = true;
357
358 Some(SpillResult {
359 path,
360 total_bytes: total,
361 })
362}
363
364#[cfg(feature = "subprocess")]
375pub async fn spill_aware_collect(
376 mut stdout: tokio::process::ChildStdout,
377 mut stderr_reader: tokio::process::ChildStderr,
378 stderr_stream: Option<crate::scheduler::StderrStream>,
379 config: &OutputLimitConfig,
380) -> (String, String, bool) {
381 let max = config.max_bytes.unwrap_or(usize::MAX);
382
383 let stderr_task = tokio::spawn(async move {
385 collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
386 });
387
388 let (stdout_result, did_spill) = collect_stdout_with_spill(&mut stdout, max, config).await;
389
390 let stderr = stderr_task.await.unwrap_or_default();
391 (stdout_result, stderr, did_spill)
392}
393
394#[cfg(feature = "subprocess")]
396async fn collect_stderr(
397 reader: &mut tokio::process::ChildStderr,
398 stream: Option<&crate::scheduler::StderrStream>,
399) -> String {
400 use tokio::io::AsyncReadExt;
401
402 let mut buf = Vec::new();
403 let mut chunk = [0u8; 8192];
404 loop {
405 match reader.read(&mut chunk).await {
406 Ok(0) => break,
407 Ok(n) => {
408 if let Some(s) = stream {
409 s.write(&chunk[..n]);
410 } else {
411 buf.extend_from_slice(&chunk[..n]);
412 }
413 }
414 Err(_) => break,
415 }
416 }
417 if stream.is_some() {
418 String::new()
419 } else {
420 String::from_utf8_lossy(&buf).into_owned()
421 }
422}
423
424#[cfg(feature = "subprocess")]
431async fn collect_stdout_with_spill<R: tokio::io::AsyncRead + Unpin>(
432 stdout: &mut R,
433 max_bytes: usize,
434 config: &OutputLimitConfig,
435) -> (String, bool) {
436 use tokio::io::AsyncReadExt;
437 use tokio::time::{sleep, Duration};
438
439 let mut buffer = Vec::new();
440 let mut chunk = [0u8; 8192];
441 let deadline = sleep(Duration::from_secs(1));
442 tokio::pin!(deadline);
443
444 loop {
446 tokio::select! {
447 biased;
448 result = stdout.read(&mut chunk) => {
449 match result {
450 Ok(0) => {
451 return (String::from_utf8_lossy(&buffer).into_owned(), false);
454 }
455 Ok(n) => {
456 buffer.extend_from_slice(&chunk[..n]);
457 if buffer.len() > max_bytes {
459 break;
460 }
461 }
462 Err(_) => {
463 return (String::from_utf8_lossy(&buffer).into_owned(), false);
464 }
465 }
466 }
467 () = &mut deadline => {
468 break;
470 }
471 }
472 }
473
474 if buffer.len() > max_bytes {
476 return handle_overflow(&buffer, stdout, config, max_bytes).await;
478 }
479
480 loop {
483 match stdout.read(&mut chunk).await {
484 Ok(0) => break,
485 Ok(n) => {
486 buffer.extend_from_slice(&chunk[..n]);
487 if buffer.len() > max_bytes {
489 return handle_overflow(&buffer, stdout, config, max_bytes).await;
490 }
491 }
492 Err(_) => break,
493 }
494 }
495
496 (String::from_utf8_lossy(&buffer).into_owned(), false)
497}
498
499#[cfg(feature = "subprocess")]
503async fn handle_overflow<R: tokio::io::AsyncRead + Unpin>(
504 buffer: &[u8],
505 stdout: &mut R,
506 config: &OutputLimitConfig,
507 max_bytes: usize,
508) -> (String, bool) {
509 if config.spill_mode == SpillMode::Memory {
512 return (drain_in_memory(buffer, stdout, config).await, true);
513 }
514
515 match stream_to_spill(buffer, stdout, config).await {
516 Ok(result) => (result, true),
517 Err(e) => {
518 tracing::error!("streaming spill failed: {}", e);
521 (
522 format!(
523 "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
524 max_bytes,
525 buffer.len(),
526 e
527 ),
528 false,
529 )
530 }
531 }
532}
533
534#[cfg(feature = "subprocess")]
539async fn drain_in_memory<R: tokio::io::AsyncRead + Unpin>(
540 buffer: &[u8],
541 stdout: &mut R,
542 config: &OutputLimitConfig,
543) -> String {
544 use tokio::io::AsyncReadExt;
545
546 let head = {
548 let s = String::from_utf8_lossy(buffer);
549 truncate_to_char_boundary(&s, config.head_bytes).to_string()
550 };
551
552 let cap = config.tail_bytes;
554 let mut tail: std::collections::VecDeque<u8> = std::collections::VecDeque::with_capacity(cap + 1);
555 extend_ring(&mut tail, buffer, cap);
556 let mut total = buffer.len();
557
558 let mut chunk = [0u8; 8192];
559 loop {
560 match stdout.read(&mut chunk).await {
561 Ok(0) => break,
562 Ok(n) => {
563 total += n;
564 extend_ring(&mut tail, &chunk[..n], cap);
565 }
566 Err(_) => break,
567 }
568 }
569
570 let tail_bytes: Vec<u8> = tail.into_iter().collect();
571 let tail_str = String::from_utf8_lossy(&tail_bytes);
572 let dropped = total.saturating_sub(head.len() + tail_bytes.len());
573 format!(
574 "{}\n...\n{}\n[output truncated in memory: {} bytes total, {} discarded — no spill file]",
575 head, tail_str, total, dropped
576 )
577}
578
579#[cfg(feature = "subprocess")]
582fn extend_ring(ring: &mut std::collections::VecDeque<u8>, bytes: &[u8], cap: usize) {
583 if cap == 0 {
584 return;
585 }
586 let start = bytes.len().saturating_sub(cap);
587 for &b in &bytes[start..] {
588 if ring.len() == cap {
589 ring.pop_front();
590 }
591 ring.push_back(b);
592 }
593}
594
595#[cfg(feature = "subprocess")]
599async fn stream_to_spill<R: tokio::io::AsyncRead + Unpin>(
600 buffer: &[u8],
601 stdout: &mut R,
602 config: &OutputLimitConfig,
603) -> Result<String, std::io::Error> {
604 use tokio::io::AsyncReadExt;
605
606 let spill_dir = paths::spill_dir();
607 tokio::fs::create_dir_all(&spill_dir).await?;
608
609 let filename = generate_spill_filename();
610 let path = spill_dir.join(&filename);
611 let mut file = tokio::fs::File::create(&path).await?;
612
613 use tokio::io::AsyncWriteExt;
615 file.write_all(buffer).await?;
616 let mut total = buffer.len();
617
618 let mut chunk = [0u8; 8192];
620 loop {
621 match stdout.read(&mut chunk).await {
622 Ok(0) => break,
623 Ok(n) => {
624 file.write_all(&chunk[..n]).await?;
625 total += n;
626 }
627 Err(_) => break,
628 }
629 }
630 file.flush().await?;
631
632 let full = String::from_utf8_lossy(buffer);
634 let head = truncate_to_char_boundary(&full, config.head_bytes);
635
636 let tail: String = if total <= buffer.len() {
638 let full_str = String::from_utf8_lossy(buffer);
639 tail_from_str(&full_str, config.tail_bytes).to_string()
640 } else {
641 read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
642 };
643
644 let path_str = path.to_string_lossy();
645 Ok(format!(
646 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
647 head, tail, total, path_str
648 ))
649}
650
651#[cfg(feature = "localfs")]
653async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
654 let dir = paths::spill_dir();
655 tokio::fs::create_dir_all(&dir).await?;
656
657 let filename = generate_spill_filename();
658 let path = dir.join(filename);
659 tokio::fs::write(&path, data).await?;
660 Ok((path, data.len()))
661}
662
663#[cfg(feature = "localfs")]
665fn build_truncated_output(
666 full: &str,
667 config: &OutputLimitConfig,
668 spill_path: &std::path::Path,
669 total_bytes: usize,
670) -> String {
671 let head = truncate_to_char_boundary(full, config.head_bytes);
672 let tail = tail_from_str(full, config.tail_bytes);
673 let path_str = spill_path.to_string_lossy();
674 format!(
675 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
676 head, tail, total_bytes, path_str
677 )
678}
679
680fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
682 if s.len() <= max_bytes {
683 return s;
684 }
685 let mut end = max_bytes;
687 while end > 0 && !s.is_char_boundary(end) {
688 end -= 1;
689 }
690 &s[..end]
691}
692
693fn tail_from_str(s: &str, max_bytes: usize) -> &str {
695 if s.len() <= max_bytes {
696 return s;
697 }
698 let start = s.len() - max_bytes;
699 let mut adjusted = start;
700 while adjusted < s.len() && !s.is_char_boundary(adjusted) {
701 adjusted += 1;
702 }
703 &s[adjusted..]
704}
705
706#[cfg(feature = "localfs")]
708async fn read_head_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
709 use tokio::io::AsyncReadExt;
710
711 let mut file = tokio::fs::File::open(path).await?;
712 let mut buf = vec![0u8; max_bytes];
713 let n = file.read(&mut buf).await?;
714 buf.truncate(n);
715
716 let s = String::from_utf8_lossy(&buf);
717 let result = truncate_to_char_boundary(&s, max_bytes);
719 Ok(result.to_string())
720}
721
722#[cfg(feature = "localfs")]
724async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
725 use tokio::io::{AsyncReadExt, AsyncSeekExt};
726
727 let mut file = tokio::fs::File::open(path).await?;
728 let metadata = file.metadata().await?;
729 let len = metadata.len() as usize;
730
731 if len <= max_bytes {
732 let mut buf = Vec::new();
733 file.read_to_end(&mut buf).await?;
734 return Ok(String::from_utf8_lossy(&buf).into_owned());
735 }
736
737 let offset = len - max_bytes;
738 file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
739 let mut buf = vec![0u8; max_bytes];
740 let n = file.read(&mut buf).await?;
741 buf.truncate(n);
742
743 let s = String::from_utf8_lossy(&buf);
745 Ok(s.into_owned())
746}
747
748#[cfg(feature = "localfs")]
750fn generate_spill_filename() -> String {
751 use std::sync::atomic::{AtomicUsize, Ordering};
752 use std::time::SystemTime;
753
754 static COUNTER: AtomicUsize = AtomicUsize::new(0);
755 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
756 let ts = SystemTime::now()
757 .duration_since(SystemTime::UNIX_EPOCH)
758 .unwrap_or_default();
759 let pid = std::process::id();
760 format!("spill-{}.{}-{}-{}.txt", ts.as_secs(), ts.subsec_nanos(), pid, seq)
761}
762
763pub fn parse_size(s: &str) -> Result<usize, String> {
767 let s = s.trim();
768 if s.is_empty() {
769 return Err("empty size string".to_string());
770 }
771
772 let (num_str, multiplier) = if let Some(n) = s.strip_suffix('K').or_else(|| s.strip_suffix('k')) {
773 (n, 1024)
774 } else if let Some(n) = s.strip_suffix('M').or_else(|| s.strip_suffix('m')) {
775 (n, 1024 * 1024)
776 } else {
777 (s, 1)
778 };
779
780 let num: usize = num_str
781 .parse()
782 .map_err(|_| format!("invalid size: {}", s))?;
783
784 Ok(num * multiplier)
785}
786
787#[cfg(all(test, feature = "localfs"))]
788mod tests {
789 use super::*;
790
791 #[test]
792 fn test_none_is_disabled() {
793 let config = OutputLimitConfig::none();
794 assert!(!config.is_enabled());
795 assert_eq!(config.max_bytes(), None);
796 }
797
798 #[test]
799 fn test_mcp_is_enabled() {
800 let config = OutputLimitConfig::mcp();
801 assert!(config.is_enabled());
802 assert_eq!(config.max_bytes(), Some(8 * 1024));
803 assert_eq!(config.head_bytes(), 1024);
804 assert_eq!(config.tail_bytes(), 512);
805 }
806
807 #[test]
808 fn test_set_limit() {
809 let mut config = OutputLimitConfig::none();
810 assert!(!config.is_enabled());
811
812 config.set_limit(Some(1024));
813 assert!(config.is_enabled());
814 assert_eq!(config.max_bytes(), Some(1024));
815
816 config.set_limit(None);
817 assert!(!config.is_enabled());
818 }
819
820 #[test]
821 fn test_set_head_tail() {
822 let mut config = OutputLimitConfig::mcp();
823 config.set_head_bytes(2048);
824 config.set_tail_bytes(1024);
825 assert_eq!(config.head_bytes(), 2048);
826 assert_eq!(config.tail_bytes(), 1024);
827 }
828
829 #[test]
830 fn test_parse_size() {
831 assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
832 assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
833 assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
834 assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
835 assert_eq!(parse_size("65536").unwrap(), 65536);
836 assert!(parse_size("").is_err());
837 assert!(parse_size("abc").is_err());
838 }
839
840 #[test]
841 fn test_truncate_to_char_boundary() {
842 assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
843 assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
844 assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
846 assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
847 assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
848 }
849
850 #[test]
851 fn test_tail_from_str() {
852 assert_eq!(tail_from_str("hello", 10), "hello");
853 assert_eq!(tail_from_str("hello", 3), "llo");
854 assert_eq!(tail_from_str("日本語", 3), "語");
856 assert_eq!(tail_from_str("日本語", 6), "本語");
857 }
858
859 #[test]
860 fn test_generate_spill_filename() {
861 let name = generate_spill_filename();
862 assert!(name.starts_with("spill-"));
863 assert!(name.ends_with(".txt"));
864 }
865
866 #[tokio::test]
867 async fn test_spill_if_needed_under_limit() {
868 let config = OutputLimitConfig::mcp();
869 let mut result = ExecResult::success("short output");
870 let spill = spill_if_needed(&mut result, &config).await;
871 assert!(spill.is_none());
872 assert_eq!(&*result.text_out(), "short output");
873 assert!(!result.did_spill);
874 }
875
876 #[tokio::test]
877 async fn test_spill_if_needed_over_limit() {
878 let config = OutputLimitConfig {
879 max_bytes: Some(100),
880 head_bytes: 20,
881 tail_bytes: 10,
882 spill_mode: SpillMode::Disk,
883 };
884 let big_output = "x".repeat(200);
885 let mut result = ExecResult::success(big_output);
886 let spill = spill_if_needed(&mut result, &config).await;
887 assert!(spill.is_some());
888 assert!(result.did_spill);
889
890 let spill = spill.unwrap();
891 assert_eq!(spill.total_bytes, 200);
892 assert!(spill.path.exists());
893
894 assert!(result.text_out().contains("..."));
896 assert!(result.text_out().contains("[output truncated: 200 bytes total"));
897 assert!(result.text_out().contains(&spill.path.to_string_lossy().to_string()));
898
899 assert!(result.text_out().starts_with(&"x".repeat(20)));
901
902 let spill_content = tokio::fs::read_to_string(&spill.path).await.unwrap();
904 assert_eq!(spill_content.len(), 200);
905
906 let _ = tokio::fs::remove_file(&spill.path).await;
908 }
909
910 #[tokio::test]
911 async fn test_spill_if_needed_disabled() {
912 let config = OutputLimitConfig::none();
913 let big_output = "x".repeat(200);
914 let mut result = ExecResult::success(big_output.clone());
915 let spill = spill_if_needed(&mut result, &config).await;
916 assert!(spill.is_none());
917 assert_eq!(&*result.text_out(), big_output);
918 assert!(!result.did_spill);
919 }
920
921 #[test]
922 fn test_build_truncated_output() {
923 let config = OutputLimitConfig {
924 max_bytes: Some(100),
925 head_bytes: 5,
926 tail_bytes: 3,
927 spill_mode: SpillMode::Disk,
928 };
929 let full = "abcdefghijklmnop";
930 let path = PathBuf::from("/tmp/test-spill.txt");
931 let result = build_truncated_output(full, &config, &path, 16);
932 assert!(result.starts_with("abcde"));
933 assert!(result.contains("..."));
934 assert!(result.contains("nop"));
935 assert!(result.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
936 }
937
938 #[tokio::test]
939 async fn test_kernel_mcp_truncates_large_output() {
940 use crate::kernel::{Kernel, KernelConfig};
941
942 let config = KernelConfig::mcp()
944 .with_output_limit(OutputLimitConfig {
945 max_bytes: Some(200),
946 head_bytes: 50,
947 tail_bytes: 30,
948 spill_mode: SpillMode::Disk,
949 });
950 let kernel = Kernel::new(config).expect("kernel creation");
951
952 let result = kernel.execute("seq 1 10000").await.expect("execute");
954 assert!(result.text_out().contains("[output truncated:"));
955 assert!(result.text_out().contains("full output at"));
956 assert!(result.text_out().starts_with("1\n"));
958 }
959
960 #[tokio::test]
961 async fn test_spill_exits_3() {
962 use crate::kernel::{Kernel, KernelConfig};
963
964 let config = KernelConfig::mcp()
965 .with_output_limit(OutputLimitConfig {
966 max_bytes: Some(100),
967 head_bytes: 30,
968 tail_bytes: 20,
969 spill_mode: SpillMode::Disk,
970 });
971 let kernel = Kernel::new(config).expect("kernel creation");
972
973 let big = "x".repeat(200);
974 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
975 assert_eq!(result.code, 3, "spill should always exit 3");
976 assert_eq!(result.original_code, Some(0), "original command exit code preserved");
977 assert!(result.text_out().contains("[output truncated:"));
978 }
979
980 #[tokio::test]
981 async fn test_kernel_repl_no_truncation() {
982 use crate::kernel::{Kernel, KernelConfig};
983
984 let config = KernelConfig::repl();
986 let kernel = Kernel::new(config).expect("kernel creation");
987
988 let result = kernel.execute("seq 1 100").await.expect("execute");
989 assert!(!result.text_out().contains("[output truncated:"));
990 assert!(result.text_out().contains("100"));
991 }
992
993 #[tokio::test]
994 async fn test_kernel_builtin_truncation() {
995 use crate::kernel::{Kernel, KernelConfig};
996
997 let config = KernelConfig::mcp()
999 .with_output_limit(OutputLimitConfig {
1000 max_bytes: Some(100),
1001 head_bytes: 30,
1002 tail_bytes: 20,
1003 spill_mode: SpillMode::Disk,
1004 });
1005 let kernel = Kernel::new(config).expect("kernel creation");
1006
1007 let big = "x".repeat(200);
1009 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1010 assert!(result.text_out().contains("[output truncated:"));
1011 }
1012
1013 #[test]
1016 fn test_estimated_byte_size_text() {
1017 use crate::interpreter::OutputData;
1018 let data = OutputData::text("hello world");
1019 assert_eq!(data.estimated_byte_size(), 11);
1020 }
1021
1022 #[test]
1023 fn test_estimated_byte_size_table() {
1024 use crate::interpreter::{OutputData, OutputNode};
1025 let data = OutputData::table(
1026 vec!["NAME".into(), "SIZE".into()],
1027 vec![
1028 OutputNode::new("foo").with_cells(vec!["123".into()]),
1029 OutputNode::new("bar").with_cells(vec!["456".into()]),
1030 ],
1031 );
1032 assert_eq!(data.estimated_byte_size(), 15);
1034 }
1035
1036 #[test]
1037 fn test_estimated_byte_size_tree() {
1038 use crate::interpreter::{OutputData, OutputNode};
1039 let data = OutputData::nodes(vec![
1040 OutputNode::new("src").with_children(vec![
1041 OutputNode::new("main.rs"),
1042 OutputNode::new("lib.rs"),
1043 ]),
1044 ]);
1045 assert_eq!(data.estimated_byte_size(), 20);
1047 }
1048
1049 #[test]
1050 fn test_write_canonical_matches_to_canonical_string() {
1051 use crate::interpreter::{OutputData, OutputNode};
1052
1053 let cases: Vec<OutputData> = vec![
1054 OutputData::text("hello world"),
1055 OutputData::nodes(vec![
1056 OutputNode::new("file1"),
1057 OutputNode::new("file2"),
1058 ]),
1059 OutputData::table(
1060 vec!["NAME".into(), "SIZE".into()],
1061 vec![
1062 OutputNode::new("foo").with_cells(vec!["123".into()]),
1063 OutputNode::new("bar").with_cells(vec!["456".into()]),
1064 ],
1065 ),
1066 OutputData::nodes(vec![
1067 OutputNode::new("src").with_children(vec![
1068 OutputNode::new("main.rs"),
1069 OutputNode::new("lib.rs"),
1070 ]),
1071 ]),
1072 ];
1073
1074 for data in cases {
1075 let expected = data.to_canonical_string();
1076 let mut buf = Vec::new();
1077 let written = data.write_canonical(&mut buf, None).unwrap();
1078 let got = String::from_utf8(buf).unwrap();
1079 assert_eq!(got, expected, "write_canonical mismatch for {:?}", data);
1080 assert_eq!(written, expected.len(), "byte count mismatch");
1081 }
1082 }
1083
1084 #[test]
1085 fn test_write_canonical_budget_stops_early() {
1086 use crate::interpreter::{OutputData, OutputNode};
1087
1088 let data = OutputData::nodes(
1089 (0..1000).map(|i| OutputNode::new(format!("file_{:04}", i))).collect()
1090 );
1091 let mut buf = Vec::new();
1092 let written = data.write_canonical(&mut buf, Some(100)).unwrap();
1093 assert!(written > 100, "should exceed budget slightly");
1095 assert!(written < 500, "should stop soon after budget: got {}", written);
1096 }
1097
1098 #[tokio::test]
1099 async fn test_spill_if_needed_large_output_data_no_oom() {
1100 use crate::interpreter::{OutputData, OutputNode};
1101
1102 let config = OutputLimitConfig {
1103 max_bytes: Some(1024),
1104 head_bytes: 100,
1105 tail_bytes: 50,
1106 spill_mode: SpillMode::Disk,
1107 };
1108
1109 let nodes: Vec<OutputNode> = (0..100_000)
1112 .map(|i| OutputNode::new(format!("node_{:06}", i)))
1113 .collect();
1114 let data = OutputData::nodes(nodes);
1115 let mut result = ExecResult::with_output(data);
1116
1117 let spill = spill_if_needed(&mut result, &config).await;
1118 assert!(spill.is_some(), "should have spilled");
1119 assert!(result.did_spill);
1120 assert!(result.text_out().contains("[output truncated:"));
1121
1122 if let Some(s) = spill {
1124 let _ = tokio::fs::remove_file(&s.path).await;
1125 }
1126 }
1127
1128 #[cfg(feature = "subprocess")]
1133 #[tokio::test]
1134 async fn test_collect_small_output_no_spill() {
1135 let (mut writer, reader) = tokio::io::duplex(1024);
1136 let config = OutputLimitConfig {
1137 max_bytes: Some(1024),
1138 head_bytes: 100,
1139 tail_bytes: 50,
1140 spill_mode: SpillMode::Disk,
1141 };
1142
1143 use tokio::io::AsyncWriteExt;
1145 writer.write_all(b"hello world").await.unwrap();
1146 drop(writer); let mut reader = reader;
1149 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
1150 assert_eq!(result, "hello world");
1151 assert!(!did_spill);
1152 }
1153
1154 #[cfg(feature = "subprocess")]
1155 #[tokio::test]
1156 async fn test_collect_large_output_spills() {
1157 let (mut writer, reader) = tokio::io::duplex(64 * 1024);
1158 let config = OutputLimitConfig {
1159 max_bytes: Some(100),
1160 head_bytes: 20,
1161 tail_bytes: 10,
1162 spill_mode: SpillMode::Disk,
1163 };
1164
1165 use tokio::io::AsyncWriteExt;
1167 let data = "x".repeat(500);
1168 writer.write_all(data.as_bytes()).await.unwrap();
1169 drop(writer); let mut reader = reader;
1172 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
1173 assert!(did_spill, "should have spilled");
1174 assert!(result.contains("[output truncated:"));
1175 assert!(result.contains("full output at"));
1176 }
1177
1178 #[cfg(feature = "subprocess")]
1179 #[tokio::test]
1180 async fn test_collect_exact_boundary_no_spill() {
1181 let (mut writer, reader) = tokio::io::duplex(1024);
1182 let config = OutputLimitConfig {
1183 max_bytes: Some(100),
1184 head_bytes: 20,
1185 tail_bytes: 10,
1186 spill_mode: SpillMode::Disk,
1187 };
1188
1189 use tokio::io::AsyncWriteExt;
1191 let data = "x".repeat(100);
1192 writer.write_all(data.as_bytes()).await.unwrap();
1193 drop(writer); let mut reader = reader;
1196 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
1197 assert!(!did_spill, "exact boundary should not spill");
1199 assert_eq!(result.len(), 100);
1200 }
1201
1202 #[cfg(feature = "subprocess")]
1203 #[tokio::test]
1204 async fn test_collect_broken_pipe() {
1205 let (writer, reader) = tokio::io::duplex(1024);
1206 let config = OutputLimitConfig {
1207 max_bytes: Some(1024),
1208 head_bytes: 100,
1209 tail_bytes: 50,
1210 spill_mode: SpillMode::Disk,
1211 };
1212
1213 use tokio::io::AsyncWriteExt;
1215 let mut writer = writer;
1216 writer.write_all(b"partial data").await.unwrap();
1217 drop(writer); let mut reader = reader;
1220 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 1024, &config).await;
1221 assert_eq!(result, "partial data");
1222 assert!(!did_spill);
1223 }
1224
1225 #[test]
1228 fn test_in_memory_builder_and_default() {
1229 assert_eq!(OutputLimitConfig::mcp().spill_mode(), SpillMode::Disk);
1230 assert_eq!(OutputLimitConfig::mcp().in_memory().spill_mode(), SpillMode::Memory);
1231
1232 let mut config = OutputLimitConfig::none();
1233 config.set_spill_mode(SpillMode::Memory);
1234 assert_eq!(config.spill_mode(), SpillMode::Memory);
1235 }
1236
1237 #[tokio::test]
1238 async fn test_memory_mode_truncates_string_without_disk() {
1239 let config = OutputLimitConfig {
1240 max_bytes: Some(100),
1241 head_bytes: 20,
1242 tail_bytes: 10,
1243 spill_mode: SpillMode::Memory,
1244 };
1245 let mut result = ExecResult::success("x".repeat(200));
1246 let spill = spill_if_needed(&mut result, &config).await;
1247
1248 assert!(spill.is_none(), "memory mode must not write a spill file");
1250 assert!(result.did_spill, "memory truncation must set did_spill for the exit-3 remap");
1251
1252 let out = result.text_out();
1253 assert!(out.contains("truncated in memory"), "got: {}", out);
1254 assert!(out.contains("200 bytes total"), "got: {}", out);
1255 assert!(!out.contains("full output at"), "memory mode must not point at a file: {}", out);
1256 assert!(out.starts_with(&"x".repeat(20)), "head preserved");
1257 }
1258
1259 #[tokio::test]
1260 async fn test_memory_mode_under_limit_untouched() {
1261 let config = OutputLimitConfig {
1262 max_bytes: Some(100),
1263 head_bytes: 20,
1264 tail_bytes: 10,
1265 spill_mode: SpillMode::Memory,
1266 };
1267 let mut result = ExecResult::success("short");
1268 let spill = spill_if_needed(&mut result, &config).await;
1269 assert!(spill.is_none());
1270 assert!(!result.did_spill);
1271 assert_eq!(&*result.text_out(), "short");
1272 }
1273
1274 #[tokio::test]
1275 async fn test_memory_mode_large_output_data_bounded() {
1276 use crate::interpreter::{OutputData, OutputNode};
1277
1278 let config = OutputLimitConfig {
1279 max_bytes: Some(1024),
1280 head_bytes: 100,
1281 tail_bytes: 50,
1282 spill_mode: SpillMode::Memory,
1283 };
1284
1285 let nodes: Vec<OutputNode> = (0..100_000)
1287 .map(|i| OutputNode::new(format!("node_{:06}", i)))
1288 .collect();
1289 let mut result = ExecResult::with_output(OutputData::nodes(nodes));
1290
1291 let spill = spill_if_needed(&mut result, &config).await;
1292 assert!(spill.is_none(), "memory mode writes no file");
1293 assert!(result.did_spill);
1294 let out = result.text_out();
1295 assert!(out.contains("truncated in memory"), "got: {}", out);
1296 assert!(out.starts_with("node_000000"), "head rendered: {}", out);
1297 assert!(out.contains("head only"), "got: {}", out);
1299 }
1300
1301 #[tokio::test]
1302 async fn test_kernel_memory_mode_exits_3_preserves_original() {
1303 use crate::kernel::{Kernel, KernelConfig};
1304
1305 let config = KernelConfig::mcp().with_output_limit(OutputLimitConfig {
1306 max_bytes: Some(100),
1307 head_bytes: 30,
1308 tail_bytes: 20,
1309 spill_mode: SpillMode::Memory,
1310 });
1311 let kernel = Kernel::new(config).expect("kernel creation");
1312
1313 let big = "x".repeat(200);
1314 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1315 assert_eq!(result.code, 3, "memory truncation still signals via exit 3");
1316 assert_eq!(result.original_code, Some(0), "original exit code preserved");
1317 assert!(result.text_out().contains("truncated in memory"));
1318 assert!(!result.text_out().contains("full output at"));
1319 }
1320
1321 #[tokio::test]
1322 async fn test_nolocal_kernel_forces_memory_spill() {
1323 use crate::kernel::{Kernel, KernelConfig, VfsMountMode};
1324
1325 let config = KernelConfig::mcp()
1329 .with_vfs_mode(VfsMountMode::NoLocal)
1330 .with_output_limit(OutputLimitConfig {
1331 max_bytes: Some(100),
1332 head_bytes: 30,
1333 tail_bytes: 20,
1334 spill_mode: SpillMode::Disk,
1335 });
1336 let kernel = Kernel::new(config).expect("kernel creation");
1337
1338 let big = "x".repeat(200);
1339 let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
1340 assert_eq!(result.code, 3, "still signals truncation via exit 3");
1341 assert!(result.text_out().contains("truncated in memory"), "got: {}", result.text_out());
1342 assert!(
1343 !result.text_out().contains("full output at"),
1344 "NoLocal kernel must not write a host spill file: {}",
1345 result.text_out()
1346 );
1347 }
1348
1349 #[cfg(feature = "subprocess")]
1350 #[tokio::test]
1351 async fn test_collect_memory_mode_drains_without_disk() {
1352 let (mut writer, reader) = tokio::io::duplex(64 * 1024);
1353 let config = OutputLimitConfig {
1354 max_bytes: Some(100),
1355 head_bytes: 20,
1356 tail_bytes: 10,
1357 spill_mode: SpillMode::Memory,
1358 };
1359
1360 use tokio::io::AsyncWriteExt;
1361 let data = format!("{}{}{}", "a".repeat(20), "b".repeat(500), "c".repeat(10));
1363 writer.write_all(data.as_bytes()).await.unwrap();
1364 drop(writer);
1365
1366 let mut reader = reader;
1367 let (result, did_spill) = collect_stdout_with_spill(&mut reader, 100, &config).await;
1368 assert!(did_spill, "drain flags truncation for the exit-3 remap");
1369 assert!(result.contains("truncated in memory"), "got: {}", result);
1370 assert!(!result.contains("full output at"), "no disk file in memory mode");
1371 assert!(result.starts_with(&"a".repeat(20)), "head preserved: {}", result);
1372 assert!(result.contains(&"c".repeat(10)), "tail preserved: {}", result);
1373 assert!(result.contains("530 bytes total"), "honest total: {}", result);
1374 }
1375
1376 #[cfg(feature = "subprocess")]
1377 #[test]
1378 fn test_extend_ring_keeps_last_cap_bytes() {
1379 let mut ring = std::collections::VecDeque::new();
1380 super::extend_ring(&mut ring, b"abcdef", 3);
1381 assert_eq!(ring.iter().copied().collect::<Vec<u8>>(), b"def");
1382 super::extend_ring(&mut ring, b"gh", 3);
1384 assert_eq!(ring.iter().copied().collect::<Vec<u8>>(), b"fgh");
1385 let mut empty = std::collections::VecDeque::new();
1387 super::extend_ring(&mut empty, b"xyz", 0);
1388 assert!(empty.is_empty());
1389 }
1390}