use anyhow::{Context, Result};
use std::collections::{BTreeMap, BTreeSet};
use std::io::{Read, Write};
use std::os::unix::io::FromRawFd;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use super::affinity::{AffinityIntent, ResolvedAffinity, resolve_affinity, set_thread_affinity};
use super::config::{CloneMode, MemPolicy, MpolFlags, SchedPolicy, WorkSpec, WorkloadConfig};
use super::types::*;
use super::worker::worker_main;
// Process-global stop flag checked by every worker via `stop_requested()`.
// Cleared around fork in pcomm containers; a SIGUSR1 handler installed there
// (see `sigusr1_handler`, defined elsewhere) presumably sets it — confirm.
pub(super) static STOP: AtomicBool = AtomicBool::new(false);
/// One observed migration of a worker thread between CPUs.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Migration {
    /// When the migration was observed, in nanoseconds (clock origin not
    /// visible here — presumably relative to worker start; confirm at the
    /// producer in `worker_main`).
    pub at_ns: u64,
    /// CPU the worker was running on before the migration.
    pub from_cpu: usize,
    /// CPU the worker landed on.
    pub to_cpu: usize,
}
/// Build a kernel-style NUMA nodemask bitmap for `set_mempolicy(2)`.
///
/// Returns the mask words together with the bit count to pass as the
/// syscall's `maxnode` argument. An empty node set yields an empty mask
/// and a zero bit count.
pub fn build_nodemask(nodes: &BTreeSet<usize>) -> (Vec<libc::c_ulong>, libc::c_ulong) {
    // `next_back` on a BTreeSet yields the maximum element.
    let Some(&max_node) = nodes.iter().next_back() else {
        return (Vec::new(), 0);
    };
    let word_bits = std::mem::size_of::<libc::c_ulong>() * 8;
    // One spare bit above the highest node index — preserved from the
    // original; presumably sidesteps the kernel's maxnode edge handling.
    let mask_bits = max_node + 2;
    let mut words = vec![0 as libc::c_ulong; mask_bits.div_ceil(word_bits)];
    for &node in nodes {
        words[node / word_bits] |= (1 as libc::c_ulong) << (node % word_bits);
    }
    (words, mask_bits as libc::c_ulong)
}
// MPOL_* mode numbers newer than the libc crate's bindings (set_mempolicy(2)).
const MPOL_PREFERRED_MANY: i32 = 5;
const MPOL_WEIGHTED_INTERLEAVE: i32 = 6;
// Workers parked in futex waits wake at least this often (100 ms) to re-check
// the stop flags.
pub(super) const WORKER_STOP_POLL_NS: libc::c_long = 100_000_000;
// Relative timeout handed to FUTEX_WAIT so a missed wakeup cannot park a
// worker longer than the stop-poll interval.
pub(super) const FUTEX_WAIT_TIMEOUT: libc::timespec = libc::timespec {
    tv_sec: 0,
    tv_nsec: WORKER_STOP_POLL_NS,
};
// Spin budget after a fan-out wake; consumed by the worker implementation
// (not visible in this file).
pub(super) const FAN_OUT_POST_WAKE_SPIN_ITERS: u64 = 256;
/// Apply a NUMA memory policy to the calling thread via `set_mempolicy(2)`,
/// OR-ing `flags` into the mode word. Failures are logged to stderr and
/// otherwise ignored (best effort).
pub(super) fn apply_mempolicy_with_flags(policy: &MemPolicy, flags: MpolFlags) {
    let flag_bits = flags.bits() as i32;
    // MPOL_LOCAL takes no nodemask: issue the syscall with a null mask and
    // maxnode == 0, then return.
    if matches!(policy, MemPolicy::Local) {
        let rc = unsafe {
            libc::syscall(
                libc::SYS_set_mempolicy,
                libc::MPOL_LOCAL | flag_bits,
                std::ptr::null::<libc::c_ulong>(),
                0 as libc::c_ulong,
            )
        };
        if rc != 0 {
            eprintln!(
                "ktstr: set_mempolicy(MPOL_LOCAL) failed: {}",
                std::io::Error::last_os_error(),
            );
        }
        return;
    }
    // Every remaining policy carries an explicit node set.
    let (mode, node_set): (i32, BTreeSet<usize>) = match policy {
        MemPolicy::Default => return,
        MemPolicy::Bind(nodes) => (libc::MPOL_BIND, nodes.clone()),
        MemPolicy::Preferred(node) => (libc::MPOL_PREFERRED, std::iter::once(*node).collect()),
        MemPolicy::Interleave(nodes) => (libc::MPOL_INTERLEAVE, nodes.clone()),
        MemPolicy::PreferredMany(nodes) => (MPOL_PREFERRED_MANY, nodes.clone()),
        MemPolicy::WeightedInterleave(nodes) => (MPOL_WEIGHTED_INTERLEAVE, nodes.clone()),
        MemPolicy::Local => unreachable!("MPOL_LOCAL handled above"),
    };
    if node_set.is_empty() {
        eprintln!("ktstr: set_mempolicy: empty node set, skipping");
        return;
    }
    let (mask, maxnode) = build_nodemask(&node_set);
    let rc = unsafe {
        libc::syscall(
            libc::SYS_set_mempolicy,
            mode | flag_bits,
            mask.as_ptr(),
            maxnode,
        )
    };
    if rc != 0 {
        eprintln!(
            "ktstr: set_mempolicy(mode={}, nodes={:?}) failed: {}",
            mode,
            node_set,
            std::io::Error::last_os_error(),
        );
    }
}
/// Set the calling thread's nice value; warn (once per process) on failure.
pub(super) fn apply_nice(nice: i32) {
    // setpriority(2) returns 0 on success, -1 on failure with errno set;
    // the warn helper must run immediately so errno is still meaningful.
    if unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, nice) } != 0 {
        warn_setpriority_failed_once();
    }
}
/// Emit the setpriority-failure warning at most once per process.
///
/// Must be called immediately after the failing `setpriority` call: errno is
/// read lazily inside the `call_once` closure via `last_os_error()`, so any
/// intervening libc call would clobber the reported error.
pub(super) fn warn_setpriority_failed_once() {
    static WARNED: std::sync::Once = std::sync::Once::new();
    WARNED.call_once(|| {
        let errno = std::io::Error::last_os_error();
        eprintln!(
            "workload: setpriority(PRIO_PROCESS) failed: {errno}; nice value not applied (CAP_SYS_NICE may be required for negative nice)"
        );
    });
}
/// Per-worker measurement record returned to the parent: serialized as JSON
/// over the report pipe for forked workers, returned directly from the thread
/// for in-process workers. `Default` zeroes/empties every field.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, crate::Claim)]
pub struct WorkerReport {
    /// Kernel thread id (gettid) of the worker.
    pub tid: i32,
    pub work_units: u64,
    pub cpu_time_ns: u64,
    pub wall_time_ns: u64,
    /// Wall time minus on-CPU time — presumably; produced in `worker_main`.
    pub off_cpu_ns: u64,
    pub migration_count: u64,
    /// Every CPU the worker was observed running on.
    pub cpus_used: BTreeSet<usize>,
    /// Individual migration events (may be a sample, not exhaustive — confirm
    /// at the producer).
    pub migrations: Vec<Migration>,
    /// Longest observed gap without progress, and where/when it occurred.
    pub max_gap_ms: u64,
    pub max_gap_cpu: usize,
    pub max_gap_at_ms: u64,
    /// Sampled wakeup-to-run latencies; `wake_sample_total` counts all wake
    /// events, sampled or not.
    pub resume_latencies_ns: Vec<u64>,
    pub wake_sample_total: u64,
    /// Sampled per-iteration costs, with the total population count.
    pub iteration_costs_ns: Vec<u64>,
    pub iteration_cost_sample_total: u64,
    pub iterations: u64,
    /// Fields mirrored from /proc/<tid>/schedstat — presumably; confirm.
    pub schedstat_run_delay_ns: u64,
    pub schedstat_run_count: u64,
    pub schedstat_cpu_time_ns: u64,
    /// True when the worker ran to orderly completion (false on early abort,
    /// e.g. the start gate closing before release).
    #[serde(default)]
    pub completed: bool,
    /// Pages resident per NUMA node at sampling time — presumably; confirm.
    #[serde(default)]
    pub numa_pages: BTreeMap<usize, u64>,
    pub vmstat_numa_pages_migrated: u64,
    /// How the worker terminated; `None` while unknown.
    pub exit_info: Option<WorkerExitInfo>,
    #[serde(default)]
    pub is_messenger: bool,
    /// Index of the work-spec group this worker belonged to.
    #[serde(default)]
    pub group_idx: usize,
    /// Populated when setting the requested CPU affinity failed.
    #[serde(default)]
    pub affinity_error: Option<String>,
}
/// How a worker (process or thread) terminated.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum WorkerExitInfo {
    /// Normal process exit with the given status code.
    Exited(i32),
    /// Killed by the given signal number.
    Signaled(i32),
    /// Still alive when the wait deadline expired (also used as the catch-all
    /// for other wait statuses — see `classify_wait_outcome`).
    TimedOut,
    /// The waitpid call itself failed; holds the stringified errno.
    WaitFailed(String),
    /// A worker thread panicked; holds the extracted panic message.
    Panicked(String),
}
/// Map a `waitpid` result onto `WorkerExitInfo`.
///
/// `StillAlive` — and any other status not listed (stopped, traced, …) —
/// is deliberately folded into `TimedOut`.
pub(super) fn classify_wait_outcome(
    source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno>,
) -> WorkerExitInfo {
    use nix::sys::wait::WaitStatus;
    match source {
        Err(e) => WorkerExitInfo::WaitFailed(e.to_string()),
        Ok(WaitStatus::Exited(_, code)) => WorkerExitInfo::Exited(code),
        Ok(WaitStatus::Signaled(_, sig, _)) => WorkerExitInfo::Signaled(sig as i32),
        Ok(_) => WorkerExitInfo::TimedOut,
    }
}
/// Best-effort extraction of a human-readable message from a panic payload.
///
/// Handles the two payload types `panic!` produces in practice (`&'static
/// str` and `String`); anything else becomes a fixed placeholder.
pub(super) fn extract_panic_payload(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|s| (*s).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
/// Upper bound teardown waits for a single worker thread to exit.
pub(super) const THREAD_JOIN_TIMEOUT: Duration = Duration::from_secs(5);
/// Join a worker thread, giving up after `timeout`.
///
/// Waits on an epoll set of two fds: `exit_evt` (signaled by the worker just
/// before its closure returns, token 0) and a one-shot timerfd armed with
/// `timeout` (token 1). Returns `Some(join result)` if the thread finished in
/// time; `None` on timeout or on any epoll/timerfd setup failure — in which
/// case the `JoinHandle` is dropped and the thread is abandoned, not joined.
pub(super) fn join_thread_with_timeout(
    join: std::thread::JoinHandle<WorkerReport>,
    exit_evt: &vmm_sys_util::eventfd::EventFd,
    timeout: Duration,
) -> Option<std::thread::Result<WorkerReport>> {
    use std::os::unix::io::AsRawFd;
    use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
    use vmm_sys_util::timerfd::TimerFd;
    // Fast path: no fd machinery when the thread is already done.
    if join.is_finished() {
        return Some(join.join());
    }
    let epoll = match Epoll::new() {
        Ok(e) => e,
        Err(e) => {
            tracing::warn!(%e, "join_thread_with_timeout: epoll_create1 failed");
            return None;
        }
    };
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        exit_evt.as_raw_fd(),
        EpollEvent::new(EventSet::IN, 0),
    ) {
        tracing::warn!(%e, "join_thread_with_timeout: add exit_evt to epoll");
        return None;
    }
    let mut timer = match TimerFd::new() {
        Ok(t) => t,
        Err(e) => {
            tracing::warn!(%e, "join_thread_with_timeout: timerfd_create failed");
            return None;
        }
    };
    // One-shot arm (interval = None): the timer fires exactly once at
    // `timeout` from now.
    if let Err(e) = timer.reset(timeout, None) {
        tracing::warn!(%e, "join_thread_with_timeout: timerfd_settime failed");
        return None;
    }
    if let Err(e) = epoll.ctl(
        ControlOperation::Add,
        timer.as_raw_fd(),
        EpollEvent::new(EventSet::IN, 1),
    ) {
        tracing::warn!(%e, "join_thread_with_timeout: add timerfd to epoll");
        return None;
    }
    let deadline = Instant::now() + timeout;
    let mut events = [EpollEvent::default(); 2];
    loop {
        if join.is_finished() {
            return Some(join.join());
        }
        // epoll_wait below blocks indefinitely (-1); the armed timerfd is
        // what actually bounds the wait. This wall-clock check converts a
        // timer wakeup (or spurious wakeup past the deadline) into `None`.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return None;
        }
        // NOTE(review): exit_evt is level-triggered and never drained here,
        // so between the worker writing the eventfd and the thread actually
        // finishing, this loop can spin; the window is the thread's final
        // teardown and should be brief — confirm if CPU burn is observed.
        match epoll.wait(-1, &mut events) {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => {
                tracing::warn!(%e, "join_thread_with_timeout: epoll_wait failed");
                return None;
            }
        }
    }
}
/// True when either the caller's per-worker flag or the process-global
/// `STOP` flag requests shutdown. Relaxed loads suffice: this is a polled
/// advisory flag, not a synchronization point.
#[inline]
pub(super) fn stop_requested(stop: &AtomicBool) -> bool {
    let local = stop.load(Ordering::Relaxed);
    let global = STOP.load(Ordering::Relaxed);
    local || global
}
/// Handle to one in-process worker thread.
pub(super) struct ThreadWorker {
    /// Kernel tid of the worker, published by the thread itself after spawn
    /// (0 until then).
    tid: std::sync::Arc<std::sync::atomic::AtomicI32>,
    /// Per-worker stop flag shared with the thread.
    stop: std::sync::Arc<AtomicBool>,
    /// Start gate: sending (or dropping) releases the thread from its
    /// initial `recv()`. `None` once consumed.
    pub(super) start_tx: Option<std::sync::mpsc::SyncSender<()>>,
    /// Join handle; taken on teardown.
    join: Option<std::thread::JoinHandle<WorkerReport>>,
    /// Signaled by the thread just before it returns; lets teardown wait
    /// with a timeout instead of blocking in `join()`.
    exit_evt: std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
}
impl Drop for ThreadWorker {
    /// Best-effort teardown: request stop, release the start gate, then wait
    /// (bounded) for the thread to exit.
    fn drop(&mut self) {
        let Some(handle) = self.join.take() else {
            return;
        };
        self.stop.store(true, Ordering::Relaxed);
        // Dropping the sender unblocks a thread still parked in recv().
        drop(self.start_tx.take());
        let _ = join_thread_with_timeout(handle, &self.exit_evt, THREAD_JOIN_TIMEOUT);
    }
}
/// What a forked child process contains.
#[derive(Clone, Debug)]
pub(super) enum ForkedChildKind {
    /// A single worker process belonging to one spec group.
    Worker { group_idx: usize },
    /// A multi-threaded pcomm container; `groups` lists
    /// `(group_idx, num_workers)` in spawn order so reports can be
    /// attributed back to their groups.
    PcommContainer { groups: Vec<(usize, usize)> },
}
/// Parent-side record of one forked child process.
#[derive(Clone, Debug)]
pub(super) struct ForkedChild {
    pub pid: libc::pid_t,
    /// Read end of the pipe the child writes its JSON report(s) to.
    pub report_fd: std::os::unix::io::RawFd,
    /// Write end of the start-gate pipe; writing a byte releases the child.
    pub start_fd: std::os::unix::io::RawFd,
    pub kind: ForkedChildKind,
}
/// Owning handle for a spawned workload: child processes, worker threads,
/// and the shared-memory regions (futex words, iteration counters) plus
/// pipes they communicate through. Dropping it tears everything down.
#[must_use = "dropping a WorkloadHandle immediately tears down all worker tasks"]
pub struct WorkloadHandle {
    children: Vec<ForkedChild>,
    threads: Vec<ThreadWorker>,
    /// Whether the start gates have been released yet.
    started: bool,
    /// mmap'd futex regions, one pointer per region, with matching byte
    /// sizes below (needed for munmap on drop).
    futex_ptrs: Vec<*mut u32>,
    futex_region_sizes: Vec<usize>,
    /// mmap'd array of per-worker iteration counters (null when absent);
    /// length in elements, not bytes.
    iter_counters: *mut AtomicU64,
    iter_counter_len: usize,
    /// Bidirectional pipe pairs (a→b, b→a) for paired work types.
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    /// Per-chain pipe rings for wake-chain work types.
    chain_pipes: Vec<Vec<[i32; 2]>>,
}
/// Size in bytes of the shared futex region one worker group of the given
/// work type needs.
pub(super) fn futex_region_size_for(work_type: &WorkType) -> usize {
    const SLOT: usize = 8;
    match work_type {
        // Fixed 16-byte region for fan-out coordination.
        WorkType::FanOutCompute { .. } => 16,
        WorkType::ProducerConsumerImbalance {
            queue_depth_target, ..
        } => {
            // Clamp the depth so `32 + depth * 8` cannot overflow usize.
            let depth = std::cmp::min(*queue_depth_target as usize, usize::MAX / SLOT - 4);
            32 + depth * SLOT
        }
        // Everything else gets a single futex word.
        _ => std::mem::size_of::<u32>(),
    }
}
/// Accumulates every resource created during spawn so that a mid-spawn
/// failure cleans up everything via `Drop`. On success, `into_handle`
/// transfers ownership to a `WorkloadHandle` and disarms the guard.
pub(super) struct SpawnGuard {
    pipe_pairs: Vec<([i32; 2], [i32; 2])>,
    chain_pipes: Vec<Vec<[i32; 2]>>,
    futex_ptrs: Vec<*mut u32>,
    futex_region_sizes: Vec<usize>,
    /// mmap'd iteration-counter array; null until allocated. Size tracked in
    /// bytes here (the handle converts to element count).
    iter_counters: *mut AtomicU64,
    iter_counter_bytes: usize,
    children: Vec<ForkedChild>,
    threads: Vec<ThreadWorker>,
}
impl SpawnGuard {
    /// Empty, disarmed guard: its Drop has nothing to clean up.
    fn new() -> Self {
        Self {
            pipe_pairs: Vec::new(),
            chain_pipes: Vec::new(),
            futex_ptrs: Vec::new(),
            futex_region_sizes: Vec::new(),
            iter_counters: std::ptr::null_mut(),
            iter_counter_bytes: 0,
            children: Vec::new(),
            threads: Vec::new(),
        }
    }
    /// Transfer all resources into a `WorkloadHandle`.
    ///
    /// `SpawnGuard` implements `Drop`, so fields cannot be moved out
    /// directly; `mem::take`/`mem::replace` empty each field in place. The
    /// drained guard is then dropped harmlessly (its Drop sees only empty
    /// vectors and a null pointer), so nothing is double-freed.
    fn into_handle(mut self) -> WorkloadHandle {
        let children = std::mem::take(&mut self.children);
        let threads = std::mem::take(&mut self.threads);
        let futex_ptrs = std::mem::take(&mut self.futex_ptrs);
        let futex_region_sizes = std::mem::take(&mut self.futex_region_sizes);
        let iter_counters = std::mem::replace(&mut self.iter_counters, std::ptr::null_mut());
        let iter_counter_bytes = std::mem::replace(&mut self.iter_counter_bytes, 0);
        // The guard tracks bytes; the handle stores the element count.
        let iter_counter_len = iter_counter_bytes / std::mem::size_of::<AtomicU64>();
        let pipe_pairs = std::mem::take(&mut self.pipe_pairs);
        let chain_pipes = std::mem::take(&mut self.chain_pipes);
        WorkloadHandle {
            children,
            threads,
            started: false,
            futex_ptrs,
            futex_region_sizes,
            iter_counters,
            iter_counter_len,
            pipe_pairs,
            chain_pipes,
        }
    }
}
impl Drop for SpawnGuard {
    /// Tear down every resource still owned by the guard (i.e. spawn failed
    /// before `into_handle`). Order matters: children are killed and reaped
    /// before their fds are closed, and fds are closed before the shared
    /// regions they reference are unmapped.
    fn drop(&mut self) {
        // Kill and reap every forked child first.
        for child in &self.children {
            let npid = nix::unistd::Pid::from_raw(child.pid);
            let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
            let _ = nix::sys::wait::waitpid(npid, None);
        }
        // Then release the parent-side pipe ends for those children.
        for child in &self.children {
            for fd in [child.report_fd, child.start_fd] {
                if fd >= 0 {
                    let _ = nix::unistd::close(fd);
                }
            }
        }
        // Stop and (bounded) join in-process worker threads; dropping
        // start_tx releases any thread still parked at the start gate.
        for tw in &mut self.threads {
            tw.stop.store(true, Ordering::Relaxed);
            tw.start_tx.take();
            if let Some(j) = tw.join.take() {
                let _ = join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT);
            }
        }
        // Close worker communication pipes.
        for (ab, ba) in &self.pipe_pairs {
            for fd in [ab[0], ab[1], ba[0], ba[1]] {
                let _ = nix::unistd::close(fd);
            }
        }
        for chain in &self.chain_pipes {
            for pipe in chain {
                let _ = nix::unistd::close(pipe[0]);
                let _ = nix::unistd::close(pipe[1]);
            }
        }
        // Finally unmap the shared futex regions and the counter array.
        for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
            unsafe {
                libc::munmap(ptr as *mut libc::c_void, size);
            }
        }
        if !self.iter_counters.is_null() && self.iter_counter_bytes > 0 {
            unsafe {
                libc::munmap(
                    self.iter_counters as *mut libc::c_void,
                    self.iter_counter_bytes,
                );
            }
        }
    }
}
// SAFETY(review): WorkloadHandle is !Send/!Sync only because of its raw
// pointers into mmap'd shared regions. The claim is that cross-thread use is
// sound because workers access those regions through atomics/futex words —
// TODO confirm there is no unsynchronized access path before relying on
// `Sync` (shared-reference) use.
unsafe impl Send for WorkloadHandle {}
unsafe impl Sync for WorkloadHandle {}
/// `Send`-able wrapper smuggling a raw futex pointer (plus slot index)
/// across a thread spawn by erasing the address into a plain `usize`.
#[derive(Clone, Copy)]
pub(super) struct SendFutexPtr(Option<(usize, usize)>);
/// Same trick for the per-worker iteration-counter slot pointer.
#[derive(Clone, Copy)]
pub(super) struct SendIterSlotPtr(usize);
impl SendFutexPtr {
    /// Erase the pointer into an address so the wrapper is trivially `Send`.
    fn new(p: Option<(*mut u32, usize)>) -> Self {
        match p {
            Some((ptr, pos)) => SendFutexPtr(Some((ptr as usize, pos))),
            None => SendFutexPtr(None),
        }
    }
    /// Recover the raw pointer and slot index on the receiving thread.
    fn into_raw(self) -> Option<(*mut u32, usize)> {
        match self.0 {
            Some((addr, pos)) => Some((addr as *mut u32, pos)),
            None => None,
        }
    }
}
impl SendIterSlotPtr {
    fn new(p: *mut AtomicU64) -> Self {
        SendIterSlotPtr(p as usize)
    }
    fn into_raw(self) -> *mut AtomicU64 {
        self.0 as *mut AtomicU64
    }
}
/// Fully resolved per-group spawn parameters: a `WorkSpec` after affinity
/// resolution and worker-count resolution, plus the group's index.
#[derive(Clone)]
pub(super) struct GroupParams {
    work_type: WorkType,
    sched_policy: SchedPolicy,
    mem_policy: MemPolicy,
    mpol_flags: MpolFlags,
    /// Nice value to apply, if any (see `apply_nice`).
    nice: Option<i32>,
    /// Per-thread comm name (PR_SET_NAME), if any.
    comm: Option<String>,
    uid: Option<u32>,
    gid: Option<u32>,
    numa_node: Option<u32>,
    /// Already-resolved affinity (no scenario-context variants remain).
    affinity: ResolvedAffinity,
    /// Concrete worker count (Option resolution already done).
    num_workers: usize,
    group_idx: usize,
}
impl GroupParams {
    /// Copy the spec's fields, substituting the already-resolved affinity
    /// and worker count.
    fn from_work_spec(
        spec: &WorkSpec,
        group_idx: usize,
        resolved_affinity: ResolvedAffinity,
        resolved_num_workers: usize,
    ) -> Self {
        Self {
            work_type: spec.work_type.clone(),
            sched_policy: spec.sched_policy,
            mem_policy: spec.mem_policy.clone(),
            mpol_flags: spec.mpol_flags,
            nice: spec.nice,
            comm: spec.comm.as_ref().map(|c| c.to_string()),
            uid: spec.uid,
            gid: spec.gid,
            numa_node: spec.numa_node,
            affinity: resolved_affinity,
            num_workers: resolved_num_workers,
            group_idx,
        }
    }
    /// Resolve an `AffinityIntent` for the direct-spawn path.
    ///
    /// Only context-free variants are accepted here; topology-aware variants
    /// need the scenario engine and are rejected with guidance. `site` names
    /// the config field for error messages. Errors on inputs that would later
    /// fail in the kernel (empty CPU sets, zero counts).
    pub(super) fn resolve_spawn_affinity(
        intent: &AffinityIntent,
        site: &str,
    ) -> Result<ResolvedAffinity> {
        match intent {
            AffinityIntent::Inherit => Ok(ResolvedAffinity::None),
            AffinityIntent::Exact(cpus) => {
                if cpus.is_empty() {
                    anyhow::bail!(
                        "{site} = AffinityIntent::Exact with empty CPU set \
                        would produce EINVAL from sched_setaffinity; \
                        use AffinityIntent::Inherit for no affinity \
                        constraint",
                    );
                }
                Ok(ResolvedAffinity::Fixed(cpus.clone()))
            }
            AffinityIntent::RandomSubset { from, count } => {
                if from.is_empty() {
                    anyhow::bail!(
                        "{site} = AffinityIntent::RandomSubset with empty \
                        pool; use AffinityIntent::Inherit for no affinity \
                        constraint",
                    );
                }
                if *count == 0 {
                    anyhow::bail!(
                        "{site} = AffinityIntent::RandomSubset with \
                        count=0; use AffinityIntent::Inherit for no \
                        affinity constraint",
                    );
                }
                Ok(ResolvedAffinity::Random {
                    from: from.clone(),
                    count: *count,
                })
            }
            AffinityIntent::SingleCpu
            | AffinityIntent::LlcAligned
            | AffinityIntent::CrossCgroup
            | AffinityIntent::SmtSiblingPair => {
                anyhow::bail!(
                    "{site} = {:?} requires scenario context; use \
                    AffinityIntent::Exact(set), \
                    AffinityIntent::RandomSubset {{ from, count }}, \
                    or AffinityIntent::Inherit when spawning directly \
                    via WorkloadHandle::spawn. Topology-aware variants \
                    resolve automatically inside #[ktstr_test] \
                    scenarios.",
                    intent,
                );
            }
        }
    }
    /// Build the primary group (index 0) from the top-level config. The
    /// synthetic `WorkSpec` carries `Inherit` affinity because the real
    /// affinity was already resolved above and is passed separately.
    fn primary(config: &WorkloadConfig) -> Result<Self> {
        let resolved_affinity =
            Self::resolve_spawn_affinity(&config.affinity, "WorkloadConfig::affinity")?;
        let spec = WorkSpec {
            work_type: config.work_type.clone(),
            sched_policy: config.sched_policy,
            num_workers: Some(config.num_workers),
            affinity: AffinityIntent::Inherit,
            mem_policy: config.mem_policy.clone(),
            mpol_flags: config.mpol_flags,
            nice: config.nice,
            comm: config.comm.clone(),
            uid: config.uid,
            gid: config.gid,
            numa_node: config.numa_node,
            pcomm: None,
        };
        Ok(Self::from_work_spec(
            &spec,
            0,
            resolved_affinity,
            config.num_workers,
        ))
    }
    /// Build a composed (secondary) group from a `WorkSpec`.
    ///
    /// Callers pass `group_idx` offset by one (slot 0 is the primary group),
    /// hence the `group_idx - 1` in user-facing `composed[..]` indices —
    /// inferred from the arithmetic; confirm at the call site.
    fn from_composed(spec: &WorkSpec, group_idx: usize) -> Result<Self> {
        if spec.pcomm.is_some() {
            anyhow::bail!(
                "composed[{}].pcomm: pcomm via WorkloadHandle::spawn is not supported; \
                use WorkloadHandle::spawn_pcomm_cgroup or CgroupDef (apply_setup) — \
                spawn always forks one process per worker and never coalesces into \
                a thread-group leader",
                group_idx - 1,
            );
        }
        let num_workers = spec.num_workers.ok_or_else(|| {
            anyhow::anyhow!(
                "composed[{}].num_workers must be set explicitly at spawn time \
                (the Some/None resolution via Ctx::workers_per_cgroup is only \
                available through the scenario engine; \
                WorkloadHandle::spawn requires a concrete count)",
                group_idx - 1,
            )
        })?;
        let site = format!("composed[{}].affinity", group_idx - 1);
        let affinity = Self::resolve_spawn_affinity(&spec.affinity, &site)?;
        Ok(Self::from_work_spec(spec, group_idx, affinity, num_workers))
    }
}
/// Reject group configurations the work types cannot run with: worker counts
/// that are zero or not a multiple of the type's required group size, wake
/// chains shorter than two stages, and zero-valued duration/iteration
/// parameters. Returns `Ok(())` when the group is admissible.
pub(super) fn validate_workload_admission(group: &GroupParams) -> Result<()> {
    // Worker count must be a positive multiple of the work type's group size.
    if let Some(group_size) = group.work_type.worker_group_size() {
        let n = group.num_workers;
        if n == 0 || !n.is_multiple_of(group_size) {
            return Err(WorkTypeValidationError::NonDivisibleWorkerCount {
                name: group.work_type.name().to_string(),
                group_idx: group.group_idx,
                group_size,
                num_workers: n,
            }
            .into());
        }
    }
    // A wake chain needs at least two stages to form a ring.
    if let Some(depth) = group.work_type.chain_pipe_depth() {
        if depth < 2 {
            return Err(WorkTypeValidationError::InsufficientWakeChainDepth {
                depth,
                group_idx: group.group_idx,
            }
            .into());
        }
    }
    // Per-type parameter checks.
    match group.work_type {
        WorkType::IdleChurn {
            burst_duration,
            sleep_duration,
            ..
        } => {
            if burst_duration.is_zero() {
                return Err(WorkTypeValidationError::ZeroBurstDuration {
                    group_idx: group.group_idx,
                }
                .into());
            }
            if sleep_duration.is_zero() {
                return Err(WorkTypeValidationError::ZeroSleepDuration {
                    group_idx: group.group_idx,
                }
                .into());
            }
        }
        WorkType::IpcVariance {
            hot_iters,
            cold_iters,
            period_iters,
        } => {
            // All three parameters must be non-zero; report the first that
            // isn't, by name.
            let params = [
                ("hot_iters", hot_iters),
                ("cold_iters", cold_iters),
                ("period_iters", period_iters),
            ];
            for (field, value) in params {
                if value == 0 {
                    return Err(WorkTypeValidationError::ZeroIpcVarianceParam {
                        field,
                        group_idx: group.group_idx,
                    }
                    .into());
                }
            }
        }
        _ => {}
    }
    Ok(())
}
#[allow(clippy::too_many_arguments)]
/// Spawn one in-process worker thread for `group` and register it with the
/// spawn guard.
///
/// The thread publishes its tid, then parks on a zero-capacity start channel;
/// sending (or dropping) `start_tx` releases it into `worker_main`. Raw
/// pointers (`worker_futex`, `iter_slot`) are smuggled across the spawn via
/// the `Send*Ptr` usize wrappers. An eventfd is signaled when the closure
/// returns so teardown can join with a timeout.
pub(super) fn spawn_thread_worker(
    guard: &mut SpawnGuard,
    group: &GroupParams,
    affinity: Option<BTreeSet<usize>>,
    worker_pipe_fds: Option<(i32, i32)>,
    worker_futex: Option<(*mut u32, usize)>,
    iter_slot: *mut AtomicU64,
) -> Result<()> {
    use std::sync::Arc;
    use std::sync::atomic::AtomicI32;
    use std::sync::mpsc;
    // Rendezvous channel: capacity 0 so release is an explicit handshake.
    let (start_tx, start_rx) = mpsc::sync_channel::<()>(0);
    let stop = Arc::new(AtomicBool::new(false));
    let tid = Arc::new(AtomicI32::new(0));
    let exit_evt = Arc::new(
        vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
            .context("create thread-worker exit eventfd")?,
    );
    let stop_thread = Arc::clone(&stop);
    let tid_thread = Arc::clone(&tid);
    let exit_evt_thread = Arc::clone(&exit_evt);
    // Clone all group parameters up front so the closure is 'static.
    let work_type = group.work_type.clone();
    let sched_policy = group.sched_policy;
    let mem_policy = group.mem_policy.clone();
    let mpol_flags = group.mpol_flags;
    let nice = group.nice;
    let comm = group.comm.clone();
    let uid = group.uid;
    let gid = group.gid;
    let numa_node = group.numa_node;
    let group_idx = group.group_idx;
    let num_workers = group.num_workers;
    // Wrap raw pointers as usizes so the closure is Send.
    let futex_send = SendFutexPtr::new(worker_futex);
    let iter_slot_send = SendIterSlotPtr::new(iter_slot);
    let join = std::thread::Builder::new()
        .name(format!("ktstr-worker-g{group_idx}-{}", guard.threads.len()))
        .spawn(move || {
            // RAII guard: signals the eventfd on every exit path (normal
            // return, early return, or unwind).
            struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
            impl Drop for WorkerExitSignal {
                fn drop(&mut self) {
                    let _ = self.0.write(1);
                }
            }
            let _exit_signal = WorkerExitSignal(exit_evt_thread);
            // Publish our kernel tid before parking at the start gate.
            let my_tid: libc::pid_t = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };
            tid_thread.store(my_tid, Ordering::Release);
            // recv() fails only when the sender was dropped without sending:
            // the workload is being torn down before start — bail with an
            // incomplete report.
            if start_rx.recv().is_err() {
                return WorkerReport {
                    tid: my_tid,
                    completed: false,
                    group_idx,
                    ..WorkerReport::default()
                };
            }
            let futex = futex_send.into_raw();
            let slot = iter_slot_send.into_raw();
            worker_main(
                affinity,
                work_type,
                sched_policy,
                mem_policy,
                mpol_flags,
                nice,
                comm.as_deref(),
                uid,
                gid,
                numa_node,
                worker_pipe_fds,
                futex,
                slot,
                &stop_thread,
                group_idx,
            )
        })
        .with_context(|| {
            format!(
                "thread::spawn for worker {}/{} (group {}) failed",
                guard.threads.len() + 1,
                num_workers,
                group_idx,
            )
        })?;
    guard.threads.push(ThreadWorker {
        tid,
        stop,
        start_tx: Some(start_tx),
        join: Some(join),
        exit_evt,
    });
    Ok(())
}
/// Per-group offsets into the shared resource arrays (`SpawnGuard` pipe
/// pairs, chain pipes, futex regions, iteration counters) that a pcomm
/// container uses to wire up its worker threads.
#[derive(Clone, Copy, Debug)]
pub(super) struct PcommGroupResources {
    /// First iteration-counter slot for this group's workers.
    pub iter_offset: usize,
    /// First pipe pair (workers are paired: two workers per pair).
    pub pipe_pair_base: usize,
    /// First wake-chain pipe ring for this group.
    pub chain_pipes_base: usize,
    /// First futex region for this group.
    pub futex_ptrs_base: usize,
    /// Whether the work type uses bidirectional pipe pairs.
    pub needs_pipes: bool,
    /// Ring length for wake-chain work types, if any.
    pub chain_depth: Option<usize>,
    /// Whether the work type uses shared futex regions, and how many
    /// workers share one region.
    pub needs_futex: bool,
    pub futex_group_size: usize,
}
#[allow(clippy::too_many_arguments)]
/// Fork a single "pcomm container" process that runs all workers of `groups`
/// as threads of one thread-group (so they share a process comm name).
///
/// Protocol with the parent:
/// 1. parent creates a report pipe (child→parent) and a start pipe
///    (parent→child), both CLOEXEC;
/// 2. child sets itself up (signals, fd sweep, comm, uid/gid), waits up to
///    30s for one start byte, spawns all worker threads, then writes a
///    single `'r'` ready byte on the report pipe;
/// 3. when all threads finish, the child writes a JSON array of
///    `WorkerReport`s on the report pipe and `_exit(0)`s.
///
/// SIGUSR1 is blocked across the fork so the child cannot be killed by it
/// before installing its handler; the mask is restored on every path.
pub(super) fn spawn_pcomm_container(
    guard: &mut SpawnGuard,
    pcomm: &str,
    container_uid: Option<u32>,
    container_gid: Option<u32>,
    groups: &[GroupParams],
    resources: &[PcommGroupResources],
) -> Result<()> {
    debug_assert_eq!(
        groups.len(),
        resources.len(),
        "spawn_pcomm_container: groups / resources must have the same length",
    );
    // Report pipe: child writes the ready byte and the JSON report array.
    let mut report_fds = [0i32; 2];
    if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
        anyhow::bail!(
            "pcomm container (pcomm={pcomm:?}): report pipe2 failed: {}",
            std::io::Error::last_os_error(),
        );
    }
    // Grow the report pipe so a large JSON report cannot block the child
    // while the parent is not yet reading. F_SETPIPE_SZ returns the actual
    // capacity set (>= requested) or a negative value on error.
    const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
    let prev_size = unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
    if prev_size < 0 {
        let err = std::io::Error::last_os_error();
        tracing::warn!(
            pcomm,
            requested_size = REPORT_PIPE_SIZE,
            %err,
            "F_SETPIPE_SZ on pcomm container report pipe failed; falling back \
            to default pipe capacity",
        );
    }
    // Start pipe: parent writes one byte to release the container.
    let mut start_fds = [0i32; 2];
    if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
        unsafe {
            libc::close(report_fds[0]);
            libc::close(report_fds[1]);
        }
        anyhow::bail!(
            "pcomm container (pcomm={pcomm:?}): start pipe2 failed: {}",
            std::io::Error::last_os_error(),
        );
    }
    // Block SIGUSR1 before forking so the child starts with it blocked and
    // can install its handler before unblocking.
    let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
    let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
    let psm_block_rc = unsafe {
        libc::sigemptyset(&mut block_mask);
        libc::sigaddset(&mut block_mask, libc::SIGUSR1);
        libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
    };
    if psm_block_rc != 0 {
        tracing::warn!(
            rc = psm_block_rc,
            pcomm,
            "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork for pcomm container; \
            container inherits unblocked SIGUSR1 and may terminate on default \
            action before installing handler",
        );
    }
    let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
    let pid = unsafe { libc::fork() };
    match pid {
        -1 => {
            // Fork failed: restore the signal mask and release both pipes.
            let psm_restore_rc = unsafe {
                libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
            };
            if psm_restore_rc != 0 {
                tracing::warn!(
                    rc = psm_restore_rc,
                    "pthread_sigmask(SIG_SETMASK) failed restoring mask after pcomm \
                    container fork failure; SIGUSR1 may remain blocked in this thread",
                );
            }
            unsafe {
                libc::close(report_fds[0]);
                libc::close(report_fds[1]);
                libc::close(start_fds[0]);
                libc::close(start_fds[1]);
            }
            anyhow::bail!(
                "pcomm container (pcomm={pcomm:?}): fork failed: {}",
                std::io::Error::last_os_error(),
            );
        }
        0 => {
            // ---- child: container setup ----
            // Die if the parent dies.
            unsafe {
                libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
            }
            // PDEATHSIG races with the parent dying before it is installed:
            // if we were already reparented to init (and we are not the
            // guest init ourselves), bail out immediately.
            if std::env::var_os("KTSTR_GUEST_INIT").is_none() && unsafe { libc::getppid() } == 1 {
                unsafe {
                    libc::_exit(0);
                }
            }
            // Own process group so the parent can killpg the whole container.
            unsafe {
                libc::setpgid(0, 0);
            }
            // Clear any stop state inherited from the parent's static.
            STOP.store(false, Ordering::Relaxed);
            // Install the SIGUSR1 stop handler while the signal is still
            // blocked (mask restored just below).
            let sig_prev = unsafe {
                libc::signal(
                    libc::SIGUSR1,
                    sigusr1_handler as *const () as libc::sighandler_t,
                )
            };
            if sig_prev == libc::SIG_ERR {
                let errno = std::io::Error::last_os_error();
                eprintln!(
                    "ktstr: signal(SIGUSR1) install failed in pcomm container: {errno}; \
                    graceful stop unavailable, killpg escalation will reap"
                );
            }
            let psm_unblock_rc = unsafe {
                libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
            };
            if psm_unblock_rc != 0 {
                eprintln!(
                    "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in pcomm \
                    container: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg \
                    escalation will reap"
                );
            }
            // Close the parent's ends of the two control pipes.
            unsafe {
                libc::close(report_fds[0]);
                libc::close(start_fds[1]);
            }
            // Sweep every inherited fd except stdio, our control-pipe ends,
            // and the worker communication pipes the threads will use.
            let mut keep_fds: std::collections::BTreeSet<i32> = std::collections::BTreeSet::new();
            keep_fds.insert(0);
            keep_fds.insert(1);
            keep_fds.insert(2);
            keep_fds.insert(report_fds[1]);
            keep_fds.insert(start_fds[0]);
            for (ab, ba) in &guard.pipe_pairs {
                keep_fds.insert(ab[0]);
                keep_fds.insert(ab[1]);
                keep_fds.insert(ba[0]);
                keep_fds.insert(ba[1]);
            }
            for chain in &guard.chain_pipes {
                for pipe in chain {
                    keep_fds.insert(pipe[0]);
                    keep_fds.insert(pipe[1]);
                }
            }
            if let Ok(entries) = std::fs::read_dir("/proc/self/fd") {
                // Collect first, close after: closing while iterating would
                // invalidate the read_dir fd itself.
                let mut to_close: Vec<i32> = Vec::new();
                for entry in entries.flatten() {
                    if let Some(name) = entry.file_name().to_str()
                        && let Ok(fd) = name.parse::<i32>()
                        && !keep_fds.contains(&fd)
                    {
                        to_close.push(fd);
                    }
                }
                for fd in to_close {
                    unsafe {
                        libc::close(fd);
                    }
                }
            } else {
                eprintln!(
                    "ktstr: pcomm container (pcomm={pcomm:?}): /proc/self/fd \
                    read_dir failed; inherited-fd sweep skipped",
                );
            }
            // Name the thread-group leader; threads inherit unless they
            // rename themselves.
            let c_pcomm = std::ffi::CString::new(pcomm).unwrap_or_default();
            let prctl_rc = unsafe { libc::prctl(libc::PR_SET_NAME, c_pcomm.as_ptr()) };
            if prctl_rc != 0 {
                let errno = std::io::Error::last_os_error();
                eprintln!(
                    "ktstr: prctl(PR_SET_NAME, {pcomm:?}) failed on pcomm container: {errno}",
                );
            }
            // Drop gid before uid: setresuid first could remove the
            // privilege needed for setresgid.
            if let Some(gid) = container_gid {
                let rc = unsafe { libc::setresgid(gid, gid, gid) };
                if rc != 0 {
                    let errno = std::io::Error::last_os_error();
                    eprintln!("ktstr: setresgid({gid}) failed on pcomm container: {errno}");
                }
            }
            if let Some(uid) = container_uid {
                let rc = unsafe { libc::setresuid(uid, uid, uid) };
                if rc != 0 {
                    let errno = std::io::Error::last_os_error();
                    eprintln!("ktstr: setresuid({uid}) failed on pcomm container: {errno}");
                }
            }
            // Wait (<=30s) for the parent's start byte; exit if it never
            // comes. NOTE(review): poll EINTR is not retried here — a stray
            // signal makes ret negative and exits the container; confirm
            // whether that is acceptable.
            let mut pfd = libc::pollfd {
                fd: start_fds[0],
                events: libc::POLLIN,
                revents: 0,
            };
            let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
            if ret <= 0 {
                unsafe {
                    libc::_exit(1);
                }
            }
            let mut buf = [0u8; 1];
            {
                // File takes ownership of start_fds[0] and closes it on drop.
                let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
                let _ = f.read_exact(&mut buf);
                drop(f);
            }
            // Clear STOP again in case a stray SIGUSR1 arrived during setup.
            STOP.store(false, Ordering::Relaxed);
            // ---- child: spawn one thread per worker ----
            let mut joins: Vec<(
                std::thread::JoinHandle<WorkerReport>,
                std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
            )> = Vec::with_capacity(total_workers);
            // Snapshot the guard's resource tables so the spawn loop borrows
            // nothing from `guard`.
            let pipe_pairs_snapshot: Vec<([i32; 2], [i32; 2])> = guard.pipe_pairs.clone();
            let chain_pipes_snapshot: Vec<Vec<[i32; 2]>> = guard.chain_pipes.clone();
            let futex_ptrs_snapshot: Vec<*mut u32> = guard.futex_ptrs.clone();
            let iter_counters_base = guard.iter_counters;
            for (group, res) in groups.iter().zip(resources.iter()) {
                let num_workers = group.num_workers;
                for i in 0..num_workers {
                    let affinity = match resolve_affinity(&group.affinity) {
                        Ok(a) => a,
                        Err(e) => {
                            // Non-fatal: run unpinned rather than abort.
                            eprintln!(
                                "ktstr: pcomm container (group {}): resolve_affinity \
                                for thread {i}/{num_workers} failed: {e:#}",
                                group.group_idx,
                            );
                            None
                        }
                    };
                    // Wire up this worker's pipe fds: either one end of a
                    // bidirectional pair (even/odd workers get opposite
                    // ends), or the (read-from-previous, write-to-own) pair
                    // of a wake-chain ring.
                    let worker_pipe_fds: Option<(i32, i32)> = if res.needs_pipes {
                        let pair_idx = res.pipe_pair_base + i / 2;
                        let (ab, ba) = &pipe_pairs_snapshot[pair_idx];
                        if i % 2 == 0 {
                            Some((ba[0], ab[1]))
                        } else {
                            Some((ab[0], ba[1]))
                        }
                    } else if let Some(depth) = res.chain_depth
                        && depth > 0
                    {
                        let chain_idx = res.chain_pipes_base + i / depth;
                        let stage = i % depth;
                        let prev_stage = (stage + depth - 1) % depth;
                        let chain = &chain_pipes_snapshot[chain_idx];
                        Some((chain[prev_stage][0], chain[stage][1]))
                    } else {
                        None
                    };
                    // Futex region shared by `futex_group_size` workers;
                    // `pos` is this worker's slot within the region.
                    let worker_futex: Option<(*mut u32, usize)> = if res.needs_futex {
                        let futex_group_idx = res.futex_ptrs_base + i / res.futex_group_size;
                        let pos = i % res.futex_group_size;
                        Some((futex_ptrs_snapshot[futex_group_idx], pos))
                    } else {
                        None
                    };
                    let iter_slot: *mut AtomicU64 = if !iter_counters_base.is_null() {
                        unsafe { iter_counters_base.add(res.iter_offset + i) }
                    } else {
                        std::ptr::null_mut()
                    };
                    // usize-erased pointers so the closure is Send.
                    let futex_send = SendFutexPtr::new(worker_futex);
                    let iter_slot_send = SendIterSlotPtr::new(iter_slot);
                    let work_type = group.work_type.clone();
                    let sched_policy = group.sched_policy;
                    let mem_policy = group.mem_policy.clone();
                    let mpol_flags = group.mpol_flags;
                    let nice = group.nice;
                    let comm = group.comm.clone();
                    let uid = group.uid;
                    let gid = group.gid;
                    let numa_node = group.numa_node;
                    let group_idx = group.group_idx;
                    let exit_evt = match vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK) {
                        Ok(efd) => std::sync::Arc::new(efd),
                        Err(e) => {
                            eprintln!(
                                "ktstr: pcomm container (group {}): EventFd::new for \
                                worker {}/{num_workers} failed: {e}; aborting container",
                                group.group_idx,
                                i + 1,
                            );
                            unsafe {
                                libc::_exit(1);
                            }
                        }
                    };
                    let exit_evt_thread = std::sync::Arc::clone(&exit_evt);
                    let join = std::thread::Builder::new()
                        .name(format!("ktstr-pcomm-g{group_idx}-{i}"))
                        .spawn(move || {
                            // RAII: signal the eventfd on any exit path.
                            struct WorkerExitSignal(std::sync::Arc<vmm_sys_util::eventfd::EventFd>);
                            impl Drop for WorkerExitSignal {
                                fn drop(&mut self) {
                                    let _ = self.0.write(1);
                                }
                            }
                            let _exit_signal = WorkerExitSignal(exit_evt_thread);
                            let futex = futex_send.into_raw();
                            let slot = iter_slot_send.into_raw();
                            worker_main(
                                affinity,
                                work_type,
                                sched_policy,
                                mem_policy,
                                mpol_flags,
                                nice,
                                comm.as_deref(),
                                uid,
                                gid,
                                numa_node,
                                worker_pipe_fds,
                                futex,
                                slot,
                                &STOP,
                                group_idx,
                            )
                        });
                    match join {
                        Ok(j) => joins.push((j, exit_evt)),
                        Err(e) => {
                            eprintln!(
                                "ktstr: pcomm container (group {}): thread::spawn for \
                                worker {}/{num_workers} failed: {e}; aborting container",
                                group.group_idx,
                                i + 1,
                            );
                            unsafe {
                                libc::_exit(1);
                            }
                        }
                    }
                }
            }
            // All threads spawned: tell the parent we are ready.
            let ready_byte: u8 = b'r';
            unsafe {
                libc::write(
                    report_fds[1],
                    &ready_byte as *const u8 as *const libc::c_void,
                    1,
                );
            }
            // ---- child: collect worker reports ----
            use std::os::unix::io::AsRawFd;
            use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
            // Index each join so epoll event data can find it after removals.
            let mut indexed_joins: Vec<(
                usize,
                std::thread::JoinHandle<WorkerReport>,
                std::sync::Arc<vmm_sys_util::eventfd::EventFd>,
            )> = joins
                .into_iter()
                .enumerate()
                .map(|(i, (j, evt))| (i, j, evt))
                .collect();
            let mut reports: Vec<WorkerReport> = Vec::with_capacity(indexed_joins.len());
            // Preferred path: epoll over all exit eventfds so threads are
            // joined as they finish. On any setup failure, fall back to
            // plain blocking joins below.
            let epoll_setup: Option<Epoll> = match Epoll::new() {
                Ok(ep) => {
                    let mut ok = true;
                    for (idx, _, evt) in &indexed_joins {
                        if let Err(e) = ep.ctl(
                            ControlOperation::Add,
                            evt.as_raw_fd(),
                            EpollEvent::new(EventSet::IN, *idx as u64),
                        ) {
                            eprintln!(
                                "ktstr: pcomm container (pcomm={pcomm:?}): epoll.ctl(Add) \
                                for thread {idx} failed: {e}; falling back to blocking \
                                join per thread",
                            );
                            ok = false;
                            break;
                        }
                    }
                    if ok { Some(ep) } else { None }
                }
                Err(e) => {
                    eprintln!(
                        "ktstr: pcomm container (pcomm={pcomm:?}): Epoll::new failed: {e}; \
                        falling back to blocking join per thread",
                    );
                    None
                }
            };
            if let Some(epoll) = epoll_setup {
                let mut events_buf: Vec<EpollEvent> =
                    vec![EpollEvent::default(); indexed_joins.len().max(1)];
                while !indexed_joins.is_empty() {
                    let n = match epoll.wait(-1, &mut events_buf) {
                        Ok(n) => n,
                        Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                        Err(e) => {
                            eprintln!(
                                "ktstr: pcomm container (pcomm={pcomm:?}): epoll.wait \
                                failed: {e}; falling back to blocking join for the \
                                remaining {} thread(s)",
                                indexed_joins.len(),
                            );
                            break;
                        }
                    };
                    for ev in &events_buf[..n] {
                        let target_idx = ev.data() as usize;
                        // Skip events for entries already joined (stale data
                        // within this batch).
                        let pos = match indexed_joins
                            .iter()
                            .position(|(idx, _, _)| *idx == target_idx)
                        {
                            Some(p) => p,
                            None => continue,
                        };
                        let (idx, j, evt) = indexed_joins.swap_remove(pos);
                        if let Err(e) = epoll.ctl(
                            ControlOperation::Delete,
                            evt.as_raw_fd(),
                            EpollEvent::default(),
                        ) {
                            eprintln!(
                                "ktstr: pcomm container (pcomm={pcomm:?}): epoll.ctl(Del) \
                                for thread {idx} failed: {e}",
                            );
                        }
                        // Drain the eventfd before joining (nonblocking).
                        let _ = evt.read();
                        // The eventfd fires just before the closure returns,
                        // so this join blocks only for the thread's final
                        // teardown.
                        match j.join() {
                            Ok(r) => reports.push(r),
                            Err(payload) => {
                                let msg = extract_panic_payload(payload);
                                eprintln!(
                                    "ktstr: pcomm container (pcomm={pcomm:?}): thread {idx} \
                                    panicked: {msg}",
                                );
                                reports.push(WorkerReport {
                                    completed: false,
                                    exit_info: Some(WorkerExitInfo::Panicked(msg)),
                                    ..WorkerReport::default()
                                });
                            }
                        }
                    }
                }
            }
            // Fallback / leftovers: blocking join for anything not handled
            // via epoll.
            for (idx, j, _evt) in indexed_joins {
                match j.join() {
                    Ok(r) => reports.push(r),
                    Err(payload) => {
                        let msg = extract_panic_payload(payload);
                        eprintln!(
                            "ktstr: pcomm container (pcomm={pcomm:?}): thread {idx} \
                            panicked: {msg}",
                        );
                        reports.push(WorkerReport {
                            completed: false,
                            exit_info: Some(WorkerExitInfo::Panicked(msg)),
                            ..WorkerReport::default()
                        });
                    }
                }
            }
            // Serialize and ship the reports; an encode failure sends an
            // empty payload rather than hanging the parent.
            let bytes = match serde_json::to_vec(&reports) {
                Ok(v) => v,
                Err(e) => {
                    eprintln!(
                        "ktstr: pcomm container (pcomm={pcomm:?}): serde_json encode \
                        of {} reports failed: {e}",
                        reports.len(),
                    );
                    Vec::new()
                }
            };
            {
                // File takes ownership of report_fds[1]; drop closes it,
                // signaling EOF to the parent's reader.
                let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
                if let Err(e) = f.write_all(&bytes) {
                    eprintln!(
                        "ktstr: pcomm container (pcomm={pcomm:?}): report write_all \
                        of {} bytes failed: {e}",
                        bytes.len(),
                    );
                }
                drop(f);
            }
            // _exit, not exit: skip atexit handlers and Rust runtime
            // teardown inherited from the parent.
            unsafe {
                libc::_exit(0);
            }
        }
        child_pid => {
            // ---- parent ----
            let psm_parent_restore_rc = unsafe {
                libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
            };
            if psm_parent_restore_rc != 0 {
                tracing::warn!(
                    rc = psm_parent_restore_rc,
                    "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent \
                    post-pcomm-fork; SIGUSR1 stays blocked in this thread for \
                    the lifetime of the workload",
                );
            }
            // Close the child's ends of the control pipes.
            unsafe {
                libc::close(report_fds[1]);
                libc::close(start_fds[0]);
            }
            // Record (group_idx, num_workers) in spawn order so reports can
            // be attributed later.
            let group_layout: Vec<(usize, usize)> = groups
                .iter()
                .map(|g| (g.group_idx, g.num_workers))
                .collect();
            debug_assert_eq!(
                group_layout.iter().map(|(_, n)| n).sum::<usize>(),
                total_workers,
                "spawn_pcomm_container: group_layout total must match total_workers",
            );
            guard.children.push(ForkedChild {
                pid: child_pid,
                report_fd: report_fds[0],
                start_fd: start_fds[1],
                kind: ForkedChildKind::PcommContainer {
                    groups: group_layout,
                },
            });
            Ok(())
        }
    }
}
/// How a worker is dispatched: as a forked child process or as an
/// in-process thread.
pub(super) enum Dispatch {
    Fork,
    Thread,
}
impl WorkloadHandle {
    /// Spawns every [`WorkSpec`] in `works` as *threads* inside a single
    /// forked "pcomm container" child process, so all workers share one tgid
    /// whose comm is `pcomm`.
    ///
    /// Steps: validate specs (ForkExit and CgroupChurn are rejected — see the
    /// bail messages for why), mmap one shared `AtomicU64` iteration counter
    /// per worker, pre-allocate per-group pipes / chain pipes / futex regions
    /// into `guard` (so the container child inherits them), then hand
    /// everything to `spawn_pcomm_container`.
    ///
    /// On any error, `guard` is dropped, which is relied upon to release the
    /// fds and mappings accumulated so far.
    pub fn spawn_pcomm_cgroup(
        pcomm: &str,
        container_uid: Option<u32>,
        container_gid: Option<u32>,
        works: &[WorkSpec],
    ) -> Result<Self> {
        if pcomm.is_empty() {
            anyhow::bail!(
                "spawn_pcomm_cgroup: pcomm must be a non-empty string; \
                 the caller (apply_setup) treats `Some(\"\")` as \
                 `None` and falls through to the conventional fork \
                 spawn — empty here is a programmer error.",
            );
        }
        // Resolve each WorkSpec into GroupParams; num_workers must already be
        // concrete (resolve_num_workers ran upstream).
        let mut groups: Vec<GroupParams> = Vec::with_capacity(works.len());
        for (i, spec) in works.iter().enumerate() {
            let num_workers = spec.num_workers.ok_or_else(|| {
                anyhow::anyhow!(
                    "spawn_pcomm_cgroup: works[{i}].num_workers must be set explicitly \
                     (apply_setup runs resolve_num_workers before calling in)",
                )
            })?;
            let site = format!("works[{i}].affinity");
            let affinity = GroupParams::resolve_spawn_affinity(&spec.affinity, &site)?;
            groups.push(GroupParams::from_work_spec(spec, i, affinity, num_workers));
        }
        // Admission control: work types that cannot safely run as sibling
        // threads of one multi-threaded container are rejected up front.
        for group in &groups {
            if matches!(group.work_type, WorkType::ForkExit) {
                anyhow::bail!(
                    "WorkSpec::pcomm is incompatible with WorkType::ForkExit \
                     (works[{}]): a fork from a thread of a multi-threaded \
                     container inherits all locks held by sibling threads at \
                     fork time, producing undefined behaviour for any libc \
                     primitive in the child. Drop pcomm or pick a different \
                     work type.",
                    group.group_idx,
                );
            }
            if matches!(group.work_type, WorkType::CgroupChurn { .. }) {
                anyhow::bail!(
                    "WorkSpec::pcomm is incompatible with WorkType::CgroupChurn \
                     (works[{}]): CgroupChurn writes the worker tid to \
                     `cgroup.procs`, which the kernel resolves to the whole \
                     tgid and migrates the entire pcomm container (every \
                     sibling thread). Drop pcomm or pick a different work type.",
                    group.group_idx,
                );
            }
            validate_workload_admission(group)?;
        }
        let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
        if total_workers == 0 {
            // Nothing to spawn; an empty handle is still a valid handle.
            return Ok(SpawnGuard::new().into_handle());
        }
        let mut guard = SpawnGuard::new();
        // Shared, anonymous mapping: one AtomicU64 per worker, inherited by
        // the forked container so the parent can read iteration counts.
        let size = total_workers * std::mem::size_of::<AtomicU64>();
        let ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                -1,
                0,
            )
        };
        if ptr == libc::MAP_FAILED {
            let errno = std::io::Error::last_os_error();
            let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
            anyhow::bail!(
                "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
                 per-worker iter_counters region failed: {errno}{hint}; \
                 this region holds one AtomicU64 per worker thread \
                 ({total_workers} thread(s) inside the pcomm={pcomm:?} \
                 container) so the parent can snapshot iteration \
                 counts via `snapshot_iterations()`.",
            );
        }
        guard.iter_counters = ptr as *mut AtomicU64;
        guard.iter_counter_bytes = size;
        // Per-group shared resources, allocated pre-fork so the container
        // child inherits the fds/mappings. Base indices into the guard's
        // flat vectors are recorded in PcommGroupResources.
        let mut resources: Vec<PcommGroupResources> = Vec::with_capacity(groups.len());
        let mut iter_offset: usize = 0;
        for group in &groups {
            let needs_pipes = matches!(
                group.work_type,
                WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
            );
            let chain_depth = group.work_type.chain_pipe_depth();
            let needs_futex = group.work_type.needs_shared_mem();
            let pipe_pair_base = guard.pipe_pairs.len();
            let chain_pipes_base = guard.chain_pipes.len();
            let futex_ptrs_base = guard.futex_ptrs.len();
            let futex_region_size = futex_region_size_for(&group.work_type);
            let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
            if needs_pipes {
                // One bidirectional pair (a->b, b->a) per two workers.
                for _ in 0..group.num_workers / 2 {
                    let mut ab = [0i32; 2];
                    if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                        anyhow::bail!(
                            "pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
                            group.group_idx,
                            std::io::Error::last_os_error(),
                        );
                    }
                    let mut ba = [0i32; 2];
                    if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                        // Second pipe of the pair failed: close the first
                        // before bailing (it is not yet tracked by guard).
                        unsafe {
                            libc::close(ab[0]);
                            libc::close(ab[1]);
                        }
                        anyhow::bail!(
                            "pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
                            group.group_idx,
                            std::io::Error::last_os_error(),
                        );
                    }
                    guard.pipe_pairs.push((ab, ba));
                }
            }
            if let Some(depth) = chain_depth
                && depth > 0
                && group.num_workers >= depth
            {
                // WakeChain: one ring of `depth` pipes per full chain of
                // `depth` workers; leftover workers get no chain.
                let chains = group.num_workers / depth;
                for _ in 0..chains {
                    let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
                    let mut alloc_ok = true;
                    for _ in 0..depth {
                        let mut p = [0i32; 2];
                        if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                            alloc_ok = false;
                            break;
                        }
                        chain.push(p);
                    }
                    if !alloc_ok {
                        // Unwind the partially-built chain before bailing.
                        for p in &chain {
                            unsafe {
                                libc::close(p[0]);
                                libc::close(p[1]);
                            }
                        }
                        anyhow::bail!(
                            "WakeChain pipe2 (pcomm={pcomm:?}, group {}) failed: {}",
                            group.group_idx,
                            std::io::Error::last_os_error(),
                        );
                    }
                    guard.chain_pipes.push(chain);
                }
            }
            if needs_futex {
                // One zero-initialised shared region per futex worker-group.
                for _ in 0..group.num_workers / futex_group_size {
                    let region = unsafe {
                        libc::mmap(
                            std::ptr::null_mut(),
                            futex_region_size,
                            libc::PROT_READ | libc::PROT_WRITE,
                            libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                            -1,
                            0,
                        )
                    };
                    if region == libc::MAP_FAILED {
                        let errno = std::io::Error::last_os_error();
                        let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
                        anyhow::bail!(
                            "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
                             for a futex shared-memory region (pcomm={pcomm:?}, group {}) \
                             failed: {errno}{hint}",
                            group.group_idx,
                        );
                    }
                    unsafe {
                        std::ptr::write_bytes(region as *mut u8, 0, futex_region_size);
                    }
                    guard.futex_ptrs.push(region as *mut u32);
                    guard.futex_region_sizes.push(futex_region_size);
                }
            }
            resources.push(PcommGroupResources {
                iter_offset,
                pipe_pair_base,
                chain_pipes_base,
                futex_ptrs_base,
                needs_pipes,
                chain_depth,
                needs_futex,
                futex_group_size,
            });
            iter_offset += group.num_workers;
        }
        spawn_pcomm_container(
            &mut guard,
            pcomm,
            container_uid,
            container_gid,
            &groups,
            &resources,
        )?;
        Ok(guard.into_handle())
    }
    /// Spawns the full workload described by `config`: the primary group plus
    /// every composed group, each worker dispatched per `config.clone_mode`
    /// (fork → own process, thread → in-process thread).
    ///
    /// Validates dispatch/work-type compatibility first (see the bail
    /// messages), then mmaps a shared iteration-counter region (one
    /// `AtomicU64` per worker across all groups) and delegates per-group
    /// setup to `spawn_group`. On error, dropping `guard` is relied upon to
    /// release everything allocated so far.
    pub fn spawn(config: &WorkloadConfig) -> Result<Self> {
        let dispatch = match &config.clone_mode {
            CloneMode::Fork => Dispatch::Fork,
            CloneMode::Thread => Dispatch::Thread,
        };
        // Group 0 is the primary spec; composed specs follow at idx 1..
        let mut groups: Vec<GroupParams> = Vec::with_capacity(1 + config.composed.len());
        groups.push(GroupParams::primary(config)?);
        for (i, spec) in config.composed.iter().enumerate() {
            groups.push(GroupParams::from_composed(spec, i + 1)?);
        }
        for group in &groups {
            if matches!(dispatch, Dispatch::Thread) && matches!(group.work_type, WorkType::ForkExit)
            {
                anyhow::bail!(
                    "CloneMode::Thread is incompatible with WorkType::ForkExit \
                     (group {}) — ForkExit forks inside the worker, which under \
                     a thread-group worker tears down every sibling thread on \
                     the child's _exit. Use CloneMode::Fork for ForkExit workloads.",
                    group.group_idx,
                );
            }
            if matches!(dispatch, Dispatch::Fork)
                && matches!(group.work_type, WorkType::EpollStorm { .. })
            {
                anyhow::bail!(
                    "CloneMode::Fork is incompatible with WorkType::EpollStorm \
                     (group {}) — EpollStorm publishes eventfd/epoll fd numbers \
                     through a shared mmap region for siblings to consume, but \
                     forked children hold independent fd tables that never \
                     contain those post-fork descriptors. Use CloneMode::Thread \
                     for EpollStorm workloads.",
                    group.group_idx,
                );
            }
            if matches!(dispatch, Dispatch::Thread)
                && matches!(group.work_type, WorkType::CgroupChurn { .. })
            {
                anyhow::bail!(
                    "CloneMode::Thread is incompatible with WorkType::CgroupChurn \
                     (group {}) — CgroupChurn writes the worker tid to \
                     `cgroup.procs`, which the kernel resolves to the whole tgid \
                     and migrates every sibling thread (including the harness) \
                     to the target cgroup. Use CloneMode::Fork for CgroupChurn \
                     workloads so each worker is a separate tgid.",
                    group.group_idx,
                );
            }
            validate_workload_admission(group)?;
        }
        let mut guard = SpawnGuard::new();
        let total_workers: usize = groups.iter().map(|g| g.num_workers).sum();
        if total_workers > 0 {
            // Shared counter region, inherited across fork (MAP_SHARED) so
            // `snapshot_iterations()` in the parent sees worker updates.
            let size = total_workers * std::mem::size_of::<AtomicU64>();
            let ptr = unsafe {
                libc::mmap(
                    std::ptr::null_mut(),
                    size,
                    libc::PROT_READ | libc::PROT_WRITE,
                    libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                    -1,
                    0,
                )
            };
            if ptr == libc::MAP_FAILED {
                let errno = std::io::Error::last_os_error();
                let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
                anyhow::bail!(
                    "mmap(MAP_SHARED|MAP_ANONYMOUS, {size} bytes) for the \
                     per-worker iter_counters region failed: {errno}{hint}; \
                     this region holds one AtomicU64 per worker \
                     ({total_workers} slots across {} group(s)) so the parent \
                     can snapshot iteration counts via \
                     `snapshot_iterations()`. Remediation: reduce num_workers \
                     (each worker consumes 8 bytes of this region, rounded up \
                     to a page) or raise `vm.max_map_count` / the memory \
                     cgroup limit.",
                    groups.len(),
                );
            }
            guard.iter_counters = ptr as *mut AtomicU64;
            guard.iter_counter_bytes = size;
        }
        // Each group gets a disjoint slice of the counter region.
        let mut iter_offset: usize = 0;
        for group in &groups {
            Self::spawn_group(&mut guard, group, &dispatch, iter_offset)?;
            iter_offset += group.num_workers;
        }
        Ok(guard.into_handle())
    }
    /// Spawns all workers of one group into `guard`, either as threads
    /// (delegated to `spawn_thread_worker`) or as forked processes.
    ///
    /// Fork path per worker: allocate report + start pipes, block SIGUSR1
    /// across `fork()` so the child can install its handler before the signal
    /// can be delivered, then in the child set PDEATHSIG, detach into its own
    /// process group, close every inherited fd it does not own, wait for the
    /// start byte, write a ready byte, run `worker_main`, and ship the
    /// bincode-encoded report back through the report pipe. Statement order
    /// here is load-bearing (mask → fork → handler → unmask; fd pruning
    /// before the worker runs) — do not reorder casually.
    #[allow(clippy::too_many_arguments)]
    fn spawn_group(
        guard: &mut SpawnGuard,
        group: &GroupParams,
        dispatch: &Dispatch,
        iter_offset: usize,
    ) -> Result<()> {
        let needs_pipes = matches!(
            group.work_type,
            WorkType::PipeIo { .. } | WorkType::CachePipe { .. }
        );
        let chain_depth = group.work_type.chain_pipe_depth();
        let needs_futex = group.work_type.needs_shared_mem();
        // Base indices: resources for this group are appended to the guard's
        // flat vectors starting here.
        let pipe_pair_base = guard.pipe_pairs.len();
        let chain_pipes_base = guard.chain_pipes.len();
        let futex_ptrs_base = guard.futex_ptrs.len();
        let futex_region_size = futex_region_size_for(&group.work_type);
        if needs_pipes {
            // One bidirectional pipe pair per two workers (odd worker left out).
            for _ in 0..group.num_workers / 2 {
                let mut ab = [0i32; 2]; if unsafe { libc::pipe2(ab.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                    anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
                }
                let mut ba = [0i32; 2]; if unsafe { libc::pipe2(ba.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                    // First pipe is not yet tracked by guard; close it manually.
                    unsafe {
                        libc::close(ab[0]);
                        libc::close(ab[1]);
                    }
                    anyhow::bail!("pipe2 failed: {}", std::io::Error::last_os_error());
                }
                guard.pipe_pairs.push((ab, ba));
            }
        }
        if let Some(depth) = chain_depth
            && depth > 0
            && group.num_workers >= depth
        {
            // WakeChain: one ring of `depth` pipes per complete chain.
            let chains = group.num_workers / depth;
            for _ in 0..chains {
                let mut chain: Vec<[i32; 2]> = Vec::with_capacity(depth);
                let mut alloc_ok = true;
                for _ in 0..depth {
                    let mut p = [0i32; 2];
                    if unsafe { libc::pipe2(p.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                        alloc_ok = false;
                        break;
                    }
                    chain.push(p);
                }
                if !alloc_ok {
                    // Unwind the partially-built chain before bailing.
                    for p in &chain {
                        unsafe {
                            libc::close(p[0]);
                            libc::close(p[1]);
                        }
                    }
                    anyhow::bail!(
                        "WakeChain pipe2 allocation failed: {}",
                        std::io::Error::last_os_error()
                    );
                }
                guard.chain_pipes.push(chain);
            }
        }
        let futex_group_size = group.work_type.worker_group_size().unwrap_or(2);
        if needs_futex {
            // Pre-fork shared regions so every forked child inherits the same
            // mapping (inter-process futex words).
            for _ in 0..group.num_workers / futex_group_size {
                let ptr = unsafe {
                    libc::mmap(
                        std::ptr::null_mut(),
                        futex_region_size,
                        libc::PROT_READ | libc::PROT_WRITE,
                        libc::MAP_SHARED | libc::MAP_ANONYMOUS,
                        -1,
                        0,
                    )
                };
                if ptr == libc::MAP_FAILED {
                    let errno = std::io::Error::last_os_error();
                    let hint = mmap_shared_anon_errno_hint(errno.raw_os_error());
                    anyhow::bail!(
                        "mmap(MAP_SHARED|MAP_ANONYMOUS, {futex_region_size} bytes) \
                         for a futex shared-memory region failed: {errno}{hint}; \
                         this region backs the {:?} worker-group's (group {}) \
                         inter-process futex word and is allocated \
                         before fork so every child inherits the same \
                         mapping. Remediation: reduce num_workers (each \
                         futex group consumes one shared page) or raise \
                         `vm.max_map_count` / the memory cgroup limit.",
                        group.work_type.name(),
                        group.group_idx,
                    );
                }
                unsafe { std::ptr::write_bytes(ptr as *mut u8, 0, futex_region_size) };
                guard.futex_ptrs.push(ptr as *mut u32);
                guard.futex_region_sizes.push(futex_region_size);
            }
        }
        for i in 0..group.num_workers {
            let affinity = resolve_affinity(&group.affinity)?;
            // Pick this worker's read/write fds: even worker reads b->a and
            // writes a->b, odd worker the reverse.
            let worker_pipe_fds: Option<(i32, i32)> = if needs_pipes {
                let pair_idx = pipe_pair_base + i / 2;
                let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
                if i % 2 == 0 {
                    Some((ba[0], ab[1]))
                } else {
                    Some((ab[0], ba[1]))
                }
            } else if let Some(depth) = chain_depth
                && depth > 0
            {
                // Chain stage i reads from the previous stage's pipe and
                // writes into its own (ring topology).
                let chain_idx = chain_pipes_base + i / depth;
                let stage = i % depth;
                let prev_stage = (stage + depth - 1) % depth;
                let chain = &guard.chain_pipes[chain_idx];
                Some((chain[prev_stage][0], chain[stage][1]))
            } else {
                None
            };
            let worker_futex: Option<(*mut u32, usize)> = if needs_futex {
                let futex_group_idx = futex_ptrs_base + i / futex_group_size;
                let pos = i % futex_group_size;
                Some((guard.futex_ptrs[futex_group_idx], pos))
            } else {
                None
            };
            // This worker's slot in the shared iteration-counter region.
            let iter_slot: *mut AtomicU64 = if !guard.iter_counters.is_null() {
                unsafe { guard.iter_counters.add(iter_offset + i) }
            } else {
                std::ptr::null_mut()
            };
            match dispatch {
                Dispatch::Thread => {
                    spawn_thread_worker(
                        guard,
                        group,
                        affinity,
                        worker_pipe_fds,
                        worker_futex,
                        iter_slot,
                    )?;
                    continue;
                }
                Dispatch::Fork => {
                    // Fall through to the fork path below.
                }
            }
            let mut report_fds = [0i32; 2];
            if unsafe { libc::pipe2(report_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                anyhow::bail!(
                    "worker {}/{} (group {}): report pipe2 failed: {}",
                    i + 1,
                    group.num_workers,
                    group.group_idx,
                    std::io::Error::last_os_error(),
                );
            }
            // Grow the report pipe so large reports don't block the child;
            // failure is non-fatal (default 64 KiB capacity remains).
            const REPORT_PIPE_SIZE: libc::c_int = 8 * 1024 * 1024;
            let prev_size =
                unsafe { libc::fcntl(report_fds[1], libc::F_SETPIPE_SZ, REPORT_PIPE_SIZE) };
            if prev_size < 0 {
                let err = std::io::Error::last_os_error();
                tracing::warn!(
                    worker = i + 1,
                    num_workers = group.num_workers,
                    group_idx = group.group_idx,
                    requested_size = REPORT_PIPE_SIZE,
                    %err,
                    "F_SETPIPE_SZ on report pipe failed; falling back to default \
                     pipe capacity. Workers producing >64 KiB reports may block \
                     in write_all and have their reports truncated by the \
                     parent's deadline-driven fd close."
                );
            }
            let mut start_fds = [0i32; 2];
            if unsafe { libc::pipe2(start_fds.as_mut_ptr(), libc::O_CLOEXEC) } != 0 {
                unsafe {
                    libc::close(report_fds[0]);
                    libc::close(report_fds[1]);
                }
                anyhow::bail!(
                    "worker {}/{} (group {}): start pipe2 failed: {}",
                    i + 1,
                    group.num_workers,
                    group.group_idx,
                    std::io::Error::last_os_error(),
                );
            }
            // Block SIGUSR1 around fork() so the child cannot receive it
            // before installing its handler (default action would kill it).
            let mut old_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
            let mut block_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
            let psm_block_rc = unsafe {
                libc::sigemptyset(&mut block_mask);
                libc::sigaddset(&mut block_mask, libc::SIGUSR1);
                libc::pthread_sigmask(libc::SIG_BLOCK, &block_mask, &mut old_mask)
            };
            if psm_block_rc != 0 {
                tracing::warn!(
                    rc = psm_block_rc,
                    "pthread_sigmask(SIG_BLOCK, SIGUSR1) failed pre-fork; child inherits unblocked SIGUSR1 and may terminate on default action before installing handler"
                );
            }
            let pid = unsafe { libc::fork() };
            match pid {
                -1 => {
                    // Fork failed: restore the signal mask and close both
                    // pipes before bailing.
                    let psm_restore_rc = unsafe {
                        libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
                    };
                    if psm_restore_rc != 0 {
                        tracing::warn!(
                            rc = psm_restore_rc,
                            "pthread_sigmask(SIG_SETMASK) failed restoring mask after fork failure; SIGUSR1 may remain blocked in this thread"
                        );
                    }
                    unsafe {
                        libc::close(report_fds[0]);
                        libc::close(report_fds[1]);
                        libc::close(start_fds[0]);
                        libc::close(start_fds[1]);
                    }
                    anyhow::bail!(
                        "worker {}/{} (group {}): fork failed: {}",
                        i + 1,
                        group.num_workers,
                        group.group_idx,
                        std::io::Error::last_os_error(),
                    );
                }
                0 => {
                    // --- child ---
                    // Die with the parent; if the parent is already gone
                    // (reparented to init) and we are not the guest init,
                    // exit immediately.
                    unsafe {
                        libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL);
                    }
                    if std::env::var_os("KTSTR_GUEST_INIT").is_none()
                        && unsafe { libc::getppid() } == 1
                    {
                        unsafe {
                            libc::_exit(0);
                        }
                    }
                    // Own process group so the parent can killpg-escalate.
                    unsafe {
                        libc::setpgid(0, 0);
                    }
                    STOP.store(false, Ordering::Relaxed);
                    let sig_prev = unsafe {
                        libc::signal(
                            libc::SIGUSR1,
                            sigusr1_handler as *const () as libc::sighandler_t,
                        )
                    };
                    if sig_prev == libc::SIG_ERR {
                        let errno = std::io::Error::last_os_error();
                        eprintln!(
                            "ktstr: signal(SIGUSR1) install failed in worker child: {errno}; graceful stop unavailable, killpg escalation will reap"
                        );
                    }
                    // Handler installed — now safe to unblock SIGUSR1.
                    let psm_unblock_rc = unsafe {
                        libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
                    };
                    if psm_unblock_rc != 0 {
                        eprintln!(
                            "ktstr: pthread_sigmask(SIG_SETMASK) unblock failed in worker child: rc={psm_unblock_rc}; SIGUSR1 stays blocked, killpg escalation will reap"
                        );
                    }
                    // Drop the parent's ends of our control pipes.
                    unsafe {
                        libc::close(report_fds[0]);
                        libc::close(start_fds[1]);
                    }
                    // Prune inherited pipe fds: keep only the two ends this
                    // worker uses; close everything else so EOF propagates.
                    if needs_pipes {
                        let pair_idx = pipe_pair_base + i / 2;
                        let (ref ab, ref ba) = guard.pipe_pairs[pair_idx];
                        if i % 2 == 0 {
                            unsafe {
                                libc::close(ab[0]);
                                libc::close(ba[1]);
                            }
                        } else {
                            unsafe {
                                libc::close(ab[1]);
                                libc::close(ba[0]);
                            }
                        }
                        for (j, (ab2, ba2)) in guard.pipe_pairs.iter().enumerate() {
                            if j != pair_idx {
                                unsafe {
                                    libc::close(ab2[0]);
                                    libc::close(ab2[1]);
                                    libc::close(ba2[0]);
                                    libc::close(ba2[1]);
                                }
                            }
                        }
                    } else {
                        for (ab2, ba2) in guard.pipe_pairs.iter() {
                            unsafe {
                                libc::close(ab2[0]);
                                libc::close(ab2[1]);
                                libc::close(ba2[0]);
                                libc::close(ba2[1]);
                            }
                        }
                    }
                    // Same pruning for chain pipes: keep our read end of the
                    // previous stage and write end of our own stage.
                    if let Some(depth) = chain_depth
                        && depth > 0
                    {
                        let chain_idx = chain_pipes_base + i / depth;
                        let stage = i % depth;
                        let prev_stage = (stage + depth - 1) % depth;
                        for (s, pipe) in guard.chain_pipes[chain_idx].iter().enumerate() {
                            if s == prev_stage {
                                unsafe {
                                    libc::close(pipe[1]);
                                }
                            } else if s == stage {
                                unsafe {
                                    libc::close(pipe[0]);
                                }
                            } else {
                                unsafe {
                                    libc::close(pipe[0]);
                                    libc::close(pipe[1]);
                                }
                            }
                        }
                        for (cj, chain) in guard.chain_pipes.iter().enumerate() {
                            if cj != chain_idx {
                                for pipe in chain {
                                    unsafe {
                                        libc::close(pipe[0]);
                                        libc::close(pipe[1]);
                                    }
                                }
                            }
                        }
                    } else {
                        for chain in guard.chain_pipes.iter() {
                            for pipe in chain {
                                unsafe {
                                    libc::close(pipe[0]);
                                    libc::close(pipe[1]);
                                }
                            }
                        }
                    }
                    // Silence the default panic message in the child; the
                    // exit code communicates panic vs clean run.
                    #[cfg(panic = "unwind")]
                    {
                        let _ = std::panic::take_hook();
                        std::panic::set_hook(Box::new(|_| {}));
                    }
                    let child_result =
                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                            // Wait up to 30 s for the parent's start byte.
                            let mut pfd = libc::pollfd {
                                fd: start_fds[0],
                                events: libc::POLLIN,
                                revents: 0,
                            };
                            let ret = unsafe { libc::poll(&mut pfd, 1, 30_000) };
                            if ret <= 0 {
                                unsafe {
                                    libc::_exit(1);
                                }
                            }
                            let mut buf = [0u8; 1];
                            let mut f = unsafe { std::fs::File::from_raw_fd(start_fds[0]) };
                            let _ = f.read_exact(&mut buf);
                            drop(f);
                            // Ready barrier: 'r' byte precedes the report on
                            // the same pipe (the parent strips it on decode).
                            let ready_byte: u8 = b'r';
                            unsafe {
                                libc::write(
                                    report_fds[1],
                                    &ready_byte as *const u8 as *const libc::c_void,
                                    1,
                                );
                            }
                            let report = worker_main(
                                affinity,
                                group.work_type.clone(),
                                group.sched_policy,
                                group.mem_policy.clone(),
                                group.mpol_flags,
                                group.nice,
                                group.comm.as_deref(),
                                group.uid,
                                group.gid,
                                group.numa_node,
                                worker_pipe_fds,
                                worker_futex,
                                iter_slot,
                                &STOP,
                                group.group_idx,
                            );
                            let bytes =
                                bincode::serde::encode_to_vec(&report, bincode::config::standard())
                                    .unwrap_or_default();
                            let mut f = unsafe { std::fs::File::from_raw_fd(report_fds[1]) };
                            if let Err(e) = f.write_all(&bytes) {
                                eprintln!("ktstr: worker report write_all failed: {e}");
                            }
                            drop(f);
                        }));
                    let code = if child_result.is_ok() { 0 } else { 1 };
                    unsafe {
                        libc::_exit(code);
                    }
                }
                child_pid => {
                    // --- parent ---
                    let psm_parent_restore_rc = unsafe {
                        libc::pthread_sigmask(libc::SIG_SETMASK, &old_mask, std::ptr::null_mut())
                    };
                    if psm_parent_restore_rc != 0 {
                        tracing::warn!(
                            rc = psm_parent_restore_rc,
                            "pthread_sigmask(SIG_SETMASK) failed restoring mask in parent post-fork; SIGUSR1 stays blocked in this thread for the lifetime of the workload"
                        );
                    }
                    // Keep only the parent's ends of the control pipes.
                    unsafe {
                        libc::close(report_fds[1]);
                        libc::close(start_fds[0]);
                    }
                    guard.children.push(ForkedChild {
                        pid: child_pid,
                        report_fd: report_fds[0],
                        start_fd: start_fds[1],
                        kind: ForkedChildKind::Worker {
                            group_idx: group.group_idx,
                        },
                    });
                }
            }
        }
        Ok(())
    }
pub fn worker_pids(&self) -> Vec<libc::pid_t> {
let mut out = Vec::with_capacity(self.children.len() + self.threads.len());
out.extend(self.children.iter().map(|c| c.pid));
out.extend(self.threads.iter().map(|tw| tw.tid.load(Ordering::Acquire)));
out
}
pub fn worker_pids_for_cgroup_procs(&self) -> Result<Vec<libc::pid_t>> {
if !self.threads.is_empty() {
anyhow::bail!(
"WorkloadHandle::worker_pids_for_cgroup_procs: workers were \
spawned with CloneMode::Thread; their pids share the test \
harness's tgid and a `cgroup.procs` write would migrate the \
harness. Use `cgroup.threads` (thread-scoped) for Thread-mode \
workers, or switch to CloneMode::Fork."
);
}
Ok(self.worker_pids())
}
pub fn start(&mut self) {
if self.started {
return;
}
self.started = true;
for child in &mut self.children {
unsafe {
libc::write(child.start_fd, b"s".as_ptr() as *const _, 1);
libc::close(child.start_fd);
}
child.start_fd = -1;
}
for tw in &mut self.threads {
if let Some(tx) = tw.start_tx.take() {
let _ = tx.send(());
}
}
}
pub fn set_affinity(&self, idx: usize, cpus: &BTreeSet<usize>) -> Result<()> {
let pid = if idx < self.children.len() {
let child = &self.children[idx];
if let ForkedChildKind::PcommContainer { groups } = &child.kind {
let total: usize = groups.iter().map(|(_, n)| n).sum();
anyhow::bail!(
"set_affinity: child {idx} is a pcomm container \
hosting {total} thread workers; per-thread \
tids are not exported across the process boundary. \
Set affinity via WorkSpec::affinity at spawn time \
so worker_main applies it inside the container."
);
}
child.pid
} else {
let thread_idx = idx - self.children.len();
let tid = self.threads[thread_idx].tid.load(Ordering::Acquire);
if tid == 0 {
anyhow::bail!(
"set_affinity: thread worker {thread_idx} has not yet \
published gettid() (call start() first)"
);
}
tid
};
set_thread_affinity(pid, cpus)
}
pub fn snapshot_iterations(&self) -> Vec<u64> {
if self.iter_counters.is_null() || self.iter_counter_len == 0 {
return Vec::new();
}
(0..self.iter_counter_len)
.map(|i| {
unsafe { &*self.iter_counters.add(i) }.load(Ordering::Relaxed)
})
.collect()
}
    /// Stops every worker and collects their [`WorkerReport`]s, consuming the
    /// handle.
    ///
    /// Phases:
    /// 1. If not yet started, start now and wait (≤5 s) for each forked
    ///    child's ready byte on its report pipe (barrier).
    /// 2. If already started, blocking-read the ready byte from each child
    ///    (it was written before the report, so it is consumed here).
    /// 3. Signal stop: SIGUSR1 to children, `stop` flag to threads.
    /// 4. Per child (shared 5 s deadline): poll the report pipe; on timeout
    ///    SIGKILL + killpg and drop the fd, otherwise read the report, then
    ///    kill anyway and reap via waitpid. Missing/short reports become
    ///    sentinel `WorkerReport`s carrying the classified exit status.
    /// 5. Join thread workers with a timeout, synthesizing Panicked/TimedOut
    ///    sentinels as needed.
    pub fn stop_and_collect(mut self) -> Vec<WorkerReport> {
        let was_started = self.started;
        self.start();
        if !was_started && !self.children.is_empty() {
            // Ready barrier: each child writes one 'r' byte on its report
            // pipe once released; wait for all of them (bounded).
            let barrier_deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
            let mut pending: Vec<(usize, i32)> = self
                .children
                .iter()
                .enumerate()
                .map(|(i, c)| (i, c.report_fd))
                .collect();
            while !pending.is_empty() {
                let remaining =
                    barrier_deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    break;
                }
                let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
                let mut pfds: Vec<libc::pollfd> = pending
                    .iter()
                    .map(|&(_, fd)| libc::pollfd {
                        fd,
                        events: libc::POLLIN,
                        revents: 0,
                    })
                    .collect();
                let ret = unsafe { libc::poll(pfds.as_mut_ptr(), pfds.len() as libc::nfds_t, ms) };
                if ret < 0 {
                    let err = std::io::Error::last_os_error();
                    if err.kind() == std::io::ErrorKind::Interrupted {
                        continue;
                    }
                    tracing::warn!(
                        %err,
                        pending = pending.len(),
                        "WorkloadHandle::stop_and_collect: barrier poll failed; falling \
                         through to per-worker collect"
                    );
                    break;
                }
                if ret > 0 {
                    // Consume ready bytes; drop a child from `pending` once
                    // its byte is read or its pipe end reports HUP/ERR.
                    pending.retain(|&(_, fd)| {
                        let pfd = pfds.iter().find(|p| p.fd == fd);
                        let revents = pfd.map(|p| p.revents).unwrap_or(0);
                        if revents & libc::POLLIN != 0 {
                            let mut byte: u8 = 0;
                            let n = unsafe {
                                libc::read(fd, &mut byte as *mut u8 as *mut libc::c_void, 1)
                            };
                            if n >= 0 {
                                return false;
                            }
                            let err = std::io::Error::last_os_error();
                            if err.kind() == std::io::ErrorKind::Interrupted {
                                return true;
                            }
                            tracing::warn!(
                                %err,
                                fd,
                                "WorkloadHandle::stop_and_collect: barrier byte read \
                                 failed; treating worker as ready"
                            );
                            return false;
                        }
                        if revents & (libc::POLLHUP | libc::POLLERR | libc::POLLNVAL) != 0 {
                            return false;
                        }
                        true
                    });
                }
            }
        }
        let mut reports = Vec::new();
        let children = std::mem::take(&mut self.children);
        let threads = std::mem::take(&mut self.threads);
        if was_started {
            // Already started earlier: the ready byte is still unread on each
            // report pipe; consume it so report decoding starts clean.
            // NOTE(review): this read is blocking and unbounded — assumes the
            // child wrote its ready byte promptly after start().
            for child in &children {
                let mut byte: u8 = 0;
                unsafe {
                    libc::read(
                        child.report_fd,
                        &mut byte as *mut u8 as *mut libc::c_void,
                        1,
                    );
                }
            }
        }
        // Request graceful stop.
        for child in &children {
            let _ = nix::sys::signal::kill(
                nix::unistd::Pid::from_raw(child.pid),
                nix::sys::signal::Signal::SIGUSR1,
            );
        }
        for tw in &threads {
            tw.stop.store(true, Ordering::Relaxed);
        }
        // Shared deadline across all children for report collection.
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        for child in children {
            let mut buf = Vec::new();
            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
            let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
            let mut poll_ready = false;
            if ms > 0 {
                let mut pfd = libc::pollfd {
                    fd: child.report_fd,
                    events: libc::POLLIN,
                    revents: 0,
                };
                let ready = unsafe { libc::poll(&mut pfd, 1, ms) };
                poll_ready = ready > 0;
            }
            let npid = nix::unistd::Pid::from_raw(child.pid);
            if !poll_ready {
                // Deadline hit: escalate to SIGKILL (pid and pgid) and give
                // up on this child's report.
                let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
                let _ = nix::sys::signal::killpg(npid, nix::sys::signal::Signal::SIGKILL);
                let _ = nix::unistd::close(child.report_fd);
            } else {
                // Drain the report, then kill anyway so the child (and any
                // process group it spawned) is gone before waitpid.
                let mut f = unsafe { std::fs::File::from_raw_fd(child.report_fd) };
                let _ = f.read_to_end(&mut buf);
                drop(f);
                let _ = nix::sys::signal::kill(npid, nix::sys::signal::Signal::SIGKILL);
                let _ = nix::sys::signal::killpg(npid, nix::sys::signal::Signal::SIGKILL);
            }
            // Non-blocking reap first; only block if the child is still
            // running (its real exit status is then unknown → StillAlive).
            let waited = nix::sys::wait::waitpid(npid, Some(nix::sys::wait::WaitPidFlag::WNOHANG));
            let still_running = matches!(waited, Ok(nix::sys::wait::WaitStatus::StillAlive));
            let exit_info_source: Result<nix::sys::wait::WaitStatus, nix::errno::Errno> =
                if still_running {
                    let _ = nix::sys::wait::waitpid(npid, None);
                    Ok(nix::sys::wait::WaitStatus::StillAlive)
                } else {
                    waited
                };
            // Strip the ready byte if it is still at the head of the buffer.
            let report_slice: &[u8] = if buf.first() == Some(&b'r') {
                &buf[1..]
            } else {
                &buf[..]
            };
            match child.kind {
                ForkedChildKind::Worker { group_idx } => {
                    // Single bincode-encoded WorkerReport per forked worker.
                    let decoded: Result<(WorkerReport, usize), _> =
                        bincode::serde::decode_from_slice(
                            report_slice,
                            bincode::config::standard(),
                        );
                    if let Ok((report, _)) = decoded {
                        reports.push(report);
                    } else {
                        let exit_info = classify_wait_outcome(exit_info_source);
                        eprintln!(
                            "ktstr: worker pid={} returned no report ({} bytes read, exit={exit_info:?})",
                            child.pid,
                            buf.len(),
                        );
                        reports.push(WorkerReport {
                            tid: child.pid,
                            group_idx,
                            exit_info: Some(exit_info),
                            ..WorkerReport::default()
                        });
                    }
                }
                ForkedChildKind::PcommContainer { groups } => {
                    // A container ships a JSON Vec<WorkerReport>; pad short
                    // or undecodable payloads with per-slot sentinels mapped
                    // back to their group indices.
                    let total_workers: usize = groups.iter().map(|(_, n)| n).sum();
                    let decoded: Result<Vec<WorkerReport>, _> =
                        serde_json::from_slice(report_slice);
                    match decoded {
                        Ok(mut decoded_reports) if decoded_reports.len() == total_workers => {
                            reports.append(&mut decoded_reports);
                        }
                        Ok(mut decoded_reports) => {
                            let exit_info = classify_wait_outcome(exit_info_source);
                            eprintln!(
                                "ktstr: pcomm thread-group leader pid={} returned {} of {} reports ({} bytes read, exit={exit_info:?})",
                                child.pid,
                                decoded_reports.len(),
                                total_workers,
                                buf.len(),
                            );
                            decoded_reports.truncate(total_workers);
                            let got = decoded_reports.len();
                            for r in decoded_reports {
                                reports.push(r);
                            }
                            for slot in got..total_workers {
                                // Map the missing slot back to its group via
                                // the (group_idx, num_workers) layout.
                                let mut acc = 0usize;
                                let mut g_idx = 0usize;
                                for &(gi, n) in &groups {
                                    if slot < acc + n {
                                        g_idx = gi;
                                        break;
                                    }
                                    acc += n;
                                }
                                reports.push(WorkerReport {
                                    tid: child.pid,
                                    group_idx: g_idx,
                                    exit_info: Some(exit_info.clone()),
                                    ..WorkerReport::default()
                                });
                            }
                        }
                        Err(_) => {
                            let exit_info = classify_wait_outcome(exit_info_source);
                            eprintln!(
                                "ktstr: pcomm thread-group leader pid={} returned no decodable report ({} bytes read, exit={exit_info:?})",
                                child.pid,
                                buf.len(),
                            );
                            for &(gi, n) in &groups {
                                for _ in 0..n {
                                    reports.push(WorkerReport {
                                        tid: child.pid,
                                        group_idx: gi,
                                        exit_info: Some(exit_info.clone()),
                                        ..WorkerReport::default()
                                    });
                                }
                            }
                        }
                    }
                }
            }
        }
        // Thread workers: bounded join, synthesizing sentinel reports for
        // panics and join timeouts (the timed-out thread is leaked).
        for mut tw in threads {
            tw.start_tx.take();
            let tid = tw.tid.load(Ordering::Acquire);
            if let Some(j) = tw.join.take() {
                match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
                    Some(Ok(report)) => reports.push(report),
                    Some(Err(payload)) => {
                        let msg = extract_panic_payload(payload);
                        eprintln!("ktstr: thread worker tid={tid} panicked: {msg}");
                        reports.push(WorkerReport {
                            tid,
                            completed: false,
                            exit_info: Some(WorkerExitInfo::Panicked(msg)),
                            ..WorkerReport::default()
                        });
                    }
                    None => {
                        tracing::warn!(
                            tid,
                            timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
                            "thread worker did not join within timeout — leaking the \
                             thread; sentinel report attached with TimedOut exit_info"
                        );
                        reports.push(WorkerReport {
                            tid,
                            completed: false,
                            exit_info: Some(WorkerExitInfo::TimedOut),
                            ..WorkerReport::default()
                        });
                    }
                }
            }
        }
        reports
    }
}
/// Last-resort cleanup for handles that were not consumed by
/// `stop_and_collect`: kill and reap every forked child, join (or leak)
/// thread workers, close all tracked fds, and unmap shared regions.
impl Drop for WorkloadHandle {
    fn drop(&mut self) {
        use nix::sys::signal::{Signal, kill};
        use nix::sys::wait::waitpid;
        use nix::unistd::{Pid, close};
        for child in &self.children {
            let pid = child.pid;
            let nix_pid = Pid::from_raw(pid);
            // killpg first (child did setpgid(0,0) in spawn_group), then the
            // pid itself; ESRCH on killpg just means the group is gone.
            if let Err(e) = nix::sys::signal::killpg(nix_pid, Signal::SIGKILL)
                && e != nix::errno::Errno::ESRCH
            {
                tracing::warn!(pid, %e, "killpg failed in WorkloadHandle::drop");
            }
            if let Err(e) = kill(nix_pid, Signal::SIGKILL) {
                tracing::warn!(pid, %e, "kill failed in WorkloadHandle::drop");
            }
            // Blocking reap — avoids leaving zombies behind.
            if let Err(e) = waitpid(nix_pid, None) {
                tracing::warn!(pid, %e, "waitpid failed in WorkloadHandle::drop");
            }
            // start_fd may already be -1 if start() consumed it.
            for fd in [child.report_fd, child.start_fd] {
                if fd >= 0
                    && let Err(e) = close(fd)
                {
                    tracing::warn!(fd, %e, "close failed in WorkloadHandle::drop");
                }
            }
        }
        let threads = std::mem::take(&mut self.threads);
        for mut tw in threads {
            tw.stop.store(true, Ordering::Relaxed);
            tw.start_tx.take();
            if let Some(j) = tw.join.take() {
                let tid = tw.tid.load(Ordering::Acquire);
                match join_thread_with_timeout(j, &tw.exit_evt, THREAD_JOIN_TIMEOUT) {
                    Some(Ok(_)) => {}
                    Some(Err(e)) => {
                        let payload = extract_panic_payload(e);
                        tracing::warn!(
                            tid,
                            payload,
                            "thread worker panicked in WorkloadHandle::drop"
                        );
                    }
                    None => {
                        tracing::warn!(
                            tid,
                            timeout_secs = THREAD_JOIN_TIMEOUT.as_secs(),
                            "thread worker failed to join within timeout in \
                             WorkloadHandle::drop — leaking the thread"
                        );
                    }
                }
            }
        }
        // Close all worker-communication fds still tracked by the handle.
        for (ab, ba) in &self.pipe_pairs {
            for fd in [ab[0], ab[1], ba[0], ba[1]] {
                if let Err(e) = close(fd) {
                    tracing::warn!(fd, %e, "close failed for pipe_pair fd in WorkloadHandle::drop");
                }
            }
        }
        for chain in &self.chain_pipes {
            for pipe in chain {
                for fd in [pipe[0], pipe[1]] {
                    if let Err(e) = close(fd) {
                        tracing::warn!(fd, %e, "close failed for chain_pipe fd in WorkloadHandle::drop");
                    }
                }
            }
        }
        // Unmap futex regions and the iteration-counter region.
        for (&ptr, &size) in self.futex_ptrs.iter().zip(self.futex_region_sizes.iter()) {
            unsafe {
                libc::munmap(ptr as *mut libc::c_void, size);
            }
        }
        if !self.iter_counters.is_null() && self.iter_counter_len > 0 {
            // size_of::<u64>() matches size_of::<AtomicU64>() used at mmap time.
            unsafe {
                libc::munmap(
                    self.iter_counters as *mut libc::c_void,
                    self.iter_counter_len * std::mem::size_of::<u64>(),
                );
            }
        }
    }
}
/// SIGUSR1 handler installed in forked worker children (see `spawn_group`):
/// raises the shared STOP flag that worker loops poll. The body is a single
/// relaxed atomic store, which is async-signal-safe.
pub(super) extern "C" fn sigusr1_handler(_: libc::c_int) {
    STOP.store(true, Ordering::Relaxed);
}
/// Maps a raw errno from a failed `mmap(MAP_SHARED|MAP_ANONYMOUS)` to a
/// human-readable remediation suffix for error messages; unknown or absent
/// errno values yield an empty string.
pub(super) fn mmap_shared_anon_errno_hint(errno: Option<i32>) -> &'static str {
    let Some(code) = errno else {
        return "";
    };
    if code == libc::ENOMEM {
        " (ENOMEM: host is out of memory \
         or /proc/sys/vm/max_map_count is too low — \
         check `sysctl vm.max_map_count` and `free -h`)"
    } else if code == libc::EPERM {
        " (EPERM: MAP_SHARED|MAP_ANONYMOUS \
         rejected by the kernel — check memory cgroup \
         limits and container seccomp policy)"
    } else if code == libc::EINVAL {
        " (EINVAL: invalid length or \
         flag combination — verify num_workers > 0 so the \
         region size is non-zero, and that the total size \
         does not overflow usize)"
    } else {
        ""
    }
}
// Test suites are split into per-feature sibling modules, each compiled only
// under `cfg(test)`.
#[cfg(test)]
mod testing;
#[cfg(test)]
mod tests_composed;
#[cfg(test)]
mod tests_fan_out;
#[cfg(test)]
mod tests_futex;
#[cfg(test)]
mod tests_grandchild;
#[cfg(test)]
mod tests_idle_churn;
#[cfg(test)]
mod tests_integration;
#[cfg(test)]
mod tests_lifecycle;
#[cfg(test)]
mod tests_mempolicy;
#[cfg(test)]
mod tests_misc;
#[cfg(test)]
mod tests_pcomm;
#[cfg(test)]
mod tests_sched_policy;
#[cfg(test)]
mod tests_spawn_guard;
#[cfg(test)]
mod tests_thread_mode;
#[cfg(test)]
mod tests_wake_chain;