#![allow(dead_code)]
use std::io::{self, IsTerminal, Write};
#[cfg(any(unix, windows))]
use std::path::{Path, PathBuf};
#[cfg(any(unix, windows))]
use std::sync::Mutex;
use std::sync::OnceLock;
use std::time::Duration;
use crate::args::Arguments;
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct HostLogRecord {
pub elapsed: Duration,
pub stream_tag: u8,
pub line: String,
}
static REAL_STDOUT_IS_TERMINAL: OnceLock<bool> = OnceLock::new();
static REAL_STDERR_IS_TERMINAL: OnceLock<bool> = OnceLock::new();
pub(crate) fn terminal_stdout_is_terminal() -> bool {
*REAL_STDOUT_IS_TERMINAL.get_or_init(|| io::stdout().is_terminal())
}
pub(crate) fn terminal_stderr_is_terminal() -> bool {
*REAL_STDERR_IS_TERMINAL.get_or_init(|| io::stderr().is_terminal())
}
#[cfg(unix)]
mod imp {
use super::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Instant;
use uuid::Uuid;
pub(super) struct HostCaptureImpl {
terminal_stdout_fd: OwnedFd,
terminal_stderr_fd: OwnedFd,
retained_write_end: Option<OwnedFd>,
reader: Option<JoinHandle<()>>,
spill_path: PathBuf,
epoch: Instant,
epoch_wall: std::time::SystemTime,
finalized: bool,
}
pub(super) fn install(_args: &Arguments) -> io::Result<HostCaptureImpl> {
let terminal_stdout_fd = dup_owned(libc::STDOUT_FILENO)?;
let terminal_stderr_fd = dup_owned(libc::STDERR_FILENO)?;
set_cloexec(terminal_stdout_fd.as_raw_fd())?;
set_cloexec(terminal_stderr_fd.as_raw_fd())?;
let (read_end, write_end) = make_pipe_cloexec()?;
let spill_path =
std::env::temp_dir().join(format!("test-r-host-log-{}.bin", Uuid::new_v4()));
let spill_file = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&spill_path)?;
let spill_file = Arc::new(Mutex::new(spill_file));
let epoch = Instant::now();
let epoch_wall = std::time::SystemTime::now();
let setup = || -> io::Result<JoinHandle<()>> {
install_terminal_fds(&terminal_stdout_fd, &terminal_stderr_fd)?;
let read_end_file = unsafe { File::from_raw_fd(read_end.into_raw_fd()) };
let spill_clone = spill_file.clone();
let epoch_clone = epoch;
let reader = thread::Builder::new()
.name("test-r-host-capture".to_string())
.spawn(move || {
reader_loop(read_end_file, spill_clone, epoch_clone);
})?;
Ok(reader)
};
let reader = match setup() {
Ok(r) => r,
Err(e) => {
let _ = std::fs::remove_file(&spill_path);
return Err(e);
}
};
if let Err(e) = dup2_overwrite(write_end.as_raw_fd(), libc::STDOUT_FILENO) {
let _ = std::fs::remove_file(&spill_path);
return Err(e);
}
if let Err(e) = dup2_overwrite(write_end.as_raw_fd(), libc::STDERR_FILENO) {
let _ = dup2_overwrite(terminal_stdout_fd.as_raw_fd(), libc::STDOUT_FILENO);
let _ = std::fs::remove_file(&spill_path);
return Err(e);
}
Ok(HostCaptureImpl {
terminal_stdout_fd,
terminal_stderr_fd,
retained_write_end: Some(write_end),
reader: Some(reader),
spill_path,
epoch,
epoch_wall,
finalized: false,
})
}
impl HostCaptureImpl {
pub fn spill_path(&self) -> &std::path::Path {
&self.spill_path
}
pub fn epoch(&self) -> Instant {
self.epoch
}
pub fn epoch_wall(&self) -> std::time::SystemTime {
self.epoch_wall
}
fn shutdown_pipe(&mut self) {
if self.finalized {
return;
}
let _ = io::stdout().flush();
let _ = io::stderr().flush();
let _ = dup2_overwrite(self.terminal_stdout_fd.as_raw_fd(), libc::STDOUT_FILENO);
let _ = dup2_overwrite(self.terminal_stderr_fd.as_raw_fd(), libc::STDERR_FILENO);
clear_terminal_fds();
drop(self.retained_write_end.take());
if let Some(handle) = self.reader.take() {
let _ = handle.join();
}
self.finalized = true;
}
pub fn finalize_in_place(&mut self) -> Vec<super::HostLogRecord> {
self.shutdown_pipe();
let records = read_spill_file(&self.spill_path).unwrap_or_default();
let _ = std::fs::remove_file(&self.spill_path);
records
}
}
impl Drop for HostCaptureImpl {
fn drop(&mut self) {
self.shutdown_pipe();
let _ = std::fs::remove_file(&self.spill_path);
}
}
fn dup_owned(fd: RawFd) -> io::Result<OwnedFd> {
let new_fd = unsafe { libc::dup(fd) };
if new_fd < 0 {
return Err(io::Error::last_os_error());
}
Ok(unsafe { OwnedFd::from_raw_fd(new_fd) })
}
pub(super) fn dup_owned_cloexec(fd: RawFd) -> io::Result<OwnedFd> {
let new = dup_owned(fd)?;
set_cloexec(new.as_raw_fd())?;
Ok(new)
}
fn dup2_overwrite(src: RawFd, dst: RawFd) -> io::Result<()> {
loop {
let rc = unsafe { libc::dup2(src, dst) };
if rc >= 0 {
return Ok(());
}
let err = io::Error::last_os_error();
if err.kind() != io::ErrorKind::Interrupted {
return Err(err);
}
}
}
fn set_cloexec(fd: RawFd) -> io::Result<()> {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
if flags < 0 {
return Err(io::Error::last_os_error());
}
let rc = unsafe { libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
if rc < 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
fn make_pipe_cloexec() -> io::Result<(OwnedFd, OwnedFd)> {
let mut fds = [0 as RawFd; 2];
#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
target_os = "illumos",
target_os = "solaris",
))]
{
let rc = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) };
if rc < 0 {
return Err(io::Error::last_os_error());
}
Ok(unsafe { (OwnedFd::from_raw_fd(fds[0]), OwnedFd::from_raw_fd(fds[1])) })
}
#[cfg(not(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
target_os = "illumos",
target_os = "solaris",
)))]
{
let rc = unsafe { libc::pipe(fds.as_mut_ptr()) };
if rc < 0 {
return Err(io::Error::last_os_error());
}
let _ = set_cloexec(fds[0]);
let _ = set_cloexec(fds[1]);
Ok(unsafe { (OwnedFd::from_raw_fd(fds[0]), OwnedFd::from_raw_fd(fds[1])) })
}
}
fn reader_loop(read_end: File, spill_file: Arc<Mutex<File>>, epoch: Instant) {
let mut reader = BufReader::with_capacity(64 * 1024, read_end);
let mut line = Vec::with_capacity(256);
loop {
line.clear();
match reader.read_until(b'\n', &mut line) {
Ok(0) => break, Ok(_n) => {
if line.last() == Some(&b'\n') {
line.pop();
}
if line.last() == Some(&b'\r') {
line.pop();
}
let elapsed = epoch.elapsed().as_nanos();
let ts_ns = u64::try_from(elapsed).unwrap_or(u64::MAX);
let stream_tag: u8 = 0;
let len = u32::try_from(line.len()).unwrap_or(u32::MAX) as usize;
let len_u32 = len as u32;
let mut header = [0u8; 8 + 1 + 4];
header[..8].copy_from_slice(&ts_ns.to_le_bytes());
header[8] = stream_tag;
header[9..13].copy_from_slice(&len_u32.to_le_bytes());
if let Ok(mut f) = spill_file.lock() {
let _ = f.write_all(&header);
let _ = f.write_all(&line[..len]);
}
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
if let Ok(mut f) = spill_file.lock() {
let _ = f.flush();
}
}
pub(super) fn read_spill_file(path: &super::Path) -> io::Result<Vec<super::HostLogRecord>> {
use std::io::Read;
let mut file = match std::fs::File::open(path) {
Ok(f) => f,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(e),
};
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)?;
let mut out = Vec::new();
let mut pos = 0;
while pos + 13 <= bytes.len() {
let mut ts_buf = [0u8; 8];
ts_buf.copy_from_slice(&bytes[pos..pos + 8]);
let elapsed_ns = u64::from_le_bytes(ts_buf);
let stream_tag = bytes[pos + 8];
let mut len_buf = [0u8; 4];
len_buf.copy_from_slice(&bytes[pos + 9..pos + 13]);
let line_len = u32::from_le_bytes(len_buf) as usize;
pos += 13;
if pos + line_len > bytes.len() {
break;
}
let line_bytes = &bytes[pos..pos + line_len];
pos += line_len;
out.push(super::HostLogRecord {
elapsed: super::Duration::from_nanos(elapsed_ns),
stream_tag,
line: String::from_utf8_lossy(line_bytes).into_owned(),
});
}
Ok(out)
}
}
#[cfg(windows)]
mod imp {
use super::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Instant;
use uuid::Uuid;
use windows_sys::Win32::Foundation::{
DuplicateHandle, DUPLICATE_SAME_ACCESS, HANDLE, INVALID_HANDLE_VALUE,
};
use windows_sys::Win32::System::Console::{
GetStdHandle, SetStdHandle, STD_ERROR_HANDLE, STD_OUTPUT_HANDLE,
};
use windows_sys::Win32::System::Pipes::CreatePipe;
use windows_sys::Win32::System::Threading::GetCurrentProcess;
pub(super) struct HostCaptureImpl {
original_stdout: HANDLE,
original_stderr: HANDLE,
retained_write_end: Option<OwnedHandle>,
reader: Option<JoinHandle<()>>,
spill_path: PathBuf,
epoch: Instant,
epoch_wall: std::time::SystemTime,
finalized: bool,
}
pub(super) fn install(_args: &Arguments) -> io::Result<HostCaptureImpl> {
let original_stdout = unsafe { GetStdHandle(STD_OUTPUT_HANDLE) };
if original_stdout == INVALID_HANDLE_VALUE {
return Err(io::Error::last_os_error());
}
let original_stderr = unsafe { GetStdHandle(STD_ERROR_HANDLE) };
if original_stderr == INVALID_HANDLE_VALUE {
return Err(io::Error::last_os_error());
}
let mut read_end: HANDLE = std::ptr::null_mut();
let mut write_end: HANDLE = std::ptr::null_mut();
let rc = unsafe { CreatePipe(&mut read_end, &mut write_end, std::ptr::null(), 0) };
if rc == 0 {
return Err(io::Error::last_os_error());
}
let read_end = unsafe { OwnedHandle::from_raw_handle(read_end as RawHandle) };
let write_end = unsafe { OwnedHandle::from_raw_handle(write_end as RawHandle) };
let spill_path =
std::env::temp_dir().join(format!("test-r-host-log-{}.bin", Uuid::new_v4()));
let spill_file = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&spill_path)?;
let spill_file = Arc::new(Mutex::new(spill_file));
let epoch = Instant::now();
let epoch_wall = std::time::SystemTime::now();
let setup = || -> io::Result<JoinHandle<()>> {
install_terminal_handles(original_stdout, original_stderr)?;
let read_end_file = File::from(read_end);
let spill_clone = spill_file.clone();
let epoch_clone = epoch;
let reader = thread::Builder::new()
.name("test-r-host-capture".to_string())
.spawn(move || {
reader_loop(read_end_file, spill_clone, epoch_clone);
})?;
Ok(reader)
};
let reader = match setup() {
Ok(r) => r,
Err(e) => {
let _ = std::fs::remove_file(&spill_path);
return Err(e);
}
};
let cleanup_on_failure = |write_end: OwnedHandle, reader: JoinHandle<()>| {
drop(write_end);
let _ = reader.join();
let _ = std::fs::remove_file(&spill_path);
};
let write_handle: HANDLE = write_end.as_raw_handle() as HANDLE;
if unsafe { SetStdHandle(STD_OUTPUT_HANDLE, write_handle) } == 0 {
let e = io::Error::last_os_error();
cleanup_on_failure(write_end, reader);
return Err(e);
}
if unsafe { SetStdHandle(STD_ERROR_HANDLE, write_handle) } == 0 {
let e = io::Error::last_os_error();
unsafe {
let _ = SetStdHandle(STD_OUTPUT_HANDLE, original_stdout);
}
cleanup_on_failure(write_end, reader);
return Err(e);
}
Ok(HostCaptureImpl {
original_stdout,
original_stderr,
retained_write_end: Some(write_end),
reader: Some(reader),
spill_path,
epoch,
epoch_wall,
finalized: false,
})
}
impl HostCaptureImpl {
pub fn spill_path(&self) -> &Path {
&self.spill_path
}
pub fn epoch(&self) -> Instant {
self.epoch
}
pub fn epoch_wall(&self) -> std::time::SystemTime {
self.epoch_wall
}
fn shutdown_pipe(&mut self) {
if self.finalized {
return;
}
let _ = io::stdout().flush();
let _ = io::stderr().flush();
unsafe {
let _ = SetStdHandle(STD_OUTPUT_HANDLE, self.original_stdout);
let _ = SetStdHandle(STD_ERROR_HANDLE, self.original_stderr);
}
clear_terminal_handles();
drop(self.retained_write_end.take());
if let Some(handle) = self.reader.take() {
let _ = handle.join();
}
self.finalized = true;
}
pub fn finalize_in_place(&mut self) -> Vec<super::HostLogRecord> {
self.shutdown_pipe();
let records = read_spill_file(&self.spill_path).unwrap_or_default();
let _ = std::fs::remove_file(&self.spill_path);
records
}
}
impl Drop for HostCaptureImpl {
fn drop(&mut self) {
self.shutdown_pipe();
let _ = std::fs::remove_file(&self.spill_path);
}
}
pub(super) fn duplicate_handle_owned(h: HANDLE) -> io::Result<OwnedHandle> {
let process = unsafe { GetCurrentProcess() };
let mut new_handle: HANDLE = std::ptr::null_mut();
let rc = unsafe {
DuplicateHandle(
process,
h,
process,
&mut new_handle,
0,
0, DUPLICATE_SAME_ACCESS,
)
};
if rc == 0 {
return Err(io::Error::last_os_error());
}
Ok(unsafe { OwnedHandle::from_raw_handle(new_handle as RawHandle) })
}
fn reader_loop(read_end: File, spill_file: Arc<Mutex<File>>, epoch: Instant) {
let mut reader = BufReader::with_capacity(64 * 1024, read_end);
let mut line = Vec::with_capacity(256);
loop {
line.clear();
match reader.read_until(b'\n', &mut line) {
Ok(0) => break, Ok(_) => {
if line.last() == Some(&b'\n') {
line.pop();
}
if line.last() == Some(&b'\r') {
line.pop();
}
let elapsed = epoch.elapsed().as_nanos();
let ts_ns = u64::try_from(elapsed).unwrap_or(u64::MAX);
let stream_tag: u8 = 0;
let len = u32::try_from(line.len()).unwrap_or(u32::MAX) as usize;
let len_u32 = len as u32;
let mut header = [0u8; 8 + 1 + 4];
header[..8].copy_from_slice(&ts_ns.to_le_bytes());
header[8] = stream_tag;
header[9..13].copy_from_slice(&len_u32.to_le_bytes());
if let Ok(mut f) = spill_file.lock() {
let _ = f.write_all(&header);
let _ = f.write_all(&line[..len]);
}
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => break,
Err(_) => break,
}
}
if let Ok(mut f) = spill_file.lock() {
let _ = f.flush();
}
}
pub(super) fn read_spill_file(path: &super::Path) -> io::Result<Vec<super::HostLogRecord>> {
use std::io::Read;
let mut file = match std::fs::File::open(path) {
Ok(f) => f,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(e),
};
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)?;
let mut out = Vec::new();
let mut pos = 0;
while pos + 13 <= bytes.len() {
let mut ts_buf = [0u8; 8];
ts_buf.copy_from_slice(&bytes[pos..pos + 8]);
let elapsed_ns = u64::from_le_bytes(ts_buf);
let stream_tag = bytes[pos + 8];
let mut len_buf = [0u8; 4];
len_buf.copy_from_slice(&bytes[pos + 9..pos + 13]);
let line_len = u32::from_le_bytes(len_buf) as usize;
pos += 13;
if pos + line_len > bytes.len() {
break;
}
let line_bytes = &bytes[pos..pos + line_len];
pos += line_len;
out.push(super::HostLogRecord {
elapsed: super::Duration::from_nanos(elapsed_ns),
stream_tag,
line: String::from_utf8_lossy(line_bytes).into_owned(),
});
}
Ok(out)
}
}
#[cfg(not(any(unix, windows)))]
mod imp {
use super::*;
use std::path::Path;
use std::time::Instant;
pub(super) struct HostCaptureImpl;
pub(super) fn install(_args: &Arguments) -> io::Result<HostCaptureImpl> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"test-r host-side output capture is not supported on this target",
))
}
impl HostCaptureImpl {
pub fn spill_path(&self) -> &Path {
Path::new("")
}
pub fn epoch(&self) -> Instant {
Instant::now()
}
pub fn epoch_wall(&self) -> std::time::SystemTime {
std::time::SystemTime::now()
}
pub fn finalize_in_place(&mut self) -> Vec<super::HostLogRecord> {
Vec::new()
}
}
}
pub(crate) struct HostCapture {
inner: imp::HostCaptureImpl,
}
impl HostCapture {
pub(crate) fn spill_path(&self) -> &std::path::Path {
self.inner.spill_path()
}
pub(crate) fn epoch(&self) -> std::time::Instant {
self.inner.epoch()
}
pub(crate) fn epoch_wall(&self) -> std::time::SystemTime {
self.inner.epoch_wall()
}
pub(crate) fn finalize(mut self) -> Vec<HostLogRecord> {
self.inner.finalize_in_place()
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct HostWindow {
pub start: Duration,
pub end: Duration,
}
impl HostWindow {
pub(crate) fn from_instants(
epoch: Option<std::time::Instant>,
start: std::time::Instant,
end: std::time::Instant,
) -> Option<Self> {
let epoch = epoch?;
Some(Self {
start: start.saturating_duration_since(epoch),
end: end.saturating_duration_since(epoch),
})
}
fn contains(&self, t: Duration) -> bool {
t >= self.start && t < self.end
}
}
pub(crate) fn record_to_capture(
epoch_wall: std::time::SystemTime,
rec: &HostLogRecord,
) -> crate::internal::CapturedOutput {
let ts = epoch_wall.checked_add(rec.elapsed).unwrap_or(epoch_wall);
crate::internal::CapturedOutput::host(ts, rec.line.clone())
}
pub(crate) fn attribute_records_to_tests(
epoch_wall: std::time::SystemTime,
records: &[HostLogRecord],
windows: &[(usize, HostWindow)],
results: &mut [(crate::internal::RegisteredTest, crate::internal::TestResult)],
) {
if records.is_empty() || windows.is_empty() {
return;
}
for (test_idx, win) in windows {
let mut additions: Vec<crate::internal::CapturedOutput> = records
.iter()
.filter(|r| win.contains(r.elapsed))
.map(|r| record_to_capture(epoch_wall, r))
.collect();
if additions.is_empty() {
continue;
}
let Some((_, result)) = results.get_mut(*test_idx) else {
continue;
};
let mut merged = result.captured_output().clone();
merged.append(&mut additions);
merged.sort();
result.set_captured_output(merged);
}
}
pub(crate) fn install_if_needed(args: &Arguments) -> Option<HostCapture> {
let _ = REAL_STDOUT_IS_TERMINAL.set(io::stdout().is_terminal());
let _ = REAL_STDERR_IS_TERMINAL.set(io::stderr().is_terminal());
if args.ipc.is_some() {
return None;
}
if args.nocapture {
return None;
}
if !args.spawn_workers {
return None;
}
match imp::install(args) {
Ok(inner) => Some(HostCapture { inner }),
Err(_) => None,
}
}
#[cfg(unix)]
static TERMINAL_STDOUT: OnceLock<Mutex<std::fs::File>> = OnceLock::new();
#[cfg(unix)]
static TERMINAL_STDERR: OnceLock<Mutex<std::fs::File>> = OnceLock::new();
#[cfg(unix)]
fn install_terminal_fds(
stdout_fd: &std::os::fd::OwnedFd,
stderr_fd: &std::os::fd::OwnedFd,
) -> io::Result<()> {
use std::os::fd::AsRawFd;
if TERMINAL_STDOUT.get().is_some() && TERMINAL_STDERR.get().is_some() {
return Ok(());
}
use imp::dup_owned_cloexec;
let stdout_owned = dup_owned_cloexec(stdout_fd.as_raw_fd())?;
let stderr_owned = match dup_owned_cloexec(stderr_fd.as_raw_fd()) {
Ok(fd) => fd,
Err(e) => {
drop(stdout_owned);
return Err(e);
}
};
let stdout_file = std::fs::File::from(stdout_owned);
let stderr_file = std::fs::File::from(stderr_owned);
if let Err(rejected) = TERMINAL_STDOUT.set(Mutex::new(stdout_file)) {
drop(rejected);
}
if let Err(rejected) = TERMINAL_STDERR.set(Mutex::new(stderr_file)) {
drop(rejected);
}
Ok(())
}
#[cfg(unix)]
fn clear_terminal_fds() {
}
#[cfg(windows)]
static TERMINAL_STDOUT: OnceLock<Mutex<std::fs::File>> = OnceLock::new();
#[cfg(windows)]
static TERMINAL_STDERR: OnceLock<Mutex<std::fs::File>> = OnceLock::new();
#[cfg(windows)]
fn install_terminal_handles(
stdout_handle: windows_sys::Win32::Foundation::HANDLE,
stderr_handle: windows_sys::Win32::Foundation::HANDLE,
) -> io::Result<()> {
if TERMINAL_STDOUT.get().is_some() && TERMINAL_STDERR.get().is_some() {
return Ok(());
}
let stdout_owned = imp::duplicate_handle_owned(stdout_handle)?;
let stderr_owned = match imp::duplicate_handle_owned(stderr_handle) {
Ok(h) => h,
Err(e) => {
drop(stdout_owned);
return Err(e);
}
};
let stdout_file = std::fs::File::from(stdout_owned);
let stderr_file = std::fs::File::from(stderr_owned);
if let Err(rejected) = TERMINAL_STDOUT.set(Mutex::new(stdout_file)) {
drop(rejected);
}
if let Err(rejected) = TERMINAL_STDERR.set(Mutex::new(stderr_file)) {
drop(rejected);
}
Ok(())
}
#[cfg(windows)]
fn clear_terminal_handles() {
}
#[derive(Clone, Copy, Default)]
pub(crate) struct TerminalStdout;
#[derive(Clone, Copy, Default)]
pub(crate) struct TerminalStderr;
impl Write for TerminalStdout {
#[cfg(any(unix, windows))]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match TERMINAL_STDOUT.get() {
Some(mtx) => mtx.lock().unwrap().write(buf),
None => io::stdout().write(buf),
}
}
#[cfg(any(unix, windows))]
fn flush(&mut self) -> io::Result<()> {
match TERMINAL_STDOUT.get() {
Some(mtx) => mtx.lock().unwrap().flush(),
None => io::stdout().flush(),
}
}
#[cfg(not(any(unix, windows)))]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
io::stdout().write(buf)
}
#[cfg(not(any(unix, windows)))]
fn flush(&mut self) -> io::Result<()> {
io::stdout().flush()
}
}
impl Write for TerminalStderr {
#[cfg(any(unix, windows))]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match TERMINAL_STDERR.get() {
Some(mtx) => mtx.lock().unwrap().write(buf),
None => io::stderr().write(buf),
}
}
#[cfg(any(unix, windows))]
fn flush(&mut self) -> io::Result<()> {
match TERMINAL_STDERR.get() {
Some(mtx) => mtx.lock().unwrap().flush(),
None => io::stderr().flush(),
}
}
#[cfg(not(any(unix, windows)))]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
io::stderr().write(buf)
}
#[cfg(not(any(unix, windows)))]
fn flush(&mut self) -> io::Result<()> {
io::stderr().flush()
}
}
#[cfg(all(test, any(unix, windows)))]
mod tests {
use super::*;
use crate::internal::{
CapturedOutput, RegisteredTest, TestFunction, TestProperties, TestResult,
};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
fn dummy_test(name: &str) -> RegisteredTest {
RegisteredTest {
name: name.to_string(),
crate_name: "test_crate".to_string(),
module_path: "test_module".to_string(),
run: TestFunction::Sync(Arc::new(|_| Box::new(()))),
props: TestProperties::default(),
dependencies: None,
}
}
fn encode_records(items: &[(u64, &str)]) -> Vec<u8> {
let mut out = Vec::new();
for (ts_ns, line) in items {
let len = u32::try_from(line.len()).unwrap();
out.extend_from_slice(&ts_ns.to_le_bytes());
out.push(0u8); out.extend_from_slice(&len.to_le_bytes());
out.extend_from_slice(line.as_bytes());
}
out
}
#[test]
fn read_spill_file_round_trips_records_in_order() {
let dir = std::env::temp_dir().join(format!("test-r-host-rt-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("spill.bin");
let bytes = encode_records(&[
(1_000_000, "first line"),
(5_000_000, "second line"),
(9_999_999_999, "third line, large ts"),
]);
std::fs::write(&path, bytes).unwrap();
let records = imp::read_spill_file(&path).unwrap();
assert_eq!(records.len(), 3, "all three records must parse");
assert_eq!(records[0].elapsed, Duration::from_nanos(1_000_000));
assert_eq!(records[0].line, "first line");
assert_eq!(records[1].line, "second line");
assert_eq!(records[2].elapsed, Duration::from_nanos(9_999_999_999));
assert_eq!(records[2].line, "third line, large ts");
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_dir(&dir);
}
#[test]
fn read_spill_file_drops_truncated_tail() {
let dir = std::env::temp_dir().join(format!("test-r-host-tr-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&dir).unwrap();
let path = dir.join("spill.bin");
let mut bytes = encode_records(&[(1_000, "complete")]);
bytes.extend_from_slice(&2_000u64.to_le_bytes());
bytes.push(0u8);
bytes.extend_from_slice(&64u32.to_le_bytes());
bytes.extend_from_slice(b"abc");
std::fs::write(&path, bytes).unwrap();
let records = imp::read_spill_file(&path).unwrap();
assert_eq!(
records.len(),
1,
"the truncated trailing record must be dropped, not panic"
);
assert_eq!(records[0].line, "complete");
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_dir(&dir);
}
#[test]
fn read_spill_file_missing_returns_empty_vec() {
let path = std::env::temp_dir().join(format!(
"test-r-host-nf-{}-does-not-exist.bin",
uuid::Uuid::new_v4()
));
let records = imp::read_spill_file(&path).unwrap();
assert!(records.is_empty());
}
#[test]
fn attribute_records_to_tests_inserts_host_lines_for_matching_window() {
let epoch_wall = SystemTime::now() - Duration::from_secs(1);
let win_a = HostWindow {
start: Duration::from_millis(200),
end: Duration::from_millis(400),
};
let win_b = HostWindow {
start: Duration::from_millis(500),
end: Duration::from_millis(700),
};
let records = vec![
HostLogRecord {
elapsed: Duration::from_millis(100),
stream_tag: 0,
line: "before any test".to_string(),
},
HostLogRecord {
elapsed: Duration::from_millis(250),
stream_tag: 0,
line: "during a".to_string(),
},
HostLogRecord {
elapsed: Duration::from_millis(600),
stream_tag: 0,
line: "during b".to_string(),
},
HostLogRecord {
elapsed: Duration::from_millis(900),
stream_tag: 0,
line: "after both tests".to_string(),
},
];
let a = dummy_test("a");
let b = dummy_test("b");
let mut results: Vec<(RegisteredTest, TestResult)> = vec![
(a.clone(), TestResult::passed(Duration::from_millis(200))),
(b.clone(), TestResult::passed(Duration::from_millis(200))),
];
results[0]
.1
.set_captured_output(vec![CapturedOutput::Stdout {
timestamp: SystemTime::UNIX_EPOCH,
line: "from test a".to_string(),
}]);
results[1]
.1
.set_captured_output(vec![CapturedOutput::Stdout {
timestamp: SystemTime::UNIX_EPOCH,
line: "from test b".to_string(),
}]);
let windows_indexed = vec![(0usize, win_a), (1usize, win_b)];
attribute_records_to_tests(epoch_wall, &records, &windows_indexed, &mut results);
let a_caps = results[0].1.captured_output();
let a_host: Vec<&str> = a_caps
.iter()
.filter_map(|c| match c {
CapturedOutput::Host { line, .. } => Some(line.as_str()),
_ => None,
})
.collect();
assert_eq!(
a_host,
vec!["during a"],
"test A must receive only the host record inside its window"
);
let b_caps = results[1].1.captured_output();
let b_host: Vec<&str> = b_caps
.iter()
.filter_map(|c| match c {
CapturedOutput::Host { line, .. } => Some(line.as_str()),
_ => None,
})
.collect();
assert_eq!(
b_host,
vec!["during b"],
"test B must receive only the host record inside its window"
);
assert!(a_caps
.iter()
.any(|c| matches!(c, CapturedOutput::Stdout { line, .. } if line == "from test a")));
assert!(b_caps
.iter()
.any(|c| matches!(c, CapturedOutput::Stdout { line, .. } if line == "from test b")));
let _ = a;
let _ = b;
}
#[test]
fn attribute_records_to_tests_handles_overlapping_windows() {
let epoch_wall = SystemTime::now() - Duration::from_secs(1);
let overlap = HostWindow {
start: Duration::from_millis(100),
end: Duration::from_millis(800),
};
let records = vec![HostLogRecord {
elapsed: Duration::from_millis(500),
stream_tag: 0,
line: "shared host line".to_string(),
}];
let mut results: Vec<(RegisteredTest, TestResult)> = vec![
(
dummy_test("a"),
TestResult::passed(Duration::from_millis(700)),
),
(
dummy_test("b"),
TestResult::passed(Duration::from_millis(700)),
),
];
let windows_indexed = vec![(0usize, overlap), (1usize, overlap)];
attribute_records_to_tests(epoch_wall, &records, &windows_indexed, &mut results);
for (_, r) in &results {
let host_lines: Vec<&str> = r
.captured_output()
.iter()
.filter_map(|c| match c {
CapturedOutput::Host { line, .. } => Some(line.as_str()),
_ => None,
})
.collect();
assert_eq!(
host_lines,
vec!["shared host line"],
"overlapping windows must each receive the same host record"
);
}
}
}