use crate::error::Result;
use std::fs::File;
use std::io::{Read, Write};
use std::process::{Child, ExitStatus};
use std::time::{Duration, Instant};
const DEFAULT_MAX_STREAM_BYTES: usize = 1024 * 1024;
const DEFAULT_TIMEOUT_SECS: u64 = 30;
const POLL_INTERVAL: Duration = Duration::from_millis(25);
pub fn max_stream_bytes() -> usize {
std::env::var("BALLS_PLUGIN_MAX_STREAM_BYTES")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_STREAM_BYTES)
}
pub fn timeout() -> Duration {
let secs = std::env::var("BALLS_PLUGIN_TIMEOUT_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_TIMEOUT_SECS);
Duration::from_secs(secs)
}
pub struct PluginOutcome {
pub status: ExitStatus,
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
pub diagnostics: Vec<u8>,
pub truncated: bool,
pub timed_out: bool,
}
pub fn run_with_limits(
mut child: Child,
stdin_bytes: &[u8],
diag_read: Option<File>,
) -> Result<PluginOutcome> {
if let Some(mut sin) = child.stdin.take() {
let bytes = stdin_bytes.to_vec();
std::thread::spawn(move || {
let _ = sin.write_all(&bytes);
});
}
let cap = max_stream_bytes();
let stdout = child.stdout.take().expect("stdout piped");
let stderr = child.stderr.take().expect("stderr piped");
let stdout_thread = std::thread::spawn(move || drain_capped(stdout, cap));
let stderr_thread = std::thread::spawn(move || drain_capped(stderr, cap));
let diag_thread = diag_read.map(|r| std::thread::spawn(move || drain_capped(r, cap)));
let deadline = Instant::now() + timeout();
let mut timed_out = false;
let status = loop {
if let Some(s) = child.try_wait()? {
break s;
}
if Instant::now() >= deadline {
kill_process_group(child.id());
timed_out = true;
break child.wait()?;
}
std::thread::sleep(POLL_INTERVAL);
};
let (stdout_buf, stdout_trunc) = stdout_thread.join().unwrap_or_default();
let (stderr_buf, _) = stderr_thread.join().unwrap_or_default();
let diagnostics = diag_thread
.map(|t| t.join().unwrap_or_default().0)
.unwrap_or_default();
Ok(PluginOutcome {
status,
stdout: stdout_buf,
stderr: stderr_buf,
diagnostics,
truncated: stdout_trunc,
timed_out,
})
}
fn drain_capped<R: Read>(mut r: R, cap: usize) -> (Vec<u8>, bool) {
let mut buf = Vec::with_capacity(cap.min(64 * 1024));
let mut truncated = false;
let mut tmp = [0u8; 8192];
while let Ok(n) = r.read(&mut tmp) {
if n == 0 {
break;
}
if buf.len() < cap {
let room = cap - buf.len();
let take = n.min(room);
buf.extend_from_slice(&tmp[..take]);
if take < n {
truncated = true;
}
} else {
truncated = true;
}
}
(buf, truncated)
}
fn kill_process_group(child_pid: u32) {
#[allow(clippy::cast_possible_wrap)]
let pgid = child_pid as i32;
unsafe {
libc::killpg(pgid, libc::SIGKILL);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io;
struct Chunks(Vec<io::Result<&'static [u8]>>);
impl Read for Chunks {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.0.is_empty() {
return Ok(0);
}
match self.0.remove(0) {
Ok(bytes) => {
let n = bytes.len().min(buf.len());
buf[..n].copy_from_slice(&bytes[..n]);
Ok(n)
}
Err(e) => Err(e),
}
}
}
#[test]
fn drain_capped_clean_read() {
let r = Chunks(vec![Ok(b"hello world")]);
let (buf, trunc) = drain_capped(r, 100);
assert_eq!(buf, b"hello world");
assert!(!trunc);
}
#[test]
fn drain_capped_truncates_when_first_read_exceeds_cap() {
let r = Chunks(vec![Ok(b"hello world")]);
let (buf, trunc) = drain_capped(r, 5);
assert_eq!(buf, b"hello");
assert!(trunc);
}
#[test]
fn drain_capped_keeps_draining_after_cap_reached() {
let r = Chunks(vec![Ok(b"abcd"), Ok(b"efgh"), Ok(b"ijkl")]);
let (buf, trunc) = drain_capped(r, 4);
assert_eq!(buf, b"abcd");
assert!(trunc);
}
#[test]
fn drain_capped_treats_read_error_as_eof() {
let r = Chunks(vec![
Ok(b"partial"),
Err(io::Error::other("boom")),
]);
let (buf, trunc) = drain_capped(r, 100);
assert_eq!(buf, b"partial");
assert!(!trunc);
}
}