use anyhow::{Context, Result};
use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Seek, Write};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
// Compile-time portability guard: refuse to build on big-endian targets,
// where the shared-region layout described below would break silently.
#[cfg(not(target_endian = "little"))]
compile_error!(
    "ktstr's FanOutCompute generation-counter layout assumes a \
     little-endian target — the u64 counter at offset 0 of the \
     shared futex region must expose its low 32 bits to the \
     futex syscall at that same offset. Porting to a big-endian \
     target requires reworking the layout so futex_wait sees the \
     incrementing low 4 bytes."
);
/// How a [`Work`] spec wants its workers pinned to CPUs.
///
/// NOTE(review): the resolution of these variants into concrete CPU sets
/// happens in `resolve_affinity`, which is not visible in this chunk — the
/// per-variant notes below are inferred from the names; confirm there.
#[derive(Clone, Debug, Default)]
pub enum AffinityKind {
    /// Keep whatever affinity mask the parent process has.
    #[default]
    Inherit,
    /// Presumably pin to a randomly chosen subset of CPUs.
    RandomSubset,
    /// Presumably pin within one last-level-cache domain.
    LlcAligned,
    /// Presumably place workers across cgroup/CPU boundaries.
    CrossCgroup,
    /// Pin every worker to a single CPU.
    SingleCpu,
    /// Pin to exactly this set of CPU ids.
    Exact(BTreeSet<usize>),
}
impl AffinityKind {
pub fn exact(cpus: impl IntoIterator<Item = usize>) -> Self {
AffinityKind::Exact(cpus.into_iter().collect())
}
}
/// Concrete affinity choice carried by a [`WorkloadConfig`].
#[derive(Debug, Clone)]
pub enum AffinityMode {
    /// Do not touch the workers' affinity masks.
    None,
    /// Pin every worker to this fixed CPU set.
    Fixed(BTreeSet<usize>),
    /// Pick `count` CPUs from `from` (presumably per worker — confirm in
    /// `resolve_affinity`, not visible here).
    Random { from: BTreeSet<usize>, count: usize },
    /// Pin every worker to one specific CPU.
    SingleCpu(usize),
}
/// One timed step of a [`WorkType::Sequence`] workload; the `Duration` is how
/// long the worker stays in that phase.
#[derive(Clone, Debug)]
pub enum Phase {
    /// Busy-spin on the CPU.
    Spin(Duration),
    /// Sleep (off-CPU).
    Sleep(Duration),
    /// Repeatedly yield the CPU.
    Yield(Duration),
    /// Perform I/O work.
    Io(Duration),
}
/// The load pattern each worker process generates.
///
/// Variant payloads are tuning knobs whose defaults live in [`defaults`].
/// The actual behavior of each variant is implemented in `worker_main`
/// (partially visible below) — names describe the intended load shape.
#[derive(Debug, Clone, strum::VariantNames)]
pub enum WorkType {
    /// Pure CPU busy-loop.
    CpuSpin,
    /// Load dominated by sched_yield-style yielding.
    YieldHeavy,
    /// Mix of on-CPU and off-CPU behavior.
    Mixed,
    /// Synchronous file I/O load.
    IoSync,
    /// Alternate `burst_ms` of spinning with `sleep_ms` of sleep.
    Bursty { burst_ms: u64, sleep_ms: u64 },
    /// Worker pairs exchanging data over pipes (needs pipe pairs at spawn).
    PipeIo { burst_iters: u64 },
    /// Worker pairs ping-ponging on a shared futex word.
    FutexPingPong { spin_iters: u64 },
    /// Walk a `size_kb` buffer with the given `stride` to thrash caches.
    CachePressure { size_kb: usize, stride: usize },
    /// Cache walking interleaved with yields.
    CacheYield { size_kb: usize, stride: usize },
    /// Cache walking combined with pipe traffic (worker pairs).
    CachePipe { size_kb: usize, burst_iters: u64 },
    /// One waker plus `fan_out` futex waiters per group (group size fan_out+1).
    FutexFanOut { fan_out: usize, spin_iters: u64 },
    /// Run `first` then each phase in `rest`, in order.
    Sequence { first: Phase, rest: Vec<Phase> },
    /// Repeatedly fork children that immediately exit.
    ForkExit,
    /// Cycle through nice levels.
    NiceSweep,
    /// Repeatedly change the worker's own CPU affinity while spinning.
    AffinityChurn { spin_iters: u64 },
    /// Repeatedly change the worker's scheduling policy while spinning.
    PolicyChurn { spin_iters: u64 },
    /// Fan-out compute over a shared generation counter (see the
    /// little-endian `compile_error!` guard at the top of this file).
    FanOutCompute {
        fan_out: usize,
        cache_footprint_kb: usize,
        operations: usize,
        sleep_usec: u64,
    },
    /// Repeatedly fault pages of a `region_kb` mapping.
    PageFaultChurn {
        region_kb: usize,
        touches_per_cycle: usize,
        spin_iters: u64,
    },
    /// `contenders` workers fighting over one shared lock.
    MutexContention {
        contenders: usize,
        hold_iters: u64,
        work_iters: u64,
    },
    /// Escape hatch: run an arbitrary function until the stop flag is set.
    Custom {
        name: &'static str,
        run: fn(&AtomicBool) -> WorkerReport,
    },
}
pub mod defaults {
    //! Default tuning values used by [`WorkType::from_name`](super::WorkType::from_name)
    //! when a work type is selected by name without explicit parameters.
    pub const BURSTY_BURST_MS: u64 = 50;
    pub const BURSTY_SLEEP_MS: u64 = 100;
    pub const PIPE_IO_BURST_ITERS: u64 = 1024;
    pub const FUTEX_PING_PONG_SPIN_ITERS: u64 = 1024;
    pub const CACHE_PRESSURE_SIZE_KB: usize = 32;
    // Stride of 64 matches the common cache-line size.
    pub const CACHE_PRESSURE_STRIDE: usize = 64;
    pub const CACHE_YIELD_SIZE_KB: usize = 32;
    pub const CACHE_YIELD_STRIDE: usize = 64;
    pub const CACHE_PIPE_SIZE_KB: usize = 32;
    pub const CACHE_PIPE_BURST_ITERS: u64 = 1024;
    pub const FUTEX_FAN_OUT_FAN_OUT: usize = 4;
    pub const FUTEX_FAN_OUT_SPIN_ITERS: u64 = 1024;
    pub const AFFINITY_CHURN_SPIN_ITERS: u64 = 1024;
    pub const POLICY_CHURN_SPIN_ITERS: u64 = 1024;
    pub const FAN_OUT_COMPUTE_FAN_OUT: usize = 4;
    pub const FAN_OUT_COMPUTE_CACHE_FOOTPRINT_KB: usize = 256;
    pub const FAN_OUT_COMPUTE_OPERATIONS: usize = 5;
    pub const FAN_OUT_COMPUTE_SLEEP_USEC: u64 = 100;
    pub const PAGE_FAULT_CHURN_REGION_KB: usize = 4096;
    pub const PAGE_FAULT_CHURN_TOUCHES_PER_CYCLE: usize = 256;
    pub const PAGE_FAULT_CHURN_SPIN_ITERS: u64 = 64;
    pub const MUTEX_CONTENTION_CONTENDERS: usize = 4;
    pub const MUTEX_CONTENTION_HOLD_ITERS: u64 = 256;
    pub const MUTEX_CONTENTION_WORK_ITERS: u64 = 1024;
}
impl WorkType {
    /// All variant names as derived by `strum` — note this includes
    /// `Sequence` and `Custom`, which [`Self::from_name`] cannot construct.
    pub const ALL_NAMES: &'static [&'static str] = <Self as strum::VariantNames>::VARIANTS;
    /// Stable display name of this variant; `Custom` reports its payload name.
    pub fn name(&self) -> &'static str {
        match self {
            WorkType::CpuSpin => "CpuSpin",
            WorkType::YieldHeavy => "YieldHeavy",
            WorkType::Mixed => "Mixed",
            WorkType::IoSync => "IoSync",
            WorkType::Bursty { .. } => "Bursty",
            WorkType::PipeIo { .. } => "PipeIo",
            WorkType::FutexPingPong { .. } => "FutexPingPong",
            WorkType::CachePressure { .. } => "CachePressure",
            WorkType::CacheYield { .. } => "CacheYield",
            WorkType::CachePipe { .. } => "CachePipe",
            WorkType::FutexFanOut { .. } => "FutexFanOut",
            WorkType::Sequence { .. } => "Sequence",
            WorkType::ForkExit => "ForkExit",
            WorkType::NiceSweep => "NiceSweep",
            WorkType::AffinityChurn { .. } => "AffinityChurn",
            WorkType::PolicyChurn { .. } => "PolicyChurn",
            WorkType::FanOutCompute { .. } => "FanOutCompute",
            WorkType::PageFaultChurn { .. } => "PageFaultChurn",
            WorkType::MutexContention { .. } => "MutexContention",
            WorkType::Custom { name, .. } => name,
        }
    }
    /// Parse a name (as produced by [`Self::name`]) back into a `WorkType`,
    /// filling parameterized variants with the constants from [`defaults`].
    ///
    /// Returns `None` for unknown names and for `Sequence`/`Custom`, which
    /// carry payloads that have no sensible defaults.
    pub fn from_name(s: &str) -> Option<WorkType> {
        match s {
            "CpuSpin" => Some(WorkType::CpuSpin),
            "YieldHeavy" => Some(WorkType::YieldHeavy),
            "Mixed" => Some(WorkType::Mixed),
            "IoSync" => Some(WorkType::IoSync),
            "Bursty" => Some(WorkType::Bursty {
                burst_ms: defaults::BURSTY_BURST_MS,
                sleep_ms: defaults::BURSTY_SLEEP_MS,
            }),
            "PipeIo" => Some(WorkType::PipeIo {
                burst_iters: defaults::PIPE_IO_BURST_ITERS,
            }),
            "FutexPingPong" => Some(WorkType::FutexPingPong {
                spin_iters: defaults::FUTEX_PING_PONG_SPIN_ITERS,
            }),
            "CachePressure" => Some(WorkType::CachePressure {
                size_kb: defaults::CACHE_PRESSURE_SIZE_KB,
                stride: defaults::CACHE_PRESSURE_STRIDE,
            }),
            "CacheYield" => Some(WorkType::CacheYield {
                size_kb: defaults::CACHE_YIELD_SIZE_KB,
                stride: defaults::CACHE_YIELD_STRIDE,
            }),
            "CachePipe" => Some(WorkType::CachePipe {
                size_kb: defaults::CACHE_PIPE_SIZE_KB,
                burst_iters: defaults::CACHE_PIPE_BURST_ITERS,
            }),
            "FutexFanOut" => Some(WorkType::FutexFanOut {
                fan_out: defaults::FUTEX_FAN_OUT_FAN_OUT,
                spin_iters: defaults::FUTEX_FAN_OUT_SPIN_ITERS,
            }),
            "ForkExit" => Some(WorkType::ForkExit),
            "NiceSweep" => Some(WorkType::NiceSweep),
            "AffinityChurn" => Some(WorkType::AffinityChurn {
                spin_iters: defaults::AFFINITY_CHURN_SPIN_ITERS,
            }),
            "PolicyChurn" => Some(WorkType::PolicyChurn {
                spin_iters: defaults::POLICY_CHURN_SPIN_ITERS,
            }),
            "FanOutCompute" => Some(WorkType::FanOutCompute {
                fan_out: defaults::FAN_OUT_COMPUTE_FAN_OUT,
                cache_footprint_kb: defaults::FAN_OUT_COMPUTE_CACHE_FOOTPRINT_KB,
                operations: defaults::FAN_OUT_COMPUTE_OPERATIONS,
                sleep_usec: defaults::FAN_OUT_COMPUTE_SLEEP_USEC,
            }),
            "PageFaultChurn" => Some(WorkType::PageFaultChurn {
                region_kb: defaults::PAGE_FAULT_CHURN_REGION_KB,
                touches_per_cycle: defaults::PAGE_FAULT_CHURN_TOUCHES_PER_CYCLE,
                spin_iters: defaults::PAGE_FAULT_CHURN_SPIN_ITERS,
            }),
            "MutexContention" => Some(WorkType::MutexContention {
                contenders: defaults::MUTEX_CONTENTION_CONTENDERS,
                hold_iters: defaults::MUTEX_CONTENTION_HOLD_ITERS,
                work_iters: defaults::MUTEX_CONTENTION_WORK_ITERS,
            }),
            _ => None,
        }
    }
    /// Case-insensitive lookup of the canonical spelling for `s`, for
    /// "did you mean" style diagnostics.
    pub fn suggest(s: &str) -> Option<&'static str> {
        Self::ALL_NAMES
            .iter()
            .copied()
            .find(|n| n.eq_ignore_ascii_case(s))
    }
    /// How many workers form one cooperating group for this work type, or
    /// `None` when workers are independent. `spawn` requires `num_workers`
    /// to be a multiple of this.
    pub fn worker_group_size(&self) -> Option<usize> {
        match self {
            WorkType::PipeIo { .. }
            | WorkType::FutexPingPong { .. }
            | WorkType::CachePipe { .. } => Some(2),
            // One waker plus `fan_out` waiters.
            WorkType::FutexFanOut { fan_out, .. } => Some(fan_out + 1),
            WorkType::FanOutCompute { fan_out, .. } => Some(fan_out + 1),
            WorkType::MutexContention { contenders, .. } => Some(*contenders),
            _ => None,
        }
    }
    /// Whether `spawn` must pre-allocate a shared (futex) memory region
    /// for each worker group.
    pub fn needs_shared_mem(&self) -> bool {
        matches!(
            self,
            WorkType::FutexPingPong { .. }
                | WorkType::FutexFanOut { .. }
                | WorkType::FanOutCompute { .. }
                | WorkType::MutexContention { .. }
        )
    }
    /// Whether the worker allocates a cache-pressure buffer (see
    /// `cache_pressure_buf` in `worker_main`).
    pub fn needs_cache_buf(&self) -> bool {
        matches!(
            self,
            WorkType::CachePressure { .. }
                | WorkType::CacheYield { .. }
                | WorkType::CachePipe { .. }
                | WorkType::FanOutCompute { .. }
        )
    }
    // Convenience constructors mirroring the parameterized variants.
    pub fn bursty(burst_ms: u64, sleep_ms: u64) -> Self {
        WorkType::Bursty { burst_ms, sleep_ms }
    }
    pub fn pipe_io(burst_iters: u64) -> Self {
        WorkType::PipeIo { burst_iters }
    }
    pub fn futex_ping_pong(spin_iters: u64) -> Self {
        WorkType::FutexPingPong { spin_iters }
    }
    pub fn cache_pressure(size_kb: usize, stride: usize) -> Self {
        WorkType::CachePressure { size_kb, stride }
    }
    pub fn cache_yield(size_kb: usize, stride: usize) -> Self {
        WorkType::CacheYield { size_kb, stride }
    }
    pub fn cache_pipe(size_kb: usize, burst_iters: u64) -> Self {
        WorkType::CachePipe {
            size_kb,
            burst_iters,
        }
    }
    pub fn futex_fan_out(fan_out: usize, spin_iters: u64) -> Self {
        WorkType::FutexFanOut {
            fan_out,
            spin_iters,
        }
    }
    pub fn affinity_churn(spin_iters: u64) -> Self {
        WorkType::AffinityChurn { spin_iters }
    }
    pub fn policy_churn(spin_iters: u64) -> Self {
        WorkType::PolicyChurn { spin_iters }
    }
    pub fn fan_out_compute(
        fan_out: usize,
        cache_footprint_kb: usize,
        operations: usize,
        sleep_usec: u64,
    ) -> Self {
        WorkType::FanOutCompute {
            fan_out,
            cache_footprint_kb,
            operations,
            sleep_usec,
        }
    }
    pub fn page_fault_churn(region_kb: usize, touches_per_cycle: usize, spin_iters: u64) -> Self {
        WorkType::PageFaultChurn {
            region_kb,
            touches_per_cycle,
            spin_iters,
        }
    }
    pub fn mutex_contention(contenders: usize, hold_iters: u64, work_iters: u64) -> Self {
        WorkType::MutexContention {
            contenders,
            hold_iters,
            work_iters,
        }
    }
    pub fn custom(name: &'static str, run: fn(&AtomicBool) -> WorkerReport) -> Self {
        WorkType::Custom { name, run }
    }
}
pub(crate) fn resolve_work_type(
base: &WorkType,
override_wt: Option<&WorkType>,
swappable: bool,
num_workers: usize,
) -> WorkType {
if !swappable {
return base.clone();
}
match override_wt {
Some(wt) => {
if let Some(gs) = wt.worker_group_size()
&& !num_workers.is_multiple_of(gs)
{
return base.clone();
}
wt.clone()
}
None => base.clone(),
}
}
/// Linux scheduling policy applied to each worker via `sched_setscheduler`.
#[derive(Debug, Clone, Copy)]
pub enum SchedPolicy {
    /// SCHED_OTHER (default CFS/EEVDF).
    Normal,
    /// SCHED_BATCH.
    Batch,
    /// SCHED_IDLE.
    Idle,
    /// SCHED_FIFO with the given real-time priority.
    Fifo(u32),
    /// SCHED_RR with the given real-time priority.
    RoundRobin(u32),
}
impl SchedPolicy {
pub fn fifo(priority: u32) -> Self {
SchedPolicy::Fifo(priority)
}
pub fn round_robin(priority: u32) -> Self {
SchedPolicy::RoundRobin(priority)
}
}
/// NUMA memory policy applied per worker via `set_mempolicy(2)`.
#[derive(Clone, Debug, Default)]
pub enum MemPolicy {
    /// Leave the kernel's policy untouched.
    #[default]
    Default,
    /// MPOL_BIND: allocate only from these nodes.
    Bind(BTreeSet<usize>),
    /// MPOL_PREFERRED: prefer this node, fall back elsewhere.
    Preferred(usize),
    /// MPOL_INTERLEAVE across these nodes.
    Interleave(BTreeSet<usize>),
    /// MPOL_LOCAL: allocate on the node of the running CPU.
    Local,
    /// MPOL_PREFERRED_MANY across these nodes.
    PreferredMany(BTreeSet<usize>),
    /// MPOL_WEIGHTED_INTERLEAVE across these nodes.
    WeightedInterleave(BTreeSet<usize>),
}
/// Bitset of MPOL_F_* mode flags OR-ed into the `set_mempolicy` mode word.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MpolFlags(u32);
impl MpolFlags {
    /// No flags set.
    pub const NONE: Self = Self(0);
    // Bit positions match the kernel's MPOL_F_* mode-flag values
    // (uapi linux/mempolicy.h) — confirm against the target kernel headers.
    pub const STATIC_NODES: Self = Self(1 << 15);
    pub const RELATIVE_NODES: Self = Self(1 << 14);
    pub const NUMA_BALANCING: Self = Self(1 << 13);
    #[cfg(test)]
    pub(crate) const fn from_bits_for_test(bits: u32) -> Self {
        Self(bits)
    }
    /// Bitwise OR of the two flag sets.
    pub const fn union(self, other: Self) -> Self {
        let merged = self.0 | other.0;
        Self(merged)
    }
    /// Raw flag bits, ready to OR into the syscall mode argument.
    pub const fn bits(self) -> u32 {
        self.0
    }
    /// True when every bit of `other` is also set in `self`.
    pub const fn contains(self, other: Self) -> bool {
        self.0 & other.0 == other.0
    }
}
impl std::ops::BitOr for MpolFlags {
type Output = Self;
fn bitor(self, rhs: Self) -> Self {
self.union(rhs)
}
}
impl MemPolicy {
    /// MPOL_BIND over the given node ids.
    pub fn bind(nodes: impl IntoIterator<Item = usize>) -> Self {
        Self::Bind(BTreeSet::from_iter(nodes))
    }
    /// MPOL_PREFERRED on a single node.
    pub fn preferred(node: usize) -> Self {
        Self::Preferred(node)
    }
    /// MPOL_INTERLEAVE over the given node ids.
    pub fn interleave(nodes: impl IntoIterator<Item = usize>) -> Self {
        Self::Interleave(BTreeSet::from_iter(nodes))
    }
    /// MPOL_PREFERRED_MANY over the given node ids.
    pub fn preferred_many(nodes: impl IntoIterator<Item = usize>) -> Self {
        Self::PreferredMany(BTreeSet::from_iter(nodes))
    }
    /// MPOL_WEIGHTED_INTERLEAVE over the given node ids.
    pub fn weighted_interleave(nodes: impl IntoIterator<Item = usize>) -> Self {
        Self::WeightedInterleave(BTreeSet::from_iter(nodes))
    }
    /// The NUMA nodes this policy references (empty for Default/Local).
    pub fn node_set(&self) -> BTreeSet<usize> {
        match self {
            MemPolicy::Preferred(node) => std::iter::once(*node).collect(),
            MemPolicy::Bind(nodes)
            | MemPolicy::Interleave(nodes)
            | MemPolicy::PreferredMany(nodes)
            | MemPolicy::WeightedInterleave(nodes) => nodes.clone(),
            MemPolicy::Default | MemPolicy::Local => BTreeSet::new(),
        }
    }
    /// Reject policies whose node set is empty; node-less policies and
    /// `Preferred` are always valid.
    pub fn validate(&self) -> std::result::Result<(), String> {
        let (name, nodes) = match self {
            MemPolicy::Bind(nodes) => ("Bind", nodes),
            MemPolicy::Interleave(nodes) => ("Interleave", nodes),
            MemPolicy::PreferredMany(nodes) => ("PreferredMany", nodes),
            MemPolicy::WeightedInterleave(nodes) => ("WeightedInterleave", nodes),
            MemPolicy::Default | MemPolicy::Local | MemPolicy::Preferred(_) => return Ok(()),
        };
        if nodes.is_empty() {
            Err(format!("{name} policy requires at least one NUMA node"))
        } else {
            Ok(())
        }
    }
}
/// Build a `set_mempolicy`-style nodemask from a set of NUMA node ids.
///
/// Returns the mask words plus the `maxnode` bit count to pass to the
/// syscall. The count is `max_node + 2` (one past the highest bit, plus one
/// extra — the kernel effectively consumes maxnode-1 bits). An empty input
/// yields `(vec![], 0)`.
pub fn build_nodemask(nodes: &BTreeSet<usize>) -> (Vec<libc::c_ulong>, libc::c_ulong) {
    // BTreeSet iterates in ascending order, so the last element is the max.
    let max_node = match nodes.iter().next_back() {
        Some(&n) => n,
        None => return (Vec::new(), 0),
    };
    let word_bits = std::mem::size_of::<libc::c_ulong>() * 8;
    let mask_bits = max_node + 2;
    let mut mask = vec![0 as libc::c_ulong; mask_bits.div_ceil(word_bits)];
    for &node in nodes {
        let (word, bit) = (node / word_bits, node % word_bits);
        mask[word] |= (1 as libc::c_ulong) << bit;
    }
    (mask, mask_bits as libc::c_ulong)
}
// MPOL_* modes not exposed by the libc crate; values presumed to match
// uapi linux/mempolicy.h — confirm against the target kernel headers.
const MPOL_PREFERRED_MANY: i32 = 5;
const MPOL_WEIGHTED_INTERLEAVE: i32 = 6;
// How long a worker blocks in futex_wait before re-checking STOP (100 ms).
const WORKER_STOP_POLL_NS: libc::c_long = 100_000_000;
const FUTEX_WAIT_TIMEOUT: libc::timespec = libc::timespec {
    tv_sec: 0,
    tv_nsec: WORKER_STOP_POLL_NS,
};
// NOTE(review): presumably how long a fan-out worker spins after being
// woken — confirm in the worker loop (not fully visible in this chunk).
const FAN_OUT_POST_WAKE_SPIN_ITERS: u64 = 256;
/// Apply `policy` to the calling process via the raw `set_mempolicy(2)`
/// syscall, OR-ing the MPOL_F_* bits of `flags` into the mode word.
///
/// Best-effort: failures are reported on stderr and otherwise ignored.
fn apply_mempolicy_with_flags(policy: &MemPolicy, flags: MpolFlags) {
    let (mode, node_set): (i32, BTreeSet<usize>) = match policy {
        // Default means "leave the kernel's current policy alone".
        MemPolicy::Default => return,
        MemPolicy::Bind(nodes) => (libc::MPOL_BIND, nodes.clone()),
        MemPolicy::Preferred(node) => (libc::MPOL_PREFERRED, [*node].into_iter().collect()),
        MemPolicy::Interleave(nodes) => (libc::MPOL_INTERLEAVE, nodes.clone()),
        MemPolicy::PreferredMany(nodes) => (MPOL_PREFERRED_MANY, nodes.clone()),
        MemPolicy::WeightedInterleave(nodes) => (MPOL_WEIGHTED_INTERLEAVE, nodes.clone()),
        // MPOL_LOCAL takes no nodemask, so it is issued inline and returns.
        MemPolicy::Local => {
            let rc = unsafe {
                libc::syscall(
                    libc::SYS_set_mempolicy,
                    libc::MPOL_LOCAL | flags.bits() as i32,
                    std::ptr::null::<libc::c_ulong>(),
                    0 as libc::c_ulong,
                )
            };
            if rc != 0 {
                eprintln!(
                    "ktstr: set_mempolicy(MPOL_LOCAL) failed: {}",
                    std::io::Error::last_os_error(),
                );
            }
            return;
        }
    };
    // Callers are expected to run MemPolicy::validate() first, but stay
    // defensive: an empty mask would be rejected by the kernel anyway.
    if node_set.is_empty() {
        eprintln!("ktstr: set_mempolicy: empty node set, skipping");
        return;
    }
    let (mask, maxnode) = build_nodemask(&node_set);
    let effective_mode = mode | flags.bits() as i32;
    let rc = unsafe {
        libc::syscall(
            libc::SYS_set_mempolicy,
            effective_mode,
            mask.as_ptr(),
            maxnode,
        )
    };
    if rc != 0 {
        eprintln!(
            "ktstr: set_mempolicy(mode={}, nodes={:?}) failed: {}",
            mode,
            node_set,
            std::io::Error::last_os_error(),
        );
    }
}
/// Fully-resolved description of a workload passed to [`WorkloadHandle::spawn`].
#[derive(Debug, Clone)]
pub struct WorkloadConfig {
    /// Number of worker processes to fork.
    pub num_workers: usize,
    /// CPU pinning applied to each worker.
    pub affinity: AffinityMode,
    /// Load pattern the workers run.
    pub work_type: WorkType,
    /// Kernel scheduling policy for each worker.
    pub sched_policy: SchedPolicy,
    /// NUMA memory policy for each worker.
    pub mem_policy: MemPolicy,
    /// Extra MPOL_F_* flags OR-ed into the memory-policy mode.
    pub mpol_flags: MpolFlags,
}
impl Default for WorkloadConfig {
fn default() -> Self {
Self {
num_workers: 1,
affinity: AffinityMode::None,
work_type: WorkType::CpuSpin,
sched_policy: SchedPolicy::Normal,
mem_policy: MemPolicy::Default,
mpol_flags: MpolFlags::NONE,
}
}
}
impl WorkloadConfig {
    /// Set the number of worker processes to fork.
    pub fn workers(self, n: usize) -> Self {
        Self { num_workers: n, ..self }
    }
    /// Choose how workers are pinned to CPUs.
    pub fn affinity(self, a: AffinityMode) -> Self {
        Self { affinity: a, ..self }
    }
    /// Select the load pattern the workers run.
    pub fn work_type(self, wt: WorkType) -> Self {
        Self { work_type: wt, ..self }
    }
    /// Select the kernel scheduling policy for each worker.
    pub fn sched_policy(self, p: SchedPolicy) -> Self {
        Self { sched_policy: p, ..self }
    }
    /// Select the NUMA memory policy for each worker.
    pub fn mem_policy(self, p: MemPolicy) -> Self {
        Self { mem_policy: p, ..self }
    }
    /// Set extra MPOL_F_* flags OR-ed into the memory-policy mode.
    pub fn mpol_flags(self, f: MpolFlags) -> Self {
        Self { mpol_flags: f, ..self }
    }
}
/// Higher-level workload spec; unlike [`WorkloadConfig`], the worker count
/// is optional and affinity is the symbolic [`AffinityKind`].
#[derive(Clone, Debug)]
pub struct Work {
    pub work_type: WorkType,
    pub sched_policy: SchedPolicy,
    /// `None` lets the harness pick a worker count.
    pub num_workers: Option<usize>,
    pub affinity: AffinityKind,
    pub mem_policy: MemPolicy,
    pub mpol_flags: MpolFlags,
}
impl Default for Work {
fn default() -> Self {
Self {
work_type: WorkType::CpuSpin,
sched_policy: SchedPolicy::Normal,
num_workers: None,
affinity: AffinityKind::Inherit,
mem_policy: MemPolicy::Default,
mpol_flags: MpolFlags::NONE,
}
}
}
impl Work {
    /// Request an explicit worker count.
    pub fn workers(self, n: usize) -> Self {
        Self { num_workers: Some(n), ..self }
    }
    /// Select the load pattern.
    pub fn work_type(self, wt: WorkType) -> Self {
        Self { work_type: wt, ..self }
    }
    /// Select the kernel scheduling policy.
    pub fn sched_policy(self, p: SchedPolicy) -> Self {
        Self { sched_policy: p, ..self }
    }
    /// Choose the symbolic affinity placement.
    pub fn affinity(self, a: AffinityKind) -> Self {
        Self { affinity: a, ..self }
    }
    /// Select the NUMA memory policy.
    pub fn mem_policy(self, p: MemPolicy) -> Self {
        Self { mem_policy: p, ..self }
    }
    /// Set extra MPOL_F_* flags OR-ed into the memory-policy mode.
    pub fn mpol_flags(self, f: MpolFlags) -> Self {
        Self { mpol_flags: f, ..self }
    }
}
/// One observed CPU migration of a worker.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Migration {
    /// Nanoseconds since the worker's start when the migration was observed.
    pub at_ns: u64,
    pub from_cpu: usize,
    pub to_cpu: usize,
}
/// Per-worker result record, serialized as JSON over the report pipe back to
/// the parent (see `WorkloadHandle::stop_and_collect`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct WorkerReport {
    // Worker's process id (workers are forked processes; getpid is used).
    pub tid: i32,
    pub work_units: u64,
    pub cpu_time_ns: u64,
    pub wall_time_ns: u64,
    // Wall time minus on-CPU time — TODO confirm in worker accounting code.
    pub off_cpu_ns: u64,
    pub migration_count: u64,
    // Every CPU this worker was observed running on.
    pub cpus_used: BTreeSet<usize>,
    pub migrations: Vec<Migration>,
    // Longest observed gap between loop iterations, and where/when it happened.
    pub max_gap_ms: u64,
    pub max_gap_cpu: usize,
    pub max_gap_at_ms: u64,
    // Sampled wake-to-run latencies (capped; see MAX_WAKE_SAMPLES in worker_main).
    pub resume_latencies_ns: Vec<u64>,
    pub wake_sample_total: u64,
    pub iterations: u64,
    // Deltas from /proc schedstat — presumably over the worker's lifetime.
    pub schedstat_run_delay_ns: u64,
    pub schedstat_run_count: u64,
    pub schedstat_cpu_time_ns: u64,
    // False when the report was synthesized by the parent (worker died or
    // produced no JSON); serde default keeps old reports readable.
    #[serde(default)]
    pub completed: bool,
    // NUMA node -> page count, omitted when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub numa_pages: BTreeMap<usize, u64>,
    pub vmstat_numa_pages_migrated: u64,
    // How the worker process ended, when the parent had to classify it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub exit_info: Option<WorkerExitInfo>,
    #[serde(default)]
    pub is_messenger: bool,
}
/// How a worker process terminated, as classified from its wait status.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum WorkerExitInfo {
    /// Normal exit with the given status code.
    Exited(i32),
    /// Killed by the given signal number.
    Signaled(i32),
    /// Still alive at the collection deadline (it is then SIGKILLed).
    TimedOut,
    /// `waitpid` itself failed; carries the errno description.
    WaitFailed(String),
}
/// Translate a `waitpid` result into the serializable [`WorkerExitInfo`].
///
/// Any wait status other than a clean exit or a signal death (including
/// `StillAlive`) is reported as `TimedOut`.
fn classify_wait_outcome(
    source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno>,
) -> WorkerExitInfo {
    use nix::sys::wait::WaitStatus;
    match source {
        Err(e) => WorkerExitInfo::WaitFailed(e.to_string()),
        Ok(WaitStatus::Exited(_, code)) => WorkerExitInfo::Exited(code),
        Ok(WaitStatus::Signaled(_, sig, _)) => WorkerExitInfo::Signaled(sig as i32),
        Ok(_) => WorkerExitInfo::TimedOut,
    }
}
// PID recorded via set_sched_pid (0 = unset) — presumably the scheduler
// process under test; confirm at the call sites outside this chunk.
static SCHED_PID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
// Global toggle set via set_repro_mode; consumers are outside this chunk.
static REPRO_MODE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
#[doc(hidden)]
pub(crate) fn set_sched_pid(pid: i32) {
SCHED_PID.store(pid, std::sync::atomic::Ordering::Relaxed);
}
#[doc(hidden)]
pub(crate) fn set_repro_mode(v: bool) {
REPRO_MODE.store(v, std::sync::atomic::Ordering::Relaxed);
}
/// Owner of a set of spawned worker processes plus the shared regions they
/// use. Dropping it SIGKILLs and reaps every worker.
#[must_use = "dropping a WorkloadHandle immediately kills all worker processes"]
pub struct WorkloadHandle {
    // Per worker: (pid, parent's read end of the report pipe, parent's write
    // end of the start pipe; the start fd becomes -1 once consumed by start()).
    children: Vec<(
        libc::pid_t,
        std::os::unix::io::RawFd,
        std::os::unix::io::RawFd,
    )>,
    // Whether start() has released the workers.
    started: bool,
    // One MAP_SHARED futex region per worker group; unmapped on drop.
    futex_ptrs: Vec<*mut u32>,
    futex_region_size: usize,
    // MAP_SHARED array of one AtomicU64 iteration counter per worker.
    iter_counters: *mut AtomicU64,
    iter_counter_len: usize,
}
/// RAII guard that owns everything `spawn` creates until spawning succeeds;
/// its `Drop` tears all of it down if spawn bails out part-way.
struct SpawnGuard {
    // (a->b, b->a) pipe fd pairs for pipe-based workloads.
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    futex_ptrs: Vec<*mut u32>,
    futex_region_size: usize,
    iter_counters: *mut AtomicU64,
    iter_counter_bytes: usize,
    // (pid, report read fd, start write fd) per forked child.
    children: Vec<(libc::pid_t, i32, i32)>,
}
impl SpawnGuard {
    /// Empty guard; resources are pushed in as `spawn` creates them.
    fn new(futex_region_size: usize) -> Self {
        Self {
            pipe_pairs: Vec::new(),
            futex_ptrs: Vec::new(),
            futex_region_size,
            iter_counters: std::ptr::null_mut(),
            iter_counter_bytes: 0,
            children: Vec::new(),
        }
    }
    /// Transfer ownership of children, futex regions, and iteration counters
    /// into a [`WorkloadHandle`].
    ///
    /// The transferred fields are emptied/nulled first, so when `self` drops
    /// at the end of this function its `Drop` impl only closes `pipe_pairs` —
    /// intentional: the parent no longer needs the worker data-pipe fds after
    /// all children have been forked.
    fn into_handle(mut self) -> WorkloadHandle {
        let children = std::mem::take(&mut self.children);
        let futex_ptrs = std::mem::take(&mut self.futex_ptrs);
        let iter_counters = std::mem::replace(&mut self.iter_counters, std::ptr::null_mut());
        let iter_counter_bytes = std::mem::replace(&mut self.iter_counter_bytes, 0);
        // Stored as a byte count here, but the handle tracks element count.
        let iter_counter_len = iter_counter_bytes / std::mem::size_of::<AtomicU64>();
        WorkloadHandle {
            children,
            started: false,
            futex_ptrs,
            futex_region_size: self.futex_region_size,
            iter_counters,
            iter_counter_len,
        }
    }
}
impl Drop for SpawnGuard {
    /// Failure-path teardown (and, after `into_handle`, closing of the
    /// remaining pipe fds): kill and reap children, close fds, unmap regions.
    fn drop(&mut self) {
        // Kill and reap every child first so nothing is still using the fds.
        for &(pid, _, _) in &self.children {
            let npid = nix::unistd::Pid::from_raw(pid);
            let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
            let _ = nix::sys::wait::waitpid(npid, None);
        }
        // Parent-side report/start fds for each child.
        for &(_, rfd, wfd) in &self.children {
            for fd in [rfd, wfd] {
                if fd >= 0 {
                    let _ = nix::unistd::close(fd);
                }
            }
        }
        // Data-pipe fds (all four ends of each pair are parent-owned here).
        for (ab, ba) in &self.pipe_pairs {
            for fd in [ab[0], ab[1], ba[0], ba[1]] {
                let _ = nix::unistd::close(fd);
            }
        }
        for &ptr in &self.futex_ptrs {
            unsafe {
                libc::munmap(ptr as *mut libc::c_void, self.futex_region_size);
            }
        }
        if !self.iter_counters.is_null() && self.iter_counter_bytes > 0 {
            unsafe {
                libc::munmap(
                    self.iter_counters as *mut libc::c_void,
                    self.iter_counter_bytes,
                );
            }
        }
    }
}
// SAFETY(review): the raw pointers are into MAP_SHARED regions holding futex
// words / AtomicU64 counters; the parent only touches them via atomic loads
// or munmap. Presumed sound to move/share across threads — confirm no
// non-atomic parent-side access exists.
unsafe impl Send for WorkloadHandle {}
unsafe impl Sync for WorkloadHandle {}
impl WorkloadHandle {
    /// Fork `config.num_workers` worker processes, pre-creating the shared
    /// resources (data pipes, futex regions, iteration counters) the chosen
    /// work type needs.
    ///
    /// Workers block on a per-worker "start pipe" until [`Self::start`] is
    /// called, then run `worker_main` and write a JSON [`WorkerReport`] to
    /// their "report pipe". On any error, `SpawnGuard`'s destructor kills and
    /// reaps the children forked so far.
    pub fn spawn(config: &WorkloadConfig) -> Result<Self> {
        // Only the pipe-based workloads need data pipes between worker pairs.
        let needs_pipes = matches!(
            config.work_type,
            WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
        );
        let needs_futex = config.work_type.needs_shared_mem();
        // Grouped work types require a whole number of groups.
        if let Some(group_size) = config.work_type.worker_group_size()
            && (config.num_workers == 0 || !config.num_workers.is_multiple_of(group_size))
        {
            anyhow::bail!(
                "{} requires num_workers divisible by {}, got {}",
                config.work_type.name(),
                group_size,
                config.num_workers
            );
        }
        // FanOutCompute keeps a u64 generation counter plus extra state
        // (16 bytes; see the endianness guard at the top of the file);
        // everything else uses a single u32 futex word.
        let futex_region_size = if matches!(config.work_type, WorkType::FanOutCompute { .. }) {
            16
        } else {
            std::mem::size_of::<u32>()
        };
        let mut guard = SpawnGuard::new(futex_region_size);
        if needs_pipes {
            // One bidirectional pair of pipes (a->b and b->a) per worker pair.
            for _ in 0..config.num_workers / 2 {
                let mut ab = [0i32; 2];
                if unsafe { libc::pipe(ab.as_mut_ptr()) } != 0 {
                    anyhow::bail!("pipe failed: {}", std::io::Error::last_os_error());
                }
                let mut ba = [0i32; 2];
                if unsafe { libc::pipe(ba.as_mut_ptr()) } != 0 {
                    // The second pipe failed: release the first before bailing.
                    unsafe {
                        libc::close(ab[0]);
                        libc::close(ab[1]);
                    }
                    anyhow::bail!("pipe failed: {}", std::io::Error::last_os_error());
                }
                guard.pipe_pairs.push((ab, ba));
            }
        }
        let futex_group_size = config.work_type.worker_group_size().unwrap_or(2);
        if needs_futex {
            // One shared anonymous mapping per worker group, created before
            // fork so every child inherits the same physical region.
            for _ in 0..config.num_workers / futex_group_size {
                let ptr = unsafe {
                    libc::mmap(
                        std::ptr::null_mut(),
                        futex_region_size,
                        libc::PROT_READ | libc::PROT_WRITE,
                        libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                        -1,
                        0,
                    )
                };
                if ptr == libc::MAP_FAILED {
                    let errno = std::io::Error::last_os_error();
                    let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
                    anyhow::bail!(
                        "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
                         for a futex shared-memory region failed: {errno}{hint}; \
                         this region backs the {:?} worker-group's \
                         inter-process futex word and is allocated \
                         before fork so every child inherits the same \
                         mapping. Remediation: reduce num_workers (each \
                         futex group consumes one shared page) or raise \
                         `vm.max_map_count` / the memory cgroup limit.",
                        config.work_type.name(),
                    );
                }
                // MAP_ANONYMOUS is already zero-filled; this is belt-and-braces.
                unsafe { std::ptr::write_bytes(ptr as *mut u8, 0, futex_region_size) };
                guard.futex_ptrs.push(ptr as *mut u32);
            }
        }
        // One AtomicU64 iteration counter per worker, shared so the parent
        // can snapshot progress while workers run.
        let iter_counter_len = config.num_workers;
        if iter_counter_len > 0 {
            let size = iter_counter_len * std::mem::size_of::<AtomicU64>();
            let ptr = unsafe {
                libc::mmap(
                    std::ptr::null_mut(),
                    size,
                    libc::PROT_READ | libc::PROT_WRITE,
                    libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                    -1,
                    0,
                )
            };
            if ptr == libc::MAP_FAILED {
                let errno = std::io::Error::last_os_error();
                let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
                anyhow::bail!(
                    "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
                     {work_type:?} worker-group's per-worker iter_counters \
                     region failed: {errno}{hint}; this region holds one \
                     AtomicU64 per worker ({iter_counter_len} slots) so \
                     the parent can snapshot iteration counts via \
                     `snapshot_iterations()`. Remediation: reduce \
                     num_workers (each worker consumes 8 bytes of this \
                     region, rounded up to a page) or raise \
                     `vm.max_map_count` / the memory cgroup limit.",
                    work_type = config.work_type.name(),
                );
            }
            guard.iter_counters = ptr as *mut AtomicU64;
            guard.iter_counter_bytes = size;
        }
        for i in 0..config.num_workers {
            // Resolve affinity in the parent so RNG-based modes stay
            // deterministic per worker (resolve_affinity is defined elsewhere).
            let affinity = resolve_affinity(&config.affinity)?;
            // Each member of a pair reads from its peer's pipe and writes to
            // its own: even workers get (ba read, ab write), odd the reverse.
            let worker_pipe_fds: Option<(i32, i32)> = if needs_pipes {
                let pair_idx = i / 2;
                let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
                if i % 2 == 0 {
                    Some((ba[0], ab[1]))
                } else {
                    Some((ab[0], ba[1]))
                }
            } else {
                None
            };
            // The first worker of each futex group acts as the leader/waker.
            let worker_futex: Option<(*mut u32, bool)> = if needs_futex {
                let group_idx = i / futex_group_size;
                let is_first = i % futex_group_size == 0;
                Some((guard.futex_ptrs[group_idx], is_first))
            } else {
                None
            };
            let iter_slot: *mut AtomicU64 = if !guard.iter_counters.is_null() {
                unsafe { guard.iter_counters.add(i) }
            } else {
                std::ptr::null_mut()
            };
            // Per-worker control pipes: child -> parent report, parent ->
            // child start signal.
            let mut report_fds = [0i32; 2];
            if unsafe { libc::pipe(report_fds.as_mut_ptr()) } != 0 {
                anyhow::bail!(
                    "worker {}/{}: report pipe failed: {}",
                    i + 1,
                    config.num_workers,
                    std::io::Error::last_os_error(),
                );
            }
            let mut start_fds = [0i32; 2];
            if unsafe { libc::pipe(start_fds.as_mut_ptr()) } != 0 {
                unsafe {
                    libc::close(report_fds[0]);
                    libc::close(report_fds[1]);
                }
                anyhow::bail!(
                    "worker {}/{}: start pipe failed: {}",
                    i + 1,
                    config.num_workers,
                    std::io::Error::last_os_error(),
                );
            }
            let pid = unsafe { libc::fork() };
            match pid {
                -1 => {
                    unsafe {
                        libc::close(report_fds[0]);
                        libc::close(report_fds[1]);
                        libc::close(start_fds[0]);
                        libc::close(start_fds[1]);
                    }
                    anyhow::bail!(
                        "worker {}/{}: fork failed: {}",
                        i + 1,
                        config.num_workers,
                        std::io::Error::last_os_error(),
                    );
                }
                0 => {
                    // ---- child process from here on ----
                    // Die if the parent dies.
                    unsafe {
                        libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
                    }
                    // If we were already reparented to init, the parent died
                    // before PDEATHSIG took effect — bail out (unless we are
                    // deliberately running as a guest init).
                    if std::env::var_os("KTSTR_GUEST_INIT").is_none()
                        && unsafe { libc::getppid() } == 1
                    {
                        unsafe {
                            libc::_exit(0);
                        }
                    }
                    // Own process group so the parent can killpg the worker
                    // and anything it forks (e.g. ForkExit children).
                    unsafe {
                        libc::setpgid(0, 0);
                    }
                    STOP.store(false, Ordering::Relaxed);
                    // SIGUSR1 is the parent's "stop" signal.
                    unsafe {
                        libc::signal(
                            libc::SIGUSR1,
                            sigusr1_handler as *const () as libc::sighandler_t,
                        );
                    }
                    // Keep only our ends: report write + start read.
                    unsafe {
                        libc::close(report_fds[0]);
                        libc::close(start_fds[1]);
                    }
                    if needs_pipes {
                        // Close the pipe ends belonging to our peer, and all
                        // ends belonging to other pairs.
                        let pair_idx = i / 2;
                        let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
                        if i % 2 == 0 {
                            unsafe {
                                libc::close(ab[0]);
                                libc::close(ba[1]);
                            }
                        } else {
                            unsafe {
                                libc::close(ab[1]);
                                libc::close(ba[0]);
                            }
                        }
                        for (j, (ab2, ba2)) in guard.pipe_pairs.iter().enumerate() {
                            if j != pair_idx {
                                unsafe {
                                    libc::close(ab2[0]);
                                    libc::close(ab2[1]);
                                    libc::close(ba2[0]);
                                    libc::close(ba2[1]);
                                }
                            }
                        }
                    }
                    // Silence panic output; a panic is reported via exit code.
                    let _ = std::panic::take_hook();
                    std::panic::set_hook(Box::new(|_| {}));
                    let child_result =
                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                            // The guard must NOT run its Drop in the child:
                            // it would SIGKILL sibling workers and close fds
                            // the parent still owns.
                            std::mem::forget(guard);
                            // Wait up to 30 s for the parent's start byte.
                            let mut pfd = libc::pollfd {
                                fd: start_fds[0],
                                events: libc::POLLIN,
                                revents: 0,
                            };
                            let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
                            if ret <= 0 {
                                unsafe {
                                    libc::_exit(1);
                                }
                            }
                            let mut buf = [0u8; 1];
                            let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
                            let _ = f.read_exact(&mut buf);
                            drop(f);
                            STOP.store(false, Ordering::Relaxed);
                            let report = worker_main(
                                affinity,
                                config.work_type.clone(),
                                config.sched_policy,
                                config.mem_policy.clone(),
                                config.mpol_flags,
                                worker_pipe_fds,
                                worker_futex,
                                iter_slot,
                            );
                            // Ship the report to the parent as JSON.
                            let json = serde_json::to_vec(&report).unwrap_or_default();
                            let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
                            let _ = f.write_all(&json);
                            drop(f);
                        }));
                    let code = if child_result.is_ok() { 0 } else { 1 };
                    unsafe {
                        libc::_exit(code);
                    }
                }
                child_pid => {
                    // Parent keeps report read end + start write end.
                    unsafe {
                        libc::close(report_fds[1]);
                        libc::close(start_fds[0]);
                    }
                    guard
                        .children
                        .push((child_pid, report_fds[0], start_fds[1]));
                }
            }
        }
        Ok(guard.into_handle())
    }
    /// PIDs of all spawned workers, in spawn order.
    pub fn worker_pids(&self) -> Vec<libc::pid_t> {
        self.children.iter().map(|(pid, _, _)| *pid).collect()
    }
    /// Release every worker by writing one byte to its start pipe.
    /// Idempotent; consumed start fds are replaced with -1.
    pub fn start(&mut self) {
        if self.started {
            return;
        }
        self.started = true;
        for (_, _, start_fd) in &mut self.children {
            unsafe {
                libc::write(*start_fd, b"s".as_ptr() as *const _, 1);
                libc::close(*start_fd);
            }
            *start_fd = -1;
        }
    }
    /// Re-pin worker `idx` to `cpus`.
    ///
    /// # Panics
    /// Panics if `idx` is out of range of the spawned workers.
    pub fn set_affinity(&self, idx: usize, cpus: &BTreeSet<usize>) -> Result<()> {
        let (pid, _, _) = self.children[idx];
        set_thread_affinity(pid, cpus)
    }
    /// Relaxed snapshot of each worker's shared iteration counter.
    pub fn snapshot_iterations(&self) -> Vec<u64> {
        if self.iter_counters.is_null() || self.iter_counter_len == 0 {
            return Vec::new();
        }
        (0..self.iter_counter_len)
            .map(|i| {
                unsafe { &*self.iter_counters.add(i) }.load(Ordering::Relaxed)
            })
            .collect()
    }
    /// Stop every worker and gather their reports.
    ///
    /// Sends SIGUSR1 (the stop signal), then polls each report pipe under a
    /// shared 5-second deadline; workers that produce no parsable JSON get a
    /// synthesized default report with `exit_info` describing how they ended.
    /// Remaining processes (and their process groups) are SIGKILLed.
    pub fn stop_and_collect(mut self) -> Vec<WorkerReport> {
        let was_started = self.started;
        self.start();
        if !was_started {
            // Workers were never released; give them a brief window to run
            // so they can produce at least a minimal report.
            std::thread::sleep(std::time::Duration::from_millis(500));
        }
        let mut reports = Vec::new();
        let children = std::mem::take(&mut self.children);
        for &(pid, _, _) in &children {
            let _ = nix::sys::signal::kill(
                nix::unistd::Pid::from_raw(pid),
                nix::sys::signal::Signal::SIGUSR1,
            );
        }
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        for (pid, read_fd, _) in children {
            let mut buf = Vec::new();
            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
            let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
            if ms > 0 {
                let mut pfd = libc::pollfd {
                    fd: read_fd,
                    events: libc::POLLIN,
                    revents: 0,
                };
                let ready = unsafe { libc::poll(&mut pfd, 1, ms) };
                if ready > 0 {
                    // File takes ownership of read_fd and closes it on drop.
                    let mut f = unsafe { std::fs::File::from_raw_fd(read_fd) };
                    let _ = f.read_to_end(&mut buf);
                    drop(f);
                } else {
                    let _ = nix::unistd::close(read_fd);
                }
            } else {
                let _ = nix::unistd::close(read_fd);
            }
            let npid = nix::unistd::Pid::from_raw(pid);
            // Non-blocking check of how (whether) the worker already exited.
            let waited = nix::sys::wait::waitpid(npid, Some(nix::sys::wait::WaitPidFlag::WNOHANG));
            let still_running = matches!(waited, Ok(nix::sys::wait::WaitStatus::StillAlive),);
            // Always nuke the worker's process group (it may have forked).
            let _ = nix::sys::signal::killpg(npid, nix::sys::signal::Signal::SIGKILL);
            let exit_info_source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno> =
                if still_running {
                    let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
                    let _ = nix::sys::wait::waitpid(npid, None);
                    // Report the pre-kill state: it was still alive at deadline.
                    Ok(nix::sys::wait::WaitStatus::StillAlive)
                } else {
                    waited
                };
            if let Ok(report) = serde_json::from_slice::<WorkerReport>(&buf) {
                reports.push(report);
            } else {
                let exit_info = classify_wait_outcome(exit_info_source);
                eprintln!(
                    "ktstr: worker pid={pid} returned no report ({} bytes read, exit={exit_info:?})",
                    buf.len()
                );
                reports.push(WorkerReport {
                    tid: pid,
                    exit_info: Some(exit_info),
                    ..WorkerReport::default()
                });
            }
        }
        reports
    }
}
impl Drop for WorkloadHandle {
fn drop(&mut self) {
use nix::sys::signal::{Signal, kill};
use nix::sys::wait::waitpid;
use nix::unistd::{Pid, close};
for &(pid, rfd, wfd) in &self.children {
let nix_pid = Pid::from_raw(pid);
if let Err(e) = nix::sys::signal::killpg(nix_pid, Signal::SIGKILL)
&& e != nix::errno::Errno::ESRCH
{
tracing::warn!(pid, %e, "killpg failed in WorkloadHandle::drop");
}
if let Err(e) = kill(nix_pid, Signal::SIGKILL) {
tracing::warn!(pid, %e, "kill failed in WorkloadHandle::drop");
}
if let Err(e) = waitpid(nix_pid, None) {
tracing::warn!(pid, %e, "waitpid failed in WorkloadHandle::drop");
}
for fd in [rfd, wfd] {
if fd >= 0
&& let Err(e) = close(fd)
{
tracing::warn!(fd, %e, "close failed in WorkloadHandle::drop");
}
}
}
for &ptr in &self.futex_ptrs {
unsafe {
libc::munmap(ptr as *mut libc::c_void, self.futex_region_size);
}
}
if !self.iter_counters.is_null() && self.iter_counter_len > 0 {
unsafe {
libc::munmap(
self.iter_counters as *mut libc::c_void,
self.iter_counter_len * std::mem::size_of::<u64>(),
);
}
}
}
}
use std::os::unix::io::FromRawFd;
// Per-process stop flag: cleared by each worker at startup, polled by the
// worker loops; presumably set by `sigusr1_handler` (defined outside this
// chunk) when the parent sends SIGUSR1.
static STOP: AtomicBool = AtomicBool::new(false);
/// Saturate a waiter count into the `i32` range expected by `FUTEX_WAKE`.
#[inline]
fn clamp_futex_wake_n(n: usize) -> i32 {
    // try_from only fails when n exceeds i32::MAX (usize is unsigned),
    // so saturating at i32::MAX reproduces `n.min(i32::MAX as usize) as i32`.
    i32::try_from(n).unwrap_or(i32::MAX)
}
/// Map an `mmap(MAP_SHARED|MAP_ANONYMOUS)` errno to a remediation hint that
/// is appended to the spawn error message; empty string when we have no
/// specific advice.
fn mmap_shared_anon_errno_hint(errno: Option<i32>) -> &'static str {
    let Some(code) = errno else {
        return "";
    };
    if code == libc::ENOMEM {
        " (ENOMEM: host is out of memory \
         or /proc/sys/vm/max_map_count is too low — \
         check `sysctl vm.max_map_count` and `free -h`)"
    } else if code == libc::EPERM {
        " (EPERM: MAP_SHARED|MAP_ANONYMOUS \
         rejected by the kernel — check memory cgroup \
         limits and container seccomp policy)"
    } else if code == libc::EINVAL {
        " (EINVAL: invalid length or \
         flag combination — verify num_workers > 0 so the \
         region size is non-zero, and that the total size \
         does not overflow usize)"
    } else {
        ""
    }
}
/// Wake up to `n_waiters` threads blocked in `FUTEX_WAIT` on `futex_ptr`.
/// The syscall's return value is deliberately ignored (best-effort wake).
///
/// # Safety
/// `futex_ptr` must point to a valid, aligned u32 futex word that stays
/// mapped for the duration of the call (here: one of the MAP_SHARED regions).
unsafe fn futex_wake(futex_ptr: *mut u32, n_waiters: i32) {
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex_ptr,
            libc::FUTEX_WAKE,
            n_waiters,
            std::ptr::null::<libc::timespec>(),
            std::ptr::null::<u32>(),
            0u32,
        );
    }
}
/// Block in `FUTEX_WAIT` while `*futex_ptr == expected`, for at most `ts`.
/// Returns on wake, timeout, value mismatch, or signal; the result is
/// ignored — callers re-check state (and STOP) in their loops.
///
/// # Safety
/// `futex_ptr` must point to a valid, aligned u32 futex word that stays
/// mapped for the duration of the call.
unsafe fn futex_wait(futex_ptr: *mut u32, expected: u32, ts: &libc::timespec) {
    unsafe {
        libc::syscall(
            libc::SYS_futex,
            futex_ptr,
            libc::FUTEX_WAIT,
            expected,
            ts as *const libc::timespec,
            std::ptr::null::<u32>(),
            0u32,
        );
    }
}
/// Body of a single workload worker.
///
/// Applies CPU affinity, scheduling policy and memory policy, then runs the
/// loop selected by `work_type` until the module-wide `STOP` flag is set,
/// sampling wake/resume latencies (reservoir-sampled up to
/// `MAX_WAKE_SAMPLES`) and CPU-migration events along the way. Returns a
/// `WorkerReport` summarising the run.
///
/// * `futex` — `(word_ptr, role)`: the shared futex word plus a role flag
///   whose meaning depends on the work type (ping-pong "first" vs. partner,
///   or fan-out "messenger" vs. listener).
/// * `iter_slot` — when non-null, receives the live iteration count so the
///   parent can observe progress while the worker runs.
///
/// Fix applied in this revision: three occurrences of the mojibake token
/// `¶m` (an HTML-entity corruption of `&param`) restored so the
/// `sched_setscheduler` calls compile.
#[allow(clippy::too_many_arguments)]
fn worker_main(
    affinity: Option<BTreeSet<usize>>,
    work_type: WorkType,
    sched_policy: SchedPolicy,
    mem_policy: MemPolicy,
    mpol_flags: MpolFlags,
    pipe_fds: Option<(i32, i32)>,
    futex: Option<(*mut u32, bool)>,
    iter_slot: *mut AtomicU64,
) -> WorkerReport {
    // NOTE(review): getpid() is used as the "tid"; presumably workers are
    // spawned as separate processes (single-threaded), so pid == tid —
    // confirm against the spawn path.
    let tid: libc::pid_t = unsafe { libc::getpid() };
    if let Some(ref cpus) = affinity {
        let _ = set_thread_affinity(tid, cpus);
    }
    let _ = set_sched_policy(tid, sched_policy);
    apply_mempolicy_with_flags(&mem_policy, mpol_flags);
    let start = Instant::now();
    // Per-run accumulators reported in the final WorkerReport.
    let mut work_units: u64 = 0;
    let mut migration_count: u64 = 0;
    let mut cpus_used = BTreeSet::new();
    let mut migrations = Vec::new();
    let mut last_cpu = sched_getcpu();
    cpus_used.insert(last_cpu);
    let mut last_iter_time = start;
    let mut max_gap_ns: u64 = 0;
    let mut max_gap_cpu: usize = last_cpu;
    let mut max_gap_at_ns: u64 = 0;
    // Lazily-created resources, allocated on first use by the relevant arm.
    let mut cache_pressure_buf: Option<Vec<u8>> = None;
    let mut matrix_buf: Option<Vec<u64>> = None;
    let mut io_sync_file: Option<(std::fs::File, String)> = None;
    let mut io_seq_file: Option<(std::fs::File, String)> = None;
    let mut page_fault_region: Option<(*mut libc::c_void, usize)> = None;
    let mut page_fault_rng_state: u64 = 0;
    const MAX_WAKE_SAMPLES: usize = 100_000;
    let mut resume_latencies_ns: Vec<u64> = Vec::with_capacity(MAX_WAKE_SAMPLES);
    let mut wake_sample_count: u64 = 0;
    let mut iterations: u64 = 0;
    // Custom workloads own their whole run; hand them STOP and return.
    if let WorkType::Custom { run, .. } = &work_type {
        return run(&STOP);
    }
    // AffinityChurn pre-computes the set of CPUs it is allowed to hop between.
    let affinity_churn_cpus: Vec<usize> = if matches!(work_type, WorkType::AffinityChurn { .. }) {
        let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
        let ret = unsafe {
            libc::sched_getaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &mut cpu_set)
        };
        if ret == 0 {
            (0..libc::CPU_SETSIZE as usize)
                .filter(|c| unsafe { libc::CPU_ISSET(*c, &cpu_set) })
                .collect()
        } else {
            Vec::new()
        }
    } else {
        Vec::new()
    };
    // PolicyChurn pre-computes the (policy, priority) rotation; realtime
    // policies are included only if a probe shows we may set SCHED_FIFO.
    let policy_churn_policies: Vec<(i32, i32)> =
        if matches!(work_type, WorkType::PolicyChurn { .. }) {
            let mut policies = vec![
                (libc::SCHED_OTHER, 0),
                (libc::SCHED_BATCH, 0),
                (libc::SCHED_IDLE, 0),
            ];
            let param = libc::sched_param { sched_priority: 1 };
            // mojibake fix: `¶m` -> `&param`
            let ret = unsafe { libc::sched_setscheduler(0, libc::SCHED_FIFO, &param) };
            if ret == 0 {
                // Probe succeeded — restore SCHED_OTHER before continuing.
                let normal = libc::sched_param { sched_priority: 0 };
                unsafe { libc::sched_setscheduler(0, libc::SCHED_OTHER, &normal) };
                policies.push((libc::SCHED_FIFO, 1));
                policies.push((libc::SCHED_RR, 1));
            }
            policies
        } else {
            Vec::new()
        };
    // FanOutCompute: side length of the square matrices such that three of
    // them (A, B, C) fit in roughly cache_footprint_kb.
    let matrix_size: usize = if let WorkType::FanOutCompute {
        cache_footprint_kb,
        operations,
        ..
    } = &work_type
    {
        if *operations > 0 && *cache_footprint_kb > 0 {
            ((cache_footprint_kb * 1024 / 3 / std::mem::size_of::<u64>()) as f64).sqrt() as usize
        } else {
            0
        }
    } else {
        0
    };
    let vmstat_migrated_start = read_vmstat_numa_pages_migrated();
    let schedstat_start = read_schedstat();
    // Main work loop: one dispatch per iteration until STOP is observed.
    while !STOP.load(Ordering::Relaxed) {
        match work_type {
            WorkType::CpuSpin => {
                spin_burst(&mut work_units, 1024);
                iterations += 1;
            }
            WorkType::YieldHeavy => {
                work_units = std::hint::black_box(work_units.wrapping_add(1));
                std::thread::yield_now();
                iterations += 1;
            }
            WorkType::Mixed => {
                spin_burst(&mut work_units, 1024);
                std::thread::yield_now();
                iterations += 1;
            }
            WorkType::IoSync => {
                let (f, _) = io_sync_file.get_or_insert_with(|| {
                    let path = std::env::temp_dir()
                        .join(format!("ktstr_io_{tid}"))
                        .to_string_lossy()
                        .to_string();
                    let f = std::fs::OpenOptions::new()
                        .write(true)
                        .create(true)
                        .truncate(true)
                        .open(&path)
                        .expect("failed to create IoSync temp file");
                    (f, path)
                });
                // Rewrite the same 64 KiB each cycle so the file never grows.
                let _ = f.set_len(0);
                let _ = f.seek(std::io::SeekFrom::Start(0));
                let buf = [0u8; 4096];
                for _ in 0..16 {
                    let _ = f.write_all(&buf);
                    work_units = std::hint::black_box(work_units.wrapping_add(1));
                }
                let before_sleep = Instant::now();
                std::thread::sleep(Duration::from_micros(100));
                reservoir_push(
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    before_sleep.elapsed().as_nanos() as u64,
                    MAX_WAKE_SAMPLES,
                );
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::Bursty { burst_ms, sleep_ms } => {
                let burst_end = Instant::now() + Duration::from_millis(burst_ms);
                while Instant::now() < burst_end && !STOP.load(Ordering::Relaxed) {
                    spin_burst(&mut work_units, 1024);
                }
                if !STOP.load(Ordering::Relaxed) {
                    let before_sleep = Instant::now();
                    std::thread::sleep(Duration::from_millis(sleep_ms));
                    reservoir_push(
                        &mut resume_latencies_ns,
                        &mut wake_sample_count,
                        before_sleep.elapsed().as_nanos() as u64,
                        MAX_WAKE_SAMPLES,
                    );
                }
                iterations += 1;
            }
            WorkType::PipeIo { burst_iters } => {
                let (read_fd, write_fd) = pipe_fds.unwrap_or((-1, -1));
                if read_fd < 0 || write_fd < 0 {
                    break;
                }
                spin_burst(&mut work_units, burst_iters);
                pipe_exchange(
                    read_fd,
                    write_fd,
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    MAX_WAKE_SAMPLES,
                );
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::FutexPingPong { spin_iters } => {
                let (futex_ptr, is_first) = match futex {
                    Some(f) => f,
                    None => break,
                };
                spin_burst(&mut work_units, spin_iters);
                // The two partners alternate the word between 0 and 1: store
                // the partner's value, wake them, then block until the word
                // comes back to ours.
                let my_val: u32 = if is_first { 0 } else { 1 };
                let partner_val: u32 = if is_first { 1 } else { 0 };
                let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
                atom.store(partner_val, Ordering::Relaxed);
                unsafe { futex_wake(futex_ptr, 1) };
                let before_block = Instant::now();
                let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
                loop {
                    if STOP.load(Ordering::Relaxed) {
                        break;
                    }
                    let cur = atom.load(Ordering::Relaxed);
                    if cur == my_val {
                        reservoir_push(
                            &mut resume_latencies_ns,
                            &mut wake_sample_count,
                            before_block.elapsed().as_nanos() as u64,
                            MAX_WAKE_SAMPLES,
                        );
                        break;
                    }
                    unsafe { futex_wait(futex_ptr, partner_val, &FUTEX_WAIT_TIMEOUT) };
                }
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::CachePressure { size_kb, stride } => {
                let buf = cache_pressure_buf.get_or_insert_with(|| vec![0u8; size_kb * 1024]);
                if buf.is_empty() || stride == 0 {
                    break;
                }
                cache_rmw_loop(buf, stride, 1024, &mut work_units);
                iterations += 1;
            }
            WorkType::CacheYield { size_kb, stride } => {
                let buf = cache_pressure_buf.get_or_insert_with(|| vec![0u8; size_kb * 1024]);
                if buf.is_empty() || stride == 0 {
                    break;
                }
                cache_rmw_loop(buf, stride, 1024, &mut work_units);
                let before_yield = Instant::now();
                std::thread::yield_now();
                reservoir_push(
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    before_yield.elapsed().as_nanos() as u64,
                    MAX_WAKE_SAMPLES,
                );
                iterations += 1;
            }
            WorkType::CachePipe {
                size_kb,
                burst_iters,
            } => {
                let (read_fd, write_fd) = pipe_fds.unwrap_or((-1, -1));
                if read_fd < 0 || write_fd < 0 {
                    break;
                }
                let buf = cache_pressure_buf.get_or_insert_with(|| vec![0u8; size_kb * 1024]);
                if !buf.is_empty() {
                    cache_rmw_loop(buf, 64, burst_iters, &mut work_units);
                }
                pipe_exchange(
                    read_fd,
                    write_fd,
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    MAX_WAKE_SAMPLES,
                );
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::FutexFanOut {
                fan_out,
                spin_iters,
            } => {
                let (futex_ptr, is_messenger) = match futex {
                    Some(f) => f,
                    None => break,
                };
                spin_burst(&mut work_units, spin_iters);
                let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
                if is_messenger {
                    // Messenger: bump the word and wake up to fan_out waiters.
                    let next = atom.load(Ordering::Relaxed).wrapping_add(1);
                    let wake_n = clamp_futex_wake_n(fan_out);
                    atom.store(next, Ordering::Relaxed);
                    unsafe { futex_wake(futex_ptr, wake_n) };
                    spin_burst(&mut work_units, FAN_OUT_POST_WAKE_SPIN_ITERS);
                } else {
                    // Listener: block until the word moves past the value we
                    // last observed, recording how long the wake took.
                    let expected = atom.load(Ordering::Relaxed);
                    let before_block = Instant::now();
                    loop {
                        if STOP.load(Ordering::Relaxed) {
                            break;
                        }
                        let cur = atom.load(Ordering::Relaxed);
                        if cur != expected {
                            reservoir_push(
                                &mut resume_latencies_ns,
                                &mut wake_sample_count,
                                before_block.elapsed().as_nanos() as u64,
                                MAX_WAKE_SAMPLES,
                            );
                            break;
                        }
                        unsafe { futex_wait(futex_ptr, expected, &FUTEX_WAIT_TIMEOUT) };
                    }
                }
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::Sequence {
                ref first,
                ref rest,
            } => {
                // Run each phase in order; one full pass == one iteration.
                for phase in std::iter::once(first).chain(rest.iter()) {
                    if STOP.load(Ordering::Relaxed) {
                        break;
                    }
                    match phase {
                        Phase::Spin(dur) => {
                            let end = Instant::now() + *dur;
                            while Instant::now() < end && !STOP.load(Ordering::Relaxed) {
                                spin_burst(&mut work_units, 1024);
                            }
                        }
                        Phase::Sleep(dur) => {
                            let before_sleep = Instant::now();
                            std::thread::sleep(*dur);
                            reservoir_push(
                                &mut resume_latencies_ns,
                                &mut wake_sample_count,
                                before_sleep.elapsed().as_nanos() as u64,
                                MAX_WAKE_SAMPLES,
                            );
                            last_iter_time = Instant::now();
                        }
                        Phase::Yield(dur) => {
                            let end = Instant::now() + *dur;
                            while Instant::now() < end && !STOP.load(Ordering::Relaxed) {
                                work_units = std::hint::black_box(work_units.wrapping_add(1));
                                let before_yield = Instant::now();
                                std::thread::yield_now();
                                reservoir_push(
                                    &mut resume_latencies_ns,
                                    &mut wake_sample_count,
                                    before_yield.elapsed().as_nanos() as u64,
                                    MAX_WAKE_SAMPLES,
                                );
                            }
                            last_iter_time = Instant::now();
                        }
                        Phase::Io(dur) => {
                            let end = Instant::now() + *dur;
                            let (f, _) = io_seq_file.get_or_insert_with(|| {
                                let path = std::env::temp_dir()
                                    .join(format!("ktstr_seq_{tid}"))
                                    .to_string_lossy()
                                    .to_string();
                                let f = std::fs::OpenOptions::new()
                                    .write(true)
                                    .create(true)
                                    .truncate(true)
                                    .open(&path)
                                    .expect("failed to create Phase::Io temp file");
                                (f, path)
                            });
                            while Instant::now() < end && !STOP.load(Ordering::Relaxed) {
                                let _ = f.set_len(0);
                                let _ = f.seek(std::io::SeekFrom::Start(0));
                                let buf = [0u8; 4096];
                                for _ in 0..16 {
                                    let _ = f.write_all(&buf);
                                    work_units = std::hint::black_box(work_units.wrapping_add(1));
                                }
                                let before_sleep = Instant::now();
                                std::thread::sleep(Duration::from_micros(100));
                                reservoir_push(
                                    &mut resume_latencies_ns,
                                    &mut wake_sample_count,
                                    before_sleep.elapsed().as_nanos() as u64,
                                    MAX_WAKE_SAMPLES,
                                );
                            }
                            last_iter_time = Instant::now();
                        }
                    }
                }
                iterations += 1;
            }
            WorkType::ForkExit => {
                // Fork a child that exits immediately; the wait latency is the
                // sample of interest.
                let pid = unsafe { libc::fork() };
                match pid {
                    -1 => {
                        work_units = std::hint::black_box(work_units.wrapping_add(1));
                        iterations += 1;
                    }
                    0 => {
                        unsafe { libc::_exit(0) };
                    }
                    child => {
                        let mut status = 0i32;
                        let before_wait = Instant::now();
                        unsafe { libc::waitpid(child, &mut status, 0) };
                        reservoir_push(
                            &mut resume_latencies_ns,
                            &mut wake_sample_count,
                            before_wait.elapsed().as_nanos() as u64,
                            MAX_WAKE_SAMPLES,
                        );
                        work_units = std::hint::black_box(work_units.wrapping_add(1));
                        iterations += 1;
                    }
                }
            }
            WorkType::NiceSweep => {
                // Probe once (process-wide cache) whether we may set negative
                // nice values; sweep either [-20, 19] or [0, 19] accordingly.
                let effective_min: i32 = {
                    static PROBED_MIN: std::sync::atomic::AtomicI32 =
                        std::sync::atomic::AtomicI32::new(i32::MIN);
                    let cached = PROBED_MIN.load(Ordering::Relaxed);
                    if cached != i32::MIN {
                        cached
                    } else {
                        let ret = unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, -20) };
                        let min = if ret == -1 {
                            0i32
                        } else {
                            unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, 0) };
                            -20i32
                        };
                        PROBED_MIN.store(min, Ordering::Relaxed);
                        min
                    }
                };
                let range = (19 - effective_min + 1) as u64;
                let nice_val = effective_min + (iterations % range) as i32;
                spin_burst(&mut work_units, 512);
                unsafe {
                    libc::setpriority(libc::PRIO_PROCESS, 0, nice_val);
                }
                let before_yield = Instant::now();
                std::thread::yield_now();
                reservoir_push(
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    before_yield.elapsed().as_nanos() as u64,
                    MAX_WAKE_SAMPLES,
                );
                iterations += 1;
            }
            WorkType::AffinityChurn { spin_iters } => {
                spin_burst(&mut work_units, spin_iters);
                if !affinity_churn_cpus.is_empty() {
                    use rand::RngExt;
                    // Pin to a random allowed CPU each cycle to force
                    // migrations.
                    let idx = rand::rng().random_range(0..affinity_churn_cpus.len());
                    let target = affinity_churn_cpus[idx];
                    let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
                    unsafe {
                        libc::CPU_ZERO(&mut cpu_set);
                        libc::CPU_SET(target, &mut cpu_set);
                        libc::sched_setaffinity(
                            0,
                            std::mem::size_of::<libc::cpu_set_t>(),
                            &cpu_set,
                        );
                    }
                }
                let before_yield = Instant::now();
                std::thread::yield_now();
                reservoir_push(
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    before_yield.elapsed().as_nanos() as u64,
                    MAX_WAKE_SAMPLES,
                );
                iterations += 1;
            }
            WorkType::PolicyChurn { spin_iters } => {
                spin_burst(&mut work_units, spin_iters);
                // policy_churn_policies is never empty for this work type,
                // but .max(1) guards the modulo regardless.
                let idx = (iterations as usize) % policy_churn_policies.len().max(1);
                let (pol, prio) = policy_churn_policies[idx];
                let param = libc::sched_param {
                    sched_priority: prio,
                };
                unsafe {
                    // mojibake fix: `¶m` -> `&param`
                    libc::sched_setscheduler(0, pol, &param);
                }
                let before_yield = Instant::now();
                std::thread::yield_now();
                reservoir_push(
                    &mut resume_latencies_ns,
                    &mut wake_sample_count,
                    before_yield.elapsed().as_nanos() as u64,
                    MAX_WAKE_SAMPLES,
                );
                iterations += 1;
            }
            WorkType::FanOutCompute {
                fan_out,
                operations,
                sleep_usec,
                ..
            } => {
                let (futex_ptr, is_messenger) = match futex {
                    Some(f) => f,
                    None => break,
                };
                // Shared-region layout (see the little-endian compile_error at
                // the top of the file): u64 generation counter at offset 0
                // (futex waits on its low 32 bits), u64 wake timestamp at
                // offset 8.
                let wake_ts_ptr = unsafe { (futex_ptr as *mut u8).add(8) as *mut u64 };
                let gen_atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU64) };
                let wake_atom = unsafe { &*(wake_ts_ptr as *const std::sync::atomic::AtomicU64) };
                if is_messenger {
                    // Publish the wake timestamp, then bump the generation
                    // (Release pairs with the listeners' Acquire load).
                    if let Some(wake_ns) = clock_gettime_ns(libc::CLOCK_MONOTONIC) {
                        wake_atom.store(wake_ns, Ordering::Relaxed);
                        gen_atom.fetch_add(1, Ordering::Release);
                        unsafe { futex_wake(futex_ptr, clamp_futex_wake_n(fan_out)) };
                    }
                    spin_burst(&mut work_units, FAN_OUT_POST_WAKE_SPIN_ITERS);
                } else {
                    let expected = gen_atom.load(Ordering::Relaxed);
                    let expected_low = expected as u32;
                    loop {
                        if STOP.load(Ordering::Relaxed) {
                            break;
                        }
                        let cur = gen_atom.load(Ordering::Acquire);
                        if cur != expected {
                            if let Some(now_ns) = clock_gettime_ns(libc::CLOCK_MONOTONIC) {
                                let wake_ns = wake_atom.load(Ordering::Relaxed);
                                let latency = now_ns.saturating_sub(wake_ns);
                                reservoir_push(
                                    &mut resume_latencies_ns,
                                    &mut wake_sample_count,
                                    latency,
                                    MAX_WAKE_SAMPLES,
                                );
                            }
                            break;
                        }
                        unsafe { futex_wait(futex_ptr, expected_low, &FUTEX_WAIT_TIMEOUT) };
                    }
                    if sleep_usec > 0 && !STOP.load(Ordering::Relaxed) {
                        std::thread::sleep(Duration::from_micros(sleep_usec));
                    }
                    if matrix_size > 0 && !STOP.load(Ordering::Relaxed) {
                        let buf = matrix_buf
                            .get_or_insert_with(|| vec![0u64; 3 * matrix_size * matrix_size]);
                        for _ in 0..operations {
                            matrix_multiply(buf, matrix_size, &mut work_units);
                            work_units = std::hint::black_box(work_units.wrapping_add(1));
                        }
                    }
                }
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::PageFaultChurn {
                region_kb,
                touches_per_cycle,
                spin_iters,
            } => {
                // Lazily map the fault region; MADV_NOHUGEPAGE keeps faults at
                // 4 KiB granularity.
                let (ptr, region_size) = match page_fault_region {
                    Some(p) => p,
                    None => {
                        let region_size = match region_kb.checked_mul(1024) {
                            Some(v) => v,
                            None => {
                                tracing::warn!(
                                    tid,
                                    region_kb,
                                    "PageFaultChurn region_kb * 1024 overflowed usize — worker exiting outer loop without doing page-fault work"
                                );
                                break;
                            }
                        };
                        let ptr = unsafe {
                            libc::mmap(
                                std::ptr::null_mut(),
                                region_size,
                                libc::PROT_READ | libc::PROT_WRITE,
                                libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
                                -1,
                                0,
                            )
                        };
                        if ptr == libc::MAP_FAILED {
                            break;
                        }
                        unsafe {
                            libc::madvise(ptr, region_size, libc::MADV_NOHUGEPAGE);
                        }
                        // `| 1` guarantees a non-zero xorshift seed.
                        page_fault_rng_state = (tid as u64) | 1;
                        page_fault_region = Some((ptr, region_size));
                        (ptr, region_size)
                    }
                };
                let page_count = (region_size / 4096).max(1);
                let xorshift64 = |state: &mut u64| -> u64 {
                    let mut x = *state;
                    x ^= x << 13;
                    x ^= x >> 7;
                    x ^= x << 17;
                    *state = x;
                    x
                };
                // Touch random pages, then MADV_DONTNEED so next cycle faults
                // them in again.
                for _ in 0..touches_per_cycle {
                    let page_idx = (xorshift64(&mut page_fault_rng_state) as usize) % page_count;
                    let page_ptr = unsafe { (ptr as *mut u8).add(page_idx * 4096) };
                    unsafe { std::ptr::write_volatile(page_ptr, 1u8) };
                    work_units = std::hint::black_box(work_units.wrapping_add(1));
                }
                unsafe {
                    libc::madvise(ptr, region_size, libc::MADV_DONTNEED);
                }
                spin_burst(&mut work_units, spin_iters);
                iterations += 1;
            }
            WorkType::MutexContention {
                hold_iters,
                work_iters,
                ..
            } => {
                let (futex_ptr, _) = match futex {
                    Some(f) => f,
                    None => break,
                };
                spin_burst(&mut work_units, work_iters);
                // The futex word is a 0/1 mutex: CAS 0->1 to lock, store 0 +
                // wake(1) to unlock. Blocked time per wait is sampled.
                let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
                loop {
                    if STOP.load(Ordering::Relaxed) {
                        break;
                    }
                    if atom
                        .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
                        .is_ok()
                    {
                        break;
                    }
                    let before_block = Instant::now();
                    unsafe {
                        futex_wait(
                            futex_ptr,
                            1u32,
                            &FUTEX_WAIT_TIMEOUT,
                        )
                    };
                    reservoir_push(
                        &mut resume_latencies_ns,
                        &mut wake_sample_count,
                        before_block.elapsed().as_nanos() as u64,
                        MAX_WAKE_SAMPLES,
                    );
                }
                spin_burst(&mut work_units, hold_iters);
                atom.store(0, Ordering::Release);
                unsafe { futex_wake(futex_ptr, 1) };
                last_iter_time = Instant::now();
                iterations += 1;
            }
            WorkType::Custom { .. } => unreachable!("handled by early return"),
        }
        if !iter_slot.is_null() {
            unsafe { &*iter_slot }.store(iterations, Ordering::Relaxed);
        }
        // Roughly every 1024 work units: record the longest scheduling gap,
        // notify the orchestrator (SIGUSR2) on a >2s stall unless running in
        // repro mode, and track CPU migrations.
        if work_units.is_multiple_of(1024) {
            let now = Instant::now();
            let gap = now.duration_since(last_iter_time).as_nanos() as u64;
            if gap > max_gap_ns {
                max_gap_ns = gap;
                max_gap_cpu = last_cpu;
                max_gap_at_ns = now.duration_since(start).as_nanos() as u64;
            }
            if gap > 2_000_000_000 && !REPRO_MODE.load(std::sync::atomic::Ordering::Relaxed) {
                let pid = SCHED_PID.load(std::sync::atomic::Ordering::Relaxed);
                if pid > 0 {
                    let _ = nix::sys::signal::kill(
                        nix::unistd::Pid::from_raw(pid),
                        nix::sys::signal::Signal::SIGUSR2,
                    );
                }
            }
            last_iter_time = now;
            let cpu = sched_getcpu();
            if cpu != last_cpu {
                migration_count += 1;
                cpus_used.insert(cpu);
                migrations.push(Migration {
                    at_ns: now.duration_since(start).as_nanos() as u64,
                    from_cpu: last_cpu,
                    to_cpu: cpu,
                });
                last_cpu = cpu;
            }
        }
    }
    // Cleanup: undo nice/policy changes, remove temp files, unmap the fault
    // region, and publish the final iteration count.
    if matches!(work_type, WorkType::NiceSweep) {
        unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, 0) };
    }
    if matches!(work_type, WorkType::PolicyChurn { .. }) {
        let param = libc::sched_param { sched_priority: 0 };
        // mojibake fix: `¶m` -> `&param`
        unsafe { libc::sched_setscheduler(0, libc::SCHED_OTHER, &param) };
    }
    if let Some((_, path)) = io_sync_file {
        let _ = std::fs::remove_file(&path);
    }
    if let Some((_, path)) = io_seq_file {
        let _ = std::fs::remove_file(&path);
    }
    if let Some((ptr, size)) = page_fault_region {
        unsafe { libc::munmap(ptr, size) };
    }
    if !iter_slot.is_null() {
        unsafe { &*iter_slot }.store(iterations, Ordering::Relaxed);
    }
    let wall_time = start.elapsed();
    let cpu_time_ns = thread_cpu_time_ns();
    let wall_time_ns = wall_time.as_nanos() as u64;
    let schedstat_end = read_schedstat();
    // Deltas fall back to zero when either schedstat read failed.
    let (ss_delay_delta, ss_ts_delta, ss_cpu_delta) = match (schedstat_start, schedstat_end) {
        (Some((cpu_s, delay_s, ts_s)), Some((cpu_e, delay_e, ts_e))) => (
            delay_e.saturating_sub(delay_s),
            ts_e.saturating_sub(ts_s),
            cpu_e.saturating_sub(cpu_s),
        ),
        _ => (0, 0, 0),
    };
    let numa_pages = read_numa_maps_pages();
    let vmstat_migrated_end = read_vmstat_numa_pages_migrated();
    let vmstat_migrated_delta = vmstat_migrated_end.saturating_sub(vmstat_migrated_start);
    WorkerReport {
        tid,
        work_units,
        cpu_time_ns,
        wall_time_ns,
        off_cpu_ns: wall_time_ns.saturating_sub(cpu_time_ns),
        migration_count,
        cpus_used,
        migrations,
        max_gap_ms: max_gap_ns / 1_000_000,
        max_gap_cpu,
        max_gap_at_ms: max_gap_at_ns / 1_000_000,
        resume_latencies_ns,
        wake_sample_total: wake_sample_count,
        iterations,
        schedstat_run_delay_ns: ss_delay_delta,
        schedstat_run_count: ss_ts_delta,
        schedstat_cpu_time_ns: ss_cpu_delta,
        completed: true,
        numa_pages,
        vmstat_numa_pages_migrated: vmstat_migrated_delta,
        exit_info: None,
        is_messenger: matches!(
            work_type,
            WorkType::FutexFanOut { .. } | WorkType::FanOutCompute { .. }
        ) && futex.map(|(_, b)| b).unwrap_or(false),
    }
}
/// Burn CPU for `count` iterations, bumping `work_units` once per iteration.
///
/// `black_box` + `spin_loop` keep the loop from being optimized away while
/// staying polite to SMT siblings. `inline(never)` keeps the burst visible as
/// its own frame in profiles.
#[inline(never)]
fn spin_burst(work_units: &mut u64, count: u64) {
    let mut remaining = count;
    while remaining > 0 {
        *work_units = std::hint::black_box(work_units.wrapping_add(1));
        std::hint::spin_loop();
        remaining -= 1;
    }
}
/// Walk `buf` with the given `stride` for `iters` steps, doing a volatile
/// read-modify-write (+1) at each position to generate real cache traffic the
/// optimizer cannot elide. The walk wraps modulo `buf.len()`.
///
/// Panics on an empty `buf` (division by zero in the wrap); callers guard
/// against that.
#[inline(never)]
fn cache_rmw_loop(buf: &mut [u8], stride: usize, iters: u64, work_units: &mut u64) {
    let len = buf.len();
    let mut pos = 0usize;
    for _ in 0..iters {
        unsafe {
            let v = std::ptr::read_volatile(&buf[pos]);
            std::ptr::write_volatile(&mut buf[pos], v.wrapping_add(1));
        }
        pos = (pos + stride) % len;
        *work_units = std::hint::black_box(work_units.wrapping_add(1));
    }
}
/// Naive C = A × B over wrapping u64 arithmetic.
///
/// `data` holds three size×size matrices back to back: A at offset 0, B at
/// `size*size`, C at `2*size*size`. `black_box` on the loads and a volatile
/// store of each C element keep the whole computation from being folded away;
/// C[0] is finally folded into `work_units` as an extra sink.
#[inline(never)]
fn matrix_multiply(data: &mut [u64], size: usize, work_units: &mut u64) {
    debug_assert_eq!(data.len(), 3 * size * size);
    let stride = size * size;
    for row in 0..size {
        for col in 0..size {
            let mut dot: u64 = 0;
            for k in 0..size {
                let a = std::hint::black_box(data[row * size + k]);
                let b = std::hint::black_box(data[stride + k * size + col]);
                dot = dot.wrapping_add(a.wrapping_mul(b));
            }
            unsafe {
                std::ptr::write_volatile(
                    &mut data[2 * stride + row * size + col] as *mut u64,
                    std::hint::black_box(dot),
                );
            }
        }
    }
    *work_units = work_units.wrapping_add(std::hint::black_box(data[2 * stride]));
}
/// One ping-pong exchange over a pipe pair: send a single token to the
/// partner, then block until the partner's token arrives, recording the
/// blocked duration via reservoir sampling.
///
/// `poll` uses a 100 ms timeout so the `STOP` flag is observed promptly; a
/// poll error also ends the exchange.
fn pipe_exchange(
    read_fd: i32,
    write_fd: i32,
    resume_latencies_ns: &mut Vec<u64>,
    wake_sample_count: &mut u64,
    max_wake_samples: usize,
) {
    unsafe { libc::write(write_fd, b"x".as_ptr() as *const _, 1) };
    let blocked_at = Instant::now();
    let mut pollfd = libc::pollfd {
        fd: read_fd,
        events: libc::POLLIN,
        revents: 0,
    };
    while !STOP.load(Ordering::Relaxed) {
        let rc = unsafe { libc::poll(&mut pollfd, 1, 100) };
        if rc < 0 {
            break;
        }
        if rc == 0 {
            // Timeout: re-check STOP and poll again.
            continue;
        }
        let mut token = [0u8; 1];
        unsafe { libc::read(read_fd, token.as_mut_ptr() as *mut _, 1) };
        reservoir_push(
            resume_latencies_ns,
            wake_sample_count,
            blocked_at.elapsed().as_nanos() as u64,
            max_wake_samples,
        );
        break;
    }
}
/// SIGUSR1 handler: request shutdown of every worker loop.
///
/// Performs only a relaxed atomic store, which is async-signal-safe.
extern "C" fn sigusr1_handler(_: libc::c_int) {
    STOP.store(true, Ordering::Relaxed);
}
/// Resolve an `AffinityMode` to a concrete CPU set, or `None` when affinity
/// should be left untouched.
///
/// For `Random`, a zero `count` is rejected as a caller bug, an empty pool
/// resolves to `None` (logged at debug level), and `count` is clamped to the
/// pool size before sampling without replacement.
fn resolve_affinity(mode: &AffinityMode) -> Result<Option<BTreeSet<usize>>> {
    match mode {
        AffinityMode::None => Ok(None),
        AffinityMode::Fixed(cpus) => Ok(Some(cpus.clone())),
        AffinityMode::SingleCpu(cpu) => Ok(Some(BTreeSet::from([*cpu]))),
        AffinityMode::Random { from, count } => {
            use rand::seq::IndexedRandom;
            anyhow::ensure!(
                *count > 0,
                "AffinityMode::Random.count must be > 0; a zero count \
                 previously silently coerced to 1, masking caller bugs"
            );
            if from.is_empty() {
                tracing::debug!(
                    count = count,
                    "resolve_affinity: empty Random pool, leaving affinity unset"
                );
                return Ok(None);
            }
            let pool: Vec<usize> = from.iter().copied().collect();
            let take = (*count).min(pool.len());
            let chosen: BTreeSet<usize> = pool.sample(&mut rand::rng(), take).copied().collect();
            Ok(Some(chosen))
        }
    }
}
/// Current CPU number, defaulting to 0 if the syscall fails.
fn sched_getcpu() -> usize {
    match nix::sched::sched_getcpu() {
        Ok(cpu) => cpu,
        Err(_) => 0,
    }
}
/// Reservoir sampling (Algorithm R): keep a uniform sample of at most `cap`
/// values in `buf` while `count` tracks the total number of samples offered.
///
/// Until the reservoir is full every sample is kept; afterwards the new
/// sample replaces a random slot with probability `cap / count`.
fn reservoir_push(buf: &mut Vec<u64>, count: &mut u64, sample: u64, cap: usize) {
    *count += 1;
    if buf.len() >= cap {
        use rand::RngExt;
        let slot = rand::rng().random_range(0..*count) as usize;
        if slot < cap {
            buf[slot] = sample;
        }
    } else {
        buf.push(sample);
    }
}
/// Read and parse `/proc/self/schedstat` into
/// `(cpu_time_ns, run_delay_ns, timeslices)`.
///
/// On read failure (e.g. CONFIG_SCHEDSTATS disabled) a once-per-process
/// warning is emitted and `None` is returned.
fn read_schedstat() -> Option<(u64, u64, u64)> {
    let Ok(data) = std::fs::read_to_string("/proc/self/schedstat") else {
        warn_schedstat_unavailable_once();
        return None;
    };
    parse_schedstat_line(&data)
}
/// Parse the first three whitespace-separated u64 fields of a schedstat line:
/// `(cpu_time, run_delay, timeslices)`. Extra trailing fields are ignored;
/// missing or non-numeric leading fields yield `None`.
fn parse_schedstat_line(data: &str) -> Option<(u64, u64, u64)> {
    let mut fields = data.split_whitespace().map(|tok| tok.parse::<u64>().ok());
    match (fields.next(), fields.next(), fields.next()) {
        (Some(Some(cpu_time)), Some(Some(run_delay)), Some(Some(timeslices))) => {
            Some((cpu_time, run_delay, timeslices))
        }
        _ => None,
    }
}
/// Emit the "schedstat unavailable" warning at most once per process,
/// no matter how many workers fail the read.
fn warn_schedstat_unavailable_once() {
    static WARNED: std::sync::Once = std::sync::Once::new();
    WARNED.call_once(|| {
        eprintln!(
            "workload: /proc/self/schedstat unavailable (CONFIG_SCHEDSTATS off?); \
             schedstat_* fields in WorkerReport will be zero"
        );
    });
}
/// Sum the per-NUMA-node page counts across all mappings in
/// `/proc/self/numa_maps`. Returns an empty map if the file is unreadable.
fn read_numa_maps_pages() -> BTreeMap<usize, u64> {
    let Ok(content) = std::fs::read_to_string("/proc/self/numa_maps") else {
        return BTreeMap::new();
    };
    let mut totals: BTreeMap<usize, u64> = BTreeMap::new();
    for entry in &crate::assert::parse_numa_maps(&content) {
        for (&node, &pages) in &entry.node_pages {
            *totals.entry(node).or_default() += pages;
        }
    }
    totals
}
/// Current `numa_pages_migrated` counter from `/proc/vmstat`; 0 when the file
/// is unreadable or the field is absent.
fn read_vmstat_numa_pages_migrated() -> u64 {
    std::fs::read_to_string("/proc/vmstat")
        .ok()
        .and_then(|content| crate::assert::parse_vmstat_numa_pages_migrated(&content))
        .unwrap_or(0)
}
/// Read clock `clk` via `clock_gettime(2)` and flatten the timespec to
/// nanoseconds.
///
/// Returns `None` (after a once-per-clock warning) if the syscall fails.
/// Only `CLOCK_THREAD_CPUTIME_ID` and `CLOCK_MONOTONIC` are expected here —
/// any other clock panics in the warning helper.
fn clock_gettime_ns(clk: libc::clockid_t) -> Option<u64> {
    let mut ts = libc::timespec {
        tv_sec: 0,
        tv_nsec: 0,
    };
    let rc = unsafe { libc::clock_gettime(clk, &mut ts) };
    if rc != 0 {
        warn_clock_gettime_failed_once(clk);
        return None;
    }
    Some((ts.tv_sec as u64) * 1_000_000_000 + (ts.tv_nsec as u64))
}
/// Emit the clock_gettime-failure warning at most once per clock id.
///
/// Only the two clocks this module actually uses are supported; any other
/// `clk` hits `unreachable!` — callers must not forward arbitrary ids.
fn warn_clock_gettime_failed_once(clk: libc::clockid_t) {
    static WARNED_THREAD: std::sync::Once = std::sync::Once::new();
    static WARNED_MONO: std::sync::Once = std::sync::Once::new();
    let once = match clk {
        libc::CLOCK_THREAD_CPUTIME_ID => &WARNED_THREAD,
        libc::CLOCK_MONOTONIC => &WARNED_MONO,
        _ => unreachable!("unexpected clockid {clk}"),
    };
    once.call_once(|| {
        // Capture errno immediately; later libc calls would clobber it.
        let errno = std::io::Error::last_os_error();
        eprintln!(
            "workload: clock_gettime(clk={clk}) failed: {errno}; affected samples will be zero or skipped"
        );
    });
}
/// Per-thread CPU time in nanoseconds; 0 if the clock read fails.
fn thread_cpu_time_ns() -> u64 {
    match clock_gettime_ns(libc::CLOCK_THREAD_CPUTIME_ID) {
        Some(ns) => ns,
        None => 0,
    }
}
/// Apply `policy` to `pid` via `sched_setscheduler(2)`.
///
/// `SchedPolicy::Normal` is a no-op (the task already runs the default
/// policy). Realtime priorities are clamped to the valid 1..=99 range.
///
/// # Errors
/// Fails on a non-positive `pid` or when the syscall itself fails (typically
/// EPERM for realtime policies without CAP_SYS_NICE).
///
/// Fix applied in this revision: the syscall argument was the mojibake token
/// `¶m` (an HTML-entity corruption of `&param`), which did not compile;
/// restored to `&param`.
fn set_sched_policy(pid: libc::pid_t, policy: SchedPolicy) -> Result<()> {
    if pid <= 0 {
        anyhow::bail!("sched_setscheduler: invalid pid {pid} (must be > 0)");
    }
    let (pol, prio) = match policy {
        SchedPolicy::Normal => return Ok(()),
        SchedPolicy::Batch => (libc::SCHED_BATCH, 0),
        SchedPolicy::Idle => (libc::SCHED_IDLE, 0),
        SchedPolicy::Fifo(p) => (libc::SCHED_FIFO, p.clamp(1, 99) as i32),
        SchedPolicy::RoundRobin(p) => (libc::SCHED_RR, p.clamp(1, 99) as i32),
    };
    let param = libc::sched_param {
        sched_priority: prio,
    };
    if unsafe { libc::sched_setscheduler(pid, pol, &param) } != 0 {
        anyhow::bail!("sched_setscheduler: {}", std::io::Error::last_os_error());
    }
    Ok(())
}
/// Pin thread/process `pid` to the CPUs in `cpus` via `sched_setaffinity(2)`.
///
/// # Errors
/// Fails on a non-positive `pid`, on a CPU number outside the kernel cpuset
/// range, or when the syscall itself fails.
pub fn set_thread_affinity(pid: libc::pid_t, cpus: &BTreeSet<usize>) -> Result<()> {
    use nix::sched::{CpuSet, sched_setaffinity};
    use nix::unistd::Pid;
    anyhow::ensure!(pid > 0, "sched_setaffinity: invalid pid {pid} (must be > 0)");
    let mut mask = CpuSet::new();
    for &cpu in cpus.iter() {
        mask.set(cpu)
            .with_context(|| format!("CPU {cpu} out of range"))?;
    }
    sched_setaffinity(Pid::from_raw(pid), &mask)
        .with_context(|| format!("sched_setaffinity pid={pid}"))
}
#[cfg(test)]
mod tests {
use super::*;
    // Test helper: spawn `num_workers` of `work_type`, let them run for
    // `sleep_ms`, then stop and return the collected WorkerReports.
    fn spawn_and_collect_after(
        work_type: WorkType,
        num_workers: usize,
        sleep_ms: u64,
    ) -> Vec<WorkerReport> {
        let config = WorkloadConfig {
            num_workers,
            affinity: AffinityMode::None,
            work_type,
            sched_policy: SchedPolicy::Normal,
            ..Default::default()
        };
        let mut h = WorkloadHandle::spawn(&config).unwrap();
        h.start();
        std::thread::sleep(std::time::Duration::from_millis(sleep_ms));
        h.stop_and_collect()
    }
#[test]
fn mmap_shared_anon_errno_hint_variants() {
let enomem = mmap_shared_anon_errno_hint(Some(libc::ENOMEM));
assert!(
enomem.starts_with(' '),
"non-empty hint must begin with a space so \"{{errno}}{{hint}}\" has its separator; got {enomem:?}",
);
assert!(
enomem.contains("ENOMEM"),
"ENOMEM arm must name the errno in the hint; got {enomem:?}",
);
assert!(
enomem.contains("vm.max_map_count"),
"ENOMEM arm must mention the remediation sysctl; got {enomem:?}",
);
let eperm = mmap_shared_anon_errno_hint(Some(libc::EPERM));
assert!(eperm.starts_with(' '), "EPERM hint must start with a space");
assert!(
eperm.contains("EPERM"),
"EPERM arm must name the errno; got {eperm:?}",
);
assert!(
eperm.contains("cgroup"),
"EPERM arm must mention memory cgroup as a remediation path; got {eperm:?}",
);
let einval = mmap_shared_anon_errno_hint(Some(libc::EINVAL));
assert!(
einval.starts_with(' '),
"EINVAL hint must start with a space"
);
assert!(
einval.contains("EINVAL"),
"EINVAL arm must name the errno; got {einval:?}",
);
assert!(
einval.contains("num_workers > 0"),
"EINVAL arm must give the concrete `num_workers > 0` remediation \
(the older 'zero or misaligned' wording was too vague); got {einval:?}",
);
assert_eq!(
mmap_shared_anon_errno_hint(Some(libc::EACCES)),
"",
"unrecognised errno must fold to empty-string hint",
);
assert_eq!(
mmap_shared_anon_errno_hint(None),
"",
"None errno (io::Error without raw_os_error) must fold to empty-string",
);
}
#[test]
fn clock_gettime_ns_monotonic_non_decreasing() {
const N: usize = 1000;
let samples: Vec<u64> = (0..N)
.map(|i| {
clock_gettime_ns(libc::CLOCK_MONOTONIC).unwrap_or_else(|| {
panic!(
"CLOCK_MONOTONIC must be readable on any Linux host; \
sample {i}/{N} returned None"
)
})
})
.collect();
for i in 1..N {
assert!(
samples[i] >= samples[i - 1],
"CLOCK_MONOTONIC went backwards at sample {i}: \
prev={prev} curr={curr} (delta={delta})",
prev = samples[i - 1],
curr = samples[i],
delta = samples[i - 1] - samples[i],
);
}
}
#[test]
fn classify_wait_outcome_exited_preserves_code() {
let status = nix::sys::wait::WaitStatus::Exited(nix::unistd::Pid::from_raw(123), 42);
match classify_wait_outcome(Ok(status)) {
WorkerExitInfo::Exited(code) => assert_eq!(code, 42),
other => panic!("expected Exited(42), got {other:?}"),
}
}
#[test]
fn classify_wait_outcome_signaled_preserves_signum() {
let status = nix::sys::wait::WaitStatus::Signaled(
nix::unistd::Pid::from_raw(123),
nix::sys::signal::Signal::SIGABRT,
false,
);
match classify_wait_outcome(Ok(status)) {
WorkerExitInfo::Signaled(sig) => {
assert_eq!(sig, nix::sys::signal::Signal::SIGABRT as i32);
}
other => panic!("expected Signaled(SIGABRT), got {other:?}"),
}
}
#[test]
fn classify_wait_outcome_still_alive_maps_to_timed_out() {
match classify_wait_outcome(Ok(nix::sys::wait::WaitStatus::StillAlive)) {
WorkerExitInfo::TimedOut => {}
other => panic!("expected TimedOut, got {other:?}"),
}
}
#[test]
fn classify_wait_outcome_exotic_continued_maps_to_timed_out() {
let status = nix::sys::wait::WaitStatus::Continued(nix::unistd::Pid::from_raw(123));
match classify_wait_outcome(Ok(status)) {
WorkerExitInfo::TimedOut => {}
other => panic!("expected TimedOut (exotic→TimedOut), got {other:?}"),
}
}
#[test]
fn classify_wait_outcome_errno_maps_to_wait_failed() {
match classify_wait_outcome(Err(nix::errno::Errno::ECHILD)) {
WorkerExitInfo::WaitFailed(msg) => {
assert!(
msg.to_ascii_lowercase().contains("child"),
"expected ECHILD description to mention 'child', got {msg:?}",
);
}
other => panic!("expected WaitFailed, got {other:?}"),
}
}
#[test]
fn work_type_name_roundtrip() {
for &name in WorkType::ALL_NAMES {
if name == "Sequence" || name == "Custom" {
assert!(WorkType::from_name(name).is_none());
continue;
}
let wt = WorkType::from_name(name).unwrap();
assert_eq!(wt.name(), name);
}
}
#[test]
fn work_type_from_name_unknown() {
assert!(WorkType::from_name("Nonexistent").is_none());
}
#[test]
fn suggest_then_from_name_roundtrips_for_buildable_variants() {
assert!(WorkType::from_name("cpuspin").is_none());
let canonical = WorkType::suggest("cpuspin").expect("suggest must find CpuSpin");
assert_eq!(canonical, "CpuSpin");
let wt =
WorkType::from_name(canonical).expect("from_name must build from canonical spelling");
assert!(matches!(wt, WorkType::CpuSpin));
assert!(WorkType::from_name("YIELDHEAVY").is_none());
let canonical = WorkType::suggest("YIELDHEAVY").expect("suggest must find YieldHeavy");
assert_eq!(canonical, "YieldHeavy");
let wt = WorkType::from_name(canonical).expect("from_name must build");
assert!(matches!(wt, WorkType::YieldHeavy));
assert_eq!(WorkType::suggest("sequence"), Some("Sequence"));
assert!(WorkType::from_name("Sequence").is_none());
assert_eq!(WorkType::suggest("custom"), Some("Custom"));
assert!(WorkType::from_name("Custom").is_none());
}
#[test]
fn suggest_is_case_insensitive_and_canonical() {
assert_eq!(WorkType::suggest("cpuspin"), Some("CpuSpin"));
assert_eq!(WorkType::suggest("CPUSPIN"), Some("CpuSpin"));
assert_eq!(WorkType::suggest("CpuSpin"), Some("CpuSpin"));
assert_eq!(WorkType::suggest("YIELDHEAVY"), Some("YieldHeavy"));
assert_eq!(WorkType::suggest("sequence"), Some("Sequence"));
assert_eq!(WorkType::suggest("custom"), Some("Custom"));
assert!(WorkType::suggest("nonexistent").is_none());
assert!(WorkType::suggest("").is_none());
assert!(WorkType::suggest("cpu").is_none());
}
#[test]
fn suggest_rejects_whitespace_padded_inputs() {
assert!(WorkType::suggest(" CpuSpin").is_none());
assert!(WorkType::suggest("CpuSpin ").is_none());
assert!(WorkType::suggest(" CpuSpin ").is_none());
assert!(WorkType::suggest("CpuSpin\n").is_none());
assert!(WorkType::suggest("\tCpuSpin").is_none());
assert!(WorkType::suggest("CpuSpin\t").is_none());
assert!(WorkType::suggest("Cpu Spin").is_none());
assert!(WorkType::suggest(" ").is_none());
assert!(WorkType::suggest("\n").is_none());
assert_eq!(WorkType::suggest("CpuSpin"), Some("CpuSpin"));
}
#[test]
fn work_type_all_names_count() {
assert_eq!(WorkType::ALL_NAMES.len(), 20);
}
#[test]
fn matrix_multiply_1x1_produces_product() {
let mut data = vec![0u64; 3];
data[0] = 3; data[1] = 5; let mut work_units = 0u64;
matrix_multiply(&mut data, 1, &mut work_units);
assert_eq!(data[2], 15, "C = A * B for 1x1 matrix");
assert_eq!(work_units, 15, "post-loop sink folds C[0] into work_units");
}
#[test]
fn matrix_multiply_2x2_against_reference() {
let size = 2;
let stride = size * size;
let mut data = vec![0u64; 3 * stride];
data[0] = 1;
data[1] = 2;
data[2] = 3;
data[3] = 4;
data[stride] = 5;
data[stride + 1] = 6;
data[stride + 2] = 7;
data[stride + 3] = 8;
let mut work_units = 0u64;
matrix_multiply(&mut data, size, &mut work_units);
assert_eq!(data[2 * stride], 19);
assert_eq!(data[2 * stride + 1], 22);
assert_eq!(data[2 * stride + 2], 43);
assert_eq!(data[2 * stride + 3], 50);
}
#[test]
fn matrix_multiply_3x3_diagonal() {
    let size = 3;
    let stride = size * size;
    let mut data = vec![0u64; 3 * stride];
    // A = diag(2, 3, 5); B = identity, so the product must equal A.
    for (idx, val) in [(0, 2), (4, 3), (8, 5)] {
        data[idx] = val;
    }
    for diag in [0, 4, 8] {
        data[stride + diag] = 1;
    }
    let mut work_units = 0u64;
    matrix_multiply(&mut data, size, &mut work_units);
    let c = &data[2 * stride..3 * stride];
    for (idx, &got) in c.iter().enumerate() {
        let want = match idx {
            0 => 2,
            4 => 3,
            8 => 5,
            _ => 0, // off-diagonal entries stay zero
        };
        assert_eq!(got, want, "C[{idx}]");
    }
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "assertion")]
fn matrix_multiply_mismatched_len_panics_in_debug() {
    // size = 2 needs 3 * 4 = 12 elements; 5 trips the debug assertion.
    let mut data = vec![0u64; 5];
    let mut work_units = 0u64;
    matrix_multiply(&mut data, 2, &mut work_units);
}
#[test]
fn resolve_affinity_none() {
    // AffinityMode::None means "no pinning requested".
    assert!(resolve_affinity(&AffinityMode::None).unwrap().is_none());
}
#[test]
fn resolve_affinity_fixed() {
    // A Fixed CPU set passes through resolution unchanged.
    let cpus = BTreeSet::from([0usize, 1, 2]);
    let resolved = resolve_affinity(&AffinityMode::Fixed(cpus.clone())).unwrap();
    assert_eq!(resolved, Some(cpus));
}
#[test]
fn resolve_affinity_single_cpu() {
    // SingleCpu resolves to a one-element set.
    let resolved = resolve_affinity(&AffinityMode::SingleCpu(5)).unwrap();
    assert_eq!(resolved, Some(BTreeSet::from([5usize])));
}
#[test]
fn resolve_affinity_random() {
    // Random picks exactly `count` CPUs, each drawn from the pool.
    let pool: BTreeSet<usize> = (0..8).collect();
    let resolved =
        resolve_affinity(&AffinityMode::Random { from: pool, count: 3 }).unwrap();
    let picked = resolved.expect("non-empty pool must yield a set");
    assert_eq!(picked.len(), 3);
    for cpu in &picked {
        assert!(*cpu < 8, "cpu {cpu} outside pool");
    }
}
#[test]
fn resolve_affinity_random_clamps_count() {
    // Asking for more CPUs than the pool holds clamps to the pool size.
    let pool = BTreeSet::from([0usize, 1]);
    let resolved =
        resolve_affinity(&AffinityMode::Random { from: pool, count: 10 }).unwrap();
    assert_eq!(resolved.unwrap().len(), 2);
}
#[test]
fn workload_config_default() {
    // Defaults: one CpuSpin worker, normal scheduling, no pinning.
    let cfg = WorkloadConfig::default();
    assert_eq!(cfg.num_workers, 1);
    assert!(matches!(cfg.affinity, AffinityMode::None));
    assert!(matches!(cfg.work_type, WorkType::CpuSpin));
    assert!(matches!(cfg.sched_policy, SchedPolicy::Normal));
}
#[test]
fn workload_config_builder_setters_chain() {
    // Builder-style setters must chain and each must store its value.
    let cfg = WorkloadConfig::default()
        .workers(7)
        .work_type(WorkType::CpuSpin)
        .sched_policy(SchedPolicy::Batch);
    assert!(matches!(cfg.sched_policy, SchedPolicy::Batch));
    assert!(matches!(cfg.work_type, WorkType::CpuSpin));
    assert_eq!(cfg.num_workers, 7);
}
#[test]
fn worker_report_serde_roundtrip() {
    // A fully-populated report must survive a JSON encode/decode intact.
    let original = WorkerReport {
        tid: 42,
        work_units: 1000,
        cpu_time_ns: 5_000_000_000,
        wall_time_ns: 10_000_000_000,
        off_cpu_ns: 5_000_000_000,
        migration_count: 3,
        cpus_used: BTreeSet::from([0, 1, 2]),
        migrations: vec![Migration {
            at_ns: 100,
            from_cpu: 0,
            to_cpu: 1,
        }],
        max_gap_ms: 50,
        max_gap_cpu: 1,
        max_gap_at_ms: 500,
        resume_latencies_ns: vec![1000, 2000],
        wake_sample_total: 2,
        iterations: 10,
        schedstat_run_delay_ns: 500_000,
        schedstat_run_count: 20,
        schedstat_cpu_time_ns: 4_000_000_000,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: true,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: WorkerReport = serde_json::from_str(&encoded).unwrap();
    // Spot-check one field of each kind (int, set, bool).
    assert_eq!(decoded.tid, original.tid);
    assert_eq!(decoded.work_units, original.work_units);
    assert_eq!(decoded.migration_count, original.migration_count);
    assert_eq!(decoded.cpus_used, original.cpus_used);
    assert_eq!(decoded.max_gap_ms, original.max_gap_ms);
    assert_eq!(decoded.wake_sample_total, original.wake_sample_total);
    assert_eq!(decoded.completed, original.completed);
    assert_eq!(decoded.is_messenger, original.is_messenger);
}
#[test]
fn migration_serde() {
    // A single Migration survives a JSON round trip.
    let original = Migration {
        at_ns: 12345,
        from_cpu: 0,
        to_cpu: 3,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: Migration = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.at_ns, original.at_ns);
    assert_eq!(decoded.from_cpu, original.from_cpu);
    assert_eq!(decoded.to_cpu, original.to_cpu);
}
#[test]
fn spawn_start_collect_integration() {
    // Full lifecycle: spawn two CpuSpin workers, let them run briefly,
    // then confirm every report shows real work and CPU usage.
    let config = WorkloadConfig {
        num_workers: 2,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    assert_eq!(handle.worker_pids().len(), 2);
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 2);
    for report in &reports {
        assert!(report.work_units > 0, "worker {} did no work", report.tid);
        assert!(report.wall_time_ns > 0);
        assert!(!report.cpus_used.is_empty());
    }
}
#[test]
fn spawn_auto_start_on_collect() {
    // stop_and_collect must work even if start() was never called.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&config).unwrap();
    assert_eq!(handle.stop_and_collect().len(), 1);
}
#[test]
fn spawn_yield_heavy_produces_work() {
    // A single YieldHeavy worker must make progress within 200ms.
    let reports = spawn_and_collect_after(WorkType::YieldHeavy, 1, 200);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_mixed_produces_work() {
    // A single Mixed worker must make progress within 200ms.
    let reports = spawn_and_collect_after(WorkType::Mixed, 1, 200);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_pids_fit_in_pid_t() {
    // Every reported pid must be positive and addressable via kill(pid, 0).
    let config = WorkloadConfig {
        num_workers: 4,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&config).unwrap();
    for pid in handle.worker_pids() {
        assert!(pid > 0, "child pid must be positive, got {pid}");
        // Signal 0 probes existence without delivering anything.
        nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None)
            .unwrap_or_else(|e| panic!("spawned child pid {pid} not addressable: {e}"));
    }
}
#[test]
fn handle_drop_reaps_children_and_closes_pipes() {
    // Dropping the handle must terminate both PipeIo workers; poll each
    // pid until the kernel no longer recognizes it, bounded by a deadline.
    let config = WorkloadConfig {
        num_workers: 2,
        affinity: AffinityMode::None,
        work_type: WorkType::PipeIo { burst_iters: 4 },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&config).unwrap();
    let pids = handle.worker_pids();
    assert_eq!(pids.len(), 2, "both workers spawned");
    drop(handle);
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
    for pid in pids {
        while nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None).is_ok() {
            assert!(
                std::time::Instant::now() < deadline,
                "child {pid} still alive after drop deadline"
            );
            std::thread::sleep(std::time::Duration::from_millis(20));
        }
    }
}
#[test]
fn spawn_multiple_workers_distinct_pids() {
    // Four workers must get four distinct pids and all report back.
    let config = WorkloadConfig {
        num_workers: 4,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    let pids = handle.worker_pids();
    assert_eq!(pids.len(), 4);
    let distinct: std::collections::HashSet<libc::pid_t> = pids.iter().copied().collect();
    assert_eq!(distinct.len(), 4, "all worker PIDs should be distinct");
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    assert_eq!(handle.stop_and_collect().len(), 4);
}
#[test]
fn spawn_with_fixed_affinity() {
    // A worker pinned to CPU 0 must report exactly {0} in cpus_used.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::Fixed([0].into_iter().collect()),
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 1);
    let used = &reports[0].cpus_used;
    assert!(used.contains(&0));
    assert_eq!(used.len(), 1, "should only use pinned CPU");
}
#[test]
fn drop_kills_children() {
    // Drop reaps synchronously: by the time it returns, no pid may
    // still be addressable.
    let config = WorkloadConfig {
        num_workers: 2,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&config).unwrap();
    let pids = handle.worker_pids();
    drop(handle);
    for pid in pids {
        let still_there =
            nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None).is_ok();
        assert!(!still_there, "child {} should be dead after drop", pid);
    }
}
/// Counts entries in /proc/self/fd (0 if unreadable). The directory handle
/// opened by read_dir is itself counted, but that cancels out when callers
/// compare a before/after pair.
fn count_open_fds() -> usize {
    match std::fs::read_dir("/proc/self/fd") {
        Ok(entries) => entries.count(),
        Err(_) => 0,
    }
}
/// Non-blocking probe for an unreaped child: waitpid(-1, WNOHANG) returns a
/// positive pid only when some child has exited and is waiting to be
/// collected (and reaps it as a side effect).
fn any_zombie_child() -> bool {
    let mut status = 0i32;
    let reaped = unsafe { libc::waitpid(-1, &mut status, libc::WNOHANG) };
    reaped > 0
}
/// Clamps RLIMIT_NPROC to zero so the next fork() fails with EAGAIN.
/// Returns false if the limit could not be set (environment problem).
fn set_rlimit_nproc_zero_headroom() -> bool {
    let limit = libc::rlimit {
        rlim_cur: 0,
        rlim_max: 0,
    };
    unsafe { libc::setrlimit(libc::RLIMIT_NPROC, &limit) == 0 }
}
/// Runs `body` in a fork()ed child and returns its exit status: the child's
/// return value on normal exit, 99 if the body panicked, or 100 + signal
/// number if the child was killed by a signal. Isolates tests that mutate
/// process-wide state (rlimits, fd tables) from the rest of the test binary.
fn run_in_forked_child<F: FnOnce() -> i32>(body: F) -> i32 {
    let pid = unsafe { libc::fork() };
    assert!(pid >= 0, "fork failed: {}", std::io::Error::last_os_error());
    if pid == 0 {
        // Child: silence the panic hook so an expected panic does not spam
        // stderr, then map a panic to exit code 99.
        let _ = std::panic::take_hook();
        std::panic::set_hook(Box::new(|_| {}));
        let code = std::panic::catch_unwind(std::panic::AssertUnwindSafe(body)).unwrap_or(99);
        // _exit skips atexit/destructors so the child cannot re-run any of
        // the parent's teardown.
        unsafe { libc::_exit(code) };
    }
    // Parent: block until the child is reaped, then decode its status.
    let mut status: libc::c_int = 0;
    let waited = unsafe { libc::waitpid(pid, &mut status, 0) };
    assert_eq!(
        waited,
        pid,
        "waitpid({pid}) failed: {}",
        std::io::Error::last_os_error()
    );
    if libc::WIFEXITED(status) {
        libc::WEXITSTATUS(status)
    } else {
        // Offset signal deaths by 100 so they are distinguishable from
        // ordinary exit codes.
        100 + libc::WTERMSIG(status)
    }
}
#[test]
fn spawn_guard_cleans_up_on_interworker_pipe_emfile() {
    // Forces the inter-worker pipe(2) call to fail by lowering
    // RLIMIT_NOFILE, then checks that spawn's error path leaks neither
    // fds nor zombie children. Runs inside a forked child so the rlimit
    // change cannot poison other tests.
    //
    // Exit-code table: 0 = ok, 10 = spawn unexpectedly succeeded,
    // 11 = fd leak, 12 = zombie child left behind, 13 = get/setrlimit
    // failed (environment problem), 14 = wrong error message,
    // 15 = rlimit restore failed.
    let code = run_in_forked_child(|| {
        let baseline = count_open_fds();
        let mut original_rlimit = libc::rlimit {
            rlim_cur: 0,
            rlim_max: 0,
        };
        if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut original_rlimit) } != 0 {
            return 13;
        }
        // Leave only a few spare descriptors: not enough for all the
        // pipes four PipeIo workers need.
        let target_cur = (baseline + 5) as u64;
        let lowered = libc::rlimit {
            rlim_cur: target_cur,
            rlim_max: original_rlimit.rlim_max,
        };
        if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &lowered) } != 0 {
            return 13;
        }
        let config = WorkloadConfig {
            num_workers: 4,
            affinity: AffinityMode::None,
            work_type: WorkType::PipeIo { burst_iters: 1 },
            sched_policy: SchedPolicy::Normal,
            ..Default::default()
        };
        let result = WorkloadHandle::spawn(&config);
        if result.is_ok() {
            return 10; }
        // {:#} renders the full anyhow context chain.
        let err_msg = format!("{:#}", result.as_ref().err().unwrap());
        // Restore the limit before counting fds so the count itself
        // cannot fail on EMFILE.
        if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &original_rlimit) } != 0 {
            return 15;
        }
        if !err_msg.contains("pipe failed") {
            return 14;
        }
        let after = count_open_fds();
        if after > baseline {
            return 11; }
        if any_zombie_child() {
            return 12;
        }
        0
    });
    assert_eq!(
        code, 0,
        "child reported cleanup defect (code {code}): see exit-code table above \
        spawn_guard_cleans_up_on_interworker_pipe_emfile"
    );
}
#[test]
fn spawn_guard_cleans_up_on_fork_eagain() {
    // Forces fork(2) to fail with EAGAIN by zeroing RLIMIT_NPROC, then
    // checks that spawn's error path leaks neither fds nor zombies.
    // Runs in a forked child so the rlimit change is isolated.
    //
    // Exit-code table: 0 = ok, 10 = spawn unexpectedly succeeded,
    // 11 = fd leak, 12 = zombie child, 13 = setrlimit failed,
    // 14 = wrong error message.
    let code = run_in_forked_child(|| {
        let baseline = count_open_fds();
        if !set_rlimit_nproc_zero_headroom() {
            return 13;
        }
        let config = WorkloadConfig {
            num_workers: 1,
            affinity: AffinityMode::None,
            work_type: WorkType::CpuSpin,
            sched_policy: SchedPolicy::Normal,
            ..Default::default()
        };
        let result = WorkloadHandle::spawn(&config);
        if result.is_ok() {
            return 10; }
        // {:#} renders the full anyhow context chain.
        let msg = format!("{:#}", result.err().unwrap());
        if !msg.contains("fork failed") {
            return 14;
        }
        let after = count_open_fds();
        if after > baseline {
            return 11;
        }
        if any_zombie_child() {
            return 12;
        }
        0
    });
    assert_eq!(
        code, 0,
        "child reported cleanup defect (code {code}): see exit-code table above \
        spawn_guard_cleans_up_on_fork_eagain"
    );
}
#[test]
fn spawn_io_sync_produces_work() {
    // One IoSync worker must make progress within 200ms.
    let reports = spawn_and_collect_after(WorkType::IoSync, 1, 200);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_bursty_produces_work() {
    // 300ms covers at least one full 50ms burst + 50ms sleep cycle.
    let work = WorkType::Bursty {
        burst_ms: 50,
        sleep_ms: 50,
    };
    let reports = spawn_and_collect_after(work, 1, 300);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_pipeio_produces_work() {
    // PipeIo runs in pairs; both ends must record progress.
    let reports = spawn_and_collect_after(WorkType::PipeIo { burst_iters: 1024 }, 2, 300);
    assert_eq!(reports.len(), 2);
    for report in &reports {
        assert!(report.work_units > 0, "PipeIo worker {} did no work", report.tid);
    }
}
#[test]
fn spawn_pipeio_odd_workers_fails() {
    // PipeIo pairs workers, so an odd count must be rejected up front.
    let config = WorkloadConfig {
        num_workers: 3,
        affinity: AffinityMode::None,
        work_type: WorkType::PipeIo { burst_iters: 1024 },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let result = WorkloadHandle::spawn(&config);
    assert!(result.is_err(), "PipeIo with odd workers should fail");
    let msg = format!("{:#}", result.err().unwrap());
    assert!(
        msg.contains("divisible by 2"),
        "expected divisibility error: {msg}"
    );
}
#[test]
fn sched_getcpu_valid() {
    // Compare against the number of *configured* CPUs rather than
    // available_parallelism(): the latter honors the process affinity
    // mask, so running the tests under a restricted mask (e.g. taskset
    // to a high-numbered CPU) would make a perfectly valid index from
    // sched_getcpu() look out of range and fail spuriously.
    let cpu = super::sched_getcpu();
    let configured = unsafe { libc::sysconf(libc::_SC_NPROCESSORS_CONF) };
    let max = if configured > 0 { configured as usize } else { 1 };
    assert!(cpu < max, "cpu {cpu} >= configured cpus {max}");
}
#[test]
fn thread_cpu_time_positive() {
    // Burn a little CPU so the per-thread clock is guaranteed non-zero;
    // black_box stops the loop from being optimized away.
    let mut acc = 0u64;
    for i in 0..100_000u64 {
        acc = acc.wrapping_add(i);
    }
    std::hint::black_box(acc);
    assert!(super::thread_cpu_time_ns() > 0);
}
#[test]
fn set_thread_affinity_cpu_zero() {
    // CPU 0 is assumed present and allowed for the test process.
    let pid: libc::pid_t = unsafe { libc::getpid() };
    let cpu_zero: BTreeSet<usize> = BTreeSet::from([0]);
    assert!(
        set_thread_affinity(pid, &cpu_zero).is_ok(),
        "pinning to CPU 0 should succeed"
    );
}
#[test]
fn spawn_zero_workers() {
    // Zero workers is a valid degenerate config: empty pids, empty reports.
    let config = WorkloadConfig {
        num_workers: 0,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&config).unwrap();
    assert!(handle.worker_pids().is_empty());
    assert!(handle.stop_and_collect().is_empty());
}
#[test]
fn workload_handle_drop_tolerates_externally_killed_child() {
    // Drop must not panic when one child was already SIGKILLed from
    // outside the handle's control.
    let config = WorkloadConfig {
        num_workers: 2,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    let pids = handle.worker_pids();
    assert_eq!(pids.len(), 2);
    handle.start();
    unsafe { libc::kill(pids[0], libc::SIGKILL) };
    std::thread::sleep(std::time::Duration::from_millis(50));
    drop(handle); // must not panic
}
#[test]
fn worker_pids_count_matches_num_workers() {
    // The handle must report exactly one pid per requested worker.
    for n in [1usize, 3, 5] {
        let config = WorkloadConfig {
            num_workers: n,
            ..Default::default()
        };
        let handle = WorkloadHandle::spawn(&config).unwrap();
        let pid_count = handle.worker_pids().len();
        assert_eq!(
            pid_count, n,
            "worker_pids().len() should match num_workers={n}"
        );
        drop(handle);
    }
}
#[test]
fn worker_report_serde_edge_cases() {
    // An all-zero / empty report round-trips through JSON.
    let zeroed = WorkerReport {
        tid: 0,
        work_units: 0,
        cpu_time_ns: 0,
        wall_time_ns: 0,
        off_cpu_ns: 0,
        migration_count: 0,
        cpus_used: BTreeSet::new(),
        migrations: vec![],
        max_gap_ms: 0,
        max_gap_cpu: 0,
        max_gap_at_ms: 0,
        resume_latencies_ns: vec![],
        wake_sample_total: 0,
        iterations: 0,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: false,
    };
    let decoded: WorkerReport =
        serde_json::from_str(&serde_json::to_string(&zeroed).unwrap()).unwrap();
    assert_eq!(decoded.tid, 0);
    assert!(decoded.cpus_used.is_empty());
    assert!(decoded.migrations.is_empty());
    // Saturated counters round-trip without overflow or precision loss
    // (guards against a float-based JSON path mangling u64::MAX).
    let maxed = WorkerReport {
        tid: i32::MAX,
        work_units: u64::MAX,
        cpu_time_ns: u64::MAX,
        wall_time_ns: u64::MAX,
        off_cpu_ns: u64::MAX,
        migration_count: u64::MAX,
        cpus_used: [0, usize::MAX].into_iter().collect(),
        migrations: vec![],
        max_gap_ms: u64::MAX,
        max_gap_cpu: usize::MAX,
        max_gap_at_ms: u64::MAX,
        resume_latencies_ns: vec![],
        wake_sample_total: u64::MAX,
        iterations: u64::MAX,
        schedstat_run_delay_ns: u64::MAX,
        schedstat_run_count: u64::MAX,
        schedstat_cpu_time_ns: u64::MAX,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: false,
    };
    let decoded: WorkerReport =
        serde_json::from_str(&serde_json::to_string(&maxed).unwrap()).unwrap();
    assert_eq!(decoded.work_units, u64::MAX);
    assert_eq!(decoded.tid, i32::MAX);
}
#[test]
fn io_sync_cleans_up_temp_file() {
    // IoSync writes to $TMPDIR/ktstr_io_<tid>; the worker must delete it
    // on shutdown.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::IoSync,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(200));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 1);
    let scratch = std::env::temp_dir().join(format!("ktstr_io_{}", reports[0].tid));
    assert!(
        !scratch.exists(),
        "temp file {} should be cleaned up",
        scratch.display()
    );
}
#[test]
fn set_sched_pid_stores_value() {
    // The setter must publish the value into the SCHED_PID atomic;
    // reset to 0 afterwards so other tests see the default.
    set_sched_pid(12345);
    assert_eq!(SCHED_PID.load(std::sync::atomic::Ordering::Relaxed), 12345);
    set_sched_pid(0);
}
#[test]
fn set_repro_mode_stores_value() {
    // Toggle both ways and observe the REPRO_MODE atomic each time.
    set_repro_mode(true);
    assert!(REPRO_MODE.load(std::sync::atomic::Ordering::Relaxed));
    set_repro_mode(false);
    assert!(!REPRO_MODE.load(std::sync::atomic::Ordering::Relaxed));
}
#[test]
fn set_sched_policy_normal_succeeds() {
    // SCHED_OTHER needs no privilege, so this must always succeed.
    let pid: libc::pid_t = unsafe { libc::getpid() };
    assert!(set_sched_policy(pid, SchedPolicy::Normal).is_ok());
}
#[test]
fn set_affinity_via_handle() {
    // Re-pinning a running worker through the handle must succeed and
    // not break report collection.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    let cpu_zero: BTreeSet<usize> = BTreeSet::from([0]);
    assert!(handle.set_affinity(0, &cpu_zero).is_ok());
    std::thread::sleep(std::time::Duration::from_millis(100));
    assert_eq!(handle.stop_and_collect().len(), 1);
}
#[test]
fn work_type_bursty_defaults() {
    // from_name must supply the documented default burst/sleep periods.
    let WorkType::Bursty { burst_ms, sleep_ms } = WorkType::from_name("Bursty").unwrap()
    else {
        panic!("expected Bursty variant");
    };
    assert_eq!(burst_ms, 50);
    assert_eq!(sleep_ms, 100);
}
#[test]
fn work_type_pipeio_defaults() {
    // from_name must supply the documented default burst size.
    let WorkType::PipeIo { burst_iters } = WorkType::from_name("PipeIo").unwrap() else {
        panic!("expected PipeIo variant");
    };
    assert_eq!(burst_iters, 1024);
}
#[test]
fn start_idempotent() {
    // Calling start() twice must behave like calling it once.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(100));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_pipeio_four_workers() {
    // Four PipeIo workers form two independent pairs; all must progress.
    let config = WorkloadConfig {
        num_workers: 4,
        affinity: AffinityMode::None,
        work_type: WorkType::PipeIo { burst_iters: 512 },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    assert_eq!(handle.worker_pids().len(), 4);
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(300));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 4);
    for report in &reports {
        assert!(
            report.work_units > 0,
            "PipeIo 4-worker worker {} did no work",
            report.tid
        );
    }
}
#[test]
#[ignore] // requires CAP_SYS_NICE; run explicitly with --ignored
fn set_sched_policy_fifo_returns_result() {
    let pid: libc::pid_t = unsafe { libc::getpid() };
    let outcome = set_sched_policy(pid, SchedPolicy::Fifo(1));
    assert!(
        outcome.is_ok(),
        "SCHED_FIFO should succeed with CAP_SYS_NICE"
    );
    restore_normal(pid);
}
#[test]
#[ignore] // requires CAP_SYS_NICE; run explicitly with --ignored
fn set_sched_policy_rr_returns_result() {
    let pid: libc::pid_t = unsafe { libc::getpid() };
    let outcome = set_sched_policy(pid, SchedPolicy::RoundRobin(1));
    assert!(outcome.is_ok(), "SCHED_RR should succeed with CAP_SYS_NICE");
    restore_normal(pid);
}
#[test]
fn resolve_affinity_random_single_cpu_pool() {
    // With a one-CPU pool the only possible pick is that CPU.
    let pool = BTreeSet::from([7usize]);
    let resolved = resolve_affinity(&AffinityMode::Random { from: pool, count: 1 }).unwrap();
    assert_eq!(resolved.unwrap(), BTreeSet::from([7usize]));
}
fn restore_normal(pid: libc::pid_t) {
let param = libc::sched_param { sched_priority: 0 };
unsafe { libc::sched_setscheduler(pid, libc::SCHED_OTHER, ¶m) };
}
#[test]
fn set_sched_policy_batch_returns_valid_result() {
    // SCHED_BATCH may be refused by the environment; either outcome is
    // acceptable, but each must be well-formed.
    let pid: libc::pid_t = unsafe { libc::getpid() };
    match set_sched_policy(pid, SchedPolicy::Batch) {
        Ok(()) => {
            // On success the kernel must now report a valid policy.
            let pol = unsafe { libc::sched_getscheduler(pid) };
            assert!(
                pol >= 0,
                "sched_getscheduler must return a valid policy, got {pol}",
            );
            restore_normal(pid);
        }
        Err(ref e) => {
            // On failure the error must identify the failing syscall.
            let msg = format!("{e:#}");
            assert!(
                msg.contains("sched_setscheduler"),
                "error must name the syscall: {msg}"
            );
        }
    }
}
#[test]
fn set_sched_policy_idle_returns_valid_result() {
    // SCHED_IDLE may be refused by the environment; either outcome is
    // acceptable, but each must be well-formed.
    let pid: libc::pid_t = unsafe { libc::getpid() };
    match set_sched_policy(pid, SchedPolicy::Idle) {
        Ok(()) => {
            // On success the kernel must now report a valid policy.
            let pol = unsafe { libc::sched_getscheduler(pid) };
            assert!(
                pol >= 0,
                "sched_getscheduler must return a valid policy, got {pol}",
            );
            restore_normal(pid);
        }
        Err(ref e) => {
            // On failure the error must identify the failing syscall.
            let msg = format!("{e:#}");
            assert!(
                msg.contains("sched_setscheduler"),
                "error must name the syscall: {msg}"
            );
        }
    }
}
#[test]
fn sched_policy_debug_shows_variant_and_priority() {
    // Debug output must carry both the variant name and the priority.
    let fifo = format!("{:?}", SchedPolicy::Fifo(50));
    assert!(fifo.contains("Fifo"), "must show variant name");
    assert!(fifo.contains("50"), "must show priority value");
    let rr = format!("{:?}", SchedPolicy::RoundRobin(99));
    assert!(rr.contains("RoundRobin"), "must show variant name");
    assert!(rr.contains("99"), "must show priority value");
    // Different priorities must be distinguishable in logs.
    let lo = format!("{:?}", SchedPolicy::Fifo(1));
    let hi = format!("{:?}", SchedPolicy::Fifo(10));
    assert_ne!(
        lo, hi,
        "different priorities must produce different debug output"
    );
}
#[test]
fn work_type_debug_shows_field_values() {
    // Debug output must surface the struct-variant field values.
    let first = format!(
        "{:?}",
        WorkType::Bursty {
            burst_ms: 10,
            sleep_ms: 20
        }
    );
    assert!(first.contains("10"), "must show burst_ms value");
    assert!(first.contains("20"), "must show sleep_ms value");
    let second = format!(
        "{:?}",
        WorkType::Bursty {
            burst_ms: 99,
            sleep_ms: 1
        }
    );
    assert!(second.contains("99"), "must show changed burst_ms");
    assert!(second.contains("1"), "must show changed sleep_ms");
    assert_ne!(
        first, second,
        "different field values must produce different debug output"
    );
}
#[test]
fn affinity_mode_debug_shows_cpus() {
    // Debug output of a Fixed set must list every member CPU.
    let first = format!("{:?}", AffinityMode::Fixed([0, 1, 7].into_iter().collect()));
    for cpu in ["0", "1", "7"] {
        assert!(first.contains(cpu), "must show CPU {cpu}");
    }
    let second = format!("{:?}", AffinityMode::Fixed([3, 4].into_iter().collect()));
    assert!(second.contains("3"), "must show CPU 3");
    assert_ne!(
        first, second,
        "different CPU sets must produce different debug output"
    );
}
#[test]
fn affinity_mode_clone_preserves_cpus() {
    // Clone must copy both the pool and the count of a Random mode.
    let pool: BTreeSet<usize> = BTreeSet::from([2, 5, 7]);
    let original = AffinityMode::Random {
        from: pool.clone(),
        count: 2,
    };
    let AffinityMode::Random { from, count } = original.clone() else {
        panic!("clone must preserve variant");
    };
    assert_eq!(from, pool, "cloned from set must match original");
    assert_eq!(count, 2, "cloned count must match original");
}
#[test]
fn workload_config_debug_shows_field_values() {
    // Debug output must surface every configured field.
    let cfg = WorkloadConfig {
        num_workers: 7,
        affinity: AffinityMode::SingleCpu(3),
        work_type: WorkType::YieldHeavy,
        sched_policy: SchedPolicy::Batch,
        ..Default::default()
    };
    let rendered = format!("{:?}", cfg);
    assert!(rendered.contains("7"), "must show num_workers value");
    assert!(rendered.contains("SingleCpu"), "must show affinity variant");
    assert!(rendered.contains("3"), "must show affinity CPU");
    assert!(rendered.contains("YieldHeavy"), "must show work_type variant");
    assert!(rendered.contains("Batch"), "must show sched_policy variant");
}
#[test]
fn migration_debug_shows_field_values() {
    // Debug output must carry all three fields.
    let first = format!(
        "{:?}",
        Migration {
            at_ns: 99999,
            from_cpu: 3,
            to_cpu: 7,
        }
    );
    assert!(first.contains("99999"), "must show at_ns value");
    assert!(first.contains("3"), "must show from_cpu value");
    assert!(first.contains("7"), "must show to_cpu value");
    let second = format!(
        "{:?}",
        Migration {
            at_ns: 1,
            from_cpu: 0,
            to_cpu: 1,
        }
    );
    assert_ne!(
        first, second,
        "different field values must produce different debug output"
    );
}
#[test]
fn worker_report_debug_shows_field_values() {
    // Fixture values are chosen so each contains() check can only be
    // satisfied by its own field. The previous max_gap_cpu value (5)
    // made `contains("5")` vacuous: "12345", cpus_used {0, 5} and
    // max_gap_at_ms 500 all contain a '5', so the assertion could not
    // fail even if max_gap_cpu were missing from the output.
    let r = WorkerReport {
        tid: 42,
        work_units: 12345,
        cpu_time_ns: 1000,
        wall_time_ns: 2000,
        off_cpu_ns: 1000,
        migration_count: 3,
        cpus_used: [0, 5].into_iter().collect(),
        migrations: vec![],
        max_gap_ms: 77,
        max_gap_cpu: 98,
        max_gap_at_ms: 500,
        resume_latencies_ns: vec![],
        wake_sample_total: 0,
        iterations: 0,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: false,
    };
    let s = format!("{:?}", r);
    assert!(s.contains("42"), "must show tid value");
    assert!(s.contains("12345"), "must show work_units value");
    assert!(s.contains("77"), "must show max_gap_ms value");
    assert!(s.contains("98"), "must show max_gap_cpu value");
}
#[test]
fn work_type_clone_preserves_variant() {
    // Clone must keep both the variant and its field values.
    let original = WorkType::PipeIo { burst_iters: 512 };
    let WorkType::PipeIo { burst_iters } = original.clone() else {
        panic!("clone must preserve variant and fields");
    };
    assert_eq!(burst_iters, 512);
}
#[test]
fn sched_policy_copy_preserves_priority() {
    // A by-value copy must keep the variant and priority.
    let original = SchedPolicy::Fifo(42);
    let copied = original;
    let SchedPolicy::Fifo(priority) = copied else {
        panic!("copy must preserve variant and priority");
    };
    assert_eq!(priority, 42);
}
#[test]
fn worker_report_off_cpu_ns_calculation() {
    // Invariant documented by the report: off_cpu = wall - cpu.
    let report = WorkerReport {
        tid: 1,
        work_units: 100,
        cpu_time_ns: 3_000_000_000,
        wall_time_ns: 5_000_000_000,
        off_cpu_ns: 2_000_000_000,
        migration_count: 0,
        cpus_used: [0].into_iter().collect(),
        migrations: vec![],
        max_gap_ms: 0,
        max_gap_cpu: 0,
        max_gap_at_ms: 0,
        resume_latencies_ns: vec![],
        wake_sample_total: 0,
        iterations: 0,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: false,
    };
    assert_eq!(report.off_cpu_ns, report.wall_time_ns - report.cpu_time_ns);
}
#[test]
fn migration_serde_multiple() {
    // A Vec of migrations round-trips with order preserved.
    let hops = [(100u64, 0usize, 1usize), (200, 1, 2), (300, 2, 0)];
    let migrations: Vec<Migration> = hops
        .iter()
        .map(|&(at_ns, from_cpu, to_cpu)| Migration {
            at_ns,
            from_cpu,
            to_cpu,
        })
        .collect();
    let encoded = serde_json::to_string(&migrations).unwrap();
    let decoded: Vec<Migration> = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.len(), 3);
    assert_eq!(decoded[0].from_cpu, 0);
    assert_eq!(decoded[2].to_cpu, 0);
}
#[test]
fn resolve_affinity_random_zero_count_rejected() {
    // count == 0 is a configuration error, and the message must say so.
    let pool: BTreeSet<usize> = (0..4).collect();
    let err = resolve_affinity(&AffinityMode::Random { from: pool, count: 0 }).unwrap_err();
    let msg = format!("{err}");
    assert!(
        msg.contains("count") && msg.contains("> 0"),
        "error must name the field: {msg}"
    );
}
#[test]
fn resolve_affinity_random_empty_pool_is_none() {
    // An empty pool degrades to "no affinity" rather than an error.
    let resolved = resolve_affinity(&AffinityMode::Random {
        from: BTreeSet::new(),
        count: 1,
    })
    .unwrap();
    assert!(resolved.is_none(), "empty Random pool must resolve to no affinity");
}
#[test]
fn reservoir_push_empty_buf() {
    // First push into an empty reservoir is always stored.
    let mut samples = Vec::new();
    let mut seen = 0u64;
    reservoir_push(&mut samples, &mut seen, 42, 10);
    assert_eq!(samples, vec![42]);
    assert_eq!(seen, 1);
}
#[test]
fn reservoir_push_under_cap() {
    // Below capacity every sample is kept, in arrival order.
    let mut samples = Vec::new();
    let mut seen = 0u64;
    for value in (0..5).map(|i| i * 100) {
        reservoir_push(&mut samples, &mut seen, value, 10);
    }
    assert_eq!(seen, 5);
    assert_eq!(samples, vec![0, 100, 200, 300, 400]);
}
#[test]
fn reservoir_push_at_cap() {
    // Exactly cap pushes: nothing has been evicted yet.
    let mut samples = Vec::new();
    let mut seen = 0u64;
    for value in 0..10 {
        reservoir_push(&mut samples, &mut seen, value, 10);
    }
    assert_eq!(samples.len(), 10);
    assert_eq!(seen, 10);
    for value in 0..10 {
        assert!(samples.contains(&value), "missing {value}");
    }
}
#[test]
fn reservoir_push_over_cap_maintains_size() {
    // Past capacity the buffer stays at cap while the count keeps rising.
    let cap = 5;
    let mut samples = Vec::new();
    let mut seen = 0u64;
    for value in 0..1000 {
        reservoir_push(&mut samples, &mut seen, value, cap);
    }
    assert_eq!(samples.len(), cap);
    assert_eq!(seen, 1000);
}
#[test]
fn reservoir_push_uniform_sampling() {
    // With 100 slots over 10k values, uniform sampling makes the odds of
    // missing an entire quartile vanishingly small.
    let cap = 100;
    let total = 10_000u64;
    let mut samples = Vec::new();
    let mut seen = 0u64;
    for value in 0..total {
        reservoir_push(&mut samples, &mut seen, value, cap);
    }
    assert_eq!(samples.len(), cap);
    assert_eq!(seen, total);
    assert!(
        samples.iter().any(|&v| v < total / 4),
        "reservoir should contain early values"
    );
    assert!(
        samples.iter().any(|&v| v > total * 3 / 4),
        "reservoir should contain late values"
    );
}
#[test]
fn reservoir_push_cap_zero() {
    // Capacity zero still counts arrivals but never stores anything.
    let mut samples = Vec::new();
    let mut seen = 0u64;
    for value in 0..10 {
        reservoir_push(&mut samples, &mut seen, value, 0);
    }
    assert!(samples.is_empty(), "cap=0 should never store samples");
    assert_eq!(seen, 10, "count incremented regardless");
}
#[test]
fn reservoir_push_cap_one() {
    // A one-slot reservoir keeps exactly one sample no matter the volume.
    let mut samples = Vec::new();
    let mut seen = 0u64;
    reservoir_push(&mut samples, &mut seen, 42, 1);
    assert_eq!(samples, vec![42]);
    assert_eq!(seen, 1);
    for value in (1..100).map(|i| i * 100) {
        reservoir_push(&mut samples, &mut seen, value, 1);
    }
    assert_eq!(samples.len(), 1);
    assert_eq!(seen, 100);
}
#[test]
fn read_schedstat_returns_finite_triple() {
    // Skip gracefully on kernels built without CONFIG_SCHEDSTATS.
    let Some((cpu_time, _run_delay, timeslices)) = read_schedstat() else {
        eprintln!("skipping: /proc/self/schedstat not available (CONFIG_SCHEDSTATS off)");
        return;
    };
    // The test process has certainly run and been scheduled by now.
    assert!(cpu_time > 0);
    assert!(timeslices > 0);
}
#[test]
fn parse_schedstat_line_happy_path() {
    // Only the first three fields matter; trailing tokens are ignored.
    let parsed = parse_schedstat_line("100 200 300 999 extra").unwrap();
    assert_eq!(parsed, (100, 200, 300));
}
#[test]
fn parse_schedstat_line_tab_and_newline_separators() {
    // Tabs between fields and a trailing newline must both be tolerated.
    assert_eq!(parse_schedstat_line("1\t2\t3\n"), Some((1, 2, 3)));
}
#[test]
fn parse_schedstat_line_missing_field_returns_none() {
    // Fewer than three numeric fields is a parse failure.
    for short_line in ["100 200", "100", "", " \t\n "] {
        assert!(parse_schedstat_line(short_line).is_none(), "{short_line:?}");
    }
}
#[test]
fn parse_schedstat_line_non_u64_token_returns_none() {
    // Any token that is not a valid u64 (including negatives and
    // overflow) must fail the whole line.
    let bad_lines = [
        "not-a-number 200 300",
        "100 abc 300",
        "100 200 nan",
        "-1 200 300",
        "99999999999999999999 2 3",
    ];
    for line in bad_lines {
        assert!(parse_schedstat_line(line).is_none(), "{line:?}");
    }
}
#[test]
fn warn_schedstat_unavailable_once_does_not_panic_on_repeat() {
    // The once-guard must make repeated calls harmless.
    (0..10).for_each(|_| warn_schedstat_unavailable_once());
}
#[test]
fn spawn_futex_fan_out_produces_work() {
    // One messenger + four receivers = a group of 5; all must progress.
    let work = WorkType::FutexFanOut {
        fan_out: 4,
        spin_iters: 1024,
    };
    let reports = spawn_and_collect_after(work, 5, 500);
    assert_eq!(reports.len(), 5);
    for report in &reports {
        assert!(
            report.work_units > 0,
            "FutexFanOut worker {} did no work",
            report.tid
        );
    }
}
#[test]
fn spawn_futex_fan_out_receivers_record_wake_latency() {
    // At least one receiver should have sampled a wake-up latency.
    let config = WorkloadConfig {
        num_workers: 5,
        affinity: AffinityMode::None,
        work_type: WorkType::FutexFanOut {
            fan_out: 4,
            spin_iters: 512,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    let any_sampled = reports.iter().any(|r| !r.resume_latencies_ns.is_empty());
    assert!(any_sampled, "receivers should record wake latencies");
}
#[test]
fn spawn_futex_fan_out_bad_worker_count_fails() {
    // fan_out=4 needs groups of 5; 3 workers cannot form one.
    let config = WorkloadConfig {
        num_workers: 3,
        affinity: AffinityMode::None,
        work_type: WorkType::FutexFanOut {
            fan_out: 4,
            spin_iters: 1024,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let result = WorkloadHandle::spawn(&config);
    assert!(result.is_err());
    let msg = format!("{:#}", result.err().unwrap());
    assert!(
        msg.contains("divisible by 5"),
        "expected divisibility error: {msg}"
    );
}
#[test]
fn spawn_futex_fan_out_two_groups() {
    // Ten workers with fan_out=4 form two independent groups of 5.
    let config = WorkloadConfig {
        num_workers: 10,
        affinity: AffinityMode::None,
        work_type: WorkType::FutexFanOut {
            fan_out: 4,
            spin_iters: 512,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    assert_eq!(handle.worker_pids().len(), 10);
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 10);
    for report in &reports {
        assert!(report.work_units > 0, "worker {} did no work", report.tid);
    }
}
#[test]
fn spawn_futex_fan_out_single_receiver() {
    // fan_out=1 degenerates to a ping-pong pair; both must progress.
    let config = WorkloadConfig {
        num_workers: 2,
        affinity: AffinityMode::None,
        work_type: WorkType::FutexFanOut {
            fan_out: 1,
            spin_iters: 1024,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(300));
    let reports = handle.stop_and_collect();
    assert_eq!(reports.len(), 2);
    for report in &reports {
        assert!(report.work_units > 0, "worker {} did no work", report.tid);
    }
}
#[test]
fn work_type_futex_fan_out_name() {
    // name() must return the bare variant name regardless of fields.
    let work = WorkType::FutexFanOut {
        fan_out: 4,
        spin_iters: 1024,
    };
    assert_eq!(work.name(), "FutexFanOut");
}
#[test]
fn work_type_futex_fan_out_from_name() {
    // from_name must fill in the documented defaults for the fields.
    let WorkType::FutexFanOut {
        fan_out,
        spin_iters,
    } = WorkType::from_name("FutexFanOut").unwrap()
    else {
        panic!("expected FutexFanOut");
    };
    assert_eq!(fan_out, 4);
    assert_eq!(spin_iters, 1024);
}
#[test]
fn work_type_futex_fan_out_group_size() {
    // Group = 1 messenger + fan_out receivers.
    let work = WorkType::FutexFanOut {
        fan_out: 4,
        spin_iters: 1024,
    };
    assert_eq!(work.worker_group_size(), Some(5));
}
#[test]
fn snapshot_iterations_empty_handle() {
    // With zero workers the snapshot has nothing to report.
    let cfg = WorkloadConfig {
        num_workers: 0,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&cfg).unwrap();
    assert!(handle.snapshot_iterations().is_empty());
    drop(handle);
}
#[test]
fn snapshot_iterations_running_workers() {
    // Live CpuSpin workers must show non-zero iteration counters after 200 ms.
    let cfg = WorkloadConfig {
        num_workers: 2,
        affinity: AffinityMode::None,
        work_type: WorkType::CpuSpin,
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&cfg).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(200));
    let iters = handle.snapshot_iterations();
    assert_eq!(iters.len(), 2);
    for (i, &v) in iters.iter().enumerate() {
        assert!(v > 0, "worker {i} should have iterations > 0, got {v}");
    }
    drop(handle);
}
#[test]
fn worker_group_size_paired() {
    // Every paired work type runs in two-worker groups.
    let paired = [
        WorkType::pipe_io(100),
        WorkType::futex_ping_pong(100),
        WorkType::cache_pipe(32, 100),
    ];
    for wt in paired {
        assert_eq!(wt.worker_group_size(), Some(2));
    }
}
#[test]
fn worker_group_size_fan_out() {
    // Group = messenger + fan_out receivers.
    assert_eq!(WorkType::futex_fan_out(4, 100).worker_group_size(), Some(5));
    assert_eq!(WorkType::futex_fan_out(1, 100).worker_group_size(), Some(2));
}
#[test]
fn worker_group_size_ungrouped() {
    // Independent work types impose no grouping constraint.
    let ungrouped = [
        WorkType::CpuSpin,
        WorkType::YieldHeavy,
        WorkType::Mixed,
        WorkType::IoSync,
        WorkType::bursty(50, 100),
        WorkType::cache_pressure(32, 64),
        WorkType::cache_yield(32, 64),
    ];
    for wt in ungrouped {
        assert_eq!(wt.worker_group_size(), None);
    }
}
#[test]
fn needs_shared_mem_futex_types() {
    // Futex coordination requires a shared memory region.
    assert!(WorkType::futex_ping_pong(100).needs_shared_mem());
    assert!(WorkType::futex_fan_out(4, 100).needs_shared_mem());
}
#[test]
fn needs_shared_mem_non_futex() {
    for wt in [
        WorkType::CpuSpin,
        WorkType::pipe_io(100),
        WorkType::cache_pipe(32, 100),
        WorkType::cache_pressure(32, 64),
    ] {
        assert!(!wt.needs_shared_mem());
    }
}
#[test]
fn needs_cache_buf_cache_types() {
    for wt in [
        WorkType::cache_pressure(32, 64),
        WorkType::cache_yield(32, 64),
        WorkType::cache_pipe(32, 100),
    ] {
        assert!(wt.needs_cache_buf());
    }
}
#[test]
fn needs_cache_buf_non_cache() {
    for wt in [
        WorkType::CpuSpin,
        WorkType::pipe_io(100),
        WorkType::futex_ping_pong(100),
        WorkType::futex_fan_out(4, 100),
    ] {
        assert!(!wt.needs_cache_buf());
    }
}
#[test]
fn resolve_work_type_not_swappable() {
    // With swappable=false the base type wins regardless of the override.
    let base = WorkType::CpuSpin;
    let over = WorkType::YieldHeavy;
    let resolved = resolve_work_type(&base, Some(&over), false, 4);
    assert!(matches!(resolved, WorkType::CpuSpin));
}
#[test]
fn resolve_work_type_swappable_applies_override() {
    let base = WorkType::CpuSpin;
    let over = WorkType::YieldHeavy;
    let resolved = resolve_work_type(&base, Some(&over), true, 4);
    assert!(matches!(resolved, WorkType::YieldHeavy));
}
#[test]
fn resolve_work_type_swappable_no_override() {
    // swappable but nothing to swap to: keep the base.
    let base = WorkType::CpuSpin;
    let resolved = resolve_work_type(&base, None, true, 4);
    assert!(matches!(resolved, WorkType::CpuSpin));
}
#[test]
fn resolve_work_type_group_size_mismatch() {
    // pipe_io needs pairs; 3 workers can't pair, so the override is rejected.
    let base = WorkType::CpuSpin;
    let over = WorkType::pipe_io(100);
    let resolved = resolve_work_type(&base, Some(&over), true, 3);
    assert!(matches!(resolved, WorkType::CpuSpin));
}
#[test]
fn resolve_work_type_group_size_match() {
    // 4 workers split evenly into pairs, so the override is accepted.
    let base = WorkType::CpuSpin;
    let over = WorkType::pipe_io(100);
    let resolved = resolve_work_type(&base, Some(&over), true, 4);
    assert!(matches!(resolved, WorkType::PipeIo { .. }));
}
#[test]
fn resolve_work_type_fan_out_group_size() {
    // fan_out=3 → group size 4: 8 workers divide evenly, 6 do not.
    let base = WorkType::CpuSpin;
    let over = WorkType::futex_fan_out(3, 100);
    let accepted = resolve_work_type(&base, Some(&over), true, 8);
    assert!(matches!(accepted, WorkType::FutexFanOut { .. }));
    let rejected = resolve_work_type(&base, Some(&over), true, 6);
    assert!(matches!(rejected, WorkType::CpuSpin));
}
#[test]
fn work_builder_chain() {
    // Each builder setter must land in the corresponding field.
    let built = Work::default()
        .workers(8)
        .work_type(WorkType::bursty(10, 20))
        .sched_policy(SchedPolicy::Batch)
        .affinity(AffinityKind::SingleCpu);
    assert_eq!(built.num_workers, Some(8));
    assert!(matches!(
        built.work_type,
        WorkType::Bursty {
            burst_ms: 10,
            sleep_ms: 20
        }
    ));
    assert!(matches!(built.sched_policy, SchedPolicy::Batch));
    assert!(matches!(built.affinity, AffinityKind::SingleCpu));
}
#[test]
fn work_default_values() {
    let dflt = Work::default();
    assert_eq!(dflt.num_workers, None);
    assert!(matches!(dflt.work_type, WorkType::CpuSpin));
    assert!(matches!(dflt.sched_policy, SchedPolicy::Normal));
    assert!(matches!(dflt.affinity, AffinityKind::Inherit));
}
#[test]
fn sched_policy_fifo_constructor() {
    if let SchedPolicy::Fifo(prio) = SchedPolicy::fifo(50) {
        assert_eq!(prio, 50);
    } else {
        panic!("expected Fifo");
    }
}
#[test]
fn sched_policy_rr_constructor() {
    if let SchedPolicy::RoundRobin(prio) = SchedPolicy::round_robin(25) {
        assert_eq!(prio, 25);
    } else {
        panic!("expected RoundRobin");
    }
}
#[test]
fn spawn_futex_ping_pong_produces_work() {
    // Both halves of the futex pair must make progress within 500 ms.
    let reports = spawn_and_collect_after(WorkType::FutexPingPong { spin_iters: 1024 }, 2, 500);
    assert_eq!(reports.len(), 2);
    for rep in &reports {
        assert!(
            rep.work_units > 0,
            "FutexPingPong worker {} did no work",
            rep.tid
        );
    }
}
#[test]
fn spawn_cache_pressure_produces_work() {
    let work = WorkType::CachePressure {
        size_kb: 32,
        stride: 64,
    };
    let reports = spawn_and_collect_after(work, 1, 200);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_cache_yield_produces_work() {
    let work = WorkType::CacheYield {
        size_kb: 32,
        stride: 64,
    };
    let reports = spawn_and_collect_after(work, 1, 200);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
#[test]
fn spawn_cache_pipe_produces_work() {
    let work = WorkType::CachePipe {
        size_kb: 32,
        burst_iters: 1024,
    };
    let reports = spawn_and_collect_after(work, 2, 300);
    assert_eq!(reports.len(), 2);
    for rep in &reports {
        assert!(rep.work_units > 0, "CachePipe worker {} did no work", rep.tid);
    }
}
#[test]
fn spawn_sequence_produces_work() {
    // A spin phase followed by a yield phase must still accumulate work.
    let work = WorkType::Sequence {
        first: Phase::Spin(Duration::from_millis(10)),
        rest: vec![Phase::Yield(Duration::from_millis(10))],
    };
    let reports = spawn_and_collect_after(work, 1, 200);
    assert_eq!(reports.len(), 1);
    assert!(reports[0].work_units > 0);
}
/// Minimal `WorkerReport` factory used by Custom-work-type metadata tests
/// that only need a closure of the right signature: every counter is zeroed
/// and only `completed: true` is set, marking a deliberately empty but
/// finished report. Ignores the stop flag — it returns immediately.
fn stub_custom_fn(_stop: &AtomicBool) -> WorkerReport {
    WorkerReport {
        tid: 0,
        work_units: 0,
        cpu_time_ns: 0,
        wall_time_ns: 0,
        off_cpu_ns: 0,
        migration_count: 0,
        cpus_used: BTreeSet::new(),
        migrations: vec![],
        max_gap_ms: 0,
        max_gap_cpu: 0,
        max_gap_at_ms: 0,
        resume_latencies_ns: vec![],
        wake_sample_total: 0,
        iterations: 0,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: false,
    }
}
#[test]
fn custom_name_returns_label() {
    // name() must echo the caller-supplied label, not a variant name.
    let work = WorkType::custom("my_work", stub_custom_fn);
    assert_eq!(work.name(), "my_work");
}
#[test]
fn custom_group_size_is_none() {
    // Custom closures run independently — no grouping constraint.
    let work = WorkType::custom("x", stub_custom_fn);
    assert_eq!(work.worker_group_size(), None);
}
/// Custom worker body: busy-spins until told to stop, counting iterations.
///
/// Only `tid`, `work_units`, `wall_time_ns`, and `iterations` carry real
/// data; the scheduler/NUMA counters are zeroed because this closure does
/// not measure them.
fn custom_spin_fn(stop: &AtomicBool) -> WorkerReport {
    // The report's tid is filled from getpid(); NOTE(review): this assumes
    // the custom closure runs in its own single-threaded worker process —
    // confirm against the spawn machinery.
    let tid: libc::pid_t = unsafe { libc::getpid() };
    let start = Instant::now();
    let mut work_units = 0u64;
    while !stop.load(Ordering::Relaxed) {
        // black_box keeps the counter observable so the loop isn't
        // optimized away in release builds.
        work_units = std::hint::black_box(work_units.wrapping_add(1));
        std::hint::spin_loop();
    }
    let wall_time_ns = start.elapsed().as_nanos() as u64;
    WorkerReport {
        tid,
        work_units,
        cpu_time_ns: 0,
        wall_time_ns,
        off_cpu_ns: 0,
        migration_count: 0,
        cpus_used: BTreeSet::new(),
        migrations: vec![],
        max_gap_ms: 0,
        max_gap_cpu: 0,
        max_gap_at_ms: 0,
        resume_latencies_ns: vec![],
        wake_sample_total: 0,
        iterations: work_units,
        schedstat_run_delay_ns: 0,
        schedstat_run_count: 0,
        schedstat_cpu_time_ns: 0,
        completed: true,
        numa_pages: BTreeMap::new(),
        vmstat_numa_pages_migrated: 0,
        exit_info: None,
        is_messenger: false,
    }
}
#[test]
fn spawn_custom_produces_work() {
    // End-to-end check of the Custom work-type plumbing: the user-supplied
    // spin closure must run, accumulate work, and round-trip its report
    // with completed=true.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::custom("test_spin", custom_spin_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    h.start();
    std::thread::sleep(std::time::Duration::from_millis(200));
    let reports = h.stop_and_collect();
    assert_eq!(reports.len(), 1);
    assert!(
        reports[0].work_units > 0,
        "Custom worker did no work: work_units={}",
        reports[0].work_units
    );
    assert!(reports[0].wall_time_ns > 0);
    assert!(
        reports.iter().all(|r| r.completed),
        // Fix: the original dropped the line-continuation backslash after
        // "pairs with the", embedding a raw newline plus source indentation
        // in the panic output; every continuation now carries `\` so the
        // message renders as a single line like its siblings.
        "every worker report on the live / non-sentinel path \
         must carry completed=true — pairs with the \
         completed=false assertion in \
         stop_and_collect_reaps_grandchild_from_panicking_custom_closure",
    );
}
/// Temp-dir marker file the SIGUSR1-ignoring worker writes to signal that
/// it has installed SIG_IGN and is ready to be stopped.
fn ready_file_path(pid: libc::pid_t) -> std::path::PathBuf {
    let name = format!("ktstr-sigusr1-ignore-ready-{pid}");
    std::env::temp_dir().join(name)
}
/// Installs SIG_IGN for SIGUSR1 in the calling process and returns its pid.
fn ignore_sigusr1_and_get_pid() -> libc::pid_t {
    // SAFETY: installing SIG_IGN via signal(2) and calling getpid(2) have
    // no memory-safety preconditions.
    unsafe {
        libc::signal(libc::SIGUSR1, libc::SIG_IGN);
        libc::getpid()
    }
}
/// Sleeps in 10 ms slices until `stop` becomes true or `timeout` elapses,
/// whichever comes first.
fn wait_for_deadline(stop: &AtomicBool, timeout: Duration) {
    let give_up_at = Instant::now() + timeout;
    loop {
        if stop.load(Ordering::Relaxed) || Instant::now() >= give_up_at {
            return;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
/// Polls every 10 ms until `path` exists. Panics if `liveness_pid` dies
/// first, or if `timeout` expires; both messages name `context`.
fn wait_for_file_or_panic(
    path: &std::path::Path,
    timeout: Duration,
    liveness_pid: libc::pid_t,
    context: &str,
) {
    let deadline = Instant::now() + timeout;
    loop {
        if path.exists() {
            return;
        }
        // Signal-0 probe: Err means the process is gone (or unsignallable),
        // so the file can never appear — fail fast instead of timing out.
        if nix::sys::signal::kill(nix::unistd::Pid::from_raw(liveness_pid), None).is_err() {
            panic!("pid {liveness_pid} exited before writing ready file {path:?} — {context}",);
        }
        if Instant::now() >= deadline {
            panic!(
                "pid {liveness_pid} did not write ready file {path:?} within {timeout:?} — {context}",
            );
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
/// Custom worker body that ignores SIGUSR1, drops a ready-file marker, and
/// then idles until told to stop (or 7 s pass). Forces the stop machinery
/// onto its escalation path because the graceful signal is ignored.
fn ignores_sigusr1_fn(stop: &AtomicBool) -> WorkerReport {
    let tid = ignore_sigusr1_and_get_pid();
    // Best effort: the test side only polls for this file's existence.
    let _ = std::fs::write(ready_file_path(tid), []);
    wait_for_deadline(stop, Duration::from_secs(7));
    WorkerReport {
        tid,
        ..WorkerReport::default()
    }
}
#[test]
fn stop_and_collect_sentinel_exits_for_sigusr1_ignoring_worker() {
    // A worker that ignores SIGUSR1 can never exit gracefully, so
    // stop_and_collect must escalate and substitute a zeroed sentinel
    // report for the one the worker never produced.
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::custom("sigusr1_ignore", ignores_sigusr1_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    let worker_pid = h.worker_pids()[0];
    let ready_path = ready_file_path(worker_pid);
    // Clear any stale marker from a previous run before starting.
    let _ = std::fs::remove_file(&ready_path);
    h.start();
    // Don't stop until the worker has provably installed SIG_IGN —
    // otherwise the default SIGUSR1 disposition would terminate it and
    // we'd exercise the wrong path.
    wait_for_file_or_panic(
        &ready_path,
        Duration::from_secs(3),
        worker_pid,
        "SIG_IGN install may have failed or child never reached \
         ignores_sigusr1_fn's ready-file write",
    );
    let reports = h.stop_and_collect();
    let _ = std::fs::remove_file(&ready_path);
    assert_eq!(reports.len(), 1);
    let r = &reports[0];
    assert_eq!(
        r.work_units, 0,
        "sentinel sidecar must be zeroed; non-zero work_units means \
         we parsed the worker's real report instead of hitting the \
         Err branch",
    );
    // Either escalation outcome is acceptable: the collector's own
    // timeout classification, or the SIGKILL it delivered.
    match &r.exit_info {
        Some(WorkerExitInfo::TimedOut) => {}
        Some(WorkerExitInfo::Signaled(sig)) if *sig == libc::SIGKILL => {}
        other => panic!("expected TimedOut or Signaled(SIGKILL), got {other:?}",),
    }
}
/// Temp-dir pidfile where a forked worker publishes its grandchild's pid.
fn grandchild_pidfile_path(worker_pid: libc::pid_t) -> std::path::PathBuf {
    let name = format!("ktstr-grandchild-pid-{worker_pid}");
    std::env::temp_dir().join(name)
}
/// Exec target for the grandchild-reaping tests.
const GRANDCHILD_SLEEP_BINARY: &str = "/bin/sleep";
/// Panics with installation guidance unless `GRANDCHILD_SLEEP_BINARY`
/// exists and carries at least one execute bit.
fn require_grandchild_sleep_binary() {
    use std::os::unix::fs::PermissionsExt;
    let path = std::path::Path::new(GRANDCHILD_SLEEP_BINARY);
    let meta = std::fs::metadata(path).unwrap_or_else(|e| {
        panic!(
            "grandchild reaping tests require {GRANDCHILD_SLEEP_BINARY} to \
             exist; stat failed: {e}. Install coreutils (or adjust the \
             test's exec target + update GRANDCHILD_SLEEP_BINARY)."
        )
    });
    let mode = meta.permissions().mode();
    if mode & 0o111 == 0 {
        panic!(
            "grandchild reaping tests require {GRANDCHILD_SLEEP_BINARY} to \
             have at least one execute bit set; mode = {:o}. Fix the \
             file's permissions or adjust the test's exec target.",
            mode & 0o7777,
        );
    }
}
/// Waits for `pidfile` to appear and returns the grandchild pid it holds.
///
/// Panics (via `wait_for_file_or_panic`) if `worker_pid` dies first or the
/// file never shows up, and separately tolerates a short window where the
/// file exists but is still empty.
fn read_grandchild_gpid_from_pidfile(
    worker_pid: libc::pid_t,
    pidfile: &std::path::Path,
) -> libc::pid_t {
    wait_for_file_or_panic(
        pidfile,
        Duration::from_secs(3),
        worker_pid,
        "fork+exec path likely broken — check /bin/sleep exists and is executable",
    );
    // NOTE(review): the publisher uses write-tmp-then-rename, so an empty
    // read here should be rare; this retry loop additionally guards any
    // writer that creates-then-fills in place.
    let read_deadline = Instant::now() + Duration::from_secs(3);
    let gpid_str = loop {
        let s = std::fs::read_to_string(pidfile).expect("pidfile readable once exists");
        if !s.trim().is_empty() {
            break s;
        }
        if Instant::now() >= read_deadline {
            panic!(
                "pidfile {pidfile:?} stayed empty for 3s after exists() \
                 returned true — writer may have crashed between O_TRUNC \
                 and write",
            );
        }
        std::thread::sleep(Duration::from_millis(10));
    };
    let gpid: libc::pid_t = gpid_str
        .trim()
        .parse()
        .expect("pidfile holds a valid pid_t");
    assert!(gpid > 0, "grandchild pid must be positive: {gpid}");
    gpid
}
/// Polls until `gpid` no longer exists (or is collected here), or `timeout`
/// expires. `Ok(())` = reaped/gone, `Err(())` = still alive at the deadline.
fn wait_for_grandchild_reap(gpid: libc::pid_t, timeout: Duration) -> Result<(), ()> {
    let deadline = Instant::now() + timeout;
    loop {
        // Signal 0 probes existence without delivering anything.
        match nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None) {
            Err(nix::errno::Errno::ESRCH) => return Ok(()),
            Err(e) => panic!(
                "unexpected errno from existence probe: {e} \
                 (common non-ESRCH errnos: EPERM = caller may not \
                 signal this process despite it existing; EINVAL = \
                 invalid signal number, which cannot happen here \
                 since we pass None / signal 0)",
            ),
            Ok(()) => {
                // Still exists — possibly a zombie nobody has waited on.
                // A non-blocking waitpid collects it when this process is
                // its waitable parent; otherwise waitpid errors and we
                // fall through to keep polling.
                match nix::sys::wait::waitpid(
                    nix::unistd::Pid::from_raw(gpid),
                    Some(nix::sys::wait::WaitPidFlag::WNOHANG),
                ) {
                    Ok(nix::sys::wait::WaitStatus::Exited(_, _))
                    | Ok(nix::sys::wait::WaitStatus::Signaled(_, _, _)) => return Ok(()),
                    _ => {}
                }
                if Instant::now() >= deadline {
                    return Err(());
                }
                std::thread::sleep(Duration::from_millis(20));
            }
        }
    }
}
/// Asserts that `gpid` is reaped within `timeout`; on failure it SIGKILLs
/// the stray grandchild (best effort, so it can't outlive the test run)
/// before panicking with `context`.
fn assert_grandchild_reaped_within(gpid: libc::pid_t, timeout: Duration, context: &str) {
    if wait_for_grandchild_reap(gpid, timeout).is_ok() {
        return;
    }
    let _ = nix::sys::signal::kill(
        nix::unistd::Pid::from_raw(gpid),
        nix::sys::signal::Signal::SIGKILL,
    );
    panic!(
        "grandchild {gpid} still alive {:?} after {context} — \
         setpgid/killpg path broken",
        timeout,
    );
}
/// RAII guard: removes every listed pidfile on drop, even when the test
/// body panics partway through.
struct PidfileCleanup(Vec<std::path::PathBuf>);
impl Drop for PidfileCleanup {
    fn drop(&mut self) {
        self.0.iter().for_each(|path| {
            // Best effort — the file may already be gone.
            let _ = std::fs::remove_file(path);
        });
    }
}
/// Forks a grandchild that execs `GRANDCHILD_SLEEP_BINARY 60`, publishes the
/// grandchild's pid for the test side (write tmp file, then rename, so a
/// reader never sees a partial pidfile), and returns this worker's own pid.
///
/// Runs inside a worker process: every failure path calls `_exit(127)`
/// rather than unwinding, so the spawn machinery observes a plain abnormal
/// exit instead of a Rust panic.
fn fork_and_exec_grandchild_and_publish_pidfile() -> libc::pid_t {
    let exec_path = std::ffi::CString::new(GRANDCHILD_SLEEP_BINARY)
        .expect("GRANDCHILD_SLEEP_BINARY must have no interior NUL");
    let exec_arg = std::ffi::CString::new("60").expect("literal has no NUL");
    let worker_pid = unsafe { libc::getpid() };
    let gpid = unsafe { libc::fork() };
    if gpid < 0 {
        eprintln!("fork failed: {}", std::io::Error::last_os_error());
        unsafe {
            libc::_exit(127);
        }
    }
    if gpid == 0 {
        // Grandchild: close every inherited fd above stderr so the exec'd
        // sleep can't keep pipes/sockets open; fall back to a bounded
        // close() loop if close_range fails (presumably older kernels
        // without close_range(2) — TODO confirm).
        let rc = unsafe { libc::close_range(3, u32::MAX, 0) };
        if rc != 0 {
            for fd in 3..=256 {
                unsafe {
                    libc::close(fd);
                }
            }
        }
        let argv: [*const libc::c_char; 3] =
            [exec_path.as_ptr(), exec_arg.as_ptr(), std::ptr::null()];
        unsafe {
            // execv only returns on failure; 127 mirrors the shell's
            // "command not found" convention.
            libc::execv(exec_path.as_ptr(), argv.as_ptr());
            libc::_exit(127);
        }
    }
    // Parent (the worker): publish the grandchild pid atomically.
    let pidfile = grandchild_pidfile_path(worker_pid);
    let pidfile_tmp =
        std::env::temp_dir().join(format!("ktstr-grandchild-pid-{worker_pid}.tmp"));
    if let Err(e) = std::fs::write(&pidfile_tmp, gpid.to_string()) {
        eprintln!("failed to write grandchild pidfile tmp {pidfile_tmp:?}: {e}");
        unsafe {
            libc::_exit(127);
        }
    }
    if let Err(e) = std::fs::rename(&pidfile_tmp, &pidfile) {
        eprintln!("failed to rename grandchild pidfile {pidfile_tmp:?} → {pidfile:?}: {e}");
        unsafe {
            libc::_exit(127);
        }
    }
    worker_pid
}
/// Custom worker: ignores SIGUSR1, forks a 60 s sleep grandchild, then
/// idles until stopped (max 7 s). Exercises process-group reaping when the
/// worker itself lingers past the graceful-stop signal.
fn forks_grandchild_sleep_fn(stop: &AtomicBool) -> WorkerReport {
    let pid = ignore_sigusr1_and_get_pid();
    fork_and_exec_grandchild_and_publish_pidfile();
    wait_for_deadline(stop, Duration::from_secs(7));
    WorkerReport {
        tid: pid,
        ..WorkerReport::default()
    }
}
/// Custom worker that forks a grandchild but keeps the default SIGUSR1
/// disposition, so it exits cleanly on the graceful-stop path.
fn forks_grandchild_and_exits_cleanly_fn(stop: &AtomicBool) -> WorkerReport {
    let pid = fork_and_exec_grandchild_and_publish_pidfile();
    wait_for_deadline(stop, Duration::from_secs(10));
    WorkerReport {
        tid: pid,
        ..WorkerReport::default()
    }
}
#[test]
fn stop_and_collect_reaps_custom_grandchild_via_process_group() {
    // A Custom worker forks a long-running grandchild; stop_and_collect
    // must take down the whole process group, not just the worker.
    require_grandchild_sleep_binary();
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::custom("grandchild_sleep", forks_grandchild_sleep_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    let worker_pid = h.worker_pids()[0];
    let pidfile = grandchild_pidfile_path(worker_pid);
    // Clear stale state, then guarantee cleanup even if an assert fires.
    let _ = std::fs::remove_file(&pidfile);
    let _pidfile_cleanup = PidfileCleanup(vec![pidfile.clone()]);
    h.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker_pid, &pidfile);
    assert!(
        nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok(),
        "grandchild {gpid} must be alive before stop_and_collect",
    );
    let _reports = h.stop_and_collect();
    assert_grandchild_reaped_within(gpid, Duration::from_secs(5), "stop_and_collect");
}
#[test]
fn stop_and_collect_reaps_grandchildren_from_multiple_workers() {
    // Same invariant with N workers, each owning one grandchild: every
    // grandchild must be reaped, and worker pids must be distinct.
    require_grandchild_sleep_binary();
    const N: usize = 3;
    let config = WorkloadConfig {
        num_workers: N,
        affinity: AffinityMode::None,
        work_type: WorkType::custom("grandchild_sleep", forks_grandchild_sleep_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    let worker_pids = h.worker_pids();
    assert_eq!(
        worker_pids.len(),
        N,
        "WorkloadHandle::worker_pids should report {N} workers",
    );
    // Duplicate pids would make two workers share one pidfile and corrupt
    // the per-worker grandchild bookkeeping below.
    let unique: std::collections::HashSet<libc::pid_t> = worker_pids.iter().copied().collect();
    assert_eq!(
        unique.len(),
        worker_pids.len(),
        "WorkloadHandle::worker_pids returned duplicates: {worker_pids:?}",
    );
    let pidfiles: Vec<std::path::PathBuf> = worker_pids
        .iter()
        .map(|&p| grandchild_pidfile_path(p))
        .collect();
    for p in &pidfiles {
        let _ = std::fs::remove_file(p);
    }
    let _pidfile_cleanup = PidfileCleanup(pidfiles.clone());
    h.start();
    let gpids: Vec<libc::pid_t> = worker_pids
        .iter()
        .zip(pidfiles.iter())
        .map(|(&wp, pf)| read_grandchild_gpid_from_pidfile(wp, pf))
        .collect();
    for &gpid in &gpids {
        assert!(
            nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok(),
            "grandchild {gpid} must be alive before stop_and_collect",
        );
    }
    let _reports = h.stop_and_collect();
    for &gpid in &gpids {
        assert_grandchild_reaped_within(
            gpid,
            Duration::from_secs(5),
            "stop_and_collect (multi-worker)",
        );
    }
}
/// Custom worker that forks a grandchild and then panics on purpose,
/// exercising the panic path of stop_and_collect's reaping logic.
fn forks_grandchild_and_panics_fn(_stop: &AtomicBool) -> WorkerReport {
    let _ = ignore_sigusr1_and_get_pid();
    fork_and_exec_grandchild_and_publish_pidfile();
    panic!(
        "intentional panic after grandchild fork to exercise the \
         Custom-closure panic path in stop_and_collect"
    );
}
#[test]
fn stop_and_collect_reaps_grandchild_from_panicking_custom_closure() {
    // The Custom closure panics right after forking: stop_and_collect must
    // still reap the grandchild AND hand back a zeroed sentinel report
    // carrying completed=false.
    require_grandchild_sleep_binary();
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::custom("grandchild_panic", forks_grandchild_and_panics_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    let worker_pid = h.worker_pids()[0];
    let pidfile = grandchild_pidfile_path(worker_pid);
    let _ = std::fs::remove_file(&pidfile);
    let _pidfile_cleanup = PidfileCleanup(vec![pidfile.clone()]);
    h.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker_pid, &pidfile);
    assert!(
        nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok(),
        "grandchild {gpid} must be alive before stop_and_collect",
    );
    let reports = h.stop_and_collect();
    assert_grandchild_reaped_within(
        gpid,
        Duration::from_secs(5),
        "stop_and_collect (panic-path)",
    );
    assert_eq!(reports.len(), 1, "one worker spawned");
    let r = &reports[0];
    assert_eq!(
        r.work_units, 0,
        "sentinel must be zeroed; non-zero work_units would mean \
         a worker-authored report leaked through the JSON-parse \
         branch despite the panic",
    );
    assert!(
        !r.completed,
        "sentinel must carry completed=false so downstream \
         consumers distinguish '0 iterations by design / fast \
         exit' (completed=true) from '0 iterations because the \
         worker crashed before producing a report' (this case); \
         `..WorkerReport::default()` gives the bool-default \
         `false` at the sentinel construction site in \
         `stop_and_collect`",
    );
    // Both panic strategies are acceptable depending on build profile.
    match &r.exit_info {
        Some(WorkerExitInfo::Signaled(sig)) if *sig == libc::SIGABRT => {}
        Some(WorkerExitInfo::Exited(1)) => {}
        other => panic!(
            "expected sentinel with Signaled(SIGABRT) (panic=abort) \
             or Exited(1) (panic=unwind + catch_unwind) for a \
             panicking Custom closure; got {other:?}",
        ),
    }
}
#[test]
fn drop_reaps_custom_grandchild_via_process_group() {
    // Dropping the handle WITHOUT stop_and_collect must still tear down
    // the worker's process group, including the grandchild.
    require_grandchild_sleep_binary();
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::custom("grandchild_sleep", forks_grandchild_sleep_fn),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    let worker_pid = h.worker_pids()[0];
    let pidfile = grandchild_pidfile_path(worker_pid);
    let _ = std::fs::remove_file(&pidfile);
    let _pidfile_cleanup = PidfileCleanup(vec![pidfile.clone()]);
    h.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker_pid, &pidfile);
    assert!(
        nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok(),
        "grandchild {gpid} must be alive before Drop",
    );
    drop(h);
    assert_grandchild_reaped_within(
        gpid,
        Duration::from_secs(5),
        "handle Drop (no stop_and_collect)",
    );
}
#[test]
fn stop_and_collect_reaps_grandchild_from_graceful_custom_closure() {
    // This worker does NOT ignore SIGUSR1, so collection must finish fast
    // through the graceful-exit branch — and the grandchild must still be
    // reaped afterwards.
    require_grandchild_sleep_binary();
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityMode::None,
        work_type: WorkType::custom(
            "grandchild_graceful",
            forks_grandchild_and_exits_cleanly_fn,
        ),
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    let worker_pid = h.worker_pids()[0];
    let pidfile = grandchild_pidfile_path(worker_pid);
    let _ = std::fs::remove_file(&pidfile);
    let _pidfile_cleanup = PidfileCleanup(vec![pidfile.clone()]);
    h.start();
    let gpid = read_grandchild_gpid_from_pidfile(worker_pid, &pidfile);
    assert!(
        nix::sys::signal::kill(nix::unistd::Pid::from_raw(gpid), None).is_ok(),
        "grandchild {gpid} must be alive before stop_and_collect",
    );
    // Time the collection: the graceful branch is bounded well under the
    // escalation deadline.
    let t0 = Instant::now();
    let _reports = h.stop_and_collect();
    let elapsed = t0.elapsed();
    assert!(
        elapsed < Duration::from_secs(2),
        "stop_and_collect must hit the graceful-exit branch \
         (<2s), not StillAlive escalation (~5s). elapsed={elapsed:?} \
         — a value near the 5s deadline means SIGUSR1 failed to \
         reach the worker or wait_for_deadline did not observe \
         STOP in time",
    );
    assert_grandchild_reaped_within(
        gpid,
        Duration::from_secs(5),
        "stop_and_collect (graceful-exit)",
    );
}
#[test]
fn wait_for_file_or_panic_returns_when_file_appears() {
    // Pre-create the marker: the guard must return without panicking.
    let dir = std::env::temp_dir().join(format!("ktstr-wfp-happy-{}", std::process::id()));
    std::fs::create_dir_all(&dir).unwrap();
    let marker = dir.join("ready");
    std::fs::write(&marker, b"ok").unwrap();
    let self_pid = unsafe { libc::getpid() };
    wait_for_file_or_panic(
        &marker,
        Duration::from_secs(1),
        self_pid,
        "pre-existing marker must satisfy the guard",
    );
    let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn wait_for_file_or_panic_detects_liveness_death() {
    // Use an already-exited process as the liveness pid; the guard must
    // take the early-exit panic long before the generous 30 s deadline.
    let mut child = std::process::Command::new("/bin/true")
        .spawn()
        .expect("spawn /bin/true");
    let dead_pid = child.id() as libc::pid_t;
    let _ = child.wait();
    let nonexistent = std::env::temp_dir().join(format!(
        "ktstr-wfp-never-exists-{}-{dead_pid}",
        std::process::id(),
    ));
    let _ = std::fs::remove_file(&nonexistent);
    let outcome = std::panic::catch_unwind(|| {
        wait_for_file_or_panic(
            &nonexistent,
            Duration::from_secs(30),
            dead_pid,
            "liveness-death path",
        );
    });
    let payload = outcome.expect_err("must panic when liveness pid is dead");
    let msg = crate::test_support::test_helpers::panic_payload_to_string(payload);
    assert!(
        msg.contains("exited before writing ready file"),
        "panic must name the early-exit path, got: {msg}"
    );
}
#[test]
fn wait_for_file_or_panic_panics_on_deadline_miss() {
    // Live pid, but a file that never appears: the 50 ms deadline must
    // trigger the timeout panic.
    let nonexistent = std::env::temp_dir().join(format!(
        "ktstr-wfp-deadline-never-exists-{}",
        std::process::id()
    ));
    let _ = std::fs::remove_file(&nonexistent);
    let self_pid = unsafe { libc::getpid() };
    let outcome = std::panic::catch_unwind(|| {
        wait_for_file_or_panic(
            &nonexistent,
            Duration::from_millis(50),
            self_pid,
            "deadline path",
        );
    });
    let payload = outcome.expect_err("must panic when deadline expires");
    let msg = crate::test_support::test_helpers::panic_payload_to_string(payload);
    assert!(
        msg.contains("did not write ready file"),
        "panic must name the deadline-miss path, got: {msg}"
    );
}
#[test]
fn wait_for_deadline_waits_full_duration_when_stop_stays_false() {
    // With stop never flipping, the helper must hold roughly the whole
    // timeout — but not wildly longer.
    let stop = AtomicBool::new(false);
    let begin = Instant::now();
    wait_for_deadline(&stop, Duration::from_secs(1));
    let elapsed = begin.elapsed();
    assert!(
        elapsed >= Duration::from_millis(900),
        "wait_for_deadline must hold for ~full duration; elapsed={elapsed:?}",
    );
    assert!(
        elapsed < Duration::from_millis(2_000),
        "wait_for_deadline must not massively overshoot; elapsed={elapsed:?}",
    );
}
#[test]
fn wait_for_deadline_returns_early_when_stop_is_set() {
    use std::sync::Arc;
    // A helper thread flips stop after 50 ms; the wait must bail out well
    // before its 10 s timeout.
    let stop = Arc::new(AtomicBool::new(false));
    let setter = Arc::clone(&stop);
    let flipper = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(50));
        setter.store(true, Ordering::Relaxed);
    });
    let begin = Instant::now();
    wait_for_deadline(&stop, Duration::from_secs(10));
    let elapsed = begin.elapsed();
    flipper.join().unwrap();
    assert!(
        elapsed < Duration::from_secs(1),
        "wait_for_deadline must return promptly after stop flips; elapsed={elapsed:?}",
    );
}
#[test]
fn fan_out_compute_name() {
    let work = WorkType::FanOutCompute {
        fan_out: 4,
        cache_footprint_kb: 256,
        operations: 5,
        sleep_usec: 100,
    };
    assert_eq!(work.name(), "FanOutCompute");
}
#[test]
fn fan_out_compute_from_name() {
    // from_name must rebuild the variant with its canonical defaults.
    let parsed = WorkType::from_name("FanOutCompute").unwrap();
    if let WorkType::FanOutCompute {
        fan_out,
        cache_footprint_kb,
        operations,
        sleep_usec,
    } = parsed
    {
        assert_eq!(fan_out, 4);
        assert_eq!(cache_footprint_kb, 256);
        assert_eq!(operations, 5);
        assert_eq!(sleep_usec, 100);
    } else {
        panic!("expected FanOutCompute");
    }
}
#[test]
fn fan_out_compute_group_size() {
    // Group = 1 messenger + fan_out receivers.
    assert_eq!(
        WorkType::fan_out_compute(4, 256, 5, 100).worker_group_size(),
        Some(5)
    );
    assert_eq!(
        WorkType::fan_out_compute(1, 256, 5, 100).worker_group_size(),
        Some(2)
    );
}
#[test]
fn fan_out_compute_needs_shared_mem() {
    assert!(WorkType::fan_out_compute(4, 256, 5, 100).needs_shared_mem());
}
#[test]
fn fan_out_compute_needs_cache_buf() {
    assert!(WorkType::fan_out_compute(4, 256, 5, 100).needs_cache_buf());
}
#[test]
fn spawn_fan_out_compute_produces_work() {
    // One full group (1 messenger + 4 receivers): all five workers must
    // make progress, every receiver must record at least one wake-latency
    // sample, and no sample may be implausibly large.
    let config = WorkloadConfig {
        num_workers: 5,
        affinity: AffinityMode::None,
        work_type: WorkType::FanOutCompute {
            fan_out: 4,
            cache_footprint_kb: 256,
            operations: 5,
            sleep_usec: 100,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    h.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = h.stop_and_collect();
    assert_eq!(reports.len(), 5);
    for r in &reports {
        assert!(
            r.work_units > 0,
            "FanOutCompute worker {} did no work",
            r.tid
        );
    }
    // Only non-messenger reports (receivers) are expected to carry
    // latency samples.
    assert!(
        reports
            .iter()
            .filter(|r| !r.is_messenger)
            .all(|r| !r.resume_latencies_ns.is_empty()),
        "every FanOutCompute receiver must record at least one \
         wake latency sample; got {:?}",
        reports
            .iter()
            .map(|r| (r.tid, r.is_messenger, r.resume_latencies_ns.len()))
            .collect::<Vec<_>>(),
    );
    // 10 s bound: far above any plausible wake latency, so a violation
    // signals an ordering bug rather than scheduler jitter.
    const MAX_PLAUSIBLE_LATENCY_NS: u64 = 10_000_000_000;
    for r in &reports {
        for &lat in &r.resume_latencies_ns {
            assert!(
                lat < MAX_PLAUSIBLE_LATENCY_NS,
                "worker {} recorded implausible wake latency {} ns \
                 (expected < {} ns); indicates wake_ns/generation \
                 ordering race. NB: lat==0 is LEGITIMATE under \
                 correct ordering — a Relaxed `wake_atom.load` \
                 paired with an Acquire gen load can see a wake_ns \
                 from a LATER round (gen+1's store becomes visible \
                 ahead of gen+1's wake_ns re-load), making \
                 now_ns < wake_ns and `saturating_sub` = 0. The \
                 reservoir-sampling of real latencies is dominated \
                 by positive values; a stray zero from this race \
                 is not a bug, so no lower bound is asserted.",
                r.tid,
                lat,
                MAX_PLAUSIBLE_LATENCY_NS
            );
        }
    }
}
#[test]
fn spawn_fan_out_compute_bad_worker_count_fails() {
    // Group size is fan_out + 1 = 5; 3 workers cannot split into groups,
    // so spawn must refuse with a divisibility error.
    let cfg = WorkloadConfig {
        num_workers: 3,
        affinity: AffinityMode::None,
        work_type: WorkType::FanOutCompute {
            fan_out: 4,
            cache_footprint_kb: 256,
            operations: 5,
            sleep_usec: 100,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let spawn_result = WorkloadHandle::spawn(&cfg);
    assert!(spawn_result.is_err());
    let msg = format!("{:#}", spawn_result.err().unwrap());
    assert!(
        msg.contains("divisible by 5"),
        "expected divisibility error: {msg}"
    );
}
#[test]
fn spawn_fan_out_compute_two_groups() {
    // 10 workers → two independent 5-worker groups; the single-group
    // invariants must hold for every worker in both groups.
    let config = WorkloadConfig {
        num_workers: 10,
        affinity: AffinityMode::None,
        work_type: WorkType::FanOutCompute {
            fan_out: 4,
            cache_footprint_kb: 256,
            operations: 5,
            sleep_usec: 100,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut h = WorkloadHandle::spawn(&config).unwrap();
    assert_eq!(h.worker_pids().len(), 10);
    h.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = h.stop_and_collect();
    assert_eq!(reports.len(), 10);
    for r in &reports {
        assert!(
            r.work_units > 0,
            "FanOutCompute worker {} did no work",
            r.tid
        );
    }
    assert!(
        reports
            .iter()
            .filter(|r| !r.is_messenger)
            .all(|r| !r.resume_latencies_ns.is_empty()),
        "every FanOutCompute receiver in both groups must record \
         at least one wake latency sample; got {:?}",
        reports
            .iter()
            .map(|r| (r.tid, r.is_messenger, r.resume_latencies_ns.len()))
            .collect::<Vec<_>>(),
    );
    // Same plausibility bound and zero-latency tolerance as the
    // single-group test above.
    const MAX_PLAUSIBLE_LATENCY_NS: u64 = 10_000_000_000;
    for r in &reports {
        for &lat in &r.resume_latencies_ns {
            assert!(
                lat < MAX_PLAUSIBLE_LATENCY_NS,
                "worker {} recorded implausible wake latency {} ns \
                 (expected < {} ns); indicates wake_ns/generation \
                 ordering race. NB: lat==0 is LEGITIMATE under \
                 correct ordering — a Relaxed `wake_atom.load` \
                 paired with an Acquire gen load can see a wake_ns \
                 from a LATER round (gen+1's store becomes visible \
                 ahead of gen+1's wake_ns re-load), making \
                 now_ns < wake_ns and `saturating_sub` = 0. The \
                 reservoir-sampling of real latencies is dominated \
                 by positive values; a stray zero from this race \
                 is not a bug, so no lower bound is asserted.",
                r.tid,
                lat,
                MAX_PLAUSIBLE_LATENCY_NS
            );
        }
    }
}
#[test]
fn mempolicy_default_node_set_empty() {
    // Policies without explicit nodes expose an empty node set.
    assert!(MemPolicy::Default.node_set().is_empty());
}
#[test]
fn mempolicy_local_node_set_empty() {
    assert!(MemPolicy::Local.node_set().is_empty());
}
#[test]
fn mempolicy_bind_node_set() {
    let nodes = [0, 2];
    let policy = MemPolicy::Bind(nodes.into_iter().collect());
    assert_eq!(policy.node_set(), nodes.into_iter().collect());
}
#[test]
fn mempolicy_preferred_node_set() {
    let policy = MemPolicy::Preferred(1);
    assert_eq!(policy.node_set(), [1].into_iter().collect());
}
#[test]
fn mempolicy_interleave_node_set() {
    let nodes = [0, 1, 3];
    let policy = MemPolicy::Interleave(nodes.into_iter().collect());
    assert_eq!(policy.node_set(), nodes.into_iter().collect());
}
#[test]
fn mempolicy_preferred_many_node_set() {
    let policy = MemPolicy::preferred_many([0, 2]);
    assert_eq!(policy.node_set(), [0, 2].into_iter().collect());
}
#[test]
fn mempolicy_weighted_interleave_node_set() {
    let policy = MemPolicy::weighted_interleave([1, 3]);
    assert_eq!(policy.node_set(), [1, 3].into_iter().collect());
}
#[test]
fn mempolicy_validate_preferred_many_empty() {
    // An empty node set is meaningless for PreferredMany.
    let empty = MemPolicy::PreferredMany(BTreeSet::new());
    assert!(empty.validate().is_err());
}
#[test]
fn mempolicy_validate_weighted_interleave_empty() {
    let empty = MemPolicy::WeightedInterleave(BTreeSet::new());
    assert!(empty.validate().is_err());
}
#[test]
fn mempolicy_validate_preferred_many_ok() {
    assert!(MemPolicy::preferred_many([0]).validate().is_ok());
}
#[test]
fn mempolicy_validate_weighted_interleave_ok() {
    assert!(MemPolicy::weighted_interleave([0, 1]).validate().is_ok());
}
#[test]
fn mpol_flags_union() {
    // Bit positions track the kernel's MPOL_F_* constants:
    // STATIC_NODES = 1 << 15, NUMA_BALANCING = 1 << 13.
    let combined = MpolFlags::STATIC_NODES | MpolFlags::NUMA_BALANCING;
    let expected = (1 << 15) | (1 << 13);
    assert_eq!(combined.bits(), expected);
}
#[test]
fn mpol_flags_none_is_zero() {
    // NONE must be an all-clear bit pattern.
    let raw = MpolFlags::NONE.bits();
    assert_eq!(raw, 0);
}
#[test]
fn work_mpol_flags_builder() {
    // The builder method stores the flags verbatim on the Work.
    let flags = MpolFlags::STATIC_NODES;
    let built = Work::default().mpol_flags(flags);
    assert_eq!(built.mpol_flags, flags);
}
#[test]
fn mpol_flags_contains_identity() {
    // Every value contains itself — empty, single-bit, and composite.
    assert!(MpolFlags::NONE.contains(MpolFlags::NONE));
    assert!(MpolFlags::STATIC_NODES.contains(MpolFlags::STATIC_NODES));
    let both = MpolFlags::STATIC_NODES | MpolFlags::NUMA_BALANCING;
    assert!(both.contains(both));
}
#[test]
fn mpol_flags_contains_superset_is_true_for_subset() {
    // A union contains each of its constituent flags.
    let union = MpolFlags::STATIC_NODES | MpolFlags::NUMA_BALANCING;
    assert!(union.contains(MpolFlags::NUMA_BALANCING));
    assert!(union.contains(MpolFlags::STATIC_NODES));
}
#[test]
fn mpol_flags_contains_subset_is_false_for_superset() {
    // A single flag never contains a strictly larger union.
    let union = MpolFlags::STATIC_NODES | MpolFlags::NUMA_BALANCING;
    assert!(!MpolFlags::NUMA_BALANCING.contains(union));
    assert!(!MpolFlags::STATIC_NODES.contains(union));
}
#[test]
fn mpol_flags_contains_empty_is_always_true() {
    // The empty set is a subset of everything, including itself.
    let union = MpolFlags::STATIC_NODES | MpolFlags::NUMA_BALANCING;
    assert!(union.contains(MpolFlags::NONE));
    assert!(MpolFlags::STATIC_NODES.contains(MpolFlags::NONE));
    assert!(MpolFlags::NONE.contains(MpolFlags::NONE));
}
#[test]
fn mpol_flags_none_does_not_contain_any_set_flag() {
    // NONE contains no non-empty flag value.
    for flag in [
        MpolFlags::STATIC_NODES,
        MpolFlags::RELATIVE_NODES,
        MpolFlags::NUMA_BALANCING,
    ] {
        assert!(!MpolFlags::NONE.contains(flag));
    }
}
#[test]
fn mpol_flags_contains_rejects_disjoint_flag() {
    // Disjoint single flags do not contain each other, in either direction.
    let a = MpolFlags::STATIC_NODES;
    let b = MpolFlags::NUMA_BALANCING;
    assert!(!a.contains(b));
    assert!(!b.contains(a));
}
#[test]
fn mpol_flags_contains_rejects_partial_overlap() {
    // Sharing one bit (NUMA_BALANCING) is not containment: each side
    // also has a bit the other lacks.
    let lhs = MpolFlags::STATIC_NODES | MpolFlags::NUMA_BALANCING;
    let rhs = MpolFlags::RELATIVE_NODES | MpolFlags::NUMA_BALANCING;
    assert!(!lhs.contains(rhs));
    assert!(!rhs.contains(lhs));
}
#[test]
fn build_nodemask_empty() {
    // No nodes => no mask words and a zero maxnode.
    let empty = BTreeSet::new();
    let (mask, maxnode) = build_nodemask(&empty);
    assert_eq!(maxnode, 0);
    assert!(mask.is_empty());
}
#[test]
fn build_nodemask_single() {
    // Node 0 sets bit 0 of a single mask word; maxnode is sized
    // as highest_node + 2 (see build_nodemask_high_node).
    let nodes = [0].into_iter().collect();
    let (mask, maxnode) = build_nodemask(&nodes);
    assert_eq!(maxnode, 2);
    assert_eq!(mask.len(), 1);
    assert_eq!(mask[0], 1);
}
#[test]
fn build_nodemask_multiple() {
    // Nodes 0 and 2 set bits 0 and 2; bit 1 must stay clear.
    let nodes = [0, 2].into_iter().collect();
    let (mask, maxnode) = build_nodemask(&nodes);
    assert_eq!(maxnode, 4);
    assert_eq!(mask[0] & 1, 1);
    assert_eq!(mask[0] & 4, 4);
    assert_eq!(mask[0] & 2, 0);
}
#[test]
fn build_nodemask_high_node() {
    // A node index past the first c_ulong word must spill into a
    // second mask word, with word 0 left empty.
    let word_bits = std::mem::size_of::<libc::c_ulong>() * 8;
    let node = word_bits + 3;
    let nodes = [node].into_iter().collect();
    let (mask, maxnode) = build_nodemask(&nodes);
    assert_eq!(maxnode, (node + 2) as libc::c_ulong);
    assert_eq!(mask.len(), 2);
    assert_eq!(mask[0], 0);
    assert_eq!(mask[1], 1 << 3);
}
#[test]
fn apply_mempolicy_default_is_noop() {
    // Applying the Default policy must complete without panicking.
    let policy = MemPolicy::Default;
    apply_mempolicy_with_flags(&policy, MpolFlags::NONE);
}
#[test]
fn apply_mempolicy_empty_bind_skipped() {
    // A Bind with no nodes must be skipped gracefully (no panic).
    let policy = MemPolicy::Bind(BTreeSet::new());
    apply_mempolicy_with_flags(&policy, MpolFlags::NONE);
}
#[test]
fn apply_mempolicy_empty_interleave_skipped() {
    // An Interleave with no nodes must be skipped gracefully (no panic).
    let policy = MemPolicy::Interleave(BTreeSet::new());
    apply_mempolicy_with_flags(&policy, MpolFlags::NONE);
}
#[test]
fn work_mem_policy_builder() {
    // The builder installs the requested policy variant on the Work.
    let policy = MemPolicy::Bind([0].into_iter().collect());
    let built = Work::default().mem_policy(policy);
    assert!(matches!(built.mem_policy, MemPolicy::Bind(_)));
}
#[test]
fn work_default_mempolicy_is_default() {
    // Work::default() must not install any explicit NUMA policy.
    let work = Work::default();
    assert!(matches!(work.mem_policy, MemPolicy::Default));
}
#[test]
fn workload_config_default_mempolicy() {
    // WorkloadConfig::default() must not install any explicit NUMA policy.
    let config = WorkloadConfig::default();
    assert!(matches!(config.mem_policy, MemPolicy::Default));
}
#[test]
fn page_fault_churn_name_roundtrip() {
    // name() must be the exact inverse of from_name() for this variant.
    let parsed = WorkType::from_name("PageFaultChurn").unwrap();
    assert_eq!(parsed.name(), "PageFaultChurn");
}
#[test]
fn page_fault_churn_from_name_defaults() {
    // from_name must fill in the documented default parameters.
    let WorkType::PageFaultChurn {
        region_kb,
        touches_per_cycle,
        spin_iters,
    } = WorkType::from_name("PageFaultChurn").unwrap()
    else {
        panic!("expected PageFaultChurn");
    };
    assert_eq!(region_kb, 4096);
    assert_eq!(touches_per_cycle, 256);
    assert_eq!(spin_iters, 64);
}
#[test]
fn page_fault_churn_group_size_none() {
    // PageFaultChurn imposes no worker-grouping constraint.
    let wt = WorkType::page_fault_churn(4096, 256, 64);
    assert!(wt.worker_group_size().is_none());
}
#[test]
fn page_fault_churn_no_shared_mem() {
    // PageFaultChurn must not request a shared-memory region.
    let wt = WorkType::page_fault_churn(4096, 256, 64);
    assert!(!wt.needs_shared_mem());
}
#[test]
fn page_fault_churn_no_cache_buf() {
    // PageFaultChurn must not request a cache-pressure buffer.
    let wt = WorkType::page_fault_churn(4096, 256, 64);
    assert!(!wt.needs_cache_buf());
}
#[test]
// Regression test: a `region_kb` of usize::MAX would overflow when
// converted to a byte count. The worker must detect this and leave
// its outer loop cleanly — never completing a page-fault cycle —
// rather than panicking or spinning.
fn page_fault_churn_region_kb_overflow_worker_exits_cleanly() {
// One worker with an intentionally overflowing region size.
let config = WorkloadConfig {
num_workers: 1,
affinity: AffinityMode::None,
work_type: WorkType::PageFaultChurn {
region_kb: usize::MAX,
touches_per_cycle: 16,
spin_iters: 32,
},
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut h = WorkloadHandle::spawn(&config).unwrap();
h.start();
// Give the worker time to hit the overflow path and bail out.
std::thread::sleep(Duration::from_millis(100));
let reports = h.stop_and_collect();
assert_eq!(reports.len(), 1, "exactly one worker was spawned");
let r = &reports[0];
// Zero iterations proves the outer loop never completed a cycle.
assert_eq!(
r.iterations, 0,
"worker with overflowing region_kb must break out of the outer loop \
without completing any page-fault cycle; got iterations={}",
r.iterations,
);
// The overflow bail-out must happen before any work is counted.
assert_eq!(
r.work_units, 0,
"overflow path must not increment work_units; got {}",
r.work_units,
);
}
#[test]
// End-to-end check that PageFaultChurn workers make continuous forward
// progress. Oversubscribes the machine by one worker (num_cpus + 1) so
// that, on multi-CPU hosts, the scheduler is forced to migrate at least
// one worker. Two iteration snapshots 150 ms apart verify the OUTER
// loop keeps advancing while workers run — guarding against a past bug
// where an inner `while !STOP` loop swallowed all progress.
fn spawn_page_fault_churn_produces_work() {
let num_cpus = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// Deliberate oversubscription: one more worker than CPUs.
let num_workers = num_cpus + 1;
let config = WorkloadConfig {
num_workers,
affinity: AffinityMode::None,
work_type: WorkType::PageFaultChurn {
region_kb: 64,
touches_per_cycle: 16,
spin_iters: 32,
},
sched_policy: SchedPolicy::Normal,
..Default::default()
};
let mut h = WorkloadHandle::spawn(&config).unwrap();
h.start();
// Snapshot iteration counters twice, 150 ms apart, while running.
std::thread::sleep(std::time::Duration::from_millis(100));
let snap1 = h.snapshot_iterations();
std::thread::sleep(std::time::Duration::from_millis(150));
let snap2 = h.snapshot_iterations();
let reports = h.stop_and_collect();
assert_eq!(reports.len(), num_workers);
assert_eq!(snap1.len(), num_workers);
assert_eq!(snap2.len(), num_workers);
// Every worker's counter must have moved between the two snapshots.
for i in 0..num_workers {
let delta = snap2[i].saturating_sub(snap1[i]);
assert!(
delta > 0,
"worker {i} iter_slot delta between 100 ms and 250 ms \
was 0 (snap1={}, snap2={}); outer loop is not \
advancing, indicating a regression that restores \
the inner-`while !STOP` bug",
snap1[i],
snap2[i],
);
}
// Final reports: every worker did real work and finished ≥ 1 cycle.
for r in &reports {
assert!(
r.work_units > 0,
"PageFaultChurn worker {} did no work",
r.tid
);
assert!(
r.iterations > 0,
"PageFaultChurn worker {} final iterations = 0",
r.tid
);
}
// Migration check is only meaningful when there is more than one CPU
// to migrate between.
if num_cpus > 1 {
let total_migrations: u64 = reports.iter().map(|r| r.migration_count).sum();
assert!(
total_migrations > 0,
"expected ≥ 1 migration across {num_workers} \
oversubscribed workers on {num_cpus}-cpu host; 0 \
total migrations suggests the outer migration \
check at work_units.is_multiple_of(1024) isn't \
firing, indicating a regression that restores the \
inner-`while !STOP` bug"
);
}
}
#[test]
fn mutex_contention_name_roundtrip() {
    // name() must be the exact inverse of from_name() for this variant.
    let parsed = WorkType::from_name("MutexContention").unwrap();
    assert_eq!(parsed.name(), "MutexContention");
}
#[test]
fn mutex_contention_from_name_defaults() {
    // from_name must fill in the documented default parameters.
    let WorkType::MutexContention {
        contenders,
        hold_iters,
        work_iters,
    } = WorkType::from_name("MutexContention").unwrap()
    else {
        panic!("expected MutexContention");
    };
    assert_eq!(contenders, 4);
    assert_eq!(hold_iters, 256);
    assert_eq!(work_iters, 1024);
}
#[test]
fn mutex_contention_group_size() {
    // The worker group size must track the configured contender count.
    for contenders in [4, 8] {
        let wt = WorkType::mutex_contention(contenders, 256, 1024);
        assert_eq!(wt.worker_group_size(), Some(contenders));
    }
}
#[test]
fn mutex_contention_needs_shared_mem() {
    // Contenders share a mutex, so a shared-memory region is required.
    let wt = WorkType::mutex_contention(4, 256, 1024);
    assert!(wt.needs_shared_mem());
}
#[test]
fn mutex_contention_no_cache_buf() {
    // MutexContention must not request a cache-pressure buffer.
    let wt = WorkType::mutex_contention(4, 256, 1024);
    assert!(!wt.needs_cache_buf());
}
#[test]
fn spawn_mutex_contention_produces_work() {
    // Four contenders run for 500 ms; every worker must make progress
    // (i.e. no contender is starved or deadlocked on the shared lock).
    let work_type = WorkType::MutexContention {
        contenders: 4,
        hold_iters: 64,
        work_iters: 256,
    };
    let reports = spawn_and_collect_after(work_type, 4, 500);
    assert_eq!(reports.len(), 4);
    for report in &reports {
        assert!(
            report.work_units > 0,
            "MutexContention worker {} did no work",
            report.tid
        );
    }
}
#[test]
fn spawn_mutex_contention_bad_worker_count_fails() {
    // 3 workers cannot be partitioned into groups of 4 contenders, so
    // spawn must refuse the config with a divisibility error.
    let config = WorkloadConfig {
        num_workers: 3,
        affinity: AffinityMode::None,
        work_type: WorkType::MutexContention {
            contenders: 4,
            hold_iters: 256,
            work_iters: 1024,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let result = WorkloadHandle::spawn(&config);
    assert!(result.is_err());
    // {:#} renders the full anyhow context chain.
    let msg = format!("{:#}", result.err().unwrap());
    assert!(
        msg.contains("divisible by 4"),
        "expected divisibility error: {msg}"
    );
}
#[test]
fn mutex_contention_records_wake_latency() {
    // After 500 ms of lock contention, at least one contender should
    // have sampled a resume (wake) latency into its report.
    let config = WorkloadConfig {
        num_workers: 4,
        affinity: AffinityMode::None,
        work_type: WorkType::MutexContention {
            contenders: 4,
            hold_iters: 64,
            work_iters: 256,
        },
        sched_policy: SchedPolicy::Normal,
        ..Default::default()
    };
    let mut handle = WorkloadHandle::spawn(&config).unwrap();
    handle.start();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let reports = handle.stop_and_collect();
    let recorded = reports.iter().any(|r| !r.resume_latencies_ns.is_empty());
    assert!(recorded, "contenders should record wake latencies");
}
}