use std::collections::BTreeSet;
use std::io::{Seek, Write};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use super::affinity::{sched_getcpu, set_thread_affinity};
use super::config::{AluWidth, FutexLockMode, MemPolicy, MpolFlags, SchedPolicy, WakeMechanism};
use super::spawn::{
FAN_OUT_POST_WAKE_SPIN_ITERS, FUTEX_WAIT_TIMEOUT, Migration, WorkerReport,
apply_mempolicy_with_flags, apply_nice, build_nodemask, stop_requested,
};
use super::types::*;
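/// Saturating conversion of a waiter count to the `i32` that `FUTEX_WAKE`
/// expects; callers pass `usize::MAX` to mean "wake everyone".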
#[inline]
fn clamp_futex_wake_n(n: usize) -> i32 {
n.min(i32::MAX as usize) as i32
}
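/// Raw `FUTEX_WAKE`: wake up to `n_waiters` threads blocked on `futex_ptr`.
///
/// # Safety
/// `futex_ptr` must point to a valid, live `u32` shared with the waiters.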
unsafe fn futex_wake(futex_ptr: *mut u32, n_waiters: i32) {
unsafe {
libc::syscall(
libc::SYS_futex,
futex_ptr,
libc::FUTEX_WAKE,
n_waiters,
std::ptr::null::<libc::timespec>(),
std::ptr::null::<u32>(),
0u32,
);
}
}
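/// Raw `FUTEX_WAIT`: block until woken or until `ts` (a relative timeout)
/// elapses, provided `*futex_ptr` still equals `expected`; otherwise the
/// kernel returns immediately with `EAGAIN`. Callers re-check their
/// condition in a loop, so spurious or timed-out returns are harmless.
///
/// # Safety
/// `futex_ptr` must point to a valid, live `u32` shared with the wakers.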
unsafe fn futex_wait(futex_ptr: *mut u32, expected: u32, ts: &libc::timespec) {
unsafe {
libc::syscall(
libc::SYS_futex,
futex_ptr,
libc::FUTEX_WAIT,
expected,
ts as *const libc::timespec,
std::ptr::null::<u32>(),
0u32,
);
}
}
mod io;
use io::*;
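/// Body of one stress worker thread: applies CPU affinity, scheduling and
/// memory policy, and niceness, then runs the `work_type` loop until `stop`
/// is set, collecting migration, wake-latency, and schedstat data into the
/// returned [`WorkerReport`]. `WorkType::Custom` short-circuits to its own
/// `run` callback before the loop.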
#[allow(clippy::too_many_arguments)]
pub(super) fn worker_main(
affinity: Option<BTreeSet<usize>>,
work_type: WorkType,
sched_policy: SchedPolicy,
mem_policy: MemPolicy,
mpol_flags: MpolFlags,
nice: i32,
pipe_fds: Option<(i32, i32)>,
futex: Option<(*mut u32, usize)>,
iter_slot: *mut AtomicU64,
stop: &AtomicBool,
group_idx: usize,
) -> WorkerReport {
let tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
let affinity_error: Option<String> = if let Some(ref cpus) = affinity {
set_thread_affinity(tid, cpus)
.err()
.map(|e| format!("{e:#}"))
} else {
None
};
let _ = set_sched_policy(tid, sched_policy);
apply_mempolicy_with_flags(&mem_policy, mpol_flags);
apply_nice(nice);
let start = Instant::now();
let mut work_units: u64 = 0;
let mut migration_count: u64 = 0;
let mut cpus_used = BTreeSet::new();
let mut migrations = Vec::new();
let mut last_cpu = sched_getcpu();
cpus_used.insert(last_cpu);
let mut last_iter_time = start;
let mut last_iter_slot_publish = start;
const IT_SLOT_PUBLISH_INTERVAL: Duration = Duration::from_millis(1);
let mut max_gap_ns: u64 = 0;
let mut max_gap_cpu: usize = last_cpu;
let mut max_gap_at_ns: u64 = 0;
let mut cache_pressure_buf: Option<Vec<u8>> = None;
let mut matrix_buf: Option<Vec<u64>> = None;
let mut io_disk: Option<IoBacking> = None;
let mut io_buf: Option<DirectIoBuf> = None;
let mut io_rng: u64 = (tid as u64).wrapping_mul(GOLDEN_RATIO_64);
if io_rng == 0 {
io_rng = GOLDEN_RATIO_64;
}
let mut io_seq_cursor: u64 = 0;
let mut io_iter: u64 = 0;
let mut io_seq_file: Option<PhaseIoTempfile> = None;
let mut page_fault_region: Option<(*mut libc::c_void, usize)> = None;
let mut page_fault_rng_state: u64 = 0;
let mut per_pos_policy_applied = false;
let mut idle_churn_slack_applied = false;
let mut alu_hot_resolved_width: Option<AluWidth> = None;
let mut ipc_variance_buf: Option<Vec<u64>> = None;
let mut ipc_variance_rng: u64 = (tid as u64).wrapping_mul(GOLDEN_RATIO_64);
if ipc_variance_rng == 0 {
ipc_variance_rng = GOLDEN_RATIO_64;
}
const MAX_WAKE_SAMPLES: usize = 100_000;
let mut resume_latencies_ns: Vec<u64> = Vec::with_capacity(MAX_WAKE_SAMPLES);
let mut wake_sample_count: u64 = 0;
let mut iteration_costs_ns: Vec<u64> = Vec::with_capacity(MAX_WAKE_SAMPLES);
let mut iteration_cost_sample_count: u64 = 0;
let mut iterations: u64 = 0;
if let WorkType::Custom { run, .. } = &work_type {
return run(stop);
}
let affinity_churn_cpus: Vec<usize> = if matches!(work_type, WorkType::AffinityChurn { .. }) {
let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
let ret = unsafe {
libc::sched_getaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &mut cpu_set)
};
if ret == 0 {
(0..libc::CPU_SETSIZE as usize)
.filter(|c| unsafe { libc::CPU_ISSET(*c, &cpu_set) })
.collect()
} else {
Vec::new()
}
} else {
Vec::new()
};
let policy_churn_policies: Vec<(i32, i32)> =
if matches!(work_type, WorkType::PolicyChurn { .. }) {
let mut policies = vec![
(libc::SCHED_OTHER, 0),
(libc::SCHED_BATCH, 0),
(libc::SCHED_IDLE, 0),
];
let param = libc::sched_param { sched_priority: 1 };
let ret = unsafe { libc::sched_setscheduler(0, libc::SCHED_FIFO, &param) };
if ret == 0 {
let normal = libc::sched_param { sched_priority: 0 };
unsafe { libc::sched_setscheduler(0, libc::SCHED_OTHER, &normal) };
policies.push((libc::SCHED_FIFO, 1));
policies.push((libc::SCHED_RR, 1));
}
policies
} else {
Vec::new()
};
let matrix_size: usize = if let WorkType::FanOutCompute {
cache_footprint_kb,
operations,
..
} = &work_type
{
if *operations > 0 && *cache_footprint_kb > 0 {
((cache_footprint_kb * 1024 / 3 / std::mem::size_of::<u64>()) as f64).sqrt() as usize
} else {
0
}
} else {
0
};
let cgroup_churn_paths: Vec<String> = if let WorkType::CgroupChurn { groups, .. } = &work_type {
let groups_count = (*groups).max(1);
(0..groups_count)
.map(|i| format!("/sys/fs/cgroup/wt-cgroup-churn-{i}/cgroup.procs"))
.collect()
} else {
Vec::new()
};
let cgroup_churn_tid_bytes: Vec<u8> = if matches!(work_type, WorkType::CgroupChurn { .. }) {
format!("{tid}\n").into_bytes()
} else {
Vec::new()
};
let mut cgroup_churn_files: Vec<Option<std::fs::File>> = if cgroup_churn_paths.is_empty() {
Vec::new()
} else {
(0..cgroup_churn_paths.len()).map(|_| None).collect()
};
let numa_sweep_masks: Vec<(Vec<libc::c_ulong>, libc::c_ulong)> =
if let WorkType::NumaWorkingSetSweep {
ref target_nodes, ..
} = work_type
{
target_nodes
.iter()
.map(|&node| build_nodemask(&[node].into_iter().collect::<BTreeSet<usize>>()))
.collect()
} else {
Vec::new()
};
let vmstat_migrated_start = read_vmstat_numa_pages_migrated();
let schedstat_start = read_schedstat(Some(tid));
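// Main workload loop: each arm below performs one "iteration" of its work
// type; the shared bookkeeping after the match publishes iteration counts
// and samples CPU migrations.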
while !stop_requested(stop) {
match work_type {
WorkType::SpinWait => {
spin_burst(&mut work_units, 1024);
iterations += 1;
}
WorkType::YieldHeavy => {
work_units = std::hint::black_box(work_units.wrapping_add(1));
std::thread::yield_now();
iterations += 1;
}
WorkType::Mixed => {
spin_burst(&mut work_units, 1024);
std::thread::yield_now();
iterations += 1;
}
WorkType::IoSyncWrite => {
use std::os::unix::io::AsRawFd;
if !ensure_io_disk(&mut io_disk, libc::O_SYNC, tid) {
std::thread::yield_now();
iterations += 1;
continue;
}
let backing = io_disk.as_ref().unwrap();
let buf = [0u8; IO_BLOCK_SIZE];
let base = stripe_base(tid, backing.capacity_bytes);
let stripe_size = (backing.capacity_bytes / IO_NUM_STRIPES) & !(IO_SECTOR_SIZE - 1);
let stripe_extent = stripe_size.max(IO_BLOCK_SIZE as u64 * 16);
let iter_off = (io_iter * IO_BLOCK_SIZE as u64 * 16) % stripe_extent;
let fd = backing.file.as_raw_fd();
for i in 0..16u64 {
let off = base + iter_off + i * IO_BLOCK_SIZE as u64;
let n = unsafe {
libc::pwrite(
fd,
buf.as_ptr() as *const libc::c_void,
IO_BLOCK_SIZE,
off as libc::off_t,
)
};
if n < IO_BLOCK_SIZE as isize {
tracing::warn!(n, off, "IoSyncWrite short pwrite");
}
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
let before_fsync = Instant::now();
let _ = unsafe { libc::fdatasync(fd) };
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_fsync.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
io_iter = io_iter.wrapping_add(1);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::IoRandRead => {
use std::os::unix::io::AsRawFd;
if !ensure_io_disk(&mut io_disk, libc::O_DIRECT, tid) {
std::thread::yield_now();
iterations += 1;
continue;
}
if !ensure_io_buf(&mut io_buf) {
std::thread::yield_now();
iterations += 1;
continue;
}
let backing = io_disk.as_ref().unwrap();
let buf = io_buf.as_ref().unwrap();
let off = rand_io_offset(&mut io_rng, backing.capacity_bytes);
let fd = backing.file.as_raw_fd();
let before_pread = Instant::now();
let _ = unsafe {
libc::pread(
fd,
buf.as_ptr() as *mut libc::c_void,
IO_BLOCK_SIZE,
off as libc::off_t,
)
};
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_pread.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
work_units = std::hint::black_box(work_units.wrapping_add(1));
io_iter = io_iter.wrapping_add(1);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::IoConvoy => {
use std::os::unix::io::AsRawFd;
let was_open = io_disk.is_some();
if !ensure_io_disk(&mut io_disk, libc::O_DIRECT, tid) {
std::thread::yield_now();
iterations += 1;
continue;
}
if !was_open {
let cap = io_disk.as_ref().unwrap().capacity_bytes;
io_seq_cursor = stripe_base(tid, cap);
}
if !ensure_io_buf(&mut io_buf) {
std::thread::yield_now();
iterations += 1;
continue;
}
let backing = io_disk.as_ref().unwrap();
let buf = io_buf.as_ref().unwrap();
let fd = backing.file.as_raw_fd();
let stripe_size = (backing.capacity_bytes / IO_NUM_STRIPES) & !(IO_SECTOR_SIZE - 1);
let stripe_extent = stripe_size.max(IO_BLOCK_SIZE as u64 * 16);
let base = stripe_base(tid, backing.capacity_bytes);
if io_seq_cursor >= base + stripe_extent {
io_seq_cursor = base;
}
let before_io = Instant::now();
let n = unsafe {
libc::pwrite(
fd,
buf.as_ptr() as *const libc::c_void,
IO_BLOCK_SIZE,
io_seq_cursor as libc::off_t,
)
};
if n < IO_BLOCK_SIZE as isize {
tracing::warn!(n, off = io_seq_cursor, "IoConvoy short pwrite");
}
io_seq_cursor = io_seq_cursor.wrapping_add(IO_BLOCK_SIZE as u64);
let r_off = rand_io_offset(&mut io_rng, backing.capacity_bytes);
let _ = unsafe {
libc::pread(
fd,
buf.as_ptr() as *mut libc::c_void,
IO_BLOCK_SIZE,
r_off as libc::off_t,
)
};
if io_iter.is_multiple_of(16) {
let _ = unsafe { libc::fdatasync(fd) };
}
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_io.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
work_units = std::hint::black_box(work_units.wrapping_add(2));
io_iter = io_iter.wrapping_add(1);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::Bursty {
burst_duration,
sleep_duration,
} => {
let burst_end = Instant::now() + burst_duration;
while Instant::now() < burst_end && !stop_requested(stop) {
spin_burst(&mut work_units, 1024);
}
if !stop_requested(stop) {
let before_sleep = Instant::now();
std::thread::sleep(sleep_duration);
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_sleep.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
iterations += 1;
}
WorkType::PipeIo { burst_iters } => {
let (read_fd, write_fd) = pipe_fds.unwrap_or((-1, -1));
if read_fd < 0 || write_fd < 0 {
break;
}
spin_burst(&mut work_units, burst_iters);
pipe_exchange(
read_fd,
write_fd,
&mut resume_latencies_ns,
&mut wake_sample_count,
MAX_WAKE_SAMPLES,
stop,
);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::FutexPingPong { spin_iters } => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let is_first = pos == 0;
spin_burst(&mut work_units, spin_iters);
let my_val: u32 = if is_first { 0 } else { 1 };
let partner_val: u32 = if is_first { 1 } else { 0 };
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
atom.store(partner_val, Ordering::Relaxed);
unsafe { futex_wake(futex_ptr, 1) };
let before_block = Instant::now();
loop {
if stop_requested(stop) {
break;
}
let cur = atom.load(Ordering::Relaxed);
if cur == my_val {
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
break;
}
unsafe { futex_wait(futex_ptr, partner_val, &FUTEX_WAIT_TIMEOUT) };
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::CachePressure { size_kb, stride } => {
let buf = cache_pressure_buf.get_or_insert_with(|| vec![0u8; size_kb * 1024]);
if buf.is_empty() || stride == 0 {
break;
}
cache_rmw_loop(buf, stride, 1024, &mut work_units);
iterations += 1;
}
WorkType::CacheYield { size_kb, stride } => {
let buf = cache_pressure_buf.get_or_insert_with(|| vec![0u8; size_kb * 1024]);
if buf.is_empty() || stride == 0 {
break;
}
cache_rmw_loop(buf, stride, 1024, &mut work_units);
let before_yield = Instant::now();
std::thread::yield_now();
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_yield.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
iterations += 1;
}
WorkType::CachePipe {
size_kb,
burst_iters,
} => {
let (read_fd, write_fd) = pipe_fds.unwrap_or((-1, -1));
if read_fd < 0 || write_fd < 0 {
break;
}
let buf = cache_pressure_buf.get_or_insert_with(|| vec![0u8; size_kb * 1024]);
if !buf.is_empty() {
cache_rmw_loop(buf, 64, burst_iters, &mut work_units);
}
pipe_exchange(
read_fd,
write_fd,
&mut resume_latencies_ns,
&mut wake_sample_count,
MAX_WAKE_SAMPLES,
stop,
);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::FutexFanOut {
fan_out,
spin_iters,
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let is_messenger = pos == 0;
spin_burst(&mut work_units, spin_iters);
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
if is_messenger {
let next = atom.load(Ordering::Relaxed).wrapping_add(1);
let wake_n = clamp_futex_wake_n(fan_out);
atom.store(next, Ordering::Relaxed);
unsafe { futex_wake(futex_ptr, wake_n) };
spin_burst(&mut work_units, FAN_OUT_POST_WAKE_SPIN_ITERS);
} else {
let expected = atom.load(Ordering::Relaxed);
let before_block = Instant::now();
loop {
if stop_requested(stop) {
break;
}
let cur = atom.load(Ordering::Relaxed);
if cur != expected {
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
break;
}
unsafe { futex_wait(futex_ptr, expected, &FUTEX_WAIT_TIMEOUT) };
}
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::Sequence {
ref first,
ref rest,
} => {
for phase in std::iter::once(first).chain(rest.iter()) {
if stop_requested(stop) {
break;
}
match phase {
Phase::Spin(dur) => {
let end = Instant::now() + *dur;
while Instant::now() < end && !stop_requested(stop) {
spin_burst(&mut work_units, 1024);
}
}
Phase::Sleep(dur) => {
let before_sleep = Instant::now();
std::thread::sleep(*dur);
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_sleep.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
last_iter_time = Instant::now();
}
Phase::Yield(dur) => {
let end = Instant::now() + *dur;
let mut yield_counter: u64 = 0;
loop {
if yield_counter & 63 == 0
&& (Instant::now() >= end || stop_requested(stop))
{
break;
}
yield_counter = yield_counter.wrapping_add(1);
work_units = std::hint::black_box(work_units.wrapping_add(1));
let before_yield = Instant::now();
std::thread::yield_now();
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_yield.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
last_iter_time = Instant::now();
}
Phase::Io(dur) => {
let end = Instant::now() + *dur;
let tf = io_seq_file.get_or_insert_with(|| {
let path = std::env::temp_dir()
.join(format!("ktstr_seq_{tid}"))
.to_string_lossy()
.to_string();
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
.expect("failed to create Phase::Io temp file");
PhaseIoTempfile { file, path }
});
let f = &mut tf.file;
while Instant::now() < end && !stop_requested(stop) {
let _ = f.set_len(0);
let _ = f.seek(std::io::SeekFrom::Start(0));
let buf = [0u8; 4096];
for _ in 0..16 {
let _ = f.write_all(&buf);
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
let before_sleep = Instant::now();
std::thread::sleep(Duration::from_micros(100));
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_sleep.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
last_iter_time = Instant::now();
}
}
}
iterations += 1;
}
WorkType::ForkExit => {
let pid = unsafe { libc::fork() };
match pid {
-1 => {
work_units = std::hint::black_box(work_units.wrapping_add(1));
iterations += 1;
}
0 => {
unsafe { libc::_exit(0) };
}
child => {
let mut status = 0i32;
let before_wait = Instant::now();
unsafe { libc::waitpid(child, &mut status, 0) };
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_wait.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
work_units = std::hint::black_box(work_units.wrapping_add(1));
iterations += 1;
}
}
}
WorkType::NiceSweep => {
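// Probe once per process whether nice -20 is permitted (cached in a
// static); the sweep then cycles nice values from that minimum up to 19.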
let effective_min: i32 = {
static PROBED_MIN: std::sync::atomic::AtomicI32 =
std::sync::atomic::AtomicI32::new(i32::MIN);
let cached = PROBED_MIN.load(Ordering::Relaxed);
if cached != i32::MIN {
cached
} else {
let ret = unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, -20) };
let min = if ret == -1 {
0i32
} else {
unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, 0) };
-20i32
};
PROBED_MIN.store(min, Ordering::Relaxed);
min
}
};
let range = (19 - effective_min + 1) as u64;
let nice_val = effective_min + (iterations % range) as i32;
spin_burst(&mut work_units, 512);
unsafe {
libc::setpriority(libc::PRIO_PROCESS, 0, nice_val);
}
let before_yield = Instant::now();
std::thread::yield_now();
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_yield.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
iterations += 1;
}
WorkType::AffinityChurn { spin_iters } => {
spin_burst(&mut work_units, spin_iters);
if !affinity_churn_cpus.is_empty() {
use rand::Rng;
let idx = rand::rng().random_range(0..affinity_churn_cpus.len());
let target = affinity_churn_cpus[idx];
let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
unsafe {
libc::CPU_ZERO(&mut cpu_set);
libc::CPU_SET(target, &mut cpu_set);
libc::sched_setaffinity(
0,
std::mem::size_of::<libc::cpu_set_t>(),
&cpu_set,
);
}
}
let before_yield = Instant::now();
std::thread::yield_now();
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_yield.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
iterations += 1;
}
WorkType::PolicyChurn { spin_iters } => {
spin_burst(&mut work_units, spin_iters);
let idx = (iterations as usize) % policy_churn_policies.len().max(1);
let (pol, prio) = policy_churn_policies[idx];
let param = libc::sched_param {
sched_priority: prio,
};
unsafe {
libc::sched_setscheduler(0, pol, &param);
}
let before_yield = Instant::now();
std::thread::yield_now();
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_yield.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
iterations += 1;
}
WorkType::FanOutCompute {
fan_out,
operations,
sleep_usec,
..
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let is_messenger = pos == 0;
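// Shared-word layout for FanOutCompute: bytes 0..8 hold the generation
// counter (the futex waits on its u32 at offset 0, the low half on
// little-endian hosts), bytes 8..16 hold the messenger's wake timestamp
// in monotonic nanoseconds.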
let wake_ts_ptr = unsafe { (futex_ptr as *mut u8).add(8) as *mut u64 };
let gen_atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU64) };
let wake_atom = unsafe { &*(wake_ts_ptr as *const std::sync::atomic::AtomicU64) };
if is_messenger {
if let Some(wake_ns) = clock_gettime_ns(libc::CLOCK_MONOTONIC) {
wake_atom.store(wake_ns, Ordering::Relaxed);
gen_atom.fetch_add(1, Ordering::Release);
unsafe { futex_wake(futex_ptr, clamp_futex_wake_n(fan_out)) };
}
spin_burst(&mut work_units, FAN_OUT_POST_WAKE_SPIN_ITERS);
} else {
let expected = gen_atom.load(Ordering::Relaxed);
let expected_low = expected as u32;
loop {
if stop_requested(stop) {
break;
}
let cur = gen_atom.load(Ordering::Acquire);
if cur != expected {
if let Some(now_ns) = clock_gettime_ns(libc::CLOCK_MONOTONIC) {
let wake_ns = wake_atom.load(Ordering::Relaxed);
let latency = now_ns.saturating_sub(wake_ns);
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
latency,
MAX_WAKE_SAMPLES,
);
}
break;
}
unsafe { futex_wait(futex_ptr, expected_low, &FUTEX_WAIT_TIMEOUT) };
}
if sleep_usec > 0 && !stop_requested(stop) {
std::thread::sleep(Duration::from_micros(sleep_usec));
}
if matrix_size > 0 && !stop_requested(stop) {
let buf = matrix_buf
.get_or_insert_with(|| vec![0u64; 3 * matrix_size * matrix_size]);
for _ in 0..operations {
matrix_multiply(buf, matrix_size, &mut work_units);
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
}
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::PageFaultChurn {
region_kb,
touches_per_cycle,
spin_iters,
} => {
let (ptr, region_size) = match page_fault_region {
Some(p) => p,
None => {
let region_size = match region_kb.checked_mul(1024) {
Some(v) => v,
None => {
tracing::warn!(
tid,
region_kb,
"PageFaultChurn region_kb * 1024 overflowed usize — worker exiting outer loop without doing page-fault work"
);
break;
}
};
let ptr = unsafe {
libc::mmap(
std::ptr::null_mut(),
region_size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
-1,
0,
)
};
if ptr == libc::MAP_FAILED {
break;
}
unsafe {
libc::madvise(ptr, region_size, libc::MADV_NOHUGEPAGE);
}
page_fault_rng_state = (tid as u64) | 1;
page_fault_region = Some((ptr, region_size));
(ptr, region_size)
}
};
let page_count = (region_size / 4096).max(1);
let xorshift64 = |state: &mut u64| -> u64 {
let mut x = *state;
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
*state = x;
x
};
for _ in 0..touches_per_cycle {
let page_idx = (xorshift64(&mut page_fault_rng_state) as usize) % page_count;
let page_ptr = unsafe { (ptr as *mut u8).add(page_idx * 4096) };
unsafe { std::ptr::write_volatile(page_ptr, 1u8) };
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
unsafe {
libc::madvise(ptr, region_size, libc::MADV_DONTNEED);
}
spin_burst(&mut work_units, spin_iters);
iterations += 1;
}
WorkType::MutexContention {
hold_iters,
work_iters,
..
} => {
let (futex_ptr, _pos) = match futex {
Some(f) => f,
None => break,
};
spin_burst(&mut work_units, work_iters);
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
loop {
if stop_requested(stop) {
break;
}
if atom
.compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
break;
}
let before_block = Instant::now();
unsafe {
futex_wait(
futex_ptr,
1u32,
&FUTEX_WAIT_TIMEOUT,
)
};
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
spin_burst(&mut work_units, hold_iters);
atom.store(0, Ordering::Release);
unsafe { futex_wake(futex_ptr, 1) };
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::ThunderingHerd {
waiters,
batches,
inter_batch_ms,
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let is_waker = pos == 0;
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
if is_waker {
let mut batches_done: u64 = 0;
while batches_done < batches && !stop_requested(stop) {
if inter_batch_ms > 0 {
let before_sleep = Instant::now();
std::thread::sleep(Duration::from_millis(inter_batch_ms));
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_sleep.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
let next = atom.load(Ordering::Relaxed).wrapping_add(1);
atom.store(next, Ordering::Relaxed);
unsafe { futex_wake(futex_ptr, clamp_futex_wake_n(usize::MAX)) };
spin_burst(&mut work_units, 256);
batches_done += 1;
}
} else {
let _ = waiters;
let expected = atom.load(Ordering::Relaxed);
let before_block = Instant::now();
loop {
if stop_requested(stop) {
break;
}
let cur = atom.load(Ordering::Relaxed);
if cur != expected {
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
break;
}
unsafe { futex_wait(futex_ptr, expected, &FUTEX_WAIT_TIMEOUT) };
}
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::WakeChain {
depth,
wake,
work_per_hop,
} => {
if depth == 0 {
break;
}
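// Token-passing ring: the Pipe variant forwards a single byte through the
// shared pipe; the futex variant advances a stage counter modulo `depth`,
// each worker proceeding when the counter reaches its own stage.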
if matches!(wake, WakeMechanism::Pipe) {
let (read_fd, write_fd) = match pipe_fds {
Some(p) => p,
None => break,
};
if read_fd < 0 || write_fd < 0 {
break;
}
let pos = match futex {
Some((_, p)) => p,
None => break,
};
if iterations == 0 && pos == 0 {
if stop_requested(stop) {
iterations += 1;
continue;
}
let one = [0u8; 1];
let _ = unsafe {
libc::write(write_fd, one.as_ptr() as *const libc::c_void, 1)
};
}
let before_block = Instant::now();
let mut pfd = libc::pollfd {
fd: read_fd,
events: libc::POLLIN,
revents: 0,
};
let mut got_byte = false;
loop {
if stop_requested(stop) {
break;
}
let ret = unsafe { libc::poll(&mut pfd, 1, 100) };
if ret > 0 {
let mut buf = [0u8; 1];
let n = unsafe {
libc::read(read_fd, buf.as_mut_ptr() as *mut libc::c_void, 1)
};
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
if n == 1 {
got_byte = true;
}
break;
}
if ret < 0 {
break;
}
}
if !got_byte {
if stop_requested(stop) {
iterations += 1;
continue;
}
break;
}
if stop_requested(stop) {
iterations += 1;
continue;
}
let work_end = Instant::now() + work_per_hop;
while Instant::now() < work_end && !stop_requested(stop) {
spin_burst(&mut work_units, 256);
}
if stop_requested(stop) {
iterations += 1;
continue;
}
let one = [0u8; 1];
let _ =
unsafe { libc::write(write_fd, one.as_ptr() as *const libc::c_void, 1) };
last_iter_time = Instant::now();
iterations += 1;
} else {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
if pos >= depth {
break;
}
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
let my_stage = pos as u32;
let next_stage = ((pos + 1) % depth) as u32;
let before_block = Instant::now();
loop {
if stop_requested(stop) {
break;
}
let cur = atom.load(Ordering::Relaxed);
if cur == my_stage {
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
break;
}
unsafe { futex_wait(futex_ptr, cur, &FUTEX_WAIT_TIMEOUT) };
}
if stop_requested(stop) {
iterations += 1;
continue;
}
let work_end = Instant::now() + work_per_hop;
while Instant::now() < work_end && !stop_requested(stop) {
spin_burst(&mut work_units, 256);
}
if stop_requested(stop) {
iterations += 1;
continue;
}
atom.store(next_stage, Ordering::Relaxed);
unsafe { futex_wake(futex_ptr, clamp_futex_wake_n(usize::MAX)) };
last_iter_time = Instant::now();
iterations += 1;
}
}
WorkType::AsymmetricWaker {
waker_class,
wakee_class,
burst_iters,
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
if !per_pos_policy_applied {
let class = if pos == 0 { waker_class } else { wakee_class };
let _ = set_sched_policy(0, class.to_policy());
per_pos_policy_applied = true;
}
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
if pos == 0 {
spin_burst(&mut work_units, burst_iters);
let next = atom.load(Ordering::Relaxed).wrapping_add(1);
atom.store(next, Ordering::Relaxed);
unsafe { futex_wake(futex_ptr, 1) };
} else {
let expected = atom.load(Ordering::Relaxed);
let before_block = Instant::now();
loop {
if stop_requested(stop) {
break;
}
let cur = atom.load(Ordering::Relaxed);
if cur != expected {
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
break;
}
unsafe { futex_wait(futex_ptr, expected, &FUTEX_WAIT_TIMEOUT) };
}
spin_burst(&mut work_units, burst_iters);
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::PriorityInversion {
high_count,
medium_count,
low_count,
hold_iters,
work_iters,
pi_mode,
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let high_end = high_count;
let medium_end = high_count + medium_count;
let total = high_count + medium_count + low_count;
if pos >= total {
break;
}
let (tier_prio, is_low, is_medium) = if pos < high_end {
(70u32, false, false)
} else if pos < medium_end {
(50u32, false, true)
} else {
(30u32, true, false)
};
if !per_pos_policy_applied {
let _ = set_sched_policy(0, SchedPolicy::Fifo(tier_prio));
per_pos_policy_applied = true;
}
let atom = unsafe { &*(futex_ptr as *const std::sync::atomic::AtomicU32) };
if is_medium {
spin_burst(&mut work_units, work_iters);
} else {
spin_burst(&mut work_units, work_iters);
match pi_mode {
FutexLockMode::Pi => {
let lock_rc = unsafe {
libc::syscall(
libc::SYS_futex,
futex_ptr,
libc::FUTEX_LOCK_PI,
0u32,
std::ptr::null::<libc::timespec>(),
std::ptr::null::<u32>(),
0u32,
)
};
if lock_rc == 0 {
spin_burst(&mut work_units, hold_iters);
unsafe {
libc::syscall(
libc::SYS_futex,
futex_ptr,
libc::FUTEX_UNLOCK_PI,
0u32,
std::ptr::null::<libc::timespec>(),
std::ptr::null::<u32>(),
0u32,
);
}
}
}
FutexLockMode::Plain => {
loop {
if stop_requested(stop) {
break;
}
if atom
.compare_exchange_weak(
0,
1,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
let before_block = Instant::now();
unsafe {
futex_wait(futex_ptr, 1u32, &FUTEX_WAIT_TIMEOUT);
}
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
let hold = if is_low { hold_iters } else { work_iters };
spin_burst(&mut work_units, hold);
atom.store(0, Ordering::Release);
unsafe { futex_wake(futex_ptr, 1) };
}
}
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::ProducerConsumerImbalance {
producers,
consumers,
produce_rate_hz,
consume_iters,
queue_depth_target,
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let total = producers + consumers;
if pos >= total || queue_depth_target == 0 {
break;
}
let q_target_usize = std::cmp::min(queue_depth_target as usize, usize::MAX / 8 - 4);
let q = q_target_usize as u64;
if q == 0 {
break;
}
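// Shared-memory layout (byte offsets from `futex_ptr`): 0 head (u64),
// 8 tail (u64), 16 producer wake futex (u32), 20 consumer wake futex (u32),
// 24 published head (u64), 32.. ring of `q` u64 slots.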
let base = futex_ptr as *mut u8;
let head_atom = unsafe { &*(base as *const std::sync::atomic::AtomicU64) };
let tail_atom = unsafe { &*(base.add(8) as *const std::sync::atomic::AtomicU64) };
let prod_wake_ptr = unsafe { base.add(16) as *mut u32 };
let cons_wake_ptr = unsafe { base.add(20) as *mut u32 };
let pub_head_atom =
unsafe { &*(base.add(24) as *const std::sync::atomic::AtomicU64) };
let ring_base = unsafe { base.add(32) as *mut u64 };
if pos < producers {
let mut next_seq: u64 = 0;
let pace_ns: u64 = if produce_rate_hz == 0 {
0
} else {
1_000_000_000u64 / produce_rate_hz
};
while !stop_requested(stop) {
let mut slot_avail: u64 = 0;
let mut got_slot = false;
loop {
if stop_requested(stop) {
break;
}
let head = head_atom.load(Ordering::Relaxed);
let tail = tail_atom.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= q {
let prod_wake_atom = unsafe {
&*(prod_wake_ptr as *const std::sync::atomic::AtomicU32)
};
let expected = prod_wake_atom.load(Ordering::Relaxed);
let before_block = Instant::now();
unsafe { futex_wait(prod_wake_ptr, expected, &FUTEX_WAIT_TIMEOUT) };
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
continue;
}
match head_atom.compare_exchange_weak(
head,
head.wrapping_add(1),
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
slot_avail = head;
got_slot = true;
break;
}
Err(_) => continue,
}
}
if !got_slot || stop_requested(stop) {
break;
}
let slot_idx = (slot_avail % q) as usize;
unsafe {
std::ptr::write_volatile(ring_base.add(slot_idx), next_seq);
}
next_seq = next_seq.wrapping_add(1);
while pub_head_atom.load(Ordering::Acquire) != slot_avail {
if stop_requested(stop) {
break;
}
std::hint::spin_loop();
}
if stop_requested(stop) {
break;
}
pub_head_atom.store(slot_avail.wrapping_add(1), Ordering::Release);
let cons_wake_atom =
unsafe { &*(cons_wake_ptr as *const std::sync::atomic::AtomicU32) };
let cur = cons_wake_atom.load(Ordering::Relaxed);
cons_wake_atom.store(cur.wrapping_add(1), Ordering::Relaxed);
unsafe { futex_wake(cons_wake_ptr, 1) };
work_units = std::hint::black_box(work_units.wrapping_add(1));
if pace_ns > 0 {
let ts = libc::timespec {
tv_sec: (pace_ns / 1_000_000_000) as libc::time_t,
tv_nsec: (pace_ns % 1_000_000_000) as libc::c_long,
};
unsafe {
libc::nanosleep(&ts, std::ptr::null_mut());
}
}
iterations += 1;
}
} else {
while !stop_requested(stop) {
let mut item_idx: u64 = 0;
let mut got_item = false;
loop {
if stop_requested(stop) {
break;
}
let tail = tail_atom.load(Ordering::Relaxed);
let pub_head = pub_head_atom.load(Ordering::Acquire);
if pub_head != tail {
item_idx = tail;
got_item = true;
break;
}
let cons_wake_atom =
unsafe { &*(cons_wake_ptr as *const std::sync::atomic::AtomicU32) };
let expected = cons_wake_atom.load(Ordering::Relaxed);
let before_block = Instant::now();
unsafe { futex_wait(cons_wake_ptr, expected, &FUTEX_WAIT_TIMEOUT) };
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_block.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
if !got_item || stop_requested(stop) {
break;
}
let slot_idx = (item_idx % q) as usize;
let _val = unsafe { std::ptr::read_volatile(ring_base.add(slot_idx)) };
tail_atom.store(item_idx.wrapping_add(1), Ordering::Release);
let prod_wake_atom =
unsafe { &*(prod_wake_ptr as *const std::sync::atomic::AtomicU32) };
let cur = prod_wake_atom.load(Ordering::Relaxed);
prod_wake_atom.store(cur.wrapping_add(1), Ordering::Relaxed);
unsafe { futex_wake(prod_wake_ptr, 1) };
spin_burst(&mut work_units, consume_iters);
iterations += 1;
}
}
last_iter_time = Instant::now();
}
WorkType::RtStarvation {
rt_workers,
cfs_workers: _,
rt_priority,
burst_iters,
} => {
let (_, pos) = match futex {
Some(f) => f,
None => break,
};
if !per_pos_policy_applied {
if pos < rt_workers {
let prio = rt_priority.clamp(1, 99) as u32;
let _ = set_sched_policy(0, SchedPolicy::Fifo(prio));
} else {
let _ = set_sched_policy(0, SchedPolicy::Normal);
}
per_pos_policy_applied = true;
}
spin_burst(&mut work_units, burst_iters);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::NumaWorkingSetSweep {
region_kb,
sweep_period_ms,
ref target_nodes,
} => {
let region_size = match region_kb.checked_mul(1024) {
Some(v) => v,
None => {
tracing::warn!(
tid,
region_kb,
"NumaWorkingSetSweep region_kb * 1024 overflowed usize"
);
break;
}
};
let (ptr, _) = match page_fault_region {
Some(p) => p,
None => {
let ptr = unsafe {
libc::mmap(
std::ptr::null_mut(),
region_size,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
-1,
0,
)
};
if ptr == libc::MAP_FAILED {
break;
}
page_fault_region = Some((ptr, region_size));
(ptr, region_size)
}
};
if !target_nodes.is_empty() {
let phase = (tid as usize) % target_nodes.len();
let node_idx = ((iterations as usize).wrapping_add(phase)) % target_nodes.len();
let (mask, maxnode) = &numa_sweep_masks[node_idx];
const MPOL_MF_MOVE: libc::c_ulong = 1 << 1;
let _ = unsafe {
libc::syscall(
libc::SYS_mbind,
ptr,
region_size as libc::c_ulong,
libc::MPOL_BIND as libc::c_ulong,
mask.as_ptr(),
*maxnode,
MPOL_MF_MOVE,
)
};
}
let page_count = (region_size / 4096).max(1);
for page_idx in 0..page_count {
let page_ptr = unsafe { (ptr as *mut u8).add(page_idx * 4096) };
unsafe { std::ptr::write_volatile(page_ptr, 1u8) };
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
if sweep_period_ms > 0 && !stop_requested(stop) {
let before_sleep = Instant::now();
std::thread::sleep(Duration::from_millis(sweep_period_ms));
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_sleep.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::CgroupChurn { groups, cycle_ms } => {
let path_count = cgroup_churn_paths.len();
if path_count == 0 {
let _ = groups;
} else {
let target_idx = (iterations as usize) % path_count;
if cgroup_churn_files[target_idx].is_none() {
match std::fs::OpenOptions::new()
.write(true)
.open(&cgroup_churn_paths[target_idx])
{
Ok(f) => cgroup_churn_files[target_idx] = Some(f),
Err(e) => {
tracing::warn!(
?e,
path = %cgroup_churn_paths[target_idx],
"CgroupChurn open failed"
);
}
}
}
if let Some(f) = cgroup_churn_files[target_idx].take() {
match (&f).write_all(&cgroup_churn_tid_bytes) {
Ok(()) => {
cgroup_churn_files[target_idx] = Some(f);
}
Err(e) => {
tracing::warn!(
?e,
path = %cgroup_churn_paths[target_idx],
"CgroupChurn write failed; invalidating cached fd"
);
}
}
}
}
if cycle_ms > 0 && !stop_requested(stop) {
std::thread::sleep(Duration::from_millis(cycle_ms));
}
spin_burst(&mut work_units, 256);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::SignalStorm {
signals_per_iter,
work_iters,
} => {
use std::sync::Once;
use std::sync::atomic::AtomicU32;
static SIG_HANDLER_INSTALLED: Once = Once::new();
SIG_HANDLER_INSTALLED.call_once(|| {
extern "C" fn handler(_: libc::c_int) {}
let mut sa: libc::sigaction = unsafe { std::mem::zeroed() };
sa.sa_sigaction = handler as *const () as usize;
sa.sa_flags = libc::SA_RESTART;
unsafe {
libc::sigemptyset(&mut sa.sa_mask);
libc::sigaction(libc::SIGUSR1, &sa, std::ptr::null_mut());
}
});
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let slots = futex_ptr as *mut AtomicU32;
let self_slot_idx = pos & 1;
let partner_slot_idx = self_slot_idx ^ 1;
unsafe {
(*slots.add(self_slot_idx)).store(tid as u32, Ordering::Release);
}
let partner_tid =
unsafe { (*slots.add(partner_slot_idx)).load(Ordering::Acquire) as i32 };
if partner_tid != 0 {
for _ in 0..signals_per_iter {
unsafe {
libc::kill(partner_tid, libc::SIGUSR1);
}
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
}
spin_burst(&mut work_units, work_iters);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::PreemptStorm {
cfs_workers: _,
rt_burst_iters,
rt_sleep_us,
} => {
let pos = match futex {
Some((_, p)) => p,
None => 1,
};
let is_rt = pos == 0;
if is_rt && !per_pos_policy_applied {
let param = libc::sched_param { sched_priority: 1 };
let rc = unsafe { libc::sched_setscheduler(0, libc::SCHED_FIFO, &param) };
if rc != 0 {
tracing::warn!(
errno = std::io::Error::last_os_error().raw_os_error(),
"PreemptStorm sched_setscheduler(FIFO) failed \
(need CAP_SYS_NICE / RLIMIT_RTPRIO)"
);
}
per_pos_policy_applied = true;
}
spin_burst(&mut work_units, rt_burst_iters);
if is_rt && rt_sleep_us > 0 && !stop_requested(stop) {
let req = libc::timespec {
tv_sec: (rt_sleep_us / 1_000_000) as libc::time_t,
tv_nsec: ((rt_sleep_us % 1_000_000) * 1_000) as libc::c_long,
};
unsafe {
libc::clock_nanosleep(libc::CLOCK_MONOTONIC, 0, &req, std::ptr::null_mut());
}
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::EpollStorm {
producers,
consumers: _,
events_per_burst,
} => {
let (futex_ptr, pos) = match futex {
Some(f) => f,
None => break,
};
let slots = futex_ptr as *mut AtomicU64;
let efd_slot = unsafe { &*slots };
let epfd_slot = unsafe { &*slots.add(1) };
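// The fd slots store `fd + 1` so that 0 can keep meaning "not created
// yet" (fd 0 itself would otherwise be indistinguishable from the
// sentinel).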
let is_producer = pos < producers;
if pos == 0 && efd_slot.load(Ordering::Acquire) == 0 {
let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };
let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
if efd >= 0 && epfd >= 0 {
let mut ev = libc::epoll_event {
events: libc::EPOLLIN as u32,
u64: 0,
};
unsafe {
libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, efd, &mut ev);
}
efd_slot.store(efd as u64 + 1, Ordering::Release);
epfd_slot.store(epfd as u64 + 1, Ordering::Release);
}
}
let efd_raw = efd_slot.load(Ordering::Acquire);
let epfd_raw = epfd_slot.load(Ordering::Acquire);
if efd_raw == 0 || epfd_raw == 0 {
spin_burst(&mut work_units, 256);
} else {
let efd = (efd_raw - 1) as libc::c_int;
let epfd = (epfd_raw - 1) as libc::c_int;
if is_producer {
for _ in 0..events_per_burst {
let one: u64 = 1;
unsafe {
libc::write(efd, &one as *const u64 as *const libc::c_void, 8);
}
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
} else {
let mut ev: libc::epoll_event = unsafe { std::mem::zeroed() };
let before_wait = Instant::now();
let n = unsafe { libc::epoll_wait(epfd, &mut ev, 1, 100) };
if n > 0 {
let mut buf = [0u8; 8];
unsafe {
libc::read(efd, buf.as_mut_ptr() as *mut libc::c_void, 8);
}
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
before_wait.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
spin_burst(&mut work_units, 256);
}
}
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::NumaMigrationChurn { period_ms } => {
fn parse_cpulist_inline(s: &str) -> Vec<usize> {
let mut out = Vec::new();
for part in s.split(',') {
let part = part.trim();
if part.is_empty() {
continue;
}
if let Some((lo, hi)) = part.split_once('-') {
if let (Ok(lo), Ok(hi)) = (lo.parse::<usize>(), hi.parse::<usize>()) {
for c in lo..=hi {
out.push(c);
}
}
} else if let Ok(c) = part.parse::<usize>() {
out.push(c);
}
}
out
}
static NUMA_NODES: std::sync::OnceLock<Vec<Vec<usize>>> =
std::sync::OnceLock::new();
let nodes = NUMA_NODES.get_or_init(|| {
let online = std::fs::read_to_string("/sys/devices/system/node/online")
.unwrap_or_default();
let mut node_cpus: Vec<Vec<usize>> = Vec::new();
for part in online.trim().split(',') {
if let Some((lo, hi)) = part.split_once('-') {
let lo: usize = lo.parse().unwrap_or(0);
let hi: usize = hi.parse().unwrap_or(0);
for n in lo..=hi {
if let Ok(s) = std::fs::read_to_string(format!(
"/sys/devices/system/node/node{}/cpulist",
n
)) {
node_cpus.push(parse_cpulist_inline(s.trim()));
}
}
} else if let Ok(n) = part.parse::<usize>()
&& let Ok(s) = std::fs::read_to_string(format!(
"/sys/devices/system/node/node{}/cpulist",
n
))
{
node_cpus.push(parse_cpulist_inline(s.trim()));
}
}
if node_cpus.is_empty() {
node_cpus.push(vec![0]);
}
node_cpus
});
let target_node = (iterations as usize) % nodes.len();
let cpus = &nodes[target_node];
if !cpus.is_empty() {
let mut set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
unsafe { libc::CPU_ZERO(&mut set) };
for &cpu in cpus {
if cpu < libc::CPU_SETSIZE as usize {
unsafe { libc::CPU_SET(cpu, &mut set) };
}
}
unsafe {
libc::sched_setaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &set);
}
}
if period_ms > 0 && !stop_requested(stop) {
std::thread::sleep(Duration::from_millis(period_ms));
}
spin_burst(&mut work_units, 256);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::IdleChurn {
burst_duration,
sleep_duration,
precise_timing,
} => {
if precise_timing && !idle_churn_slack_applied {
let _ = unsafe { libc::prctl(libc::PR_SET_TIMERSLACK, 1u64) };
idle_churn_slack_applied = true;
}
let burst_end = Instant::now() + burst_duration;
while Instant::now() < burst_end && !stop_requested(stop) {
spin_burst(&mut work_units, 256);
}
if stop_requested(stop) {
iterations += 1;
continue;
}
let req = libc::timespec {
tv_sec: sleep_duration.as_secs() as libc::time_t,
tv_nsec: sleep_duration.subsec_nanos() as libc::c_long,
};
let before_sleep = Instant::now();
let nanosleep_rc = unsafe { libc::nanosleep(&req, std::ptr::null_mut()) };
if nanosleep_rc < 0 {
let errno = std::io::Error::last_os_error().raw_os_error();
if errno == Some(libc::EINVAL) {
tracing::error!(
errno = errno,
"IdleChurn nanosleep returned EINVAL; bailing"
);
break;
}
}
let elapsed = before_sleep.elapsed();
let resume_overhead = elapsed.saturating_sub(sleep_duration);
reservoir_push(
&mut resume_latencies_ns,
&mut wake_sample_count,
resume_overhead.as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
last_iter_time = Instant::now();
iterations += 1;
}
WorkType::AluHot { width } => {
let resolved = *alu_hot_resolved_width.get_or_insert_with(|| {
let r = resolve_alu_width(width);
if r != width && !matches!(width, AluWidth::Widest) {
tracing::warn!(
requested = ?width,
resolved = ?r,
tid,
"AluHot width unavailable on this host; downgraded — \
see [`AluWidth`] doc for resolution order"
);
}
r
});
let iter_start = Instant::now();
alu_hot_chain(resolved, ALU_HOT_CHAIN_STEPS, &mut work_units);
reservoir_push(
&mut iteration_costs_ns,
&mut iteration_cost_sample_count,
iter_start.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
iterations += 1;
}
WorkType::SmtSiblingSpin => {
let iter_start = Instant::now();
spin_burst(&mut work_units, 1024);
reservoir_push(
&mut iteration_costs_ns,
&mut iteration_cost_sample_count,
iter_start.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
iterations += 1;
}
WorkType::IpcVariance {
hot_iters,
cold_iters,
period_iters,
} => {
let buf =
ipc_variance_buf.get_or_insert_with(|| vec![0u64; IPC_VARIANCE_REGION_U64]);
let xorshift64 = |state: &mut u64| -> u64 {
let mut x = *state;
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
*state = x;
x
};
let iter_start = Instant::now();
let mut completed = true;
for _ in 0..period_iters {
if stop_requested(stop) {
completed = false;
break;
}
alu_hot_chain(AluWidth::Scalar, hot_iters, &mut work_units);
if stop_requested(stop) {
completed = false;
break;
}
let len = buf.len();
if len > 0 {
for _ in 0..cold_iters {
let idx = (xorshift64(&mut ipc_variance_rng) as usize) % len;
let cur = std::hint::black_box(buf[idx]);
buf[idx] = cur.wrapping_add(1);
work_units = std::hint::black_box(work_units.wrapping_add(1));
}
}
}
if completed {
reservoir_push(
&mut iteration_costs_ns,
&mut iteration_cost_sample_count,
iter_start.elapsed().as_nanos() as u64,
MAX_WAKE_SAMPLES,
);
}
iterations += 1;
}
WorkType::Custom { .. } => unreachable!("handled by early return"),
}
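// Shared per-iteration bookkeeping. `Instant::now()` is fetched lazily and
// at most once per loop pass: iteration counts are published at most every
// 1 ms, and migration/gap sampling runs only every 1024 work units.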
let mut now_for_gate: Option<Instant> = None;
if !iter_slot.is_null() {
let now = *now_for_gate.get_or_insert_with(Instant::now);
if now.duration_since(last_iter_slot_publish) >= IT_SLOT_PUBLISH_INTERVAL {
unsafe { &*iter_slot }.store(iterations, Ordering::Relaxed);
last_iter_slot_publish = now;
}
}
if work_units.is_multiple_of(1024) {
let now = *now_for_gate.get_or_insert_with(Instant::now);
let gap = now.duration_since(last_iter_time).as_nanos() as u64;
if gap > max_gap_ns {
max_gap_ns = gap;
max_gap_cpu = last_cpu;
max_gap_at_ns = now.duration_since(start).as_nanos() as u64;
}
last_iter_time = now;
let cpu = sched_getcpu();
if cpu != last_cpu {
migration_count += 1;
cpus_used.insert(cpu);
migrations.push(Migration {
at_ns: now.duration_since(start).as_nanos() as u64,
from_cpu: last_cpu,
to_cpu: cpu,
});
last_cpu = cpu;
}
}
}
if matches!(work_type, WorkType::NiceSweep) {
unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, 0) };
}
if matches!(work_type, WorkType::PolicyChurn { .. }) {
let param = libc::sched_param { sched_priority: 0 };
unsafe { libc::sched_setscheduler(0, libc::SCHED_OTHER, &param) };
}
if let Some((ptr, size)) = page_fault_region {
unsafe { libc::munmap(ptr, size) };
}
if !iter_slot.is_null() {
unsafe { &*iter_slot }.store(iterations, Ordering::Relaxed);
}
let wall_time = start.elapsed();
let cpu_time_ns = thread_cpu_time_ns();
let wall_time_ns = wall_time.as_nanos() as u64;
let schedstat_end = read_schedstat(Some(tid));
let (ss_delay_delta, ss_ts_delta, ss_cpu_delta) = match (schedstat_start, schedstat_end) {
(Some((cpu_s, delay_s, ts_s)), Some((cpu_e, delay_e, ts_e))) => (
delay_e.saturating_sub(delay_s),
ts_e.saturating_sub(ts_s),
cpu_e.saturating_sub(cpu_s),
),
_ => (0, 0, 0),
};
let numa_pages = read_numa_maps_pages();
let vmstat_migrated_end = read_vmstat_numa_pages_migrated();
let vmstat_migrated_delta = vmstat_migrated_end.saturating_sub(vmstat_migrated_start);
WorkerReport {
tid,
work_units,
cpu_time_ns,
wall_time_ns,
off_cpu_ns: wall_time_ns.saturating_sub(cpu_time_ns),
migration_count,
cpus_used,
migrations,
max_gap_ms: max_gap_ns / 1_000_000,
max_gap_cpu,
max_gap_at_ms: max_gap_at_ns / 1_000_000,
resume_latencies_ns,
wake_sample_total: wake_sample_count,
iteration_costs_ns,
iteration_cost_sample_total: iteration_cost_sample_count,
iterations,
schedstat_run_delay_ns: ss_delay_delta,
schedstat_run_count: ss_ts_delta,
schedstat_cpu_time_ns: ss_cpu_delta,
completed: true,
numa_pages,
vmstat_numa_pages_migrated: vmstat_migrated_delta,
exit_info: None,
is_messenger: matches!(
work_type,
WorkType::FutexFanOut { .. } | WorkType::FanOutCompute { .. }
) && futex.map(|(_, p)| p == 0).unwrap_or(false),
group_idx,
affinity_error,
}
}
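/// CPU-bound busy loop: `count` dependent wrapping adds through
/// `black_box`, with a `spin_loop` hint each step.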
#[inline(never)]
pub(super) fn spin_burst(work_units: &mut u64, count: u64) {
for _ in 0..count {
*work_units = std::hint::black_box(work_units.wrapping_add(1));
std::hint::spin_loop();
}
}
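/// Strided volatile read-modify-write sweep over `buf`, wrapping at the
/// buffer length; the stride controls which cache lines are touched.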
#[inline(never)]
pub(super) fn cache_rmw_loop(buf: &mut [u8], stride: usize, iters: u64, work_units: &mut u64) {
let len = buf.len();
let mut idx = 0;
for _ in 0..iters {
let cur = unsafe { std::ptr::read_volatile(&buf[idx]) };
unsafe { std::ptr::write_volatile(&mut buf[idx], cur.wrapping_add(1)) };
idx = (idx + stride) % len;
*work_units = std::hint::black_box(work_units.wrapping_add(1));
}
}
/// 2^64 divided by the golden ratio (the splitmix64 increment), used as a
/// multiplier to spread per-thread RNG seeds derived from the tid.
pub(super) const GOLDEN_RATIO_64: u64 = 0x9E37_79B9_7F4A_7C15;
/// Multiply-chain steps executed per `alu_hot_chain` call.
pub(super) const ALU_HOT_CHAIN_STEPS: u64 = 1024;
/// Length in u64 elements of the IpcVariance cold region (64 Ki u64 = 512 KiB).
pub(super) const IPC_VARIANCE_REGION_U64: usize = 64 * 1024;
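/// Map a requested [`AluWidth`] to one this host can execute. On x86_64 the
/// widest width is picked via runtime feature detection (AVX-512F, then
/// AVX2, else 128-bit), and unavailable or AMX requests fall back to that
/// widest width; on aarch64 every vector request resolves to 128-bit NEON;
/// other architectures resolve to scalar.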
pub(super) fn resolve_alu_width(requested: AluWidth) -> AluWidth {
#[cfg(target_arch = "x86_64")]
{
let widest = if std::is_x86_feature_detected!("avx512f") {
AluWidth::Vec512
} else if std::is_x86_feature_detected!("avx2") {
AluWidth::Vec256
} else {
AluWidth::Vec128
};
match requested {
AluWidth::Widest => widest,
AluWidth::Amx => widest,
AluWidth::Vec512 if std::is_x86_feature_detected!("avx512f") => AluWidth::Vec512,
AluWidth::Vec512 => widest,
AluWidth::Vec256 if std::is_x86_feature_detected!("avx2") => AluWidth::Vec256,
AluWidth::Vec256 => widest,
AluWidth::Vec128 => AluWidth::Vec128,
AluWidth::Scalar => AluWidth::Scalar,
}
}
#[cfg(target_arch = "aarch64")]
{
match requested {
AluWidth::Widest | AluWidth::Vec128 => AluWidth::Vec128,
AluWidth::Vec256 | AluWidth::Vec512 | AluWidth::Amx => AluWidth::Vec128,
AluWidth::Scalar => AluWidth::Scalar,
}
}
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
{
let _ = requested;
AluWidth::Scalar
}
}
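/// Hot integer kernel: `steps` rounds of four independent 64-bit multiply
/// chains, kept live through `black_box` and XOR-folded into a sink at the
/// end. `width` must already be resolved via [`resolve_alu_width`];
/// reaching this with `AluWidth::Widest` is a caller bug.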
#[inline(never)]
pub(super) fn alu_hot_chain(width: AluWidth, steps: u64, work_units: &mut u64) {
let mut a: u64 = std::hint::black_box(GOLDEN_RATIO_64);
let mut b: u64 = std::hint::black_box(0xBF58_476D_1CE4_E5B9u64);
let mut c: u64 = std::hint::black_box(0x94D0_49BB_1331_11EBu64);
let mut d: u64 = std::hint::black_box(0x2545_F491_4F6C_DD1Du64);
debug_assert!(
!matches!(width, AluWidth::Widest),
"alu_hot_chain reached with AluWidth::Widest; \
caller must resolve via resolve_alu_width first",
);
for _ in 0..steps {
a = std::hint::black_box(a).wrapping_mul(0xD134_2543_DE82_EF95);
b = std::hint::black_box(b).wrapping_mul(0xC4CE_B9FE_1A85_EC53);
c = std::hint::black_box(c).wrapping_mul(0xFF51_AFD7_ED55_8CCD);
d = std::hint::black_box(d).wrapping_mul(0xCA45_4F47_1E40_FE19);
*work_units = std::hint::black_box(work_units.wrapping_add(1));
}
let sink = a ^ b ^ c ^ d;
let _ = std::hint::black_box(sink);
}
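/// Naive O(size^3) matrix multiply over a single buffer holding A, B, and C
/// contiguously (`stride = size * size` elements each); the result is
/// written with volatile stores so the work cannot be optimized away.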
#[inline(never)]
pub(super) fn matrix_multiply(data: &mut [u64], size: usize, work_units: &mut u64) {
debug_assert_eq!(data.len(), 3 * size * size);
let stride = size * size;
for i in 0..size {
for j in 0..size {
let mut acc: u64 = 0;
for k in 0..size {
acc = acc.wrapping_add(
std::hint::black_box(data[i * size + k])
.wrapping_mul(std::hint::black_box(data[stride + k * size + j])),
);
}
unsafe {
std::ptr::write_volatile(
&mut data[2 * stride + i * size + j] as *mut u64,
std::hint::black_box(acc),
);
}
}
}
*work_units = work_units.wrapping_add(std::hint::black_box(data[2 * stride]));
}
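/// One ping over the shared pipe: write a byte to the partner, then poll
/// (100 ms timeout, so `stop` is re-checked) until a byte comes back,
/// recording the blocked interval as a resume-latency sample.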
pub(super) fn pipe_exchange(
read_fd: i32,
write_fd: i32,
resume_latencies_ns: &mut Vec<u64>,
wake_sample_count: &mut u64,
max_wake_samples: usize,
stop: &AtomicBool,
) {
unsafe { libc::write(write_fd, b"x".as_ptr() as *const _, 1) };
let before_block = Instant::now();
let mut pfd = libc::pollfd {
fd: read_fd,
events: libc::POLLIN,
revents: 0,
};
loop {
if stop_requested(stop) {
break;
}
let ret = unsafe { libc::poll(&mut pfd, 1, 100) };
if ret > 0 {
let mut byte = [0u8; 1];
unsafe { libc::read(read_fd, byte.as_mut_ptr() as *mut _, 1) };
reservoir_push(
resume_latencies_ns,
wake_sample_count,
before_block.elapsed().as_nanos() as u64,
max_wake_samples,
);
break;
}
if ret < 0 {
break;
}
}
}
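/// Uniform reservoir sampling (Algorithm R): the first `cap` samples are
/// kept verbatim; afterwards each new sample replaces a random slot with
/// probability `cap / count`. Uses a thread-local xorshift64 RNG seeded
/// from the tid; `r % count` carries a slight modulo bias, acceptable for
/// these diagnostics.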
pub(super) fn reservoir_push(buf: &mut Vec<u64>, count: &mut u64, sample: u64, cap: usize) {
*count += 1;
if buf.len() < cap {
buf.push(sample);
} else {
thread_local! {
static RESERVOIR_RNG: std::cell::Cell<u64> = const {
std::cell::Cell::new(0)
};
}
let r = RESERVOIR_RNG.with(|cell| {
let mut s = cell.get();
if s == 0 {
let tid = unsafe { libc::syscall(libc::SYS_gettid) as u64 };
s = tid.wrapping_mul(GOLDEN_RATIO_64);
if s == 0 {
s = GOLDEN_RATIO_64;
}
}
s ^= s << 13;
s ^= s >> 7;
s ^= s << 17;
cell.set(s);
s
});
let idx = (r % *count) as usize;
if idx < cap {
buf[idx] = sample;
}
}
}
mod sched;
use sched::*;
#[cfg(test)]
mod tests;