use tokio::io::AsyncReadExt;
use tokio::time::Duration;
pub struct OutputWatchdog {
idle_timeout: Duration,
max_bytes: usize,
}
#[derive(Debug, Clone)]
pub struct WatchdogOutput {
pub data: String,
pub truncated: bool,
pub idle_timeout_triggered: bool,
pub bytes_read: usize,
}
impl OutputWatchdog {
pub fn new(idle_timeout: Duration, max_bytes: usize) -> Self {
Self {
idle_timeout,
max_bytes,
}
}
pub async fn read_with_idle_detection<R: AsyncReadExt + Unpin>(
&self,
reader: &mut R,
) -> WatchdogOutput {
let mut buf = vec![0u8; self.max_bytes + 1];
let mut total = 0usize;
let mut idle_triggered = false;
loop {
match tokio::time::timeout(self.idle_timeout, reader.read(&mut buf[total..])).await {
Ok(Ok(0)) => break, Ok(Ok(n)) => {
total += n;
if total > self.max_bytes {
total = self.max_bytes;
break;
}
}
Ok(Err(_)) => break, Err(_) => {
idle_triggered = true;
break;
}
}
}
let truncated = total == self.max_bytes;
let bytes_read = total;
let output = String::from_utf8_lossy(&buf[..total]).to_string();
let data = if truncated {
format!(
"{}\n... [output truncated at {} bytes]",
output, self.max_bytes
)
} else {
output
};
WatchdogOutput {
data,
truncated,
idle_timeout_triggered: idle_triggered,
bytes_read,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::duplex;
#[tokio::test]
async fn test_continuous_output_no_idle_timeout() {
let (mut writer, mut reader) = duplex(1024);
let watchdog = OutputWatchdog::new(Duration::from_secs(5), 4096);
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
writer.write_all(b"hello world").await.unwrap();
drop(writer);
});
let output = watchdog.read_with_idle_detection(&mut reader).await;
assert!(!output.idle_timeout_triggered);
assert!(!output.truncated);
assert_eq!(output.data, "hello world");
assert_eq!(output.bytes_read, 11);
}
#[tokio::test]
async fn test_idle_timeout_triggers() {
let (_writer, mut reader) = duplex(1024);
let watchdog = OutputWatchdog::new(Duration::from_millis(50), 4096);
let output = watchdog.read_with_idle_detection(&mut reader).await;
assert!(output.idle_timeout_triggered);
assert!(!output.truncated);
assert_eq!(output.bytes_read, 0);
}
#[tokio::test]
async fn test_truncation_with_watchdog() {
let (mut writer, mut reader) = duplex(1024);
let watchdog = OutputWatchdog::new(Duration::from_secs(5), 10);
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
writer.write_all(b"abcdefghijklmnop").await.unwrap();
drop(writer);
});
let output = watchdog.read_with_idle_detection(&mut reader).await;
assert!(output.truncated);
assert!(!output.idle_timeout_triggered);
assert_eq!(output.bytes_read, 10);
assert!(output.data.contains("[output truncated at 10 bytes]"));
}
}