1use std::path::PathBuf;
11
12use crate::interpreter::ExecResult;
13use crate::paths;
14
/// Spill threshold used by [`OutputLimitConfig::mcp`]: 64 KiB.
const DEFAULT_MCP_LIMIT: usize = 65_536;

/// Bytes from the start of the output kept inline when output is truncated.
const DEFAULT_HEAD_BYTES: usize = 1_024;

/// Bytes from the end of the output kept inline when output is truncated.
const DEFAULT_TAIL_BYTES: usize = 512;

/// Limits applied to command output before it is handed back.
///
/// When `max_bytes` is `Some(n)`, output longer than `n` bytes is replaced by
/// its first `head_bytes` and last `tail_bytes` plus a footer pointing at a
/// spill file holding the full output. `None` disables limiting.
#[derive(Debug, Clone)]
pub struct OutputLimitConfig {
    /// Spill threshold in bytes; `None` disables limiting.
    max_bytes: Option<usize>,
    /// Bytes kept from the start of truncated output.
    head_bytes: usize,
    /// Bytes kept from the end of truncated output.
    tail_bytes: usize,
}

impl OutputLimitConfig {
    /// Limiting disabled; head/tail sizes still carry their defaults.
    pub fn none() -> Self {
        Self {
            max_bytes: None,
            head_bytes: DEFAULT_HEAD_BYTES,
            tail_bytes: DEFAULT_TAIL_BYTES,
        }
    }

    /// Limiting enabled at the MCP default threshold, default head/tail sizes.
    pub fn mcp() -> Self {
        Self {
            max_bytes: Some(DEFAULT_MCP_LIMIT),
            ..Self::none()
        }
    }

    /// `true` when a byte limit is configured.
    pub fn is_enabled(&self) -> bool {
        self.max_bytes().is_some()
    }

    /// The configured byte limit, if any.
    pub fn max_bytes(&self) -> Option<usize> {
        self.max_bytes
    }

    /// Bytes kept from the start of truncated output.
    pub fn head_bytes(&self) -> usize {
        self.head_bytes
    }

    /// Bytes kept from the end of truncated output.
    pub fn tail_bytes(&self) -> usize {
        self.tail_bytes
    }

    /// Sets (or, with `None`, clears) the byte limit.
    pub fn set_limit(&mut self, max: Option<usize>) {
        self.max_bytes = max;
    }

    /// Sets how many leading bytes survive truncation.
    pub fn set_head_bytes(&mut self, bytes: usize) {
        self.head_bytes = bytes;
    }

    /// Sets how many trailing bytes survive truncation.
    pub fn set_tail_bytes(&mut self, bytes: usize) {
        self.tail_bytes = bytes;
    }
}
89
/// Outcome of spilling oversized output to disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpillResult {
    /// Location of the spill file holding the complete output.
    pub path: PathBuf,
    /// Number of bytes written to the spill file.
    pub total_bytes: usize,
}
95
96pub async fn spill_if_needed(
105 result: &mut ExecResult,
106 config: &OutputLimitConfig,
107) -> Option<SpillResult> {
108 let max = config.max_bytes?;
109 let total = result.out.len();
110 if total <= max {
111 return None;
112 }
113
114 match write_spill_file(result.out.as_bytes()).await {
115 Ok((path, written)) => {
116 result.out = build_truncated_output(&result.out, config, &path, total);
117 Some(SpillResult {
118 path,
119 total_bytes: written,
120 })
121 }
122 Err(e) => {
123 tracing::error!("output spill failed: {}", e);
125 *result = ExecResult::failure(1, format!(
126 "output exceeded {} byte limit ({} bytes) and spill to disk failed: {}",
127 max, total, e
128 ));
129 None
130 }
131 }
132}
133
134pub async fn spill_aware_collect(
142 mut stdout: tokio::process::ChildStdout,
143 mut stderr_reader: tokio::process::ChildStderr,
144 stderr_stream: Option<crate::scheduler::StderrStream>,
145 config: &OutputLimitConfig,
146) -> (String, String) {
147 let max = config.max_bytes.unwrap_or(usize::MAX);
148
149 let stderr_task = tokio::spawn(async move {
151 collect_stderr(&mut stderr_reader, stderr_stream.as_ref()).await
152 });
153
154 let stdout_result = collect_stdout_with_spill(&mut stdout, max, config).await;
155
156 let stderr = stderr_task.await.unwrap_or_default();
157 (stdout_result, stderr)
158}
159
160async fn collect_stderr(
162 reader: &mut tokio::process::ChildStderr,
163 stream: Option<&crate::scheduler::StderrStream>,
164) -> String {
165 use tokio::io::AsyncReadExt;
166
167 let mut buf = Vec::new();
168 let mut chunk = [0u8; 8192];
169 loop {
170 match reader.read(&mut chunk).await {
171 Ok(0) => break,
172 Ok(n) => {
173 if let Some(s) = stream {
174 s.write(&chunk[..n]);
175 } else {
176 buf.extend_from_slice(&chunk[..n]);
177 }
178 }
179 Err(_) => break,
180 }
181 }
182 if stream.is_some() {
183 String::new()
184 } else {
185 String::from_utf8_lossy(&buf).into_owned()
186 }
187}
188
189async fn collect_stdout_with_spill(
191 stdout: &mut tokio::process::ChildStdout,
192 max_bytes: usize,
193 config: &OutputLimitConfig,
194) -> String {
195 use tokio::io::AsyncReadExt;
196 use tokio::time::{sleep, Duration};
197
198 let mut buffer = Vec::new();
199 let mut chunk = [0u8; 8192];
200 let deadline = sleep(Duration::from_secs(1));
201 tokio::pin!(deadline);
202
203 loop {
205 tokio::select! {
206 biased;
207 result = stdout.read(&mut chunk) => {
208 match result {
209 Ok(0) => {
210 return String::from_utf8_lossy(&buffer).into_owned();
213 }
214 Ok(n) => {
215 buffer.extend_from_slice(&chunk[..n]);
216 if buffer.len() > max_bytes {
218 break;
219 }
220 }
221 Err(_) => {
222 return String::from_utf8_lossy(&buffer).into_owned();
223 }
224 }
225 }
226 () = &mut deadline => {
227 break;
229 }
230 }
231 }
232
233 if buffer.len() > max_bytes {
235 match stream_to_spill(&buffer, stdout, config).await {
237 Ok(result) => return result,
238 Err(e) => {
239 tracing::error!("streaming spill failed: {}", e);
242 let size = buffer.len();
243 drop(buffer);
244 return format!(
245 "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
246 max_bytes, size, e
247 );
248 }
249 }
250 }
251
252 loop {
255 match stdout.read(&mut chunk).await {
256 Ok(0) => break,
257 Ok(n) => {
258 buffer.extend_from_slice(&chunk[..n]);
259 if buffer.len() > max_bytes {
261 match stream_to_spill(&buffer, stdout, config).await {
262 Ok(result) => return result,
263 Err(e) => {
264 tracing::error!("streaming spill failed: {}", e);
265 let size = buffer.len();
266 drop(buffer);
267 return format!(
268 "ERROR: output exceeded {} byte limit ({} bytes buffered) and spill to disk failed: {}",
269 max_bytes, size, e
270 );
271 }
272 }
273 }
274 }
275 Err(_) => break,
276 }
277 }
278
279 String::from_utf8_lossy(&buffer).into_owned()
280}
281
282async fn stream_to_spill(
284 buffer: &[u8],
285 stdout: &mut tokio::process::ChildStdout,
286 config: &OutputLimitConfig,
287) -> Result<String, std::io::Error> {
288 use tokio::io::AsyncReadExt;
289
290 let spill_dir = paths::spill_dir();
291 tokio::fs::create_dir_all(&spill_dir).await?;
292
293 let filename = generate_spill_filename();
294 let path = spill_dir.join(&filename);
295 let mut file = tokio::fs::File::create(&path).await?;
296
297 use tokio::io::AsyncWriteExt;
299 file.write_all(buffer).await?;
300 let mut total = buffer.len();
301
302 let mut chunk = [0u8; 8192];
304 loop {
305 match stdout.read(&mut chunk).await {
306 Ok(0) => break,
307 Ok(n) => {
308 file.write_all(&chunk[..n]).await?;
309 total += n;
310 }
311 Err(_) => break,
312 }
313 }
314 file.flush().await?;
315
316 let full = String::from_utf8_lossy(buffer);
318 let head = truncate_to_char_boundary(&full, config.head_bytes);
319
320 let tail: String = if total <= buffer.len() {
322 let full_str = String::from_utf8_lossy(buffer);
323 tail_from_str(&full_str, config.tail_bytes).to_string()
324 } else {
325 read_tail_from_file(&path, config.tail_bytes).await.unwrap_or_default()
326 };
327
328 let path_str = path.to_string_lossy();
329 Ok(format!(
330 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
331 head, tail, total, path_str
332 ))
333}
334
335async fn write_spill_file(data: &[u8]) -> Result<(PathBuf, usize), std::io::Error> {
337 let dir = paths::spill_dir();
338 tokio::fs::create_dir_all(&dir).await?;
339
340 let filename = generate_spill_filename();
341 let path = dir.join(filename);
342 tokio::fs::write(&path, data).await?;
343 Ok((path, data.len()))
344}
345
346fn build_truncated_output(
348 full: &str,
349 config: &OutputLimitConfig,
350 spill_path: &std::path::Path,
351 total_bytes: usize,
352) -> String {
353 let head = truncate_to_char_boundary(full, config.head_bytes);
354 let tail = tail_from_str(full, config.tail_bytes);
355 let path_str = spill_path.to_string_lossy();
356 format!(
357 "{}\n...\n{}\n[output truncated: {} bytes total — full output at {}]",
358 head, tail, total_bytes, path_str
359 )
360}
361
/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary (possibly the empty string).
fn truncate_to_char_boundary(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Index 0 is always a boundary, so the search always succeeds.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    &s[..cut]
}
374
/// Returns the longest suffix of `s` that is at most `max_bytes` bytes long
/// and starts on a UTF-8 character boundary (possibly the empty string).
fn tail_from_str(s: &str, max_bytes: usize) -> &str {
    let len = s.len();
    if max_bytes >= len {
        return s;
    }
    // `len` itself is always a boundary, so the search always succeeds.
    let start = (len - max_bytes..=len)
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(len);
    &s[start..]
}
387
388async fn read_tail_from_file(path: &std::path::Path, max_bytes: usize) -> Result<String, std::io::Error> {
390 use tokio::io::{AsyncReadExt, AsyncSeekExt};
391
392 let mut file = tokio::fs::File::open(path).await?;
393 let metadata = file.metadata().await?;
394 let len = metadata.len() as usize;
395
396 if len <= max_bytes {
397 let mut buf = Vec::new();
398 file.read_to_end(&mut buf).await?;
399 return Ok(String::from_utf8_lossy(&buf).into_owned());
400 }
401
402 let offset = len - max_bytes;
403 file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
404 let mut buf = vec![0u8; max_bytes];
405 let n = file.read(&mut buf).await?;
406 buf.truncate(n);
407
408 let s = String::from_utf8_lossy(&buf);
410 Ok(s.into_owned())
411}
412
/// Builds a spill file name of the form `spill-<secs>.<nanos>-<pid>-<seq>.txt`.
///
/// `<secs>.<nanos>` is the wall-clock time since the Unix epoch (`0.0` if the
/// clock reads earlier than the epoch), `<pid>` is this process's id, and
/// `<seq>` is a process-wide counter. The counter keeps names from this process
/// distinct even when the clock reading repeats.
fn generate_spill_filename() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SPILL_SEQ: AtomicUsize = AtomicUsize::new(0);

    let sequence = SPILL_SEQ.fetch_add(1, Ordering::Relaxed);
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let (secs, nanos) = (since_epoch.as_secs(), since_epoch.subsec_nanos());
    format!(
        "spill-{}.{}-{}-{}.txt",
        secs,
        nanos,
        std::process::id(),
        sequence
    )
}
426
/// Parses a human-readable byte size.
///
/// Accepts either a plain decimal byte count (`"65536"`) or a count followed
/// by a binary-unit suffix in either case: `K` (×1024), `M` (×1024²), or `G`
/// (×1024³). Surrounding whitespace is ignored.
///
/// # Errors
/// Returns a message in three cases:
/// - the string is empty;
/// - the number part is not a valid `usize`;
/// - the scaled size overflows `usize`.
pub fn parse_size(s: &str) -> Result<usize, String> {
    const UNITS: [(char, usize); 3] = [('K', 1 << 10), ('M', 1 << 20), ('G', 1 << 30)];

    let s = s.trim();
    if s.is_empty() {
        return Err("empty size string".to_string());
    }

    let (num_str, multiplier) = UNITS
        .iter()
        .find_map(|&(unit, factor)| {
            s.strip_suffix(unit)
                .or_else(|| s.strip_suffix(unit.to_ascii_lowercase()))
                .map(|n| (n, factor))
        })
        .unwrap_or((s, 1));

    let num: usize = num_str
        .parse()
        .map_err(|_| format!("invalid size: {}", s))?;

    // Plain `*` would panic (debug) or silently wrap (release) on overflow.
    num.checked_mul(multiplier)
        .ok_or_else(|| format!("size too large: {}", s))
}
450
// Unit tests for the limit configuration, size parsing, UTF-8-safe slicing
// and spilling, plus end-to-end checks that run commands through `Kernel`.
#[cfg(test)]
mod tests {
    use super::*;

    // `none()` turns limiting off entirely.
    #[test]
    fn test_none_is_disabled() {
        let config = OutputLimitConfig::none();
        assert!(!config.is_enabled());
        assert_eq!(config.max_bytes(), None);
    }

    // `mcp()` enables the 64 KiB limit with 1024/512-byte head/tail sizes.
    #[test]
    fn test_mcp_is_enabled() {
        let config = OutputLimitConfig::mcp();
        assert!(config.is_enabled());
        assert_eq!(config.max_bytes(), Some(64 * 1024));
        assert_eq!(config.head_bytes(), 1024);
        assert_eq!(config.tail_bytes(), 512);
    }

    // The limit can be switched on and back off at runtime.
    #[test]
    fn test_set_limit() {
        let mut config = OutputLimitConfig::none();
        assert!(!config.is_enabled());

        config.set_limit(Some(1024));
        assert!(config.is_enabled());
        assert_eq!(config.max_bytes(), Some(1024));

        config.set_limit(None);
        assert!(!config.is_enabled());
    }

    // Head/tail setters overwrite the defaults.
    #[test]
    fn test_set_head_tail() {
        let mut config = OutputLimitConfig::mcp();
        config.set_head_bytes(2048);
        config.set_tail_bytes(1024);
        assert_eq!(config.head_bytes(), 2048);
        assert_eq!(config.tail_bytes(), 1024);
    }

    // K/M suffixes in either case, plain byte counts, and rejected inputs.
    #[test]
    fn test_parse_size() {
        assert_eq!(parse_size("64K").unwrap(), 64 * 1024);
        assert_eq!(parse_size("64k").unwrap(), 64 * 1024);
        assert_eq!(parse_size("1M").unwrap(), 1024 * 1024);
        assert_eq!(parse_size("1m").unwrap(), 1024 * 1024);
        assert_eq!(parse_size("65536").unwrap(), 65536);
        assert!(parse_size("").is_err());
        assert!(parse_size("abc").is_err());
    }

    // Prefix truncation never splits a multi-byte character.
    #[test]
    fn test_truncate_to_char_boundary() {
        assert_eq!(truncate_to_char_boundary("hello", 10), "hello");
        assert_eq!(truncate_to_char_boundary("hello", 3), "hel");
        // Each of 日, 本, 語 is 3 bytes in UTF-8.
        assert_eq!(truncate_to_char_boundary("日本語", 3), "日");
        // 4 bytes lands inside 本, so the cut backs off to the end of 日.
        assert_eq!(truncate_to_char_boundary("日本語", 4), "日");
        assert_eq!(truncate_to_char_boundary("日本語", 6), "日本");
    }

    // Suffix slicing likewise starts on a character boundary.
    #[test]
    fn test_tail_from_str() {
        assert_eq!(tail_from_str("hello", 10), "hello");
        assert_eq!(tail_from_str("hello", 3), "llo");
        // 3 bytes is exactly the final character.
        assert_eq!(tail_from_str("日本語", 3), "語");
        assert_eq!(tail_from_str("日本語", 6), "本語");
    }

    // Only the fixed prefix/suffix are checked; the middle varies per call.
    #[test]
    fn test_generate_spill_filename() {
        let name = generate_spill_filename();
        assert!(name.starts_with("spill-"));
        assert!(name.ends_with(".txt"));
    }

    // Output within the MCP limit is returned untouched and nothing spills.
    #[tokio::test]
    async fn test_spill_if_needed_under_limit() {
        let config = OutputLimitConfig::mcp();
        let mut result = ExecResult::success("short output");
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert_eq!(result.out, "short output");
    }

    // 200 bytes against a 100-byte limit: writes a real spill file under
    // `paths::spill_dir()` and removes it at the end.
    #[tokio::test]
    async fn test_spill_if_needed_over_limit() {
        let config = OutputLimitConfig {
            max_bytes: Some(100),
            head_bytes: 20,
            tail_bytes: 10,
        };
        let big_output = "x".repeat(200);
        let mut result = ExecResult::success(big_output);
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_some());

        let spill = spill.unwrap();
        assert_eq!(spill.total_bytes, 200);
        assert!(spill.path.exists());

        // The summary contains the elision marker, the byte count and the path.
        assert!(result.out.contains("..."));
        assert!(result.out.contains("[output truncated: 200 bytes total"));
        assert!(result.out.contains(&spill.path.to_string_lossy().to_string()));

        // The head is the first `head_bytes` of the original output.
        assert!(result.out.starts_with(&"x".repeat(20)));

        // The spill file holds the complete, untruncated output.
        let spill_content = tokio::fs::read_to_string(&spill.path).await.unwrap();
        assert_eq!(spill_content.len(), 200);

        // Clean up; failure to remove is not a test failure.
        let _ = tokio::fs::remove_file(&spill.path).await;
    }

    // With limiting disabled, even large output passes through unchanged.
    #[tokio::test]
    async fn test_spill_if_needed_disabled() {
        let config = OutputLimitConfig::none();
        let big_output = "x".repeat(200);
        let mut result = ExecResult::success(big_output.clone());
        let spill = spill_if_needed(&mut result, &config).await;
        assert!(spill.is_none());
        assert_eq!(result.out, big_output);
    }

    // Formats the summary for a 16-byte string with a 5-byte head and
    // 3-byte tail; the path is never touched on disk.
    #[test]
    fn test_build_truncated_output() {
        let config = OutputLimitConfig {
            max_bytes: Some(100),
            head_bytes: 5,
            tail_bytes: 3,
        };
        let full = "abcdefghijklmnop";
        let path = PathBuf::from("/tmp/test-spill.txt");
        let result = build_truncated_output(full, &config, &path, 16);
        assert!(result.starts_with("abcde"));
        assert!(result.contains("..."));
        assert!(result.contains("nop"));
        assert!(result.contains("[output truncated: 16 bytes total — full output at /tmp/test-spill.txt]"));
    }

    // End to end: an external command's output exceeds a 200-byte MCP limit.
    #[tokio::test]
    async fn test_kernel_mcp_truncates_large_output() {
        use crate::kernel::{Kernel, KernelConfig};

        // Override the MCP default with a tiny limit so `seq` overflows it.
        let config = KernelConfig::mcp()
            .with_output_limit(OutputLimitConfig {
                max_bytes: Some(200),
                head_bytes: 50,
                tail_bytes: 30,
            });
        let kernel = Kernel::new(config).expect("kernel creation");

        let result = kernel.execute("seq 1 10000").await.expect("execute");
        // The returned output is the truncation summary, not the raw output.
        assert!(result.out.contains("[output truncated:"));
        assert!(result.out.contains("full output at"));
        // The head preserves the beginning of the original output.
        assert!(result.out.starts_with("1\n"));
    }

    // The REPL configuration does not truncate.
    #[tokio::test]
    async fn test_kernel_repl_no_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        let config = KernelConfig::repl();
        let kernel = Kernel::new(config).expect("kernel creation");

        let result = kernel.execute("seq 1 100").await.expect("execute");
        assert!(!result.out.contains("[output truncated:"));
        assert!(result.out.contains("100"));
    }

    // Output from `echo` over a 100-byte limit is also truncated.
    // NOTE(review): assumes `echo` runs as a kernel builtin (per the test
    // name); the code here does not show which path it takes — confirm.
    #[tokio::test]
    async fn test_kernel_builtin_truncation() {
        use crate::kernel::{Kernel, KernelConfig};

        let config = KernelConfig::mcp()
            .with_output_limit(OutputLimitConfig {
                max_bytes: Some(100),
                head_bytes: 30,
                tail_bytes: 20,
            });
        let kernel = Kernel::new(config).expect("kernel creation");

        // 200 characters of payload comfortably exceeds the 100-byte limit.
        let big = "x".repeat(200);
        let result = kernel.execute(&format!("echo '{}'", big)).await.expect("execute");
        assert!(result.out.contains("[output truncated:"));
    }
}