#![cfg(test)]
#![allow(dead_code)]
use super::super::affinity::*;
use super::super::config::*;
use super::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::AtomicBool;
use std::time::{Duration, Instant};
pub(super) fn spawn_and_collect_after(
work_type: WorkType,
num_workers: usize,
sleep_ms: u64,
) -> Vec<WorkerReport> {
let config = WorkloadConfig {
num_workers,
affinity: AffinityIntent::Inherit,
work_type,
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut h = WorkloadHandle::spawn(&config).unwrap();
h.start();
std::thread::sleep(std::time::Duration::from_millis(sleep_ms));
h.stop_and_collect()
}
/// Counts the entries listed under `/proc/self/fd`.
///
/// Returns `0` when the directory cannot be opened. The count is taken while
/// the directory itself is being iterated, so it is best used to compare two
/// readings rather than as an absolute number.
pub(super) fn count_open_fds() -> usize {
    match std::fs::read_dir("/proc/self/fd") {
        Ok(entries) => entries.count(),
        Err(_) => 0,
    }
}
/// Reports whether a non-blocking `waitpid(-1, …)` collected a child.
///
/// Returns `true` only when `waitpid` returns a positive pid. Note that a
/// positive return means the child's status was consumed by this call, so the
/// probe also reaps the child it detects.
pub(super) fn any_zombie_child() -> bool {
    let mut wait_status: i32 = 0;
    // SAFETY: `wait_status` is a live, writable i32 for the duration of the call.
    let reaped_pid = unsafe { libc::waitpid(-1, &mut wait_status, libc::WNOHANG) };
    reaped_pid > 0
}
/// Sets both the soft and the hard `RLIMIT_NPROC` of the calling process to 0.
///
/// Returns `true` when `setrlimit` reports success (return value 0).
///
/// NOTE(review): presumably meant to be called from a disposable process
/// (e.g. a forked child) so the lowered hard limit does not affect the test
/// runner — confirm against callers.
pub(super) fn set_rlimit_nproc_zero_headroom() -> bool {
    let zero_limit = libc::rlimit {
        rlim_cur: 0,
        rlim_max: 0,
    };
    // SAFETY: `zero_limit` is a fully initialised `rlimit` that outlives the call.
    let rc = unsafe { libc::setrlimit(libc::RLIMIT_NPROC, &zero_limit) };
    rc == 0
}
/// Runs `body` in a forked child process and returns an integer describing how
/// the child ended.
///
/// In the child, the panic hook is replaced by a no-op so a panicking `body`
/// prints nothing, `body` runs under `catch_unwind`, and the child terminates
/// with `_exit` using `body`'s return value (or `99` if `body` panicked).
///
/// In the parent, the child is waited for. The wait is retried when it is
/// interrupted by a signal (`EINTR`), so a stray signal delivered to the test
/// process neither fails the assertion spuriously nor leaves the child
/// unreaped.
///
/// # Returns
/// * `WEXITSTATUS(status)` when the child exited normally;
/// * `100 + WTERMSIG(status)` otherwise.
///
/// # Panics
/// Panics if `fork` fails or if `waitpid` fails for a reason other than
/// `EINTR`.
pub(super) fn run_in_forked_child<F: FnOnce() -> i32>(body: F) -> i32 {
    // SAFETY: `fork` takes no arguments; both return paths are handled below.
    let pid = unsafe { libc::fork() };
    assert!(pid >= 0, "fork failed: {}", std::io::Error::last_os_error());
    if pid == 0 {
        // Child: silence panic output, then convert a panic into exit code 99.
        let _ = std::panic::take_hook();
        std::panic::set_hook(Box::new(|_| {}));
        let code = std::panic::catch_unwind(std::panic::AssertUnwindSafe(body)).unwrap_or(99);
        // SAFETY: `_exit` never returns; the child must not fall through into
        // the parent's code path.
        unsafe { libc::_exit(code) };
    }

    let mut status: libc::c_int = 0;
    let waited = loop {
        // SAFETY: `status` is a live, writable c_int for the duration of the call.
        let rc = unsafe { libc::waitpid(pid, &mut status, 0) };
        let interrupted = rc == -1
            && std::io::Error::last_os_error().kind() == std::io::ErrorKind::Interrupted;
        if !interrupted {
            break rc;
        }
    };
    assert_eq!(
        waited,
        pid,
        "waitpid({pid}) failed: {}",
        std::io::Error::last_os_error()
    );

    if libc::WIFEXITED(status) {
        libc::WEXITSTATUS(status)
    } else {
        100 + libc::WTERMSIG(status)
    }
}
/// Custom worker body that does no work and immediately returns a fixed report.
///
/// The stop flag is ignored. Every numeric field is zero, every collection is
/// empty, every optional field is `None`, `is_messenger` is `false`, and
/// `completed` is `true`. All fields are written out explicitly rather than
/// taken from `Default`.
pub(super) fn stub_custom_fn(_stop: &AtomicBool) -> WorkerReport {
    WorkerReport {
        // The only non-zero / non-empty value in the report.
        completed: true,

        // Scalar fields, all zero.
        tid: 0,
        group_idx: 0,
        work_units: 0,
        iterations: 0,
        cpu_time_ns: 0,
        wall_time_ns: 0,
        off_cpu_ns: 0,
        migration_count: 0,
        max_gap_ms: 0,
        max_gap_cpu: 0,
        max_gap_at_ms: 0,
        wake_sample_total: 0,
        iteration_cost_sample_total: 0,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        vmstat_numa_pages_migrated: 0,

        // Collections, all empty.
        cpus_used: BTreeSet::new(),
        migrations: Vec::new(),
        resume_latencies_ns: Vec::new(),
        iteration_costs_ns: Vec::new(),
        numa_pages: BTreeMap::new(),

        // Flags and optional details.
        is_messenger: false,
        exit_info: None,
        affinity_error: None,
    }
}
/// Custom worker body that busy-spins until `stop_requested(stop)` is true.
///
/// Each loop pass bumps a wrapping counter (kept alive with `black_box`) and
/// issues a `spin_loop` hint. The returned report carries:
/// * `tid` — the value of `getpid()` in the calling process;
/// * `work_units` and `iterations` — the number of loop passes;
/// * `wall_time_ns` — elapsed wall time of the loop, in nanoseconds;
/// * `completed` — `true`.
///
/// Every other field is zero, empty, `false`, or `None`.
pub(super) fn custom_spin_fn(stop: &AtomicBool) -> WorkerReport {
    // SAFETY: `getpid` has no preconditions.
    let tid: libc::pid_t = unsafe { libc::getpid() };
    let started = Instant::now();

    let mut spins = 0u64;
    loop {
        if stop_requested(stop) {
            break;
        }
        spins = std::hint::black_box(spins.wrapping_add(1));
        std::hint::spin_loop();
    }
    let wall_time_ns = started.elapsed().as_nanos() as u64;

    WorkerReport {
        // Values measured by this function.
        tid,
        work_units: spins,
        iterations: spins,
        wall_time_ns,
        completed: true,

        // Scalar fields left at zero.
        group_idx: 0,
        cpu_time_ns: 0,
        off_cpu_ns: 0,
        migration_count: 0,
        max_gap_ms: 0,
        max_gap_cpu: 0,
        max_gap_at_ms: 0,
        wake_sample_total: 0,
        iteration_cost_sample_total: 0,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        vmstat_numa_pages_migrated: 0,

        // Collections left empty.
        cpus_used: BTreeSet::new(),
        migrations: Vec::new(),
        resume_latencies_ns: Vec::new(),
        iteration_costs_ns: Vec::new(),
        numa_pages: BTreeMap::new(),

        // Flags and optional details.
        is_messenger: false,
        exit_info: None,
        affinity_error: None,
    }
}
/// Builds the path of the "ready" marker file for `pid`:
/// `<temp_dir>/ktstr-sigusr1-ignore-ready-<pid>`.
pub(super) fn ready_file_path(pid: libc::pid_t) -> std::path::PathBuf {
    let mut marker = std::env::temp_dir();
    marker.push(format!("ktstr-sigusr1-ignore-ready-{pid}"));
    marker
}
/// Sets the disposition of `SIGUSR1` to `SIG_IGN` for the calling process and
/// returns the process id from `getpid()`.
///
/// The return value of `signal` is not inspected.
pub(super) fn ignore_sigusr1_and_get_pid() -> libc::pid_t {
    // SAFETY: `signal` is called with a valid signal number and the predefined
    // `SIG_IGN` disposition; `getpid` has no preconditions.
    unsafe {
        libc::signal(libc::SIGUSR1, libc::SIG_IGN);
        libc::getpid()
    }
}
/// Blocks until `stop_requested(stop)` is true or `timeout` has elapsed,
/// whichever happens first.
///
/// The condition is polled every 10 ms; the stop flag is checked before the
/// clock on each pass.
pub(super) fn wait_for_deadline(stop: &AtomicBool, timeout: Duration) {
    let deadline = Instant::now() + timeout;
    loop {
        if stop_requested(stop) || Instant::now() >= deadline {
            return;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
/// Polls every 10 ms until `path` exists, panicking if that cannot happen in
/// time.
///
/// On each pass where `path` is still missing, two checks run in this order:
/// 1. a signal-0 probe of `liveness_pid` — if the probe fails, the process is
///    treated as gone and the function panics;
/// 2. the deadline (`timeout` from entry) — if it has passed, the function
///    panics.
///
/// `context` is appended to either panic message to help diagnose the failure.
///
/// # Panics
/// Panics when `liveness_pid` can no longer be probed or when `timeout`
/// elapses before `path` appears.
pub(super) fn wait_for_file_or_panic(
    path: &std::path::Path,
    timeout: Duration,
    liveness_pid: libc::pid_t,
    context: &str,
) {
    let deadline = Instant::now() + timeout;
    let writer = nix::unistd::Pid::from_raw(liveness_pid);
    loop {
        if path.exists() {
            return;
        }
        // Signal 0 (`None`) only checks that the process can be signalled.
        let writer_gone = nix::sys::signal::kill(writer, None).is_err();
        if writer_gone {
            panic!("pid {liveness_pid} exited before writing ready file {path:?} — {context}",);
        }
        if Instant::now() >= deadline {
            panic!(
                "pid {liveness_pid} did not write ready file {path:?} within {timeout:?} — {context}",
            );
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
/// Custom worker body that ignores `SIGUSR1`, announces readiness through a
/// marker file, and then waits for up to 7 seconds.
///
/// Steps, in order:
/// 1. ignore `SIGUSR1` and obtain the process id;
/// 2. store `false` into `stop` (presumably to discard a stop request that was
///    recorded before the signal was ignored — confirm against the harness);
/// 3. create an empty file at `ready_file_path(pid)`; a write failure is
///    deliberately ignored;
/// 4. wait until `stop` is requested or 7 seconds pass.
///
/// Returns a default `WorkerReport` whose `tid` is the process id.
pub(super) fn ignores_sigusr1_fn(stop: &AtomicBool) -> WorkerReport {
    let tid = ignore_sigusr1_and_get_pid();
    stop.store(false, Ordering::Relaxed);

    let marker = ready_file_path(tid);
    let _ = std::fs::write(&marker, []);

    let max_wait = Duration::from_secs(7);
    wait_for_deadline(stop, max_wait);

    WorkerReport {
        tid,
        ..Default::default()
    }
}
/// Builds the path of the grandchild pidfile for the worker `worker_pid`:
/// `<temp_dir>/ktstr-grandchild-pid-<worker_pid>`.
pub(super) fn grandchild_pidfile_path(worker_pid: libc::pid_t) -> std::path::PathBuf {
    let mut pidfile = std::env::temp_dir();
    pidfile.push(format!("ktstr-grandchild-pid-{worker_pid}"));
    pidfile
}
/// Absolute path of the binary that the grandchild reaping tests exec in the
/// grandchild process.
pub(super) const GRANDCHILD_SLEEP_BINARY: &str = "/bin/sleep";
/// Verifies that `GRANDCHILD_SLEEP_BINARY` exists and has at least one execute
/// bit set, so a later exec failure is reported with a clear message up front.
///
/// # Panics
/// Panics when the file cannot be stat'ed or when none of the user/group/other
/// execute bits (`0o111`) is set.
pub(super) fn require_grandchild_sleep_binary() {
    use std::os::unix::fs::PermissionsExt;

    let binary = std::path::Path::new(GRANDCHILD_SLEEP_BINARY);
    let meta = std::fs::metadata(binary).unwrap_or_else(|e| {
        panic!(
            "grandchild reaping tests require {GRANDCHILD_SLEEP_BINARY} to \
             exist; stat failed: {e}. Install coreutils (or adjust the \
             test's exec target + update GRANDCHILD_SLEEP_BINARY)."
        )
    });

    let mode = meta.permissions().mode();
    let any_exec_bit = mode & 0o111 != 0;
    if any_exec_bit {
        return;
    }
    panic!(
        "grandchild reaping tests require {GRANDCHILD_SLEEP_BINARY} to \
         have at least one execute bit set; mode = {:o}. Fix the \
         file's permissions or adjust the test's exec target.",
        mode & 0o7777,
    );
}
/// Waits for the worker's grandchild pidfile and returns the pid stored in it.
///
/// First waits up to 3 seconds for `pidfile` to exist while `worker_pid` is
/// still alive. Then re-reads the file every 10 ms, for up to a further
/// 3 seconds, until its trimmed contents are non-empty, and parses them as a
/// `pid_t`.
///
/// # Panics
/// Panics when the pidfile never appears, the worker disappears first, the
/// file cannot be read, it stays empty past the second deadline, its contents
/// do not parse as a `pid_t`, or the parsed pid is not positive.
pub(super) fn read_grandchild_gpid_from_pidfile(
    worker_pid: libc::pid_t,
    pidfile: &std::path::Path,
) -> libc::pid_t {
    wait_for_file_or_panic(
        pidfile,
        Duration::from_secs(3),
        worker_pid,
        "fork+exec path likely broken — check /bin/sleep exists and is executable",
    );

    let read_deadline = Instant::now() + Duration::from_secs(3);
    let contents = loop {
        let text = std::fs::read_to_string(pidfile).expect("pidfile readable once exists");
        if text.trim().is_empty() {
            // The file exists but holds no pid yet; retry until the deadline.
            if Instant::now() >= read_deadline {
                panic!(
                    "pidfile {pidfile:?} stayed empty for 3s after exists() \
                     returned true — writer may have crashed between O_TRUNC \
                     and write",
                );
            }
            std::thread::sleep(Duration::from_millis(10));
            continue;
        }
        break text;
    };

    let gpid = contents
        .trim()
        .parse::<libc::pid_t>()
        .expect("pidfile holds a valid pid_t");
    assert!(gpid > 0, "grandchild pid must be positive: {gpid}");
    gpid
}
/// Polls every 20 ms until the process `gpid` is gone, or `timeout` elapses.
///
/// On each pass:
/// 1. probe `gpid` with signal 0 — `ESRCH` means the process no longer exists
///    and yields `Ok(())`;
/// 2. otherwise try a non-blocking `waitpid(gpid)` — an `Exited` or `Signaled`
///    status also yields `Ok(())`; any other result (including an error) is
///    ignored;
/// 3. if the deadline has passed, return `Err(())`.
///
/// # Panics
/// Panics when the signal-0 probe fails with an errno other than `ESRCH`.
pub(super) fn wait_for_grandchild_reap(gpid: libc::pid_t, timeout: Duration) -> Result<(), ()> {
    use nix::sys::wait::{WaitPidFlag, WaitStatus};

    let deadline = Instant::now() + timeout;
    let target = nix::unistd::Pid::from_raw(gpid);
    loop {
        match nix::sys::signal::kill(target, None) {
            Ok(()) => {}
            Err(nix::errno::Errno::ESRCH) => return Ok(()),
            Err(e) => panic!(
                "unexpected errno from existence probe: {e} \
                 (common non-ESRCH errnos: EPERM = caller may not \
                 signal this process despite it existing; EINVAL = \
                 invalid signal number, which cannot happen here \
                 since we pass None / signal 0)",
            ),
        }

        // The pid still exists; see whether it can be collected right now.
        let collected = matches!(
            nix::sys::wait::waitpid(target, Some(WaitPidFlag::WNOHANG)),
            Ok(WaitStatus::Exited(..)) | Ok(WaitStatus::Signaled(..))
        );
        if collected {
            return Ok(());
        }
        if Instant::now() >= deadline {
            return Err(());
        }
        std::thread::sleep(Duration::from_millis(20));
    }
}
/// Asserts that the process `gpid` disappears within `timeout`.
///
/// If it does not, the process is sent `SIGKILL` (best effort, result ignored)
/// so it does not outlive the test, and the function panics with a message
/// that includes `context`.
///
/// # Panics
/// Panics when `wait_for_grandchild_reap` reports a timeout.
pub(super) fn assert_grandchild_reaped_within(gpid: libc::pid_t, timeout: Duration, context: &str) {
    if wait_for_grandchild_reap(gpid, timeout).is_ok() {
        return;
    }

    let survivor = nix::unistd::Pid::from_raw(gpid);
    let _ = nix::sys::signal::kill(survivor, nix::sys::signal::Signal::SIGKILL);
    panic!(
        "grandchild {gpid} still alive {:?} after {context} — \
         setpgid/killpg path broken",
        timeout,
    );
}
/// Guard that deletes a list of files when it goes out of scope.
///
/// Holds the paths to remove; removal is best effort.
pub(super) struct PidfileCleanup(pub(super) Vec<std::path::PathBuf>);

impl Drop for PidfileCleanup {
    /// Attempts to remove every stored path, ignoring individual failures
    /// (for example, a file that was never created).
    fn drop(&mut self) {
        self.0.iter().for_each(|path| {
            let _ = std::fs::remove_file(path);
        });
    }
}
/// Forks a grandchild that execs `GRANDCHILD_SLEEP_BINARY 60`, then publishes
/// the grandchild's pid in the calling worker's pidfile.
///
/// The exec path and argument are converted to C strings before `fork`, so the
/// forked child only closes inherited descriptors (fd 3 and above) and calls
/// `execv`. The pid is first written to a `.tmp` sibling and then renamed onto
/// `grandchild_pidfile_path(worker_pid)`, so a reader never observes a
/// partially written pidfile.
///
/// Returns the pid of the calling process (the worker), not the grandchild.
///
/// # Failure behaviour
/// This function does not return on failure; it prints a diagnostic to stderr
/// and terminates the calling process with status 127 when `fork` fails or
/// when the pidfile cannot be written or renamed. In the latter two cases the
/// grandchild has already been started, so it is killed and the staging file
/// is removed first; otherwise the grandchild would keep running with nobody
/// knowing its pid.
///
/// # Panics
/// Panics if `GRANDCHILD_SLEEP_BINARY` contains an interior NUL byte.
pub(super) fn fork_and_exec_grandchild_and_publish_pidfile() -> libc::pid_t {
    /// Kills the already-forked grandchild, removes the staging file, and
    /// terminates the calling process with status 127.
    fn abandon_grandchild_and_exit(gpid: libc::pid_t, pidfile_tmp: &std::path::Path) -> ! {
        // SAFETY: `gpid` is the positive pid returned by `fork` in this process.
        unsafe {
            libc::kill(gpid, libc::SIGKILL);
        }
        let _ = std::fs::remove_file(pidfile_tmp);
        // SAFETY: `_exit` never returns.
        unsafe { libc::_exit(127) }
    }

    let exec_path = std::ffi::CString::new(GRANDCHILD_SLEEP_BINARY)
        .expect("GRANDCHILD_SLEEP_BINARY must have no interior NUL");
    let exec_arg = std::ffi::CString::new("60").expect("literal has no NUL");
    // SAFETY: `getpid` has no preconditions.
    let worker_pid = unsafe { libc::getpid() };
    // SAFETY: `fork` takes no arguments; all three outcomes are handled below.
    let gpid = unsafe { libc::fork() };
    if gpid < 0 {
        eprintln!("fork failed: {}", std::io::Error::last_os_error());
        // SAFETY: `_exit` never returns.
        unsafe {
            libc::_exit(127);
        }
    }
    if gpid == 0 {
        // Grandchild: drop every inherited descriptor above stderr, falling
        // back to an explicit loop when `close_range` reports failure.
        // SAFETY: closing descriptors cannot violate memory safety; errors
        // from `close` on unopened descriptors are ignored.
        let rc = unsafe { libc::close_range(3, u32::MAX, 0) };
        if rc != 0 {
            for fd in 3..=256 {
                unsafe {
                    libc::close(fd);
                }
            }
        }
        let argv: [*const libc::c_char; 3] =
            [exec_path.as_ptr(), exec_arg.as_ptr(), std::ptr::null()];
        // SAFETY: `argv` is a NULL-terminated array of pointers to C strings
        // that outlive the call; `_exit` runs only if `execv` returns.
        unsafe {
            libc::execv(exec_path.as_ptr(), argv.as_ptr());
            libc::_exit(127);
        }
    }

    // Worker: publish the grandchild pid via write-to-temp + rename.
    let pidfile = grandchild_pidfile_path(worker_pid);
    let pidfile_tmp = std::env::temp_dir().join(format!("ktstr-grandchild-pid-{worker_pid}.tmp"));
    if let Err(e) = std::fs::write(&pidfile_tmp, gpid.to_string()) {
        eprintln!("failed to write grandchild pidfile tmp {pidfile_tmp:?}: {e}");
        abandon_grandchild_and_exit(gpid, &pidfile_tmp);
    }
    if let Err(e) = std::fs::rename(&pidfile_tmp, &pidfile) {
        eprintln!("failed to rename grandchild pidfile {pidfile_tmp:?} → {pidfile:?}: {e}");
        abandon_grandchild_and_exit(gpid, &pidfile_tmp);
    }
    worker_pid
}
/// Custom worker body that ignores `SIGUSR1`, starts a sleeping grandchild and
/// publishes its pid, then waits until stop is requested or 7 seconds pass.
///
/// Returns a default `WorkerReport` whose `tid` is the worker's process id.
pub(super) fn forks_grandchild_sleep_fn(stop: &AtomicBool) -> WorkerReport {
    let tid = ignore_sigusr1_and_get_pid();
    let _ = fork_and_exec_grandchild_and_publish_pidfile();

    let max_wait = Duration::from_secs(7);
    wait_for_deadline(stop, max_wait);

    WorkerReport {
        tid,
        ..Default::default()
    }
}
/// Custom worker body that starts a sleeping grandchild and publishes its pid,
/// then waits until stop is requested or 10 seconds pass.
///
/// Unlike the sibling worker bodies in this file, it does not change the
/// `SIGUSR1` disposition. Returns a default `WorkerReport` whose `tid` is the
/// worker's process id.
pub(super) fn forks_grandchild_and_exits_cleanly_fn(stop: &AtomicBool) -> WorkerReport {
    let tid = fork_and_exec_grandchild_and_publish_pidfile();

    let max_wait = Duration::from_secs(10);
    wait_for_deadline(stop, max_wait);

    WorkerReport {
        tid,
        ..Default::default()
    }
}
/// Custom worker body that ignores `SIGUSR1`, starts a sleeping grandchild and
/// publishes its pid, and then panics on purpose.
///
/// The stop flag is never consulted and the function never returns a report.
///
/// # Panics
/// Always panics after the grandchild pidfile has been published.
pub(super) fn forks_grandchild_and_panics_fn(_stop: &AtomicBool) -> WorkerReport {
    let _ = ignore_sigusr1_and_get_pid();
    let _ = fork_and_exec_grandchild_and_publish_pidfile();
    panic!(
        "intentional panic after grandchild fork to exercise the \
         Custom-closure panic path in stop_and_collect"
    );
}
/// Decides whether a test that needs `min_cpus` CPUs should be skipped on this
/// host.
///
/// Compares `min_cpus` with `std::thread::available_parallelism()` (treated as
/// 1 when it cannot be determined).
///
/// Returns `true` when the host has fewer CPUs than required — in which case a
/// skip notice naming `test_name` is printed to stderr — and `false` when the
/// test can proceed.
pub(super) fn require_isolated_cpus(min_cpus: usize, test_name: &str) -> bool {
    let available = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    let should_skip = available < min_cpus;
    if should_skip {
        eprintln!(
            "ktstr: {test_name}: skipping — host reports \
             available_parallelism={available}, test requires \
             ≥ {min_cpus} CPUs to keep stages on independent \
             execution units"
        );
    }
    should_skip
}