pub mod affinity;
pub mod backdrop;
pub mod basic;
pub mod bpf_pin;
pub mod cpuset;
pub mod dynamic;
pub mod interaction;
pub mod nested;
pub mod ops;
pub mod payload_run;
pub mod performance;
pub mod sample;
pub mod scenarios;
pub mod snapshot;
pub mod stress;
pub use backdrop::Backdrop;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::atomic::AtomicU16;
use std::thread;
use std::time::Duration;
use anyhow::Result;
use nix::sys::signal::kill;
use nix::unistd::Pid;
use crate::assert::AssertResult;
use crate::topology::TestTopology;
use crate::workload::*;
/// Probes whether `pid` still answers a signal-less `kill`.
///
/// Non-positive pids are rejected before the syscall: POSIX gives `0` and
/// negative values process-group meanings, so they never name a single
/// process. For a positive pid, `kill` is called with no signal (`None`);
/// any error from that call, whatever its errno, is reported as "not alive".
fn process_alive(pid: libc::pid_t) -> bool {
    pid > 0 && kill(Pid::from_raw(pid), None).is_ok()
}
pub use crate::workload::AffinityIntent;
/// Guard over a set of cgroups created through a
/// [`crate::cgroup::CgroupOps`] backend.
///
/// The guard remembers the name of each cgroup it is asked to create; its
/// `Drop` impl walks those names in reverse insertion order and asks the
/// backend to remove each one.
#[must_use = "dropping a CgroupGroup immediately destroys the cgroups it manages"]
pub struct CgroupGroup<'a> {
// Backend used for every create / configure / remove call.
cgroups: &'a dyn crate::cgroup::CgroupOps,
// Names of the cgroups this guard is responsible for, in insertion order.
names: Vec<String>,
}
// Hand-written `Debug`: the `cgroups` slot prints the backend's parent path
// (the field itself is a trait object), followed by the tracked names.
impl std::fmt::Debug for CgroupGroup<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let parent = self.cgroups.parent_path();
        let mut out = f.debug_struct("CgroupGroup");
        out.field("cgroups", &parent);
        out.field("names", &self.names);
        out.finish()
    }
}
impl<'a> CgroupGroup<'a> {
    /// Creates an empty guard bound to `cgroups`.
    ///
    /// Nothing is created on the backend until one of the `add_*` methods is
    /// called.
    pub fn new(cgroups: &'a dyn crate::cgroup::CgroupOps) -> Self {
        Self {
            cgroups,
            names: Vec::new(),
        }
    }

    /// Creates cgroup `name` and applies `cpuset` to it.
    ///
    /// The backend's `setup` is invoked with the cpuset controller before the
    /// cgroup is created.
    ///
    /// # Errors
    ///
    /// Propagates any error from `setup`, `create_cgroup`, or `set_cpuset`.
    /// If `set_cpuset` fails, the cgroup has already been created; it stays
    /// tracked by this guard so that dropping the guard still removes it.
    pub fn add_cgroup(&mut self, name: &str, cpuset: &BTreeSet<usize>) -> Result<()> {
        let mut required = BTreeSet::new();
        required.insert(crate::cgroup::Controller::Cpuset);
        self.cgroups.setup(&required)?;
        self.cgroups.create_cgroup(name)?;
        // Track the cgroup as soon as it exists on the backend. Recording it
        // only after `set_cpuset` would leak the freshly created cgroup when
        // the cpuset write fails, because `Drop` only removes tracked names.
        self.names.push(name.to_string());
        self.cgroups.set_cpuset(name, cpuset)?;
        Ok(())
    }

    /// Creates cgroup `name` without configuring a cpuset.
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_cgroup`; on failure the name is not
    /// tracked.
    pub fn add_cgroup_no_cpuset(&mut self, name: &str) -> Result<()> {
        self.cgroups.create_cgroup(name)?;
        self.names.push(name.to_string());
        Ok(())
    }

    /// Names of the cgroups currently tracked, in insertion order.
    pub fn names(&self) -> &[String] {
        &self.names
    }

    /// Stops tracking every entry equal to `name`; the guard will no longer
    /// try to remove it on drop. The cgroup itself is not touched.
    pub(crate) fn forget(&mut self, name: &str) {
        self.names.retain(|n| n != name);
    }
}
/// Returns `true` when the root cause of `err` is a `std::io::Error` whose
/// kind is `NotFound`.
///
/// Any other root-cause type, or any other I/O error kind, yields `false`.
pub(crate) fn is_io_not_found(err: &anyhow::Error) -> bool {
    matches!(
        err.root_cause().downcast_ref::<std::io::Error>(),
        Some(io) if io.kind() == std::io::ErrorKind::NotFound
    )
}
/// Maps the raw OS errno at the root of `err` to a human-readable hint for
/// cgroup-removal failures.
///
/// Returns `None` when the root cause is not a `std::io::Error`, when that
/// error carries no raw OS code, or when the code is neither `EBUSY` nor
/// `EACCES`.
pub(crate) fn remove_cgroup_errno_hint(err: &anyhow::Error) -> Option<&'static str> {
    let errno = err
        .root_cause()
        .downcast_ref::<std::io::Error>()
        .and_then(std::io::Error::raw_os_error)?;
    if errno == libc::EBUSY {
        Some("EBUSY: cgroup still has live tasks — workloads were not drained before teardown")
    } else if errno == libc::EACCES {
        Some("EACCES: permission denied — check cgroup owner / `user.slice` delegation")
    } else {
        None
    }
}
impl Drop for CgroupGroup<'_> {
    /// Best-effort teardown: asks the backend to remove every tracked cgroup,
    /// most recently added first.
    ///
    /// A removal that fails with an I/O `NotFound` root cause is ignored.
    /// Any other failure is logged as a warning (with an errno hint when one
    /// is available) and teardown continues with the next name.
    fn drop(&mut self) {
        for name in self.names.iter().rev() {
            let Err(err) = self.cgroups.remove_cgroup(name) else {
                continue;
            };
            if is_io_not_found(&err) {
                // Already gone — nothing left to clean up for this name.
                continue;
            }
            let hint = match remove_cgroup_errno_hint(&err) {
                Some(text) => text,
                None => "",
            };
            tracing::warn!(
                cgroup = %name,
                err = %format!("{err:#}"),
                hint,
                "CgroupGroup::drop: remove_cgroup returned non-ENOENT error",
            );
        }
    }
}
/// Shared context handed to scenario code: the cgroup backend, the
/// topology, timing knobs, and the assertion configuration.
///
/// Construct one through `Ctx::builder(..)` followed by `build()`.
pub struct Ctx<'a> {
/// Cgroup backend that scenario helpers create cgroups and move tasks through.
pub cgroups: &'a dyn crate::cgroup::CgroupOps,
/// Topology the scenario runs against.
pub topo: &'a TestTopology,
/// Base scenario duration; `settled_hold` scales it by a fraction.
pub duration: Duration,
/// Worker count used when a helper builds a default workload or cgroup definition.
pub workers_per_cgroup: usize,
/// Scheduler pid, if one is configured. Read it through
/// `active_sched_pid`, which treats non-positive values as unset.
pub sched_pid: Option<libc::pid_t>,
/// Settle time: slept after cgroup creation in `setup_cgroups` and added
/// to every hold produced by `settled_hold`.
pub settle: Duration,
/// Optional work-type override. Not consumed in this part of the file —
/// NOTE(review): presumably replaces a workload's `WorkType`; confirm at the use sites.
pub work_type_override: Option<WorkType>,
/// Assertion configuration for the scenario.
pub assert: crate::assert::Assert,
/// NOTE(review): not consumed in this part of the file; meaning should be
/// confirmed against the code that reads it.
pub wait_for_map_write: bool,
/// Shared atomic step counter. NOTE(review): who advances it is not
/// visible here; the `Debug` impl only loads it.
pub current_step: Arc<AtomicU16>,
}
// Hand-written `Debug`: the trait-object backend is shown as its parent path
// and the atomic step counter is shown as a relaxed snapshot of its value.
impl std::fmt::Debug for Ctx<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let parent = self.cgroups.parent_path();
        let step = self.current_step.load(std::sync::atomic::Ordering::Relaxed);
        let mut out = f.debug_struct("Ctx");
        out.field("cgroups", &parent);
        out.field("topo", &self.topo);
        out.field("duration", &self.duration);
        out.field("workers_per_cgroup", &self.workers_per_cgroup);
        out.field("sched_pid", &self.sched_pid);
        out.field("settle", &self.settle);
        out.field("work_type_override", &self.work_type_override);
        out.field("assert", &self.assert);
        out.field("wait_for_map_write", &self.wait_for_map_write);
        out.field("current_step", &step);
        out.finish()
    }
}
impl Ctx<'_> {
    /// Returns the configured scheduler pid when it is strictly positive.
    ///
    /// `None` passes through silently. `Some(p)` with `p <= 0` is squashed to
    /// `None` and a warning is logged, since only positive pids count as a
    /// configured scheduler.
    pub(crate) fn active_sched_pid(&self) -> Option<libc::pid_t> {
        let p = self.sched_pid?;
        if p > 0 {
            return Some(p);
        }
        tracing::warn!(
            pid = p,
            "Ctx::active_sched_pid: sched_pid=Some({p}) squashed to None; \
             only positive pids are configured-scheduler values — use \
             None for the unconfigured shape instead of a 0-sentinel or \
             negative pid"
        );
        None
    }

    /// Number of CPUs `spec` resolves to in this context.
    pub fn cpuset_cpus(&self, spec: &crate::scenario::ops::CpusetSpec) -> usize {
        let resolved = spec.resolve(self);
        resolved.len()
    }

    /// Builds a fixed hold of `settle + duration * fraction_of_duration`.
    ///
    /// # Panics
    ///
    /// `Duration::mul_f64` panics when the product is negative, not finite,
    /// or overflows `Duration`; the addition panics on overflow as well.
    pub fn settled_hold(&self, fraction_of_duration: f64) -> crate::scenario::ops::HoldSpec {
        let scaled = self.duration.mul_f64(fraction_of_duration);
        crate::scenario::ops::HoldSpec::fixed(self.settle + scaled)
    }

    /// Starts a cgroup definition called `name` whose worker count is this
    /// context's `workers_per_cgroup`.
    pub fn cgroup_def(
        &self,
        name: impl Into<std::borrow::Cow<'static, str>>,
    ) -> crate::scenario::ops::CgroupDef {
        let workers = self.workers_per_cgroup;
        crate::scenario::ops::CgroupDef::named(name).workers(workers)
    }
}
/// Builder for [`Ctx`], obtained from `Ctx::builder` and finished with
/// `build`.
///
/// It carries one private field per `Ctx` field. `cgroups` and `topo` are
/// fixed when the builder is created; every other field has a setter of the
/// same name.
pub struct CtxBuilder<'a> {
// Fixed at construction; there is no setter for these two.
cgroups: &'a dyn crate::cgroup::CgroupOps,
topo: &'a TestTopology,
// Everything below starts at the default chosen by `Ctx::builder`.
duration: Duration,
workers_per_cgroup: usize,
sched_pid: Option<libc::pid_t>,
settle: Duration,
work_type_override: Option<WorkType>,
assert: crate::assert::Assert,
wait_for_map_write: bool,
current_step: Arc<AtomicU16>,
}
impl<'a> CtxBuilder<'a> {
#[must_use = "builder methods consume self; bind the result"]
pub fn duration(mut self, d: Duration) -> Self {
self.duration = d;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn workers_per_cgroup(mut self, n: usize) -> Self {
self.workers_per_cgroup = n;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn sched_pid(mut self, pid: Option<libc::pid_t>) -> Self {
self.sched_pid = pid;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn settle(mut self, s: Duration) -> Self {
self.settle = s;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn work_type_override(mut self, wt: Option<WorkType>) -> Self {
self.work_type_override = wt;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn assert(mut self, a: crate::assert::Assert) -> Self {
self.assert = a;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn wait_for_map_write(mut self, v: bool) -> Self {
self.wait_for_map_write = v;
self
}
#[must_use = "builder methods consume self; bind the result"]
pub fn current_step(mut self, cs: Arc<AtomicU16>) -> Self {
self.current_step = cs;
self
}
#[must_use = "dropping a Ctx without running the scenario discards the test setup"]
pub fn build(self) -> Ctx<'a> {
Ctx {
cgroups: self.cgroups,
topo: self.topo,
duration: self.duration,
workers_per_cgroup: self.workers_per_cgroup,
sched_pid: self.sched_pid,
settle: self.settle,
work_type_override: self.work_type_override,
assert: self.assert,
wait_for_map_write: self.wait_for_map_write,
current_step: self.current_step,
}
}
}
impl<'a> Ctx<'a> {
    /// Starts a [`CtxBuilder`] for the given backend and topology.
    ///
    /// Defaults: 1 s duration, 1 worker per cgroup, no scheduler pid, zero
    /// settle time, no work-type override, `Assert::default_checks()`,
    /// `wait_for_map_write = false`, and a fresh step counter starting at 0.
    #[must_use = "discarding a CtxBuilder drops the scenario context defaults; chain setters and call .build()"]
    pub fn builder(
        cgroups: &'a dyn crate::cgroup::CgroupOps,
        topo: &'a TestTopology,
    ) -> CtxBuilder<'a> {
        let assert = crate::assert::Assert::default_checks();
        let current_step = Arc::new(AtomicU16::new(0));
        CtxBuilder {
            cgroups,
            topo,
            duration: Duration::from_secs(1),
            workers_per_cgroup: 1,
            sched_pid: None,
            settle: Duration::from_millis(0),
            work_type_override: None,
            assert,
            wait_for_map_write: false,
            current_step,
        }
    }

    /// Starts a `PayloadRun` for payload `p`, bound to this context.
    #[must_use = "dropping a PayloadRun discards the payload configuration; chain setters and call .run()"]
    pub fn payload(
        &'a self,
        p: &'static crate::test_support::Payload,
    ) -> crate::scenario::payload_run::PayloadRun<'a> {
        use crate::scenario::payload_run::PayloadRun;
        PayloadRun::new(self, p)
    }
}
/// Spawns one workload per cgroup name, moves its workers into that cgroup,
/// and only then starts all of them.
///
/// `cfg_fn` receives the index and name of each cgroup and returns the
/// workload configuration to spawn for it. Workers are started only after
/// every workload has been spawned and moved.
///
/// # Errors
///
/// Stops at the first failure from `cfg_fn`, `WorkloadHandle::spawn`,
/// `worker_pids_for_cgroup_procs`, or `move_tasks` and returns it.
fn spawn_and_move<F>(ctx: &Ctx, names: &[String], mut cfg_fn: F) -> Result<Vec<WorkloadHandle>>
where
    F: FnMut(usize, &str) -> Result<WorkloadConfig>,
{
    let mut handles = Vec::with_capacity(names.len());
    for (index, name) in names.iter().enumerate() {
        let cgroup = name.as_str();
        let config = cfg_fn(index, cgroup)?;
        let handle = WorkloadHandle::spawn(&config)?;
        tracing::debug!(
            cgroup = %name,
            workers = config.num_workers,
            pids = handle.worker_pids().len(),
            "spawned workers",
        );
        let pids = handle.worker_pids_for_cgroup_procs()?;
        ctx.cgroups.move_tasks(cgroup, &pids)?;
        handles.push(handle);
    }
    // Start phase is deliberately separate from the spawn/move phase above.
    handles.iter_mut().for_each(|handle| {
        handle.start();
    });
    Ok(handles)
}
/// Picks the worker count for a `WorkSpec`: its own `num_workers` when set,
/// otherwise `default_n`.
///
/// # Errors
///
/// Fails when the resulting count is zero; `label` names the cgroup in the
/// error message.
pub(crate) fn resolve_num_workers(work: &WorkSpec, default_n: usize, label: &str) -> Result<usize> {
    match work.num_workers.unwrap_or(default_n) {
        0 => anyhow::bail!(
            "cgroup '{}': num_workers=0 is not allowed — assertions would \
             vacuously pass with no WorkerReports; use at least 1 worker or \
             drop this WorkSpec entry",
            label,
        ),
        count => Ok(count),
    }
}
/// Resolves an [`AffinityIntent`] against an optional cgroup cpuset and the
/// topology into a concrete [`ResolvedAffinity`].
///
/// `cpuset` is the cgroup's cpuset when one is active, `None` otherwise.
///
/// # Errors
///
/// Every variant other than `Inherit` bails with a diagnostic when it cannot
/// yield at least one CPU (or, for `RandomSubset`, when `count` is zero).
/// `SmtSiblingPair` defers to `resolve_smt_sibling_pair` for its own checks.
pub fn resolve_affinity_for_cgroup(
kind: &AffinityIntent,
cpuset: Option<&BTreeSet<usize>>,
topo: &TestTopology,
) -> Result<ResolvedAffinity> {
match kind {
// No pinning requested.
AffinityIntent::Inherit => Ok(ResolvedAffinity::None),
AffinityIntent::RandomSubset { from, count } => {
// The count check runs before the pool is computed, so a zero count is
// reported even when the pool would also be empty.
if *count == 0 {
anyhow::bail!(
"AffinityIntent::RandomSubset count=0 cannot satisfy any sample. \
Switch to `AffinityIntent::Inherit` to deliberately inherit the \
cgroup cpuset, or pass `count >= 1`.",
);
}
// Sampling pool: `from` narrowed to the cpuset when there is one.
let pool = if let Some(cs) = cpuset {
from.intersection(cs).copied().collect::<BTreeSet<usize>>()
} else {
from.clone()
};
if pool.is_empty() {
// Two different diagnostics: disjoint from the cpuset vs. empty `from`.
if cpuset.is_some() {
let cpuset_repr = format_cpuset_for_diag(cpuset);
anyhow::bail!(
"AffinityIntent::RandomSubset has no CPUs after intersecting \
`from={from:?}` with the cgroup cpuset ({cpuset_repr}). \
Switch to `AffinityIntent::Inherit` to deliberately inherit \
the cgroup cpuset, widen the cgroup's cpuset, or pick a \
`from` set that overlaps the cpuset.",
);
} else {
anyhow::bail!(
"AffinityIntent::RandomSubset has an empty `from` pool with \
no cgroup cpuset to narrow it — there is no CPU to sample. \
Switch to `AffinityIntent::Inherit` to deliberately inherit \
the scenario's CPU budget, or pass a non-empty `from` set.",
);
}
}
// `count` is forwarded unchanged; it is not clamped to the pool size here.
Ok(ResolvedAffinity::Random {
from: pool,
count: *count,
})
}
AffinityIntent::LlcAligned => {
// Pool is the cpuset, or every topology CPU when no cpuset is active.
let pool = cpuset.cloned().unwrap_or_else(|| topo.all_cpuset());
// Pick the LLC sharing the most CPUs with the pool. LLC 0 is the starting
// candidate and the comparison is strict, so ties keep the lowest index.
let mut best_llc = topo.llc_aligned_cpuset(0);
let mut best_overlap = best_llc.intersection(&pool).count();
for idx in 1..topo.num_llcs() {
let llc = topo.llc_aligned_cpuset(idx);
let overlap = llc.intersection(&pool).count();
if overlap > best_overlap {
best_llc = llc;
best_overlap = overlap;
}
}
// Only the part of the chosen LLC that lies inside the pool is returned.
let effective: BTreeSet<usize> = best_llc.intersection(&pool).copied().collect();
if effective.is_empty() {
let cpuset_repr = format_cpuset_for_diag(cpuset);
anyhow::bail!(
"AffinityIntent::LlcAligned has no CPUs after intersecting every \
LLC with the cgroup cpuset ({cpuset_repr}). No LLC has any CPU \
inside the cpuset. Switch to `AffinityIntent::Inherit` to \
deliberately inherit the cpuset, widen the cgroup's cpuset to \
include CPUs from at least one LLC, or pick a different \
affinity intent that doesn't require LLC alignment.",
);
}
Ok(ResolvedAffinity::Fixed(effective))
}
AffinityIntent::CrossCgroup => {
// Uses every topology CPU; the cgroup cpuset is not consulted in this arm.
let all = topo.all_cpuset();
if all.is_empty() {
anyhow::bail!(
"AffinityIntent::CrossCgroup cannot satisfy any worker — \
the topology exposes zero CPUs. The public \
TestTopology constructors (`synthetic` + \
`from_vm_topology`) reject this at construction; \
reaching this bail means a direct private-field \
construction or a future API addition produced a \
zero-CPU topology. Build the test against a \
topology with at least one CPU, or switch to \
`AffinityIntent::Inherit` to defer to the cgroup \
cpuset.",
);
}
Ok(ResolvedAffinity::Fixed(all))
}
AffinityIntent::SingleCpu => {
let pool = cpuset.cloned().unwrap_or_else(|| topo.all_cpuset());
// `BTreeSet` iterates in ascending order, so this is the lowest CPU id.
if let Some(&cpu) = pool.iter().next() {
Ok(ResolvedAffinity::SingleCpu(cpu))
} else {
anyhow::bail!(
"AffinityIntent::SingleCpu cannot pick a CPU from an empty \
cgroup cpuset. Switch to `AffinityIntent::Inherit` to \
deliberately inherit (the empty cpuset is itself the \
problem), or assign a non-empty cpuset to the cgroup.",
);
}
}
AffinityIntent::Exact(cpus) => {
if cpus.is_empty() {
anyhow::bail!(
"AffinityIntent::Exact(BTreeSet::new()) is unsatisfiable — an \
empty CPU set pins workers to nothing. Switch to \
`AffinityIntent::Inherit` to deliberately inherit the cgroup \
cpuset (or the full topology when no cpuset is active), or \
pass at least one CPU ID.",
);
}
if let Some(cs) = cpuset {
// With a cpuset, the requested CPUs are narrowed to those inside it.
let effective: BTreeSet<usize> = cpus.intersection(cs).copied().collect();
if effective.is_empty() {
let cpuset_repr = format_cpuset_for_diag(cpuset);
anyhow::bail!(
"AffinityIntent::Exact({cpus:?}) is disjoint from the cgroup \
cpuset ({cpuset_repr}); intersection is empty. Switch to \
`AffinityIntent::Inherit` to deliberately inherit the cpuset, \
widen the cgroup's cpuset to include the requested CPUs, or \
narrow the `Exact` set to CPUs inside the cpuset.",
);
}
Ok(ResolvedAffinity::Fixed(effective))
} else {
// Without a cpuset the set is taken as given; it is not checked against
// the topology in this function.
Ok(ResolvedAffinity::Fixed(cpus.clone()))
}
}
AffinityIntent::SmtSiblingPair => resolve_smt_sibling_pair(cpuset, topo),
}
}
/// Renders an optional cpuset for use inside diagnostic messages.
///
/// * `None` becomes `<no cpuset>`.
/// * An empty set becomes `empty cpuset {}`.
/// * Anything else becomes `cpuset ` followed by the set's `Debug` output.
fn format_cpuset_for_diag(cpuset: Option<&BTreeSet<usize>>) -> String {
    cpuset.map_or_else(
        || String::from("<no cpuset>"),
        |set| {
            if set.is_empty() {
                String::from("empty cpuset {}")
            } else {
                format!("cpuset {set:?}")
            }
        },
    )
}
/// Finds the first physical core with at least two SMT siblings inside the
/// pool and pins to the first two of them.
///
/// The pool is `cpuset` when given, otherwise every topology CPU. LLCs and
/// their cores are visited in the order the topology yields them, and the
/// first qualifying core wins.
///
/// # Errors
///
/// Bails when no core has two siblings inside the pool.
fn resolve_smt_sibling_pair(
    cpuset: Option<&BTreeSet<usize>>,
    topo: &TestTopology,
) -> Result<ResolvedAffinity> {
    let pool = match cpuset {
        Some(cs) => cs.clone(),
        None => topo.all_cpuset(),
    };
    for llc in topo.llcs() {
        for siblings in llc.cores().values() {
            // First two siblings of this core that fall inside the pool.
            let present: Vec<usize> = siblings
                .iter()
                .copied()
                .filter(|cpu| pool.contains(cpu))
                .take(2)
                .collect();
            if present.len() == 2 {
                let pair: BTreeSet<usize> = present.into_iter().collect();
                return Ok(ResolvedAffinity::Fixed(pair));
            }
        }
    }
    let scope = match cpuset {
        Some(_) => format!("the effective cpuset ({})", format_cpuset_for_diag(cpuset)),
        None => "the full topology (no cgroup cpuset is active)".to_string(),
    };
    anyhow::bail!(
        "AffinityIntent::SmtSiblingPair requires a physical core with at \
         least two SMT siblings present in {scope}. The current topology \
         and cpuset expose no such pair — threads_per_core may be 1 (SMT \
         disabled or non-SMT host), the cpuset may have isolated each \
         sibling onto a different cgroup, or the topology was built \
         without per-core sibling data. Switch to a different \
         AffinityIntent for non-SMT scheduling tests, or run on a host \
         whose VM topology has threads_per_core >= 2.",
    );
}
/// Resolves `kind` against `cpuset` and `topo`, then converts the result
/// back into an [`AffinityIntent`] via `flatten_for_spawn`.
///
/// # Errors
///
/// Returns whatever `resolve_affinity_for_cgroup` returns on failure.
pub(crate) fn intent_for_spawn(
    kind: &AffinityIntent,
    cpuset: Option<&BTreeSet<usize>>,
    topo: &TestTopology,
) -> Result<AffinityIntent> {
    let resolved = resolve_affinity_for_cgroup(kind, cpuset, topo)?;
    Ok(flatten_for_spawn(resolved))
}
fn flatten_for_spawn(resolved: ResolvedAffinity) -> AffinityIntent {
match resolved {
ResolvedAffinity::None => AffinityIntent::Inherit,
ResolvedAffinity::Fixed(set) => {
if set.is_empty() {
unreachable!(
"ResolvedAffinity::Fixed(empty) reached flatten_for_spawn — \
resolve_affinity_for_cgroup is supposed to bail on every \
path that produces an empty Fixed (no-silent-drops \
invariant). Audit the new caller that constructed it.",
)
} else {
AffinityIntent::Exact(set)
}
}
ResolvedAffinity::SingleCpu(cpu) => AffinityIntent::Exact([cpu].into_iter().collect()),
ResolvedAffinity::Random { from, count } => {
if count == 0 || from.is_empty() {
unreachable!(
"ResolvedAffinity::Random {{ count={count}, from={from:?} }} \
reached flatten_for_spawn with count==0 or empty pool — \
resolve_affinity_for_cgroup is supposed to bail on those \
cases (no-silent-drops invariant). Audit the new caller \
that constructed it.",
)
} else {
AffinityIntent::RandomSubset { from, count }
}
}
}
}
/// Creates `n` cgroups named `cg_0` … `cg_{n-1}` (no cpuset), then spawns a
/// clone of `wl` into each.
///
/// After creating the cgroups the function sleeps for `ctx.settle`. If a
/// scheduler pid is configured, it is then checked for liveness before any
/// workload is spawned.
///
/// Returns the workload handles together with the guard that owns the
/// cgroups.
///
/// # Errors
///
/// Fails if a cgroup cannot be created, if the configured scheduler is no
/// longer alive after the settle sleep, or if spawning or moving workers
/// fails.
pub fn setup_cgroups<'a>(
    ctx: &'a Ctx,
    n: usize,
    wl: &WorkloadConfig,
) -> Result<(Vec<WorkloadHandle>, CgroupGroup<'a>)> {
    let names: Vec<String> = (0..n).map(|i| format!("cg_{i}")).collect();
    let mut guard = CgroupGroup::new(ctx.cgroups);
    for name in &names {
        guard.add_cgroup_no_cpuset(name)?;
    }
    thread::sleep(ctx.settle);
    if let Some(pid) = ctx.active_sched_pid() {
        if !process_alive(pid) {
            anyhow::bail!(
                "{} after cgroup creation (pid={})",
                crate::assert::SCHED_DIED_PREFIX,
                pid,
            );
        }
    }
    let handles = spawn_and_move(ctx, &names, |_, _| Ok(wl.clone()))?;
    Ok((handles, guard))
}
/// Stops every workload, collects its reports, and merges the per-cgroup
/// assertion results into one [`AssertResult`].
///
/// Each item pairs a handle with the cpuset of its cgroup, if any. Every
/// handle is stopped and collected regardless of `checks`. Assertions run
/// only while `checks.has_worker_checks()` holds. NUMA nodes are derived
/// only when both a cpuset and `topo` are present.
pub(crate) fn collect_handles<'a>(
    handles: impl IntoIterator<Item = (WorkloadHandle, Option<&'a BTreeSet<usize>>)>,
    checks: &crate::assert::Assert,
    topo: Option<&crate::topology::TestTopology>,
) -> AssertResult {
    let mut merged = AssertResult::pass();
    for (handle, cpuset) in handles {
        let reports = handle.stop_and_collect();
        if !checks.has_worker_checks() {
            continue;
        }
        let numa_nodes = match (cpuset, topo) {
            (Some(cs), Some(t)) => Some(t.numa_nodes_for_cpuset(cs)),
            _ => None,
        };
        let outcome = checks.assert_cgroup_with_numa(&reports, cpuset, numa_nodes.as_ref());
        merged.merge(outcome);
    }
    merged
}
/// Stops and collects every handle with no cpuset and no topology attached,
/// returning the merged assertion result.
pub fn collect_all(handles: Vec<WorkloadHandle>, checks: &crate::assert::Assert) -> AssertResult {
    let without_cpuset = handles.into_iter().map(|handle| (handle, None));
    collect_handles(without_cpuset, checks, None)
}
pub fn dfl_wl(ctx: &Ctx) -> WorkloadConfig {
WorkloadConfig {
num_workers: ctx.workers_per_cgroup,
..Default::default()
}
}
/// Splits the topology's usable CPUs into a lower and an upper half.
///
/// The split point is `len / 2`, so with an odd count the second set gets
/// the extra CPU.
#[cfg(test)]
pub fn split_half(ctx: &Ctx) -> (BTreeSet<usize>, BTreeSet<usize>) {
    let usable = ctx.topo.usable_cpus();
    let (lower, upper) = usable.split_at(usable.len() / 2);
    let first: BTreeSet<usize> = lower.iter().copied().collect();
    let second: BTreeSet<usize> = upper.iter().copied().collect();
    (first, second)
}
/// Spawns one workload per cgroup, rotating through a fixed list of five
/// work types (spin-wait, bursty, sync I/O write, mixed, yield-heavy).
///
/// The cgroup at index `i` gets work type `i % 5`. `IoSyncWrite` workloads
/// always use 2 workers; every other type uses `ctx.workers_per_cgroup`.
/// Each workload is moved into its cgroup and started before the next one is
/// spawned.
///
/// # Errors
///
/// Returns the first error from spawning a workload, reading its worker
/// pids, or moving them into the cgroup.
pub fn spawn_diverse(ctx: &Ctx, cgroup_names: &[&str]) -> Result<Vec<WorkloadHandle>> {
    let rotation = [
        WorkType::SpinWait,
        WorkType::bursty(Duration::from_millis(50), Duration::from_millis(100)),
        WorkType::IoSyncWrite,
        WorkType::Mixed,
        WorkType::YieldHeavy,
    ];
    let mut handles = Vec::new();
    for (name, template) in cgroup_names.iter().zip(rotation.iter().cycle()) {
        let work_type = template.clone();
        let num_workers = match work_type {
            WorkType::IoSyncWrite => 2,
            _ => ctx.workers_per_cgroup,
        };
        let mut handle = WorkloadHandle::spawn(&WorkloadConfig {
            num_workers,
            work_type,
            ..Default::default()
        })?;
        let pids = handle.worker_pids_for_cgroup_procs()?;
        ctx.cgroups.move_tasks(name, &pids)?;
        handle.start();
        handles.push(handle);
    }
    Ok(handles)
}
#[cfg(test)]
mod tests;