use super::ops::{CgroupDef, CpusetSpec, HoldSpec, Op, Setup, Step, execute_steps};
use super::{CgroupGroup, Ctx, collect_all, spawn_diverse};
use crate::assert::AssertResult;
use crate::workload::*;
use anyhow::Result;
use std::collections::BTreeSet;
use std::thread;
use std::time::{Duration, Instant};
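/// Factory: one single-CPU, single-worker cgroup per CPU, leaving one CPU
/// uncovered and capping the cgroup count at 64.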
fn per_cpu_defs(ctx: &Ctx) -> Vec<CgroupDef> {
let all = ctx.topo.all_cpus();
if all.is_empty() {
return Vec::new();
}
let n = (all.len() - 1).min(64);
(0..n)
.map(|i| {
CgroupDef::named(format!("many_{i}"))
.with_cpuset(CpusetSpec::exact([all[i]]))
.workers(1)
})
.collect()
}
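/// Single step: build the per-CPU cgroups via the factory, then hold them for
/// the scenario duration plus a one-second grace period.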
fn cgroup_per_cpu_steps(ctx: &Ctx) -> Vec<Step> {
vec![Step {
setup: Setup::Factory(per_cpu_defs),
ops: vec![],
hold: HoldSpec::Fixed(Duration::from_secs(1) + ctx.duration),
}]
}
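/// Stress scenario: many small cgroups, one pinned to each CPU.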
pub fn custom_cgroup_per_cpu(ctx: &Ctx) -> Result<AssertResult> {
execute_steps(ctx, cgroup_per_cpu_steps(ctx))
}
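/// Factory for the reuse phase: recreates as many single-CPU, single-worker
/// cgroups as the exhaust phase removed. `n` and `half` are recomputed from
/// ctx because `Setup::Factory` takes a plain fn that cannot capture state.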
fn reuse_defs(ctx: &Ctx) -> Vec<CgroupDef> {
let all = ctx.topo.all_cpus();
if all.is_empty() {
return Vec::new();
}
let n = (all.len() - 1).min(15);
let half = n / 2;
(0..half)
.map(|i| {
CgroupDef::named(format!("reuse_{i}"))
.with_cpuset(CpusetSpec::exact([all[i % all.len()]]))
.workers(1)
})
.collect()
}
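/// Three phases: (1) add up to 15 single-CPU cgroups, (2) remove half of
/// them, (3) recreate that many via the `reuse_defs` factory and hold.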
fn cgroup_exhaust_reuse_steps(ctx: &Ctx) -> Vec<Step> {
let all = ctx.topo.all_cpus();
if all.is_empty() {
return Vec::new();
}
let n = (all.len() - 1).min(15);
let half = n / 2;
let mut exhaust_ops = Vec::new();
for i in 0..n {
let name = format!("exhaust_{i}");
exhaust_ops.push(Op::add_cgroup(name.clone()));
exhaust_ops.push(Op::set_cpuset(
name,
CpusetSpec::exact([all[i % all.len()]]),
));
}
let mut remove_ops = Vec::new();
for i in 0..half {
remove_ops.push(Op::remove_cgroup(format!("exhaust_{i}")));
}
vec![
Step::new(exhaust_ops, HoldSpec::Fixed(Duration::from_secs(1))),
Step::new(remove_ops, HoldSpec::Fixed(Duration::from_secs(1))),
Step {
setup: Setup::Factory(reuse_defs),
ops: vec![],
hold: HoldSpec::Fixed(ctx.duration),
},
]
}
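/// Stress scenario: exhaust cgroup slots, free half, then reuse the freed
/// capacity.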
pub fn custom_cgroup_exhaust_reuse(ctx: &Ctx) -> Result<AssertResult> {
execute_steps(ctx, cgroup_exhaust_reuse_steps(ctx))
}
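/// Stress scenario: one cgroup spanning all but the last CPU, oversubscribed
/// with bursty unpinned workers plus a few single-CPU pinned workers, to
/// surface dispatch-queue (DSQ) contention stalls on the pinned tasks.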
pub fn custom_cgroup_dsq_contention(ctx: &Ctx) -> Result<AssertResult> {
let all = ctx.topo.all_cpus();
if all.len() < 4 {
return Ok(AssertResult::skip("need >=4 CPUs"));
}
let last = all.len() - 1;
let mut _guard = CgroupGroup::new(ctx.cgroups);
_guard.add_cgroup("cg_0", &all[..last].iter().copied().collect())?;
thread::sleep(ctx.settle);
let n_unpinned = (last * 3).max(8);
let mut h_cgroup = WorkloadHandle::spawn(&WorkloadConfig {
num_workers: n_unpinned,
work_type: WorkType::bursty(Duration::from_millis(10), Duration::from_millis(5)),
..Default::default()
})?;
ctx.cgroups.move_tasks("cg_0", &h_cgroup.worker_pids())?;
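    // Add a few workers pinned to individual CPUs inside the same cgroup.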
let n_pinned = last.min(4);
let mut pinned_handles = Vec::new();
for &cpu in all.iter().take(n_pinned) {
let h = WorkloadHandle::spawn(&WorkloadConfig {
num_workers: 1,
affinity: AffinityIntent::Exact([cpu].into_iter().collect()),
work_type: WorkType::bursty(Duration::from_millis(10), Duration::from_millis(5)),
..Default::default()
})?;
ctx.cgroups.move_tasks("cg_0", &h.worker_pids())?;
pinned_handles.push(h);
}
h_cgroup.start();
for h in &mut pinned_handles {
h.start();
}
thread::sleep(ctx.duration);
let mut r = AssertResult::pass();
{
let reports = h_cgroup.stop_and_collect();
if ctx.assert.has_worker_checks() {
r.merge(ctx.assert.assert_cgroup(&reports, None));
}
}
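    // Flag any pinned worker that stalled for more than 1.5s as a dispatch
    // contention stall.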
for h in pinned_handles {
let reports = h.stop_and_collect();
for w in &reports {
if w.max_gap_ms > 1500 {
r.passed = false;
r.details.push(crate::assert::AssertDetail::new(
crate::assert::DetailKind::Stuck,
format!(
"pinned worker {} on CPU {} had {}ms gap (dispatch contention stall)",
w.tid,
w.cpus_used.iter().next().unwrap_or(&0),
w.max_gap_ms
),
));
}
}
if ctx.assert.has_worker_checks() {
r.merge(ctx.assert.assert_cgroup(&reports, None));
}
}
Ok(r)
}
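/// Stress scenario: five cgroups without cpusets, each running the diverse
/// workload mix from `spawn_diverse`.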
pub fn custom_cgroup_workload_variety(ctx: &Ctx) -> Result<AssertResult> {
if ctx.topo.all_cpus().len() < 6 {
return Ok(AssertResult::skip("need >=6 CPUs for 5 cgroups"));
}
let names: Vec<String> = (0..5).map(|i| format!("cg_{i}")).collect();
let mut _guard = CgroupGroup::new(ctx.cgroups);
for n in &names {
_guard.add_cgroup_no_cpuset(n)?;
}
thread::sleep(ctx.settle);
let name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
let handles = spawn_diverse(ctx, &name_refs)?;
thread::sleep(ctx.duration);
Ok(collect_all(handles, &ctx.assert))
}
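/// Stress scenario: three cpuset-constrained cgroups splitting all but the
/// last CPU into roughly equal thirds, each running the diverse workload mix.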
pub fn custom_cgroup_cpuset_workload_variety(ctx: &Ctx) -> Result<AssertResult> {
let all = ctx.topo.all_cpus();
if all.len() < 6 {
return Ok(AssertResult::skip("need >=6 CPUs"));
}
let last = all.len() - 1;
let chunk = last / 3;
let names = ["cg_0", "cg_1", "cg_2"];
let mut _guard = CgroupGroup::new(ctx.cgroups);
for (i, n) in names.iter().enumerate() {
let start = i * chunk;
let end = if i == 2 { last } else { (i + 1) * chunk };
        let cpus: BTreeSet<usize> = all[start..end].iter().copied().collect();
        _guard.add_cgroup(n, &cpus)?;
}
thread::sleep(ctx.settle);
let handles = spawn_diverse(ctx, &names)?;
thread::sleep(ctx.duration);
Ok(collect_all(handles, &ctx.assert))
}
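/// Stress scenario: start with three cgroups, add a fourth (`cg_3`) with a
/// bursty workload mid-run, collect and remove it early, then let the
/// original three run to completion.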
pub fn custom_cgroup_dynamic_workload_variety(ctx: &Ctx) -> Result<AssertResult> {
if ctx.topo.all_cpus().len() < 5 {
return Ok(AssertResult::skip("need >=5 CPUs for dynamic cgroup add"));
}
let names: Vec<String> = (0..3).map(|i| format!("cg_{i}")).collect();
let mut _guard = CgroupGroup::new(ctx.cgroups);
for n in &names {
_guard.add_cgroup_no_cpuset(n)?;
}
thread::sleep(ctx.settle);
let name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
let mut handles = spawn_diverse(ctx, &name_refs)?;
thread::sleep(ctx.duration / 3);
_guard.add_cgroup_no_cpuset("cg_3")?;
let mut h = WorkloadHandle::spawn(&WorkloadConfig {
num_workers: 4,
work_type: WorkType::bursty(Duration::from_millis(100), Duration::from_millis(50)),
..Default::default()
})?;
ctx.cgroups.move_tasks("cg_3", &h.worker_pids())?;
h.start();
handles.push(h);
thread::sleep(ctx.duration / 3);
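    // cg_3's handle was pushed last, so pop() retrieves exactly that workload
    // for early collection before the cgroup itself is removed.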
let cg3_result: Option<AssertResult> = handles.pop().map(|h| {
let reports = h.stop_and_collect();
if ctx.assert.has_worker_checks() {
ctx.assert.assert_cgroup_with_numa(&reports, None, None)
} else {
AssertResult::pass()
}
});
if let Err(e) = ctx.cgroups.remove_cgroup("cg_3") {
tracing::warn!(err = %format!("{e:#}"), "stress: early remove_cgroup(cg_3) failed; guard Drop will retry on scenario teardown");
}
thread::sleep(ctx.duration / 3);
let mut r = collect_all(handles, &ctx.assert);
if let Some(cg3) = cg3_result {
r.merge(cg3);
}
Ok(r)
}
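/// Stress scenario: two cgroups pinned to different LLCs whose cpusets are
/// repeatedly swapped across the LLC boundary, racing cpuset updates against
/// cross-LLC task migration.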
pub fn custom_cgroup_cpuset_cross_llc_race(ctx: &Ctx) -> Result<AssertResult> {
if ctx.topo.num_llcs() < 2 {
return Ok(AssertResult::skip("need >=2 LLCs"));
}
let llc0_full: BTreeSet<usize> = ctx.topo.llc_aligned_cpuset(0);
let llc1_full: BTreeSet<usize> = ctx.topo.llc_aligned_cpuset(1);
if llc0_full.is_empty() {
return Ok(AssertResult::skip("LLC0 has no CPUs"));
}
if llc1_full.is_empty() {
return Ok(AssertResult::skip("LLC1 has no CPUs"));
}
let reserved = *llc0_full.iter().next().unwrap();
let llc0: BTreeSet<usize> = llc0_full
.iter()
.copied()
.filter(|c| *c != reserved)
.collect();
let llc1: BTreeSet<usize> = llc1_full.clone();
    if llc0.is_empty() {
        return Ok(AssertResult::skip("LLC0 too small after reserving one CPU"));
    }
let mut _guard = CgroupGroup::new(ctx.cgroups);
_guard.add_cgroup("cg_0", &llc0)?;
_guard.add_cgroup("cg_1", &llc1)?;
thread::sleep(Duration::from_secs(2));
let n = llc0.len().max(4) * 8;
let mut h0 = WorkloadHandle::spawn(&WorkloadConfig {
num_workers: n,
work_type: WorkType::Mixed,
..Default::default()
})?;
ctx.cgroups.move_tasks("cg_0", &h0.worker_pids())?;
let mut h1 = WorkloadHandle::spawn(&WorkloadConfig {
num_workers: n,
work_type: WorkType::Mixed,
..Default::default()
})?;
ctx.cgroups.move_tasks("cg_1", &h1.worker_pids())?;
h0.start();
h1.start();
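    // Precompute the swapped cpusets, then flip both cgroups every 200ms
    // until the deadline.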
    let cross0 = llc1.clone();
    let cross1 = llc0.clone();
let deadline = Instant::now() + ctx.duration;
let mut flip = false;
while Instant::now() < deadline {
if flip {
            if let Err(e) = ctx.cgroups.set_cpuset("cg_0", &cross0) {
                tracing::warn!(err = %format!("{e:#}"), "cross-LLC race: set_cpuset cg_0 cross0 failed; update skipped this cycle");
            }
            if let Err(e) = ctx.cgroups.set_cpuset("cg_1", &cross1) {
                tracing::warn!(err = %format!("{e:#}"), "cross-LLC race: set_cpuset cg_1 cross1 failed; update skipped this cycle");
            }
        } else {
            if let Err(e) = ctx.cgroups.set_cpuset("cg_0", &llc0) {
                tracing::warn!(err = %format!("{e:#}"), "cross-LLC race: set_cpuset cg_0 llc0 failed; update skipped this cycle");
            }
            if let Err(e) = ctx.cgroups.set_cpuset("cg_1", &llc1) {
                tracing::warn!(err = %format!("{e:#}"), "cross-LLC race: set_cpuset cg_1 llc1 failed; update skipped this cycle");
            }
}
flip = !flip;
thread::sleep(Duration::from_millis(200));
}
let mut r = AssertResult::pass();
if ctx.assert.has_worker_checks() {
r.merge(ctx.assert.assert_cgroup(&h0.stop_and_collect(), None));
r.merge(ctx.assert.assert_cgroup(&h1.stop_and_collect(), None));
} else {
let _ = h0.stop_and_collect();
let _ = h1.stop_and_collect();
}
Ok(r)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cgroup::CgroupManager;
use crate::topology::TestTopology;
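    /// Minimal Ctx for exercising the step builders; the cgroup root is
    /// deliberately nonexistent because these tests never touch cgroups.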
fn ctx_for_test<'a>(cgroups: &'a CgroupManager, topo: &'a TestTopology) -> Ctx<'a> {
Ctx {
cgroups,
topo,
duration: Duration::from_secs(2),
workers_per_cgroup: 1,
sched_pid: Some(1),
settle: Duration::from_millis(100),
work_type_override: None,
assert: crate::assert::Assert::default_checks(),
wait_for_map_write: false,
}
}
#[test]
fn per_cpu_factory_produces_cgroup_per_cpu_capped_at_64() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::from_vm_topology(&crate::vmm::topology::Topology::new(1, 1, 4, 1));
let ctx = ctx_for_test(&cgroups, &topo);
let steps = cgroup_per_cpu_steps(&ctx);
assert_eq!(steps.len(), 1);
assert!(steps[0].ops.is_empty());
let factory = match steps[0].setup {
Setup::Factory(f) => f,
Setup::Defs(_) => panic!("per_cpu should use Factory setup"),
};
let defs = factory(&ctx);
assert_eq!(defs.len(), 3);
for (i, d) in defs.iter().enumerate() {
assert_eq!(d.name, format!("many_{i}"));
assert!(d.cpuset.is_some());
assert_eq!(d.works[0].num_workers, Some(1));
}
}
#[test]
fn exhaust_reuse_builds_three_phases_with_matching_add_remove_counts() {
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::from_vm_topology(&crate::vmm::topology::Topology::new(1, 1, 8, 1));
let ctx = ctx_for_test(&cgroups, &topo);
let steps = cgroup_exhaust_reuse_steps(&ctx);
assert_eq!(steps.len(), 3);
let adds = steps[0]
.ops
.iter()
.filter(|o| matches!(o, Op::AddCgroup { .. }))
.count();
let sets = steps[0]
.ops
.iter()
.filter(|o| matches!(o, Op::SetCpuset { .. }))
.count();
assert_eq!(adds, 7);
assert_eq!(sets, 7);
assert_eq!(steps[0].ops.len(), 14);
let removes = steps[1]
.ops
.iter()
.filter(|o| matches!(o, Op::RemoveCgroup { .. }))
.count();
assert_eq!(removes, 3);
assert_eq!(steps[1].ops.len(), 3);
assert!(steps[2].ops.is_empty());
let factory = match steps[2].setup {
Setup::Factory(f) => f,
Setup::Defs(_) => panic!("phase 3 should use Factory setup"),
};
let defs = factory(&ctx);
assert_eq!(defs.len(), 3);
for d in &defs {
assert_eq!(d.works[0].num_workers, Some(1));
}
}
}