mod types;
pub use types::*;
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result};
use crate::assert::AssertResult;
use crate::scenario::backdrop;
use crate::scenario::{CgroupGroup, Ctx, process_alive};
use crate::vmm::guest_comms;
use crate::vmm::wire::StimulusPayload;
use crate::workload::{MemPolicy, WorkloadConfig, WorkloadHandle};
// Global flag marking the snapshot transport as dead.
// NOTE(review): neither read nor written anywhere in this chunk — presumably
// toggled/polled elsewhere in the crate; confirm before removing.
static SNAPSHOT_TRANSPORT_DEAD: AtomicBool = AtomicBool::new(false);
/// Mutable state owned by the scenario-wide Backdrop: resources that live for
/// the full scenario rather than a single step.
struct BackdropState<'a> {
    // Cgroups created on the Backdrop's behalf.
    cgroups: CgroupGroup<'a>,
    // (cgroup name, workload handle) pairs for Backdrop-spawned workers.
    handles: Vec<(String, WorkloadHandle)>,
    // Last cpuset explicitly recorded per cgroup name.
    cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
    // Live payload processes started by the Backdrop.
    payload_handles: Vec<PayloadEntry>,
}
impl<'a> BackdropState<'a> {
    /// Creates an empty state bound to the context's cgroup hierarchy.
    fn empty(ctx: &'a Ctx) -> Self {
        Self {
            cgroups: CgroupGroup::new(ctx.cgroups),
            handles: Vec::new(),
            cpusets: std::collections::HashMap::new(),
            payload_handles: Vec::new(),
        }
    }
}
/// Mutable state owned by a single step; dropped (and its resources
/// collected) when the step finishes.
// NOTE(review): field-for-field identical to `BackdropState` — kept as a
// distinct type, presumably so step-local and scenario-wide resources cannot
// be mixed up at compile time; confirm before merging the two.
struct StepState<'a> {
    // Cgroups created by this step.
    cgroups: CgroupGroup<'a>,
    // (cgroup name, workload handle) pairs for step-spawned workers.
    handles: Vec<(String, WorkloadHandle)>,
    // Last cpuset explicitly recorded per cgroup name.
    cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
    // Live payload processes started by this step.
    payload_handles: Vec<PayloadEntry>,
}
impl<'a> StepState<'a> {
    /// Creates an empty state bound to the context's cgroup hierarchy.
    fn empty(ctx: &'a Ctx) -> Self {
        Self {
            cgroups: CgroupGroup::new(ctx.cgroups),
            handles: Vec::new(),
            cpusets: std::collections::HashMap::new(),
            payload_handles: Vec::new(),
        }
    }
}
/// Facade over one step's state plus the scenario-wide Backdrop state.
/// Reads consult both sides; writes go to the side selected by
/// `target_backdrop`.
struct ScenarioState<'a, 'b> {
    step: &'b mut StepState<'a>,
    backdrop: &'b mut BackdropState<'a>,
    // When true, the `target_*` accessors hand out the Backdrop collections.
    target_backdrop: bool,
}
impl<'a, 'b> ScenarioState<'a, 'b> {
    /// Pairs a step's state with the Backdrop state; writes initially target
    /// the step side.
    fn new(step: &'b mut StepState<'a>, backdrop: &'b mut BackdropState<'a>) -> Self {
        Self {
            step,
            backdrop,
            target_backdrop: false,
        }
    }
    /// Routes all `target_*` accessors to the Backdrop side while `f` runs,
    /// then restores the previous routing (supports nesting; note the flag is
    /// NOT restored if `f` panics).
    fn with_target_backdrop<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
        let prev = self.target_backdrop;
        self.target_backdrop = true;
        let r = f(self);
        self.target_backdrop = prev;
        r
    }
    /// Cgroup collection that newly created cgroups are recorded in.
    fn target_cgroups(&mut self) -> &mut CgroupGroup<'a> {
        if self.target_backdrop {
            &mut self.backdrop.cgroups
        } else {
            &mut self.step.cgroups
        }
    }
    /// Handle list that newly spawned workloads are recorded in.
    fn target_handles(&mut self) -> &mut Vec<(String, WorkloadHandle)> {
        if self.target_backdrop {
            &mut self.backdrop.handles
        } else {
            &mut self.step.handles
        }
    }
    /// Cpuset map that newly recorded cpusets default to (see `record_cpuset`
    /// for the actual routing rules).
    fn target_cpusets(&mut self) -> &mut std::collections::HashMap<String, BTreeSet<usize>> {
        if self.target_backdrop {
            &mut self.backdrop.cpusets
        } else {
            &mut self.step.cpusets
        }
    }
    /// Payload list that newly spawned payloads are recorded in.
    fn target_payload_handles(&mut self) -> &mut Vec<PayloadEntry> {
        if self.target_backdrop {
            &mut self.backdrop.payload_handles
        } else {
            &mut self.step.payload_handles
        }
    }
    /// Looks up a recorded cpuset by cgroup name, step side first.
    fn lookup_cpuset(&self, name: &str) -> Option<&BTreeSet<usize>> {
        self.step
            .cpusets
            .get(name)
            .or_else(|| self.backdrop.cpusets.get(name))
    }
    /// Finds a live payload with the given name inside the given cgroup,
    /// checking the step side before the Backdrop side.
    fn find_live_payload_with_cgroup(
        &self,
        payload_name: &str,
        cgroup_key: &str,
    ) -> Option<&PayloadEntry> {
        let matches =
            |e: &&PayloadEntry| e.handle.payload_name() == payload_name && e.cgroup == cgroup_key;
        self.step
            .payload_handles
            .iter()
            .find(matches)
            .or_else(|| self.backdrop.payload_handles.iter().find(matches))
    }
    /// Removes and returns a payload by name.
    ///
    /// With `cgroup` given, the (name, cgroup) pair is unambiguous: the first
    /// match (step side preferred) is removed. Without a cgroup, more than
    /// one live payload with this name is ambiguous — `Err` carries every
    /// matching cgroup so the caller can report them.
    fn take_payload_by_name(
        &mut self,
        name: &str,
        cgroup: Option<&str>,
    ) -> std::result::Result<Option<PayloadEntry>, Vec<String>> {
        if let Some(c) = cgroup {
            if let Some(idx) = self
                .step
                .payload_handles
                .iter()
                .position(|e| e.handle.payload_name() == name && e.cgroup == c)
            {
                return Ok(Some(self.step.payload_handles.swap_remove(idx)));
            }
            if let Some(idx) = self
                .backdrop
                .payload_handles
                .iter()
                .position(|e| e.handle.payload_name() == name && e.cgroup == c)
            {
                return Ok(Some(self.backdrop.payload_handles.swap_remove(idx)));
            }
            return Ok(None);
        }
        // No cgroup given: scan both sides, remembering the first candidate
        // (step side wins) and collecting every match's cgroup for the
        // ambiguity report.
        let mut step_idx: Option<usize> = None;
        let mut backdrop_idx: Option<usize> = None;
        let mut cgroups: Vec<String> = Vec::new();
        for (i, e) in self.step.payload_handles.iter().enumerate() {
            if e.handle.payload_name() == name {
                if step_idx.is_none() {
                    step_idx = Some(i);
                }
                cgroups.push(e.cgroup.clone());
            }
        }
        for (i, e) in self.backdrop.payload_handles.iter().enumerate() {
            if e.handle.payload_name() == name {
                // Only a candidate if the step side had none at all.
                if backdrop_idx.is_none() && step_idx.is_none() {
                    backdrop_idx = Some(i);
                }
                cgroups.push(e.cgroup.clone());
            }
        }
        if cgroups.len() > 1 {
            return Err(cgroups);
        }
        if let Some(i) = step_idx {
            return Ok(Some(self.step.payload_handles.swap_remove(i)));
        }
        if let Some(i) = backdrop_idx {
            return Ok(Some(self.backdrop.payload_handles.swap_remove(i)));
        }
        Ok(None)
    }
    /// Drains (terminates/collects) every payload on both sides.
    fn drain_all_payloads(&mut self) {
        drain_all_payload_handles(&mut self.step.payload_handles);
        drain_all_payload_handles(&mut self.backdrop.payload_handles);
    }
    /// Drains every payload belonging to `cgroup`, on both sides.
    fn drain_payloads_for_cgroup(&mut self, cgroup: &str) {
        drain_payload_handles_for_cgroup(&mut self.step.payload_handles, cgroup);
        drain_payload_handles_for_cgroup(&mut self.backdrop.payload_handles, cgroup);
    }
    /// Drops all workload handles registered under `cgroup`, on both sides.
    fn drop_handles_for_cgroup(&mut self, cgroup: &str) {
        self.step.handles.retain(|(n, _)| n.as_str() != cgroup);
        self.backdrop.handles.retain(|(n, _)| n.as_str() != cgroup);
    }
    /// Forgets the recorded cpuset for `cgroup` on both sides.
    fn forget_cpuset(&mut self, cgroup: &str) {
        self.step.cpusets.remove(cgroup);
        self.backdrop.cpusets.remove(cgroup);
    }
    /// Records a cpuset for `cgroup`, updating whichever side already tracks
    /// it; a brand-new cgroup is recorded on the current target side.
    fn record_cpuset(&mut self, cgroup: &str, cpuset: BTreeSet<usize>) {
        if self.step.cpusets.contains_key(cgroup) {
            self.step.cpusets.insert(cgroup.to_string(), cpuset);
        } else if self.backdrop.cpusets.contains_key(cgroup) {
            self.backdrop.cpusets.insert(cgroup.to_string(), cpuset);
        } else {
            self.target_cpusets().insert(cgroup.to_string(), cpuset);
        }
    }
    /// Renames handle entries from `from` to `to`. When `to` is a
    /// Backdrop-owned cgroup, matching step handles are also MOVED to the
    /// Backdrop side so they outlive the step.
    fn rename_handles(&mut self, from: &str, to: &str) {
        let to_is_backdrop = self.cgroup_name_is_backdrop(to);
        if to_is_backdrop {
            // Reverse iteration so swap_remove never skips an element.
            let mut i = self.step.handles.len();
            while i > 0 {
                i -= 1;
                if self.step.handles[i].0.as_str() == from {
                    let (_, handle) = self.step.handles.swap_remove(i);
                    self.backdrop.handles.push((to.to_string(), handle));
                }
            }
        } else {
            for (name, _) in &mut self.step.handles {
                if name.as_str() == from {
                    *name = to.to_string();
                }
            }
        }
        // Backdrop-side entries are renamed in place in either case.
        for (name, _) in &mut self.backdrop.handles {
            if name.as_str() == from {
                *name = to.to_string();
            }
        }
    }
    /// Iterates every (cgroup name, handle) pair: step side, then Backdrop.
    fn all_handles(&self) -> impl Iterator<Item = &(String, WorkloadHandle)> {
        self.step.handles.iter().chain(self.backdrop.handles.iter())
    }
    /// True if either side already tracks a cgroup with this name.
    fn cgroup_name_is_tracked(&self, name: &str) -> bool {
        self.step.cgroups.names().iter().any(|n| n == name)
            || self.backdrop.cgroups.names().iter().any(|n| n == name)
    }
    /// True if the Backdrop owns a cgroup with this name.
    fn cgroup_name_is_backdrop(&self, name: &str) -> bool {
        self.backdrop.cgroups.names().iter().any(|n| n == name)
    }
}
/// Which construct spawned a payload process; used purely for diagnostics
/// when a duplicate payload is detected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PayloadSource {
    CgroupDefWorkload,
    OpRunPayload,
}
impl PayloadSource {
    /// Human-readable name of the spawning construct, for error messages.
    fn describe(self) -> &'static str {
        match self {
            Self::CgroupDefWorkload => "CgroupDef::workload",
            Self::OpRunPayload => "Op::RunPayload",
        }
    }
}
/// A live payload process, together with where it runs and what spawned it.
struct PayloadEntry {
    // Name of the cgroup the payload was placed in.
    cgroup: String,
    // Spawning construct, for duplicate-detection diagnostics.
    source: PayloadSource,
    // Handle to the running payload process.
    handle: crate::scenario::payload_run::PayloadHandle,
}
/// Convenience wrapper: runs `defs` as a single step that holds for the full
/// scenario duration.
pub fn execute_defs(ctx: &Ctx, defs: Vec<CgroupDef>) -> Result<AssertResult> {
    let single_step = Step::with_defs(defs, HoldSpec::FULL);
    execute_steps(ctx, vec![single_step])
}
/// Runs `steps` with no Backdrop, using the context's default assertions.
pub fn execute_steps(ctx: &Ctx, steps: Vec<Step>) -> Result<AssertResult> {
    let checks = None;
    execute_steps_with(ctx, steps, checks)
}
/// Runs `steps` under `backdrop`, using the context's default assertions.
pub fn execute_scenario(
    ctx: &Ctx,
    backdrop: backdrop::Backdrop,
    steps: Vec<Step>,
) -> Result<AssertResult> {
    let checks = None;
    execute_scenario_with(ctx, backdrop, steps, checks)
}
/// Runs `steps` under `backdrop` with an optional assertion override
/// (`None` falls back to `ctx.assert`). This is the public entry point over
/// the private [`run_scenario`] driver.
pub fn execute_scenario_with(
    ctx: &Ctx,
    backdrop: backdrop::Backdrop,
    steps: Vec<Step>,
    checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
    run_scenario(ctx, backdrop, steps, checks)
}
/// Runs `steps` with an empty Backdrop and an optional assertion override.
pub fn execute_steps_with(
    ctx: &Ctx,
    steps: Vec<Step>,
    checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
    let no_backdrop = backdrop::Backdrop::EMPTY;
    execute_scenario_with(ctx, no_backdrop, steps, checks)
}
/// Computes which cgroup controllers must be enabled in `subtree_control`
/// before the scenario runs, by scanning every CgroupDef and Op in the
/// Backdrop and in all steps.
fn required_controllers(
    ctx: &Ctx,
    backdrop: &backdrop::Backdrop,
    steps: &[Step],
) -> BTreeSet<crate::cgroup::Controller> {
    use crate::cgroup::Controller;
    // A CgroupDef demands one controller per resource knob it configures.
    fn absorb_def(set: &mut BTreeSet<Controller>, def: &CgroupDef) {
        let demands = [
            (
                def.cpuset.is_some() || def.cpuset_mems.is_some(),
                Controller::Cpuset,
            ),
            (def.cpu.is_some(), Controller::Cpu),
            (def.memory.is_some(), Controller::Memory),
            (def.io.is_some(), Controller::Io),
            (def.pids.is_some(), Controller::Pids),
        ];
        for (wanted, controller) in demands {
            if wanted {
                set.insert(controller);
            }
        }
    }
    // Only cpuset-touching ops demand a controller by themselves.
    fn absorb_op(set: &mut BTreeSet<Controller>, op: &Op) {
        let touches_cpuset = matches!(
            op,
            Op::SetCpuset { .. }
                | Op::ClearCpuset { .. }
                | Op::SwapCpusets { .. }
                | Op::SetAffinity { .. }
        );
        if touches_cpuset {
            set.insert(Controller::Cpuset);
        }
    }
    let mut needed = BTreeSet::new();
    backdrop
        .cgroups
        .iter()
        .for_each(|def| absorb_def(&mut needed, def));
    backdrop.ops.iter().for_each(|op| absorb_op(&mut needed, op));
    for step in steps {
        for def in &step.setup.resolve(ctx) {
            absorb_def(&mut needed, def);
        }
        step.ops.iter().for_each(|op| absorb_op(&mut needed, op));
    }
    needed
}
/// Drives a full scenario: validates inputs, enables the required cgroup
/// controllers, sets up the Backdrop, runs every step, and folds each
/// phase's collected assertions into one [`AssertResult`].
///
/// Execution errors are reported *inside* the returned `AssertResult`
/// (with `passed = false`) after cleanup has been collected; `Err` is
/// reserved for pre-flight validation and controller setup failures.
fn run_scenario(
    ctx: &Ctx,
    backdrop: backdrop::Backdrop,
    steps: Vec<Step>,
    checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
    // Pre-flight: every step's hold spec must be well-formed.
    for (i, step) in steps.iter().enumerate() {
        if let Err(reason) = step.hold.validate() {
            anyhow::bail!("step {i} hold validation: {reason}");
        }
    }
    // Pre-flight: scheduler-kind payloads are not allowed in the Backdrop,
    // neither directly nor via Op::RunPayload.
    for p in &backdrop.payloads {
        if p.is_scheduler() {
            anyhow::bail!(
                "Backdrop::with_payload received scheduler-kind Payload '{}' — \
                only PayloadKind::Binary payloads run in the Backdrop; \
                place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
                attribute instead",
                p.name,
            );
        }
    }
    for op in &backdrop.ops {
        if let Op::RunPayload { payload, .. } = op
            && payload.is_scheduler()
        {
            anyhow::bail!(
                "Backdrop::with_op(Op::RunPayload) received scheduler-kind Payload '{}' — \
                only PayloadKind::Binary payloads run in the Backdrop; \
                place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
                attribute instead",
                payload.name,
            );
        }
    }
    // Explicit checks override the context's defaults.
    let effective_checks = checks.unwrap_or(&ctx.assert);
    let required = required_controllers(ctx, &backdrop, &steps);
    ctx.cgroups
        .setup(&required)
        .context("enable cgroup controllers in subtree_control")?;
    let mut backdrop_state = BackdropState::empty(ctx);
    let mut result = AssertResult::pass();
    let scenario_start = std::time::Instant::now();
    if guest_comms::is_guest() {
        crate::vmm::guest_comms::send_scenario_start();
    }
    // Optionally block until the host signals its bpf-map writes completed;
    // a timeout is logged but deliberately non-fatal.
    if ctx.wait_for_map_write && guest_comms::is_guest() {
        let latch = crate::vmm::rust_init::bpf_map_write_done_latch();
        if !latch.wait_timeout(Duration::from_secs(60)) {
            tracing::warn!(
                "wait_for_map_write timed out after 60s — host bpf-map-write \
                thread may have failed to resolve a queued map; proceeding \
                with the workload regardless"
            );
        }
    }
    // Backdrop setup runs through a throwaway StepState with writes routed
    // to the Backdrop side; anything that leaked into the staging step state
    // is still collected on failure.
    if !backdrop.is_empty() {
        let mut step_staging = StepState::empty(ctx);
        let mut scratch = ScenarioState::new(&mut step_staging, &mut backdrop_state);
        let setup_res = scratch.with_target_backdrop(|s| {
            if !backdrop.cgroups.is_empty() {
                apply_setup(ctx, s, &backdrop.cgroups)?;
            }
            if !backdrop.ops.is_empty() {
                apply_ops(ctx, s, &backdrop.ops)?;
            }
            // Backdrop payloads are expressed as RunPayload ops with no args.
            if !backdrop.payloads.is_empty() {
                let ops: Vec<Op> = backdrop
                    .payloads
                    .iter()
                    .map(|p| Op::run_payload(p, Vec::<String>::new()))
                    .collect();
                apply_ops(ctx, s, &ops)?;
            }
            Ok::<(), anyhow::Error>(())
        });
        if let Err(err) = setup_res {
            let mut r =
                collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
            let staging_result =
                collect_step(&mut step_staging, effective_checks, ctx.topo, ctx.cgroups);
            r.merge(staging_result);
            r.merge(result);
            r.passed = false;
            r.details.push(crate::assert::AssertDetail::new(
                crate::assert::DetailKind::Other,
                format!("Backdrop setup failed: {err:#}"),
            ));
            return Ok(r);
        }
        drain_all_payload_handles(&mut step_staging.payload_handles);
    }
    for (step_idx, step) in steps.iter().enumerate() {
        // Between steps (not before the first), check the scheduler is alive.
        if step_idx > 0
            && let Some(pid) = ctx.sched_pid
            && !process_alive(pid)
        {
            let mut r =
                collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
            r.merge(result);
            r.passed = false;
            r.details.push(crate::assert::AssertDetail::new(
                crate::assert::DetailKind::SchedulerDied,
                crate::assert::format_sched_died_after_step(
                    step_idx,
                    steps.len(),
                    scenario_start.elapsed().as_secs_f64(),
                ),
            ));
            return Ok(r);
        }
        let mut step_state = StepState::empty(ctx);
        let mut sched_died_during_hold = false;
        let step_res = run_step(
            ctx,
            step,
            step_idx,
            &mut step_state,
            &mut backdrop_state,
            scenario_start,
            effective_checks,
            &mut sched_died_during_hold,
        );
        if guest_comms::is_guest() {
            crate::vmm::guest_comms::send_scenario_pause();
        }
        // Step assertions are collected even when the step itself errored.
        let step_result = collect_step(&mut step_state, effective_checks, ctx.topo, ctx.cgroups);
        result.merge(step_result);
        if let Err(err) = step_res {
            let mut r =
                collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
            r.merge(result);
            r.passed = false;
            r.details.push(crate::assert::AssertDetail::new(
                crate::assert::DetailKind::Other,
                format!("step {step_idx} failed: {err:#}"),
            ));
            return Ok(r);
        }
        if sched_died_during_hold {
            let mut r =
                collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
            r.merge(result);
            r.passed = false;
            r.details.push(crate::assert::AssertDetail::new(
                crate::assert::DetailKind::SchedulerDied,
                crate::assert::format_sched_died_during_workload(
                    scenario_start.elapsed().as_secs_f64(),
                ),
            ));
            return Ok(r);
        }
    }
    if guest_comms::is_guest() {
        let elapsed = scenario_start.elapsed().as_millis() as u64;
        crate::vmm::guest_comms::send_scenario_end(elapsed);
    }
    // Final scheduler liveness check after all steps completed.
    let sched_dead = ctx.sched_pid.is_some_and(|pid| !process_alive(pid));
    let backdrop_result =
        collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
    result.merge(backdrop_result);
    if sched_dead {
        result.passed = false;
        result.details.push(crate::assert::AssertDetail::new(
            crate::assert::DetailKind::SchedulerDied,
            crate::assert::format_sched_died_after_all_steps(
                steps.len(),
                scenario_start.elapsed().as_secs_f64(),
            ),
        ));
    }
    Ok(result)
}
/// Sleeps for `dur`, returning `true` early if the scheduler process
/// `sched_pid` dies during the wait; returns `false` when the full duration
/// elapses with the scheduler (if any) still alive.
///
/// Uses `pidfd_open` + epoll so death is observed promptly instead of by
/// polling. Every failure path degrades to a plain `thread::sleep` followed
/// by a final `process_alive` check, so the caller always gets an answer.
fn sleep_or_sched_died(dur: Duration, sched_pid: Option<libc::pid_t>) -> bool {
    use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
    use std::os::fd::{AsFd, FromRawFd, OwnedFd};
    // Zero-duration wait: just report current liveness.
    if dur.is_zero() {
        return sched_pid.is_some_and(|pid| !process_alive(pid));
    }
    // Nothing to watch: plain sleep.
    let Some(pid) = sched_pid else {
        thread::sleep(dur);
        return false;
    };
    let pidfd_raw = unsafe { libc::syscall(libc::SYS_pidfd_open, pid, 0i32) };
    if pidfd_raw < 0 {
        let err = std::io::Error::last_os_error();
        // ESRCH means the process is already gone.
        if err.raw_os_error() == Some(libc::ESRCH) {
            return true;
        }
        tracing::warn!(
            target: "ktstr::scenario",
            pid,
            error = %err,
            "pidfd_open failed; falling back to sleep + final process_alive check"
        );
        thread::sleep(dur);
        return !process_alive(pid);
    }
    // SAFETY: pidfd_raw is a valid, freshly returned fd that we now own.
    let pidfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(pidfd_raw as i32) };
    let epoll = match Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC) {
        Ok(e) => e,
        Err(e) => {
            tracing::warn!(
                target: "ktstr::scenario",
                pid,
                error = %e,
                "epoll_create1 failed for pidfd waiter; falling back to sleep + final process_alive check"
            );
            drop(pidfd);
            thread::sleep(dur);
            return !process_alive(pid);
        }
    };
    // A pidfd becomes readable when the process exits.
    let event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
    if let Err(e) = epoll.add(pidfd.as_fd(), event) {
        tracing::warn!(
            target: "ktstr::scenario",
            pid,
            error = %e,
            "epoll_ctl ADD pidfd failed; falling back to sleep + final process_alive check"
        );
        drop(pidfd);
        thread::sleep(dur);
        return !process_alive(pid);
    }
    let deadline = std::time::Instant::now() + dur;
    let mut events = [EpollEvent::empty()];
    loop {
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            return !process_alive(pid);
        }
        // Clamp the timeout to what EpollTimeout can represent.
        let ms_u32 = u32::try_from(remaining.as_millis()).unwrap_or(u32::MAX);
        let ms_u32 = std::cmp::min(ms_u32, i32::MAX as u32);
        let timeout_param = match EpollTimeout::try_from(ms_u32) {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(
                    target: "ktstr::scenario",
                    pid,
                    error = %e,
                    "epoll timeout conversion failed; falling back to sleep + final process_alive check"
                );
                thread::sleep(remaining);
                return !process_alive(pid);
            }
        };
        match epoll.wait(&mut events, timeout_param) {
            Ok(0) => {
                // Timed out this round; re-check the deadline at loop top.
            }
            Ok(_) => {
                // pidfd readable: the scheduler process exited.
                return true;
            }
            Err(nix::errno::Errno::EINTR) => {
                // Interrupted by a signal; retry with recomputed timeout.
            }
            Err(e) => {
                tracing::warn!(
                    target: "ktstr::scenario",
                    pid,
                    error = %e,
                    "epoll_wait failed; falling back to sleep + final process_alive check"
                );
                thread::sleep(remaining);
                return !process_alive(pid);
            }
        }
    }
}
#[allow(clippy::too_many_arguments)]
/// Executes one step: applies its setup/ops, notifies the guest comms
/// channel, then holds for the step's duration (or loops its ops for
/// `HoldSpec::Loop`). Sets `sched_died_during_hold` and returns `Ok` if the
/// scheduler dies while holding; payloads are drained on any error.
fn run_step<'a>(
    ctx: &Ctx,
    step: &Step,
    step_idx: usize,
    step_state: &mut StepState<'a>,
    backdrop_state: &mut BackdropState<'a>,
    scenario_start: std::time::Instant,
    _effective_checks: &crate::assert::Assert,
    sched_died_during_hold: &mut bool,
) -> Result<()> {
    let mut scenario = ScenarioState::new(step_state, backdrop_state);
    // On error, terminate all live payloads before propagating, so no
    // payload process outlives its step.
    macro_rules! drain_on_err {
        ($scenario:expr, $e:expr) => {
            match $e {
                Ok(v) => v,
                Err(err) => {
                    $scenario.drain_all_payloads();
                    return Err(err);
                }
            }
        };
    }
    match &step.hold {
        // Loop mode: set up once, then re-apply the ops every `interval`
        // until the scenario duration is exhausted.
        HoldSpec::Loop { interval } => {
            if !step.setup.is_empty() {
                let defs = step.setup.resolve(ctx);
                drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
            }
            let deadline = scenario_start + ctx.duration;
            while std::time::Instant::now() < deadline {
                drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops));
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if sleep_or_sched_died(remaining.min(*interval), ctx.sched_pid) {
                    *sched_died_during_hold = true;
                    return Ok(());
                }
            }
        }
        _ => {
            // NOTE(review): this branch applies ops BEFORE setup — the
            // reverse of the Loop branch — presumably so ops can act on
            // cgroups created by earlier steps before this step's own
            // cgroups appear; confirm the ordering is intentional.
            drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops));
            if !step.setup.is_empty() {
                let defs = step.setup.resolve(ctx);
                drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
            }
            if guest_comms::is_guest() {
                let payload = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
                crate::vmm::guest_comms::send_stimulus(zerocopy::IntoBytes::as_bytes(&payload));
            }
            if guest_comms::is_guest() {
                crate::vmm::guest_comms::send_scenario_resume();
            }
            // Translate the hold spec into a concrete duration, capped so
            // the step never holds past the scenario's overall deadline.
            let hold_dur = match &step.hold {
                HoldSpec::Frac(f) => Duration::from_secs_f64(ctx.duration.as_secs_f64() * f),
                HoldSpec::Fixed(d) => *d,
                HoldSpec::Loop { .. } => unreachable!(),
            };
            let remaining = (scenario_start + ctx.duration)
                .saturating_duration_since(std::time::Instant::now());
            let hold_dur = hold_dur.min(remaining);
            if sleep_or_sched_died(hold_dur, ctx.sched_pid) {
                *sched_died_during_hold = true;
                return Ok(());
            }
        }
    }
    Ok(())
}
/// Assembles the guest→host `StimulusPayload` describing the step about to
/// run: elapsed time, op mix, and current cgroup/worker/iteration counts.
/// Narrowing conversions saturate (with a warning) rather than fail.
fn build_stimulus(
    scenario_start: &std::time::Instant,
    step_idx: usize,
    ops: &[Op],
    state: &ScenarioState<'_, '_>,
) -> StimulusPayload {
    // One bit per op discriminant present in this step.
    // NOTE(review): `1 << discriminant` assumes discriminants stay < 32;
    // confirm against Op's definition.
    let op_kinds = ops
        .iter()
        .fold(0u32, |mask, op| mask | (1 << op.discriminant()));
    let total_iterations: u64 = state
        .all_handles()
        .flat_map(|(_, h)| h.snapshot_iterations())
        .sum();
    let cgroup_count =
        state.step.cgroups.names().len() + state.backdrop.cgroups.names().len();
    let worker_count = state.step.handles.len() + state.backdrop.handles.len();
    // Saturating narrowing helpers; overflow is logged, never fatal.
    let to_u32 = |field: &str, v: u128| -> u32 {
        match u32::try_from(v) {
            Ok(n) => n,
            Err(_) => {
                tracing::warn!(
                    field,
                    value = %v,
                    "StimulusPayload field overflowed u32; saturating to u32::MAX",
                );
                u32::MAX
            }
        }
    };
    let to_u16 = |field: &str, v: usize| -> u16 {
        match u16::try_from(v) {
            Ok(n) => n,
            Err(_) => {
                tracing::warn!(
                    field,
                    value = v,
                    "StimulusPayload field overflowed u16; saturating to u16::MAX",
                );
                u16::MAX
            }
        }
    };
    StimulusPayload {
        elapsed_ms: to_u32("elapsed_ms", scenario_start.elapsed().as_millis()),
        step_index: to_u16("step_index", step_idx),
        op_count: to_u16("op_count", ops.len()),
        op_kinds,
        cgroup_count: to_u16("cgroup_count", cgroup_count),
        worker_count: to_u16("worker_count", worker_count),
        total_iterations,
    }
}
/// Rejects any long-form `--flag` argument that is not in the payload's
/// declared `known_flags` allowlist. Payloads without an allowlist accept
/// anything; non-`--` arguments and the degenerate `--=value` form are
/// ignored here and left for the payload itself to handle.
fn validate_known_flags(payload: &crate::test_support::Payload, args: &[String]) -> Result<()> {
    let Some(allowlist) = payload.known_flags else {
        return Ok(());
    };
    for arg in args {
        // Only `--name` / `--name=value` arguments are validated.
        let Some(flag_body) = arg.strip_prefix("--") else {
            continue;
        };
        // Keep just the flag name, dropping any `=value` suffix.
        let name = match flag_body.split_once('=') {
            Some((head, _value)) => head,
            None => flag_body,
        };
        if name.is_empty() {
            continue;
        }
        if allowlist.contains(&name) {
            continue;
        }
        anyhow::bail!(
            "Op::RunPayload: payload '{}' received unknown flag \
            '--{name}' — not in its known_flags allowlist \
            {allowlist:?}. Check the spelling against the \
            payload's declared flags; if '--{name}' is a new \
            legitimate flag, add it to `Payload::known_flags`.",
            payload.name,
        );
    }
    Ok(())
}
/// Validates a worker's NUMA memory policy against the cgroup's cpuset and
/// the host topology before the policy is forwarded to the kernel.
///
/// Checks, in order: no unknown `MpolFlags` bits; STATIC_NODES and
/// RELATIVE_NODES not combined; STATIC_NODES policies only reference nodes
/// that exist on the host; otherwise (absolute, non-relative policies) every
/// referenced node must be covered by the cpuset's NUMA nodes.
fn validate_mempolicy_cpuset(
    policy: &MemPolicy,
    flags: crate::workload::MpolFlags,
    cpuset: &BTreeSet<usize>,
    ctx: &Ctx,
    cgroup_name: &str,
) -> Result<()> {
    use crate::workload::MpolFlags;
    // Refuse any flag bit this code does not model yet.
    let known_bits = MpolFlags::STATIC_NODES.bits()
        | MpolFlags::RELATIVE_NODES.bits()
        | MpolFlags::NUMA_BALANCING.bits();
    let unknown_bits = flags.bits() & !known_bits;
    if unknown_bits != 0 {
        anyhow::bail!(
            "cgroup '{}': MpolFlags contains unknown bit(s) {:#x} (known bits: \
            STATIC_NODES={:#x}, RELATIVE_NODES={:#x}, NUMA_BALANCING={:#x}); \
            refusing to forward to the kernel — update MpolFlags to model the \
            new bit before using it, or clear the bit at the call site",
            cgroup_name,
            unknown_bits,
            MpolFlags::STATIC_NODES.bits(),
            MpolFlags::RELATIVE_NODES.bits(),
            MpolFlags::NUMA_BALANCING.bits(),
        );
    }
    if flags.contains(MpolFlags::STATIC_NODES) && flags.contains(MpolFlags::RELATIVE_NODES) {
        anyhow::bail!(
            "cgroup '{}': MpolFlags::STATIC_NODES and MpolFlags::RELATIVE_NODES are \
            mutually exclusive (the kernel will reject the set_mempolicy syscall with \
            EINVAL); pick whichever matches the intended semantics — STATIC_NODES \
            for absolute node ids that survive cpuset changes, RELATIVE_NODES for \
            cpuset-relative indices",
            cgroup_name,
        );
    }
    // Policies that reference no nodes (e.g. default/local) need no
    // cpuset/topology cross-check.
    let policy_nodes = policy.node_set();
    if policy_nodes.is_empty() {
        return Ok(());
    }
    // STATIC_NODES: node ids are absolute — validate against the host, not
    // the cpuset.
    if flags.contains(MpolFlags::STATIC_NODES) {
        let host_nodes = ctx.topo.numa_node_ids();
        let missing: Vec<usize> = policy_nodes
            .iter()
            .copied()
            .filter(|n| !host_nodes.contains(n))
            .collect();
        if !missing.is_empty() {
            anyhow::bail!(
                "cgroup '{}': MemPolicy with MpolFlags::STATIC_NODES references \
                NUMA node(s) {:?} that do not exist on this host (host nodes: {:?}); \
                the kernel will reject or silently drop the policy (Preferred can \
                silently fall back to local allocation; Bind/Interleave reject with \
                EINVAL) — fix the MemPolicy or pick a host with the required nodes",
                cgroup_name,
                missing,
                host_nodes,
            );
        }
        return Ok(());
    }
    // RELATIVE_NODES: indices are cpuset-relative by definition; nothing to
    // cross-check here.
    if flags.contains(MpolFlags::RELATIVE_NODES) {
        return Ok(());
    }
    // Plain absolute policy: every referenced node must be reachable from
    // the cpuset's CPUs, otherwise allocations become cross-socket traffic.
    let cpuset_numa = ctx.topo.numa_nodes_for_cpuset(cpuset);
    let uncovered: Vec<usize> = policy_nodes
        .iter()
        .copied()
        .filter(|n| !cpuset_numa.contains(n))
        .collect();
    if !uncovered.is_empty() {
        anyhow::bail!(
            "cgroup '{}': MemPolicy references NUMA node(s) {:?} \
            outside the cpuset's coverage (cpuset covers node(s) \
            {:?}) — some or all of the worker's allocations would \
            live on NUMA nodes its CPUs cannot reach locally, \
            producing cross-socket allocation traffic that is \
            almost certainly unintended. Two fixes: \
            (a) add .mpol_flags(MpolFlags::STATIC_NODES) to \
            declare the cross-node placement intentional (the \
            flag survives cpuset rebinds; see MpolFlags doc), or \
            (b) widen the cpuset to cover the policy's nodes \
            (e.g. CpusetSpec::Numa(N) for each referenced N, or \
            a CpusetSpec::Exact set that spans both).",
            cgroup_name,
            uncovered,
            cpuset_numa,
        );
    }
    Ok(())
}
/// Materializes a list of `CgroupDef`s: creates each cgroup, applies its
/// resource limits (cpuset, cpu, memory, io, pids), validates and spawns its
/// work specs (coalescing specs that share a `pcomm` into one process), and
/// starts its payload, recording everything on the current target side of
/// `state` (step or Backdrop).
fn apply_setup(ctx: &Ctx, state: &mut ScenarioState<'_, '_>, defs: &[CgroupDef]) -> Result<()> {
    for def in defs {
        // Cgroup names must be unique across Backdrop and all steps.
        if state.cgroup_name_is_tracked(&def.name) {
            anyhow::bail!(
                "CgroupDef '{}' collides with a cgroup already tracked \
                (by a prior Backdrop or step-local CgroupDef) — declare it \
                in exactly one place; use a fresh name for the step-local cgroup",
                def.name,
            );
        }
        state.target_cgroups().add_cgroup_no_cpuset(&def.name)?;
        // Optional cpuset: validate, resolve to concrete CPU ids, apply, and
        // remember it for later affinity/mempolicy checks.
        if let Some(ref cpuset_spec) = def.cpuset {
            if let Err(reason) = cpuset_spec.validate(ctx) {
                anyhow::bail!(
                    "cgroup '{}': CpusetSpec validation failed: {}",
                    def.name,
                    reason
                );
            }
            let resolved = cpuset_spec.resolve(ctx);
            ctx.cgroups.set_cpuset(&def.name, &resolved)?;
            state.record_cpuset(&def.name, resolved);
        }
        if let Some(ref nodes) = def.cpuset_mems {
            ctx.cgroups.set_cpuset_mems(&def.name, nodes)?;
        }
        // cpu controller: weight must be in the kernel's 1..=10000 range;
        // cpu.max period must be non-zero, and a set quota must be non-zero.
        if let Some(ref cpu) = def.cpu {
            if let Some(w) = cpu.weight {
                if !(1..=10_000).contains(&w) {
                    anyhow::bail!(
                        "cgroup '{}': cpu.weight {w} out of range 1..=10000",
                        def.name,
                    );
                }
                ctx.cgroups.set_cpu_weight(&def.name, w)?;
            }
            if cpu.max_period_us == 0 {
                anyhow::bail!("cgroup '{}': cpu.max period must be > 0 (got 0)", def.name,);
            }
            if let Some(q) = cpu.max_quota_us
                && q == 0
            {
                anyhow::bail!(
                    "cgroup '{}': cpu.max quota must be > 0 when set; \
                    use cpu_unlimited() to remove the cap",
                    def.name,
                );
            }
            ctx.cgroups
                .set_cpu_max(&def.name, cpu.max_quota_us, cpu.max_period_us)?;
        }
        // memory controller knobs. NOTE(review): max/high/low are written
        // unconditionally but swap_max only when Some — confirm the asymmetry
        // is intentional.
        if let Some(ref mem) = def.memory {
            ctx.cgroups.set_memory_max(&def.name, mem.max)?;
            ctx.cgroups.set_memory_high(&def.name, mem.high)?;
            ctx.cgroups.set_memory_low(&def.name, mem.low)?;
            if mem.swap_max.is_some() {
                ctx.cgroups.set_memory_swap_max(&def.name, mem.swap_max)?;
            }
        }
        // io.weight shares the 1..=10000 range with cpu.weight.
        if let Some(ref io) = def.io
            && let Some(w) = io.weight
        {
            if !(1..=10_000).contains(&w) {
                anyhow::bail!(
                    "cgroup '{}': io.weight {w} out of range 1..=10000",
                    def.name,
                );
            }
            ctx.cgroups.set_io_weight(&def.name, w)?;
        }
        if let Some(ref pids) = def.pids {
            if let Some(0) = pids.max {
                anyhow::bail!(
                    "cgroup '{}': pids.max must be > 0; use \
                    pids_unlimited() to remove the cap",
                    def.name,
                );
            }
            ctx.cgroups.set_pids_max(&def.name, pids.max)?;
        }
        // Validate every work spec's memory policy, then cross-check it
        // against the cgroup's cpuset when one was recorded.
        let effective_works = def.merged_works();
        for work in &effective_works {
            if let Err(reason) = work.mem_policy.validate() {
                anyhow::bail!("cgroup '{}': {}", def.name, reason);
            }
        }
        let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(&def.name).cloned();
        if let Some(ref resolved) = cgroup_cpuset {
            for work in &effective_works {
                validate_mempolicy_cpuset(
                    &work.mem_policy,
                    work.mpol_flags,
                    resolved,
                    ctx,
                    &def.name,
                )?;
            }
        }
        // Resolve each spec to concrete worker counts, work types and
        // affinity intents.
        let mut resolved_works: Vec<crate::workload::WorkSpec> =
            Vec::with_capacity(effective_works.len());
        for work in &effective_works {
            let n = crate::scenario::resolve_num_workers(work, ctx.workers_per_cgroup, &def.name)?;
            let effective_work_type = crate::workload::resolve_work_type(
                &work.work_type,
                ctx.work_type_override.as_ref(),
                def.swappable,
                n,
            );
            let affinity = crate::scenario::intent_for_spawn(
                &work.affinity,
                cgroup_cpuset.as_ref(),
                ctx.topo,
            )?;
            resolved_works.push(crate::workload::WorkSpec {
                work_type: effective_work_type,
                sched_policy: work.sched_policy,
                num_workers: Some(n),
                affinity,
                mem_policy: work.mem_policy.clone(),
                mpol_flags: work.mpol_flags,
                nice: work.nice,
                comm: work.comm.clone(),
                pcomm: work.pcomm.clone(),
                uid: work.uid,
                gid: work.gid,
                numa_node: work.numa_node,
            });
        }
        // Partition specs: those sharing a non-empty pcomm are grouped (and
        // later spawned as one process); pcomm_order preserves first-seen
        // ordering since HashMap iteration order is unspecified.
        let mut pcomm_groups: std::collections::HashMap<String, Vec<crate::workload::WorkSpec>> =
            std::collections::HashMap::new();
        let mut pcomm_order: Vec<String> = Vec::new();
        let mut non_pcomm_works: Vec<crate::workload::WorkSpec> = Vec::new();
        for work in resolved_works {
            match &work.pcomm {
                Some(value) if !value.is_empty() => {
                    let key = value.to_string();
                    if !pcomm_groups.contains_key(&key) {
                        pcomm_order.push(key.clone());
                    }
                    pcomm_groups.entry(key).or_default().push(work);
                }
                _ => non_pcomm_works.push(work),
            }
        }
        // Plain specs: one workload process each, moved into the cgroup
        // before being started.
        for work in non_pcomm_works {
            let n = work.num_workers.expect("num_workers resolved above");
            let wl = WorkloadConfig {
                num_workers: n,
                affinity: work.affinity.clone(),
                work_type: work.work_type.clone(),
                sched_policy: work.sched_policy,
                mem_policy: work.mem_policy.clone(),
                mpol_flags: work.mpol_flags,
                nice: work.nice,
                clone_mode: Default::default(),
                comm: work.comm.clone(),
                uid: work.uid,
                gid: work.gid,
                numa_node: work.numa_node,
                composed: Vec::new(),
            };
            let mut h = WorkloadHandle::spawn(&wl)?;
            ctx.cgroups.move_tasks(&def.name, &h.worker_pids())?;
            h.start();
            state.target_handles().push((def.name.to_string(), h));
        }
        // pcomm groups: all specs sharing a pcomm become threads of a single
        // process, so uid/gid must agree across the group.
        for pcomm in pcomm_order {
            if pcomm.len() > 15 {
                tracing::warn!(
                    cgroup = %def.name,
                    pcomm = %pcomm,
                    len = pcomm.len(),
                    "WorkSpec::pcomm exceeds TASK_COMM_LEN-1 (15 bytes); kernel \
                    `__set_task_comm` will truncate to the leading 15 bytes",
                );
            }
            let works_for_pcomm = pcomm_groups
                .remove(&pcomm)
                .expect("pcomm key inserted during partition pass");
            if works_for_pcomm.len() > 1 {
                let first_uid = works_for_pcomm[0].uid;
                let first_gid = works_for_pcomm[0].gid;
                for (i, w) in works_for_pcomm.iter().enumerate().skip(1) {
                    if w.uid != first_uid {
                        anyhow::bail!(
                            "cgroup '{}' pcomm '{}': WorkSpec[0].uid={:?} differs from \
                            WorkSpec[{}].uid={:?}; pcomm-coalesced WorkSpecs must \
                            agree on uid (NPTL setresuid is broadcast to every thread \
                            in the tgid)",
                            def.name,
                            pcomm,
                            first_uid,
                            i,
                            w.uid,
                        );
                    }
                    if w.gid != first_gid {
                        anyhow::bail!(
                            "cgroup '{}' pcomm '{}': WorkSpec[0].gid={:?} differs from \
                            WorkSpec[{}].gid={:?}; pcomm-coalesced WorkSpecs must \
                            agree on gid (NPTL setresgid is broadcast to every thread \
                            in the tgid)",
                            def.name,
                            pcomm,
                            first_gid,
                            i,
                            w.gid,
                        );
                    }
                }
            }
            // Container credentials: def-level defaults win, else the first
            // spec's uid/gid.
            let container_uid = def
                .default_uid
                .or_else(|| works_for_pcomm.first().and_then(|w| w.uid));
            let container_gid = def
                .default_gid
                .or_else(|| works_for_pcomm.first().and_then(|w| w.gid));
            let mut h = WorkloadHandle::spawn_pcomm_cgroup(
                &pcomm,
                container_uid,
                container_gid,
                &works_for_pcomm,
            )?;
            ctx.cgroups.move_tasks(&def.name, &h.worker_pids())?;
            h.start();
            state.target_handles().push((def.name.to_string(), h));
        }
        // Optional payload: at most one instance of a given payload per
        // cgroup, across Backdrop and steps.
        if let Some(payload) = def.payload {
            if let Some(existing) =
                state.find_live_payload_with_cgroup(payload.name, def.name.as_ref())
            {
                anyhow::bail!(
                    "CgroupDef::workload: payload '{}' already running in cgroup '{}' (spawned by {}) — \
                    declare it in exactly one place per cgroup",
                    payload.name,
                    def.name,
                    existing.source.describe(),
                );
            }
            let handle = crate::scenario::payload_run::PayloadRun::new(ctx, payload)
                .in_cgroup(def.name.clone())
                .spawn()
                .map_err(|e| {
                    anyhow::anyhow!(
                        "cgroup '{}': spawn payload '{}': {:#}",
                        def.name,
                        payload.name,
                        e,
                    )
                })?;
            state.target_payload_handles().push(PayloadEntry {
                cgroup: def.name.to_string(),
                source: PayloadSource::CgroupDefWorkload,
                handle,
            });
        }
    }
    Ok(())
}
fn apply_ops(ctx: &Ctx, state: &mut ScenarioState<'_, '_>, ops: &[Op]) -> Result<()> {
for op in ops {
match op {
Op::AddCgroup { name } => {
if state.cgroup_name_is_tracked(name) {
anyhow::bail!(
"Op::AddCgroup '{}' collides with a cgroup already \
tracked (by a prior Backdrop or step-local CgroupDef) — \
declare it in exactly one place; use a fresh name for \
the step-local cgroup",
name,
);
}
state.target_cgroups().add_cgroup_no_cpuset(name)?;
}
Op::RemoveCgroup { cgroup } => {
if !state.target_backdrop && state.cgroup_name_is_backdrop(cgroup) {
anyhow::bail!(
"Op::RemoveCgroup targets Backdrop-owned cgroup '{}' — \
Backdrop cgroups live for the full scenario and must \
not be removed from a Step; drop the op or move the \
cgroup declaration out of the Backdrop",
cgroup,
);
}
state.drain_payloads_for_cgroup(cgroup);
state.drop_handles_for_cgroup(cgroup);
state.forget_cpuset(cgroup);
if let Err(err) = ctx.cgroups.remove_cgroup(cgroup)
&& !crate::scenario::is_io_not_found(&err)
{
let hint = crate::scenario::remove_cgroup_errno_hint(&err).unwrap_or("");
tracing::warn!(
cgroup = %cgroup,
err = %format!("{err:#}"),
hint,
"Op::RemoveCgroup: remove_cgroup returned non-ENOENT error",
);
}
}
Op::SetCpuset { cgroup, cpus } => {
if let Err(reason) = cpus.validate(ctx) {
anyhow::bail!(
"cgroup '{}': CpusetSpec validation failed: {}",
cgroup,
reason
);
}
let resolved = cpus.resolve(ctx);
ctx.cgroups.set_cpuset(cgroup, &resolved)?;
state.record_cpuset(cgroup, resolved);
}
Op::ClearCpuset { cgroup } => {
ctx.cgroups.clear_cpuset(cgroup)?;
state.forget_cpuset(cgroup);
}
Op::SwapCpusets { a, b } => {
let cpus_a = read_cpuset(ctx, a);
let cpus_b = read_cpuset(ctx, b);
if let Some(ca) = cpus_a {
ctx.cgroups.set_cpuset(b, &ca)?;
state.record_cpuset(b, ca);
}
if let Some(cb) = cpus_b {
ctx.cgroups.set_cpuset(a, &cb)?;
state.record_cpuset(a, cb);
}
}
Op::Spawn { cgroup, work } => {
if let Err(reason) = work.mem_policy.validate() {
anyhow::bail!("cgroup '{}': {}", cgroup, reason);
}
let n = crate::scenario::resolve_num_workers(work, ctx.workers_per_cgroup, cgroup)?;
let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(cgroup).cloned();
if let Some(ref resolved) = cgroup_cpuset {
validate_mempolicy_cpuset(
&work.mem_policy,
work.mpol_flags,
resolved,
ctx,
cgroup,
)?;
}
let affinity = crate::scenario::intent_for_spawn(
&work.affinity,
cgroup_cpuset.as_ref(),
ctx.topo,
)?;
let wl = WorkloadConfig {
num_workers: n,
affinity,
work_type: work.work_type.clone(),
sched_policy: work.sched_policy,
mem_policy: work.mem_policy.clone(),
mpol_flags: work.mpol_flags,
nice: work.nice,
clone_mode: Default::default(),
comm: work.comm.clone(),
uid: work.uid,
gid: work.gid,
numa_node: work.numa_node,
composed: Vec::new(),
};
let mut h = WorkloadHandle::spawn(&wl)?;
ctx.cgroups.move_tasks(cgroup, &h.worker_pids())?;
h.start();
state.target_handles().push((cgroup.to_string(), h));
}
Op::StopCgroup { cgroup } => {
if !state.target_backdrop && state.cgroup_name_is_backdrop(cgroup) {
anyhow::bail!(
"Op::StopCgroup targets Backdrop-owned cgroup '{}' — \
Backdrop workers live for the full scenario and must \
not be stopped from a Step; drop the op or move the \
cgroup declaration out of the Backdrop",
cgroup,
);
}
state.drain_payloads_for_cgroup(cgroup);
state.drop_handles_for_cgroup(cgroup);
}
Op::SetAffinity { cgroup, affinity } => {
let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(cgroup).cloned();
let resolved = crate::scenario::resolve_affinity_for_cgroup(
affinity,
cgroup_cpuset.as_ref(),
ctx.topo,
)?;
let random_pool: Vec<usize> = match &resolved {
crate::workload::ResolvedAffinity::Random { from, count }
if !from.is_empty() && *count > 0 =>
{
from.iter().copied().collect()
}
_ => Vec::new(),
};
for (name, handle) in state.all_handles() {
if name.as_str() == *cgroup {
match &resolved {
crate::workload::ResolvedAffinity::None => {}
crate::workload::ResolvedAffinity::Fixed(cpus) => {
for idx in 0..handle.worker_pids().len() {
if let Err(e) = handle.set_affinity(idx, cpus) {
tracing::warn!(
cgroup = %cgroup,
idx,
err = %format!("{e:#}"),
"Op::SetAffinity Fixed: handle.set_affinity failed; \
worker keeps prior affinity"
);
}
}
}
crate::workload::ResolvedAffinity::Random { from: _, count }
if !random_pool.is_empty() && *count > 0 =>
{
use rand::seq::IndexedRandom;
for idx in 0..handle.worker_pids().len() {
let chosen: BTreeSet<usize> = random_pool
.sample(&mut rand::rng(), *count)
.copied()
.collect();
if let Err(e) = handle.set_affinity(idx, &chosen) {
tracing::warn!(
cgroup = %cgroup,
idx,
err = %format!("{e:#}"),
"Op::SetAffinity Random: handle.set_affinity failed; \
worker keeps prior affinity"
);
}
}
}
crate::workload::ResolvedAffinity::Random { .. } => {}
crate::workload::ResolvedAffinity::SingleCpu(cpu) => {
let cpus: BTreeSet<usize> = [*cpu].into_iter().collect();
for idx in 0..handle.worker_pids().len() {
if let Err(e) = handle.set_affinity(idx, &cpus) {
tracing::warn!(
cgroup = %cgroup,
idx,
cpu = *cpu,
err = %format!("{e:#}"),
"Op::SetAffinity SingleCpu: handle.set_affinity failed; \
worker keeps prior affinity"
);
}
}
}
}
}
}
}
Op::SpawnHost { work } => {
if let Err(reason) = work.mem_policy.validate() {
anyhow::bail!("SpawnHost: {}", reason);
}
let n =
crate::scenario::resolve_num_workers(work, ctx.workers_per_cgroup, "<host>")?;
let affinity = crate::scenario::intent_for_spawn(&work.affinity, None, ctx.topo)?;
let wl = WorkloadConfig {
num_workers: n,
affinity,
work_type: work.work_type.clone(),
sched_policy: work.sched_policy,
mem_policy: work.mem_policy.clone(),
mpol_flags: work.mpol_flags,
nice: work.nice,
clone_mode: Default::default(),
comm: work.comm.clone(),
uid: work.uid,
gid: work.gid,
numa_node: work.numa_node,
composed: Vec::new(),
};
let mut h = WorkloadHandle::spawn(&wl)?;
h.start();
state.target_handles().push((String::new(), h));
}
Op::MoveAllTasks { from, to } => {
if !state.target_backdrop
&& state.cgroup_name_is_backdrop(from)
&& !state.cgroup_name_is_backdrop(to)
{
anyhow::bail!(
"Op::MoveAllTasks from Backdrop-owned '{}' to step-local '{}' \
would leave persistent workers in a cgroup that disappears \
at step boundary; declare `{}` in the Backdrop too, or \
move the workers back into a Backdrop-owned cgroup",
from,
to,
to,
);
}
if let Err(e) = ctx.cgroups.clear_subtree_control(to) {
tracing::warn!(
cgroup = to.as_ref(),
err = %e,
"failed to clear subtree_control before task move"
);
}
let pid_batches: Vec<Vec<libc::pid_t>> = state
.all_handles()
.filter(|(name, _)| name.as_str() == *from)
.map(|(_, handle)| handle.worker_pids())
.collect();
for pids in &pid_batches {
ctx.cgroups.move_tasks(to, pids)?;
}
state.rename_handles(from, to);
}
Op::RunPayload {
payload,
args,
cgroup,
} => {
if payload.is_scheduler() {
anyhow::bail!(
"Op::RunPayload called with scheduler-kind Payload ('{}'); \
only PayloadKind::Binary payloads can be spawned by step ops",
payload.name,
);
}
validate_known_flags(payload, args)?;
let cgroup_key = cgroup.as_ref().map(|c| c.to_string()).unwrap_or_default();
if let Some(existing) =
state.find_live_payload_with_cgroup(payload.name, &cgroup_key)
{
anyhow::bail!(
"Op::RunPayload: payload '{}' already running in cgroup {} (spawned by {}) — \
WaitPayload/KillPayload it before spawning another with the same name in the same cgroup",
payload.name,
render_cgroup_key(&existing.cgroup),
existing.source.describe(),
);
}
let mut run = crate::scenario::payload_run::PayloadRun::new(ctx, payload);
if !args.is_empty() {
run = run.args(args.iter().cloned());
}
if let Some(c) = cgroup {
run = run.in_cgroup(c.clone());
}
let handle = run.spawn().with_context(|| {
format!(
"Op::RunPayload: spawn payload '{}' in cgroup {}",
payload.name,
render_cgroup_key(&cgroup_key),
)
})?;
state.target_payload_handles().push(PayloadEntry {
cgroup: cgroup_key,
source: PayloadSource::OpRunPayload,
handle,
});
}
Op::WaitPayload { name, cgroup } => {
let entry = take_payload_for_op(
state,
"Op::WaitPayload",
"waiting",
"Op::wait_payload_in_cgroup",
name,
cgroup.as_deref(),
)?;
let _result = entry
.handle
.wait()
.with_context(|| format!("Op::WaitPayload: wait payload '{name}'"))?;
}
Op::KillPayload { name, cgroup } => {
let entry = take_payload_for_op(
state,
"Op::KillPayload",
"killing",
"Op::kill_payload_in_cgroup",
name,
cgroup.as_deref(),
)?;
let _result = entry
.handle
.kill()
.with_context(|| format!("Op::KillPayload: kill payload '{name}'"))?;
}
Op::FreezeCgroup { cgroup } => {
ctx.cgroups
.set_freeze(cgroup, true)
.with_context(|| format!("Op::FreezeCgroup: cgroup '{cgroup}'"))?;
}
Op::UnfreezeCgroup { cgroup } => {
ctx.cgroups
.set_freeze(cgroup, false)
.with_context(|| format!("Op::UnfreezeCgroup: cgroup '{cgroup}'"))?;
}
Op::Snapshot { name } => {
let invoked = crate::scenario::snapshot::with_active_bridge(|b| {
let captured = b.capture(name);
if captured {
tracing::info!(
name = %name,
stored = b.len(),
"Op::Snapshot: captured diagnostic snapshot"
);
}
captured
});
if invoked.is_none() {
if crate::vmm::guest_comms::is_guest() {
if SNAPSHOT_TRANSPORT_DEAD.load(Ordering::Relaxed) {
tracing::warn!(
name = %name,
"Op::Snapshot: snapshot transport latched dead; skipping host \
request to avoid the 30 s timeout per attempt"
);
} else {
let timeout = std::time::Duration::from_secs(30);
match crate::vmm::guest_comms::request_snapshot(
crate::vmm::wire::SNAPSHOT_KIND_CAPTURE,
name,
timeout,
) {
crate::vmm::wire::SnapshotRequestResult::Ok => {
tracing::info!(
name = %name,
"Op::Snapshot: host captured diagnostic snapshot via TLV stream"
);
}
crate::vmm::wire::SnapshotRequestResult::HostError { reason } => {
anyhow::bail!(
"Op::Snapshot('{name}'): host rejected capture: {reason}"
);
}
crate::vmm::wire::SnapshotRequestResult::TransportError {
reason,
} => {
SNAPSHOT_TRANSPORT_DEAD.store(true, Ordering::Relaxed);
anyhow::bail!(
"Op::Snapshot('{name}'): port-1 transport failure: {reason}"
);
}
}
}
} else {
tracing::warn!(
name = %name,
"Op::Snapshot: no SnapshotBridge installed on the executor's \
thread and not running in a guest VM — skipping capture"
);
}
}
}
Op::WatchSnapshot { symbol } => {
let registered =
crate::scenario::snapshot::with_active_bridge(|b| b.register_watch(symbol));
match registered {
Some(Ok(())) => {
tracing::info!(
symbol = %symbol,
"Op::WatchSnapshot: registered hardware-watchpoint snapshot"
);
}
Some(Err(err)) => {
anyhow::bail!(
"Op::WatchSnapshot: register watch on '{symbol}' failed: {err}",
);
}
None => {
if crate::vmm::guest_comms::is_guest() {
if SNAPSHOT_TRANSPORT_DEAD.load(Ordering::Relaxed) {
tracing::warn!(
symbol = %symbol,
"Op::WatchSnapshot: snapshot transport latched dead; skipping \
host request to avoid the 30 s timeout per attempt"
);
} else {
let timeout = std::time::Duration::from_secs(30);
match crate::vmm::guest_comms::request_snapshot(
crate::vmm::wire::SNAPSHOT_KIND_WATCH,
symbol,
timeout,
) {
crate::vmm::wire::SnapshotRequestResult::Ok => {
tracing::info!(
symbol = %symbol,
"Op::WatchSnapshot: host armed hardware-watchpoint via TLV stream"
);
}
crate::vmm::wire::SnapshotRequestResult::HostError {
reason,
} => {
anyhow::bail!(
"Op::WatchSnapshot('{symbol}'): host rejected: {reason}"
);
}
crate::vmm::wire::SnapshotRequestResult::TransportError {
reason,
} => {
SNAPSHOT_TRANSPORT_DEAD.store(true, Ordering::Relaxed);
anyhow::bail!(
"Op::WatchSnapshot('{symbol}'): port-1 transport failure: {reason}"
);
}
}
}
} else {
tracing::warn!(
symbol = %symbol,
"Op::WatchSnapshot: no SnapshotBridge installed and not in \
guest VM — skipping watch registration"
);
}
}
}
}
}
}
Ok(())
}
/// Resolve exactly one live payload named `name` (optionally scoped to
/// `cgroup`) and remove it from `state`, or fail with an op-specific
/// diagnostic.
///
/// `op_tag`, `verb_ing`, and `ctor_path` are used only to phrase the
/// error messages (e.g. "Op::WaitPayload", "waiting",
/// "Op::wait_payload_in_cgroup").
fn take_payload_for_op(
    state: &mut ScenarioState<'_, '_>,
    op_tag: &str,
    verb_ing: &str,
    ctor_path: &str,
    name: &str,
    cgroup: Option<&str>,
) -> Result<PayloadEntry> {
    let found = match state.take_payload_by_name(name, cgroup) {
        Ok(found) => found,
        Err(cgroups) => {
            // Same payload name live in several cgroups: refuse to guess.
            let rendered: Vec<String> = cgroups.iter().map(|c| render_cgroup_key(c)).collect();
            anyhow::bail!(
                "{op_tag}: payload '{name}' is ambiguous — {} live copies in cgroups {} — \
                 use {ctor_path}(name, cgroup) to disambiguate",
                rendered.len(),
                rendered.join(", "),
            )
        }
    };
    if let Some(entry) = found {
        return Ok(entry);
    }
    // Nothing live under that name: phrase the error with or without the
    // cgroup qualifier depending on how the op was scoped.
    if let Some(c) = cgroup {
        anyhow::bail!(
            "{op_tag}: no running payload named '{name}' in cgroup {} \
             (spawn it via Op::RunPayload or CgroupDef::workload before {verb_ing})",
            render_cgroup_key(c),
        )
    } else {
        anyhow::bail!(
            "{op_tag}: no running payload named '{name}' \
             (spawn it via Op::RunPayload or CgroupDef::workload before {verb_ing})",
        )
    }
}
/// Best-effort read of a cgroup's current `cpuset.cpus` file.
///
/// Returns `None` when the file cannot be read or holds an empty list;
/// otherwise the leniently-parsed CPU set.
fn read_cpuset(ctx: &Ctx, name: &str) -> Option<BTreeSet<usize>> {
    let path = ctx.cgroups.parent_path().join(name).join("cpuset.cpus");
    let raw = std::fs::read_to_string(&path).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(
            crate::topology::parse_cpu_list_lenient(trimmed)
                .into_iter()
                .collect(),
        )
    }
}
/// Tear down one step's state and evaluate `checks` against its workers.
///
/// Order matters: cgroups are unfrozen first (a frozen cgroup would make
/// the later rmdir fail with EBUSY), payload processes are killed, and
/// only then are the worker handles collected along with any cpuset
/// recorded for their cgroup.
fn collect_step(
    step_state: &mut StepState<'_>,
    checks: &crate::assert::Assert,
    topo: &crate::topology::TestTopology,
    cgroups: &dyn crate::cgroup::CgroupOps,
) -> AssertResult {
    for name in step_state.cgroups.names() {
        // Unfreeze failure is non-fatal: log and keep going.
        let Err(e) = cgroups.set_freeze(name, false) else {
            continue;
        };
        tracing::warn!(
            cgroup = %name,
            err = %format!("{e:#}"),
            "collect_step: pre-teardown unfreeze failed; rmdir may EBUSY"
        );
    }
    drain_all_payload_handles(&mut step_state.payload_handles);
    let taken = std::mem::take(&mut step_state.handles);
    let cpusets = &step_state.cpusets;
    crate::scenario::collect_handles(
        taken.into_iter().map(|(name, h)| (h, cpusets.get(&name))),
        checks,
        Some(topo),
    )
}
/// Tear down the scenario-lifetime Backdrop state and evaluate `checks`
/// against its workers.
///
/// Same sequence as the per-step teardown: unfreeze every Backdrop cgroup
/// (a frozen cgroup would make the later rmdir fail with EBUSY), kill the
/// remaining payload processes, then collect the worker handles together
/// with any cpuset recorded for their cgroup.
fn collect_backdrop(
    backdrop_state: &mut BackdropState<'_>,
    checks: &crate::assert::Assert,
    topo: &crate::topology::TestTopology,
    cgroups: &dyn crate::cgroup::CgroupOps,
) -> AssertResult {
    for name in backdrop_state.cgroups.names() {
        // Unfreeze failure is non-fatal: log and keep going.
        let Err(e) = cgroups.set_freeze(name, false) else {
            continue;
        };
        tracing::warn!(
            cgroup = %name,
            err = %format!("{e:#}"),
            "collect_backdrop: pre-teardown unfreeze failed; rmdir may EBUSY"
        );
    }
    drain_all_payload_handles(&mut backdrop_state.payload_handles);
    let taken = std::mem::take(&mut backdrop_state.handles);
    let cpusets = &backdrop_state.cpusets;
    crate::scenario::collect_handles(
        taken.into_iter().map(|(name, h)| (h, cpusets.get(&name))),
        checks,
        Some(topo),
    )
}
/// Kill and drop every payload handle recorded under `cgroup`.
///
/// Walks the vec from the back so `remove` never shifts an index we have
/// yet to visit; kill failures are reported to stderr but do not abort
/// the drain.
fn drain_payload_handles_for_cgroup(handles: &mut Vec<PayloadEntry>, cgroup: &str) {
    for i in (0..handles.len()).rev() {
        if handles[i].cgroup.as_str() != cgroup {
            continue;
        }
        let entry = handles.remove(i);
        if let Err(e) = entry.handle.kill() {
            eprintln!("ktstr: kill payload in cgroup '{cgroup}': {e:#}");
        }
    }
}
/// Kill and drop every remaining payload handle, newest first.
///
/// Kill failures are reported to stderr but do not abort the drain.
fn drain_all_payload_handles(handles: &mut Vec<PayloadEntry>) {
    // drain(..).rev() yields back-to-front: the same order as repeated pop().
    for entry in handles.drain(..).rev() {
        if let Err(e) = entry.handle.kill() {
            eprintln!(
                "ktstr: teardown kill payload in cgroup {}: {e:#}",
                render_cgroup_key(&entry.cgroup),
            );
        }
    }
}
/// Human-readable form of a payload's cgroup key: the quoted name, or
/// "(no cgroup)" for the empty host-scope key.
fn render_cgroup_key(cgroup: &str) -> String {
    match cgroup {
        "" => "(no cgroup)".to_string(),
        name => format!("'{name}'"),
    }
}
#[cfg(test)]
mod tests {
use std::borrow::Cow;
use std::ops::RangeInclusive;
use super::*;
use crate::workload::{AffinityIntent, WorkSpec, WorkType};
use strum::IntoEnumIterator;
// Guards the OpKind -> bit mapping: every variant must map to a distinct
// bit index, and the indices must be dense from 0 (no gaps).
#[test]
fn op_kind_bit_indices_are_unique_and_contiguous() {
    let kinds: Vec<OpKind> = OpKind::iter().collect();
    let indices: Vec<u32> = kinds.iter().copied().map(OpKind::bit_index).collect();
    // Dedup through a set: equal lengths means no duplicates.
    let unique: std::collections::BTreeSet<u32> = indices.iter().copied().collect();
    assert_eq!(
        unique.len(),
        indices.len(),
        "OpKind::bit_index produced duplicates. \
         Pairs (OpKind, bit_index): {:?}. Fix the match in \
         OpKind::bit_index so every variant maps to a distinct \
         bit.",
        kinds.iter().zip(&indices).collect::<Vec<_>>(),
    );
    // Sorted indices must be exactly 0..len.
    let expected: Vec<u32> = (0..kinds.len() as u32).collect();
    let mut sorted = indices.clone();
    sorted.sort_unstable();
    assert_eq!(
        sorted,
        expected,
        "OpKind::bit_index indices must be contiguous from 0 \
         (no gaps, no duplicates). Got sorted indices {sorted:?} \
         for {} OpKind variants; expected {expected:?}.",
        kinds.len(),
    );
}
// How generated cgroup cpusets are laid out each phase (mapped onto
// CpusetSpec::Disjoint / CpusetSpec::Overlap by Traverse::generate).
#[derive(Debug)]
enum Layout {
    // Non-overlapping partitions of the usable CPUs.
    Disjoint,
    // Overlapping partitions; the fields bound the random overlap
    // fraction: (min_frac, max_frac).
    Overlap(f64, f64),
}
// Configuration for the randomized multi-phase step generator exercised
// by the traverse_generate_* tests below.
#[derive(Debug)]
struct Traverse {
    // RNG seed; None falls back to the process id (non-reproducible runs).
    seed: Option<u64>,
    // Inclusive bounds on how many cgroups may be live in a phase.
    cgroup_count: RangeInclusive<usize>,
    // Candidate layouts; one is picked at random per phase.
    layouts: Vec<Layout>,
    // Number of steps to generate.
    phases: usize,
    // Hold duration of each phase.
    phase_duration: Duration,
    // Extra hold added to the first phase only.
    settle: Duration,
    // Cgroups that are never torn down when shrinking the population.
    persistent_cgroups: usize,
    // Per-cgroup work specs; the last entry is reused when there are more
    // cgroups than specs.
    cgroup_workloads: Vec<WorkSpec>,
}
impl Traverse {
    /// Build a randomized step sequence: each phase grows or shrinks the
    /// live cgroup population toward a random target, then re-partitions
    /// CPUs under a randomly chosen layout.
    ///
    /// Deterministic for a fixed `seed`; when `seed` is `None` the process
    /// id is used, so unseeded runs differ between invocations.
    fn generate(&self, ctx: &Ctx) -> Vec<Step> {
        use rand::RngExt;
        let seed = self.seed.unwrap_or_else(|| std::process::id() as u64);
        let mut rng = seeded_rng(seed);
        let usable_len = ctx.topo.usable_cpus().len();
        // Clamp the cgroup count to at most half the usable CPUs and at
        // least 1, then pull the lower bound into that range.
        let max_cgroups = (*self.cgroup_count.end()).min(usable_len / 2).max(1);
        let min_cgroups = (*self.cgroup_count.start()).max(1).min(max_cgroups);
        let mut steps = Vec::with_capacity(self.phases + 1);
        let mut live_cgroups: Vec<Cow<'static, str>> = Vec::new();
        // Stable names cg_0..cg_{max-1}; slot i always reuses the same name.
        let names: Vec<Cow<'static, str>> = (0..max_cgroups)
            .map(|i| Cow::Owned(format!("cg_{i}")))
            .collect();
        for phase in 0..self.phases {
            let range = max_cgroups - min_cgroups + 1;
            let target_count = min_cgroups + rng.random_range(0..range);
            let layout_idx = rng.random_range(0..self.layouts.len());
            let layout = &self.layouts[layout_idx];
            let mut ops = Vec::new();
            // Grow: add + spawn until the target population is reached.
            while live_cgroups.len() < target_count {
                let idx = live_cgroups.len();
                let name = names[idx].clone();
                // Use the idx-th work spec, falling back to the last one
                // (then to Default) when fewer specs than cgroups exist.
                let w = self
                    .cgroup_workloads
                    .get(idx)
                    .or(self.cgroup_workloads.last())
                    .cloned()
                    .unwrap_or_default();
                ops.push(Op::AddCgroup { name: name.clone() });
                ops.push(Op::Spawn {
                    cgroup: name.clone(),
                    work: w,
                });
                live_cgroups.push(name);
            }
            // Shrink: stop + remove from the back, but never below the
            // persistent floor.
            while live_cgroups.len() > target_count
                && live_cgroups.len() > self.persistent_cgroups
            {
                if let Some(name) = live_cgroups.pop() {
                    ops.push(Op::StopCgroup {
                        cgroup: name.clone(),
                    });
                    ops.push(Op::RemoveCgroup { cgroup: name });
                }
            }
            // Re-partition every surviving cgroup under this phase's layout.
            for (i, name) in live_cgroups.iter().enumerate() {
                let spec = match layout {
                    Layout::Disjoint => CpusetSpec::Disjoint {
                        index: i,
                        of: live_cgroups.len(),
                    },
                    Layout::Overlap(min_frac, max_frac) => {
                        // Random fraction in [min_frac, max_frac), 1% steps.
                        let frac = min_frac
                            + rng.random_range(0..100) as f64 / 100.0 * (max_frac - min_frac);
                        CpusetSpec::Overlap {
                            index: i,
                            of: live_cgroups.len(),
                            frac,
                        }
                    }
                };
                ops.push(Op::SetCpuset {
                    cgroup: name.clone(),
                    cpus: spec,
                });
            }
            // First phase holds settle + phase_duration; later phases only
            // phase_duration.
            let hold = if phase == 0 {
                HoldSpec::Fixed(self.settle + self.phase_duration)
            } else {
                HoldSpec::Fixed(self.phase_duration)
            };
            steps.push(Step {
                setup: vec![].into(),
                ops,
                hold,
            });
        }
        steps
    }
}
/// Deterministic RNG for test reproducibility: same seed, same stream.
fn seeded_rng(seed: u64) -> rand::rngs::StdRng {
    use rand::{rngs::StdRng, SeedableRng};
    StdRng::seed_from_u64(seed)
}
// Inputs the test expects to pass: allowlisted long flags in both
// `--flag=value` and `--flag value` form, plus positionals, a short flag,
// bare `--`, and the degenerate `--=value`.
#[test]
fn validate_known_flags_accepts_listed_long_flags() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static WITH_ALLOWLIST: Payload = Payload {
        name: "with_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: Some(&["runtime", "threads", "verbose"]),
        metric_bounds: None,
    };
    let args: Vec<String> = vec![
        "--runtime=30".into(),
        "--threads".into(),
        "4".into(),
        "--verbose".into(),
        "positional_arg".into(),
        "-s".into(),
        "--".into(),
        "--=value".into(),
    ];
    validate_known_flags(&WITH_ALLOWLIST, &args)
        .expect("all long flags in allowlist must pass");
}
// Validation must stop at the first unknown flag: the error names the typo
// and must not mention the known flag that follows it.
#[test]
fn validate_known_flags_fails_fast_on_first_unknown() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static WITH_ALLOWLIST: Payload = Payload {
        name: "with_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: Some(&["runtime", "threads", "verbose"]),
        metric_bounds: None,
    };
    let args = vec!["--runtime=30".into(), "--threds".into(), "--verbose".into()];
    let err = validate_known_flags(&WITH_ALLOWLIST, &args)
        .expect_err("typo between two known flags must be rejected");
    let msg = format!("{err:#}");
    assert!(msg.contains("--threds"), "error must name the typo: {msg}");
    assert!(
        !msg.contains("--verbose"),
        "error must not mention the later known flag '--verbose' \
         — fail-fast broke: {msg}",
    );
}
// An unknown long flag must be rejected, and the error must name both the
// offending flag and the known_flags allowlist mechanism.
#[test]
fn validate_known_flags_rejects_unknown_long_flag() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static WITH_ALLOWLIST: Payload = Payload {
        name: "with_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: Some(&["runtime", "threads"]),
        metric_bounds: None,
    };
    let args = vec!["--threds".to_string(), "4".to_string()];
    let err = validate_known_flags(&WITH_ALLOWLIST, &args).expect_err("typo must be rejected");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("--threds"),
        "error must name the offending flag: {msg}",
    );
    assert!(
        msg.contains("known_flags allowlist"),
        "error must mention the allowlist surface: {msg}",
    );
}
// known_flags: None disables validation entirely — any flag passes.
#[test]
fn validate_known_flags_none_is_permissive() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static NO_ALLOWLIST: Payload = Payload {
        name: "no_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    let args: Vec<String> = vec![
        "--anything".into(),
        "--whatever=x".into(),
        "--threds".into(),
    ];
    validate_known_flags(&NO_ALLOWLIST, &args).expect("None allowlist must pass any flag");
}
// Every Op variant must yield a distinct discriminant value.
#[test]
fn op_discriminant_unique() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static TRUE_BIN: Payload = Payload {
        name: "true_bin",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    // One instance of each Op variant, with placeholder field values.
    let ops: Vec<Op> = vec![
        Op::AddCgroup { name: "a".into() },
        Op::RemoveCgroup { cgroup: "a".into() },
        Op::SetCpuset {
            cgroup: "a".into(),
            cpus: CpusetSpec::exact([]),
        },
        Op::ClearCpuset { cgroup: "a".into() },
        Op::SwapCpusets {
            a: "a".into(),
            b: "b".into(),
        },
        Op::Spawn {
            cgroup: "a".into(),
            work: Default::default(),
        },
        Op::StopCgroup { cgroup: "a".into() },
        Op::SetAffinity {
            cgroup: "a".into(),
            affinity: Default::default(),
        },
        Op::SpawnHost {
            work: Default::default(),
        },
        Op::MoveAllTasks {
            from: "a".into(),
            to: "b".into(),
        },
        Op::RunPayload {
            payload: &TRUE_BIN,
            args: vec![],
            cgroup: None,
        },
        Op::WaitPayload {
            name: "p".into(),
            cgroup: None,
        },
        Op::KillPayload {
            name: "p".into(),
            cgroup: None,
        },
        Op::FreezeCgroup { cgroup: "a".into() },
        Op::UnfreezeCgroup { cgroup: "a".into() },
        Op::Snapshot {
            name: "snap".into(),
        },
        Op::WatchSnapshot {
            symbol: "kernel.x".into(),
        },
    ];
    let mut seen = std::collections::BTreeSet::new();
    for op in &ops {
        // insert() returns false on a repeat — i.e. a duplicate discriminant.
        assert!(seen.insert(op.discriminant()), "duplicate discriminant");
    }
}
// Pins the concrete discriminant values so reordering Op variants (which
// would silently renumber them) is caught.
#[test]
fn op_discriminant_values() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static TRUE_BIN: Payload = Payload {
        name: "true_bin",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    assert_eq!(Op::AddCgroup { name: "a".into() }.discriminant(), 0);
    assert_eq!(Op::RemoveCgroup { cgroup: "a".into() }.discriminant(), 1);
    assert_eq!(
        Op::SpawnHost {
            work: Default::default()
        }
        .discriminant(),
        8
    );
    assert_eq!(
        Op::MoveAllTasks {
            from: "a".into(),
            to: "b".into()
        }
        .discriminant(),
        9
    );
    assert_eq!(
        Op::RunPayload {
            payload: &TRUE_BIN,
            args: vec![],
            cgroup: None,
        }
        .discriminant(),
        10,
    );
    assert_eq!(
        Op::WaitPayload {
            name: "p".into(),
            cgroup: None,
        }
        .discriminant(),
        11,
    );
    assert_eq!(
        Op::KillPayload {
            name: "p".into(),
            cgroup: None,
        }
        .discriminant(),
        12,
    );
    assert_eq!(Op::FreezeCgroup { cgroup: "a".into() }.discriminant(), 13,);
    assert_eq!(Op::UnfreezeCgroup { cgroup: "a".into() }.discriminant(), 14,);
    assert_eq!(
        Op::Snapshot {
            name: "snap".into()
        }
        .discriminant(),
        15,
    );
    assert_eq!(
        Op::WatchSnapshot {
            symbol: "kernel.x".into()
        }
        .discriminant(),
        16,
    );
}
// Identical seeds must yield identical u64 streams.
#[test]
fn seeded_rng_deterministic() {
    use rand::RngExt;
    let (mut a, mut b) = (seeded_rng(42), seeded_rng(42));
    assert!((0..100).all(|_| a.random::<u64>() == b.random::<u64>()));
}
// Different seeds must diverge within a handful of draws.
#[test]
fn seeded_rng_different_seeds_differ() {
    use rand::RngExt;
    let (mut a, mut b) = (seeded_rng(1), seeded_rng(2));
    assert!((0..10).any(|_| a.random::<u64>() != b.random::<u64>()));
}
// Representative valid HoldSpecs: positive fractions, nonzero Fixed, and
// a Loop with a nonzero interval.
#[test]
fn holdspec_validate_accepts_valid() {
    HoldSpec::Frac(0.5).validate().unwrap();
    HoldSpec::Frac(1.0).validate().unwrap();
    HoldSpec::Fixed(Duration::from_millis(1))
        .validate()
        .unwrap();
    HoldSpec::Loop {
        interval: Duration::from_millis(100),
    }
    .validate()
    .unwrap();
}
// Fixed(0) is explicitly allowed for op-only / settle steps.
#[test]
fn holdspec_validate_accepts_fixed_zero() {
    HoldSpec::Fixed(Duration::ZERO)
        .validate()
        .expect("Duration::ZERO is valid for settle/op-only steps");
}
// Frac must be strictly positive: zero is rejected.
#[test]
fn holdspec_validate_rejects_frac_zero() {
    let err = HoldSpec::Frac(0.0).validate().unwrap_err();
    assert!(err.contains("Frac") && err.contains("> 0"), "got: {err}");
}
// Frac must be strictly positive: negatives are rejected.
#[test]
fn holdspec_validate_rejects_frac_negative() {
    let err = HoldSpec::Frac(-0.5).validate().unwrap_err();
    assert!(err.contains("Frac") && err.contains("> 0"), "got: {err}");
}
// Non-finite fractions (NaN) are rejected with a dedicated message.
#[test]
fn holdspec_validate_rejects_frac_nan() {
    let err = HoldSpec::Frac(f64::NAN).validate().unwrap_err();
    assert!(
        err.contains("not finite") || err.contains("NaN"),
        "got: {err}"
    );
}
// Non-finite fractions (infinity) are rejected with a dedicated message.
#[test]
fn holdspec_validate_rejects_frac_inf() {
    let err = HoldSpec::Frac(f64::INFINITY).validate().unwrap_err();
    assert!(
        err.contains("not finite") || err.contains("Inf"),
        "got: {err}"
    );
}
// Loop with a zero interval would busy-spin; validate must reject it.
#[test]
fn holdspec_validate_rejects_loop_zero_interval() {
    let err = HoldSpec::Loop {
        interval: Duration::ZERO,
    }
    .validate()
    .unwrap_err();
    assert!(err.contains("Loop") && err.contains("busy"), "got: {err}");
}
// Step::new must store the HoldSpec::Frac it was given, unchanged.
#[test]
fn holdspec_frac() {
    let step = Step::new(vec![], HoldSpec::Frac(0.5));
    let HoldSpec::Frac(f) = step.hold else {
        panic!("expected Frac")
    };
    assert!((f - 0.5).abs() < f64::EPSILON);
}
// Step::new must store the HoldSpec::Fixed it was given, unchanged.
#[test]
fn holdspec_fixed() {
    let step = Step::new(vec![], HoldSpec::Fixed(Duration::from_secs(3)));
    let HoldSpec::Fixed(d) = step.hold else {
        panic!("expected Fixed")
    };
    assert_eq!(d, Duration::from_secs(3));
}
// Step::new must store the HoldSpec::Loop it was given, unchanged.
#[test]
fn holdspec_loop() {
    let step = Step::new(
        vec![],
        HoldSpec::Loop {
            interval: Duration::from_millis(100),
        },
    );
    let HoldSpec::Loop { interval } = step.hold else {
        panic!("expected Loop")
    };
    assert_eq!(interval, Duration::from_millis(100));
}
// Exact must resolve to precisely the CPUs it was built with, regardless
// of topology.
#[test]
fn cpusetspec_exact_is_passthrough() {
    let cpus: BTreeSet<usize> = [0, 2, 4].iter().copied().collect();
    let spec = CpusetSpec::Exact(cpus.clone());
    let topo = crate::topology::TestTopology::from_vm_topology(
        &crate::vmm::topology::Topology::new(1, 1, 4, 1),
    );
    let cgroups = crate::cgroup::CgroupManager::new("/nonexistent");
    // Ctx built inline rather than via ctx_from; note it uses a 1000 ms
    // settle where ctx_from uses Duration::ZERO — presumably irrelevant to
    // resolve(), TODO confirm and switch to the shared helper.
    let ctx = Ctx {
        cgroups: &cgroups,
        topo: &topo,
        duration: Duration::from_secs(10),
        workers_per_cgroup: 4,
        sched_pid: None,
        settle: Duration::from_millis(1000),
        work_type_override: None,
        assert: crate::assert::Assert::default_checks(),
        wait_for_map_write: false,
    };
    let resolved = spec.resolve(&ctx);
    assert_eq!(resolved, cpus);
}
// Degenerate `of: 0` must not divide-by-zero/panic — empty set instead.
#[test]
fn resolve_disjoint_of_zero_returns_empty_instead_of_panicking() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let spec = CpusetSpec::Disjoint { index: 0, of: 0 };
    assert!(spec.resolve(&ctx).is_empty());
}
// Same degenerate `of: 0` guard for the Overlap variant.
#[test]
fn resolve_overlap_of_zero_returns_empty_instead_of_panicking() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let spec = CpusetSpec::Overlap {
        index: 0,
        of: 0,
        frac: 0.5,
    };
    assert!(spec.resolve(&ctx).is_empty());
}
// An inverted range (start > end) resolves to empty, not a panic.
#[test]
fn resolve_range_inverted_fracs_returns_empty_instead_of_panicking() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let spec = CpusetSpec::Range {
        start_frac: 0.8,
        end_frac: 0.2,
    };
    assert!(spec.resolve(&ctx).is_empty());
}
// NaN range bounds clamp to zero-width: empty set, no panic.
#[test]
fn resolve_range_nan_fracs_clamps_to_zero_instead_of_panicking() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let spec = CpusetSpec::Range {
        start_frac: f64::NAN,
        end_frac: f64::NAN,
    };
    assert!(spec.resolve(&ctx).is_empty());
}
// A NaN overlap fraction clamps to 0 — the partition itself survives.
#[test]
fn resolve_overlap_nonfinite_frac_clamps_to_zero() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let spec = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: f64::NAN,
    };
    let result = spec.resolve(&ctx);
    assert!(!result.is_empty());
}
// Build a (CgroupManager, TestTopology) pair for a single-NUMA-node VM
// with the given LLC/core/thread counts. The cgroup root is a nonexistent
// path: these tests never touch the real cgroup filesystem.
fn make_ctx(
    llcs: u32,
    cores: u32,
    threads: u32,
) -> (crate::cgroup::CgroupManager, crate::topology::TestTopology) {
    let cgroups = crate::cgroup::CgroupManager::new("/nonexistent");
    let topo = crate::topology::TestTopology::from_vm_topology(
        &crate::vmm::topology::Topology::new(1, llcs, cores, threads),
    );
    (cgroups, topo)
}
// Assemble a Ctx over borrowed test fixtures with fixed default knobs
// (10 s duration, 4 workers per cgroup, zero settle, default checks).
fn ctx_from<'a>(
    cgroups: &'a crate::cgroup::CgroupManager,
    topo: &'a crate::topology::TestTopology,
) -> Ctx<'a> {
    Ctx {
        cgroups,
        topo,
        duration: Duration::from_secs(10),
        workers_per_cgroup: 4,
        sched_pid: None,
        settle: Duration::ZERO,
        work_type_override: None,
        assert: crate::assert::Assert::default_checks(),
        wait_for_map_write: false,
    }
}
// Two Disjoint partitions must not overlap and together must cover every
// usable CPU.
#[test]
fn cpusetspec_disjoint_two_partitions() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let a = CpusetSpec::Disjoint { index: 0, of: 2 }.resolve(&ctx);
    let b = CpusetSpec::Disjoint { index: 1, of: 2 }.resolve(&ctx);
    assert!(a.is_disjoint(&b), "partitions overlap: {:?} vs {:?}", a, b);
    let usable = ctx.topo.usable_cpuset();
    let union: BTreeSet<usize> = a.union(&b).copied().collect();
    assert_eq!(union, usable);
}
// When the CPU count doesn't divide evenly, the last partition absorbs
// the remainder (so it is at least chunk-sized).
#[test]
fn cpusetspec_disjoint_remainder_to_last() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let usable_len = ctx.topo.usable_cpus().len();
    let c = CpusetSpec::Disjoint { index: 2, of: 3 }.resolve(&ctx);
    let chunk = usable_len / 3;
    assert!(
        c.len() >= chunk,
        "last partition {}: expected >= {}",
        c.len(),
        chunk
    );
}
// A single partition (of: 1) is the whole usable cpuset.
#[test]
fn cpusetspec_disjoint_single_partition() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let all = CpusetSpec::Disjoint { index: 0, of: 1 }.resolve(&ctx);
    let usable = ctx.topo.usable_cpuset();
    assert_eq!(all, usable);
}
// index >= of is out of bounds: empty set, never a panic.
#[test]
fn cpusetspec_disjoint_index_beyond_of_returns_empty() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Disjoint { index: 5, of: 3 }.resolve(&ctx);
    assert!(
        cpus.is_empty(),
        "Disjoint with index beyond `of` must return an empty \
         cpuset rather than panicking, got: {cpus:?}",
    );
}
// Range [0, 0.5) selects half the usable CPUs, all from the usable set.
#[test]
fn cpusetspec_range_first_half() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 0.5,
    }
    .resolve(&ctx);
    let usable = ctx.topo.usable_cpus();
    let expected_len = usable.len() / 2;
    assert_eq!(cpus.len(), expected_len);
    for &cpu in &cpus {
        assert!(usable.contains(&cpu));
    }
}
// The two halves [0, 0.5) and [0.5, 1.0) must not share CPUs.
#[test]
fn cpusetspec_range_second_half() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let a = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 0.5,
    }
    .resolve(&ctx);
    let b = CpusetSpec::Range {
        start_frac: 0.5,
        end_frac: 1.0,
    }
    .resolve(&ctx);
    assert!(a.is_disjoint(&b));
}
// The full range [0, 1.0] is the entire usable cpuset.
#[test]
fn cpusetspec_range_full() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 1.0,
    }
    .resolve(&ctx);
    let usable = ctx.topo.usable_cpuset();
    assert_eq!(cpus, usable);
}
// A zero-width range resolves to the empty set.
#[test]
fn cpusetspec_range_empty() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Range {
        start_frac: 0.5,
        end_frac: 0.5,
    }
    .resolve(&ctx);
    assert!(cpus.is_empty());
}
// end_frac > 1.0 clamps to the usable cpuset rather than overflowing.
#[test]
fn cpusetspec_range_clamps_to_bounds() {
    let (cg, topo) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 2.0,
    }
    .resolve(&ctx);
    let usable = ctx.topo.usable_cpuset();
    assert_eq!(cpus, usable);
}
// frac=0.5 overlap: neighboring partitions must share at least one CPU.
#[test]
fn cpusetspec_overlap_neighbors_share_cpus() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let a = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: 0.5,
    }
    .resolve(&ctx);
    let b = CpusetSpec::Overlap {
        index: 1,
        of: 2,
        frac: 0.5,
    }
    .resolve(&ctx);
    let shared: BTreeSet<usize> = a.intersection(&b).copied().collect();
    assert!(!shared.is_empty(), "overlap=0.5 should produce shared CPUs");
}
// frac=0 degenerates Overlap into disjoint partitions.
#[test]
fn cpusetspec_overlap_zero_frac_is_disjoint() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let a = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: 0.0,
    }
    .resolve(&ctx);
    let b = CpusetSpec::Overlap {
        index: 1,
        of: 2,
        frac: 0.0,
    }
    .resolve(&ctx);
    assert!(a.is_disjoint(&b), "frac=0 should be disjoint");
}
// The last Overlap partition must reach the highest usable CPU.
#[test]
fn cpusetspec_overlap_last_partition_covers_tail() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let last = CpusetSpec::Overlap {
        index: 2,
        of: 3,
        frac: 0.5,
    }
    .resolve(&ctx);
    let usable = ctx.topo.usable_cpus();
    assert!(last.contains(usable.last().unwrap()));
}
// The first Overlap partition must include the lowest usable CPU.
#[test]
fn cpusetspec_overlap_first_partition_starts_at_zero() {
    let (cg, topo) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cg, &topo);
    let first = CpusetSpec::Overlap {
        index: 0,
        of: 3,
        frac: 0.5,
    }
    .resolve(&ctx);
    let usable = ctx.topo.usable_cpus();
    assert!(first.contains(&usable[0]));
}
// Llc(0) must match the topology's own LLC-aligned cpuset for LLC 0.
#[test]
fn cpusetspec_llc_index_zero() {
    let (cg, topo) = make_ctx(2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Llc(0).resolve(&ctx);
    assert!(!cpus.is_empty());
    let llc0 = ctx.topo.llc_aligned_cpuset(0);
    assert_eq!(cpus, llc0);
}
// Distinct LLCs must resolve to non-overlapping cpusets.
#[test]
fn cpusetspec_llc_two_llcs_disjoint() {
    let (cg, topo) = make_ctx(2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let llc0 = CpusetSpec::Llc(0).resolve(&ctx);
    let llc1 = CpusetSpec::Llc(1).resolve(&ctx);
    assert!(llc0.is_disjoint(&llc1), "LLCs should be disjoint");
}
// Like make_ctx but with an explicit NUMA node count; llcs/cores/threads
// are per the vmm Topology constructor's argument order.
fn make_numa_ctx(
    numa_nodes: u32,
    llcs: u32,
    cores: u32,
    threads: u32,
) -> (crate::cgroup::CgroupManager, crate::topology::TestTopology) {
    let cgroups = crate::cgroup::CgroupManager::new("/nonexistent");
    let topo = crate::topology::TestTopology::from_vm_topology(
        &crate::vmm::topology::Topology::new(numa_nodes, llcs, cores, threads),
    );
    (cgroups, topo)
}
// In a 2-node, 16-CPU topology, node 0 holds CPUs 0..8.
#[test]
fn cpusetspec_numa_node_zero() {
    let (cg, topo) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Numa(0).resolve(&ctx);
    let expected: BTreeSet<usize> = (0..8).collect();
    assert_eq!(cpus, expected);
}
// ...and node 1 holds CPUs 8..16.
#[test]
fn cpusetspec_numa_node_one() {
    let (cg, topo) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Numa(1).resolve(&ctx);
    let expected: BTreeSet<usize> = (8..16).collect();
    assert_eq!(cpus, expected);
}
// NUMA nodes must partition the CPU space: disjoint, union = all CPUs.
#[test]
fn cpusetspec_numa_disjoint() {
    let (cg, topo) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let node0 = CpusetSpec::Numa(0).resolve(&ctx);
    let node1 = CpusetSpec::Numa(1).resolve(&ctx);
    assert!(
        node0.is_disjoint(&node1),
        "NUMA nodes should be disjoint: {:?} vs {:?}",
        node0,
        node1
    );
    let union: BTreeSet<usize> = node0.union(&node1).copied().collect();
    assert_eq!(union, ctx.topo.all_cpuset());
}
// With a single node, Numa(0) is the entire cpuset.
#[test]
fn cpusetspec_numa_single_node_returns_all() {
    let (cg, topo) = make_numa_ctx(1, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpus = CpusetSpec::Numa(0).resolve(&ctx);
    assert_eq!(cpus, ctx.topo.all_cpuset());
}
// validate() must flag a node index the topology doesn't have.
#[test]
fn cpusetspec_numa_validate_out_of_range() {
    let (cg, topo) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let spec = CpusetSpec::Numa(5);
    let err = spec.validate(&ctx).unwrap_err();
    assert!(err.contains("out of range"), "got: {err}");
}
// validate() must accept every node index the topology does have.
#[test]
fn cpusetspec_numa_validate_valid() {
    let (cg, topo) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    assert!(CpusetSpec::Numa(0).validate(&ctx).is_ok());
    assert!(CpusetSpec::Numa(1).validate(&ctx).is_ok());
}
// CpusetSpec::numa(n) is sugar for CpusetSpec::Numa(n).
#[test]
fn cpusetspec_numa_convenience_constructor() {
    let spec = CpusetSpec::numa(0);
    assert!(matches!(spec, CpusetSpec::Numa(0)));
}
// Traverse::generate must emit one step per phase, each with at least one op.
#[test]
fn traverse_generate_produces_steps() {
let (cg, topo) = make_ctx(1, 8, 1);
let ctx = ctx_from(&cg, &topo);
let t = Traverse {
seed: Some(42),
cgroup_count: 2..=4,
layouts: vec![Layout::Disjoint],
phases: 3,
phase_duration: Duration::from_millis(100),
settle: Duration::from_millis(50),
persistent_cgroups: 0,
cgroup_workloads: vec![WorkSpec::default()],
};
let steps = t.generate(&ctx);
assert_eq!(steps.len(), 3);
for step in &steps {
assert!(!step.ops.is_empty(), "each phase should have ops");
}
}
// With a fixed seed, two generate() calls must agree on step count and
// per-step op counts.
#[test]
fn traverse_generate_deterministic() {
let (cg, topo) = make_ctx(1, 8, 1);
let ctx = ctx_from(&cg, &topo);
let t = Traverse {
seed: Some(99),
cgroup_count: 2..=4,
layouts: vec![Layout::Disjoint, Layout::Overlap(0.2, 0.5)],
phases: 5,
phase_duration: Duration::from_millis(100),
settle: Duration::from_millis(50),
persistent_cgroups: 1,
cgroup_workloads: vec![WorkSpec::default()],
};
let steps1 = t.generate(&ctx);
let steps2 = t.generate(&ctx);
assert_eq!(steps1.len(), steps2.len());
for (s1, s2) in steps1.iter().zip(steps2.iter()) {
assert_eq!(
s1.ops.len(),
s2.ops.len(),
"deterministic seed should produce same ops"
);
}
}
// Persistent cgroups (cg_0, cg_1 here) must never appear in RemoveCgroup ops.
#[test]
fn traverse_generate_persistent_cgroups_preserved() {
let (cg, topo) = make_ctx(1, 8, 1);
let ctx = ctx_from(&cg, &topo);
let t = Traverse {
seed: Some(42),
cgroup_count: 1..=4,
layouts: vec![Layout::Disjoint],
phases: 5,
phase_duration: Duration::from_millis(100),
settle: Duration::from_millis(50),
persistent_cgroups: 2,
cgroup_workloads: vec![WorkSpec::default()],
};
let steps = t.generate(&ctx);
for step in &steps {
let remove_ops: Vec<&Op> = step.ops.iter()
.filter(|op| matches!(op, Op::RemoveCgroup { cgroup } if cgroup == "cg_0" || cgroup == "cg_1"))
.collect();
assert!(
remove_ops.is_empty(),
"persistent cgroups should never be removed"
);
}
}
// The fluent builder chain must record name, cpuset, work spec, and
// swappability as given.
#[test]
fn cgroup_def_builder_chain() {
let d = CgroupDef::named("test")
.with_cpuset(CpusetSpec::llc(0))
.workers(8)
.work_type(WorkType::bursty(
Duration::from_millis(50),
Duration::from_millis(100),
))
.sched_policy(crate::workload::SchedPolicy::Batch)
.swappable(true);
assert_eq!(d.name, "test");
assert!(d.cpuset.is_some());
assert_eq!(d.works.len(), 1);
assert_eq!(d.works[0].num_workers, Some(8));
assert!(d.swappable);
}
// Default def: name "cg_0", no cpuset, no works, not swappable.
#[test]
fn cgroup_def_default() {
let d = CgroupDef::default();
assert_eq!(d.name, "cg_0");
assert!(d.cpuset.is_none());
assert!(d.works.is_empty());
assert!(!d.swappable);
}
// Each .work() call appends a distinct WorkSpec, preserving order.
#[test]
fn cgroup_def_multi_work() {
let d = CgroupDef::named("multi")
.work(WorkSpec::default().workers(4).work_type(WorkType::SpinWait))
.work(
WorkSpec::default()
.workers(2)
.work_type(WorkType::YieldHeavy),
);
assert_eq!(d.works.len(), 2);
assert_eq!(d.works[0].num_workers, Some(4));
assert_eq!(d.works[1].num_workers, Some(2));
}
// Mixing the old .workers() API with .work() yields two specs, old first.
#[test]
fn cgroup_def_old_api_then_work() {
let d = CgroupDef::named("mixed")
.workers(4)
.work(WorkSpec::default().workers(2));
assert_eq!(d.works.len(), 2);
assert_eq!(d.works[0].num_workers, Some(4));
assert_eq!(d.works[1].num_workers, Some(2));
}
// A single explicit .work() must not grow a phantom extra spec.
#[test]
fn cgroup_def_work_only_no_phantom() {
let d = CgroupDef::named("explicit").work(WorkSpec::default().workers(3));
assert_eq!(d.works.len(), 1);
assert_eq!(d.works[0].num_workers, Some(3));
}
// Setup::Defs resolves to one entry per def and reports non-empty.
#[test]
fn setup_defs_resolves() {
let defs = vec![CgroupDef::named("a"), CgroupDef::named("b")];
let setup = Setup::Defs(defs);
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let resolved = setup.resolve(&ctx);
assert_eq!(resolved.len(), 2);
assert!(!setup.is_empty());
}
// An empty Defs list is reported empty.
#[test]
fn setup_defs_empty() {
let setup = Setup::Defs(vec![]);
assert!(setup.is_empty());
}
// A Factory setup is never considered empty (defs come from the closure).
#[test]
fn setup_factory_not_empty() {
let setup = Setup::Factory(|_| vec![CgroupDef::named("generated")]);
assert!(!setup.is_empty());
}
// Step::with_defs on no defs: empty setup, no ops.
#[test]
fn step_with_defs_empty() {
let step = Step::with_defs(vec![], HoldSpec::Frac(0.5));
assert!(step.setup.is_empty());
assert!(step.ops.is_empty());
}
// Step::with_defs stores the defs but starts with zero ops.
#[test]
fn step_with_defs_populated() {
let step = Step::with_defs(
vec![CgroupDef::named("cg_0"), CgroupDef::named("cg_1")],
HoldSpec::Fixed(Duration::from_secs(5)),
);
assert!(!step.setup.is_empty());
assert!(step.ops.is_empty());
}
// set_ops on a with_defs step attaches ops without dropping the setup.
#[test]
fn step_with_defs_then_ops() {
let step = Step::with_defs(vec![CgroupDef::named("cg_0")], HoldSpec::FULL).set_ops(vec![
Op::AddCgroup {
name: "cg_1".into(),
},
]);
assert!(!step.setup.is_empty());
assert_eq!(step.ops.len(), 1);
}
// set_ops replaces (not appends to) the ops from Step::new.
#[test]
fn step_set_ops_replaces() {
let step = Step::new(
vec![Op::AddCgroup { name: "a".into() }],
HoldSpec::Frac(0.5),
)
.set_ops(vec![
Op::AddCgroup { name: "b".into() },
Op::RemoveCgroup { cgroup: "c".into() },
]);
assert_eq!(step.ops.len(), 2);
}
// Disjoint with zero partitions must fail with the "must be > 0" message.
#[test]
fn cpusetspec_validate_disjoint_of_zero() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Disjoint { index: 0, of: 0 };
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("must be > 0"), "got: {err}");
}
// Disjoint index == partition count is out of bounds; message names both.
#[test]
fn cpusetspec_validate_disjoint_index_ge_of() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Disjoint { index: 3, of: 3 };
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("index 3 >= partition count 3"), "got: {err}");
}
// Overlap with zero partitions must also fail with "must be > 0".
#[test]
fn cpusetspec_validate_overlap_of_zero() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Overlap {
index: 0,
of: 0,
frac: 0.5,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("must be > 0"), "got: {err}");
}
// Overlap index == partition count is rejected like the Disjoint case.
#[test]
fn cpusetspec_validate_overlap_index_ge_of() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Overlap {
index: 2,
of: 2,
frac: 0.5,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("index 2 >= partition count 2"), "got: {err}");
}
// Range with start_frac > end_frac is rejected; message cites start_frac.
#[test]
fn cpusetspec_validate_range_start_ge_end() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Range {
start_frac: 0.8,
end_frac: 0.2,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("start_frac"), "got: {err}");
}
// Range with a NaN bound must be rejected as "not finite".
#[test]
fn cpusetspec_validate_range_rejects_nan() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Range {
start_frac: 0.8,
end_frac: f64::NAN,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("not finite"), "got: {err}");
}
// Range with an infinite bound must be rejected as "not finite".
#[test]
fn cpusetspec_validate_range_rejects_infinity() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Range {
start_frac: 0.0,
end_frac: f64::INFINITY,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("not finite"), "got: {err}");
}
// Negative fractions fall outside the documented [0.0, 1.0] window.
#[test]
fn cpusetspec_validate_range_rejects_negative() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Range {
start_frac: -0.5,
end_frac: 0.5,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("[0.0, 1.0]"), "got: {err}");
}
// Fractions above 1.0 fall outside the [0.0, 1.0] window too.
#[test]
fn cpusetspec_validate_range_rejects_above_one() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Range {
start_frac: 0.5,
end_frac: 1.5,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("[0.0, 1.0]"), "got: {err}");
}
// Overlap frac: NaN is rejected as "not finite".
#[test]
fn cpusetspec_validate_overlap_rejects_nan_frac() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Overlap {
index: 0,
of: 2,
frac: f64::NAN,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("not finite"), "got: {err}");
}
// Overlap frac: infinity is rejected as "not finite".
#[test]
fn cpusetspec_validate_overlap_rejects_infinity_frac() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Overlap {
index: 0,
of: 2,
frac: f64::INFINITY,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("not finite"), "got: {err}");
}
// Overlap frac: values above 1.0 are rejected with the range message.
#[test]
fn cpusetspec_validate_overlap_rejects_out_of_range_frac() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Overlap {
index: 0,
of: 2,
frac: 1.5,
};
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("[0.0, 1.0]"), "got: {err}");
}
// Asking for more partitions than usable CPUs must fail loudly.
#[test]
fn cpusetspec_validate_too_few_cpus_for_partitions() {
let (cg, topo) = make_ctx(1, 2, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Disjoint { index: 0, of: 5 };
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("not enough usable CPUs"), "got: {err}");
}
// Exact with in-range CPUs validates cleanly.
#[test]
fn cpusetspec_validate_exact_in_range_ok() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::exact([0, 2]);
assert!(spec.validate(&ctx).is_ok());
}
// An empty Exact set is rejected; message names both "Exact" and "empty".
#[test]
fn cpusetspec_validate_exact_empty_rejected() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Exact(BTreeSet::new());
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("Exact") && err.contains("empty"), "got: {err}");
}
// A physically absent CPU is rejected; message names the CPU and the
// "physical CPU set" framing.
#[test]
fn cpusetspec_validate_exact_out_of_range_rejected() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::exact([99]);
let err = spec.validate(&ctx).unwrap_err();
assert!(
err.contains("99") && err.contains("physical CPU set"),
"error must name the offending CPU and call it physical: {err}"
);
}
// Exact validation is against PHYSICAL presence, not the usable set: a CPU
// the topology reserves (present but not usable) must still be accepted.
#[test]
fn cpusetspec_validate_exact_accepts_reserved_last_cpu() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let total = ctx.topo.all_cpus().len();
assert!(total > 2, "test requires a topology that reserves a CPU");
let reserved_cpu = total - 1;
assert!(
!ctx.topo.usable_cpuset().contains(&reserved_cpu),
"precondition: reserved CPU {reserved_cpu} must sit outside usable_cpuset",
);
assert!(
ctx.topo.all_cpuset().contains(&reserved_cpu),
"precondition: reserved CPU {reserved_cpu} must be physically present",
);
let spec = CpusetSpec::exact([reserved_cpu]);
assert!(
spec.validate(&ctx).is_ok(),
"validate must accept the reserved CPU — physical presence, not \
usable-set membership, is the bar",
);
}
// Hold-spec validation must run BEFORE any ops execute: a Frac(0.0) hold is
// invalid, and the AddCgroup op must therefore never create its directory.
// Uses a real temp dir (keyed by pid) so the absence check is meaningful.
#[test]
fn execute_steps_with_bails_on_invalid_hold_before_ops() {
let parent =
std::env::temp_dir().join(format!("ktstr-hold-validate-{}", std::process::id()));
// Start from a clean slate; ignore failure if the dir never existed.
let _ = std::fs::remove_dir_all(&parent);
std::fs::create_dir_all(&parent).unwrap();
let cgroups = crate::cgroup::CgroupManager::new(parent.to_str().unwrap());
let topo = crate::topology::TestTopology::from_vm_topology(
&crate::vmm::topology::Topology::new(1, 1, 4, 1),
);
let ctx = ctx_from(&cgroups, &topo);
let cg_name = "should_never_exist";
let step = Step::new(vec![Op::add_cgroup(cg_name)], HoldSpec::Frac(0.0));
let err = execute_steps_with(&ctx, vec![step], None).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("hold validation") && msg.contains("Frac"),
"error must cite hold validation + variant: {msg}"
);
assert!(
!parent.join(cg_name).exists(),
"AddCgroup op ran before hold validation — cgroup dir '{}' exists",
parent.join(cg_name).display()
);
// Best-effort cleanup of the temp dir.
let _ = std::fs::remove_dir_all(&parent);
}
// Documents the guard for SetAffinityRandom's no-op cases: the op should
// only be applied when the source pool is non-empty AND the count is
// positive; every other combination degenerates to a no-op.
#[test]
fn set_affinity_random_no_op_conditions() {
    // Local mirror of the guard predicate under test.
    fn should_apply(from: &BTreeSet<usize>, count: usize) -> bool {
        if from.is_empty() {
            return false;
        }
        count > 0
    }
    let pool: BTreeSet<usize> = BTreeSet::from([0, 1, 2]);
    let empty = BTreeSet::new();
    // Only the non-empty-pool + positive-count combination applies.
    assert!(should_apply(&pool, 2));
    assert!(!should_apply(&pool, 0), "count=0 → no-op");
    assert!(!should_apply(&empty, 1), "empty pool → no-op");
    assert!(!should_apply(&empty, 0), "both zero → no-op");
}
// An LLC index beyond the topology is rejected with "out of range".
#[test]
fn cpusetspec_validate_llc_out_of_range() {
let (cg, topo) = make_ctx(1, 4, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Llc(5);
let err = spec.validate(&ctx).unwrap_err();
assert!(err.contains("out of range"), "got: {err}");
}
// A well-formed Disjoint spec (index < of, enough CPUs) validates.
#[test]
fn cpusetspec_validate_valid_disjoint_ok() {
let (cg, topo) = make_ctx(1, 8, 1);
let ctx = ctx_from(&cg, &topo);
let spec = CpusetSpec::Disjoint { index: 1, of: 2 };
assert!(spec.validate(&ctx).is_ok());
}
// MemPolicy::Default names no nodes, so any cpuset passes validation.
#[test]
fn validate_mempolicy_default_always_ok() {
let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
let ctx = ctx_from(&cg, &topo);
let cpuset: BTreeSet<usize> = (0..4).collect();
assert!(
validate_mempolicy_cpuset(
&MemPolicy::Default,
crate::workload::MpolFlags::NONE,
&cpuset,
&ctx,
"cg_0",
)
.is_ok()
);
}
// MemPolicy::Local likewise names no nodes and always validates.
#[test]
fn validate_mempolicy_local_always_ok() {
let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
let ctx = ctx_from(&cg, &topo);
let cpuset: BTreeSet<usize> = (0..4).collect();
assert!(
validate_mempolicy_cpuset(
&MemPolicy::Local,
crate::workload::MpolFlags::NONE,
&cpuset,
&ctx,
"cg_0",
)
.is_ok()
);
}
// A Bind policy whose node set is fully covered by the cpuset's NUMA nodes
// must pass validation. (Formatting fix: the cpuset and policy statements
// were jammed onto one line.)
#[test]
fn validate_mempolicy_bind_covered() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    // CPUs 0..8 span both NUMA nodes, so binding to nodes {0, 1} is covered.
    let cpuset: BTreeSet<usize> = (0..8).collect();
    let policy = MemPolicy::Bind([0, 1].into_iter().collect());
    assert!(
        validate_mempolicy_cpuset(
            &policy,
            crate::workload::MpolFlags::NONE,
            &cpuset,
            &ctx,
            "cg_0",
        )
        .is_ok()
    );
}
// A Bind policy naming a node outside the cpuset's NUMA coverage must be
// rejected, and the message must be actionable: it names the cgroup, both
// node sets, and both escape hatches — (a) STATIC_NODES opt-in, (b) cpuset
// widening — plus the cross-socket/unintended framing. (Formatting fix:
// three statements were jammed onto one line.)
#[test]
fn validate_mempolicy_bind_uncovered() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    // CPUs 0..4 cover node 0 only, so binding to node 1 is uncovered.
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Bind([1].into_iter().collect());
    let err = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_bind_test",
    )
    .unwrap_err()
    .to_string();
    assert!(err.contains("cg_bind_test"), "bail must name cgroup: {err}");
    assert!(
        err.contains("[1]"),
        "bail must name uncovered node 1: {err}"
    );
    assert!(err.contains("{0}"), "bail must name cpuset node 0: {err}");
    assert!(
        err.contains("(a) add .mpol_flags(MpolFlags::STATIC_NODES)"),
        "bail must call out hatch (a) STATIC_NODES opt-in by name: {err}",
    );
    assert!(
        err.contains("(b) widen the cpuset"),
        "bail must call out hatch (b) cpuset widening: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Numa(N)"),
        "bail must name CpusetSpec::Numa(N) as a widening example: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Exact"),
        "bail must name the CpusetSpec::Exact cpuset-widening escape hatch: {err}",
    );
    assert!(
        err.contains("cross-socket allocation traffic"),
        "bail must name the cross-socket framing: {err}",
    );
    assert!(
        err.contains("almost certainly unintended"),
        "bail must frame the mismatch as unintended: {err}",
    );
}
// A Preferred(node) policy whose node is covered by the cpuset must pass.
// (Formatting fix: cpuset and policy statements were jammed onto one line.)
#[test]
fn validate_mempolicy_preferred_covered() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    // CPUs 4..8 cover node 1, matching Preferred(1).
    let cpuset: BTreeSet<usize> = (4..8).collect();
    let policy = MemPolicy::Preferred(1);
    assert!(
        validate_mempolicy_cpuset(
            &policy,
            crate::workload::MpolFlags::NONE,
            &cpuset,
            &ctx,
            "cg_0",
        )
        .is_ok()
    );
}
// A Preferred(node) policy whose node is NOT covered by the cpuset must be
// rejected with the same actionable message as the Bind case: cgroup name,
// both node sets, hatches (a)/(b), and the "unintended" framing.
// (Formatting fix: cpuset and policy statements were jammed onto one line.)
#[test]
fn validate_mempolicy_preferred_uncovered() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    // CPUs 0..4 cover node 0 only, so Preferred(1) is uncovered.
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Preferred(1);
    let err = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_preferred_test",
    )
    .unwrap_err()
    .to_string();
    assert!(
        err.contains("cg_preferred_test"),
        "bail must name cgroup: {err}"
    );
    assert!(
        err.contains("[1]"),
        "bail must name uncovered node 1: {err}"
    );
    assert!(err.contains("{0}"), "bail must name cpuset node 0: {err}");
    assert!(
        err.contains("(a) add .mpol_flags(MpolFlags::STATIC_NODES)"),
        "bail must enumerate hatch (a): {err}",
    );
    assert!(
        err.contains("(b) widen the cpuset"),
        "bail must enumerate hatch (b): {err}",
    );
    assert!(
        err.contains("CpusetSpec::Numa(N)"),
        "bail must cite CpusetSpec::Numa(N) example: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Exact"),
        "bail must name CpusetSpec::Exact widening: {err}",
    );
    assert!(
        err.contains("almost certainly unintended"),
        "bail must frame mismatch as unintended: {err}",
    );
}
// An Interleave policy that is only PARTIALLY covered (nodes {0,1} vs a
// node-0-only cpuset) must be rejected, naming the uncovered node and the
// same escape hatches. (Formatting fix: cpuset and policy statements were
// jammed onto one line.)
#[test]
fn validate_mempolicy_interleave_partial_uncovered() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    // CPUs 0..4 cover node 0 only; interleaving over {0, 1} leaves node 1
    // uncovered.
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Interleave([0, 1].into_iter().collect());
    let err = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_interleave_test",
    )
    .unwrap_err()
    .to_string();
    assert!(
        err.contains("cg_interleave_test"),
        "bail must name cgroup: {err}"
    );
    assert!(
        err.contains("[1]"),
        "bail must name uncovered node 1: {err}"
    );
    assert!(err.contains("{0}"), "bail must name cpuset node 0: {err}");
    assert!(
        err.contains("(a) add .mpol_flags(MpolFlags::STATIC_NODES)"),
        "bail must enumerate hatch (a): {err}",
    );
    assert!(
        err.contains("(b) widen the cpuset"),
        "bail must enumerate hatch (b): {err}",
    );
    assert!(
        err.contains("CpusetSpec::Numa(N)"),
        "bail must cite CpusetSpec::Numa(N) example: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Exact"),
        "bail must name CpusetSpec::Exact widening: {err}",
    );
}
// STATIC_NODES is escape hatch (a): with it set, the same partially-covered
// Interleave that fails above must pass the cpuset-coverage check.
// (Formatting fix: cpuset and policy statements were jammed onto one line.)
#[test]
fn validate_mempolicy_static_nodes_bypasses_cpuset_check() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Interleave([0, 1].into_iter().collect());
    assert!(
        validate_mempolicy_cpuset(
            &policy,
            crate::workload::MpolFlags::STATIC_NODES,
            &cpuset,
            &ctx,
            "cg_0",
        )
        .is_ok()
    );
}
// STATIC_NODES and RELATIVE_NODES are mutually exclusive; combining them
// must fail and the error must name that contract.
#[test]
fn validate_mempolicy_rejects_static_and_relative_conflict() {
let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
let ctx = ctx_from(&cg, &topo);
let cpuset: BTreeSet<usize> = (0..4).collect();
let policy = MemPolicy::Bind([0].into_iter().collect());
let flags =
crate::workload::MpolFlags::STATIC_NODES | crate::workload::MpolFlags::RELATIVE_NODES;
let err = validate_mempolicy_cpuset(&policy, flags, &cpuset, &ctx, "cg_0")
.expect_err("STATIC_NODES | RELATIVE_NODES must be rejected");
let rendered = format!("{err:#}");
assert!(
rendered.contains("mutually exclusive"),
"error must name the mutual-exclusion contract; got: {rendered}"
);
}
// An MpolFlags bit outside the known set must bail; the message names the
// cgroup, the unknown-bit contract, and enumerates the supported bits.
#[test]
fn validate_mempolicy_rejects_unknown_bits() {
let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
let ctx = ctx_from(&cg, &topo);
let cpuset: BTreeSet<usize> = (0..8).collect();
let unknown = crate::workload::MpolFlags::from_bits_for_test(1 << 10);
let err = validate_mempolicy_cpuset(
&MemPolicy::Default,
unknown,
&cpuset,
&ctx,
"cg_unknown_bit",
)
.expect_err("unknown bit must bail");
let s = err.to_string();
assert!(s.contains("cg_unknown_bit"), "bail must name cgroup: {s}");
assert!(
s.contains("unknown bit"),
"bail must name the unknown-bit contract: {s}"
);
assert!(
s.contains("STATIC_NODES"),
"bail must enumerate the known bits so the user sees what IS supported: {s}",
);
}
// RELATIVE_NODES remaps node ids, so the absolute-id coverage check does
// not apply: a Bind([1]) that would fail without flags must pass here.
#[test]
fn validate_mempolicy_relative_nodes_bypasses_cpuset_check() {
let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
let ctx = ctx_from(&cg, &topo);
let cpuset: BTreeSet<usize> = (0..4).collect();
let policy = MemPolicy::Bind([1].into_iter().collect());
assert!(
validate_mempolicy_cpuset(
&policy,
crate::workload::MpolFlags::RELATIVE_NODES,
&cpuset,
&ctx,
"cg_0",
)
.is_ok(),
"RELATIVE_NODES must bypass the absolute-id cpuset coverage check"
);
}
// STATIC_NODES bypasses the cpuset check but NOT host existence: binding to
// node 7 on a 2-node host must still be rejected, and the message must name
// the missing-host-node condition. (Formatting fix: two statements were
// jammed onto one line.)
#[test]
fn validate_mempolicy_static_nodes_rejects_missing_host_node() {
    let (cg, topo) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cg, &topo);
    let cpuset: BTreeSet<usize> = (0..8).collect();
    // Node 7 does not exist on this 2-node topology.
    let policy = MemPolicy::Bind([7].into_iter().collect());
    let err = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::STATIC_NODES,
        &cpuset,
        &ctx,
        "cg_0",
    )
    .expect_err("STATIC_NODES policy referencing missing host node must be rejected");
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("do not exist on this host"),
        "error must name the missing-host-node condition; got: {rendered}"
    );
}
// .mem_policy() on the builder must land on the (first) work spec.
#[test]
fn cgroupdef_mem_policy_builder() {
let def = CgroupDef::named("test").mem_policy(MemPolicy::Bind([0].into_iter().collect()));
assert!(matches!(def.works[0].mem_policy, MemPolicy::Bind(_)));
}
use crate::cgroup::CgroupOps;
use std::path::Path;
use std::sync::Mutex;
/// Record of a single call made against `MockCgroupOps`, captured in
/// invocation order so tests can assert on call sequence and arguments.
/// (Formatting fix: one variant shared a line with its attribute, and two
/// variants were jammed onto a single line.)
#[derive(Debug, Clone, PartialEq, Eq)]
enum CgroupCall {
    Setup(BTreeSet<crate::cgroup::Controller>),
    CreateCgroup(String),
    RemoveCgroup(String),
    SetCpuset(String, BTreeSet<usize>),
    ClearCpuset(String),
    SetCpusetMems(String, BTreeSet<usize>),
    // Constructed by the trait impl; no current test matches on it.
    #[allow(dead_code)]
    ClearCpusetMems(String),
    SetCpuMax(String, Option<u64>, u64),
    SetCpuWeight(String, u32),
    SetMemoryMax(String, Option<u64>),
    SetMemoryHigh(String, Option<u64>),
    SetMemoryLow(String, Option<u64>),
    SetMemorySwapMax(String, Option<u64>),
    SetIoWeight(String, u16),
    SetFreeze(String, bool),
    SetPidsMax(String, Option<u64>),
    MoveTask(String, libc::pid_t),
    // Records only the pid COUNT, not the pids themselves (see move_tasks).
    MoveTasks(String, usize),
    ClearSubtreeControl(String),
    DrainTasks(String),
    CleanupAll,
}
// In-memory fake for CgroupOps: records every call and can inject a single
// failure at a chosen call index.
struct MockCgroupOps {
// Fake parent path returned by parent_path(); never touched on disk.
parent: std::path::PathBuf,
// Every call recorded in order.
calls: Mutex<Vec<CgroupCall>>,
// One-shot failure: (call index, error message); cleared once fired.
fail_at: Mutex<Option<(usize, String)>>,
}
impl MockCgroupOps {
fn new() -> Self {
Self {
parent: std::path::PathBuf::from("/mock/cgroup"),
calls: Mutex::new(Vec::new()),
fail_at: Mutex::new(None),
}
}
// Arm the one-shot failure: the `index`-th recorded call (0-based) errors.
fn fail_call_at(&self, index: usize, message: &str) {
*self.fail_at.lock().unwrap() = Some((index, message.to_string()));
}
// Snapshot of all calls recorded so far.
fn calls(&self) -> Vec<CgroupCall> {
self.calls.lock().unwrap().clone()
}
// Record a call, then fire the armed failure if this is its index.
// The call is recorded even when it fails, so tests see it in calls().
fn record(&self, call: CgroupCall) -> Result<()> {
let mut calls = self.calls.lock().unwrap();
let current_index = calls.len();
calls.push(call);
// Release the calls lock before taking fail_at to avoid holding both.
drop(calls);
let mut fail = self.fail_at.lock().unwrap();
if let Some((index, ref message)) = *fail
&& current_index == index
{
let err_msg = message.clone();
// One-shot: clear so subsequent calls succeed.
*fail = None;
return Err(anyhow::anyhow!(err_msg));
}
Ok(())
}
}
// Trait impl: every operation just records itself via `record`, which also
// injects the armed one-shot failure. No real cgroup filesystem is touched.
impl CgroupOps for MockCgroupOps {
fn parent_path(&self) -> &Path {
&self.parent
}
fn setup(&self, controllers: &BTreeSet<crate::cgroup::Controller>) -> Result<()> {
self.record(CgroupCall::Setup(controllers.clone()))
}
fn create_cgroup(&self, name: &str) -> Result<()> {
self.record(CgroupCall::CreateCgroup(name.to_string()))
}
fn remove_cgroup(&self, name: &str) -> Result<()> {
self.record(CgroupCall::RemoveCgroup(name.to_string()))
}
fn set_cpuset(&self, name: &str, cpus: &BTreeSet<usize>) -> Result<()> {
self.record(CgroupCall::SetCpuset(name.to_string(), cpus.clone()))
}
fn clear_cpuset(&self, name: &str) -> Result<()> {
self.record(CgroupCall::ClearCpuset(name.to_string()))
}
fn set_cpuset_mems(&self, name: &str, nodes: &BTreeSet<usize>) -> Result<()> {
self.record(CgroupCall::SetCpusetMems(name.to_string(), nodes.clone()))
}
fn clear_cpuset_mems(&self, name: &str) -> Result<()> {
self.record(CgroupCall::ClearCpusetMems(name.to_string()))
}
fn set_cpu_max(&self, name: &str, quota_us: Option<u64>, period_us: u64) -> Result<()> {
self.record(CgroupCall::SetCpuMax(name.to_string(), quota_us, period_us))
}
fn set_cpu_weight(&self, name: &str, weight: u32) -> Result<()> {
self.record(CgroupCall::SetCpuWeight(name.to_string(), weight))
}
fn set_memory_max(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemoryMax(name.to_string(), bytes))
}
fn set_memory_high(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemoryHigh(name.to_string(), bytes))
}
fn set_memory_low(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemoryLow(name.to_string(), bytes))
}
fn set_io_weight(&self, name: &str, weight: u16) -> Result<()> {
self.record(CgroupCall::SetIoWeight(name.to_string(), weight))
}
fn set_freeze(&self, name: &str, frozen: bool) -> Result<()> {
self.record(CgroupCall::SetFreeze(name.to_string(), frozen))
}
fn set_pids_max(&self, name: &str, max: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetPidsMax(name.to_string(), max))
}
fn set_memory_swap_max(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemorySwapMax(name.to_string(), bytes))
}
fn move_task(&self, name: &str, pid: libc::pid_t) -> Result<()> {
self.record(CgroupCall::MoveTask(name.to_string(), pid))
}
// Only the pid count is recorded — tests assert on how many tasks moved,
// not on the (nondeterministic) pids themselves.
fn move_tasks(&self, name: &str, pids: &[libc::pid_t]) -> Result<()> {
self.record(CgroupCall::MoveTasks(name.to_string(), pids.len()))
}
fn clear_subtree_control(&self, name: &str) -> Result<()> {
self.record(CgroupCall::ClearSubtreeControl(name.to_string()))
}
fn drain_tasks(&self, name: &str) -> Result<()> {
self.record(CgroupCall::DrainTasks(name.to_string()))
}
fn cleanup_all(&self) -> Result<()> {
self.record(CgroupCall::CleanupAll)
}
}
// Builds a Ctx backed by the mock cgroup ops and a test topology; short
// duration and zero settle keep tests fast.
fn mock_ctx<'a>(mock: &'a MockCgroupOps, topo: &'a crate::topology::TestTopology) -> Ctx<'a> {
Ctx {
cgroups: mock,
topo,
duration: Duration::from_secs(1),
workers_per_cgroup: 1,
sched_pid: None,
settle: Duration::ZERO,
work_type_override: None,
assert: crate::assert::Assert::default_checks(),
wait_for_map_write: false,
}
}
// Standard small test topology: 1 node, 1 LLC, 4 cores, 1 thread.
fn mock_topo() -> crate::topology::TestTopology {
crate::topology::TestTopology::from_vm_topology(&crate::vmm::topology::Topology::new(
1, 1, 4, 1,
))
}
// Drops workload handles and drains payload handles so a test's StepState
// tears down cleanly.
fn cleanup_state(state: &mut StepState<'_>) {
state.handles.clear();
drain_all_payload_handles(&mut state.payload_handles);
}
// Runs apply_setup against a throwaway BackdropState so tests only need a
// StepState.
fn apply_setup_test<'a>(
ctx: &'a Ctx<'a>,
state: &mut StepState<'a>,
defs: &[CgroupDef],
) -> Result<()> {
let mut backdrop = BackdropState::empty(ctx);
let mut scenario = ScenarioState::new(state, &mut backdrop);
apply_setup(ctx, &mut scenario, defs)
}
// Same shape as apply_setup_test, but for apply_ops.
fn apply_ops_test<'a>(ctx: &'a Ctx<'a>, state: &mut StepState<'a>, ops: &[Op]) -> Result<()> {
let mut backdrop = BackdropState::empty(ctx);
let mut scenario = ScenarioState::new(state, &mut backdrop);
apply_ops(ctx, &mut scenario, ops)
}
// Zero defs must result in zero cgroup calls.
#[test]
fn apply_setup_empty_defs_is_noop() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
apply_setup_test(&ctx, &mut state, &[]).unwrap();
assert!(
mock.calls().is_empty(),
"apply_setup on zero defs must not call any cgroup op, got: {:?}",
mock.calls()
);
cleanup_state(&mut state);
}
// One create_cgroup per def, emitted in def order.
#[test]
fn apply_setup_creates_cgroup_per_def() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let defs = vec![
CgroupDef::named("cg_a").workers(1),
CgroupDef::named("cg_b").workers(1),
];
apply_setup_test(&ctx, &mut state, &defs).unwrap();
let calls = mock.calls();
let creates: Vec<&CgroupCall> = calls
.iter()
.filter(|c| matches!(c, CgroupCall::CreateCgroup(_)))
.collect();
assert_eq!(
creates,
vec![
&CgroupCall::CreateCgroup("cg_a".to_string()),
&CgroupCall::CreateCgroup("cg_b".to_string()),
],
"one create_cgroup call per def, in order"
);
cleanup_state(&mut state);
}
// With a CpusetSpec, set_cpuset must receive the resolved set and the
// StepState must record it under the cgroup name.
#[test]
fn apply_setup_sets_cpuset_when_spec_present() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let cpus: BTreeSet<usize> = [0, 1].into_iter().collect();
let defs = vec![
CgroupDef::named("cg_0")
.with_cpuset(CpusetSpec::Exact(cpus.clone()))
.workers(1),
];
apply_setup_test(&ctx, &mut state, &defs).unwrap();
let calls = mock.calls();
assert!(
calls.contains(&CgroupCall::SetCpuset("cg_0".to_string(), cpus.clone())),
"set_cpuset must be called with exactly the resolved cpu set, got: {calls:?}"
);
assert_eq!(
state.cpusets.get("cg_0"),
Some(&cpus),
"state.cpusets must record the resolved set"
);
cleanup_state(&mut state);
}
// Without a CpusetSpec, no set_cpuset is emitted and state.cpusets stays
// empty (the cgroup inherits the parent's cpuset).
#[test]
fn apply_setup_skips_cpuset_when_none() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let defs = vec![CgroupDef::named("cg_inherit").workers(1)];
apply_setup_test(&ctx, &mut state, &defs).unwrap();
let calls = mock.calls();
let has_set_cpuset = calls
.iter()
.any(|c| matches!(c, CgroupCall::SetCpuset(_, _)));
assert!(
!has_set_cpuset,
"no set_cpuset should be emitted when CgroupDef.cpuset is None, got: {calls:?}"
);
assert!(
state.cpusets.is_empty(),
"state.cpusets should stay empty when no CpusetSpec was resolved"
);
cleanup_state(&mut state);
}
// Spawned workers must be moved into their cgroup (move_tasks with the
// worker count), and create_cgroup must come first.
#[test]
fn apply_setup_moves_spawned_tasks_into_cgroup() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let defs = vec![CgroupDef::named("cg_move").workers(2)];
apply_setup_test(&ctx, &mut state, &defs).unwrap();
let calls = mock.calls();
assert!(
calls.contains(&CgroupCall::MoveTasks("cg_move".to_string(), 2)),
"move_tasks must be called with the 2 spawned worker pids, got: {calls:?}"
);
let create_idx = calls
.iter()
.position(|c| matches!(c, CgroupCall::CreateCgroup(n) if n == "cg_move"))
.expect("create_cgroup for cg_move");
let move_idx = calls
.iter()
.position(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "cg_move"))
.expect("move_tasks for cg_move");
assert!(
create_idx < move_idx,
"create_cgroup must precede move_tasks for the same cgroup: {calls:?}"
);
cleanup_state(&mut state);
}
// The cpuset must be applied BEFORE tasks are moved in, so workers never
// run on CPUs outside their intended set.
#[test]
fn apply_setup_sets_cpuset_before_move_tasks() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let cpus: BTreeSet<usize> = [0, 1].into_iter().collect();
let defs = vec![
CgroupDef::named("cg_ordered")
.with_cpuset(CpusetSpec::Exact(cpus.clone()))
.workers(2),
];
apply_setup_test(&ctx, &mut state, &defs).unwrap();
let calls = mock.calls();
let set_idx = calls
.iter()
.position(|c| matches!(c, CgroupCall::SetCpuset(n, _) if n == "cg_ordered"))
.expect("set_cpuset for cg_ordered");
let move_idx = calls
.iter()
.position(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "cg_ordered"))
.expect("move_tasks for cg_ordered");
assert!(
set_idx < move_idx,
"set_cpuset must precede move_tasks for the same cgroup: {calls:?}"
);
cleanup_state(&mut state);
}
// An invalid CpusetSpec must bail with a validation error; the current
// call ordering is create_cgroup first, then validation (asserted exactly).
#[test]
fn apply_setup_bails_on_invalid_cpuset_spec() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let defs = vec![CgroupDef::named("cg_bad").with_cpuset(CpusetSpec::Llc(99))];
let err = apply_setup_test(&ctx, &mut state, &defs).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("CpusetSpec validation failed"),
"expected validation error, got: {msg}"
);
let calls = mock.calls();
assert_eq!(
calls,
vec![CgroupCall::CreateCgroup("cg_bad".to_string())],
"current ordering: create_cgroup first, then cpuset validation"
);
cleanup_state(&mut state);
}
// A failing set_cpuset (injected at call index 1) must propagate its error
// and prevent any subsequent move_tasks.
#[test]
fn apply_setup_propagates_set_cpuset_error() {
let mock = MockCgroupOps::new();
mock.fail_call_at(1, "set_cpuset kernel EBUSY");
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let cpus: BTreeSet<usize> = [0, 1].into_iter().collect();
let defs = vec![
CgroupDef::named("cg_setfail")
.with_cpuset(CpusetSpec::Exact(cpus))
.workers(1),
];
let err = apply_setup_test(&ctx, &mut state, &defs).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("set_cpuset kernel EBUSY"),
"set_cpuset error must propagate, got: {msg}"
);
let calls = mock.calls();
let has_move = calls
.iter()
.any(|c| matches!(c, CgroupCall::MoveTasks(_, _)));
assert!(
!has_move,
"no move_tasks call should follow a failed set_cpuset, got: {calls:?}"
);
cleanup_state(&mut state);
}
// Mempolicy/cpuset mismatch must be caught during setup, naming the cgroup
// and bailing before any workers are spawned or moved.
#[test]
fn apply_setup_validates_mempolicy_against_cpuset() {
let mock = MockCgroupOps::new();
// 2-node topology so a Bind outside the cpuset's node is possible.
let topo = crate::topology::TestTopology::from_vm_topology(
&crate::vmm::topology::Topology::new(2, 2, 4, 1),
);
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let cpus: BTreeSet<usize> = (0..4).collect();
let bind: BTreeSet<usize> = [1].into_iter().collect();
let defs = vec![
CgroupDef::named("cg_memfail")
.with_cpuset(CpusetSpec::Exact(cpus))
.mem_policy(MemPolicy::Bind(bind))
.workers(1),
];
let err = apply_setup_test(&ctx, &mut state, &defs).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("cg_memfail"),
"error must name the bad cgroup, got: {msg}"
);
let calls = mock.calls();
let has_move = calls
.iter()
.any(|c| matches!(c, CgroupCall::MoveTasks(_, _)));
assert!(
!has_move,
"mempolicy validation must bail before spawn, got: {calls:?}"
);
cleanup_state(&mut state);
}
#[test]
fn cgroup_def_default_payload_is_none() {
    // A freshly-built CgroupDef must not carry any payload by default.
    let fresh = CgroupDef::named("cg_0");
    assert!(fresh.payload.is_none());
}
// `CgroupDef::workload` must store the payload on the def and accept a
// non-scheduler (binary) payload kind.
#[test]
fn cgroup_def_workload_stores_payload() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
// Minimal binary-kind Payload fixture; all optional surfaces empty.
static FIO: Payload = Payload {
name: "fio",
kind: PayloadKind::Binary("fio"),
output: OutputFormat::Json,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let def = CgroupDef::named("cg_0").workload(&FIO);
let p = def.payload.expect("workload was attached");
assert_eq!(p.name, "fio");
assert!(!p.is_scheduler());
}
// Attaching a scheduler-kind payload through the workload surface is a
// programming error and must panic with the documented message.
#[test]
#[should_panic(expected = "CgroupDef::workload called with a scheduler-kind Payload")]
fn cgroup_def_workload_rejects_scheduler_kind_payload() {
    let _def =
        CgroupDef::named("cg_0").workload(&crate::test_support::Payload::KERNEL_DEFAULT);
}
// `drain_payload_handles_for_cgroup` must remove only entries keyed to the
// given cgroup, leaving the rest intact; `drain_all_payload_handles` then
// empties the vector.
#[test]
fn drain_payload_handles_for_cgroup_removes_matching_only() {
use crate::cgroup::CgroupManager;
use crate::scenario::payload_run::PayloadRun;
use crate::test_support::{OutputFormat, Payload, PayloadKind};
use crate::topology::TestTopology;
// /bin/true exits immediately, so draining these handles is cheap.
static TRUE_BIN: Payload = Payload {
name: "true_bin",
kind: PayloadKind::Binary("/bin/true"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
// Nonexistent cgroup root: no real cgroup placement happens in this test.
let cgroups = CgroupManager::new("/nonexistent");
let topo = TestTopology::synthetic(4, 1);
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo).build();
let h_a = PayloadRun::new(&ctx, &TRUE_BIN)
.spawn()
.expect("spawn /bin/true for cg_a");
let h_b = PayloadRun::new(&ctx, &TRUE_BIN)
.spawn()
.expect("spawn /bin/true for cg_b");
let mut handles = vec![
PayloadEntry {
cgroup: "cg_a".to_string(),
source: PayloadSource::CgroupDefWorkload,
handle: h_a,
},
PayloadEntry {
cgroup: "cg_b".to_string(),
source: PayloadSource::CgroupDefWorkload,
handle: h_b,
},
];
drain_payload_handles_for_cgroup(&mut handles, "cg_a");
assert_eq!(handles.len(), 1);
assert_eq!(handles[0].cgroup, "cg_b");
drain_all_payload_handles(&mut handles);
assert!(handles.is_empty());
}
// `Step::with_payload` must produce exactly one `Op::RunPayload` with no
// args and no cgroup, carry the given hold spec, and leave setup empty.
#[test]
fn step_with_payload_emits_runpayload_op() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static FIO: Payload = Payload {
name: "fio",
kind: PayloadKind::Binary("fio"),
output: OutputFormat::Json,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let step = Step::with_payload(&FIO, HoldSpec::Fixed(Duration::from_millis(50)));
assert_eq!(step.ops.len(), 1);
match &step.ops[0] {
Op::RunPayload {
payload,
args,
cgroup,
} => {
assert_eq!(payload.name, "fio");
assert!(args.is_empty());
assert!(cgroup.is_none());
}
other => panic!("expected RunPayload, got {other:?}"),
}
assert!(matches!(step.hold, HoldSpec::Fixed(d) if d == Duration::from_millis(50)));
assert!(matches!(&step.setup, Setup::Defs(d) if d.is_empty()));
}
// Each payload-related `Op` constructor must produce its matching variant:
// bare constructors leave `cgroup` as `None`, the `_in_cgroup` variants set
// it to the given name.
#[test]
fn op_payload_constructors_produce_expected_variants() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static FIO: Payload = Payload {
name: "fio",
kind: PayloadKind::Binary("fio"),
output: OutputFormat::Json,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
// run_payload: args pass through, no cgroup.
let op = Op::run_payload(&FIO, vec!["--warmup".into()]);
match op {
Op::RunPayload {
payload,
args,
cgroup,
} => {
assert_eq!(payload.name, "fio");
assert_eq!(args, vec!["--warmup".to_string()]);
assert!(cgroup.is_none());
}
other => panic!("expected RunPayload, got {other:?}"),
}
// run_payload_in_cgroup: cgroup is recorded.
let op = Op::run_payload_in_cgroup(&FIO, vec![], "cg_0");
match op {
Op::RunPayload {
payload,
args,
cgroup,
} => {
assert_eq!(payload.name, "fio");
assert!(args.is_empty());
assert_eq!(cgroup.as_deref(), Some("cg_0"));
}
other => panic!("expected RunPayload, got {other:?}"),
}
let op = Op::wait_payload("fio");
assert!(matches!(
op,
Op::WaitPayload { ref name, ref cgroup } if name.as_ref() == "fio" && cgroup.is_none(),
));
let op = Op::kill_payload("fio");
assert!(matches!(
op,
Op::KillPayload { ref name, ref cgroup } if name.as_ref() == "fio" && cgroup.is_none(),
));
let op = Op::wait_payload_in_cgroup("fio", "cg_0");
assert!(matches!(
op,
Op::WaitPayload { ref name, cgroup: Some(ref c) } if name.as_ref() == "fio" && c.as_ref() == "cg_0",
));
let op = Op::kill_payload_in_cgroup("fio", "cg_0");
assert!(matches!(
op,
Op::KillPayload { ref name, cgroup: Some(ref c) } if name.as_ref() == "fio" && c.as_ref() == "cg_0",
));
}
// `Op::RunPayload` must reject a scheduler-kind payload at apply time; the
// error names both the reason and the payload, and no handle is stored.
#[test]
fn apply_ops_runpayload_rejects_scheduler_kind() {
use crate::test_support::Payload;
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let ops = vec![Op::RunPayload {
payload: &Payload::KERNEL_DEFAULT,
args: vec![],
cgroup: None,
}];
let err = apply_ops_test(&ctx, &mut state, &ops).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("scheduler-kind Payload") && msg.contains("kernel_default"),
"error must name the scheduler-kind reason AND the payload name, got: {msg}"
);
assert!(
state.payload_handles.is_empty(),
"no handle should be stored when RunPayload rejects the kind"
);
}
// WaitPayload on a name that was never spawned must fail with a
// not-found error that names the missing payload.
#[test]
fn apply_ops_wait_unknown_payload_bails() {
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let ops = [Op::WaitPayload {
        name: "ghost".into(),
        cgroup: None,
    }];
    let err = apply_ops_test(&ctx, &mut state, &ops).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("no running payload named 'ghost'"),
        "error must name the missing payload, got: {msg}"
    );
}
// KillPayload on a name that was never spawned must fail with a
// not-found error that names the missing payload.
#[test]
fn apply_ops_kill_unknown_payload_bails() {
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let ops = [Op::KillPayload {
        name: "ghost".into(),
        cgroup: None,
    }];
    let err = apply_ops_test(&ctx, &mut state, &ops).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("no running payload named 'ghost'"),
        "error must name the missing payload, got: {msg}"
    );
}
// RunPayload stores a live handle; a later KillPayload by name must consume
// (remove) that handle from step state.
#[test]
fn apply_ops_run_then_kill_consumes_handle() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
// Long-running /bin/sleep keeps the payload alive until the kill op.
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
apply_ops_test(
&ctx,
&mut state,
&[Op::run_payload(&SLEEP, vec!["3600".into()])],
)
.expect("spawn /bin/sleep");
assert_eq!(state.payload_handles.len(), 1, "one payload is live");
assert_eq!(state.payload_handles[0].handle.payload_name(), "sleeper");
apply_ops_test(&ctx, &mut state, &[Op::kill_payload("sleeper")])
.expect("kill the live payload");
assert!(
state.payload_handles.is_empty(),
"handle must be consumed by KillPayload"
);
}
// A second RunPayload with the same (name, no-cgroup) key must bail. The
// error must flag the duplicate, name the Op::RunPayload surface, and
// render the empty cgroup key as "(no cgroup)" (never as quoted empty).
#[test]
fn apply_ops_run_duplicate_payload_name_bails() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
apply_ops_test(
&ctx,
&mut state,
&[Op::run_payload(&SLEEP, vec!["3600".into()])],
)
.expect("first spawn");
let err = apply_ops_test(
&ctx,
&mut state,
&[Op::run_payload(&SLEEP, vec!["3600".into()])],
)
.unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("payload 'sleeper' already running"),
"error must flag the duplicate, got: {msg}"
);
assert!(
msg.contains("Op::RunPayload"),
"dup error must name the originating surface, got: {msg}"
);
assert!(
msg.contains("(no cgroup)"),
"empty-cgroup key must render as '(no cgroup)', got: {msg}"
);
assert!(
!msg.contains("cgroup ''"),
"empty-cgroup key must not render as quoted empty, got: {msg}"
);
assert_eq!(
state.payload_handles.len(),
1,
"second spawn must not add a handle on failure"
);
// Reap the long-running sleeper so the test leaves no child behind.
apply_ops_test(&ctx, &mut state, &[Op::kill_payload("sleeper")]).expect("teardown kill");
}
// RunPayload must also collide with a live entry whose source is
// `CgroupDefWorkload`; the dup error names the def-source surface and the
// cgroup that owns the live handle.
#[test]
fn apply_ops_run_rejects_payload_already_owned_by_cgroup_def() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
// Manually fabricate a def-sourced live entry in 'def_cg'.
let h = crate::scenario::payload_run::PayloadRun::new(&ctx, &SLEEP)
.args(["3600".to_string()])
.spawn()
.expect("manual def-source spawn");
state.payload_handles.push(PayloadEntry {
cgroup: "def_cg".to_string(),
source: PayloadSource::CgroupDefWorkload,
handle: h,
});
let err = apply_ops_test(
&ctx,
&mut state,
&[Op::run_payload_in_cgroup(
&SLEEP,
vec!["1".into()],
"def_cg",
)],
)
.unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("CgroupDef::workload"),
"dup error must name the def-source surface, got: {msg}"
);
assert!(
msg.contains("'def_cg'"),
"dup error must name the cgroup the live handle is in, got: {msg}"
);
assert_eq!(state.payload_handles.len(), 1);
// Reap the sleeper so the test leaves no child behind.
apply_ops_test(
&ctx,
&mut state,
&[Op::kill_payload_in_cgroup("sleeper", "def_cg")],
)
.expect("teardown kill");
}
#[test]
fn render_cgroup_key_handles_empty_and_populated() {
    // An empty key renders as a placeholder; a populated key is quoted.
    let empty = render_cgroup_key("");
    let named = render_cgroup_key("cg_a");
    assert_eq!(empty, "(no cgroup)");
    assert_eq!(named, "'cg_a'");
}
// An invalid hold spec (Frac(0.0)) must be rejected by pre-ops validation,
// i.e. before any op runs and before any payload handle could exist.
#[test]
fn execute_steps_with_early_validation_err_has_nothing_to_drain() {
use crate::cgroup::CgroupManager;
let cgroups = CgroupManager::new("/nonexistent");
let topo = mock_topo();
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo).build();
let step = Step::new(vec![], HoldSpec::Frac(0.0));
let err = execute_steps_with(&ctx, vec![step], None).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("hold validation") && msg.contains("Frac"),
"expected pre-ops validation err, got: {msg}"
);
}
// A failing op in a later apply_ops call must not drop live handles from an
// earlier call; they stay in state and remain drainable.
#[test]
fn apply_ops_error_does_not_lose_live_payload_handles() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper_drain",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
apply_ops_test(
&ctx,
&mut state,
&[Op::run_payload(&SLEEP, vec!["3600".into()])],
)
.expect("spawn");
assert_eq!(state.payload_handles.len(), 1);
let err =
apply_ops_test(&ctx, &mut state, &[Op::wait_payload("never_spawned")]).unwrap_err();
assert!(
format!("{err:#}").contains("no running payload named 'never_spawned'"),
"expected wait-unknown-name err",
);
// The earlier sleeper is still tracked and drains cleanly.
drain_all_payload_handles(&mut state.payload_handles);
assert!(state.payload_handles.is_empty());
}
// A step-pass RemoveCgroup targeting a Backdrop-owned cgroup must bail
// before reaching cgroup ops; the same op succeeds under the backdrop pass
// (with_target_backdrop) and then does hit remove_cgroup.
#[test]
fn remove_cgroup_rejects_backdrop_target() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
backdrop_state
.cgroups
.add_cgroup_no_cpuset("bd_cg")
.expect("add backdrop cgroup");
// Scoped so the ScenarioState borrows are released before the asserts.
{
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
let err = apply_ops(&ctx, &mut scenario, &[Op::remove_cgroup("bd_cg")]).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("Backdrop-owned") && msg.contains("bd_cg"),
"error must name the backdrop cgroup and explain why, got: {msg}"
);
}
assert_eq!(backdrop_state.cgroups.names(), &["bd_cg".to_string()]);
let calls = mock.calls();
assert!(
!calls
.iter()
.any(|c| matches!(c, CgroupCall::RemoveCgroup(_))),
"pre-bail path must not invoke remove_cgroup, got: {calls:?}"
);
// Backdrop pass: the same removal is permitted.
{
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
scenario
.with_target_backdrop(|s| apply_ops(&ctx, s, &[Op::remove_cgroup("bd_cg")]))
.expect("backdrop-pass remove is permitted");
}
let calls = mock.calls();
assert!(
calls
.iter()
.any(|c| matches!(c, CgroupCall::RemoveCgroup(n) if n == "bd_cg")),
"backdrop-pass remove must reach the cgroup ops, got: {calls:?}"
);
cleanup_state(&mut step_state);
}
// MoveAllTasks from a step-local cgroup into a backdrop cgroup must move
// the workload handle's ownership into the backdrop slot and re-key it to
// the destination cgroup name.
#[test]
fn move_all_tasks_transfers_handle_ownership_step_to_backdrop() {
use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
backdrop_state
.cgroups
.add_cgroup_no_cpuset("bd_cg")
.unwrap();
step_state.cgroups.add_cgroup_no_cpuset("step_cg").unwrap();
// One inherited-affinity worker built from the default WorkSpec.
let w = WorkSpec::default();
let wl = WorkloadConfig {
num_workers: 1,
affinity: crate::workload::AffinityIntent::Inherit,
work_type: w.work_type,
sched_policy: w.sched_policy,
mem_policy: w.mem_policy,
mpol_flags: w.mpol_flags,
nice: None,
clone_mode: Default::default(),
comm: None,
uid: None,
gid: None,
numa_node: None,
composed: Vec::new(),
};
let h = WorkloadHandle::spawn(&wl).expect("spawn worker");
step_state.handles.push(("step_cg".to_string(), h));
assert_eq!(step_state.handles.len(), 1);
assert_eq!(backdrop_state.handles.len(), 0);
// Scoped so the ScenarioState borrows are released before the asserts.
{
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
apply_ops(
&ctx,
&mut scenario,
&[Op::move_all_tasks("step_cg", "bd_cg")],
)
.expect("move into backdrop");
}
assert_eq!(
step_state.handles.len(),
0,
"step-local handle must leave the step slot after transfer",
);
assert_eq!(
backdrop_state.handles.len(),
1,
"backdrop slot must receive the transferred handle",
);
assert_eq!(
backdrop_state.handles[0].0, "bd_cg",
"transferred handle must be re-keyed to `to`",
);
backdrop_state.handles.clear();
step_state.handles.clear();
}
// MoveAllTasks between two step-local cgroups must keep the handle in the
// step slot (re-keyed to the destination) and never touch backdrop state.
#[test]
fn move_all_tasks_step_to_step_keeps_step_ownership() {
use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();
step_state.cgroups.add_cgroup_no_cpuset("dst").unwrap();
let w = WorkSpec::default();
let wl = WorkloadConfig {
num_workers: 1,
affinity: crate::workload::AffinityIntent::Inherit,
work_type: w.work_type,
sched_policy: w.sched_policy,
mem_policy: w.mem_policy,
mpol_flags: w.mpol_flags,
nice: None,
clone_mode: Default::default(),
comm: None,
uid: None,
gid: None,
numa_node: None,
composed: Vec::new(),
};
let h = WorkloadHandle::spawn(&wl).expect("spawn");
step_state.handles.push(("src".to_string(), h));
{
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
apply_ops(&ctx, &mut scenario, &[Op::move_all_tasks("src", "dst")])
.expect("step-to-step move");
}
assert_eq!(step_state.handles.len(), 1);
assert_eq!(step_state.handles[0].0, "dst");
assert_eq!(backdrop_state.handles.len(), 0);
step_state.handles.clear();
}
// MoveAllTasks from a Backdrop-owned cgroup into a step-local one is
// rejected before any move_tasks call; the error names both cgroups.
#[test]
fn move_all_tasks_backdrop_to_step_rejected() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
backdrop_state.cgroups.add_cgroup_no_cpuset("bd").unwrap();
step_state.cgroups.add_cgroup_no_cpuset("step").unwrap();
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
let err = apply_ops(&ctx, &mut scenario, &[Op::move_all_tasks("bd", "step")]).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("Backdrop-owned 'bd'") && msg.contains("step-local 'step'"),
"error must name both cgroups and the direction, got: {msg}"
);
let calls = mock.calls();
assert!(
!calls
.iter()
.any(|c| matches!(c, CgroupCall::MoveTasks(_, _))),
"pre-bail path must not invoke move_tasks, got: {calls:?}"
);
}
// A Backdrop configured with a scheduler-kind payload must be rejected by
// execute_scenario_with; the error names both the kind mismatch and the
// Backdrop surface.
#[test]
fn run_scenario_rejects_scheduler_kind_backdrop_payload() {
use crate::cgroup::CgroupManager;
use crate::test_support::Payload;
let cgroups = CgroupManager::new("/nonexistent");
let topo = mock_topo();
let ctx = crate::scenario::Ctx::builder(&cgroups, &topo).build();
let backdrop =
crate::scenario::backdrop::Backdrop::new().with_payload(&Payload::KERNEL_DEFAULT);
let err = execute_scenario_with(
&ctx,
backdrop,
vec![Step::new(vec![], HoldSpec::Fixed(Duration::from_millis(1)))],
None,
)
.unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("scheduler-kind") && msg.contains("Backdrop"),
"error must name the kind mismatch and the Backdrop surface, got: {msg}"
);
}
// A step-setup CgroupDef whose name is already tracked by the Backdrop must
// be rejected; the error cites the collision and the name.
#[test]
fn apply_setup_rejects_name_collision_with_backdrop() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
backdrop_state
.cgroups
.add_cgroup_no_cpuset("shared")
.unwrap();
let defs = vec![CgroupDef::named("shared").workers(1)];
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
let err = apply_setup(&ctx, &mut scenario, &defs).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("already tracked") && msg.contains("shared"),
"error must cite the collision and the offending name, got: {msg}"
);
cleanup_state(&mut step_state);
}
/// Test helper: spawn `payload` directly via `PayloadRun` (no cgroup
/// placement) and register the resulting live handle in `state` under the
/// given `(cgroup, source)` key, so tests can fabricate tracked entries.
fn push_fake_payload_entry<'a>(
    ctx: &'a Ctx<'a>,
    state: &mut StepState<'a>,
    payload: &'static crate::test_support::Payload,
    cgroup: &str,
    source: PayloadSource,
) {
    let handle = crate::scenario::payload_run::PayloadRun::new(ctx, payload)
        .args(["3600".to_string()])
        .spawn()
        .expect("manual spawn (no cgroup placement)");
    let entry = PayloadEntry {
        cgroup: cgroup.to_string(),
        source,
        handle,
    };
    state.payload_handles.push(entry);
}
// Duplicate payload names are keyed by (name, cgroup): the same name in a
// different cgroup does not collide, while the same pair still matches.
#[test]
fn apply_ops_run_duplicate_name_different_cgroups_allowed() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
// Two live copies of 'sleeper', one per cgroup.
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_a",
PayloadSource::OpRunPayload,
);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_b",
PayloadSource::OpRunPayload,
);
let mut backdrop = BackdropState::empty(&ctx);
let scenario = ScenarioState::new(&mut state, &mut backdrop);
assert!(
scenario
.find_live_payload_with_cgroup("sleeper", "cg_c")
.is_none(),
"fresh (name, cgroup) pair must not collide with live entries in other cgroups",
);
assert!(
scenario
.find_live_payload_with_cgroup("sleeper", "cg_a")
.is_some(),
"same (name, cgroup) still matches — only the pair matters",
);
cleanup_state(&mut state);
}
// A composite (name, Some(cgroup)) lookup must take exactly the entry in
// that cgroup and is never ambiguous, leaving other same-name copies alone.
#[test]
fn take_payload_by_composite_key_matches_exact_cgroup() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_a",
PayloadSource::OpRunPayload,
);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_b",
PayloadSource::OpRunPayload,
);
let mut backdrop = BackdropState::empty(&ctx);
let mut scenario = ScenarioState::new(&mut state, &mut backdrop);
let taken = scenario
.take_payload_by_name("sleeper", Some("cg_a"))
.expect("composite lookup does not bail on ambiguity")
.expect("one entry matches");
assert_eq!(taken.cgroup, "cg_a");
assert_eq!(state.payload_handles.len(), 1);
assert_eq!(state.payload_handles[0].cgroup, "cg_b");
// Reap both sleepers: the remaining tracked one, then the taken one.
drain_all_payload_handles(&mut state.payload_handles);
let _ = taken.handle.kill();
}
// A bare-name lookup over multiple live copies must return Err with the
// list of candidate cgroups, without consuming any entry.
#[test]
fn take_payload_by_bare_name_reports_ambiguous_cgroups() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_a",
PayloadSource::OpRunPayload,
);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_b",
PayloadSource::OpRunPayload,
);
let mut backdrop = BackdropState::empty(&ctx);
let mut scenario = ScenarioState::new(&mut state, &mut backdrop);
// Err carries the ambiguous cgroup names.
let err = match scenario.take_payload_by_name("sleeper", None) {
Err(cgroups) => cgroups,
Ok(_) => panic!("bare lookup over multi-copy must surface ambiguity"),
};
assert_eq!(err.len(), 2);
assert!(err.contains(&"cg_a".to_string()) && err.contains(&"cg_b".to_string()));
assert_eq!(state.payload_handles.len(), 2);
drain_all_payload_handles(&mut state.payload_handles);
}
// A bare-name lookup with exactly one live copy must succeed and consume
// that entry from step state.
#[test]
fn take_payload_by_bare_name_succeeds_on_single_copy() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_a",
PayloadSource::OpRunPayload,
);
let mut backdrop = BackdropState::empty(&ctx);
let mut scenario = ScenarioState::new(&mut state, &mut backdrop);
let taken = scenario
.take_payload_by_name("sleeper", None)
.expect("single-copy bare lookup returns Ok")
.expect("one entry matches");
assert_eq!(taken.cgroup, "cg_a");
assert!(state.payload_handles.is_empty());
// Reap the taken sleeper so the test leaves no child behind.
let _ = taken.handle.kill();
}
// Ambiguity errors from bare wait/kill must hint at the full snake_case
// `_in_cgroup` constructor (copy-paste-compilable), for both WaitPayload
// and KillPayload.
#[test]
fn apply_ops_bare_wait_and_kill_ambiguity_hint_names_full_constructor() {
use crate::test_support::{OutputFormat, Payload, PayloadKind};
static SLEEP: Payload = Payload {
name: "sleeper",
kind: PayloadKind::Binary("/bin/sleep"),
output: OutputFormat::ExitCode,
default_args: &[],
default_checks: &[],
metrics: &[],
include_files: &[],
uses_parent_pgrp: false,
known_flags: None,
metric_bounds: None,
};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
// Two live copies make the bare name ambiguous.
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_a",
PayloadSource::OpRunPayload,
);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_b",
PayloadSource::OpRunPayload,
);
let err = apply_ops_test(&ctx, &mut state, &[Op::wait_payload("sleeper")]).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("ambiguous"),
"wait ambiguity message must flag ambiguity, got: {msg}"
);
assert!(
msg.contains("Op::wait_payload_in_cgroup(name, cgroup)"),
"wait ambiguity hint must name the full snake_case constructor \
so a copy-paste into source compiles, got: {msg}"
);
drain_all_payload_handles(&mut state.payload_handles);
// Fresh state for the kill-side check.
let mut state = StepState::empty(&ctx);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_a",
PayloadSource::OpRunPayload,
);
push_fake_payload_entry(
&ctx,
&mut state,
&SLEEP,
"cg_b",
PayloadSource::OpRunPayload,
);
let err = apply_ops_test(&ctx, &mut state, &[Op::kill_payload("sleeper")]).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("Op::kill_payload_in_cgroup(name, cgroup)"),
"kill ambiguity hint must name the full snake_case constructor, got: {msg}"
);
drain_all_payload_handles(&mut state.payload_handles);
}
// Not-found errors must use the gerund verb forms "before waiting" /
// "before killing", and never collapse "wait payload" into "waitpayload".
#[test]
fn apply_ops_not_found_message_uses_gerund_verb() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let err = apply_ops_test(&ctx, &mut state, &[Op::wait_payload("ghost")]).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("before waiting"),
"wait not-found message must say 'before waiting', got: {msg}"
);
assert!(
!msg.contains("before waitpayload"),
"must not collapse 'wait payload' into 'waitpayload', got: {msg}"
);
let err = apply_ops_test(&ctx, &mut state, &[Op::kill_payload("ghost")]).unwrap_err();
let msg = format!("{err:#}");
assert!(
msg.contains("before killing"),
"kill not-found message must say 'before killing', got: {msg}"
);
}
// Current invariant: Op::RemoveCgroup issues an rmdir but does NOT forget
// the name in CgroupGroup; Drop is the single rmdir dispatcher, so the mock
// sees cg_drop removed twice (once by the op, once on Drop).
#[test]
fn remove_cgroup_does_not_forget_name_in_cgroupgroup_but_drop_is_safe() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
apply_ops_test(
&ctx,
&mut state,
&[Op::add_cgroup("cg_keep"), Op::add_cgroup("cg_drop")],
)
.unwrap();
apply_ops_test(&ctx, &mut state, &[Op::remove_cgroup("cg_drop")]).unwrap();
assert_eq!(
state.cgroups.names(),
&["cg_keep".to_string(), "cg_drop".to_string()],
"Op::RemoveCgroup must not mutate CgroupGroup::names (current \
invariant); Drop is the single rmdir dispatcher",
);
drop(state);
let calls = mock.calls();
let drops: Vec<&CgroupCall> = calls
.iter()
.filter(|c| matches!(c, CgroupCall::RemoveCgroup(n) if n == "cg_drop"))
.collect();
assert_eq!(
drops.len(),
2,
"expected Op::RemoveCgroup + Drop to both hit the mock for cg_drop: {calls:?}",
);
}
// A step-local AddCgroup whose name already lives in the Backdrop must be
// rejected; the backdrop copy survives and the step never records the name.
#[test]
fn op_add_cgroup_step_local_rejects_collision_with_backdrop() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
backdrop_state
.cgroups
.add_cgroup_no_cpuset("shared")
.expect("add backdrop cgroup");
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
let err = apply_ops(&ctx, &mut scenario, &[Op::add_cgroup("shared")]).expect_err(
"apply_ops must reject a step-local AddCgroup whose \
name already lives in the Backdrop",
);
let msg = format!("{err:?}");
assert!(
msg.contains("'shared'") && msg.contains("collides"),
"error must name the colliding cgroup and explain the collision; got: {msg}",
);
assert!(
step_state.cgroups.names().iter().all(|n| n != "shared"),
"step-local names must not contain the rejected name; got: {:?}",
step_state.cgroups.names(),
);
assert!(
backdrop_state.cgroups.names().iter().any(|n| n == "shared"),
"backdrop copy must survive the rejected op",
);
}
// Two AddCgroup ops with the same name in one step must fail on the second
// one; the name is registered exactly once, with no shadow entry.
#[test]
fn op_add_cgroup_duplicate_in_same_step_is_rejected() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
let err = apply_ops_test(
&ctx,
&mut state,
&[Op::add_cgroup("cg_dup"), Op::add_cgroup("cg_dup")],
)
.expect_err("second AddCgroup must fail against the same step-local name");
let msg = format!("{err:?}");
assert!(
msg.contains("'cg_dup'") && msg.contains("collides"),
"error must name the colliding cgroup and explain the collision; got: {msg}",
);
let names = state.cgroups.names();
assert_eq!(
names.iter().filter(|n| n.as_str() == "cg_dup").count(),
1,
"the first op must register the name exactly once; the second op \
must not push a shadow entry; got: {names:?}",
);
}
// MoveAllTasks must re-key every handle under `from` to `to`, losing none,
// even when several handles share the source cgroup.
#[test]
fn move_all_tasks_renames_every_handle_keyed_under_from() {
use crate::workload::{AffinityIntent, WorkType, WorkloadConfig, WorkloadHandle};
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();
step_state.cgroups.add_cgroup_no_cpuset("dst").unwrap();
// Three single-worker handles all keyed to 'src'.
for _ in 0..3 {
let wl = WorkloadConfig {
num_workers: 1,
affinity: AffinityIntent::Inherit,
work_type: WorkType::SpinWait,
..Default::default()
};
let h = WorkloadHandle::spawn(&wl).expect("spawn worker");
step_state.handles.push(("src".to_string(), h));
}
assert_eq!(step_state.handles.len(), 3);
{
let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
apply_ops(&ctx, &mut scenario, &[Op::move_all_tasks("src", "dst")]).expect("move");
}
assert_eq!(step_state.handles.len(), 3, "no handles lost");
assert!(
step_state.handles.iter().all(|(name, _)| name == "dst"),
"every handle must be re-keyed to 'dst': {:?}",
step_state
.handles
.iter()
.map(|(n, _)| n.as_str())
.collect::<Vec<_>>(),
);
step_state.handles.clear();
}
// Dropping step state must rmdir step-local cgroups in reverse addition
// order, so a child cgroup is removed before its parent.
#[test]
fn per_step_teardown_removes_step_local_cgroups_in_reverse_order() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut state = StepState::empty(&ctx);
apply_ops_test(
&ctx,
&mut state,
&[
Op::add_cgroup("cg_a"),
Op::add_cgroup("cg_a/sub"),
Op::add_cgroup("cg_b"),
],
)
.unwrap();
// Drop triggers teardown; the mock records the rmdir order.
drop(state);
let calls = mock.calls();
let removes: Vec<&str> = calls
.iter()
.filter_map(|c| match c {
CgroupCall::RemoveCgroup(n) => Some(n.as_str()),
_ => None,
})
.collect();
assert_eq!(
removes,
vec!["cg_b", "cg_a/sub", "cg_a"],
"per-step teardown must rmdir in reverse addition order so a \
child cgroup's directory is gone before its parent's rmdir \
runs",
);
}
// build_stimulus narrows the usize step index into the wire's u16 field:
// values up to u16::MAX pass through, anything larger saturates (no wrap).
#[test]
fn build_stimulus_saturates_step_idx_at_u16_max() {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
let scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
let start = std::time::Instant::now();
let zero = build_stimulus(&start, 0, &[], &scenario);
assert_eq!(zero.step_index, 0, "step_idx=0 passes through");
let max = build_stimulus(&start, u16::MAX as usize, &[], &scenario);
assert_eq!(
max.step_index,
u16::MAX,
"step_idx=u16::MAX passes through unchanged (no saturation)",
);
let overflow = build_stimulus(&start, u16::MAX as usize + 1, &[], &scenario);
assert_eq!(
overflow.step_index,
u16::MAX,
"step_idx beyond u16::MAX must saturate to u16::MAX, not wrap",
);
let far = build_stimulus(&start, u32::MAX as usize, &[], &scenario);
assert_eq!(
far.step_index,
u16::MAX,
"far-overflow step_idx must saturate to u16::MAX",
);
}
// Saturating the step index must emit exactly one tracing::warn that names
// `step_index`; a non-saturating call emits none. A minimal in-memory
// Subscriber captures (level, message) pairs to assert on.
#[test]
fn build_stimulus_warns_on_step_idx_saturation() {
use std::sync::{Arc, Mutex};
use tracing::field::{Field, Visit};
use tracing::span::{Attributes, Id, Record};
use tracing::{Event, Subscriber};
use tracing::{Level, Metadata};
// Records every event's level plus a flattened string of its fields.
#[derive(Default)]
struct CaptureSubscriber {
events: Arc<Mutex<Vec<(Level, String)>>>,
}
// Concatenates all field values (debug or str) into one buffer.
struct MessageVisitor<'a>(&'a mut String);
impl<'a> Visit for MessageVisitor<'a> {
fn record_debug(&mut self, _field: &Field, value: &dyn std::fmt::Debug) {
use std::fmt::Write;
let _ = write!(self.0, "{value:?} ");
}
fn record_str(&mut self, _field: &Field, value: &str) {
use std::fmt::Write;
let _ = write!(self.0, "{value} ");
}
}
impl Subscriber for CaptureSubscriber {
fn enabled(&self, _: &Metadata<'_>) -> bool {
true
}
// Span plumbing is irrelevant here; a constant Id suffices.
fn new_span(&self, _: &Attributes<'_>) -> Id {
Id::from_u64(1)
}
fn record(&self, _: &Id, _: &Record<'_>) {}
fn record_follows_from(&self, _: &Id, _: &Id) {}
fn event(&self, event: &Event<'_>) {
let mut msg = String::new();
event.record(&mut MessageVisitor(&mut msg));
self.events
.lock()
.unwrap()
.push((*event.metadata().level(), msg));
}
fn enter(&self, _: &Id) {}
fn exit(&self, _: &Id) {}
}
let events: Arc<Mutex<Vec<(Level, String)>>> = Arc::new(Mutex::new(Vec::new()));
let sub = CaptureSubscriber {
events: events.clone(),
};
// Scope the subscriber so only these two calls are captured.
tracing::subscriber::with_default(sub, || {
let mock = MockCgroupOps::new();
let topo = mock_topo();
let ctx = mock_ctx(&mock, &topo);
let mut step_state = StepState::empty(&ctx);
let mut backdrop_state = BackdropState::empty(&ctx);
let scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
let start = std::time::Instant::now();
let _ = build_stimulus(&start, 0, &[], &scenario);
let _ = build_stimulus(&start, u16::MAX as usize + 1, &[], &scenario);
});
let captured = events.lock().unwrap();
let warn_hits: Vec<&String> = captured
.iter()
.filter(|(lvl, _)| *lvl == Level::WARN)
.map(|(_, msg)| msg)
.collect();
assert!(
warn_hits
.iter()
.any(|m| m.contains("step_index")
&& m.contains("StimulusPayload field overflowed u16")),
"saturation must emit a tracing::warn naming step_index; got warns: {warn_hits:?}",
);
assert_eq!(
warn_hits.len(),
1,
"exactly one saturation warn expected; got: {warn_hits:?}",
);
}
// Shared fixture for the constructor-coverage tests below; the payload points
// at /bin/true so it is trivially runnable if a test ever executes it.
static CONSTRUCTOR_TEST_PAYLOAD: crate::test_support::Payload =
    crate::test_support::Payload::binary("constructor-test", "/bin/true");
#[test]
fn op_constructor_coverage_is_exhaustive() {
    let w = WorkSpec::default();
    // One call per public constructor. The `*_in_cgroup` constructors build
    // the same Op variant as their plain counterparts, so the 20 calls here
    // map onto the 17 variant discriminants tracked below.
    let constructed: Vec<Op> = vec![
        Op::add_cgroup("a"),
        Op::remove_cgroup("a"),
        Op::set_cpuset("a", CpusetSpec::Llc(0)),
        Op::clear_cpuset("a"),
        Op::swap_cpusets("a", "b"),
        Op::spawn("a", w.clone()),
        Op::stop_cgroup("a"),
        Op::set_affinity("a", AffinityIntent::Inherit),
        Op::spawn_host(w.clone()),
        Op::move_all_tasks("a", "b"),
        Op::run_payload(&CONSTRUCTOR_TEST_PAYLOAD, Vec::new()),
        Op::run_payload_in_cgroup(&CONSTRUCTOR_TEST_PAYLOAD, Vec::new(), "a"),
        Op::wait_payload("constructor-test"),
        Op::wait_payload_in_cgroup("constructor-test", "a"),
        Op::kill_payload("constructor-test"),
        Op::kill_payload_in_cgroup("constructor-test", "a"),
        Op::freeze_cgroup("a"),
        Op::unfreeze_cgroup("a"),
        Op::snapshot("constructor-test"),
        Op::watch_snapshot("kernel.constructor_test"),
    ];
    // Deliberately no catch-all arm: adding an Op variant breaks this match
    // at compile time until a constructor (and an index here) is added.
    let mut seen = [false; 17];
    for op in &constructed {
        let idx = match op {
            Op::AddCgroup { .. } => 0,
            Op::RemoveCgroup { .. } => 1,
            Op::SetCpuset { .. } => 2,
            Op::ClearCpuset { .. } => 3,
            Op::SwapCpusets { .. } => 4,
            Op::Spawn { .. } => 5,
            Op::StopCgroup { .. } => 6,
            Op::SetAffinity { .. } => 7,
            Op::SpawnHost { .. } => 8,
            Op::MoveAllTasks { .. } => 9,
            Op::RunPayload { .. } => 10,
            Op::WaitPayload { .. } => 11,
            Op::KillPayload { .. } => 12,
            Op::FreezeCgroup { .. } => 13,
            Op::UnfreezeCgroup { .. } => 14,
            Op::Snapshot { .. } => 15,
            Op::WatchSnapshot { .. } => 16,
        };
        seen[idx] = true;
    }
    let missing: Vec<usize> = seen
        .iter()
        .enumerate()
        .filter(|(_, hit)| !**hit)
        .map(|(i, _)| i)
        .collect();
    assert!(
        missing.is_empty(),
        "Op variant discriminants with no constructor coverage: {missing:?}. \
        Every Op variant must have a public constructor under impl Op per the \
        non_exhaustive convention documented on the enum.",
    );
}
#[test]
fn cpuset_spec_constructor_coverage_is_exhaustive() {
    // One constructor call per variant; the catch-all-free match below keeps
    // this test in lockstep with the enum at compile time.
    let constructed = [
        CpusetSpec::llc(0),
        CpusetSpec::numa(0),
        CpusetSpec::range(0.0, 1.0),
        CpusetSpec::disjoint(0, 2),
        CpusetSpec::overlap(0, 2, 0.25),
        CpusetSpec::exact([0usize]),
    ];
    let mut seen = [false; 6];
    for spec in constructed.iter() {
        seen[match spec {
            CpusetSpec::Llc(_) => 0,
            CpusetSpec::Numa(_) => 1,
            CpusetSpec::Range { .. } => 2,
            CpusetSpec::Disjoint { .. } => 3,
            CpusetSpec::Overlap { .. } => 4,
            CpusetSpec::Exact(_) => 5,
        }] = true;
    }
    assert!(
        seen.iter().all(|s| *s),
        "every CpusetSpec variant must have a matching constructor, seen={seen:?}",
    );
}
#[test]
fn cgroup_def_cpu_quota_pct_uses_100ms_period_and_correct_quota() {
    // 50% of the builder's fixed 100ms period => a 50_000us quota.
    let cpu = CgroupDef::named("cg_a")
        .cpu_quota_pct(50)
        .cpu
        .expect("cpu_quota_pct must populate `cpu`");
    assert_eq!(cpu.max_quota_us, Some(50_000));
    assert_eq!(cpu.max_period_us, 100_000);
    assert!(cpu.weight.is_none(), "weight must remain unset");
}
#[test]
fn cgroup_def_cpu_quota_accepts_explicit_durations() {
    // Durations are converted to microseconds verbatim: 7.5ms / 10ms.
    let quota = Duration::from_micros(7_500);
    let period = Duration::from_millis(10);
    let cpu = CgroupDef::named("cg_a").cpu_quota(quota, period).cpu.unwrap();
    assert_eq!(cpu.max_quota_us, Some(7_500));
    assert_eq!(cpu.max_period_us, 10_000);
}
#[test]
fn cgroup_def_cpu_unlimited_clears_quota_keeps_weight() {
    // cpu_unlimited() must drop only the quota; period and weight survive.
    let cpu = CgroupDef::named("cg_a")
        .cpu_quota_pct(80)
        .cpu_weight(200)
        .cpu_unlimited()
        .cpu
        .unwrap();
    assert!(cpu.max_quota_us.is_none());
    assert_eq!(cpu.max_period_us, 100_000);
    assert_eq!(cpu.weight, Some(200));
}
#[test]
fn cgroup_def_memory_builders_compose() {
    // Each memory_* builder writes its own field; chained together they
    // fill all three limits on the same MemorySpec.
    let mem = CgroupDef::named("cg_a")
        .memory_max(1_000_000)
        .memory_high(800_000)
        .memory_low(400_000)
        .memory
        .unwrap();
    assert_eq!(mem.max, Some(1_000_000));
    assert_eq!(mem.high, Some(800_000));
    assert_eq!(mem.low, Some(400_000));
}
#[test]
fn cgroup_def_memory_unlimited_clears_all_three() {
    // memory_unlimited() wipes max, high and low in one call.
    let mem = CgroupDef::named("cg_a")
        .memory_max(1_000_000)
        .memory_high(800_000)
        .memory_low(400_000)
        .memory_unlimited()
        .memory
        .unwrap();
    assert!(mem.max.is_none());
    assert!(mem.high.is_none());
    assert!(mem.low.is_none());
}
#[test]
fn cgroup_def_io_weight_populates() {
    // io_weight(N) must install an IoSpec carrying Some(N).
    let io = CgroupDef::named("cg_a").io_weight(750).io.unwrap();
    assert_eq!(io.weight, Some(750));
}
#[test]
fn cgroup_def_with_cpuset_mems_populates_independent_field() {
    // cpuset_mems is its own knob: setting it must not touch `cpuset`.
    let nodes = BTreeSet::from([0usize, 1]);
    let def = CgroupDef::named("cg_a").with_cpuset_mems(nodes.clone());
    assert_eq!(def.cpuset_mems, Some(nodes));
    assert!(def.cpuset.is_none());
}
#[test]
fn apply_setup_records_set_cpu_max_for_cpu_quota_pct_builder() {
    // End-to-end through apply_setup: cpu_quota_pct(75) on a def must reach
    // the cgroup backend as SetCpuMax(Some(75_000), 100_000) — i.e. 75% of
    // the builder's fixed 100ms period.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_cap").cpu_quota_pct(75)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        calls.contains(&CgroupCall::SetCpuMax(
            "cg_cap".to_string(),
            Some(75_000),
            100_000,
        )),
        "expected SetCpuMax(cg_cap, Some(75000), 100000); got {calls:?}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_records_three_memory_writes_when_memory_field_set() {
    // Even when only memory.max is set on the def, apply_setup must write all
    // three of memory.max / memory.high / memory.low (the unset knobs as
    // None), and must do so in (max, high, low) order.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_mem").memory_max(1_000_000)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let max_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryMax(n, _) if n == "cg_mem"))
        .expect("SetMemoryMax must fire");
    let high_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryHigh(n, _) if n == "cg_mem"))
        .expect("SetMemoryHigh must fire");
    let low_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryLow(n, _) if n == "cg_mem"))
        .expect("SetMemoryLow must fire");
    assert!(
        max_idx < high_idx && high_idx < low_idx,
        "memory writes must land in (max, high, low) order; got max={max_idx} high={high_idx} low={low_idx}",
    );
    // Failure messages added for parity with every other assertion in this
    // file — a bare `assert!(contains)` failure only prints the expression.
    assert!(
        calls.contains(&CgroupCall::SetMemoryMax(
            "cg_mem".to_string(),
            Some(1_000_000)
        )),
        "memory_max(N) must record SetMemoryMax(Some(N)); got: {calls:?}",
    );
    assert!(
        calls.contains(&CgroupCall::SetMemoryHigh("cg_mem".to_string(), None)),
        "unset memory.high must still be written explicitly as None; got: {calls:?}",
    );
    assert!(
        calls.contains(&CgroupCall::SetMemoryLow("cg_mem".to_string(), None)),
        "unset memory.low must still be written explicitly as None; got: {calls:?}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_resource_writes_land_before_move_tasks() {
    // Every resource-controller write targeting cg_full must be recorded
    // before any MoveTasks into that cgroup.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let mems: BTreeSet<usize> = [0usize].into_iter().collect();
    let defs = vec![
        CgroupDef::named("cg_full")
            .with_cpuset_mems(mems)
            .cpu_quota_pct(40)
            .cpu_weight(200)
            .memory_max(2_000_000)
            .io_weight(150),
    ];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let move_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "cg_full"));
    // Indices (in recorded order) of every resource write aimed at cg_full.
    let kinds: Vec<usize> = calls
        .iter()
        .enumerate()
        .filter_map(|(i, c)| match c {
            CgroupCall::SetCpusetMems(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetCpuMax(n, _, _) if n == "cg_full" => Some(i),
            CgroupCall::SetCpuWeight(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetMemoryMax(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetMemoryHigh(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetMemoryLow(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetIoWeight(n, _) if n == "cg_full" => Some(i),
            _ => None,
        })
        .collect();
    assert!(
        kinds.len() >= 7,
        "expected at least 7 resource writes (mems + cpu.max + cpu.weight + 3 memory + io.weight); got {} ({calls:?})",
        kinds.len(),
    );
    // Ordering is only enforced when a MoveTasks was actually recorded.
    if let Some(mi) = move_idx {
        assert!(
            kinds.iter().all(|k| *k < mi),
            "every resource write must precede MoveTasks; kinds={kinds:?} move_idx={mi}",
        );
    }
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_rejects_cpu_weight_out_of_range() {
    // cpu.weight=0 must be rejected by validation, with an error that names
    // both the offending cgroup and the offending knob.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_bad").cpu_weight(0)];
    let err = apply_setup_test(&ctx, &mut state, &defs)
        .expect_err("apply_setup must reject weight=0");
    // `{err:#}` renders the full anyhow context chain.
    let msg = format!("{err:#}");
    assert!(
        msg.contains("cg_bad") && msg.contains("cpu.weight"),
        "error must name cgroup and field; got: {msg}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_rejects_cpu_max_period_zero() {
    // A zero cpu.max period is nonsensical (division by zero on the kernel
    // side); validation must reject it and name cgroup + knob.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs =
        vec![CgroupDef::named("cg_bad").cpu_quota(Duration::from_millis(50), Duration::ZERO)];
    let err = apply_setup_test(&ctx, &mut state, &defs)
        .expect_err("apply_setup must reject period=0");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("cg_bad") && msg.contains("period"),
        "error must name cgroup and period; got: {msg}",
    );
    cleanup_state(&mut state);
}
#[test]
fn cgroup_def_memory_swap_max_builder_round_trip() {
    // Opt in to a swap cap, then clear it again; only swap_max moves.
    let two_mib = 2 * 1024 * 1024;
    let capped = CgroupDef::named("cg_a").memory_swap_max(two_mib);
    assert_eq!(capped.memory.as_ref().unwrap().swap_max, Some(two_mib));
    let cleared = capped.memory_swap_unlimited();
    assert_eq!(cleared.memory.as_ref().unwrap().swap_max, None);
}
#[test]
fn cgroup_def_memory_swap_unlimited_on_fresh_def_is_noop() {
    // Clearing a swap cap that was never set must not materialize a
    // MemorySpec on the def.
    let fresh = CgroupDef::named("cg_a").memory_swap_unlimited();
    assert!(
        fresh.memory.is_none(),
        "memory_swap_unlimited() on a fresh CgroupDef must leave \
         self.memory == None; got: {:?}",
        fresh.memory,
    );
}
#[test]
fn cgroup_def_memory_unlimited_then_swap_unlimited_is_idempotent() {
    // memory_unlimited() installs a default (all-None) MemorySpec; clearing
    // swap afterwards must keep every field None.
    let mem = CgroupDef::named("cg_a")
        .memory_unlimited()
        .memory_swap_unlimited()
        .memory
        .expect("memory_unlimited installs Some(default)");
    assert!(mem.max.is_none());
    assert!(mem.high.is_none());
    assert!(mem.low.is_none());
    assert!(mem.swap_max.is_none());
}
#[test]
fn apply_setup_memory_swap_unlimited_on_fresh_def_emits_no_memory_writes() {
    // Companion to the builder-level noop test: when the def's memory field
    // stays None, apply_setup must not touch any memory interface file.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_swap_clear").memory_swap_unlimited()];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        !calls.iter().any(|c| matches!(
            c,
            CgroupCall::SetMemoryMax(_, _)
                | CgroupCall::SetMemoryHigh(_, _)
                | CgroupCall::SetMemoryLow(_, _)
                | CgroupCall::SetMemorySwapMax(_, _)
        )),
        "memory_swap_unlimited() on a fresh CgroupDef must emit zero memory writes; got: {calls:?}"
    );
    cleanup_state(&mut state);
}
#[test]
fn cgroup_def_pids_max_builder_round_trip() {
    // Cap pids, then lift the cap; the PidsSpec stays but max goes to None.
    let capped = CgroupDef::named("cg_a").pids_max(1024);
    assert_eq!(capped.pids.as_ref().unwrap().max, Some(1024));
    let uncapped = capped.pids_unlimited();
    assert_eq!(uncapped.pids.as_ref().unwrap().max, None);
}
#[test]
fn apply_setup_records_set_memory_swap_max() {
    // First half: swap_max is opt-in — setting it must record
    // SetMemorySwapMax(Some(N)).
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_swap").memory_swap_max(4 * 1024 * 1024)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        calls.contains(&CgroupCall::SetMemorySwapMax(
            "cg_swap".to_string(),
            Some(4 * 1024 * 1024),
        )),
        "swap_max with bytes must record SetMemorySwapMax(Some(N)), got: {calls:?}",
    );
    cleanup_state(&mut state);
    // Second half: a memory_max-only def must never write memory.swap.max
    // (fresh mock so the first half's calls cannot leak into the check).
    let mock = MockCgroupOps::new();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_nosw").memory_max(1_000_000)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        !calls.iter().any(|c| matches!(
            c,
            CgroupCall::SetMemorySwapMax(n, _) if n == "cg_nosw",
        )),
        "memory_max-only must NOT record SetMemorySwapMax (would \
        ENOENT on CONFIG_SWAP=n kernels); got: {calls:?}",
    );
    // The three ordinary memory writes must still land, in (max, high, low) order.
    let max_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryMax(n, _) if n == "cg_nosw"))
        .expect("SetMemoryMax must fire");
    let high_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryHigh(n, _) if n == "cg_nosw"))
        .expect("SetMemoryHigh must fire");
    let low_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryLow(n, _) if n == "cg_nosw"))
        .expect("SetMemoryLow must fire");
    assert!(
        max_idx < high_idx && high_idx < low_idx,
        "memory writes must land in (max, high, low) order; \
        got max={max_idx} high={high_idx} low={low_idx}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_orders_memory_swap_max_after_low() {
    // With all four memory knobs opted in, the write order on the wire must
    // be (max, high, low, swap_max).
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![
        CgroupDef::named("cg_full_mem")
            .memory_max(2_000_000)
            .memory_high(1_500_000)
            .memory_low(500_000)
            .memory_swap_max(8 * 1024 * 1024),
    ];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let max_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryMax(n, _) if n == "cg_full_mem"))
        .expect("SetMemoryMax must fire");
    let high_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryHigh(n, _) if n == "cg_full_mem"))
        .expect("SetMemoryHigh must fire");
    let low_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemoryLow(n, _) if n == "cg_full_mem"))
        .expect("SetMemoryLow must fire");
    let swap_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetMemorySwapMax(n, _) if n == "cg_full_mem"))
        .expect("SetMemorySwapMax must fire when swap_max is opted in");
    assert!(
        max_idx < high_idx && high_idx < low_idx && low_idx < swap_idx,
        "memory writes must land in (max, high, low, swap_max) order; \
        got max={max_idx} high={high_idx} low={low_idx} swap={swap_idx}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_records_set_pids_max_only_when_set() {
    // First half: pids_max(N) must reach the backend as SetPidsMax(Some(N)).
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_pids").pids_max(512)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        calls.contains(&CgroupCall::SetPidsMax("cg_pids".to_string(), Some(512))),
        "pids_max(N) must record SetPidsMax(Some(N)), got: {calls:?}",
    );
    cleanup_state(&mut state);
    // Second half: with no pids field on the def, no SetPidsMax at all
    // (fresh mock so the first half's calls cannot leak into the check).
    let mock = MockCgroupOps::new();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_nopids").memory_max(1_000_000)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        !calls
            .iter()
            .any(|c| matches!(c, CgroupCall::SetPidsMax(_, _))),
        "no SetPidsMax expected when pids field is None, got: {calls:?}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_rejects_pids_max_zero() {
    // pids_max(0) would make the cgroup unable to hold any task; validation
    // must reject it, state the constraint, and point at pids_unlimited().
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_zero").pids_max(0)];
    let err = apply_setup_test(&ctx, &mut state, &defs)
        .expect_err("apply_setup must reject pids_max(0)");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("cg_zero") && msg.contains("pids.max"),
        "error must name cgroup and pids.max; got: {msg}",
    );
    assert!(
        msg.contains("must be > 0"),
        "error must spell out the constraint; got: {msg}",
    );
    assert!(
        msg.contains("pids_unlimited"),
        "error must name the escape hatch (pids_unlimited()); got: {msg}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_ops_freeze_undefined_cgroup_dispatches_set_freeze() {
    // FreezeCgroup is dispatched by name only: even a cgroup never declared
    // via add_cgroup/apply_setup must still reach set_freeze.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    apply_ops_test(&ctx, &mut state, &[Op::freeze_cgroup("ghost_cg")])
        .expect("apply_ops must dispatch FreezeCgroup even for an undeclared name");
    let calls = mock.calls();
    assert!(
        calls.contains(&CgroupCall::SetFreeze("ghost_cg".to_string(), true)),
        "FreezeCgroup must reach set_freeze regardless of declaration state, got: {calls:?}"
    );
}
#[test]
fn apply_ops_freeze_propagates_set_freeze_error_with_context() {
    let mock = MockCgroupOps::new();
    // Arrange the very first cgroup call to fail so set_freeze errors out.
    mock.fail_call_at(0, "kernel ENOENT — cgroup directory does not exist");
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let err = apply_ops_test(&ctx, &mut state, &[Op::freeze_cgroup("ghost_cg")])
        .expect_err("set_freeze failure must surface as Err");
    // `{err:#}` renders the whole context chain: op, cgroup, root cause.
    let msg = format!("{err:#}");
    assert!(
        msg.contains("Op::FreezeCgroup") && msg.contains("ghost_cg"),
        "error must name the op and the cgroup, got: {msg}"
    );
    assert!(
        msg.contains("ENOENT"),
        "error must propagate the underlying cause, got: {msg}"
    );
}
#[test]
fn apply_ops_freeze_and_unfreeze_record_set_freeze() {
    // Freeze then unfreeze of the same cgroup must record SetFreeze(true)
    // followed by SetFreeze(false), in that order.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    apply_ops_test(
        &ctx,
        &mut state,
        &[Op::freeze_cgroup("cg_x"), Op::unfreeze_cgroup("cg_x")],
    )
    .expect("freeze/unfreeze ops must succeed");
    let calls = mock.calls();
    assert!(
        calls.contains(&CgroupCall::SetFreeze("cg_x".to_string(), true)),
        "FreezeCgroup must dispatch SetFreeze(true), got: {calls:?}",
    );
    assert!(
        calls.contains(&CgroupCall::SetFreeze("cg_x".to_string(), false)),
        "UnfreezeCgroup must dispatch SetFreeze(false), got: {calls:?}",
    );
    let true_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetFreeze(_, true)))
        .expect("found freeze");
    let false_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetFreeze(_, false)))
        .expect("found unfreeze");
    assert!(
        true_idx < false_idx,
        "freeze (true) must come before unfreeze (false): {calls:?}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_rejects_io_weight_out_of_range() {
    // Both out-of-range ends (0 below, 10_001 above) must be rejected
    // before any cgroupfs write happens.
    for (weight, label) in [(0u16, "zero"), (10_001u16, "above-max")] {
        let mock = MockCgroupOps::new();
        let topo = mock_topo();
        let ctx = mock_ctx(&mock, &topo);
        let mut state = StepState::empty(&ctx);
        let defs = vec![CgroupDef::named("cg_io").io_weight(weight)];
        let err = apply_setup_test(&ctx, &mut state, &defs)
            .expect_err(&format!("io.weight={weight} ({label}) must reject"));
        let msg = format!("{err:#}");
        assert!(
            msg.contains("io.weight") && msg.contains("out of range"),
            "error must name the offending knob and constraint; got: {msg}",
        );
        assert!(
            msg.contains("cg_io"),
            "error must name the offending cgroup; got: {msg}",
        );
        // Rejection must short-circuit: no SetIoWeight may have been recorded.
        let calls = mock.calls();
        assert!(
            !calls
                .iter()
                .any(|c| matches!(c, CgroupCall::SetIoWeight(n, _) if n == "cg_io")),
            "rejected weight must not reach the cgroupfs write: {calls:?}",
        );
        cleanup_state(&mut state);
    }
}
#[test]
fn apply_setup_accepts_io_weight_range_endpoints() {
    // The inclusive boundaries of the valid range (1 and 10_000) must be
    // accepted and must reach the backend verbatim.
    for weight in [1u16, 10_000u16] {
        let mock = MockCgroupOps::new();
        let topo = mock_topo();
        let ctx = mock_ctx(&mock, &topo);
        let mut state = StepState::empty(&ctx);
        let defs = vec![CgroupDef::named("cg_io").io_weight(weight)];
        apply_setup_test(&ctx, &mut state, &defs).unwrap_or_else(|e| {
            panic!("io.weight={weight} (boundary) must be accepted: {e:#}")
        });
        let calls = mock.calls();
        assert!(
            calls.contains(&CgroupCall::SetIoWeight("cg_io".to_string(), weight)),
            "boundary weight must reach the cgroupfs write; got: {calls:?}",
        );
        cleanup_state(&mut state);
    }
}
#[test]
fn apply_setup_substitutes_default_workspec_when_works_empty() {
    // A def declared with no .work() entries must still end up with tasks:
    // apply_setup substitutes a default WorkSpec rather than leaving the
    // cgroup empty.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let def = CgroupDef::named("cg_default_work");
    assert!(
        def.works.is_empty(),
        "test premise: CgroupDef without .work() must start with empty works",
    );
    apply_setup_test(&ctx, &mut state, &[def])
        .expect("apply_setup with default-work substitution must succeed");
    let calls = mock.calls();
    assert!(
        calls.iter().any(|c| matches!(
            c,
            CgroupCall::MoveTasks(name, count) if name == "cg_default_work" && *count > 0
        )),
        "default-WorkSpec substitution must spawn workers and migrate them into the \
        cgroup; without it the empty `works` would leave the cgroup taskless. \
        Got: {calls:?}",
    );
    cleanup_state(&mut state);
}
/// Resolves a thread id to its thread-group id (the owning process pid) by
/// parsing the `Tgid:` line of `/proc/<tid>/status`.
///
/// Panics if the status file is unreadable, lacks a `Tgid:` line, or the
/// value does not parse — all of which indicate a dead or bogus tid.
fn read_status_tgid(tid: libc::pid_t) -> libc::pid_t {
    let status = std::fs::read_to_string(format!("/proc/{tid}/status"))
        .expect("/proc/<tid>/status must be readable for live thread");
    status
        .lines()
        .find_map(|line| line.strip_prefix("Tgid:"))
        .expect("/proc/<tid>/status must include Tgid line")
        .trim()
        .parse()
        .expect("Tgid must be a parseable pid_t")
}
/// Reads the kernel task name (`/proc/<pid>/comm`) of a live task, with any
/// trailing newline(s) stripped. Panics if the file is unreadable.
fn read_proc_comm(pid: libc::pid_t) -> String {
    let path = format!("/proc/{pid}/comm");
    let raw = std::fs::read_to_string(path)
        .expect("/proc/<pid>/comm must be readable for live task");
    raw.trim_end_matches('\n').to_owned()
}
#[test]
fn apply_setup_pcomm_via_cgroup_def_forks_one_container() {
    // A pcomm def runs its workers as threads inside one container process,
    // so exactly one PID (the container) is migrated into the cgroup even
    // with workers(2).
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_pcomm").pcomm("leader").workers(2)];
    apply_setup_test(&ctx, &mut state, &defs).expect("pcomm apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        calls.iter().any(|c| matches!(
            c,
            CgroupCall::MoveTasks(name, 1) if name == "cg_pcomm"
        )),
        "pcomm group must move exactly 1 PID (the container) into the cgroup; \
        got: {calls:?}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_pcomm_with_per_thread_comm() {
    // Verifies against live /proc state that pcomm names the container's
    // leader thread and comm names each worker thread.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![
        CgroupDef::named("cg_named")
            .pcomm("leader")
            .comm("worker")
            .workers(2),
    ];
    apply_setup_test(&ctx, &mut state, &defs).expect("pcomm + comm apply_setup must succeed");
    // Give the container time to spawn and rename its threads.
    // NOTE(review): timing-based; assumes 200ms suffices on loaded CI hosts.
    std::thread::sleep(Duration::from_millis(200));
    let mut handles = std::mem::take(&mut state.handles);
    assert_eq!(handles.len(), 1, "one CgroupDef → one handle");
    let (_name, handle) = handles
        .pop()
        .expect("apply_setup must have pushed a handle for cg_named");
    let pids = handle.worker_pids();
    assert_eq!(
        pids.len(),
        1,
        "pcomm handle must report exactly 1 pid (the leader); got {}",
        pids.len(),
    );
    let leader_pid = pids[0];
    assert_eq!(
        read_proc_comm(leader_pid),
        "leader",
        "/proc/<leader>/comm must equal pcomm",
    );
    // Second settle window before enumerating the thread directory.
    std::thread::sleep(Duration::from_millis(100));
    let task_dir = format!("/proc/{leader_pid}/task");
    let entries: Vec<libc::pid_t> = std::fs::read_dir(&task_dir)
        .expect("/proc/<leader>/task must be readable for live container")
        .flatten()
        .filter_map(|e| e.file_name().to_str().and_then(|n| n.parse().ok()))
        .collect();
    assert!(
        entries.len() >= 3,
        "leader pid {leader_pid} must have leader + 2 worker threads in /proc/<leader>/task; \
        observed {} entries: {entries:?}",
        entries.len(),
    );
    // Classify every tid: the leader keeps pcomm; all others carry the
    // per-thread comm.
    let mut leader_seen = false;
    let mut worker_seen = 0usize;
    for tid in entries {
        let tcomm = read_proc_comm(tid);
        if tid == leader_pid {
            assert_eq!(
                tcomm, "leader",
                "/proc/<leader>/task/<leader>/comm must equal pcomm; got {tcomm:?}",
            );
            leader_seen = true;
        } else {
            assert_eq!(
                tcomm, "worker",
                "/proc/<leader>/task/{tid}/comm must equal per-thread comm 'worker'; \
                got {tcomm:?}",
            );
            worker_seen += 1;
        }
    }
    assert!(
        leader_seen,
        "leader's own task entry must appear in /proc/<leader>/task",
    );
    assert_eq!(
        worker_seen, 2,
        "must observe exactly 2 worker threads with per-thread comm 'worker'; \
        saw {worker_seen}",
    );
    drop(handle);
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_mixed_pcomm_and_non_pcomm_groups() {
    // Two defs with different spawn models in one setup: the pcomm group
    // migrates 1 container PID, the fork group migrates one PID per worker.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![
        CgroupDef::named("cg_pcomm").pcomm("threaded").workers(2),
        CgroupDef::named("cg_fork").workers(2),
    ];
    apply_setup_test(&ctx, &mut state, &defs).expect("mixed apply_setup must succeed");
    let calls = mock.calls();
    assert!(
        calls.iter().any(|c| matches!(
            c,
            CgroupCall::MoveTasks(name, 1) if name == "cg_pcomm"
        )),
        "cg_pcomm must move 1 PID (container only) into its cgroup; \
        got: {calls:?}",
    );
    assert!(
        calls.iter().any(|c| matches!(
            c,
            CgroupCall::MoveTasks(name, 2) if name == "cg_fork"
        )),
        "cg_fork must move 2 PIDs (one per fork worker) into its cgroup; \
        got: {calls:?}",
    );
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_pcomm_kernel_truncation_at_15_bytes() {
    // The kernel clamps a task's comm to TASK_COMM_LEN (16) bytes including
    // the NUL terminator, so a pcomm longer than 15 bytes must come back
    // from /proc truncated to its leading 15 bytes.
    let long_name = "this_is_a_very_long_name";
    assert!(
        long_name.len() > 15,
        "test fixture must exceed TASK_COMM_LEN-1=15",
    );
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_trunc").pcomm(long_name).workers(1)];
    apply_setup_test(&ctx, &mut state, &defs).expect("long-pcomm apply_setup must succeed");
    // Give the spawned container time to come up before probing /proc.
    // NOTE(review): timing-based; assumes 200ms suffices on loaded CI hosts.
    std::thread::sleep(Duration::from_millis(200));
    let mut handles = std::mem::take(&mut state.handles);
    let (_, handle) = handles.pop().expect("one handle");
    let pids = handle.worker_pids();
    // Guard the indexing below: an empty pid list would otherwise panic with
    // an unhelpful out-of-bounds message instead of naming the real problem.
    assert!(
        !pids.is_empty(),
        "pcomm handle must report at least one pid before comm inspection",
    );
    let container_pid = read_status_tgid(pids[0]);
    let observed = read_proc_comm(container_pid);
    assert_eq!(
        observed.len(),
        15,
        "kernel must truncate pcomm to TASK_COMM_LEN-1=15 bytes; \
        observed length {} for {observed:?}",
        observed.len(),
    );
    assert_eq!(
        observed,
        &long_name[..15],
        "truncated comm must be the leading 15 bytes of pcomm input",
    );
    drop(handle);
    cleanup_state(&mut state);
}
#[test]
fn apply_setup_pcomm_with_zero_workers_is_rejected() {
    // A pcomm container with zero worker threads is meaningless; validation
    // must reject it before any task migration happens.
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = vec![CgroupDef::named("cg_zero").pcomm("empty").workers(0)];
    let err = apply_setup_test(&ctx, &mut state, &defs)
        .expect_err("pcomm + 0 workers apply_setup must be rejected");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("cg_zero"),
        "rejection error must name the cgroup: {msg}",
    );
    assert!(
        msg.contains("num_workers=0"),
        "rejection error must name the offending field: {msg}",
    );
    // Rejection must short-circuit: no MoveTasks into cg_zero may have fired.
    let calls = mock.calls();
    let any_move = calls.iter().any(|c| {
        matches!(
            c,
            CgroupCall::MoveTasks(name, _) if name == "cg_zero"
        )
    });
    assert!(
        !any_move,
        "rejection must short-circuit before any move_tasks call \
        into cg_zero; got: {calls:?}",
    );
    cleanup_state(&mut state);
}
}