use std::fs::File;
use std::io::Write;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::PathBuf;
use std::process::{ChildStderr, ChildStdout};
/// Size in bytes of the scratch buffer the relay fills with each `read` from a child pipe.
const BUF: usize = 8192;
pub fn start_log_relay(
stdout: Option<ChildStdout>,
stderr: Option<ChildStderr>,
stdout_path: PathBuf,
stderr_path: PathBuf,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || relay_loop(stdout, stderr, stdout_path, stderr_path, [None, None]))
}
/// Starts a background thread that logs the child's output and tees it to
/// optional attach descriptors.
///
/// `stdout` is written to `stdout_path` and `stderr` to `stderr_path`.
/// `attach_fds[0]` additionally receives the stdout bytes and
/// `attach_fds[1]` the stderr bytes; a `None` slot disables teeing for that
/// stream.
///
/// The relay closes the attach descriptors when its loop finishes, so the
/// caller must not close or reuse them after handing them over.
///
/// Returns the handle of the relay thread so the caller can join it.
pub fn start_tee_relay(
    stdout: Option<ChildStdout>,
    stderr: Option<ChildStderr>,
    stdout_path: PathBuf,
    stderr_path: PathBuf,
    attach_fds: [Option<RawFd>; 2],
) -> std::thread::JoinHandle<()> {
    let worker = move || {
        relay_loop(stdout, stderr, stdout_path, stderr_path, attach_fds);
    };
    std::thread::spawn(worker)
}
fn relay_loop(
stdout: Option<ChildStdout>,
stderr: Option<ChildStderr>,
stdout_path: PathBuf,
stderr_path: PathBuf,
attach_fds: [Option<RawFd>; 2],
) {
let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
if epfd < 0 {
log::warn!(
"relay: epoll_create1 failed: {}",
std::io::Error::last_os_error()
);
return;
}
let mut files: [Option<File>; 2] = [None, None];
let mut done: [bool; 2] = [true, true];
let mut raw_fds: [i32; 2] = [-1, -1];
let mut register = |idx: usize, fd: i32, path: &PathBuf| {
if let Ok(f) = File::create(path) {
let mut ev = libc::epoll_event {
events: (libc::EPOLLIN | libc::EPOLLHUP) as u32,
u64: idx as u64,
};
if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) } == 0 {
files[idx] = Some(f);
done[idx] = false;
raw_fds[idx] = fd;
}
}
};
if let Some(ref s) = stdout {
register(0, s.as_raw_fd(), &stdout_path);
}
if let Some(ref s) = stderr {
register(1, s.as_raw_fd(), &stderr_path);
}
let mut buf = [0u8; BUF];
let mut events = [libc::epoll_event { events: 0, u64: 0 }; 4];
while !done[0] || !done[1] {
let n = unsafe { libc::epoll_wait(epfd, events.as_mut_ptr(), 4, -1) };
if n < 0 {
if std::io::Error::last_os_error().raw_os_error() == Some(libc::EINTR) {
continue;
}
break;
}
for ev in &events[..n as usize] {
let idx = ev.u64 as usize;
if done[idx] {
continue;
}
let fd = raw_fds[idx];
let nread = loop {
let r = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
if r >= 0 || std::io::Error::last_os_error().raw_os_error() != Some(libc::EINTR) {
break r;
}
};
if nread > 0 {
let data = &buf[..nread as usize];
if let Some(ref mut f) = files[idx] {
let _ = f.write_all(data);
}
if let Some(afd) = attach_fds[idx] {
let mut written = 0usize;
while written < data.len() {
let r = unsafe {
libc::write(
afd,
data[written..].as_ptr() as *const libc::c_void,
data.len() - written,
)
};
if r <= 0 {
break;
}
written += r as usize;
}
}
} else {
unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut()) };
done[idx] = true;
}
}
}
unsafe { libc::close(epfd) };
for afd in attach_fds.into_iter().flatten() {
unsafe { libc::close(afd) };
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::{Child, Command, Stdio};

    /// Runs `sh -c <script>` with stdout piped and stderr wired to `stderr`.
    fn spawn_sh(script: &str, stderr: Stdio) -> Child {
        Command::new("sh")
            .arg("-c")
            .arg(script)
            .stdout(Stdio::piped())
            .stderr(stderr)
            .spawn()
            .expect("spawn sh")
    }

    /// Creates a temporary directory and returns it together with the
    /// stdout and stderr log paths inside it. The directory must be kept
    /// alive for as long as the paths are used.
    fn log_paths() -> (tempfile::TempDir, std::path::PathBuf, std::path::PathBuf) {
        let tmp = tempfile::TempDir::new().expect("tempdir");
        let out_path = tmp.path().join("stdout.log");
        let err_path = tmp.path().join("stderr.log");
        (tmp, out_path, err_path)
    }

    /// Both streams end up in their own log file.
    #[test]
    fn test_relay_captures_stdout_and_stderr() {
        let mut child = spawn_sh(
            "printf 'hello stdout'; printf 'hello stderr' >&2",
            Stdio::piped(),
        );
        let child_out = child.stdout.take();
        let child_err = child.stderr.take();
        let (_tmp, out_path, err_path) = log_paths();

        let relay = start_log_relay(child_out, child_err, out_path.clone(), err_path.clone());
        child.wait().expect("wait");
        relay.join().expect("join relay thread");

        let logged_out = std::fs::read_to_string(&out_path).unwrap_or_default();
        let logged_err = std::fs::read_to_string(&err_path).unwrap_or_default();
        assert_eq!(logged_out, "hello stdout");
        assert_eq!(logged_err, "hello stderr");
    }

    /// Output larger than one read buffer is relayed completely.
    #[test]
    fn test_relay_large_output() {
        let mut child = spawn_sh("yes x | head -c 65536", Stdio::null());
        let child_out = child.stdout.take();
        let (_tmp, out_path, err_path) = log_paths();

        let relay = start_log_relay(child_out, None, out_path.clone(), err_path);
        child.wait().expect("wait");
        relay.join().expect("join relay thread");

        let out = std::fs::read(&out_path).unwrap_or_default();
        assert_eq!(out.len(), 65536, "expected 65536 bytes, got {}", out.len());
    }

    /// With no streams to relay the thread finishes on its own.
    #[test]
    fn test_relay_none_handles() {
        let (_tmp, out_path, err_path) = log_paths();
        let relay = start_log_relay(None, None, out_path, err_path);
        relay.join().expect("join relay thread");
    }
}