use std::collections::VecDeque;
use std::io::{BufRead, BufReader};
use std::process::ChildStderr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
/// Suffix appended to a captured line after it has been cut down to the configured byte limit.
const TRUNCATION_MARKER: &str = "…[truncated]";
/// Longest time `ConsoleCapture::shutdown` waits for the drainer thread to finish before it
/// stops waiting.
const SHUTDOWN_JOIN_BUDGET: Duration = Duration::from_millis(200);
/// Pause between successive "has the drainer thread finished?" polls in `join_with_deadline`.
const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(5);
/// Retention limits applied to captured console output.
#[derive(Debug, Clone)]
pub struct ConsoleConfig {
/// Maximum number of lines kept; when exceeded, the oldest lines are evicted first.
/// A value of `0` means nothing is stored.
pub max_lines: usize,
/// Maximum number of bytes kept from a single line. Longer lines are cut on a UTF-8 character
/// boundary at or below this limit and the truncation marker is appended.
pub max_line_bytes: usize,
}
impl Default for ConsoleConfig {
fn default() -> Self {
Self {
max_lines: 200,
max_line_bytes: 4096,
}
}
}
/// Handle to a background thread that drains a child's stderr into a bounded line buffer.
#[derive(Debug)]
pub struct ConsoleCapture {
/// Retained lines, oldest first; shared with the drainer thread.
buffer: Arc<Mutex<VecDeque<String>>>,
/// Set to `true` to ask the drainer thread to stop.
shutdown: Arc<AtomicBool>,
/// Join handle of the drainer thread; `None` if the thread could not be spawned or once
/// `shutdown` has taken it.
drainer: Option<JoinHandle<()>>,
}
impl ConsoleCapture {
    /// Starts draining `stderr` on a dedicated thread named `microvm-console-drainer`.
    ///
    /// Lines are stored according to `config`: at most `max_lines` lines are retained and each
    /// line is capped at `max_line_bytes` bytes.
    ///
    /// Spawning is best effort: if the OS refuses to create the thread the error is discarded,
    /// no output is captured, and [`tail`](Self::tail) simply stays empty.
    pub fn attach(stderr: ChildStderr, config: ConsoleConfig) -> Self {
        let max_lines = config.max_lines;
        let max_line_bytes = config.max_line_bytes;

        // Pre-size for the expected steady state, but never reserve more than 256 slots up front.
        let buffer = Arc::new(Mutex::new(VecDeque::with_capacity(max_lines.min(256))));
        let shutdown = Arc::new(AtomicBool::new(false));

        let drainer = {
            let buffer = Arc::clone(&buffer);
            let shutdown = Arc::clone(&shutdown);
            thread::Builder::new()
                .name("microvm-console-drainer".into())
                .spawn(move || {
                    drain_loop(
                        BufReader::new(stderr),
                        buffer,
                        shutdown,
                        max_lines,
                        max_line_bytes,
                    );
                })
                .ok()
        };

        Self {
            buffer,
            shutdown,
            drainer,
        }
    }

    /// Returns a snapshot of the retained lines, oldest first.
    ///
    /// A poisoned buffer lock is tolerated: the lines stored so far are still returned.
    pub fn tail(&self) -> Vec<String> {
        let lines = self
            .buffer
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        lines.iter().cloned().collect()
    }

    /// Asks the drainer thread to stop and waits up to `SHUTDOWN_JOIN_BUDGET` for it to exit.
    ///
    /// Calling this more than once is harmless; later calls find no thread handle and only
    /// re-set the flag.
    pub fn shutdown(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        if let Some(handle) = self.drainer.take() {
            // The drainer only looks at the flag between reads, so a thread blocked in a read may
            // not exit within the budget. In that case the handle comes back as `Err`; dropping it
            // here detaches the thread instead of blocking the caller.
            let _ = join_with_deadline(handle, SHUTDOWN_JOIN_BUDGET);
        }
    }
}
impl Drop for ConsoleCapture {
fn drop(&mut self) {
self.shutdown();
}
}
fn join_with_deadline(handle: JoinHandle<()>, budget: Duration) -> Result<(), JoinHandle<()>> {
let deadline = Instant::now() + budget;
loop {
if handle.is_finished() {
let _ = handle.join();
return Ok(());
}
if Instant::now() >= deadline {
return Err(handle);
}
thread::sleep(SHUTDOWN_POLL_INTERVAL);
}
}
/// Reads `reader` line by line and stores each line in `buffer` until EOF, an I/O error, or
/// `shutdown` is observed.
///
/// Each line has its trailing `\n` / `\r\n` removed, is capped at `max_line_bytes`, and is pushed
/// with oldest-first eviction once more than `max_lines` lines are held.
///
/// Input is read as raw bytes and decoded lossily: invalid UTF-8 sequences become U+FFFD instead
/// of ending the capture, so a single garbage byte cannot stop the stream from being drained.
///
/// The shutdown flag is only checked between reads; a read that is blocked waiting for data does
/// not notice it.
fn drain_loop<R: BufRead>(
    mut reader: R,
    buffer: Arc<Mutex<VecDeque<String>>>,
    shutdown: Arc<AtomicBool>,
    max_lines: usize,
    max_line_bytes: usize,
) {
    // Both scratch buffers are reused across iterations to avoid a fresh allocation per line.
    // NOTE(review): a line is read in full before the byte cap is applied, so a producer that
    // never emits `\n` can still grow `raw` without bound.
    let mut raw: Vec<u8> = Vec::new();
    let mut line = String::new();
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return;
        }
        raw.clear();
        match reader.read_until(b'\n', &mut raw) {
            // EOF or an I/O error: nothing more can be read from this stream.
            Ok(0) | Err(_) => return,
            Ok(_) => {
                line.clear();
                line.push_str(&String::from_utf8_lossy(&raw));
                trim_trailing_newline(&mut line);
                let to_push = enforce_line_limit(&line, max_line_bytes);
                push_with_eviction(&buffer, to_push, max_lines);
            }
        }
    }
}
/// Removes one trailing line terminator (`\n` or `\r\n`) from `line` in place.
///
/// A `\r` that is not followed by `\n` is left untouched, and at most one terminator is removed.
fn trim_trailing_newline(line: &mut String) {
    let kept_len = match line.strip_suffix('\n') {
        Some(without_lf) => without_lf
            .strip_suffix('\r')
            .unwrap_or(without_lf)
            .len(),
        // No line feed at the end: nothing to strip.
        None => return,
    };
    line.truncate(kept_len);
}
fn enforce_line_limit(line: &str, max_line_bytes: usize) -> String {
if line.len() <= max_line_bytes {
return line.to_owned();
}
let mut cut = max_line_bytes;
while cut > 0 && !line.is_char_boundary(cut) {
cut -= 1;
}
let mut out = String::with_capacity(cut + TRUNCATION_MARKER.len());
out.push_str(&line[..cut]);
out.push_str(TRUNCATION_MARKER);
out
}
/// Appends `line` to `buffer`, then drops lines from the front until at most `max_lines` remain.
///
/// With `max_lines == 0` the line is discarded without touching the lock, so disabled retention
/// never contends with readers. A poisoned lock is tolerated and the buffer is used as-is.
fn push_with_eviction(buffer: &Mutex<VecDeque<String>>, line: String, max_lines: usize) {
    // Nothing may be stored, so there is no reason to take the lock at all.
    if max_lines == 0 {
        return;
    }
    let mut lines = buffer
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    lines.push_back(line);
    while lines.len() > max_lines {
        lines.pop_front();
    }
}
// Tests that drive `ConsoleCapture` with the stderr of real `sh` child processes.
#[cfg(test)]
mod tests {
use super::*;
use std::process::{Command, Stdio};
// Runs `script` under `sh -c` with stdin and stdout discarded and returns the piped stderr.
// A helper thread waits on the child so its exit status is collected once it terminates.
fn sh_stderr(script: &str) -> ChildStderr {
let mut child = Command::new("sh")
.arg("-c")
.arg(script)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()
.expect("spawn sh");
let stderr = child.stderr.take().expect("piped stderr");
thread::spawn(move || {
let _ = child.wait();
});
stderr
}
// Polls `capture.tail()` every 10 ms until `predicate` accepts the snapshot or `deadline` has
// elapsed. The latest snapshot is returned in both cases, so callers must still assert on it.
fn wait_for_tail<F>(capture: &ConsoleCapture, deadline: Duration, predicate: F) -> Vec<String>
where
F: Fn(&[String]) -> bool,
{
let start = Instant::now();
loop {
let snapshot = capture.tail();
if predicate(&snapshot) {
return snapshot;
}
if start.elapsed() >= deadline {
return snapshot;
}
thread::sleep(Duration::from_millis(10));
}
}
// Lines come back oldest first, in the order the child wrote them.
#[test]
fn tail_returns_lines_in_chronological_order() {
let stderr = sh_stderr("printf 'a\\nb\\nc\\n' 1>&2");
let capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| t.len() >= 3);
assert_eq!(
tail,
vec!["a".to_string(), "b".to_string(), "c".to_string()]
);
}
// With room for two lines, only the last two of five survive.
#[test]
fn oldest_line_is_evicted_when_capacity_exceeded() {
let stderr = sh_stderr("printf '1\\n2\\n3\\n4\\n5\\n' 1>&2");
let capture = ConsoleCapture::attach(
stderr,
ConsoleConfig {
max_lines: 2,
max_line_bytes: 4096,
},
);
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| {
t.last().map(|s| s == "5").unwrap_or(false)
});
assert_eq!(tail, vec!["4".to_string(), "5".to_string()]);
}
// A 50-byte line with a 10-byte limit keeps its first 10 bytes plus the truncation marker.
#[test]
fn long_line_is_truncated_with_suffix() {
let stderr =
sh_stderr("printf 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\\n' 1>&2");
let capture = ConsoleCapture::attach(
stderr,
ConsoleConfig {
max_lines: 16,
max_line_bytes: 10,
},
);
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| !t.is_empty());
assert_eq!(tail.len(), 1);
let line = &tail[0];
assert!(line.ends_with(TRUNCATION_MARKER), "line was {line:?}");
assert_eq!(&line[..10], "xxxxxxxxxx");
assert_eq!(line.len(), 10 + TRUNCATION_MARKER.len());
}
// The 10-byte limit falls inside the multi-byte '€'; the cut must back off to byte 9 rather than
// split the character.
#[test]
fn truncation_respects_utf8_boundary() {
let stderr = sh_stderr("printf '123456789€XYZ\\n' 1>&2");
let capture = ConsoleCapture::attach(
stderr,
ConsoleConfig {
max_lines: 4,
max_line_bytes: 10,
},
);
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| !t.is_empty());
assert_eq!(tail.len(), 1);
let line = &tail[0];
assert!(line.ends_with(TRUNCATION_MARKER));
assert_eq!(&line[..9], "123456789");
assert!(!line[..line.len() - TRUNCATION_MARKER.len()].contains('€'));
}
// Captured lines stay available, unchanged, after the child has exited.
#[test]
fn tail_survives_process_exit() {
let stderr = sh_stderr("printf 'panic: kernel\\nrebooting\\n' 1>&2");
let capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| t.len() >= 2);
assert_eq!(
tail,
vec!["panic: kernel".to_string(), "rebooting".to_string()]
);
thread::sleep(Duration::from_millis(50));
assert_eq!(capture.tail(), tail);
}
// `tail` keeps working after `shutdown`, and a second `shutdown` call is harmless.
#[test]
fn shutdown_then_tail_does_not_panic() {
let stderr = sh_stderr("printf 'one\\ntwo\\n' 1>&2");
let mut capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
let _ = wait_for_tail(&capture, Duration::from_secs(2), |t| t.len() >= 2);
capture.shutdown();
let tail = capture.tail();
assert!(tail.contains(&"one".to_string()));
assert!(tail.contains(&"two".to_string()));
capture.shutdown();
}
// The child writes nothing for 5 s, so the drainer is blocked in a read; dropping the capture
// must still return in well under a second.
#[test]
fn drop_does_not_deadlock_when_thread_is_mid_read() {
let stderr = sh_stderr("sleep 5");
let capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
thread::sleep(Duration::from_millis(20));
let start = Instant::now();
drop(capture);
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(1000),
"drop took too long: {elapsed:?}"
);
}
// A blank line between two others is stored as an empty string, not skipped.
#[test]
fn empty_lines_are_retained() {
let stderr = sh_stderr("printf 'a\\n\\nb\\n' 1>&2");
let capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| t.len() >= 3);
assert_eq!(tail, vec!["a".to_string(), String::new(), "b".to_string()]);
}
// Both `\r\n` and `\n` terminators are removed from stored lines.
#[test]
fn crlf_line_endings_are_stripped() {
let stderr = sh_stderr("printf 'win\\r\\nlin\\n' 1>&2");
let capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
let tail = wait_for_tail(&capture, Duration::from_secs(2), |t| t.len() >= 2);
assert_eq!(tail, vec!["win".to_string(), "lin".to_string()]);
}
// With `max_lines: 0` nothing is stored; the fixed 100 ms sleep gives the drainer time to have
// read the output.
#[test]
fn max_lines_zero_drains_without_storing() {
let stderr = sh_stderr("printf 'a\\nb\\nc\\n' 1>&2");
let capture = ConsoleCapture::attach(
stderr,
ConsoleConfig {
max_lines: 0,
max_line_bytes: 4096,
},
);
thread::sleep(Duration::from_millis(100));
assert!(capture.tail().is_empty());
}
// Poisons the buffer mutex by panicking in another thread while holding it, then checks that
// `tail` still returns the line captured earlier.
#[test]
fn poisoned_buffer_lock_still_returns_tail() {
let stderr = sh_stderr("printf 'survivor\\n' 1>&2");
let capture = ConsoleCapture::attach(stderr, ConsoleConfig::default());
wait_for_tail(&capture, Duration::from_secs(2), |t| !t.is_empty());
let buffer = Arc::clone(&capture.buffer);
let _ = thread::spawn(move || {
let _guard = buffer.lock().expect("lock");
panic!("intentional poison");
})
.join();
let tail = capture.tail();
assert!(tail.contains(&"survivor".to_string()), "tail was {tail:?}");
}
}