use anyhow::{Context, Result};
use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Write};
use std::os::unix::io::FromRawFd;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use super::affinity::{AffinityIntent, ResolvedAffinity, resolve_affinity, set_thread_affinity};
use super::config::{CloneMode, MemPolicy, MpolFlags, SchedPolicy, WorkSpec, WorkloadConfig};
use super::types::*;
use super::worker::worker_main;
/// Process-wide stop flag shared by every worker in this process.
///
/// Forked worker children reset it to `false` immediately after `fork()` and
/// install a SIGUSR1 handler (whose body is elsewhere; presumably it sets this
/// flag — confirm in `sigusr1_handler`). Thread workers and forked children
/// alike pass `&STOP` into `worker_main` and poll it via `stop_requested`.
pub(super) static STOP: AtomicBool = AtomicBool::new(false);
/// One observed CPU migration of a worker, recorded for the final report.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Migration {
    /// Timestamp of the migration in nanoseconds (epoch chosen by the
    /// recorder — presumably relative to workload start; confirm at the
    /// producer side in `worker_main`).
    pub at_ns: u64,
    /// CPU the worker was running on before the migration.
    pub from_cpu: usize,
    /// CPU the worker was observed on after the migration.
    pub to_cpu: usize,
}
/// Build a NUMA node bitmask suitable for `set_mempolicy(2)` from a set of
/// node ids.
///
/// Returns `(mask_words, maxnode)` where `maxnode` is the bit count handed to
/// the kernel; an empty input yields `(vec![], 0)`. The bit count is sized as
/// `max_node + 2`, i.e. one spare bit beyond the highest node — presumably to
/// stay clear of the kernel's off-by-one treatment of `maxnode`; confirm
/// against numa(7) before tightening.
pub fn build_nodemask(nodes: &BTreeSet<usize>) -> (Vec<libc::c_ulong>, libc::c_ulong) {
    // `next_back()` on a BTreeSet iterator is the maximum element; `None`
    // doubles as the empty-set early return.
    let Some(&max_node) = nodes.iter().next_back() else {
        return (vec![], 0);
    };
    let word_bits = std::mem::size_of::<libc::c_ulong>() * 8;
    let bit_count = max_node + 2;
    let mut words = vec![0 as libc::c_ulong; bit_count.div_ceil(word_bits)];
    for &node in nodes {
        words[node / word_bits] |= 1 << (node % word_bits);
    }
    (words, bit_count as libc::c_ulong)
}
// Kernel mempolicy mode numbers defined locally rather than via `libc::MPOL_*`
// like the other modes in this file — presumably because the libc crate does
// not (yet) expose these newer modes; confirm against uapi/linux/mempolicy.h.
const MPOL_PREFERRED_MANY: i32 = 5;
const MPOL_WEIGHTED_INTERLEAVE: i32 = 6;
// How long a blocked worker waits in one futex/sleep cycle before re-checking
// its stop flag (100 ms).
pub(super) const WORKER_STOP_POLL_NS: libc::c_long = 100_000_000;
// Ready-made timespec for FUTEX_WAIT carrying the stop-poll interval above.
pub(super) const FUTEX_WAIT_TIMEOUT: libc::timespec = libc::timespec {
    tv_sec: 0,
    tv_nsec: WORKER_STOP_POLL_NS,
};
// Busy-spin iterations a fan-out worker performs after being woken — the
// exact use lives in the fan-out worker implementation; confirm there.
pub(super) const FAN_OUT_POST_WAKE_SPIN_ITERS: u64 = 256;
/// Install a NUMA memory policy on the calling thread via `set_mempolicy(2)`.
///
/// `MemPolicy::Default` is a no-op; `MPOL_LOCAL` takes no nodemask and is
/// issued inline; all other modes build a nodemask with `build_nodemask` and
/// OR the mode with `flags` (MPOL_F_* mode flags). Failures are reported on
/// stderr and otherwise ignored — workers proceed with the previous policy.
pub(super) fn apply_mempolicy_with_flags(policy: &MemPolicy, flags: MpolFlags) {
    // Map the policy variant to a kernel mode constant plus its node set.
    let (mode, node_set): (i32, BTreeSet<usize>) = match policy {
        MemPolicy::Default => return,
        MemPolicy::Bind(nodes) => (libc::MPOL_BIND, nodes.clone()),
        MemPolicy::Preferred(node) => (libc::MPOL_PREFERRED, [*node].into_iter().collect()),
        MemPolicy::Interleave(nodes) => (libc::MPOL_INTERLEAVE, nodes.clone()),
        MemPolicy::PreferredMany(nodes) => (MPOL_PREFERRED_MANY, nodes.clone()),
        MemPolicy::WeightedInterleave(nodes) => (MPOL_WEIGHTED_INTERLEAVE, nodes.clone()),
        MemPolicy::Local => {
            // MPOL_LOCAL rejects a nodemask, so the syscall is made here with
            // a null mask and the function returns early.
            let rc = unsafe {
                libc::syscall(
                    libc::SYS_set_mempolicy,
                    libc::MPOL_LOCAL | flags.bits() as i32,
                    std::ptr::null::<libc::c_ulong>(),
                    0 as libc::c_ulong,
                )
            };
            if rc != 0 {
                eprintln!(
                    "ktstr: set_mempolicy(MPOL_LOCAL) failed: {}",
                    std::io::Error::last_os_error(),
                );
            }
            return;
        }
    };
    // An empty node set would be EINVAL from the kernel; skip instead.
    if node_set.is_empty() {
        eprintln!("ktstr: set_mempolicy: empty node set, skipping");
        return;
    }
    let (mask, maxnode) = build_nodemask(&node_set);
    // Mode flags (e.g. MPOL_F_STATIC_NODES) are ORed into the mode word.
    let effective_mode = mode | flags.bits() as i32;
    let rc = unsafe {
        libc::syscall(
            libc::SYS_set_mempolicy,
            effective_mode,
            mask.as_ptr(),
            maxnode,
        )
    };
    if rc != 0 {
        eprintln!(
            "ktstr: set_mempolicy(mode={}, nodes={:?}) failed: {}",
            mode,
            node_set,
            std::io::Error::last_os_error(),
        );
    }
}
/// Apply a non-zero nice value to the calling thread.
///
/// `nice == 0` is a no-op. A failing `setpriority(2)` (typically missing
/// CAP_SYS_NICE for negative values) is reported once per process via
/// `warn_setpriority_failed_once` rather than propagated.
pub(super) fn apply_nice(nice: i32) {
    if nice != 0 {
        // `warn_*` reads errno, so nothing may run between the syscall and
        // the warning path.
        let applied = unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, nice) } == 0;
        if !applied {
            warn_setpriority_failed_once();
        }
    }
}
/// Emit the setpriority-failure warning at most once per process.
///
/// Must be called immediately after the failing `setpriority` so that
/// `last_os_error()` still reflects that syscall's errno.
pub(super) fn warn_setpriority_failed_once() {
    static WARNED: std::sync::Once = std::sync::Once::new();
    WARNED.call_once(|| {
        // Capture errno inside the closure — `call_once` itself makes no
        // syscalls before this point.
        let errno = std::io::Error::last_os_error();
        eprintln!(
            "workload: setpriority(PRIO_PROCESS) failed: {errno}; nice value not applied (CAP_SYS_NICE may be required for negative nice)"
        );
    });
}
/// Per-worker result record, serialized as JSON over the report pipe by
/// forked workers and returned directly by thread workers.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct WorkerReport {
    /// Kernel thread id (`gettid`) of the worker.
    pub tid: i32,
    /// Workload-defined unit counter (semantics depend on the WorkType).
    pub work_units: u64,
    /// CPU time consumed, in nanoseconds.
    pub cpu_time_ns: u64,
    /// Wall-clock time of the measured run, in nanoseconds.
    pub wall_time_ns: u64,
    /// wall_time - cpu_time style off-CPU estimate — produced in
    /// `worker_main`; confirm the exact derivation there.
    pub off_cpu_ns: u64,
    /// Number of CPU migrations observed.
    pub migration_count: u64,
    /// Every CPU the worker was ever observed running on.
    pub cpus_used: BTreeSet<usize>,
    /// Individual migration events (may be sampled/truncated by the producer).
    pub migrations: Vec<Migration>,
    /// Longest observed scheduling gap, milliseconds.
    pub max_gap_ms: u64,
    /// CPU on which the longest gap occurred.
    pub max_gap_cpu: usize,
    /// Offset of the longest gap within the run, milliseconds.
    pub max_gap_at_ms: u64,
    /// Sampled wakeup-to-run latencies, nanoseconds.
    pub resume_latencies_ns: Vec<u64>,
    /// Total wake events the latency samples were drawn from.
    pub wake_sample_total: u64,
    /// Sampled per-iteration costs, nanoseconds.
    pub iteration_costs_ns: Vec<u64>,
    /// Total iterations the cost samples were drawn from.
    pub iteration_cost_sample_total: u64,
    /// Total loop iterations executed.
    pub iterations: u64,
    /// /proc schedstat run-delay accumulation, nanoseconds.
    pub schedstat_run_delay_ns: u64,
    /// /proc schedstat timeslice count.
    pub schedstat_run_count: u64,
    /// /proc schedstat on-CPU time, nanoseconds.
    pub schedstat_cpu_time_ns: u64,
    /// True when the worker ran to completion (false on abandoned start —
    /// see `spawn_thread_worker`'s recv-error path).
    #[serde(default)]
    pub completed: bool,
    /// Pages resident per NUMA node at report time, keyed by node id.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub numa_pages: BTreeMap<usize, u64>,
    /// Delta of the global vmstat `numa_pages_migrated` counter.
    pub vmstat_numa_pages_migrated: u64,
    /// Filled in by the parent when the worker did not exit normally.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub exit_info: Option<WorkerExitInfo>,
    /// Role flag used by paired workloads — semantics defined by the
    /// WorkType; confirm in `worker_main`.
    #[serde(default)]
    pub is_messenger: bool,
    /// Index of the GroupParams this worker belonged to (0 = primary).
    #[serde(default)]
    pub group_idx: usize,
    /// Error string when applying the requested affinity failed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub affinity_error: Option<String>,
}
/// How a worker terminated, as classified by the parent after waiting on it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum WorkerExitInfo {
    /// Normal exit with the given status code.
    Exited(i32),
    /// Terminated by the given signal number.
    Signaled(i32),
    /// Still alive at the collection deadline (also used as the catch-all
    /// for other wait statuses — see `classify_wait_outcome`).
    TimedOut,
    /// `waitpid` itself failed; the errno text is preserved.
    WaitFailed(String),
    /// The worker panicked; the panic payload text is preserved.
    Panicked(String),
}
/// Translate a `waitpid` result into a `WorkerExitInfo`.
///
/// NOTE(review): the `Ok(_)` catch-all folds every other `WaitStatus`
/// (Stopped, Continued, ptrace events) into `TimedOut`, which can mislabel a
/// stopped-but-alive worker — confirm callers only ever see
/// Exited/Signaled/StillAlive before relying on `TimedOut` being accurate.
pub(super) fn classify_wait_outcome(
    source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno>,
) -> WorkerExitInfo {
    match source {
        Ok(nix::sys::wait::WaitStatus::Exited(_, code)) => WorkerExitInfo::Exited(code),
        Ok(nix::sys::wait::WaitStatus::Signaled(_, sig, _)) => WorkerExitInfo::Signaled(sig as i32),
        // WNOHANG returned with nothing reaped: the child is still running.
        Ok(nix::sys::wait::WaitStatus::StillAlive) => WorkerExitInfo::TimedOut,
        Ok(_) => WorkerExitInfo::TimedOut,
        Err(e) => WorkerExitInfo::WaitFailed(e.to_string()),
    }
}
/// Render a `catch_unwind` payload as text.
///
/// `panic!("literal")` produces a `&'static str` payload and
/// `panic!("{x}")`/`String::from` produce a `String`; anything else (custom
/// payload types) collapses to a fixed placeholder.
pub(super) fn extract_panic_payload(payload: Box<dyn std::any::Any + Send>) -> String {
    match payload.downcast::<String>() {
        Ok(owned) => *owned,
        Err(other) => other
            .downcast_ref::<&'static str>()
            .map(|s| (*s).to_string())
            .unwrap_or_else(|| "<non-string panic payload>".to_string()),
    }
}
/// Upper bound on how long the parent waits to join a worker thread before
/// abandoning it (see `join_thread_with_timeout` and the Drop impls).
pub(super) const THREAD_JOIN_TIMEOUT: Duration = Duration::from_secs(5);
/// Join a worker thread, giving up after `timeout`.
///
/// The wait multiplexes two fds through epoll: `exit_evt` (written by the
/// worker's drop guard just before it returns) and a one-shot timerfd armed
/// with `timeout`. Returns `Some(join result)` when the thread finished in
/// time, `None` on timeout or on any epoll/timerfd setup failure — in the
/// `None` case the JoinHandle is dropped and the thread is abandoned.
///
/// NOTE(review): `exit_evt` is level-triggered and never drained, so between
/// the eventfd write and `is_finished()` flipping true this loop can spin
/// through `epoll.wait` — harmless but worth confirming if CPU burn shows up
/// here.
pub(super) fn join_thread_with_timeout(
    join: std::thread::JoinHandle<WorkerReport>,
    exit_evt: &vmm_sys_util::eventfd::EventFd,
    timeout: Duration,
) -> Option<std::thread::Result<WorkerReport>> {
    use std::os::unix::io::AsRawFd;
    use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
    use vmm_sys_util::timerfd::TimerFd;
    // Fast path: nothing to wait for.
    if join.is_finished() {
        return Some(join.join());
    }
    let epoll = match Epoll::new() {
        Ok(e) => e,
        Err(e) => {
            tracing::warn!(%e, "join_thread_with_timeout: epoll_create1 failed");
            return None;
        }
    };
    // Slot 0: the worker's exit notification.
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        exit_evt.as_raw_fd(),
        EpollEvent::new(EventSet::IN, 0),
    ) {
        tracing::warn!(%e, "join_thread_with_timeout: add exit_evt to epoll");
        return None;
    }
    let mut timer = match TimerFd::new() {
        Ok(t) => t,
        Err(e) => {
            tracing::warn!(%e, "join_thread_with_timeout: timerfd_create failed");
            return None;
        }
    };
    // One-shot expiry at `timeout`; this is what bounds the blocking wait
    // below (epoll itself is called with an infinite timeout).
    if let Err(e) = timer.reset(timeout, None) {
        tracing::warn!(%e, "join_thread_with_timeout: timerfd_settime failed");
        return None;
    }
    // Slot 1: the deadline timer.
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        timer.as_raw_fd(),
        EpollEvent::new(EventSet::IN, 1),
    ) {
        tracing::warn!(%e, "join_thread_with_timeout: add timerfd to epoll");
        return None;
    }
    let deadline = Instant::now() + timeout;
    let mut events = [EpollEvent::default(); 2];
    loop {
        if join.is_finished() {
            return Some(join.join());
        }
        // The deadline is re-checked in userspace so a timerfd wakeup (or any
        // spurious wakeup after the deadline) converts into a `None` return.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return None;
        }
        match epoll.wait(-1, &mut events) {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => {
                tracing::warn!(%e, "join_thread_with_timeout: epoll_wait failed");
                return None;
            }
        }
    }
}
/// True when either this worker's own stop flag or the process-wide `STOP`
/// flag has been raised. Relaxed loads: the flag is advisory, not a
/// synchronization point.
#[inline]
pub(super) fn stop_requested(stop: &AtomicBool) -> bool {
    if stop.load(Ordering::Relaxed) {
        return true;
    }
    STOP.load(Ordering::Relaxed)
}
/// Bookkeeping for one worker running as a thread in this process.
pub(super) struct ThreadWorker {
    // gettid() of the worker, published by the thread itself after spawn
    // (0 until then — see `spawn_thread_worker`).
    tid: std::sync::Arc<std::sync::atomic::AtomicI32>,
    // Per-worker stop flag, OR-ed with the global STOP via `stop_requested`.
    stop: std::sync::Arc<AtomicBool>,
    // Rendezvous channel; sending () (or dropping the sender) releases the
    // worker from its start gate.
    pub(super) start_tx: Option<std::sync::mpsc::SyncSender<()>>,
    // Taken (set to None) once the thread has been joined or abandoned.
    join: Option<std::thread::JoinHandle<WorkerReport>>,
    // Written by the worker's drop guard so the parent can epoll on exit.
    exit_evt: std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
}
impl Drop for ThreadWorker {
    /// Best-effort teardown: raise the stop flag, release the start gate by
    /// dropping the sender (a worker still blocked in `recv` then bails with
    /// `completed: false`), and join with a bounded wait. A thread that does
    /// not finish within `THREAD_JOIN_TIMEOUT` is abandoned.
    fn drop(&mut self) {
        if let Some(j) = self.join.take() {
            self.stop.store(true, Ordering::Relaxed);
            self.start_tx.take();
            let _ = join_thread_with_timeout(j, &self.exit_evt, THREAD_JOIN_TIMEOUT);
        }
    }
}
/// Owner of a spawned-but-possibly-not-yet-started workload: forked children
/// and/or worker threads plus every shared resource (pipes, futex pages,
/// iteration counters) allocated for them.
#[must_use = "dropping a WorkloadHandle immediately tears down all worker tasks"]
pub struct WorkloadHandle {
    // One entry per forked worker: (pid, report-pipe read fd, start-pipe
    // write fd). The start fd is set to -1 once consumed by `start()`.
    children: Vec<(
        libc::pid_t,
        std::os::unix::io::RawFd,
        std::os::unix::io::RawFd,
    )>,
    // One entry per thread worker.
    threads: Vec<ThreadWorker>,
    // Guards `start()` against being run twice.
    started: bool,
    // MAP_SHARED futex pages, paired index-for-index with their sizes below;
    // munmapped on drop of the SpawnGuard that created them (ownership is
    // transferred here by `into_handle`).
    futex_ptrs: Vec<*mut u32>,
    futex_region_sizes: Vec<usize>,
    // MAP_SHARED array of one AtomicU64 per worker, read by
    // `snapshot_iterations`.
    iter_counters: *mut AtomicU64,
    iter_counter_len: usize,
    // Bidirectional pipe pairs for paired pipe workloads.
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    // Pipe rings for wake-chain workloads, one inner Vec per chain.
    chain_pipes: Vec<Vec<[i32; 2]>>,
}
/// Bytes of shared memory to mmap per futex worker-group of `work_type`.
///
/// Most work types need only a single `u32` futex word. `FanOutCompute` uses
/// a fixed 16-byte region, and `ProducerConsumerImbalance` needs
/// `32 + queue_depth_target * 8` bytes (the depth is clamped so the
/// arithmetic cannot overflow usize).
pub(super) fn futex_region_size_for(work_type: &WorkType) -> usize {
    match work_type {
        WorkType::FanOutCompute { .. } => 16,
        WorkType::ProducerConsumerImbalance {
            queue_depth_target, ..
        } => {
            // Clamp so that `32 + slots * 8` is guaranteed not to wrap.
            let slots = std::cmp::min(*queue_depth_target as usize, usize::MAX / 8 - 4);
            32 + slots * 8
        }
        _ => std::mem::size_of::<u32>(),
    }
}
/// Accumulates every resource created during spawn so that an early error
/// path tears all of it down (via Drop) instead of leaking pids, fds, and
/// mappings. On success `into_handle` transfers ownership to a
/// `WorkloadHandle` and disarms the guard.
pub(super) struct SpawnGuard {
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    chain_pipes: Vec<Vec<[i32; 2]>>,
    // Paired index-for-index with futex_region_sizes.
    futex_ptrs: Vec<*mut u32>,
    futex_region_sizes: Vec<usize>,
    // Null until the iteration-counter region is mapped.
    iter_counters: *mut AtomicU64,
    iter_counter_bytes: usize,
    // (pid, report read fd, start write fd) per forked child.
    children: Vec<(libc::pid_t, i32, i32)>,
    threads: Vec<ThreadWorker>,
}
impl SpawnGuard {
    /// Empty guard owning nothing yet.
    fn new() -> Self {
        Self {
            pipe_pairs: Vec::new(),
            chain_pipes: Vec::new(),
            futex_ptrs: Vec::new(),
            futex_region_sizes: Vec::new(),
            iter_counters: std::ptr::null_mut(),
            iter_counter_bytes: 0,
            children: Vec::new(),
            threads: Vec::new(),
        }
    }
    /// Transfer everything into a `WorkloadHandle`, leaving this guard empty
    /// so its Drop impl becomes a no-op.
    ///
    /// `SpawnGuard` implements `Drop`, so fields cannot be moved out by
    /// destructuring — each one is extracted with `mem::take`/`mem::replace`.
    fn into_handle(mut self) -> WorkloadHandle {
        let counters = std::mem::replace(&mut self.iter_counters, std::ptr::null_mut());
        let counter_bytes = std::mem::take(&mut self.iter_counter_bytes);
        WorkloadHandle {
            children: std::mem::take(&mut self.children),
            threads: std::mem::take(&mut self.threads),
            started: false,
            futex_ptrs: std::mem::take(&mut self.futex_ptrs),
            futex_region_sizes: std::mem::take(&mut self.futex_region_sizes),
            iter_counters: counters,
            // The handle stores a slot count rather than a byte count.
            iter_counter_len: counter_bytes / std::mem::size_of::<AtomicU64>(),
            pipe_pairs: std::mem::take(&mut self.pipe_pairs),
            chain_pipes: std::mem::take(&mut self.chain_pipes),
        }
    }
}
impl Drop for SpawnGuard {
    /// Tear down everything still owned by the guard. Runs on the error path
    /// of `spawn` (after `into_handle` all collections are empty and the
    /// pointer is null, so every loop below is a no-op).
    fn drop(&mut self) {
        // Kill and reap children first so no child still holds the shared
        // mappings/fds we are about to release.
        for &(pid, _, _) in &self.children {
            let npid = nix::unistd::Pid::from_raw(pid);
            let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
            let _ = nix::sys::wait::waitpid(npid, None);
        }
        // Parent-side report/start fds for those children. A -1 sentinel
        // (start fd already consumed) is skipped.
        for &(_, rfd, wfd) in &self.children {
            for fd in [rfd, wfd] {
                if fd >= 0 {
                    let _ = nix::unistd::close(fd);
                }
            }
        }
        // Thread workers: signal stop, drop the start gate, bounded join.
        for tw in &mut self.threads {
            tw.stop.store(true, Ordering::Relaxed);
            tw.start_tx.take();
            if let Some(j) = tw.join.take() {
                let _ = join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT);
            }
        }
        // All four fds of each bidirectional pipe pair.
        for (ab, ba) in &self.pipe_pairs {
            for fd in [ab[0], ab[1], ba[0], ba[1]] {
                let _ = nix::unistd::close(fd);
            }
        }
        // Both ends of every wake-chain pipe.
        for chain in &self.chain_pipes {
            for pipe in chain {
                let _ = nix::unistd::close(pipe[0]);
                let _ = nix::unistd::close(pipe[1]);
            }
        }
        // Shared futex pages, each with its recorded mapping size.
        for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
            unsafe {
                libc::munmap(ptr as *mut libc::c_void, size);
            }
        }
        // The shared iteration-counter array, if it was ever mapped.
        if !self.iter_counters.is_null() && self.iter_counter_bytes > 0 {
            unsafe {
                libc::munmap(
                    self.iter_counters as *mut libc::c_void,
                    self.iter_counter_bytes,
                );
            }
        }
    }
}
// SAFETY: WorkloadHandle is !Send/!Sync only because of its raw pointers
// (`futex_ptrs`, `iter_counters`) into MAP_SHARED anonymous mappings. The
// handle's own accesses to `iter_counters` go through `&AtomicU64` loads
// (see `snapshot_iterations`). NOTE(review): soundness also relies on no
// path creating an unsynchronized `&mut` through these pointers from
// multiple threads — confirm in `worker_main` before touching this.
unsafe impl Send for WorkloadHandle {}
unsafe impl Sync for WorkloadHandle {}
/// Raw futex pointer + position-within-group, smuggled across a thread spawn
/// as plain usizes because `*mut u32` is not `Send`. Soundness of the later
/// dereference is the receiving worker's responsibility.
#[derive(Clone, Copy)]
pub(super) struct SendFutexPtr(Option<(usize, usize)>);
/// Same trick for a worker's `*mut AtomicU64` iteration-counter slot.
#[derive(Clone, Copy)]
pub(super) struct SendIterSlotPtr(usize);
impl SendFutexPtr {
    // Erase the pointer into an address for the Send closure.
    fn new(p: Option<(*mut u32, usize)>) -> Self {
        SendFutexPtr(p.map(|(ptr, pos)| (ptr as usize, pos)))
    }
    // Reconstitute the pointer on the worker thread.
    fn into_raw(self) -> Option<(*mut u32, usize)> {
        self.0.map(|(addr, pos)| (addr as *mut u32, pos))
    }
}
impl SendIterSlotPtr {
    // Erase the slot pointer (null stays null: address 0).
    fn new(p: *mut AtomicU64) -> Self {
        SendIterSlotPtr(p as usize)
    }
    // Reconstitute the slot pointer on the worker thread.
    fn into_raw(self) -> *mut AtomicU64 {
        self.0 as *mut AtomicU64
    }
}
/// Fully-resolved parameters for one worker group (the primary config or one
/// `composed` entry), ready to be spawned without further resolution.
#[derive(Clone)]
pub(super) struct GroupParams {
    work_type: WorkType,
    sched_policy: SchedPolicy,
    mem_policy: MemPolicy,
    mpol_flags: MpolFlags,
    nice: i32,
    // Already validated/resolved from the user-facing AffinityIntent.
    affinity: ResolvedAffinity,
    // Concrete worker count (the Option in WorkSpec has been resolved).
    num_workers: usize,
    // 0 for the primary group; composed groups are 1-based.
    group_idx: usize,
}
impl GroupParams {
    /// Assemble group parameters from a spec whose affinity and worker count
    /// have already been resolved by the caller.
    fn from_work_spec(
        spec: &WorkSpec,
        group_idx: usize,
        resolved_affinity: ResolvedAffinity,
        resolved_num_workers: usize,
    ) -> Self {
        Self {
            work_type: spec.work_type.clone(),
            sched_policy: spec.sched_policy,
            mem_policy: spec.mem_policy.clone(),
            mpol_flags: spec.mpol_flags,
            nice: spec.nice,
            affinity: resolved_affinity,
            num_workers: resolved_num_workers,
            group_idx,
        }
    }
    /// Validate and resolve an `AffinityIntent` for direct (non-scenario)
    /// spawning. `site` names the config field for error messages. Empty or
    /// zero-sized requests and topology-aware variants (which need scenario
    /// context to resolve) are rejected with actionable errors.
    pub(super) fn resolve_spawn_affinity(
        intent: &AffinityIntent,
        site: &str,
    ) -> Result<ResolvedAffinity> {
        match intent {
            AffinityIntent::Inherit => Ok(ResolvedAffinity::None),
            AffinityIntent::Exact(cpus) => {
                if cpus.is_empty() {
                    anyhow::bail!(
                        "{site} = AffinityIntent::Exact with empty CPU set \
                         would produce EINVAL from sched_setaffinity; \
                         use AffinityIntent::Inherit for no affinity \
                         constraint",
                    );
                }
                Ok(ResolvedAffinity::Fixed(cpus.clone()))
            }
            AffinityIntent::RandomSubset { from, count } => {
                if from.is_empty() {
                    anyhow::bail!(
                        "{site} = AffinityIntent::RandomSubset with empty \
                         pool; use AffinityIntent::Inherit for no affinity \
                         constraint",
                    );
                }
                if *count == 0 {
                    anyhow::bail!(
                        "{site} = AffinityIntent::RandomSubset with \
                         count=0; use AffinityIntent::Inherit for no \
                         affinity constraint",
                    );
                }
                Ok(ResolvedAffinity::Random {
                    from: from.clone(),
                    count: *count,
                })
            }
            // These variants require topology discovery done by the scenario
            // engine and cannot be resolved here.
            AffinityIntent::SingleCpu
            | AffinityIntent::LlcAligned
            | AffinityIntent::CrossCgroup
            | AffinityIntent::SmtSiblingPair => {
                anyhow::bail!(
                    "{site} = {:?} requires scenario context; use \
                     AffinityIntent::Exact(set), \
                     AffinityIntent::RandomSubset {{ from, count }}, \
                     or AffinityIntent::Inherit when spawning directly \
                     via WorkloadHandle::spawn. Topology-aware variants \
                     resolve automatically inside #[ktstr_test] \
                     scenarios.",
                    intent,
                );
            }
        }
    }
    /// Group 0: built from the top-level `WorkloadConfig` fields. The
    /// affinity is resolved here, so the intermediate WorkSpec carries
    /// `Inherit` (the resolved value is passed alongside it).
    fn primary(config: &WorkloadConfig) -> Result<Self> {
        let resolved_affinity =
            Self::resolve_spawn_affinity(&config.affinity, "WorkloadConfig::affinity")?;
        let spec = WorkSpec {
            work_type: config.work_type.clone(),
            sched_policy: config.sched_policy,
            num_workers: Some(config.num_workers),
            affinity: AffinityIntent::Inherit,
            mem_policy: config.mem_policy.clone(),
            mpol_flags: config.mpol_flags,
            nice: config.nice,
        };
        Ok(Self::from_work_spec(
            &spec,
            0,
            resolved_affinity,
            config.num_workers,
        ))
    }
    /// Groups 1..: built from `config.composed[group_idx - 1]` (hence the
    /// `group_idx - 1` in the error messages). A concrete `num_workers` is
    /// mandatory on this path.
    fn from_composed(spec: &WorkSpec, group_idx: usize) -> Result<Self> {
        let num_workers = spec.num_workers.ok_or_else(|| {
            anyhow::anyhow!(
                "composed[{}].num_workers must be set explicitly at spawn time \
                 (the Some/None resolution via Ctx::workers_per_cgroup is only \
                 available through the scenario engine; \
                 WorkloadHandle::spawn requires a concrete count)",
                group_idx - 1,
            )
        })?;
        let site = format!("composed[{}].affinity", group_idx - 1);
        let affinity = Self::resolve_spawn_affinity(&spec.affinity, &site)?;
        Ok(Self::from_work_spec(spec, group_idx, affinity, num_workers))
    }
}
/// Spawn one worker as a thread and register it with the guard.
///
/// The thread immediately publishes its gettid and then blocks on a
/// zero-capacity channel until `start()` sends (run) or drops the sender
/// (abandon: returns a default report with `completed: false`). An eventfd,
/// written by a drop guard on every exit path including panic, lets the
/// parent epoll-wait for thread exit in `join_thread_with_timeout`.
#[allow(clippy::too_many_arguments)]
pub(super) fn spawn_thread_worker(
    guard: &mut SpawnGuard,
    group: &GroupParams,
    affinity: Option<BTreeSet<usize>>,
    worker_pipe_fds: Option<(i32, i32)>,
    worker_futex: Option<(*mut u32, usize)>,
    iter_slot: *mut AtomicU64,
) -> Result<()> {
    use std::sync::Arc;
    use std::sync::atomic::AtomicI32;
    use std::sync::mpsc;
    // Capacity 0 = rendezvous: the send in start() blocks until the worker
    // is at its recv.
    let (start_tx, start_rx) = mpsc::sync_channel::<()>(0);
    let stop = Arc::new(AtomicBool::new(false));
    let tid = Arc::new(AtomicI32::new(0));
    let exit_evt = Arc::new(
        vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
            .context("create thread-worker exit eventfd")?,
    );
    let stop_thread = Arc::clone(&stop);
    let tid_thread = Arc::clone(&tid);
    let exit_evt_thread = Arc::clone(&exit_evt);
    // Everything the closure needs is cloned/copied up front so it is
    // 'static and Send.
    let work_type = group.work_type.clone();
    let sched_policy = group.sched_policy;
    let mem_policy = group.mem_policy.clone();
    let mpol_flags = group.mpol_flags;
    let nice = group.nice;
    let group_idx = group.group_idx;
    let num_workers = group.num_workers;
    // Raw pointers are not Send; wrap them as usizes for the move closure.
    let futex_send = SendFutexPtr::new(worker_futex);
    let iter_slot_send = SendIterSlotPtr::new(iter_slot);
    let join = std::thread::Builder::new()
        .name(format!("ktstr-worker-g{group_idx}-{}", guard.threads.len()))
        .spawn(move || {
            // Signals the parent's epoll on every exit path (return, panic).
            struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
            impl Drop for WorkerExitSignal {
                fn drop(&mut self) {
                    let _ = self.0.write(1);
                }
            }
            let _exit_signal = WorkerExitSignal(exit_evt_thread);
            // Publish the kernel tid so set_affinity()/worker_pids() work.
            let my_tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
            tid_thread.store(my_tid, Ordering::Release);
            // Start gate: Err means the sender was dropped (teardown before
            // start) — report a non-completed run instead of working.
            if start_rx.recv().is_err() {
                return WorkerReport {
                    tid: my_tid,
                    completed: false,
                    group_idx,
                    ..WorkerReport::default()
                };
            }
            let futex = futex_send.into_raw();
            let slot = iter_slot_send.into_raw();
            worker_main(
                affinity,
                work_type,
                sched_policy,
                mem_policy,
                mpol_flags,
                nice,
                worker_pipe_fds,
                futex,
                slot,
                &stop_thread,
                group_idx,
            )
        })
        .with_context(|| {
            format!(
                "thread::spawn for worker {}/{} (group {}) failed",
                guard.threads.len() + 1,
                num_workers,
                group_idx,
            )
        })?;
    guard.threads.push(ThreadWorker {
        tid,
        stop,
        start_tx: Some(start_tx),
        join: Some(join),
        exit_evt,
    });
    Ok(())
}
/// Internal spawn strategy, derived 1:1 from `CloneMode`.
pub(super) enum Dispatch {
    /// Each worker is a forked child process (its own tgid).
    Fork,
    /// Each worker is a thread inside the harness process.
    Thread,
}
impl WorkloadHandle {
/// Validate `config`, allocate all shared resources, and spawn every worker
/// group in a gated (not yet running) state; call [`WorkloadHandle::start`]
/// to release the workers. Any failure mid-way tears down everything already
/// created via the `SpawnGuard`'s Drop impl.
pub fn spawn(config: &WorkloadConfig) -> Result<Self> {
    let dispatch = match &config.clone_mode {
        CloneMode::Fork => Dispatch::Fork,
        CloneMode::Thread => Dispatch::Thread,
    };
    // Group 0 is the primary config; composed specs become groups 1..N.
    let mut groups: Vec<GroupParams> = Vec::with_capacity(1 + config.composed.len());
    groups.push(GroupParams::primary(config)?);
    for (i, spec) in config.composed.iter().enumerate() {
        groups.push(GroupParams::from_composed(spec, i + 1)?);
    }
    // Up-front validation of every group so nothing is spawned on bad input.
    for group in &groups {
        // ForkExit workers fork-and-exit internally, which would kill the
        // whole thread group in Thread mode.
        if matches!(dispatch, Dispatch::Thread) && matches!(group.work_type, WorkType::ForkExit)
        {
            anyhow::bail!(
                "CloneMode::Thread is incompatible with WorkType::ForkExit \
                 (group {}) — ForkExit forks inside the worker, which under \
                 a thread-group worker tears down every sibling thread on \
                 the child's _exit. Use CloneMode::Fork for ForkExit workloads.",
                group.group_idx,
            );
        }
        // EpollStorm shares fd numbers between siblings, which only works
        // within one fd table (threads).
        if matches!(dispatch, Dispatch::Fork)
            && matches!(group.work_type, WorkType::EpollStorm { .. })
        {
            anyhow::bail!(
                "CloneMode::Fork is incompatible with WorkType::EpollStorm \
                 (group {}) — EpollStorm publishes eventfd/epoll fd numbers \
                 through a shared mmap region for siblings to consume, but \
                 forked children hold independent fd tables that never \
                 contain those post-fork descriptors. Use CloneMode::Thread \
                 for EpollStorm workloads.",
                group.group_idx,
            );
        }
        // cgroup.procs writes are tgid-scoped, so Thread mode would migrate
        // the harness itself.
        if matches!(dispatch, Dispatch::Thread)
            && matches!(group.work_type, WorkType::CgroupChurn { .. })
        {
            anyhow::bail!(
                "CloneMode::Thread is incompatible with WorkType::CgroupChurn \
                 (group {}) — CgroupChurn writes the worker tid to \
                 `cgroup.procs`, which the kernel resolves to the whole tgid \
                 and migrates every sibling thread (including the harness) \
                 to the target cgroup. Use CloneMode::Fork for CgroupChurn \
                 workloads so each worker is a separate tgid.",
                group.group_idx,
            );
        }
        // Work types with a fixed group size need an exact multiple of it.
        if let Some(group_size) = group.work_type.worker_group_size()
            && (group.num_workers == 0 || !group.num_workers.is_multiple_of(group_size))
        {
            return Err(WorkTypeValidationError::NonDivisibleWorkerCount {
                name: group.work_type.name().to_string(),
                group_idx: group.group_idx,
                group_size,
                num_workers: group.num_workers,
            }
            .into());
        }
        // A wake chain of fewer than 2 stages is degenerate.
        let group_chain_depth = group.work_type.chain_pipe_depth();
        if let Some(depth) = group_chain_depth
            && depth < 2
        {
            return Err(WorkTypeValidationError::InsufficientWakeChainDepth {
                depth,
                group_idx: group.group_idx,
            }
            .into());
        }
        // IdleChurn requires both phases of its duty cycle to be non-zero.
        if let WorkType::IdleChurn {
            burst_duration,
            sleep_duration,
            ..
        } = group.work_type
        {
            if burst_duration.is_zero() {
                return Err(WorkTypeValidationError::ZeroBurstDuration {
                    group_idx: group.group_idx,
                }
                .into());
            }
            if sleep_duration.is_zero() {
                return Err(WorkTypeValidationError::ZeroSleepDuration {
                    group_idx: group.group_idx,
                }
                .into());
            }
        }
        // IpcVariance requires all three of its cycle parameters non-zero.
        if let WorkType::IpcVariance {
            hot_iters,
            cold_iters,
            period_iters,
        } = group.work_type
        {
            if hot_iters == 0 {
                return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
                    field: "hot_iters",
                    group_idx: group.group_idx,
                }
                .into());
            }
            if cold_iters == 0 {
                return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
                    field: "cold_iters",
                    group_idx: group.group_idx,
                }
                .into());
            }
            if period_iters == 0 {
                return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
                    field: "period_iters",
                    group_idx: group.group_idx,
                }
                .into());
            }
        }
    }
    // From here on, all allocations are owned by the guard so an error on
    // any later step releases them.
    let mut guard = SpawnGuard::new();
    let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
    if total_workers > 0 {
        // One shared AtomicU64 per worker, MAP_SHARED so forked children
        // write into the same physical page the parent snapshots.
        let size = total_workers * std::mem::size_of::<AtomicU64>();
        let ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                -1,
                0,
            )
        };
        if ptr == libc::MAP_FAILED {
            let errno = std::io::Error::last_os_error();
            let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
            anyhow::bail!(
                "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
                 per-worker iter_counters region failed: {errno}{hint}; \
                 this region holds one AtomicU64 per worker \
                 ({total_workers} slots across {} group(s)) so the parent \
                 can snapshot iteration counts via \
                 `snapshot_iterations()`. Remediation: reduce num_workers \
                 (each worker consumes 8 bytes of this region, rounded up \
                 to a page) or raise `vm.max_map_count` / the memory \
                 cgroup limit.",
                groups.len(),
            );
        }
        guard.iter_counters = ptr as *mut AtomicU64;
        guard.iter_counter_bytes = size;
    }
    // Workers of group g own counter slots [iter_offset, iter_offset + n).
    let mut iter_offset: usize = 0;
    for group in &groups {
        Self::spawn_group(&mut guard, group, &dispatch, iter_offset)?;
        iter_offset += group.num_workers;
    }
    Ok(guard.into_handle())
}
/// Spawn all workers of one group: pre-allocate the group's pipes and futex
/// pages, then create each worker either as a thread (`spawn_thread_worker`)
/// or as a forked child wired up with a report pipe and a start pipe.
/// `iter_offset` is this group's first slot in the shared counter array.
#[allow(clippy::too_many_arguments)]
fn spawn_group(
    guard: &mut SpawnGuard,
    group: &GroupParams,
    dispatch: &Dispatch,
    iter_offset: usize,
) -> Result<()> {
    // Paired-pipe workloads get one bidirectional pipe pair per 2 workers.
    let needs_pipes = matches!(
        group.work_type,
        WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
    );
    let chain_depth = group.work_type.chain_pipe_depth();
    let needs_futex = group.work_type.needs_shared_mem();
    // Base indices into the guard's vecs: earlier groups' resources precede
    // ours, so per-worker lookups below are base + local index.
    let pipe_pair_base = guard.pipe_pairs.len();
    let chain_pipes_base = guard.chain_pipes.len();
    let futex_ptrs_base = guard.futex_ptrs.len();
    let futex_region_size = futex_region_size_for(&group.work_type);
    if needs_pipes {
        for _ in 0..group.num_workers / 2 {
            let mut ab = [0i32; 2];
            if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
            }
            let mut ba = [0i32; 2];
            if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                // ab is not yet registered with the guard; close it by hand.
                unsafe {
                    libc::close(ab[0]);
                    libc::close(ab[1]);
                }
                anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
            }
            guard.pipe_pairs.push((ab, ba));
        }
    }
    // Wake-chain workloads get a ring of `depth` pipes per `depth` workers.
    if let Some(depth) = chain_depth
        && depth > 0
        && group.num_workers >= depth
    {
        let chains = group.num_workers / depth;
        for _ in 0..chains {
            let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
            let mut alloc_ok = true;
            for _ in 0..depth {
                let mut p = [0i32; 2];
                if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                    alloc_ok = false;
                    break;
                }
                chain.push(p);
            }
            if !alloc_ok {
                // Partial chain is not registered with the guard yet.
                for p in &chain {
                    unsafe {
                        libc::close(p[0]);
                        libc::close(p[1]);
                    }
                }
                anyhow::bail!(
                    "WakeChain pipe2 allocation failed: {}",
                    std::io::Error::last_os_error()
                );
            }
            guard.chain_pipes.push(chain);
        }
    }
    // Futex regions are shared per worker sub-group (default size 2).
    let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
    if needs_futex {
        for _ in 0..group.num_workers / futex_group_size {
            // MAP_SHARED before fork so every child inherits the same page.
            let ptr = unsafe {
                libc::mmap(
                    std::ptr::null_mut(),
                    futex_region_size,
                    libc::PROT_READ | libc::PROT_WRITE,
                    libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                    -1,
                    0,
                )
            };
            if ptr == libc::MAP_FAILED {
                let errno = std::io::Error::last_os_error();
                let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
                anyhow::bail!(
                    "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
                     for a futex shared-memory region failed: {errno}{hint}; \
                     this region backs the {:?} worker-group's (group {}) \
                     inter-process futex word and is allocated \
                     before fork so every child inherits the same \
                     mapping. Remediation: reduce num_workers (each \
                     futex group consumes one shared page) or raise \
                     `vm.max_map_count` / the memory cgroup limit.",
                    group.work_type.name(),
                    group.group_idx,
                );
            }
            // Anonymous pages are already zeroed; this is defensive.
            unsafe { std::ptr::write_bytes(ptr as *mut u8, 0, futex_region_size) };
            guard.futex_ptrs.push(ptr as *mut u32);
            guard.futex_region_sizes.push(futex_region_size);
        }
    }
    for i in 0..group.num_workers {
        // Random affinity variants re-roll per worker.
        let affinity = resolve_affinity(&group.affinity)?;
        // Even workers read ba/write ab; odd workers the reverse, so each
        // pair talks over its own bidirectional pipe pair.
        let worker_pipe_fds: Option<(i32, i32)> = if needs_pipes {
            let pair_idx = pipe_pair_base + i / 2;
            let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
            if i % 2 == 0 {
                Some((ba[0], ab[1]))
            } else {
                Some((ab[0], ba[1]))
            }
        } else if let Some(depth) = chain_depth
            && depth > 0
        {
            // Chain stage i reads from the previous stage's pipe and writes
            // its own; prev wraps, forming a ring.
            let chain_idx = chain_pipes_base + i / depth;
            let stage = i % depth;
            let prev_stage = (stage + depth - 1) % depth;
            let chain = &guard.chain_pipes[chain_idx];
            Some((chain[prev_stage][0], chain[stage][1]))
        } else {
            None
        };
        // Futex region shared by this worker's sub-group, plus the worker's
        // position within that sub-group.
        let worker_futex: Option<(*mut u32, usize)> = if needs_futex {
            let futex_group_idx = futex_ptrs_base + i / futex_group_size;
            let pos = i % futex_group_size;
            Some((guard.futex_ptrs[futex_group_idx], pos))
        } else {
            None
        };
        // This worker's private slot in the shared iteration-counter array.
        let iter_slot: *mut AtomicU64 = if !guard.iter_counters.is_null() {
            unsafe { guard.iter_counters.add(iter_offset + i) }
        } else {
            std::ptr::null_mut()
        };
        match dispatch {
            Dispatch::Thread => {
                spawn_thread_worker(
                    guard,
                    group,
                    affinity,
                    worker_pipe_fds,
                    worker_futex,
                    iter_slot,
                )?;
                continue;
            }
            Dispatch::Fork => {
                // Falls through to the fork path below.
            }
        }
        // Child -> parent report channel (JSON WorkerReport + ready byte).
        let mut report_fds = [0i32; 2];
        if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
            anyhow::bail!(
                "worker {}/{} (group {}): report pipe2 failed: {}",
                i + 1,
                group.num_workers,
                group.group_idx,
                std::io::Error::last_os_error(),
            );
        }
        // Enlarge the report pipe so a large JSON report does not block the
        // child in write_all; best-effort (may fail under pipe-user-pages
        // limits for unprivileged users).
        const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
        let prev_size =
            unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
        if prev_size < 0 {
            let err = std::io::Error::last_os_error();
            tracing::warn!(
                worker = i + 1,
                num_workers = group.num_workers,
                group_idx = group.group_idx,
                requested_size = REPORT_PIPE_SIZE,
                %err,
                "F_SETPIPE_SZ on report pipe failed; falling back to default \
                 pipe capacity. Workers producing >64 KiB reports may block \
                 in write_all and have their reports truncated by the \
                 parent's deadline-driven fd close."
            );
        }
        // Parent -> child start gate: the child blocks until a byte arrives.
        let mut start_fds = [0i32; 2];
        if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
            unsafe {
                libc::close(report_fds[0]);
                libc::close(report_fds[1]);
            }
            anyhow::bail!(
                "worker {}/{} (group {}): start pipe2 failed: {}",
                i + 1,
                group.num_workers,
                group.group_idx,
                std::io::Error::last_os_error(),
            );
        }
        // Block SIGUSR1 across fork so the child cannot take the default
        // (terminating) action before it installs its handler.
        let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
        let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
        let psm_block_rc = unsafe {
            libc::sigemptyset(&mut block_mask);
            libc::sigaddset(&mut block_mask, libc::SIGUSR1);
            libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
        };
        if psm_block_rc != 0 {
            tracing::warn!(
                rc = psm_block_rc,
                "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork; child inherits unblocked SIGUSR1 and may terminate on default action before installing handler"
            );
        }
        let pid = unsafe { libc::fork() };
        match pid {
            -1 => {
                // Fork failed: restore the signal mask and release the two
                // pipes created for this worker, then abort the spawn.
                let psm_restore_rc = unsafe {
                    libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
                };
                if psm_restore_rc != 0 {
                    tracing::warn!(
                        rc = psm_restore_rc,
                        "pthread_sigmask(SIG_SETMASK) failed restoring mask after fork failure; SIGUSR1 may remain blocked in this thread"
                    );
                }
                unsafe {
                    libc::close(report_fds[0]);
                    libc::close(report_fds[1]);
                    libc::close(start_fds[0]);
                    libc::close(start_fds[1]);
                }
                anyhow::bail!(
                    "worker {}/{} (group {}): fork failed: {}",
                    i + 1,
                    group.num_workers,
                    group.group_idx,
                    std::io::Error::last_os_error(),
                );
            }
            0 => {
                // ---- child ----
                // Die with the parent rather than outliving the harness.
                unsafe {
                    libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
                }
                // If the parent already exited (we are reparented, ppid 1)
                // bail out — unless running under the guest-init environment
                // where ppid 1 is normal (inferred from the env var name;
                // confirm with the guest-init setup).
                if std::env::var_os("KTSTR_GUEST_INIT").is_none()
                    && unsafe { libc::getppid() } == 1
                {
                    unsafe {
                        libc::_exit(0);
                    }
                }
                // Own process group so killpg can reap workers as a unit.
                unsafe {
                    libc::setpgid(0, 0);
                }
                // The fork snapshotted the parent's flag; start clean.
                STOP.store(false, Ordering::Relaxed);
                let sig_prev = unsafe {
                    libc::signal(
                        libc::SIGUSR1,
                        sigusr1_handler as *const () as libc::sighandler_t,
                    )
                };
                if sig_prev == libc::SIG_ERR {
                    let errno = std::io::Error::last_os_error();
                    eprintln!(
                        "ktstr: signal(SIGUSR1) install failed in worker child: {errno}; graceful stop unavailable, killpg escalation will reap"
                    );
                }
                // Handler installed; now unblock SIGUSR1 (mask inherited
                // blocked from the pre-fork pthread_sigmask).
                let psm_unblock_rc = unsafe {
                    libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
                };
                if psm_unblock_rc != 0 {
                    eprintln!(
                        "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in worker child: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg escalation will reap"
                    );
                }
                // Drop the parent's ends of this worker's control pipes.
                unsafe {
                    libc::close(report_fds[0]);
                    libc::close(start_fds[1]);
                }
                // fd hygiene: keep only this worker's own workload pipe
                // ends; close everything belonging to siblings so EOF
                // semantics work and fds are not leaked into the child.
                if needs_pipes {
                    let pair_idx = pipe_pair_base + i / 2;
                    let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
                    if i % 2 == 0 {
                        unsafe {
                            libc::close(ab[0]);
                            libc::close(ba[1]);
                        }
                    } else {
                        unsafe {
                            libc::close(ab[1]);
                            libc::close(ba[0]);
                        }
                    }
                    for (j, (ab2, ba2)) in guard.pipe_pairs.iter().enumerate() {
                        if j != pair_idx {
                            unsafe {
                                libc::close(ab2[0]);
                                libc::close(ab2[1]);
                                libc::close(ba2[0]);
                                libc::close(ba2[1]);
                            }
                        }
                    }
                } else {
                    for (ab2, ba2) in guard.pipe_pairs.iter() {
                        unsafe {
                            libc::close(ab2[0]);
                            libc::close(ab2[1]);
                            libc::close(ba2[0]);
                            libc::close(ba2[1]);
                        }
                    }
                }
                // Same hygiene for wake-chain pipes: keep the read end of
                // the previous stage and the write end of our own stage.
                if let Some(depth) = chain_depth
                    && depth > 0
                {
                    let chain_idx = chain_pipes_base + i / depth;
                    let stage = i % depth;
                    let prev_stage = (stage + depth - 1) % depth;
                    for (s, pipe) in guard.chain_pipes[chain_idx].iter().enumerate() {
                        if s == prev_stage {
                            unsafe {
                                libc::close(pipe[1]);
                            }
                        } else if s == stage {
                            unsafe {
                                libc::close(pipe[0]);
                            }
                        } else {
                            unsafe {
                                libc::close(pipe[0]);
                                libc::close(pipe[1]);
                            }
                        }
                    }
                    for (cj, chain) in guard.chain_pipes.iter().enumerate() {
                        if cj != chain_idx {
                            for pipe in chain {
                                unsafe {
                                    libc::close(pipe[0]);
                                    libc::close(pipe[1]);
                                }
                            }
                        }
                    }
                } else {
                    for chain in guard.chain_pipes.iter() {
                        for pipe in chain {
                            unsafe {
                                libc::close(pipe[0]);
                                libc::close(pipe[1]);
                            }
                        }
                    }
                }
                // Silence the default panic message; panics are reported via
                // the nonzero exit code below.
                #[cfg(panic = "unwind")]
                {
                    let _ = std::panic::take_hook();
                    std::panic::set_hook(Box::new(|_| {}));
                }
                let child_result =
                    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        // Wait up to 30 s for the parent's start byte; a
                        // timeout or pipe error means the parent is gone.
                        let mut pfd = libc::pollfd {
                            fd: start_fds[0],
                            events: libc::POLLIN,
                            revents: 0,
                        };
                        let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
                        if ret <= 0 {
                            unsafe {
                                libc::_exit(1);
                            }
                        }
                        let mut buf = [0u8; 1];
                        // File takes ownership of start_fds[0] and closes it
                        // on drop.
                        let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
                        let _ = f.read_exact(&mut buf);
                        drop(f);
                        // Ready byte: lets the parent's start barrier know
                        // this worker is past its gate.
                        let ready_byte: u8 = b'r';
                        unsafe {
                            libc::write(
                                report_fds[1],
                                &ready_byte as *const u8 as *const libc::c_void,
                                1,
                            );
                        }
                        let report = worker_main(
                            affinity,
                            group.work_type.clone(),
                            group.sched_policy,
                            group.mem_policy.clone(),
                            group.mpol_flags,
                            group.nice,
                            worker_pipe_fds,
                            worker_futex,
                            iter_slot,
                            &STOP,
                            group.group_idx,
                        );
                        // Ship the report as JSON; the fd is closed by File's
                        // drop, signalling EOF to the parent.
                        let json = serde_json::to_vec(&report).unwrap_or_default();
                        let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
                        if let Err(e) = f.write_all(&json) {
                            eprintln!("ktstr: worker report write_all failed: {e}");
                        }
                        drop(f);
                    }));
                // _exit (not exit): never run the forked copy of the
                // parent's atexit/Drop state.
                let code = if child_result.is_ok() { 0 } else { 1 };
                unsafe {
                    libc::_exit(code);
                }
            }
            child_pid => {
                // ---- parent ----
                let psm_parent_restore_rc = unsafe {
                    libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
                };
                if psm_parent_restore_rc != 0 {
                    tracing::warn!(
                        rc = psm_parent_restore_rc,
                        "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent post-fork; SIGUSR1 stays blocked in this thread for the lifetime of the workload"
                    );
                }
                // Keep only the report read end and start write end.
                unsafe {
                    libc::close(report_fds[1]);
                    libc::close(start_fds[0]);
                }
                guard
                    .children
                    .push((child_pid, report_fds[0], start_fds[1]));
            }
        }
    }
    Ok(())
}
/// Kernel ids of all workers: child pids in Fork mode, thread tids in
/// Thread mode.
///
/// NOTE(review): a thread worker that has not yet run far enough to publish
/// its gettid reports 0 here — callers should only rely on this after
/// `start()` has been observed by the workers.
pub fn worker_pids(&self) -> Vec<libc::pid_t> {
    if self.children.is_empty() {
        self.threads
            .iter()
            .map(|tw| tw.tid.load(Ordering::Acquire))
            .collect()
    } else {
        self.children.iter().map(|&(pid, _, _)| pid).collect()
    }
}
/// Like `worker_pids`, but refuses to return thread tids: writing a thread's
/// tid to `cgroup.procs` migrates the whole tgid — harness included.
pub fn worker_pids_for_cgroup_procs(&self) -> Result<Vec<libc::pid_t>> {
    if !self.threads.is_empty() {
        anyhow::bail!(
            "WorkloadHandle::worker_pids_for_cgroup_procs: workers were \
             spawned with CloneMode::Thread; their pids share the test \
             harness's tgid and a `cgroup.procs` write would migrate the \
             harness. Use `cgroup.threads` (thread-scoped) for Thread-mode \
             workers, or switch to CloneMode::Fork."
        );
    }
    Ok(self.worker_pids())
}
/// Release every gated worker exactly once (idempotent).
///
/// Forked children receive a start byte on their start pipe (which is then
/// closed and its slot set to -1); thread workers are released by sending on
/// their rendezvous channel.
pub fn start(&mut self) {
    if self.started {
        return;
    }
    self.started = true;
    for child in self.children.iter_mut() {
        let fd = child.2;
        unsafe {
            libc::write(fd, b"s".as_ptr() as *const _, 1);
            libc::close(fd);
        }
        // -1 marks the fd as consumed so later cleanup skips it.
        child.2 = -1;
    }
    for worker in &mut self.threads {
        if let Some(gate) = worker.start_tx.take() {
            let _ = gate.send(());
        }
    }
}
/// Pin worker `idx` to `cpus` via sched_setaffinity.
///
/// In Thread mode this requires the worker to have published its gettid
/// already (i.e. `start()` has effectively begun the thread), otherwise the
/// stored tid is still 0 and an error is returned.
pub fn set_affinity(&self, idx: usize, cpus: &BTreeSet<usize>) -> Result<()> {
    let pid = if !self.children.is_empty() {
        // NOTE(review): indexes panic on out-of-range `idx` rather than
        // returning Err — confirm callers always pass a valid index.
        self.children[idx].0
    } else {
        let tid = self.threads[idx].tid.load(Ordering::Acquire);
        if tid == 0 {
            anyhow::bail!(
                "set_affinity: thread worker {idx} has not yet \
                 published gettid() (call start() first)"
            );
        }
        tid
    };
    set_thread_affinity(pid, cpus)
}
/// Reads every worker's shared iteration counter with relaxed loads.
///
/// Returns an empty vec when no counter region was ever mapped.
pub fn snapshot_iterations(&self) -> Vec<u64> {
    if self.iter_counters.is_null() || self.iter_counter_len == 0 {
        return Vec::new();
    }
    let mut counts = Vec::with_capacity(self.iter_counter_len);
    for slot_idx in 0..self.iter_counter_len {
        // SAFETY: the mapping is assumed to hold `iter_counter_len` atomic
        // slots (Drop unmaps iter_counter_len * size_of::<u64>() bytes) and
        // stays alive until Drop runs munmap.
        let counter = unsafe { &*self.iter_counters.add(slot_idx) };
        counts.push(counter.load(Ordering::Relaxed));
    }
    counts
}
/// Stops all workers, reaps them, and returns one `WorkerReport` per
/// worker. Workers that produced no parseable JSON report get a sentinel
/// report carrying their exit classification instead. Consumes the
/// handle, so Drop's teardown does not run over the same resources again.
pub fn stop_and_collect(mut self) -> Vec<WorkerReport> {
    let was_started = self.started;
    // start() is idempotent; releasing never-started workers lets them
    // reach their report-writing path instead of blocking forever.
    self.start();
    if !was_started && !self.children.is_empty() {
        // The caller never started the workload, so forked children may
        // still be in setup. Poll (bounded to 5s total) for one readiness
        // byte from each child on its report pipe before signalling stop,
        // so the SIGUSR1 below doesn't race worker initialization.
        let barrier_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        let mut pending: Vec<(usize, i32)> = self
            .children
            .iter()
            .enumerate()
            .map(|(i, &(_, read_fd, _))| (i, read_fd))
            .collect();
        while !pending.is_empty() {
            let remaining =
                barrier_deadline.saturating_duration_since(std::time::Instant::now());
            if remaining.is_zero() {
                // Deadline hit: abandon the barrier and collect anyway.
                break;
            }
            let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
            let mut pfds: Vec<libc::pollfd> = pending
                .iter()
                .map(|&(_, fd)| libc::pollfd {
                    fd,
                    events: libc::POLLIN,
                    revents: 0,
                })
                .collect();
            let ret = unsafe { libc::poll(pfds.as_mut_ptr(), pfds.len() as libc::nfds_t, ms) };
            if ret < 0 {
                let err = std::io::Error::last_os_error();
                if err.kind() == std::io::ErrorKind::Interrupted {
                    // EINTR: recompute the remaining budget and re-poll.
                    continue;
                }
                tracing::warn!(
                    %err,
                    pending = pending.len(),
                    "WorkloadHandle::stop_and_collect: barrier poll failed; falling \
                    through to per-worker collect"
                );
                break;
            }
            if ret > 0 {
                // Remove every fd that became readable (or hung up) from
                // the pending set; the rest are retried next iteration.
                pending.retain(|&(_, fd)| {
                    let pfd = pfds.iter().find(|p| p.fd == fd);
                    let revents = pfd.map(|p| p.revents).unwrap_or(0);
                    if revents & libc::POLLIN != 0 {
                        let mut byte: u8 = 0;
                        let n = unsafe {
                            libc::read(fd, &mut byte as *mut u8 as *mut libc::c_void, 1)
                        };
                        if n >= 0 {
                            // One byte read, or EOF (n == 0): worker counts
                            // as ready either way.
                            return false;
                        }
                        let err = std::io::Error::last_os_error();
                        if err.kind() == std::io::ErrorKind::Interrupted {
                            // EINTR: keep it pending and retry.
                            return true;
                        }
                        tracing::warn!(
                            %err,
                            fd,
                            "WorkloadHandle::stop_and_collect: barrier byte read \
                            failed; treating worker as ready"
                        );
                        return false;
                    }
                    if revents & (libc::POLLHUP | libc::POLLERR | libc::POLLNVAL) != 0 {
                        // Peer gone or fd invalid: nothing more will arrive.
                        return false;
                    }
                    true
                });
            }
        }
    }
    let mut reports = Vec::new();
    // Detach children/threads from self so the Drop impl doesn't run its
    // kill-and-close pass over fds and pids consumed below.
    let children = std::mem::take(&mut self.children);
    let threads = std::mem::take(&mut self.threads);
    // Request a cooperative stop: SIGUSR1 drives sigusr1_handler, which
    // raises the shared STOP flag in forked workers; thread workers watch
    // their per-worker stop flag instead.
    for &(pid, _, _) in &children {
        let _ = nix::sys::signal::kill(
            nix::unistd::Pid::from_raw(pid),
            nix::sys::signal::Signal::SIGUSR1,
        );
    }
    for tw in &threads {
        tw.stop.store(true, Ordering::Relaxed);
    }
    // All children share one 5s budget to emit their JSON reports.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
    for (pid, read_fd, _) in children {
        let mut buf = Vec::new();
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
        if ms > 0 {
            let mut pfd = libc::pollfd {
                fd: read_fd,
                events: libc::POLLIN,
                revents: 0,
            };
            let ready = unsafe { libc::poll(&mut pfd, 1, ms) };
            if ready > 0 {
                // File takes ownership of read_fd and closes it on drop.
                let mut f = unsafe { std::fs::File::from_raw_fd(read_fd) };
                let _ = f.read_to_end(&mut buf);
                drop(f);
            } else {
                // Timeout or poll error: no report will be read; just close.
                let _ = nix::unistd::close(read_fd);
            }
        } else {
            let _ = nix::unistd::close(read_fd);
        }
        let npid = nix::unistd::Pid::from_raw(pid);
        // Hard-kill the child's whole process group (covers grandchildren),
        // then reap with WNOHANG so a vanished child can't block us.
        let _ = nix::sys::signal::killpg(npid, nix::sys::signal::Signal::SIGKILL);
        let waited = nix::sys::wait::waitpid(npid, Some(nix::sys::wait::WaitPidFlag::WNOHANG));
        let still_running = matches!(waited, Ok(nix::sys::wait::WaitStatus::StillAlive),);
        let exit_info_source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno> =
            if still_running {
                // Fallback: direct SIGKILL plus a blocking reap, and report
                // the StillAlive status observed before the kill.
                let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
                let _ = nix::sys::wait::waitpid(npid, None);
                Ok(nix::sys::wait::WaitStatus::StillAlive)
            } else {
                waited
            };
        // A leading 'r' byte is not JSON and is skipped.
        // NOTE(review): presumably the readiness marker the worker writes
        // before its report — confirm against worker_main.
        let report_slice: &[u8] = if buf.first() == Some(&b'r') {
            &buf[1..]
        } else {
            &buf[..]
        };
        if let Ok(report) = serde_json::from_slice::<WorkerReport>(report_slice) {
            reports.push(report);
        } else {
            // No parseable report: synthesize a sentinel carrying the exit
            // classification so the caller still sees one entry per worker.
            let exit_info = classify_wait_outcome(exit_info_source);
            eprintln!(
                "ktstr: worker pid={pid} returned no report ({} bytes read, exit={exit_info:?})",
                buf.len()
            );
            reports.push(WorkerReport {
                tid: pid,
                exit_info: Some(exit_info),
                ..WorkerReport::default()
            });
        }
    }
    for mut tw in threads {
        // Drop the start sender (if still held) so a worker blocked on the
        // start channel observes the disconnect.
        tw.start_tx.take();
        let tid = tw.tid.load(Ordering::Acquire);
        if let Some(j) = tw.join.take() {
            match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
                Some(Ok(report)) => reports.push(report),
                Some(Err(payload)) => {
                    // Worker panicked: record a sentinel with the payload text.
                    let msg = extract_panic_payload(payload);
                    eprintln!("ktstr: thread worker tid={tid} panicked: {msg}");
                    reports.push(WorkerReport {
                        tid,
                        completed: false,
                        exit_info: Some(WorkerExitInfo::Panicked(msg)),
                        ..WorkerReport::default()
                    });
                }
                None => {
                    // Join timed out: leak the thread rather than hang the
                    // collection, and attach a TimedOut sentinel report.
                    tracing::warn!(
                        tid,
                        timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
                        "thread worker did not join within timeout — leaking the \
                        thread; sentinel report attached with TimedOut exit_info"
                    );
                    reports.push(WorkerReport {
                        tid,
                        completed: false,
                        exit_info: Some(WorkerExitInfo::TimedOut),
                        ..WorkerReport::default()
                    });
                }
            }
        }
    }
    reports
}
}
impl Drop for WorkloadHandle {
    /// Best-effort teardown for handles not consumed by `stop_and_collect`:
    /// kill and reap forked children, close their pipe fds, join (or leak)
    /// worker threads, close auxiliary pipes, and unmap the shared
    /// futex/counter regions. Failures are logged, never propagated.
    fn drop(&mut self) {
        use nix::sys::signal::{Signal, kill};
        use nix::sys::wait::waitpid;
        use nix::unistd::{Pid, close};
        for &(pid, rfd, wfd) in &self.children {
            let nix_pid = Pid::from_raw(pid);
            // Kill the child's whole process group first (covers any
            // grandchildren); ESRCH just means it already exited.
            if let Err(e) = nix::sys::signal::killpg(nix_pid, Signal::SIGKILL)
                && e != nix::errno::Errno::ESRCH
            {
                tracing::warn!(pid, %e, "killpg failed in WorkloadHandle::drop");
            }
            if let Err(e) = kill(nix_pid, Signal::SIGKILL) {
                tracing::warn!(pid, %e, "kill failed in WorkloadHandle::drop");
            }
            // Blocking reap so the child never lingers as a zombie.
            if let Err(e) = waitpid(nix_pid, None) {
                tracing::warn!(pid, %e, "waitpid failed in WorkloadHandle::drop");
            }
            // Fds already consumed elsewhere are stored as -1 (see start()).
            for fd in [rfd, wfd] {
                if fd >= 0
                    && let Err(e) = close(fd)
                {
                    tracing::warn!(fd, %e, "close failed in WorkloadHandle::drop");
                }
            }
        }
        let threads = std::mem::take(&mut self.threads);
        for mut tw in threads {
            // Ask the worker to stop, and drop the start sender so one
            // still blocked on the start channel observes the disconnect.
            tw.stop.store(true, Ordering::Relaxed);
            tw.start_tx.take();
            if let Some(j) = tw.join.take() {
                let tid = tw.tid.load(Ordering::Acquire);
                match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
                    Some(Ok(_)) => {}
                    Some(Err(e)) => {
                        let payload = extract_panic_payload(e);
                        tracing::warn!(
                            tid,
                            payload,
                            "thread worker panicked in WorkloadHandle::drop"
                        );
                    }
                    None => {
                        // Leak rather than hang teardown indefinitely.
                        tracing::warn!(
                            tid,
                            timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
                            "thread worker failed to join within timeout in \
                            WorkloadHandle::drop — leaking the thread"
                        );
                    }
                }
            }
        }
        // Close both directions of every worker<->worker pipe pair.
        for (ab, ba) in &self.pipe_pairs {
            for fd in [ab[0], ab[1], ba[0], ba[1]] {
                if let Err(e) = close(fd) {
                    tracing::warn!(fd, %e, "close failed for pipe_pair fd in WorkloadHandle::drop");
                }
            }
        }
        // Close every pipe in every wake-chain.
        for chain in &self.chain_pipes {
            for pipe in chain {
                for fd in [pipe[0], pipe[1]] {
                    if let Err(e) = close(fd) {
                        tracing::warn!(fd, %e, "close failed for chain_pipe fd in WorkloadHandle::drop");
                    }
                }
            }
        }
        // Unmap each shared futex region using its recorded size.
        for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
            unsafe {
                libc::munmap(ptr as *mut libc::c_void, size);
            }
        }
        // Unmap the shared iteration-counter array (one u64-sized slot per
        // counter, as sized here).
        if !self.iter_counters.is_null() && self.iter_counter_len > 0 {
            unsafe {
                libc::munmap(
                    self.iter_counters as *mut libc::c_void,
                    self.iter_counter_len * std::mem::size_of::<u64>(),
                );
            }
        }
    }
}
/// Signal handler for SIGUSR1 (sent by `stop_and_collect` to forked
/// workers): requests a cooperative stop by raising the shared `STOP`
/// flag.
///
/// Kept to a single atomic store — only async-signal-safe operations are
/// legal inside a signal handler.
pub(super) extern "C" fn sigusr1_handler(_: libc::c_int) {
    STOP.store(true, Ordering::Relaxed);
}
/// Maps an errno from a failed `mmap(MAP_SHARED|MAP_ANONYMOUS)` call to a
/// human-readable diagnostic suffix for error messages.
///
/// Unknown or absent errno values yield an empty string so the hint can be
/// appended unconditionally.
pub(super) fn mmap_shared_anon_errno_hint(errno: Option<i32>) -> &'static str {
    let Some(code) = errno else {
        return "";
    };
    if code == libc::ENOMEM {
        " (ENOMEM: host is out of memory \
        or /proc/sys/vm/max_map_count is too low — \
        check `sysctl vm.max_map_count` and `free -h`)"
    } else if code == libc::EPERM {
        " (EPERM: MAP_SHARED|MAP_ANONYMOUS \
        rejected by the kernel — check memory cgroup \
        limits and container seccomp policy)"
    } else if code == libc::EINVAL {
        " (EINVAL: invalid length or \
        flag combination — verify num_workers > 0 so the \
        region size is non-zero, and that the total size \
        does not overflow usize)"
    } else {
        ""
    }
}
// Test-only submodules: each is compiled and linked only in test builds
// via its `#[cfg(test)]` gate.
#[cfg(test)]
mod testing;
#[cfg(test)]
mod tests_composed;
#[cfg(test)]
mod tests_fan_out;
#[cfg(test)]
mod tests_futex;
#[cfg(test)]
mod tests_grandchild;
#[cfg(test)]
mod tests_idle_churn;
#[cfg(test)]
mod tests_integration;
#[cfg(test)]
mod tests_lifecycle;
#[cfg(test)]
mod tests_mempolicy;
#[cfg(test)]
mod tests_misc;
#[cfg(test)]
mod tests_sched_policy;
#[cfg(test)]
mod tests_spawn_guard;
#[cfg(test)]
mod tests_thread_mode;
#[cfg(test)]
mod tests_wake_chain;