use std::borrow::Cow;
use std::ops::RangeInclusive;
use super::*;
use crate::workload::{AffinityIntent, WorkSpec, WorkType};
use strum::IntoEnumIterator;
/// Guards the value of `REPLACE_NOT_TRYING_DEADLINE_S`: it must remain 5
/// until someone deliberately revisits this assertion.
#[test]
fn replace_not_trying_deadline_pinned_to_5s() {
    let pinned_seconds = 5;
    assert_eq!(
        REPLACE_NOT_TRYING_DEADLINE_S, pinned_seconds,
        "REPLACE_NOT_TRYING_DEADLINE_S changed away from 5 s; \
         read its doc and update the dispatch-latency analysis \
         before relaxing this assertion."
    );
}
/// Checks that `OpKind::bit_index` gives every variant its own bit and that
/// the assigned bits are exactly `0..variant_count`.
#[test]
fn op_kind_bit_indices_are_unique_and_contiguous() {
    let kinds: Vec<OpKind> = OpKind::iter().collect();
    let indices: Vec<u32> = kinds.iter().map(|kind| kind.bit_index()).collect();

    // Uniqueness: collapsing into a set must not lose any entry.
    let distinct = indices
        .iter()
        .copied()
        .collect::<std::collections::BTreeSet<u32>>();
    assert_eq!(
        distinct.len(),
        indices.len(),
        "OpKind::bit_index produced duplicates. \
         Pairs (OpKind, bit_index): {:?}. Fix the match in \
         OpKind::bit_index so every variant maps to a distinct \
         bit.",
        kinds.iter().zip(&indices).collect::<Vec<_>>(),
    );

    // Contiguity: once sorted, the indices must read 0, 1, 2, ...
    let expected: Vec<u32> = (0..kinds.len() as u32).collect();
    let sorted = {
        let mut ordered = indices.clone();
        ordered.sort_unstable();
        ordered
    };
    assert_eq!(
        sorted,
        expected,
        "OpKind::bit_index indices must be contiguous from 0 \
         (no gaps, no duplicates). Got sorted indices {sorted:?} \
         for {} OpKind variants; expected {expected:?}.",
        kinds.len(),
    );
}
/// Checks that the n-th variant yielded by `OpKind::iter()` reports
/// `bit_index() == n`.
#[test]
fn op_kind_iter_order_matches_bit_index_ascending() {
    let kinds: Vec<OpKind> = OpKind::iter().collect();

    // (iteration position, reported bit index) for every variant.
    let mut pairs: Vec<(usize, u32)> = Vec::with_capacity(kinds.len());
    for (position, kind) in kinds.iter().enumerate() {
        pairs.push((position, kind.bit_index()));
    }

    for &(i, bit) in &pairs {
        assert_eq!(
            bit as usize, i,
            "OpKind::iter()[{i}] (variant {:?}) has bit_index {bit}; \
             expected iter-index to match bit_index. Pairs: {pairs:?}",
            kinds[i],
        );
    }
}
/// Cpuset layout that `Traverse::generate` can pick for a phase.
#[derive(Debug)]
enum Layout {
/// Every live cgroup gets a `CpusetSpec::Disjoint` slice.
Disjoint,
/// Every live cgroup gets a `CpusetSpec::Overlap`; `generate` reads the two
/// fields as `(min_frac, max_frac)` and samples the overlap fraction
/// between them.
Overlap(f64, f64),
}
/// Parameters for the randomized step sequence built by `Traverse::generate`.
#[derive(Debug)]
struct Traverse {
/// RNG seed; `generate` falls back to the process id when this is `None`.
seed: Option<u64>,
/// Inclusive range of live-cgroup counts a phase may target. `generate`
/// clamps it to at least 1 and at most half of the usable CPUs.
cgroup_count: RangeInclusive<usize>,
/// Candidate layouts; one is drawn at random for each phase.
layouts: Vec<Layout>,
/// Number of steps `generate` emits.
phases: usize,
/// Fixed hold applied to every phase.
phase_duration: Duration,
/// Extra hold added to the first phase only.
settle: Duration,
/// Floor below which `generate` does not tear cgroups down.
persistent_cgroups: usize,
/// Workload per cgroup index. Indices past the end reuse the last entry;
/// an empty list yields the default `WorkSpec`.
cgroup_workloads: Vec<WorkSpec>,
}
impl Traverse {
/// Builds one `Step` per phase, growing and shrinking a set of cgroups and
/// re-assigning their cpusets according to a randomly chosen `Layout`.
///
/// The sequence is driven by an RNG seeded from `self.seed` (or the process
/// id when no seed is set), so the order of the `rng` draws below determines
/// the output for a given seed.
fn generate(&self, ctx: &Ctx) -> Vec<Step> {
use rand::RngExt;
let seed = self.seed.unwrap_or_else(|| std::process::id() as u64);
let mut rng = seeded_rng(seed);
let usable_len = ctx.topo.usable_cpus().len();
// Upper bound: requested maximum, capped at half the usable CPUs, never 0.
let max_cgroups = (*self.cgroup_count.end()).min(usable_len / 2).max(1);
// Lower bound: requested minimum, at least 1 and never above the maximum.
let min_cgroups = (*self.cgroup_count.start()).max(1).min(max_cgroups);
// Reserves one slot more than the loop below fills.
let mut steps = Vec::with_capacity(self.phases + 1);
// Names of the cgroups that currently exist, in creation order.
let mut live_cgroups: Vec<Cow<'static, str>> = Vec::new();
// Pre-built names `cg_0 .. cg_{max_cgroups - 1}`.
let names: Vec<Cow<'static, str>> = (0..max_cgroups)
.map(|i| Cow::Owned(format!("cg_{i}")))
.collect();
for phase in 0..self.phases {
// Draw 1: how many cgroups this phase should have, in [min, max].
let range = max_cgroups - min_cgroups + 1;
let target_count = min_cgroups + rng.random_range(0..range);
// Draw 2: which layout to apply.
// NOTE(review): an empty `layouts` makes this range empty; rand's
// `random_range` presumably panics on an empty range — confirm.
let layout_idx = rng.random_range(0..self.layouts.len());
let layout = &self.layouts[layout_idx];
let mut ops = Vec::new();
// Grow: create each missing cgroup and spawn its workload into it.
while live_cgroups.len() < target_count {
let idx = live_cgroups.len();
let name = names[idx].clone();
// Workload for this index, else the last configured one, else the default.
let w = self
.cgroup_workloads
.get(idx)
.or(self.cgroup_workloads.last())
.cloned()
.unwrap_or_default();
ops.push(Op::AddCgroup { name: name.clone() });
ops.push(Op::Spawn {
placement: SpawnPlacement::Cgroup(name.clone()),
work: w,
});
live_cgroups.push(name);
}
// Shrink: stop and remove from the tail, but keep `persistent_cgroups` alive.
while live_cgroups.len() > target_count && live_cgroups.len() > self.persistent_cgroups
{
if let Some(name) = live_cgroups.pop() {
ops.push(Op::StopCgroup {
cgroup: name.clone(),
});
ops.push(Op::RemoveCgroup { cgroup: name });
}
}
// Assign a cpuset to every live cgroup according to the chosen layout.
for (i, name) in live_cgroups.iter().enumerate() {
let spec = match layout {
Layout::Disjoint => CpusetSpec::Disjoint {
index: i,
of: live_cgroups.len(),
},
Layout::Overlap(min_frac, max_frac) => {
// One draw per cgroup: fraction in 1% increments, starting at
// `min_frac` and stopping 1% of the span short of `max_frac`.
let frac = min_frac
+ rng.random_range(0..100) as f64 / 100.0 * (max_frac - min_frac);
CpusetSpec::Overlap {
index: i,
of: live_cgroups.len(),
frac,
}
}
};
ops.push(Op::SetCpuset {
cgroup: name.clone(),
cpus: spec,
});
}
// Only the first phase waits the additional settle time.
let hold = if phase == 0 {
HoldSpec::fixed(self.settle + self.phase_duration)
} else {
HoldSpec::fixed(self.phase_duration)
};
steps.push(Step {
setup: vec![].into(),
ops,
hold,
});
}
steps
}
}
fn seeded_rng(seed: u64) -> rand::rngs::StdRng {
use rand::SeedableRng;
rand::rngs::StdRng::seed_from_u64(seed)
}
/// A payload with an allowlist must accept an argument vector whose long
/// flags are all listed, mixed with values, a positional, a short flag,
/// `--` and `--=value`.
#[test]
fn validate_known_flags_accepts_listed_long_flags() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static WITH_ALLOWLIST: Payload = Payload {
        name: "with_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: Some(&["runtime", "threads", "verbose"]),
        metric_bounds: None,
    };

    let args: Vec<String> = [
        "--runtime=30",
        "--threads",
        "4",
        "--verbose",
        "positional_arg",
        "-s",
        "--",
        "--=value",
    ]
    .iter()
    .map(|arg| arg.to_string())
    .collect();

    validate_known_flags(&WITH_ALLOWLIST, &args).expect("all long flags in allowlist must pass");
}
/// An unknown flag between two known ones must be rejected, and the error
/// must name it without mentioning the known flag that follows.
#[test]
fn validate_known_flags_fails_fast_on_first_unknown() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static WITH_ALLOWLIST: Payload = Payload {
        name: "with_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: Some(&["runtime", "threads", "verbose"]),
        metric_bounds: None,
    };

    let args: Vec<String> = ["--runtime=30", "--threds", "--verbose"]
        .iter()
        .map(|arg| arg.to_string())
        .collect();

    let err = validate_known_flags(&WITH_ALLOWLIST, &args)
        .expect_err("typo between two known flags must be rejected");
    let msg = format!("{err:#}");

    let names_typo = msg.contains("--threds");
    assert!(names_typo, "error must name the typo: {msg}");

    let mentions_later_flag = msg.contains("--verbose");
    assert!(
        !mentions_later_flag,
        "error must not mention the later known flag '--verbose' \
         — fail-fast broke: {msg}",
    );
}
/// A long flag missing from the allowlist must be rejected with an error that
/// names the flag and mentions the allowlist.
#[test]
fn validate_known_flags_rejects_unknown_long_flag() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static WITH_ALLOWLIST: Payload = Payload {
        name: "with_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: Some(&["runtime", "threads"]),
        metric_bounds: None,
    };

    let args = vec![String::from("--threds"), String::from("4")];
    let err = validate_known_flags(&WITH_ALLOWLIST, &args).expect_err("typo must be rejected");
    let msg = format!("{err:#}");

    let names_flag = msg.contains("--threds");
    assert!(names_flag, "error must name the offending flag: {msg}");

    let names_allowlist = msg.contains("known_flags allowlist");
    assert!(
        names_allowlist,
        "error must mention the allowlist surface: {msg}",
    );
}
/// A payload whose `known_flags` is `None` must accept any long flag.
#[test]
fn validate_known_flags_none_is_permissive() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static NO_ALLOWLIST: Payload = Payload {
        name: "no_allowlist",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    let args: Vec<String> = ["--anything", "--whatever=x", "--threds"]
        .iter()
        .map(|arg| arg.to_string())
        .collect();

    validate_known_flags(&NO_ALLOWLIST, &args).expect("None allowlist must pass any flag");
}
/// Verifies that every `Op` variant reports a distinct `discriminant()`.
///
/// One representative value per variant is built and its discriminant is
/// recorded in a set; a repeat fails the test and reports the colliding
/// value. The list covers the same variants as `op_discriminant_values`,
/// including the scheduler-lifecycle ops and `PinBpfMap`, so a collision
/// involving any of them is caught here too.
#[test]
fn op_discriminant_unique() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    static SCHED_FIXTURE: crate::test_support::Scheduler = crate::test_support::Scheduler::EEVDF;

    let ops: Vec<Op> = vec![
        Op::AddCgroup { name: "a".into() },
        Op::AddCgroupDef {
            def: CgroupDef::named("a"),
        },
        Op::RemoveCgroup { cgroup: "a".into() },
        Op::SetCpuset {
            cgroup: "a".into(),
            cpus: CpusetSpec::exact([]),
        },
        Op::ClearCpuset { cgroup: "a".into() },
        Op::SwapCpusets {
            a: "a".into(),
            b: "b".into(),
        },
        Op::Spawn {
            placement: SpawnPlacement::Cgroup("a".into()),
            work: Default::default(),
        },
        Op::StopCgroup { cgroup: "a".into() },
        Op::SetAffinity {
            cgroup: "a".into(),
            affinity: Default::default(),
        },
        Op::MoveAllTasks {
            from: "a".into(),
            to: "b".into(),
        },
        Op::RunPayload {
            payload: &TRUE_BIN,
            args: vec![],
            cgroup: None,
        },
        Op::WaitPayload {
            name: "p".into(),
            cgroup: None,
        },
        Op::KillPayload {
            name: "p".into(),
            cgroup: None,
        },
        Op::FreezeCgroup { cgroup: "a".into() },
        Op::UnfreezeCgroup { cgroup: "a".into() },
        Op::CaptureSnapshot {
            name: "snap".into(),
        },
        Op::WatchSnapshot {
            symbol: "kernel.x".into(),
        },
        Op::WriteKernelHot {
            writes: vec![(KernelTarget::symbol("x"), KernelValue::u64(0))],
        },
        Op::WriteKernelCold {
            writes: vec![(KernelTarget::symbol("x"), KernelValue::u64(0))],
        },
        Op::ReadKernelHot {
            tag: "t".into(),
            target: KernelTarget::symbol("x"),
            width: KernelValueWidth::u64(),
        },
        Op::ReadKernelCold {
            tag: "t".into(),
            target: KernelTarget::symbol("x"),
            width: KernelValueWidth::u64(),
        },
        // Scheduler-lifecycle and BPF-map ops were missing from this list.
        Op::AttachScheduler {
            scheduler: &SCHED_FIXTURE,
        },
        Op::DetachScheduler,
        Op::RestartScheduler,
        Op::ReplaceScheduler {
            scheduler: &SCHED_FIXTURE,
        },
        Op::PinBpfMap {
            name: "scx_test.bss".into(),
        },
        Op::CaptureCgroupProcs {
            tag: "snap".into(),
            cgroup: "a".into(),
        },
    ];

    let mut seen = std::collections::BTreeSet::new();
    for op in &ops {
        let discriminant = op.discriminant();
        // Check before inserting so the colliding value can be reported.
        assert!(
            !seen.contains(&discriminant),
            "duplicate discriminant {discriminant:?}",
        );
        seen.insert(discriminant);
    }
}
/// Pins the numeric `discriminant()` of every `Op` variant (0 through 26).
///
/// Each assertion is labelled with the variant name so a failure identifies
/// which variant moved.
#[test]
fn op_discriminant_values() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    assert_eq!(
        Op::AddCgroup { name: "a".into() }.discriminant(),
        0,
        "AddCgroup",
    );
    assert_eq!(
        Op::AddCgroupDef {
            def: CgroupDef::named("a")
        }
        .discriminant(),
        1,
        "AddCgroupDef",
    );
    assert_eq!(
        Op::RemoveCgroup { cgroup: "a".into() }.discriminant(),
        2,
        "RemoveCgroup",
    );
    assert_eq!(
        Op::SetCpuset {
            cgroup: "a".into(),
            cpus: CpusetSpec::Llc(0),
        }
        .discriminant(),
        3,
        "SetCpuset",
    );
    assert_eq!(
        Op::ClearCpuset { cgroup: "a".into() }.discriminant(),
        4,
        "ClearCpuset",
    );
    assert_eq!(
        Op::SwapCpusets {
            a: "a".into(),
            b: "b".into(),
        }
        .discriminant(),
        5,
        "SwapCpusets",
    );
    assert_eq!(
        Op::spawn(SpawnPlacement::cgroup("a"), WorkSpec::default()).discriminant(),
        6,
        "Spawn",
    );
    assert_eq!(
        Op::StopCgroup { cgroup: "a".into() }.discriminant(),
        7,
        "StopCgroup",
    );
    assert_eq!(
        Op::SetAffinity {
            cgroup: "a".into(),
            affinity: AffinityIntent::Inherit,
        }
        .discriminant(),
        8,
        "SetAffinity",
    );
    assert_eq!(
        Op::MoveAllTasks {
            from: "a".into(),
            to: "b".into()
        }
        .discriminant(),
        9,
        "MoveAllTasks",
    );
    assert_eq!(
        Op::RunPayload {
            payload: &TRUE_BIN,
            args: vec![],
            cgroup: None,
        }
        .discriminant(),
        10,
        "RunPayload",
    );
    assert_eq!(
        Op::WaitPayload {
            name: "p".into(),
            cgroup: None,
        }
        .discriminant(),
        11,
        "WaitPayload",
    );
    assert_eq!(
        Op::KillPayload {
            name: "p".into(),
            cgroup: None,
        }
        .discriminant(),
        12,
        "KillPayload",
    );
    assert_eq!(
        Op::FreezeCgroup { cgroup: "a".into() }.discriminant(),
        13,
        "FreezeCgroup",
    );
    assert_eq!(
        Op::UnfreezeCgroup { cgroup: "a".into() }.discriminant(),
        14,
        "UnfreezeCgroup",
    );
    // The label previously read "Snapshot", which is not the variant's name.
    assert_eq!(
        Op::CaptureSnapshot {
            name: "snap".into()
        }
        .discriminant(),
        15,
        "CaptureSnapshot",
    );
    assert_eq!(
        Op::WatchSnapshot {
            symbol: "kernel.x".into()
        }
        .discriminant(),
        16,
        "WatchSnapshot",
    );
    assert_eq!(
        Op::WriteKernelHot {
            writes: vec![(KernelTarget::symbol("x"), KernelValue::u64(0))]
        }
        .discriminant(),
        17,
        "WriteKernelHot",
    );
    assert_eq!(
        Op::WriteKernelCold {
            writes: vec![(KernelTarget::symbol("x"), KernelValue::u64(0))]
        }
        .discriminant(),
        18,
        "WriteKernelCold",
    );
    assert_eq!(
        Op::ReadKernelHot {
            tag: "t".into(),
            target: KernelTarget::symbol("x"),
            width: KernelValueWidth::u64(),
        }
        .discriminant(),
        19,
        "ReadKernelHot",
    );
    assert_eq!(
        Op::ReadKernelCold {
            tag: "t".into(),
            target: KernelTarget::symbol("x"),
            width: KernelValueWidth::u64(),
        }
        .discriminant(),
        20,
        "ReadKernelCold",
    );

    // Scheduler-lifecycle ops share one static scheduler fixture.
    static SCHED_FIXTURE: crate::test_support::Scheduler = crate::test_support::Scheduler::EEVDF;
    assert_eq!(
        Op::AttachScheduler {
            scheduler: &SCHED_FIXTURE,
        }
        .discriminant(),
        21,
        "AttachScheduler",
    );
    assert_eq!(Op::DetachScheduler.discriminant(), 22, "DetachScheduler",);
    assert_eq!(Op::RestartScheduler.discriminant(), 23, "RestartScheduler",);
    assert_eq!(
        Op::ReplaceScheduler {
            scheduler: &SCHED_FIXTURE,
        }
        .discriminant(),
        24,
        "ReplaceScheduler",
    );
    assert_eq!(
        Op::PinBpfMap {
            name: "scx_test.bss".into(),
        }
        .discriminant(),
        25,
        "PinBpfMap",
    );
    assert_eq!(
        Op::CaptureCgroupProcs {
            tag: "snap".into(),
            cgroup: "a".into(),
        }
        .discriminant(),
        26,
        "CaptureCgroupProcs",
    );
}
/// Two generators built from the same seed must yield the same first 100
/// `u64` values.
#[test]
fn seeded_rng_deterministic() {
    use rand::RngExt;

    let mut first = seeded_rng(42);
    let mut second = seeded_rng(42);
    (0..100).for_each(|_| {
        let from_first: u64 = first.random();
        let from_second: u64 = second.random();
        assert_eq!(from_first, from_second);
    });
}
/// Generators built from seeds 1 and 2 must disagree somewhere within their
/// first 10 `u64` values.
#[test]
fn seeded_rng_different_seeds_differ() {
    use rand::RngExt;

    let mut rng1 = seeded_rng(1);
    let mut rng2 = seeded_rng(2);
    // "All equal" expressed as "no pair differs"; stops at the first mismatch.
    let same = !(0..10).any(|_| rng1.random::<u64>() != rng2.random::<u64>());
    assert!(!same);
}
/// `HoldSpec::validate` must accept a mid-range and a full fraction, a
/// non-zero fixed duration, and a loop with a non-zero interval.
#[test]
fn holdspec_validate_accepts_valid() {
    let valid_specs = [
        HoldSpec::Frac(0.5),
        HoldSpec::Frac(1.0),
        HoldSpec::Fixed(Duration::from_millis(1)),
        HoldSpec::Loop {
            interval: Duration::from_millis(100),
        },
    ];
    for spec in valid_specs {
        spec.validate().unwrap();
    }
}
/// A zero-length `HoldSpec::Fixed` must validate successfully.
#[test]
fn holdspec_validate_accepts_fixed_zero() {
    let zero_hold = HoldSpec::Fixed(Duration::ZERO);
    zero_hold
        .validate()
        .expect("Duration::ZERO is valid for settle/op-only steps");
}
/// `HoldSpec::Frac(0.0)` must be rejected with a message naming `Frac` and
/// the `> 0` requirement.
#[test]
fn holdspec_validate_rejects_frac_zero() {
    let err = HoldSpec::Frac(0.0).validate().unwrap_err();
    let names_variant = err.contains("Frac");
    let names_bound = err.contains("> 0");
    assert!(names_variant && names_bound, "got: {err}");
}
/// A negative `HoldSpec::Frac` must be rejected with a message naming `Frac`
/// and the `> 0` requirement.
#[test]
fn holdspec_validate_rejects_frac_negative() {
    let err = HoldSpec::Frac(-0.5).validate().unwrap_err();
    let names_variant = err.contains("Frac");
    let names_bound = err.contains("> 0");
    assert!(names_variant && names_bound, "got: {err}");
}
/// A NaN `HoldSpec::Frac` must be rejected with a message mentioning either
/// "not finite" or "NaN".
#[test]
fn holdspec_validate_rejects_frac_nan() {
    let err = HoldSpec::Frac(f64::NAN).validate().unwrap_err();
    let says_not_finite = err.contains("not finite");
    let says_nan = err.contains("NaN");
    assert!(says_not_finite || says_nan, "got: {err}");
}
/// An infinite `HoldSpec::Frac` must be rejected with a message mentioning
/// either "not finite" or "Inf".
#[test]
fn holdspec_validate_rejects_frac_inf() {
    let err = HoldSpec::Frac(f64::INFINITY).validate().unwrap_err();
    let says_not_finite = err.contains("not finite");
    let says_inf = err.contains("Inf");
    assert!(says_not_finite || says_inf, "got: {err}");
}
/// A `HoldSpec::Loop` with a zero interval must be rejected with a message
/// naming `Loop` and containing "busy".
#[test]
fn holdspec_validate_rejects_loop_zero_interval() {
    let zero_interval_loop = HoldSpec::Loop {
        interval: Duration::ZERO,
    };
    let err = zero_interval_loop.validate().unwrap_err();
    let names_variant = err.contains("Loop");
    let names_busy = err.contains("busy");
    assert!(names_variant && names_busy, "got: {err}");
}
/// `HoldSpec::frac` must build the `Frac` variant and `Step::new` must store
/// it in `hold`.
#[test]
fn holdspec_frac() {
    let expected = HoldSpec::Frac(0.5);
    let step = Step::new(vec![], HoldSpec::frac(0.5));
    assert_eq!(step.hold, expected);
}
/// `HoldSpec::fixed` must build the `Fixed` variant and `Step::new` must
/// store it in `hold`.
#[test]
fn holdspec_fixed() {
    let three_seconds = Duration::from_secs(3);
    let step = Step::new(vec![], HoldSpec::fixed(three_seconds));
    assert_eq!(step.hold, HoldSpec::Fixed(three_seconds));
}
/// `HoldSpec::loop_at` must build the `Loop` variant with the given interval
/// and `Step::new` must store it in `hold`.
#[test]
fn holdspec_loop() {
    let interval = Duration::from_millis(100);
    let step = Step::new(vec![], HoldSpec::loop_at(interval));
    let expected = HoldSpec::Loop { interval };
    assert_eq!(step.hold, expected);
}
/// With a 150 ms scenario and a 30 ms loop interval, the step's ops must be
/// applied more than once against the mock cgroup backend.
#[test]
fn holdspec_loop_apply_path_repeats_ops_until_duration_elapses() {
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(150);

    let looping_step = Step::new(
        vec![Op::set_cpuset("loop_test", CpusetSpec::Llc(0))],
        HoldSpec::loop_at(Duration::from_millis(30)),
    );
    let result = execute_steps(&ctx, vec![looping_step])
        .expect("HoldSpec::Loop apply path must succeed against mock cgroups");
    assert!(
        result.is_pass(),
        "scenario must pass with no failing assertions; got: {:?}",
        result.outcomes,
    );

    // Count how often the mock saw SetCpuset for the looped cgroup.
    let set_cpuset_calls = mock
        .calls()
        .iter()
        .filter(|c| matches!(c, CgroupCall::SetCpuset(name, _) if name == "loop_test"))
        .count();
    assert!(
        set_cpuset_calls >= 2,
        "HoldSpec::Loop with interval=30ms over duration=150ms must fire \
         SetCpuset at least twice; got {set_cpuset_calls} calls. The Loop \
         arm of run_step (mod.rs:1163) must invoke apply_ops repeatedly \
         until the deadline; a regression that single-shotted the ops \
         would surface here as exactly 1 call.",
    );
}
/// A looping step with setup definitions must create its cgroup exactly once
/// while still repeating its ops.
#[test]
fn holdspec_loop_apply_path_setup_runs_once_not_per_iteration() {
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(150);

    let looping_step = Step::with_defs(
        vec![CgroupDef::named("setup_cg")],
        HoldSpec::loop_at(Duration::from_millis(30)),
    )
    .set_ops(vec![Op::set_cpuset("setup_cg", CpusetSpec::Llc(0))]);
    let result = execute_steps(&ctx, vec![looping_step])
        .expect("HoldSpec::Loop with setup must succeed against mock cgroups");
    assert!(
        result.is_pass(),
        "scenario must pass with no failing assertions; got: {:?}",
        result.outcomes,
    );

    // Setup side: exactly one CreateCgroup for the setup cgroup.
    let create_calls = mock
        .calls()
        .iter()
        .filter(|c| matches!(c, CgroupCall::CreateCgroup(name) if name == "setup_cg"))
        .count();
    assert_eq!(
        create_calls, 1,
        "Loop arm's setup pass must run exactly ONCE before the loop body; \
         got {create_calls} CreateCgroup calls. A regression that moved \
         the `if !step.setup.is_empty()` block inside the while loop \
         (mod.rs:1165) would surface here as N > 1 calls (the second \
         iteration's apply_setup would also fail the collision check, \
         but counting reveals the bug source).",
    );

    // Ops side: SetCpuset must still repeat.
    let set_cpuset_calls = mock
        .calls()
        .iter()
        .filter(|c| matches!(c, CgroupCall::SetCpuset(name, _) if name == "setup_cg"))
        .count();
    assert!(
        set_cpuset_calls >= 2,
        "Loop body must repeat SetCpuset >= 2 times despite setup running \
         once; got {set_cpuset_calls}. Pairs with the create-once check \
         above to pin the full setup-once + ops-many contract.",
    );
}
/// When the loop interval (100 ms) is longer than the scenario duration
/// (30 ms), the step's ops must be applied exactly once.
#[test]
fn holdspec_loop_apply_path_fires_exactly_once_when_interval_exceeds_duration() {
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(30);

    let brief_step = Step::new(
        vec![Op::set_cpuset("brief_loop", CpusetSpec::Llc(0))],
        HoldSpec::loop_at(Duration::from_millis(100)),
    );
    let result = execute_steps(&ctx, vec![brief_step])
        .expect("HoldSpec::Loop with interval > duration must succeed against mock");
    assert!(
        result.is_pass(),
        "scenario must pass with no failing assertions; got: {:?}",
        result.outcomes,
    );

    let set_cpuset_calls = mock
        .calls()
        .iter()
        .filter(|c| matches!(c, CgroupCall::SetCpuset(name, _) if name == "brief_loop"))
        .count();
    assert_eq!(
        set_cpuset_calls, 1,
        "interval (100ms) > duration (30ms) must fire SetCpuset exactly \
         once; got {set_cpuset_calls}. The loop body should run a single \
         iteration: enter loop (now < deadline) → apply_ops → sleep \
         min(remaining, interval) = ~30ms → next deadline check fails. \
         0 calls = a regression that skipped the first apply_ops; 2+ \
         calls = a regression in the deadline-min logic at mod.rs:1175 \
         that let the second iteration's sleep underflow.",
    );
}
/// An `Op::CaptureSnapshot` placed in a looping step must make the scenario
/// fail, with a failure message that names the op, the hold kind, the
/// mechanism, and the suggested fix.
#[test]
fn holdspec_loop_rejects_capture_snapshot_inside_ops_vec() {
    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(60);

    let offending_step = Step::new(
        vec![Op::capture_snapshot("inside_loop_capture")],
        HoldSpec::loop_at(Duration::from_millis(30)),
    );
    let result = execute_steps(&ctx, vec![offending_step])
        .expect("execute_steps returns Ok(AssertResult) even when a step's apply_ops bails");
    assert!(
        !result.is_pass(),
        "scenario with Op::CaptureSnapshot inside HoldSpec::Loop must NOT pass; got: {:?}",
        result.outcomes,
    );

    // Take the message of the first failing outcome.
    let fail_msg = result
        .outcomes
        .iter()
        .find_map(|o| {
            if let crate::assert::Outcome::Fail(detail) = o {
                Some(detail.message.clone())
            } else {
                None
            }
        })
        .expect("at least one Fail outcome carrying the Loop+CaptureSnapshot reject message");

    let names_op = fail_msg.contains("Op::CaptureSnapshot");
    let names_hold = fail_msg.contains("HoldSpec::Loop");
    let names_mechanism = fail_msg.contains("freezing every vCPU");
    assert!(
        names_op && names_hold && names_mechanism,
        "Fail outcome must name the rejected op + the enclosing hold + the \
         concrete mechanism (every vCPU frozen per iteration) so the operator \
         understands the redirect from the message alone, not just see \
         'rejected' with no observer-effect rationale. got: {fail_msg}",
    );

    let names_fix = fail_msg.contains("non-Loop Step");
    assert!(
        names_fix,
        "Fail outcome must point to the correct fix (move capture into a non-Loop \
         Step before/after the Loop step); got: {fail_msg}",
    );
}
/// With the scheduler pid set to `pid_t::MAX`, a looping step must still
/// return `Ok`, report exactly one scheduler-died failure detail, and have
/// applied its ops exactly once.
#[test]
fn holdspec_loop_arm_exits_early_when_sched_dies_during_hold() {
    /// Resets the global scheduler pid to 0 when dropped.
    struct SchedPidReset;
    impl Drop for SchedPidReset {
        fn drop(&mut self) {
            crate::vmm::rust_init::set_sched_pid(0);
        }
    }

    let mock = MockCgroupOps::new();
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(150);
    ctx.sched_pid = Some(libc::pid_t::MAX);
    crate::vmm::rust_init::set_sched_pid(libc::pid_t::MAX);
    let _reset = SchedPidReset;

    let looping_step = Step::new(
        vec![Op::set_cpuset("died_test", CpusetSpec::Llc(0))],
        HoldSpec::loop_at(Duration::from_millis(30)),
    );
    let result = execute_steps(&ctx, vec![looping_step]).expect(
        "Loop arm must return Ok even when sched dies — the death \
         is surfaced via sched_died_during_hold + one of the three sched-died DetailKind variants, \
         NOT as an Err out of run_step",
    );
    assert!(
        !result.is_pass(),
        "sched-died during the Loop hold must mark passed=false; \
         got passed=true with details: {:?}",
        result.outcomes,
    );

    // Keep only the failure details whose kind is one of the sched-died kinds.
    let sched_died_details: Vec<_> = result
        .failure_details()
        .filter(|d| {
            matches!(
                d.kind,
                crate::assert::DetailKind::SchedulerCrashed
                    | crate::assert::DetailKind::SchedulerExitedCleanly
                    | crate::assert::DetailKind::SchedulerDiedUnknownReason
            )
        })
        .collect();
    assert_eq!(
        sched_died_details.len(),
        1,
        "must push exactly one sched-died DetailKind detail (from \
         mod.rs:911-922); got {} sched-died failures out of {} total \
         failures: {:?}",
        sched_died_details.len(),
        result.failure_details().count(),
        result.outcomes,
    );

    let set_cpuset_calls = mock
        .calls()
        .iter()
        .filter(|c| matches!(c, CgroupCall::SetCpuset(name, _) if name == "died_test"))
        .count();
    assert_eq!(
        set_cpuset_calls, 1,
        "sched-died-on-entry must apply ops once (iter 1) then exit; \
         got {set_cpuset_calls} SetCpuset calls. > 1 means the loop \
         continued past the sched-died signal (early-exit dropped); \
         0 means apply_ops was gated on liveness (would surface as \
         a missing-apply regression).",
    );
}
/// An error injected into the mock via `fail_call_at(2, ..)` must fail the
/// scenario, surface once as a `DetailKind::Other` detail carrying the
/// injected message, and leave exactly two `SetCpuset` calls recorded.
#[test]
fn holdspec_loop_arm_propagates_apply_ops_error() {
    let mock = MockCgroupOps::new();
    mock.fail_call_at(2, "injected SetCpuset error mid-iteration");
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(200);

    let looping_step = Step::new(
        vec![Op::set_cpuset("err_drain_test", CpusetSpec::Llc(0))],
        HoldSpec::loop_at(Duration::from_millis(30)),
    );
    let result = execute_steps(&ctx, vec![looping_step]).expect(
        "execute_steps converts step Err to Ok(passed=false) per \
         mod.rs:883-901; the Err must NOT propagate to the caller",
    );
    assert!(
        !result.is_pass(),
        "injected apply_ops error must mark passed=false; got \
         passed=true with details: {:?}",
        result.outcomes,
    );

    // Failure details of kind `Other` that carry the injected text.
    let other_details: Vec<_> = result
        .failure_details()
        .filter(|d| {
            let is_other = matches!(d.kind, crate::assert::DetailKind::Other);
            is_other && d.message.contains("injected SetCpuset error mid-iteration")
        })
        .collect();
    assert_eq!(
        other_details.len(),
        1,
        "step Err must surface exactly once as DetailKind::Other \
         carrying the injected message; got {} matching details out \
         of {} total: {:?}",
        other_details.len(),
        result.outcomes.len(),
        result.outcomes,
    );

    let set_cpuset_calls = mock
        .calls()
        .iter()
        .filter(|c| matches!(c, CgroupCall::SetCpuset(name, _) if name == "err_drain_test"))
        .count();
    assert_eq!(
        set_cpuset_calls, 2,
        "loop must stop at the failed iteration (1 ok + 1 fail = 2); \
         got {set_cpuset_calls} SetCpuset calls. Any value > 2 means \
         the apply_ops Err was swallowed and the Loop arm continued, \
         which would also bypass the drain_on_err! payload-kill path \
         (silent metric loss).",
    );
}
/// Runs a looping step that starts a long `sleep` payload and then hits an
/// injected cgroup error, and checks that captured stderr does not contain
/// the "dropped without wait/kill" text.
#[test]
fn holdspec_loop_arm_drain_on_err_kills_live_payload_via_kill_not_drop() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static SLEEP: Payload = Payload {
        name: "drain_on_err_observer",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    let mock = MockCgroupOps::new();
    mock.fail_call_at(2, "injected SetCpuset error to trigger drain_on_err");
    let topo = mock_topo();
    let mut ctx = mock_ctx(&mock, &topo);
    ctx.duration = Duration::from_millis(200);

    let looping_step = Step::new(
        vec![
            Op::run_payload(&SLEEP, ["3600"]),
            Op::set_cpuset("drain_observer_cg", CpusetSpec::Llc(0)),
        ],
        HoldSpec::loop_at(Duration::from_millis(30)),
    );
    let steps = vec![looping_step];

    // Only the stderr half of the capture is inspected.
    let (_, captured_stderr) = crate::test_support::test_helpers::capture_stderr(|| {
        let _ = execute_steps(&ctx, steps).expect(
            "execute_steps converts step Err to Ok(passed=false); the \
             Err must NOT propagate to the caller",
        );
    });
    let stderr_text = String::from_utf8_lossy(&captured_stderr);
    let saw_drop_path = stderr_text.contains("dropped without wait/kill");
    assert!(
        !saw_drop_path,
        "drain_on_err! must invoke .kill() on every live payload \
         handle, not let them fall through to PayloadHandle::Drop's \
         process-group SIGKILL. Observed the Drop-path eprintln in \
         captured stderr — drain_on_err! regressed (Err propagated \
         but payloads were leaked to Drop). Full captured stderr: \
         {stderr_text:?}",
    );
}
/// Two `HoldSpec` values of the same variant but different payloads must
/// compare unequal.
#[test]
fn holdspec_partialeq_payload_participates_in_equality() {
    let (half, three_quarters) = (HoldSpec::Frac(0.5), HoldSpec::Frac(0.75));
    assert_ne!(half, three_quarters);

    let one_second = HoldSpec::Fixed(Duration::from_secs(1));
    let two_seconds = HoldSpec::Fixed(Duration::from_secs(2));
    assert_ne!(one_second, two_seconds);

    let loop_100ms = HoldSpec::Loop {
        interval: Duration::from_millis(100),
    };
    let loop_200ms = HoldSpec::Loop {
        interval: Duration::from_millis(200),
    };
    assert_ne!(loop_100ms, loop_200ms);
}
/// `Frac` comparison is exact: `0.1 + 0.2` and `0.3` are different `f64`
/// values and must compare unequal.
#[test]
fn holdspec_partialeq_frac_float_strict_equality() {
    let summed = HoldSpec::Frac(0.1 + 0.2);
    let literal = HoldSpec::Frac(0.3);
    assert_ne!(summed, literal);
}
/// A `Frac` holding NaN must not compare equal to itself.
#[test]
fn holdspec_partialeq_frac_nan_self_unequal() {
    let nan = HoldSpec::Frac(f64::NAN);
    let same_value = &nan;
    assert_ne!(same_value, &nan);
}
/// `HoldSpec::FULL` must equal `HoldSpec::Frac(1.0)`.
#[test]
fn holdspec_full_equals_frac_one() {
    let full = HoldSpec::FULL;
    let whole_fraction = HoldSpec::Frac(1.0);
    assert_eq!(full, whole_fraction);
}
// Compile-time checks: these items only build if `HoldSpec::fixed`,
// `HoldSpec::frac` and `HoldSpec::loop_at` can be called in a const context.
const _: HoldSpec = HoldSpec::fixed(Duration::from_secs(1));
const _: HoldSpec = HoldSpec::frac(0.5);
const _: HoldSpec = HoldSpec::loop_at(Duration::from_millis(50));
/// `CpusetSpec::Exact` must resolve to exactly the CPU set it was given.
#[test]
fn cpusetspec_exact_is_passthrough() {
    let cgroups = crate::cgroup::CgroupManager::new("/nonexistent");
    let vm_topology = crate::vmm::topology::Topology::new(1, 1, 4, 1);
    let topo = crate::topology::TestTopology::from_vm_topology(&vm_topology);
    let ctx = Ctx {
        cgroups: &cgroups,
        topo: &topo,
        duration: Duration::from_secs(10),
        workers_per_cgroup: 4,
        sched_pid: None,
        settle: Duration::from_millis(1000),
        work_type_override: None,
        assert: crate::assert::Assert::default_checks(),
        wait_for_map_write: false,
        current_step: std::sync::Arc::new(std::sync::atomic::AtomicU16::new(0)),
        entry_name: None,
    };

    let cpus: BTreeSet<usize> = BTreeSet::from([0, 2, 4]);
    let resolved = CpusetSpec::Exact(cpus.clone()).resolve(&ctx);
    assert_eq!(resolved, cpus);
}
/// `Disjoint` with `of: 0` must resolve to an empty set.
#[test]
fn resolve_disjoint_of_zero_returns_empty_instead_of_panicking() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let spec = CpusetSpec::Disjoint { of: 0, index: 0 };
    assert!(spec.resolve(&ctx).is_empty());
}
/// `Overlap` with `of: 0` must resolve to an empty set.
#[test]
fn resolve_overlap_of_zero_returns_empty_instead_of_panicking() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let spec = CpusetSpec::Overlap {
        frac: 0.5,
        of: 0,
        index: 0,
    };
    assert!(spec.resolve(&ctx).is_empty());
}
/// `Range` whose start fraction exceeds its end fraction must resolve to an
/// empty set.
#[test]
fn resolve_range_inverted_fracs_returns_empty_instead_of_panicking() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let spec = CpusetSpec::Range {
        end_frac: 0.2,
        start_frac: 0.8,
    };
    assert!(spec.resolve(&ctx).is_empty());
}
/// `Range` with NaN for both fractions must resolve to an empty set.
#[test]
fn resolve_range_nan_fracs_clamps_to_zero_instead_of_panicking() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let not_a_number = f64::NAN;
    let spec = CpusetSpec::Range {
        start_frac: not_a_number,
        end_frac: not_a_number,
    };
    assert!(spec.resolve(&ctx).is_empty());
}
/// `Overlap` with a NaN fraction must still resolve to a non-empty set.
#[test]
fn resolve_overlap_nonfinite_frac_clamps_to_zero() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let result = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: f64::NAN,
    }
    .resolve(&ctx);
    assert!(!result.is_empty());
}
/// Builds the two owners a `Ctx` borrows from: a `CgroupManager` rooted at
/// `/nonexistent` and a `TestTopology` derived from a VM topology with a
/// first argument of 1 followed by `llcs`, `cores` and `threads`.
fn make_ctx(
    llcs: u32,
    cores: u32,
    threads: u32,
) -> (crate::cgroup::CgroupManager, crate::topology::TestTopology) {
    (
        crate::cgroup::CgroupManager::new("/nonexistent"),
        crate::topology::TestTopology::from_vm_topology(&crate::vmm::topology::Topology::new(
            1, llcs, cores, threads,
        )),
    )
}
/// Wraps borrowed `cgroups` and `topo` in a `Ctx` with fixed test defaults:
/// 10 s duration, 4 workers per cgroup, no scheduler pid, zero settle, the
/// default assertion checks, and a step counter starting at 0.
fn ctx_from<'a>(
    cgroups: &'a crate::cgroup::CgroupManager,
    topo: &'a crate::topology::TestTopology,
) -> Ctx<'a> {
    use std::sync::atomic::AtomicU16;
    use std::sync::Arc;

    let assert = crate::assert::Assert::default_checks();
    let current_step = Arc::new(AtomicU16::new(0));
    Ctx {
        cgroups,
        topo,
        duration: Duration::from_secs(10),
        workers_per_cgroup: 4,
        sched_pid: None,
        settle: Duration::ZERO,
        work_type_override: None,
        assert,
        wait_for_map_write: false,
        current_step,
        entry_name: None,
    }
}
/// Splitting into two `Disjoint` partitions must yield non-overlapping sets
/// whose union is the full usable cpuset.
#[test]
fn cpusetspec_disjoint_two_partitions() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);

    let first = CpusetSpec::Disjoint { index: 0, of: 2 }.resolve(&ctx);
    let second = CpusetSpec::Disjoint { index: 1, of: 2 }.resolve(&ctx);
    assert!(
        first.is_disjoint(&second),
        "partitions overlap: {:?} vs {:?}",
        first,
        second
    );

    let combined: BTreeSet<usize> = &first | &second;
    assert_eq!(combined, ctx.topo.usable_cpuset());
}
/// The last of three `Disjoint` partitions must hold at least
/// `usable_len / 3` CPUs.
#[test]
fn cpusetspec_disjoint_remainder_to_last() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);

    let minimum_share = ctx.topo.usable_cpus().len() / 3;
    let last_partition = CpusetSpec::Disjoint { index: 2, of: 3 }.resolve(&ctx);
    assert!(
        minimum_share <= last_partition.len(),
        "last partition {}: expected >= {}",
        last_partition.len(),
        minimum_share
    );
}
/// A single `Disjoint` partition must cover the whole usable cpuset.
#[test]
fn cpusetspec_disjoint_single_partition() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let only_partition = CpusetSpec::Disjoint { index: 0, of: 1 }.resolve(&ctx);
    assert_eq!(only_partition, ctx.topo.usable_cpuset());
}
/// A `Disjoint` spec whose index is past `of` must resolve to an empty set.
#[test]
fn cpusetspec_disjoint_index_beyond_of_returns_empty() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let out_of_range = CpusetSpec::Disjoint { index: 5, of: 3 };
    let cpus = out_of_range.resolve(&ctx);
    let resolved_nothing = cpus.is_empty();
    assert!(
        resolved_nothing,
        "Disjoint with index beyond `of` must return an empty \
         cpuset rather than panicking, got: {cpus:?}",
    );
}
/// `Range` from 0.0 to 0.5 must resolve to half of the usable CPUs, all of
/// them drawn from the usable list.
#[test]
fn cpusetspec_range_first_half() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let first_half = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 0.5,
    };
    let cpus = first_half.resolve(&ctx);
    let usable = ctx.topo.usable_cpus();
    assert_eq!(cpus.len(), usable.len() / 2);
    cpus.iter().for_each(|&cpu| {
        assert!(usable.contains(&cpu));
    });
}
/// The `Range` halves `[0.0, 0.5)` and `[0.5, 1.0)` must not share any CPU.
#[test]
fn cpusetspec_range_second_half() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let lower_half = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 0.5,
    };
    let upper_half = CpusetSpec::Range {
        start_frac: 0.5,
        end_frac: 1.0,
    };
    let a = lower_half.resolve(&ctx);
    let b = upper_half.resolve(&ctx);
    assert!(a.is_disjoint(&b));
}
/// `Range` from 0.0 to 1.0 must resolve to the whole usable cpuset.
#[test]
fn cpusetspec_range_full() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let whole_range = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 1.0,
    };
    let resolved = whole_range.resolve(&ctx);
    assert_eq!(resolved, ctx.topo.usable_cpuset());
}
/// `Range` with equal start and end fractions must resolve to an empty set.
#[test]
fn cpusetspec_range_empty() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let zero_width = CpusetSpec::Range {
        start_frac: 0.5,
        end_frac: 0.5,
    };
    let cpus = zero_width.resolve(&ctx);
    assert!(cpus.is_empty());
}
/// `Range` with an end fraction of 2.0 must resolve to the whole usable
/// cpuset.
#[test]
fn cpusetspec_range_clamps_to_bounds() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let overshooting = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 2.0,
    };
    let resolved = overshooting.resolve(&ctx);
    assert_eq!(resolved, ctx.topo.usable_cpuset());
}
/// Two neighbouring `Overlap` partitions with `frac: 0.5` must have at least
/// one CPU in common.
#[test]
fn cpusetspec_overlap_neighbors_share_cpus() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let partition = |index| {
        CpusetSpec::Overlap {
            index,
            of: 2,
            frac: 0.5,
        }
        .resolve(&ctx)
    };
    let left = partition(0);
    let right = partition(1);
    let shared: BTreeSet<usize> = &left & &right;
    assert!(!shared.is_empty(), "overlap=0.5 should produce shared CPUs");
}
/// Two `Overlap` partitions with `frac: 0.0` must not share any CPU.
#[test]
fn cpusetspec_overlap_zero_frac_is_disjoint() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let partition = |index| {
        CpusetSpec::Overlap {
            index,
            of: 2,
            frac: 0.0,
        }
        .resolve(&ctx)
    };
    let left = partition(0);
    let right = partition(1);
    assert!(left.is_disjoint(&right), "frac=0 should be disjoint");
}
/// The last of three `Overlap` partitions must contain the final usable CPU.
#[test]
fn cpusetspec_overlap_last_partition_covers_tail() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let tail_spec = CpusetSpec::Overlap {
        index: 2,
        of: 3,
        frac: 0.5,
    };
    let last = tail_spec.resolve(&ctx);
    let usable = ctx.topo.usable_cpus();
    assert!(last.contains(usable.last().unwrap()));
}
/// The first of three `Overlap` partitions must contain the first usable CPU.
#[test]
fn cpusetspec_overlap_first_partition_starts_at_zero() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let head_spec = CpusetSpec::Overlap {
        index: 0,
        of: 3,
        frac: 0.5,
    };
    let first = head_spec.resolve(&ctx);
    let usable = ctx.topo.usable_cpus();
    let head_cpu = &usable[0];
    assert!(first.contains(head_cpu));
}
/// `CpusetSpec::Llc(0)` must resolve to a non-empty set equal to the
/// topology's LLC-aligned cpuset for index 0.
#[test]
fn cpusetspec_llc_index_zero() {
    let (cgroups, topology) = make_ctx(2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let spec = CpusetSpec::Llc(0);
    let cpus = spec.resolve(&ctx);
    assert!(!cpus.is_empty());
    let llc0 = ctx.topo.llc_aligned_cpuset(0);
    assert_eq!(cpus, llc0);
}
/// The CPU sets resolved for LLC 0 and LLC 1 must not overlap.
#[test]
fn cpusetspec_llc_two_llcs_disjoint() {
    let (cgroups, topology) = make_ctx(2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let first_spec = CpusetSpec::Llc(0);
    let second_spec = CpusetSpec::Llc(1);
    let llc0 = first_spec.resolve(&ctx);
    let llc1 = second_spec.resolve(&ctx);
    assert!(llc0.is_disjoint(&llc1), "LLCs should be disjoint");
}
/// Test helper: returns a `CgroupManager` rooted at `/nonexistent` together
/// with a `TestTopology` built from `Topology::new(numa_nodes, llcs, cores,
/// threads)`.
fn make_numa_ctx(
    numa_nodes: u32,
    llcs: u32,
    cores: u32,
    threads: u32,
) -> (crate::cgroup::CgroupManager, crate::topology::TestTopology) {
    let manager = crate::cgroup::CgroupManager::new("/nonexistent");
    let vm_topology = crate::vmm::topology::Topology::new(numa_nodes, llcs, cores, threads);
    let test_topology = crate::topology::TestTopology::from_vm_topology(&vm_topology);
    (manager, test_topology)
}
/// On the (2, 4, 4, 1) topology, NUMA node 0 must resolve to CPUs 0..8.
#[test]
fn cpusetspec_numa_node_zero() {
    let (cgroups, topology) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let expected: BTreeSet<usize> = (0..8).collect();
    let cpus = CpusetSpec::Numa(0).resolve(&ctx);
    assert_eq!(cpus, expected);
}
/// On the (2, 4, 4, 1) topology, NUMA node 1 must resolve to CPUs 8..16.
#[test]
fn cpusetspec_numa_node_one() {
    let (cgroups, topology) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let expected: BTreeSet<usize> = (8..16).collect();
    let cpus = CpusetSpec::Numa(1).resolve(&ctx);
    assert_eq!(cpus, expected);
}
/// The two NUMA nodes must resolve to disjoint CPU sets whose union is the
/// topology's full cpuset.
#[test]
fn cpusetspec_numa_disjoint() {
    let (cgroups, topology) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let first = CpusetSpec::Numa(0).resolve(&ctx);
    let second = CpusetSpec::Numa(1).resolve(&ctx);
    assert!(
        first.is_disjoint(&second),
        "NUMA nodes should be disjoint: {:?} vs {:?}",
        first,
        second
    );
    let combined: BTreeSet<usize> = first.union(&second).copied().collect();
    assert_eq!(combined, ctx.topo.all_cpuset());
}
/// With a single NUMA node, `Numa(0)` must resolve to every CPU in the
/// topology.
#[test]
fn cpusetspec_numa_single_node_returns_all() {
    let (cgroups, topology) = make_numa_ctx(1, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let spec = CpusetSpec::Numa(0);
    let cpus = spec.resolve(&ctx);
    assert_eq!(cpus, ctx.topo.all_cpuset());
}
/// Validating `Numa(5)` on a two-node topology must fail with an
/// "out of range" error.
#[test]
fn cpusetspec_numa_validate_out_of_range() {
    let (cgroups, topology) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let err = CpusetSpec::Numa(5).validate(&ctx).unwrap_err();
    assert!(err.contains("out of range"), "got: {err}");
}
/// Both node indices of a two-node topology must pass validation.
#[test]
fn cpusetspec_numa_validate_valid() {
    let (cgroups, topology) = make_numa_ctx(2, 4, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let node0 = CpusetSpec::Numa(0);
    assert!(node0.validate(&ctx).is_ok());
    let node1 = CpusetSpec::Numa(1);
    assert!(node1.validate(&ctx).is_ok());
}
/// `CpusetSpec::numa(0)` must produce the `Numa(0)` variant.
#[test]
fn cpusetspec_numa_convenience_constructor() {
    let built = CpusetSpec::numa(0);
    assert!(matches!(built, CpusetSpec::Numa(0)));
}
/// `Traverse::generate` with `phases: 3` must return three steps, each with
/// at least one op.
#[test]
fn traverse_generate_produces_steps() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let traverse = Traverse {
        seed: Some(42),
        cgroup_count: 2..=4,
        layouts: vec![Layout::Disjoint],
        phases: 3,
        phase_duration: Duration::from_millis(100),
        settle: Duration::from_millis(50),
        persistent_cgroups: 0,
        cgroup_workloads: vec![WorkSpec::default()],
    };
    let generated = traverse.generate(&ctx);
    assert_eq!(generated.len(), 3);
    for phase in &generated {
        assert!(!phase.ops.is_empty(), "each phase should have ops");
    }
}
/// Two `generate` calls on the same seeded `Traverse` must return the same
/// number of steps and the same number of ops per step.
#[test]
fn traverse_generate_deterministic() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let traverse = Traverse {
        seed: Some(99),
        cgroup_count: 2..=4,
        layouts: vec![Layout::Disjoint, Layout::Overlap(0.2, 0.5)],
        phases: 5,
        phase_duration: Duration::from_millis(100),
        settle: Duration::from_millis(50),
        persistent_cgroups: 1,
        cgroup_workloads: vec![WorkSpec::default()],
    };
    let first_run = traverse.generate(&ctx);
    let second_run = traverse.generate(&ctx);
    assert_eq!(first_run.len(), second_run.len());
    for (lhs, rhs) in first_run.iter().zip(second_run.iter()) {
        assert_eq!(
            lhs.ops.len(),
            rhs.ops.len(),
            "deterministic seed should produce same ops"
        );
    }
}
/// With `persistent_cgroups: 2`, no generated step may contain a
/// `RemoveCgroup` op targeting `cg_0` or `cg_1`.
#[test]
fn traverse_generate_persistent_cgroups_preserved() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let traverse = Traverse {
        seed: Some(42),
        cgroup_count: 1..=4,
        layouts: vec![Layout::Disjoint],
        phases: 5,
        phase_duration: Duration::from_millis(100),
        settle: Duration::from_millis(50),
        persistent_cgroups: 2,
        cgroup_workloads: vec![WorkSpec::default()],
    };
    for step in &traverse.generate(&ctx) {
        let removes_persistent = step.ops.iter().any(
            |op| matches!(op, Op::RemoveCgroup { cgroup } if cgroup == "cg_0" || cgroup == "cg_1"),
        );
        assert!(
            !removes_persistent,
            "persistent cgroups should never be removed"
        );
    }
}
/// Chaining the `CgroupDef` builder methods must yield a definition with the
/// given name, a cpuset, a single work entry with 8 workers, and
/// `swappable` set.
#[test]
fn cgroup_def_builder_chain() {
    let burst = WorkType::bursty(Duration::from_millis(50), Duration::from_millis(100));
    let def = CgroupDef::named("test")
        .cpuset(CpusetSpec::llc(0))
        .workers(8)
        .work_type(burst)
        .sched_policy(crate::workload::SchedPolicy::Batch)
        .swappable(true);
    assert_eq!(def.name, "test");
    assert!(def.cpuset.is_some());
    assert_eq!(def.works.len(), 1);
    assert_eq!(def.works[0].num_workers, Some(8));
    assert!(def.swappable);
}
/// Two `.work(...)` calls must produce two work entries in call order.
#[test]
fn cgroup_def_multi_work() {
    let spinners = WorkSpec::default().workers(4).work_type(WorkType::SpinWait);
    let yielders = WorkSpec::default()
        .workers(2)
        .work_type(WorkType::YieldHeavy);
    let def = CgroupDef::named("multi").work(spinners).work(yielders);
    assert_eq!(def.works.len(), 2);
    assert_eq!(def.works[0].num_workers, Some(4));
    assert_eq!(def.works[1].num_workers, Some(2));
}
/// Mixing `.workers(..)` with a later `.work(..)` must yield two work
/// entries: the first with 4 workers, the second with 2.
#[test]
fn cgroup_def_old_api_then_work() {
    let extra = WorkSpec::default().workers(2);
    let def = CgroupDef::named("mixed").workers(4).work(extra);
    assert_eq!(def.works.len(), 2);
    assert_eq!(def.works[0].num_workers, Some(4));
    assert_eq!(def.works[1].num_workers, Some(2));
}
/// A definition built with only `.work(..)` must hold exactly that one work
/// entry.
#[test]
fn cgroup_def_work_only_no_phantom() {
    let only = WorkSpec::default().workers(3);
    let def = CgroupDef::named("explicit").work(only);
    assert_eq!(def.works.len(), 1);
    assert_eq!(def.works[0].num_workers, Some(3));
}
/// `Setup::Defs` holding two definitions must resolve to two entries and
/// report itself as non-empty.
#[test]
fn setup_defs_resolves() {
    let setup = Setup::Defs(vec![CgroupDef::named("a"), CgroupDef::named("b")]);
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let resolved = setup.resolve(&ctx);
    assert_eq!(resolved.len(), 2);
    assert!(!setup.is_empty());
}
/// `Setup::Defs` built from an empty vector must report itself as empty.
#[test]
fn setup_defs_empty() {
    let no_defs = Setup::Defs(vec![]);
    assert!(no_defs.is_empty());
}
/// A `Setup::Factory` must report itself as non-empty.
#[test]
fn setup_factory_not_empty() {
    let from_factory = Setup::Factory(|_| vec![CgroupDef::named("generated")]);
    assert!(!from_factory.is_empty());
}
/// `Step::with_defs` given no definitions must have an empty setup and no
/// ops.
#[test]
fn step_with_defs_empty() {
    let hold = HoldSpec::frac(0.5);
    let step = Step::with_defs(vec![], hold);
    assert!(step.setup.is_empty());
    assert!(step.ops.is_empty());
}
/// `Step::with_defs` given two definitions must have a non-empty setup and
/// still no ops.
#[test]
fn step_with_defs_populated() {
    let defs = vec![CgroupDef::named("cg_0"), CgroupDef::named("cg_1")];
    let hold = HoldSpec::fixed(Duration::from_secs(5));
    let step = Step::with_defs(defs, hold);
    assert!(!step.setup.is_empty());
    assert!(step.ops.is_empty());
}
/// Calling `set_ops` after `with_defs` must keep the setup and store the
/// single supplied op.
#[test]
fn step_with_defs_then_ops() {
    let base = Step::with_defs(vec![CgroupDef::named("cg_0")], HoldSpec::FULL);
    let step = base.set_ops(vec![Op::AddCgroup {
        name: "cg_1".into(),
    }]);
    assert!(!step.setup.is_empty());
    assert_eq!(step.ops.len(), 1);
}
/// A step created with one op and then given two via `set_ops` must end up
/// with two ops.
#[test]
fn step_set_ops_replaces() {
    let original = Step::new(
        vec![Op::AddCgroup { name: "a".into() }],
        HoldSpec::frac(0.5),
    );
    let replacement = vec![
        Op::AddCgroup { name: "b".into() },
        Op::RemoveCgroup { cgroup: "c".into() },
    ];
    let step = original.set_ops(replacement);
    assert_eq!(step.ops.len(), 2);
}
/// `Disjoint` with `of: 0` must fail validation with a "must be > 0" error.
#[test]
fn cpusetspec_validate_disjoint_of_zero() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let err = CpusetSpec::Disjoint { index: 0, of: 0 }
        .validate(&ctx)
        .unwrap_err();
    assert!(err.contains("must be > 0"), "got: {err}");
}
/// `Disjoint` with `index == of` must fail validation and name both numbers.
#[test]
fn cpusetspec_validate_disjoint_index_ge_of() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let err = CpusetSpec::Disjoint { index: 3, of: 3 }
        .validate(&ctx)
        .unwrap_err();
    assert!(err.contains("index 3 >= partition count 3"), "got: {err}");
}
/// `Overlap` with `of: 0` must fail validation with a "must be > 0" error.
#[test]
fn cpusetspec_validate_overlap_of_zero() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let zero_parts = CpusetSpec::Overlap {
        index: 0,
        of: 0,
        frac: 0.5,
    };
    let err = zero_parts.validate(&ctx).unwrap_err();
    assert!(err.contains("must be > 0"), "got: {err}");
}
/// `Overlap` with `index == of` must fail validation and name both numbers.
#[test]
fn cpusetspec_validate_overlap_index_ge_of() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let past_end = CpusetSpec::Overlap {
        index: 2,
        of: 2,
        frac: 0.5,
    };
    let err = past_end.validate(&ctx).unwrap_err();
    assert!(err.contains("index 2 >= partition count 2"), "got: {err}");
}
/// A `Range` whose start exceeds its end must fail validation with an error
/// mentioning `start_frac`.
#[test]
fn cpusetspec_validate_range_start_ge_end() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let inverted = CpusetSpec::Range {
        start_frac: 0.8,
        end_frac: 0.2,
    };
    let err = inverted.validate(&ctx).unwrap_err();
    assert!(err.contains("start_frac"), "got: {err}");
}
/// A `Range` with a NaN `end_frac` must fail validation as "not finite".
#[test]
fn cpusetspec_validate_range_rejects_nan() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let nan_end = CpusetSpec::Range {
        start_frac: 0.8,
        end_frac: f64::NAN,
    };
    let err = nan_end.validate(&ctx).unwrap_err();
    assert!(err.contains("not finite"), "got: {err}");
}
/// A `Range` with an infinite `end_frac` must fail validation as
/// "not finite".
#[test]
fn cpusetspec_validate_range_rejects_infinity() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let infinite_end = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: f64::INFINITY,
    };
    let err = infinite_end.validate(&ctx).unwrap_err();
    assert!(err.contains("not finite"), "got: {err}");
}
/// A `Range` with a negative `start_frac` must fail validation with an error
/// citing the `[0.0, 1.0]` interval.
#[test]
fn cpusetspec_validate_range_rejects_negative() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let negative_start = CpusetSpec::Range {
        start_frac: -0.5,
        end_frac: 0.5,
    };
    let err = negative_start.validate(&ctx).unwrap_err();
    assert!(err.contains("[0.0, 1.0]"), "got: {err}");
}
/// A `Range` with `end_frac` above 1.0 must fail validation with an error
/// citing the `[0.0, 1.0]` interval.
#[test]
fn cpusetspec_validate_range_rejects_above_one() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let too_far = CpusetSpec::Range {
        start_frac: 0.5,
        end_frac: 1.5,
    };
    let err = too_far.validate(&ctx).unwrap_err();
    assert!(err.contains("[0.0, 1.0]"), "got: {err}");
}
/// An `Overlap` with a NaN `frac` must fail validation as "not finite".
#[test]
fn cpusetspec_validate_overlap_rejects_nan_frac() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let nan_frac = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: f64::NAN,
    };
    let err = nan_frac.validate(&ctx).unwrap_err();
    assert!(err.contains("not finite"), "got: {err}");
}
/// An `Overlap` with an infinite `frac` must fail validation as
/// "not finite".
#[test]
fn cpusetspec_validate_overlap_rejects_infinity_frac() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let infinite_frac = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: f64::INFINITY,
    };
    let err = infinite_frac.validate(&ctx).unwrap_err();
    assert!(err.contains("not finite"), "got: {err}");
}
/// An `Overlap` with `frac: 1.5` must fail validation with an error citing
/// the `[0.0, 1.0]` interval.
#[test]
fn cpusetspec_validate_overlap_rejects_out_of_range_frac() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let big_frac = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: 1.5,
    };
    let err = big_frac.validate(&ctx).unwrap_err();
    assert!(err.contains("[0.0, 1.0]"), "got: {err}");
}
/// Requesting five disjoint partitions on the small `make_ctx(1, 2, 1)`
/// topology must fail with "not enough usable CPUs".
#[test]
fn cpusetspec_validate_too_few_cpus_for_partitions() {
    let (cgroups, topology) = make_ctx(1, 2, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let err = CpusetSpec::Disjoint { index: 0, of: 5 }
        .validate(&ctx)
        .unwrap_err();
    assert!(err.contains("not enough usable CPUs"), "got: {err}");
}
/// An `Exact` spec listing CPUs 0 and 2 must pass validation on this
/// topology.
#[test]
fn cpusetspec_validate_exact_in_range_ok() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let outcome = CpusetSpec::exact([0, 2]).validate(&ctx);
    assert!(outcome.is_ok());
}
/// An `Exact` spec with no CPUs must fail validation with an error that
/// mentions both "Exact" and "empty".
#[test]
fn cpusetspec_validate_exact_empty_rejected() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let no_cpus = CpusetSpec::Exact(BTreeSet::new());
    let err = no_cpus.validate(&ctx).unwrap_err();
    assert!(err.contains("Exact") && err.contains("empty"), "got: {err}");
}
/// An `Exact` spec naming CPU 99 must fail validation with an error that
/// names the CPU and refers to the "physical CPU set".
#[test]
fn cpusetspec_validate_exact_out_of_range_rejected() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let err = CpusetSpec::exact([99]).validate(&ctx).unwrap_err();
    assert!(
        err.contains("99") && err.contains("physical CPU set"),
        "error must name the offending CPU and call it physical: {err}"
    );
}
/// An `Exact` spec naming the highest-numbered CPU must pass validation even
/// though that CPU is outside the usable cpuset; the preconditions check
/// that it is absent from `usable_cpuset` but present in `all_cpuset`.
#[test]
fn cpusetspec_validate_exact_accepts_reserved_last_cpu() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let total = ctx.topo.all_cpus().len();
    assert!(total > 2, "test requires a topology that reserves a CPU");
    let reserved_cpu = total - 1;
    let in_usable = ctx.topo.usable_cpuset().contains(&reserved_cpu);
    assert!(
        !in_usable,
        "precondition: reserved CPU {reserved_cpu} must sit outside usable_cpuset",
    );
    let in_all = ctx.topo.all_cpuset().contains(&reserved_cpu);
    assert!(
        in_all,
        "precondition: reserved CPU {reserved_cpu} must be physically present",
    );
    let outcome = CpusetSpec::exact([reserved_cpu]).validate(&ctx);
    assert!(
        outcome.is_ok(),
        "validate must accept the reserved CPU — physical presence, not \
         usable-set membership, is the bar",
    );
}
/// A step built with `HoldSpec::Frac(0.0)` must make `execute_steps_with`
/// return an error citing "hold validation" and the `Frac` variant, and the
/// step's `AddCgroup` op must not have created its directory under the
/// scratch cgroup parent.
#[test]
fn execute_steps_with_bails_on_invalid_hold_before_ops() {
    /// Removes the scratch directory when dropped, so the directory is
    /// cleaned up even if one of the assertions below panics.
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup: the directory may already be gone.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let parent = std::env::temp_dir().join(format!("ktstr-hold-validate-{}", std::process::id()));
    // Clear leftovers from an earlier run that reused this process id.
    let _ = std::fs::remove_dir_all(&parent);
    // Declared before `cgroups` so it is dropped after it (locals drop in
    // reverse declaration order).
    let _scratch = ScratchDir(parent.clone());
    std::fs::create_dir_all(&parent).unwrap();
    let cgroups = crate::cgroup::CgroupManager::new(parent.to_str().unwrap());
    let topo = crate::topology::TestTopology::from_vm_topology(
        &crate::vmm::topology::Topology::new(1, 1, 4, 1),
    );
    let ctx = ctx_from(&cgroups, &topo);
    let cg_name = "should_never_exist";
    let step = Step::new(vec![Op::add_cgroup(cg_name)], HoldSpec::Frac(0.0));
    let err = execute_steps_with(&ctx, vec![step], None).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("hold validation") && msg.contains("Frac"),
        "error must cite hold validation + variant: {msg}"
    );
    assert!(
        !parent.join(cg_name).exists(),
        "AddCgroup op ran before hold validation — cgroup dir '{}' exists",
        parent.join(cg_name).display()
    );
}
/// Checks a local predicate: it is true only when the CPU pool is non-empty
/// and the requested count is non-zero.
#[test]
fn set_affinity_random_no_op_conditions() {
    /// True unless the pool is empty or the count is zero.
    fn should_apply(from: &BTreeSet<usize>, count: usize) -> bool {
        !(from.is_empty() || count == 0)
    }
    let empty: BTreeSet<usize> = BTreeSet::new();
    let pool: BTreeSet<usize> = BTreeSet::from([0, 1, 2]);
    assert!(should_apply(&pool, 2));
    assert!(!should_apply(&pool, 0), "count=0 → no-op");
    assert!(!should_apply(&empty, 1), "empty pool → no-op");
    assert!(!should_apply(&empty, 0), "both zero → no-op");
}
/// Validating `Llc(5)` on the `make_ctx(1, 4, 1)` topology must fail with an
/// "out of range" error.
#[test]
fn cpusetspec_validate_llc_out_of_range() {
    let (cgroups, topology) = make_ctx(1, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let err = CpusetSpec::Llc(5).validate(&ctx).unwrap_err();
    assert!(err.contains("out of range"), "got: {err}");
}
/// `Disjoint { index: 1, of: 2 }` must pass validation on the
/// `make_ctx(1, 8, 1)` topology.
#[test]
fn cpusetspec_validate_valid_disjoint_ok() {
    let (cgroups, topology) = make_ctx(1, 8, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let outcome = CpusetSpec::Disjoint { index: 1, of: 2 }.validate(&ctx);
    assert!(outcome.is_ok());
}
/// `MemPolicy::Default` with no flags must validate against CPUs 0..4.
#[test]
fn validate_mempolicy_default_always_ok() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let outcome = validate_mempolicy_cpuset(
        &MemPolicy::Default,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_0",
    );
    assert!(outcome.is_ok());
}
/// `MemPolicy::Local` with no flags must validate against CPUs 0..4.
#[test]
fn validate_mempolicy_local_always_ok() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let outcome = validate_mempolicy_cpuset(
        &MemPolicy::Local,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_0",
    );
    assert!(outcome.is_ok());
}
/// `Bind` to nodes 0 and 1 must validate when the cpuset is CPUs 0..8.
#[test]
fn validate_mempolicy_bind_covered() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..8).collect();
    let policy = MemPolicy::Bind([0, 1].into_iter().collect());
    let outcome = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_0",
    );
    assert!(outcome.is_ok());
}
/// `Bind` to node 1 with a cpuset of CPUs 0..4 must be rejected, and the
/// error text must contain each fragment asserted below (cgroup name, node
/// lists, both remediation hints, and the framing phrases).
#[test]
fn validate_mempolicy_bind_uncovered() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Bind([1].into_iter().collect());
    let failure = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_bind_test",
    )
    .unwrap_err();
    let err = failure.to_string();
    assert!(err.contains("cg_bind_test"), "bail must name cgroup: {err}");
    assert!(
        err.contains("[1]"),
        "bail must name uncovered node 1: {err}"
    );
    assert!(err.contains("{0}"), "bail must name cpuset node 0: {err}");
    assert!(
        err.contains("(a) add .mpol_flags(MpolFlags::STATIC_NODES)"),
        "bail must call out hatch (a) STATIC_NODES opt-in by name: {err}",
    );
    assert!(
        err.contains("(b) widen the cpuset"),
        "bail must call out hatch (b) cpuset widening: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Numa(N)"),
        "bail must name CpusetSpec::Numa(N) as a widening example: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Exact"),
        "bail must name the CpusetSpec::Exact cpuset-widening escape hatch: {err}",
    );
    assert!(
        err.contains("cross-socket allocation traffic"),
        "bail must name the cross-socket framing: {err}",
    );
    assert!(
        err.contains("almost certainly unintended"),
        "bail must frame the mismatch as unintended: {err}",
    );
}
/// `Preferred(1)` must validate when the cpuset is CPUs 4..8.
#[test]
fn validate_mempolicy_preferred_covered() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (4..8).collect();
    let policy = MemPolicy::Preferred(1);
    let outcome = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_0",
    );
    assert!(outcome.is_ok());
}
/// `Preferred(1)` with a cpuset of CPUs 0..4 must be rejected, and the error
/// text must contain each fragment asserted below.
#[test]
fn validate_mempolicy_preferred_uncovered() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Preferred(1);
    let failure = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_preferred_test",
    )
    .unwrap_err();
    let err = failure.to_string();
    assert!(
        err.contains("cg_preferred_test"),
        "bail must name cgroup: {err}"
    );
    assert!(
        err.contains("[1]"),
        "bail must name uncovered node 1: {err}"
    );
    assert!(err.contains("{0}"), "bail must name cpuset node 0: {err}");
    assert!(
        err.contains("(a) add .mpol_flags(MpolFlags::STATIC_NODES)"),
        "bail must enumerate hatch (a): {err}",
    );
    assert!(
        err.contains("(b) widen the cpuset"),
        "bail must enumerate hatch (b): {err}",
    );
    assert!(
        err.contains("CpusetSpec::Numa(N)"),
        "bail must cite CpusetSpec::Numa(N) example: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Exact"),
        "bail must name CpusetSpec::Exact widening: {err}",
    );
    assert!(
        err.contains("almost certainly unintended"),
        "bail must frame mismatch as unintended: {err}",
    );
}
/// `Interleave` over nodes 0 and 1 with a cpuset of CPUs 0..4 must be
/// rejected, and the error text must contain each fragment asserted below.
#[test]
fn validate_mempolicy_interleave_partial_uncovered() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Interleave([0, 1].into_iter().collect());
    let failure = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::NONE,
        &cpuset,
        &ctx,
        "cg_interleave_test",
    )
    .unwrap_err();
    let err = failure.to_string();
    assert!(
        err.contains("cg_interleave_test"),
        "bail must name cgroup: {err}"
    );
    assert!(
        err.contains("[1]"),
        "bail must name uncovered node 1: {err}"
    );
    assert!(err.contains("{0}"), "bail must name cpuset node 0: {err}");
    assert!(
        err.contains("(a) add .mpol_flags(MpolFlags::STATIC_NODES)"),
        "bail must enumerate hatch (a): {err}",
    );
    assert!(
        err.contains("(b) widen the cpuset"),
        "bail must enumerate hatch (b): {err}",
    );
    assert!(
        err.contains("CpusetSpec::Numa(N)"),
        "bail must cite CpusetSpec::Numa(N) example: {err}",
    );
    assert!(
        err.contains("CpusetSpec::Exact"),
        "bail must name CpusetSpec::Exact widening: {err}",
    );
}
/// With `MpolFlags::STATIC_NODES`, `Interleave` over nodes 0 and 1 must
/// validate even though the cpuset is only CPUs 0..4.
#[test]
fn validate_mempolicy_static_nodes_bypasses_cpuset_check() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Interleave([0, 1].into_iter().collect());
    let outcome = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::STATIC_NODES,
        &cpuset,
        &ctx,
        "cg_0",
    );
    assert!(outcome.is_ok());
}
/// Combining `STATIC_NODES` and `RELATIVE_NODES` must be rejected with an
/// error containing "mutually exclusive".
#[test]
fn validate_mempolicy_rejects_static_and_relative_conflict() {
    use crate::workload::MpolFlags;
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Bind([0].into_iter().collect());
    let both_flags = MpolFlags::STATIC_NODES | MpolFlags::RELATIVE_NODES;
    let outcome = validate_mempolicy_cpuset(&policy, both_flags, &cpuset, &ctx, "cg_0");
    let err = outcome.expect_err("STATIC_NODES | RELATIVE_NODES must be rejected");
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("mutually exclusive"),
        "error must name the mutual-exclusion contract; got: {rendered}"
    );
}
/// Flags built from bit 10 via `from_bits_for_test` must be rejected, with
/// an error naming the cgroup, the phrase "unknown bit", and `STATIC_NODES`.
#[test]
fn validate_mempolicy_rejects_unknown_bits() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..8).collect();
    let bogus_flags = crate::workload::MpolFlags::from_bits_for_test(1 << 10);
    let outcome = validate_mempolicy_cpuset(
        &MemPolicy::Default,
        bogus_flags,
        &cpuset,
        &ctx,
        "cg_unknown_bit",
    );
    let s = outcome.expect_err("unknown bit must bail").to_string();
    assert!(s.contains("cg_unknown_bit"), "bail must name cgroup: {s}");
    assert!(
        s.contains("unknown bit"),
        "bail must name the unknown-bit contract: {s}"
    );
    assert!(
        s.contains("STATIC_NODES"),
        "bail must enumerate the known bits so the user sees what IS supported: {s}",
    );
}
/// With `MpolFlags::RELATIVE_NODES`, `Bind` to node 1 must validate even
/// though the cpuset is only CPUs 0..4.
#[test]
fn validate_mempolicy_relative_nodes_bypasses_cpuset_check() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..4).collect();
    let policy = MemPolicy::Bind([1].into_iter().collect());
    let outcome = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::RELATIVE_NODES,
        &cpuset,
        &ctx,
        "cg_0",
    );
    assert!(
        outcome.is_ok(),
        "RELATIVE_NODES must bypass the absolute-id cpuset coverage check"
    );
}
/// Even with `STATIC_NODES`, `Bind` to node 7 on a two-node topology must be
/// rejected with an error containing "do not exist on this host".
#[test]
fn validate_mempolicy_static_nodes_rejects_missing_host_node() {
    let (cgroups, topology) = make_numa_ctx(2, 2, 4, 1);
    let ctx = ctx_from(&cgroups, &topology);
    let cpuset: BTreeSet<usize> = (0..8).collect();
    let policy = MemPolicy::Bind([7].into_iter().collect());
    let outcome = validate_mempolicy_cpuset(
        &policy,
        crate::workload::MpolFlags::STATIC_NODES,
        &cpuset,
        &ctx,
        "cg_0",
    );
    let err =
        outcome.expect_err("STATIC_NODES policy referencing missing host node must be rejected");
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("do not exist on this host"),
        "error must name the missing-host-node condition; got: {rendered}"
    );
}
/// `CgroupDef::mem_policy` must store the policy on the first work entry.
#[test]
fn cgroupdef_mem_policy_builder() {
    let bind_node0 = MemPolicy::Bind([0].into_iter().collect());
    let def = CgroupDef::named("test").mem_policy(bind_node0);
    assert!(matches!(def.works[0].mem_policy, MemPolicy::Bind(_)));
}
use crate::cgroup::CgroupOps;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;
/// One call recorded by `MockCgroupOps`. Each variant corresponds to a
/// `CgroupOps` method and stores the arguments the mock was called with;
/// `MoveTasks` stores only the number of pids, not the pids themselves.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CgroupCall {
Setup(BTreeSet<crate::cgroup::Controller>),
CreateCgroup(String),
RemoveCgroup(String),
SetCpuset(String, BTreeSet<usize>),
ClearCpuset(String),
SetCpusetMems(String, BTreeSet<usize>),
#[allow(dead_code)] ClearCpusetMems(String),
SetCpuMax(String, Option<u64>, u64),
SetCpuWeight(String, u32),
SetMemoryMax(String, Option<u64>),
SetMemoryHigh(String, Option<u64>),
SetMemoryLow(String, Option<u64>),
SetMemorySwapMax(String, Option<u64>),
SetIoWeight(String, u16),
SetFreeze(String, bool),
SetPidsMax(String, Option<u64>),
MoveTask(String, libc::pid_t),
// Second field of `MoveTasks` is the pid count.
MoveTasks(String, usize), PlaceTaskDuringHandshake(String, libc::pid_t),
ClearSubtreeControl(String),
DrainTasks(String),
ReadProcs(String),
CleanupAll,
}
/// Boxed predicate over a recorded call, used by
/// `MockCgroupOps::fail_nth_call_matching` to select which calls count.
type CallPredicate = Box<dyn Fn(&CgroupCall) -> bool + Send + 'static>;
/// Recording test double for `CgroupOps`: every call is appended to `calls`,
/// and failures can be injected by call index or by predicate.
struct MockCgroupOps {
// Path returned by `parent_path`.
parent: std::path::PathBuf,
// Log of every call made, in order (failed calls are logged too).
calls: Mutex<Vec<CgroupCall>>,
// One-shot failure: (call index, error message).
fail_at: Mutex<Option<(usize, String)>>,
// One-shot failure: (target match number, matches seen so far, error
// message, predicate selecting which calls count as matches).
fail_match: Mutex<Option<(usize, usize, String, CallPredicate)>>,
// Pids `read_procs` returns, keyed by cgroup name.
procs_returns: Mutex<HashMap<String, Vec<libc::pid_t>>>,
// One-shot `read_procs` failure: (cgroup name, error message).
procs_err: Mutex<Option<(String, String)>>,
}
impl MockCgroupOps {
/// Creates a mock with parent path `/mock/cgroup`, an empty call log, and
/// no injected failures or canned `read_procs` results.
fn new() -> Self {
Self {
parent: std::path::PathBuf::from("/mock/cgroup"),
calls: Mutex::new(Vec::new()),
fail_at: Mutex::new(None),
fail_match: Mutex::new(None),
procs_returns: Mutex::new(HashMap::new()),
procs_err: Mutex::new(None),
}
}
/// Sets the pids that `read_procs` will return for `cgroup`.
#[allow(dead_code)] fn set_procs(&self, cgroup: &str, pids: Vec<libc::pid_t>) {
self.procs_returns
.lock()
.unwrap()
.insert(cgroup.to_string(), pids);
}
/// Arms a one-shot `read_procs` failure for `cgroup` with `message`,
/// replacing any previously armed one.
#[allow(dead_code)] fn fail_read_procs(&self, cgroup: &str, message: &str) {
*self.procs_err.lock().unwrap() = Some((cgroup.to_string(), message.to_string()));
}
/// Arms a one-shot failure for the call whose zero-based position in the
/// call log equals `index`.
#[allow(dead_code)] fn fail_call_at(&self, index: usize, message: &str) {
*self.fail_at.lock().unwrap() = Some((index, message.to_string()));
}
/// Arms a one-shot failure for the `n`-th (zero-based) call that satisfies
/// `predicate`; calls that do not satisfy it are not counted.
#[allow(dead_code)] fn fail_nth_call_matching<F>(&self, n: usize, predicate: F, message: &str)
where
F: Fn(&CgroupCall) -> bool + Send + 'static,
{
*self.fail_match.lock().unwrap() = Some((n, 0, message.to_string(), Box::new(predicate)));
}
/// Returns a clone of the call log.
fn calls(&self) -> Vec<CgroupCall> {
self.calls.lock().unwrap().clone()
}
/// Appends `call` to the log, then checks the injected failures: the
/// predicate-based failure first, then the index-based one. A failure that
/// fires is disarmed and returned as an error; the call stays in the log
/// either way.
fn record(&self, call: CgroupCall) -> Result<()> {
let mut calls = self.calls.lock().unwrap();
// Index this call will occupy in the log.
let current_index = calls.len();
calls.push(call);
let recorded_idx = calls.len() - 1;
let just_recorded = calls[recorded_idx].clone();
// Release the log lock before taking the failure-injection locks.
drop(calls);
let mut fail_match = self.fail_match.lock().unwrap();
if let Some((target_n, ref mut matches_so_far, ref message, ref predicate)) = *fail_match
&& predicate(&just_recorded)
{
if *matches_so_far == target_n {
let err_msg = message.clone();
// One-shot: disarm before returning the error.
*fail_match = None;
return Err(anyhow::anyhow!(err_msg));
}
// A match, but not the targeted one yet.
*matches_so_far += 1;
}
drop(fail_match);
let mut fail = self.fail_at.lock().unwrap();
if let Some((index, ref message)) = *fail
&& current_index == index
{
let err_msg = message.clone();
// One-shot: disarm before returning the error.
*fail = None;
return Err(anyhow::anyhow!(err_msg));
}
Ok(())
}
}
// `CgroupOps` implementation for the mock. Apart from `parent_path` and
// `read_procs`, every method only forwards its arguments to `record` as the
// matching `CgroupCall` variant and returns `record`'s result.
impl CgroupOps for MockCgroupOps {
/// Returns the fixed parent path stored in the mock.
fn parent_path(&self) -> &Path {
&self.parent
}
fn setup(&self, controllers: &BTreeSet<crate::cgroup::Controller>) -> Result<()> {
self.record(CgroupCall::Setup(controllers.clone()))
}
fn create_cgroup(&self, name: &str) -> Result<()> {
self.record(CgroupCall::CreateCgroup(name.to_string()))
}
fn remove_cgroup(&self, name: &str) -> Result<()> {
self.record(CgroupCall::RemoveCgroup(name.to_string()))
}
fn set_cpuset(&self, name: &str, cpus: &BTreeSet<usize>) -> Result<()> {
self.record(CgroupCall::SetCpuset(name.to_string(), cpus.clone()))
}
fn clear_cpuset(&self, name: &str) -> Result<()> {
self.record(CgroupCall::ClearCpuset(name.to_string()))
}
fn set_cpuset_mems(&self, name: &str, nodes: &BTreeSet<usize>) -> Result<()> {
self.record(CgroupCall::SetCpusetMems(name.to_string(), nodes.clone()))
}
fn clear_cpuset_mems(&self, name: &str) -> Result<()> {
self.record(CgroupCall::ClearCpusetMems(name.to_string()))
}
fn set_cpu_max(&self, name: &str, quota_us: Option<u64>, period_us: u64) -> Result<()> {
self.record(CgroupCall::SetCpuMax(name.to_string(), quota_us, period_us))
}
fn set_cpu_weight(&self, name: &str, weight: u32) -> Result<()> {
self.record(CgroupCall::SetCpuWeight(name.to_string(), weight))
}
fn set_memory_max(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemoryMax(name.to_string(), bytes))
}
fn set_memory_high(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemoryHigh(name.to_string(), bytes))
}
fn set_memory_low(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemoryLow(name.to_string(), bytes))
}
fn set_io_weight(&self, name: &str, weight: u16) -> Result<()> {
self.record(CgroupCall::SetIoWeight(name.to_string(), weight))
}
fn set_freeze(&self, name: &str, frozen: bool) -> Result<()> {
self.record(CgroupCall::SetFreeze(name.to_string(), frozen))
}
fn set_pids_max(&self, name: &str, max: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetPidsMax(name.to_string(), max))
}
fn set_memory_swap_max(&self, name: &str, bytes: Option<u64>) -> Result<()> {
self.record(CgroupCall::SetMemorySwapMax(name.to_string(), bytes))
}
fn move_task(&self, name: &str, pid: libc::pid_t) -> Result<()> {
self.record(CgroupCall::MoveTask(name.to_string(), pid))
}
/// Records only how many pids were passed, not which ones.
fn move_tasks(&self, name: &str, pids: &[libc::pid_t]) -> Result<()> {
self.record(CgroupCall::MoveTasks(name.to_string(), pids.len()))
}
fn place_task_during_handshake(&self, cgroup_name: &str, child_pid: libc::pid_t) -> Result<()> {
self.record(CgroupCall::PlaceTaskDuringHandshake(
cgroup_name.to_string(),
child_pid,
))
}
fn clear_subtree_control(&self, name: &str) -> Result<()> {
self.record(CgroupCall::ClearSubtreeControl(name.to_string()))
}
fn drain_tasks(&self, name: &str) -> Result<()> {
self.record(CgroupCall::DrainTasks(name.to_string()))
}
/// Records the call (propagating any failure injected through `record`),
/// then returns the armed `read_procs` error if it targets `name`, and
/// otherwise the pids configured for `name` (an empty list if none).
fn read_procs(&self, name: &str) -> Result<Vec<libc::pid_t>> {
self.record(CgroupCall::ReadProcs(name.to_string()))?;
let err = {
let mut guard = self.procs_err.lock().unwrap();
match guard.as_ref() {
// Take the error out so it fires only once; an error armed for a
// different cgroup is left in place.
Some((target, _)) if target == name => guard.take(),
_ => None,
}
};
if let Some((_, msg)) = err {
anyhow::bail!("{msg}");
}
let pids = self
.procs_returns
.lock()
.unwrap()
.get(name)
.cloned()
.unwrap_or_default();
Ok(pids)
}
fn cleanup_all(&self) -> Result<()> {
self.record(CgroupCall::CleanupAll)
}
}
/// Builds a `Ctx` that uses `mock` for cgroup operations and `topo` for
/// topology, with a one-second duration, one worker per cgroup, zero settle
/// time, default assertion checks, and every optional field unset.
fn mock_ctx<'a>(mock: &'a MockCgroupOps, topo: &'a crate::topology::TestTopology) -> Ctx<'a> {
    let default_checks = crate::assert::Assert::default_checks();
    let step_counter = std::sync::Arc::new(std::sync::atomic::AtomicU16::new(0));
    Ctx {
        cgroups: mock,
        topo,
        duration: Duration::from_secs(1),
        workers_per_cgroup: 1,
        sched_pid: None,
        settle: Duration::ZERO,
        work_type_override: None,
        assert: default_checks,
        wait_for_map_write: false,
        current_step: step_counter,
        entry_name: None,
    }
}
/// Returns a `TestTopology` built from `Topology::new(1, 1, 4, 1)`.
fn mock_topo() -> crate::topology::TestTopology {
    let vm_topology = crate::vmm::topology::Topology::new(1, 1, 4, 1);
    crate::topology::TestTopology::from_vm_topology(&vm_topology)
}
/// Test teardown helper: clears `state.handles`, then hands
/// `state.payload_handles` to `drain_all_payload_handles`.
fn cleanup_state(state: &mut StepState<'_>) {
state.handles.clear();
drain_all_payload_handles(&mut state.payload_handles);
}
/// Test wrapper around `apply_setup`: pairs `state` with a fresh empty
/// `BackdropState` inside a `ScenarioState` and applies `defs` to it.
fn apply_setup_test<'a>(
ctx: &'a Ctx<'a>,
state: &mut StepState<'a>,
defs: &[CgroupDef],
) -> Result<()> {
let mut backdrop = BackdropState::empty(ctx);
let mut scenario = ScenarioState::new(state, &mut backdrop);
apply_setup(ctx, &mut scenario, defs)
}
/// Test wrapper around `apply_ops`: pairs `state` with a fresh empty
/// `BackdropState` inside a `ScenarioState` and applies `ops` to it, passing
/// `false` as the final `apply_ops` argument.
fn apply_ops_test<'a>(ctx: &'a Ctx<'a>, state: &mut StepState<'a>, ops: &[Op]) -> Result<()> {
let mut backdrop = BackdropState::empty(ctx);
let mut scenario = ScenarioState::new(state, &mut backdrop);
apply_ops(ctx, &mut scenario, ops, false)
}
// Declares three locals in the caller's scope under the given names: a new
// `MockCgroupOps`, the topology from `mock_topo()`, and a `Ctx` from
// `mock_ctx` that borrows both.
macro_rules! mock_setup {
($mock:ident, $topo:ident, $ctx:ident) => {
let $mock = MockCgroupOps::new();
let $topo = mock_topo();
let $ctx = mock_ctx(&$mock, &$topo);
};
}
// Same as `mock_setup!`, plus a mutable `StepState::empty(&ctx)` bound to
// `$state`.
macro_rules! mock_setup_state {
($mock:ident, $topo:ident, $ctx:ident, $state:ident) => {
mock_setup!($mock, $topo, $ctx);
let mut $state = StepState::empty(&$ctx);
};
}
// Same as `mock_setup!`, plus a mutable empty `StepState` bound to `$step`
// and a mutable empty `BackdropState` bound to `$backdrop`.
macro_rules! mock_setup_backdrop {
($mock:ident, $topo:ident, $ctx:ident, $step:ident, $backdrop:ident) => {
mock_setup!($mock, $topo, $ctx);
let mut $step = StepState::empty(&$ctx);
let mut $backdrop = BackdropState::empty(&$ctx);
};
}
/// With the failure armed for the first `CreateCgroup`, calls that do not
/// match the predicate must succeed and the first `create_cgroup` must
/// return the injected error.
#[test]
fn fail_nth_call_matching_predicate_skipping_does_not_advance_counter() {
    let mock = MockCgroupOps::new();
    let is_create = |c: &CgroupCall| matches!(c, CgroupCall::CreateCgroup(_));
    mock.fail_nth_call_matching(0, is_create, "injected on first CreateCgroup");
    let no_controllers = BTreeSet::new();
    mock.setup(&no_controllers)
        .expect("non-matching setup() must succeed");
    mock.remove_cgroup("x")
        .expect("non-matching remove_cgroup() must succeed");
    let outcome = mock.create_cgroup("first");
    let err = outcome.expect_err("first CreateCgroup must hit the injected failure");
    assert!(
        format!("{err:#}").contains("injected on first CreateCgroup"),
        "error must surface the injected message verbatim: {err:#}"
    );
}
/// With the failure armed for the second `MoveTasks`, interleaved
/// `create_cgroup` calls must not count: the first `move_tasks` succeeds and
/// the second returns the injected error.
#[test]
fn fail_nth_call_matching_n_index_counts_only_matches() {
    let mock = MockCgroupOps::new();
    let is_move_tasks = |c: &CgroupCall| matches!(c, CgroupCall::MoveTasks(_, _));
    mock.fail_nth_call_matching(1, is_move_tasks, "injected on 2nd MoveTasks");
    mock.create_cgroup("a")
        .expect("non-match create_cgroup must succeed");
    mock.move_tasks("a", &[1])
        .expect("1st MoveTasks must succeed (counter advances to 1)");
    mock.create_cgroup("b")
        .expect("non-match between matches must succeed without advancing n");
    let outcome = mock.move_tasks("b", &[2]);
    let err = outcome.expect_err("2nd MoveTasks must hit the injected failure");
    assert!(
        format!("{err:#}").contains("injected on 2nd MoveTasks"),
        "error must surface the injected message: {err:#}"
    );
}
/// With the failure armed for `MoveTasks` and no such call made, all three
/// other calls must succeed and be recorded.
#[test]
fn fail_nth_call_matching_no_match_means_no_fire() {
    let mock = MockCgroupOps::new();
    let is_move_tasks = |c: &CgroupCall| matches!(c, CgroupCall::MoveTasks(_, _));
    mock.fail_nth_call_matching(
        0,
        is_move_tasks,
        "should never fire — no MoveTasks calls happen",
    );
    mock.create_cgroup("x")
        .expect("non-match create_cgroup must succeed");
    let no_cpus = BTreeSet::new();
    mock.set_cpuset("x", &no_cpus)
        .expect("non-match set_cpuset must succeed");
    mock.remove_cgroup("x")
        .expect("non-match remove_cgroup must succeed");
    assert_eq!(
        mock.calls().len(),
        3,
        "all 3 non-matching calls must have been recorded without firing: {:?}",
        mock.calls()
    );
}
/// Running setup with an empty definition list must not touch the cgroup ops
/// at all.
#[test]
fn apply_setup_empty_defs_is_noop() {
    mock_setup_state!(ops_mock, topology, cx, step);
    apply_setup_test(&cx, &mut step, &[]).unwrap();

    let recorded = ops_mock.calls();
    assert!(
        recorded.is_empty(),
        "apply_setup on zero defs must not call any cgroup op, got: {:?}",
        recorded
    );
    cleanup_state(&mut step);
}
/// Setup must issue exactly one `create_cgroup` per `CgroupDef`, in the order
/// the defs were supplied.
#[test]
fn apply_setup_creates_cgroup_per_def() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let defs = [
        CgroupDef::named("cg_a").workers(1),
        CgroupDef::named("cg_b").workers(1),
    ];
    apply_setup_test(&cx, &mut step, &defs).unwrap();

    let recorded = ops_mock.calls();
    let creates = recorded
        .iter()
        .filter(|call| matches!(call, CgroupCall::CreateCgroup(_)))
        .collect::<Vec<&CgroupCall>>();

    let want_a = CgroupCall::CreateCgroup("cg_a".to_string());
    let want_b = CgroupCall::CreateCgroup("cg_b".to_string());
    assert_eq!(
        creates,
        vec![&want_a, &want_b],
        "one create_cgroup call per def, in order"
    );
    cleanup_state(&mut step);
}
/// A def carrying `CpusetSpec::Exact` must produce a `set_cpuset` call with
/// that exact CPU set, and the same set must be recorded in `state.cpusets`.
#[test]
fn apply_setup_sets_cpuset_when_spec_present() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let cpus: BTreeSet<usize> = BTreeSet::from([0, 1]);
    let defs = [CgroupDef::named("cg_0")
        .cpuset(CpusetSpec::Exact(cpus.clone()))
        .workers(1)];
    apply_setup_test(&cx, &mut step, &defs).unwrap();

    let calls = ops_mock.calls();
    let expected_call = CgroupCall::SetCpuset("cg_0".to_string(), cpus.clone());
    assert!(
        calls.contains(&expected_call),
        "set_cpuset must be called with exactly the resolved cpu set, got: {calls:?}"
    );

    let tracked = step.cpusets.get("cg_0");
    assert_eq!(
        tracked,
        Some(&cpus),
        "state.cpusets must record the resolved set"
    );
    cleanup_state(&mut step);
}
/// A def without a cpuset spec must not trigger any `set_cpuset` call and
/// must leave `state.cpusets` empty.
#[test]
fn apply_setup_skips_cpuset_when_none() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let defs = [CgroupDef::named("cg_inherit").workers(1)];
    apply_setup_test(&cx, &mut step, &defs).unwrap();

    let calls = ops_mock.calls();
    let no_set_cpuset = calls
        .iter()
        .all(|call| !matches!(call, CgroupCall::SetCpuset(_, _)));
    assert!(
        no_set_cpuset,
        "no set_cpuset should be emitted when CgroupDef.cpuset is None, got: {calls:?}"
    );
    assert!(
        step.cpusets.is_empty(),
        "state.cpusets should stay empty when no CpusetSpec was resolved"
    );
    cleanup_state(&mut step);
}
/// A def with two workers must produce a `MoveTasks("cg_move", 2)` call, and
/// the cgroup must have been created before tasks are moved into it.
#[test]
fn apply_setup_moves_spawned_tasks_into_cgroup() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let defs = [CgroupDef::named("cg_move").workers(2)];
    apply_setup_test(&cx, &mut step, &defs).unwrap();

    let calls = ops_mock.calls();
    let expected_move = CgroupCall::MoveTasks("cg_move".to_string(), 2);
    assert!(
        calls.contains(&expected_move),
        "move_tasks must be called with the 2 spawned worker pids, got: {calls:?}"
    );

    // Ordering check: first create for this cgroup vs. first move into it.
    let create_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::CreateCgroup(cg) if cg == "cg_move"))
        .expect("create_cgroup for cg_move");
    let move_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::MoveTasks(cg, _) if cg == "cg_move"))
        .expect("move_tasks for cg_move");
    assert!(
        create_idx < move_idx,
        "create_cgroup must precede move_tasks for the same cgroup: {calls:?}"
    );
    cleanup_state(&mut step);
}
/// For a def with both a cpuset and workers, the `set_cpuset` call must be
/// recorded before the `move_tasks` call for the same cgroup.
#[test]
fn apply_setup_sets_cpuset_before_move_tasks() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let cpus: BTreeSet<usize> = BTreeSet::from([0, 1]);
    let defs = [CgroupDef::named("cg_ordered")
        .cpuset(CpusetSpec::Exact(cpus.clone()))
        .workers(2)];
    apply_setup_test(&cx, &mut step, &defs).unwrap();

    let calls = ops_mock.calls();
    let set_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetCpuset(cg, _) if cg == "cg_ordered"))
        .expect("set_cpuset for cg_ordered");
    let move_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::MoveTasks(cg, _) if cg == "cg_ordered"))
        .expect("move_tasks for cg_ordered");
    assert!(
        set_idx < move_idx,
        "set_cpuset must precede move_tasks for the same cgroup: {calls:?}"
    );
    cleanup_state(&mut step);
}
/// A `CpusetSpec::Llc(99)` def must fail with a validation error. The test
/// also pins the current ordering: the only recorded call is the
/// `create_cgroup` that precedes validation.
#[test]
fn apply_setup_bails_on_invalid_cpuset_spec() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let defs = [CgroupDef::named("cg_bad").cpuset(CpusetSpec::Llc(99))];

    let err = apply_setup_test(&cx, &mut step, &defs).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("CpusetSpec validation failed"),
        "expected validation error, got: {msg}"
    );

    let recorded = ops_mock.calls();
    let expected = vec![CgroupCall::CreateCgroup("cg_bad".to_string())];
    assert_eq!(
        recorded, expected,
        "current ordering: create_cgroup first, then cpuset validation"
    );
    cleanup_state(&mut step);
}
/// An injected failure on the call at index 1 must surface from setup with
/// its message intact, and no `move_tasks` may be recorded afterwards.
#[test]
fn apply_setup_propagates_set_cpuset_error() {
    mock_setup_state!(ops_mock, topology, cx, step);
    // The test expects index 1 to be the set_cpuset call.
    ops_mock.fail_call_at(1, "set_cpuset kernel EBUSY");

    let cpus: BTreeSet<usize> = BTreeSet::from([0, 1]);
    let defs = [CgroupDef::named("cg_setfail")
        .cpuset(CpusetSpec::Exact(cpus))
        .workers(1)];

    let err = apply_setup_test(&cx, &mut step, &defs).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("set_cpuset kernel EBUSY"),
        "set_cpuset error must propagate, got: {msg}"
    );

    let calls = ops_mock.calls();
    let no_move = calls
        .iter()
        .all(|call| !matches!(call, CgroupCall::MoveTasks(_, _)));
    assert!(
        no_move,
        "no move_tasks call should follow a failed set_cpuset, got: {calls:?}"
    );
    cleanup_state(&mut step);
}
/// A def whose `MemPolicy::Bind` set is expected to conflict with its cpuset
/// must make setup fail with an error naming the cgroup, before any
/// `move_tasks` call is recorded.
#[test]
fn apply_setup_validates_mempolicy_against_cpuset() {
    let ops_mock = MockCgroupOps::new();
    // This test builds its own topology from a VM topology instead of using
    // the shared `mock_topo()` fixture.
    let vm_topology = crate::vmm::topology::Topology::new(2, 2, 4, 1);
    let topology = crate::topology::TestTopology::from_vm_topology(&vm_topology);
    let cx = mock_ctx(&ops_mock, &topology);
    let mut step = StepState::empty(&cx);

    let cpus: BTreeSet<usize> = (0..4).collect();
    let bind: BTreeSet<usize> = BTreeSet::from([1]);
    let defs = [CgroupDef::named("cg_memfail")
        .cpuset(CpusetSpec::Exact(cpus))
        .mem_policy(MemPolicy::Bind(bind))
        .workers(1)];

    let err = apply_setup_test(&cx, &mut step, &defs).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("cg_memfail"),
        "error must name the bad cgroup, got: {msg}"
    );

    let calls = ops_mock.calls();
    let no_move = calls
        .iter()
        .all(|call| !matches!(call, CgroupCall::MoveTasks(_, _)));
    assert!(
        no_move,
        "mempolicy validation must bail before spawn, got: {calls:?}"
    );
    cleanup_state(&mut step);
}
/// A def built with only `CgroupDef::named` must have no payload attached.
#[test]
fn cgroup_def_default_payload_is_none() {
    let fresh = CgroupDef::named("cg_0");
    let attached = &fresh.payload;
    assert!(attached.is_none());
}
/// `CgroupDef::workload` must store the given payload on the def; the stored
/// payload keeps its name and is not scheduler-kind.
#[test]
fn cgroup_def_workload_stores_payload() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static FIO: Payload = Payload {
        name: "fio",
        kind: PayloadKind::Binary("fio"),
        output: OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    let with_workload = CgroupDef::named("cg_0").workload(&FIO);
    let stored = with_workload.payload.expect("workload was attached");
    assert_eq!(stored.name, "fio");
    assert!(!stored.is_scheduler());
}
/// Passing `Payload::KERNEL_DEFAULT` to `CgroupDef::workload` must panic with
/// the scheduler-kind rejection message.
#[test]
#[should_panic(expected = "CgroupDef::workload called with a scheduler-kind Payload")]
fn cgroup_def_workload_rejects_scheduler_kind_payload() {
    use crate::test_support::Payload;

    let builder = CgroupDef::named("cg_0");
    // Expected to panic inside `workload`; the result is never observed.
    let _ = builder.workload(&Payload::KERNEL_DEFAULT);
}
/// Draining by cgroup name must remove only that cgroup's entries; a
/// follow-up drain-all must empty the list.
#[test]
fn drain_payload_handles_for_cgroup_removes_matching_only() {
    use crate::cgroup::CgroupManager;
    use crate::scenario::payload_run::PayloadRun;
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    use crate::topology::TestTopology;

    static TRUE_BIN: Payload = Payload {
        name: "true_bin",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    let manager = CgroupManager::new("/nonexistent");
    let topology = TestTopology::synthetic(4, 1);
    let cx = crate::scenario::Ctx::builder(&manager, &topology).build();

    // Two independent spawns of the same payload, one per cgroup label.
    let handle_a = PayloadRun::new(&cx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true for cg_a");
    let handle_b = PayloadRun::new(&cx, &TRUE_BIN)
        .spawn()
        .expect("spawn /bin/true for cg_b");

    let entry_a = PayloadEntry {
        cgroup: "cg_a".to_string(),
        source: PayloadSource::CgroupDefWorkload,
        handle: handle_a,
    };
    let entry_b = PayloadEntry {
        cgroup: "cg_b".to_string(),
        source: PayloadSource::CgroupDefWorkload,
        handle: handle_b,
    };
    let mut entries = vec![entry_a, entry_b];

    drain_payload_handles_for_cgroup(&mut entries, "cg_a");
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].cgroup, "cg_b");

    drain_all_payload_handles(&mut entries);
    assert!(entries.is_empty());
}
/// `Step::with_payload` must yield a step with a single `Op::RunPayload`
/// (no args, no cgroup), the supplied fixed hold, and an empty `Setup::Defs`.
#[test]
fn step_with_payload_emits_runpayload_op() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static FIO: Payload = Payload {
        name: "fio",
        kind: PayloadKind::Binary("fio"),
        output: OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    let hold = Duration::from_millis(50);
    let step = Step::with_payload(&FIO, HoldSpec::fixed(hold));
    assert_eq!(step.ops.len(), 1);

    let other = &step.ops[0];
    if let Op::RunPayload {
        payload,
        args,
        cgroup,
    } = other
    {
        assert_eq!(payload.name, "fio");
        assert!(args.is_empty());
        assert!(cgroup.is_none());
    } else {
        panic!("expected RunPayload, got {other:?}");
    }

    assert!(matches!(step.hold, HoldSpec::Fixed(d) if d == hold));
    assert!(matches!(&step.setup, Setup::Defs(defs) if defs.is_empty()));
}
/// Each payload-related `Op` constructor must build the expected variant with
/// the expected name, args and cgroup fields.
#[test]
fn op_payload_constructors_produce_expected_variants() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static FIO: Payload = Payload {
        name: "fio",
        kind: PayloadKind::Binary("fio"),
        output: OutputFormat::Json,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    // run_payload: args are carried over, no cgroup.
    let run_plain = Op::run_payload(&FIO, ["--warmup"]);
    match run_plain {
        Op::RunPayload {
            payload,
            args,
            cgroup,
        } => {
            assert_eq!(payload.name, "fio");
            assert_eq!(args, vec!["--warmup".to_string()]);
            assert!(cgroup.is_none());
        }
        other => panic!("expected RunPayload, got {other:?}"),
    }

    // run_payload_in_cgroup: empty args, cgroup name recorded.
    let run_in_cg = Op::run_payload_in_cgroup(&FIO, [] as [&str; 0], "cg_0");
    match run_in_cg {
        Op::RunPayload {
            payload,
            args,
            cgroup,
        } => {
            assert_eq!(payload.name, "fio");
            assert!(args.is_empty());
            assert_eq!(cgroup.as_deref(), Some("cg_0"));
        }
        other => panic!("expected RunPayload, got {other:?}"),
    }

    // wait/kill without a cgroup.
    let wait_plain = Op::wait_payload("fio");
    assert!(matches!(
        &wait_plain,
        Op::WaitPayload { name, cgroup } if name.as_ref() == "fio" && cgroup.is_none(),
    ));
    let kill_plain = Op::kill_payload("fio");
    assert!(matches!(
        &kill_plain,
        Op::KillPayload { name, cgroup } if name.as_ref() == "fio" && cgroup.is_none(),
    ));

    // wait/kill scoped to a cgroup.
    let wait_in_cg = Op::wait_payload_in_cgroup("fio", "cg_0");
    assert!(matches!(
        &wait_in_cg,
        Op::WaitPayload { name, cgroup: Some(cg) } if name.as_ref() == "fio" && cg.as_ref() == "cg_0",
    ));
    let kill_in_cg = Op::kill_payload_in_cgroup("fio", "cg_0");
    assert!(matches!(
        &kill_in_cg,
        Op::KillPayload { name, cgroup: Some(cg) } if name.as_ref() == "fio" && cg.as_ref() == "cg_0",
    ));
}
/// `Op::RunPayload` with `Payload::KERNEL_DEFAULT` must fail with an error
/// that names both the scheduler-kind reason and the payload, and must not
/// store a payload handle.
#[test]
fn apply_ops_runpayload_rejects_scheduler_kind() {
    use crate::test_support::Payload;

    mock_setup_state!(ops_mock, topology, cx, step);
    let run_scheduler = Op::RunPayload {
        payload: &Payload::KERNEL_DEFAULT,
        args: Vec::new(),
        cgroup: None,
    };
    let ops = [run_scheduler];

    let err = apply_ops_test(&cx, &mut step, &ops).unwrap_err();
    let msg = format!("{err:#}");
    let names_reason = msg.contains("scheduler-kind Payload");
    let names_payload = msg.contains("kernel_default");
    assert!(
        names_reason && names_payload,
        "error must name the scheduler-kind reason AND the payload name, got: {msg}"
    );
    assert!(
        step.payload_handles.is_empty(),
        "no handle should be stored when RunPayload rejects the kind"
    );
}
/// `Op::RunPayload` with a cgroup must record exactly one
/// `PlaceTaskDuringHandshake` call carrying that cgroup name and a positive
/// pid.
#[test]
fn op_runpayload_writes_pid_to_named_cgroup_via_placement_trait() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin_t_f_named",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);
    let ops = [Op::RunPayload {
        payload: &TRUE_BIN,
        args: Vec::new(),
        cgroup: Some("cg_a".into()),
    }];
    apply_ops_test(&cx, &mut step, &ops)
        .expect("apply_ops Op::RunPayload { cgroup: Some(_) } must succeed under MockCgroupOps");

    let calls = ops_mock.calls();
    let placements = calls
        .iter()
        .filter(|call| matches!(call, CgroupCall::PlaceTaskDuringHandshake(_, _)))
        .collect::<Vec<&CgroupCall>>();
    assert_eq!(
        placements.len(),
        1,
        "Op::RunPayload {{ cgroup: Some(_), .. }} must record exactly one \
         PlaceTaskDuringHandshake call, got: {calls:?}"
    );

    let other = placements[0];
    if let CgroupCall::PlaceTaskDuringHandshake(name, pid) = other {
        assert_eq!(
            name, "cg_a",
            "PlaceTaskDuringHandshake must carry the user-facing cgroup name"
        );
        assert!(
            *pid > 0,
            "PlaceTaskDuringHandshake must carry the real child pid (>0), got {pid}"
        );
    } else {
        panic!("expected PlaceTaskDuringHandshake, got {other:?}");
    }
    cleanup_state(&mut step);
}
/// `Op::RunPayload` with `cgroup: None` must succeed without recording any
/// `PlaceTaskDuringHandshake` call.
#[test]
fn op_runpayload_without_cgroup_does_not_place() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin_t_f_none",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);
    let ops = [Op::RunPayload {
        payload: &TRUE_BIN,
        args: Vec::new(),
        cgroup: None,
    }];
    apply_ops_test(&cx, &mut step, &ops)
        .expect("apply_ops Op::RunPayload { cgroup: None } must succeed");

    let calls = ops_mock.calls();
    let placement_count = calls
        .iter()
        .filter(|call| matches!(call, CgroupCall::PlaceTaskDuringHandshake(_, _)))
        .count();
    assert!(
        placement_count == 0,
        "Op::RunPayload {{ cgroup: None, .. }} MUST NOT record any \
         PlaceTaskDuringHandshake calls; got: {calls:?}"
    );
    cleanup_state(&mut step);
}
/// A def built with `CgroupDef::workload` must, during setup, record exactly
/// one `PlaceTaskDuringHandshake` call carrying the def's name and a positive
/// pid.
#[test]
fn cgroupdef_workload_with_payload_places_in_def_name() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin_t_f_defworkload",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);
    let defs = [CgroupDef::named("cg_def").workload(&TRUE_BIN)];
    apply_setup_test(&cx, &mut step, &defs)
        .expect("apply_setup with CgroupDef::workload must spawn the payload");

    let calls = ops_mock.calls();
    let placements = calls
        .iter()
        .filter(|call| matches!(call, CgroupCall::PlaceTaskDuringHandshake(_, _)))
        .collect::<Vec<&CgroupCall>>();
    assert_eq!(
        placements.len(),
        1,
        "CgroupDef::workload(&payload) must record exactly one \
         PlaceTaskDuringHandshake call, got: {calls:?}"
    );

    let other = placements[0];
    if let CgroupCall::PlaceTaskDuringHandshake(name, pid) = other {
        assert_eq!(
            name, "cg_def",
            "PlaceTaskDuringHandshake must carry CgroupDef::name verbatim"
        );
        assert!(
            *pid > 0,
            "PlaceTaskDuringHandshake must carry the real child pid (>0), got {pid}"
        );
    } else {
        panic!("expected PlaceTaskDuringHandshake, got {other:?}");
    }
    cleanup_state(&mut step);
}
/// A def with workers and a `pcomm` must not record any
/// `PlaceTaskDuringHandshake` call and must record at least one `MoveTasks`
/// call for its cgroup.
#[test]
fn pcomm_coalesced_spawn_uses_move_tasks() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let defs = [CgroupDef::named("cg_pcomm").workers(2).pcomm("shared")];
    apply_setup_test(&cx, &mut step, &defs)
        .expect("apply_setup with pcomm-coalesced workers must succeed");

    let calls = ops_mock.calls();
    let placement_count = calls
        .iter()
        .filter(|call| matches!(call, CgroupCall::PlaceTaskDuringHandshake(_, _)))
        .count();
    assert!(
        placement_count == 0,
        "pcomm-coalesced WorkSpec spawn MUST NOT route through \
         place_task_during_handshake (post-spawn move_tasks owns \
         worker placement); got: {calls:?}"
    );

    let moved_into_cg = calls
        .iter()
        .any(|call| matches!(call, CgroupCall::MoveTasks(name, _) if name == "cg_pcomm"));
    assert!(
        moved_into_cg,
        "pcomm-coalesced WorkSpec spawn MUST record at least one \
         MoveTasks call for 'cg_pcomm' (post-spawn placement); \
         got: {calls:?}"
    );
    cleanup_state(&mut step);
}
/// The placement call recorded for `Op::RunPayload` must carry the cgroup name
/// exactly as given: no `/` and no fragment of the mock's parent path.
#[test]
fn runpayload_placement_uses_def_name_not_resolved_path() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin_t_f_namecheck",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);
    let ops = [Op::RunPayload {
        payload: &TRUE_BIN,
        args: Vec::new(),
        cgroup: Some("namecheck_cg".into()),
    }];
    apply_ops_test(&cx, &mut step, &ops)
        .expect("apply_ops Op::RunPayload { cgroup: Some(_) } must succeed");

    // Pull the name out of the first recorded placement call.
    let recorded = ops_mock.calls();
    let placement_name = recorded
        .iter()
        .find_map(|call| {
            if let CgroupCall::PlaceTaskDuringHandshake(name, _) = call {
                Some(name.clone())
            } else {
                None
            }
        })
        .expect("Op::RunPayload { cgroup: Some(_) } must record a PlaceTaskDuringHandshake");

    assert_eq!(
        placement_name, "namecheck_cg",
        "trait method must receive the raw user-facing cgroup name verbatim; \
         received {placement_name:?}"
    );
    let has_separator = placement_name.contains('/');
    assert!(
        !has_separator,
        "trait method must receive a bare name, not a path with '/': \
         received {placement_name:?}"
    );
    let mentions_mock = placement_name.contains("mock");
    assert!(
        !mentions_mock,
        "trait method must NOT receive any substring of the mock's parent path \
         (/mock/cgroup); received {placement_name:?}"
    );
    cleanup_state(&mut step);
}
/// Pins current behavior: `Op::RunPayload` naming a cgroup that was never
/// added still succeeds and records one placement call with that name
/// unchanged.
#[test]
fn op_runpayload_unknown_cgroup_currently_silently_places_via_trait() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static TRUE_BIN: Payload = Payload {
        name: "true_bin_unknown_cg",
        kind: PayloadKind::Binary("/bin/true"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);
    let ops = [Op::RunPayload {
        payload: &TRUE_BIN,
        args: Vec::new(),
        cgroup: Some("cg_never_added".into()),
    }];
    apply_ops_test(&cx, &mut step, &ops)
        .expect("apply_ops must currently succeed even for an unregistered cgroup name");

    let calls = ops_mock.calls();
    let placements = calls
        .iter()
        .filter(|call| matches!(call, CgroupCall::PlaceTaskDuringHandshake(_, _)))
        .collect::<Vec<&CgroupCall>>();
    assert_eq!(
        placements.len(),
        1,
        "current behavior: trait method fires against the unknown name \
         with no upfront framework-level check; got: {calls:?}"
    );

    let other = placements[0];
    if let CgroupCall::PlaceTaskDuringHandshake(name, _) = other {
        assert_eq!(
            name, "cg_never_added",
            "name must be the unknown name verbatim — no silent renaming or path-stripping"
        );
    } else {
        panic!("expected PlaceTaskDuringHandshake, got {other:?}");
    }
    cleanup_state(&mut step);
}
/// Waiting on a payload name that was never started must fail with an error
/// naming the missing payload.
#[test]
fn apply_ops_wait_unknown_payload_bails() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let wait_ghost = Op::WaitPayload {
        name: "ghost".into(),
        cgroup: None,
    };

    let err = apply_ops_test(&cx, &mut step, &[wait_ghost]).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("no running payload named 'ghost'"),
        "error must name the missing payload, got: {msg}"
    );
}
/// Killing a payload name that was never started must fail with an error
/// naming the missing payload.
#[test]
fn apply_ops_kill_unknown_payload_bails() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let kill_ghost = Op::KillPayload {
        name: "ghost".into(),
        cgroup: None,
    };

    let err = apply_ops_test(&cx, &mut step, &[kill_ghost]).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("no running payload named 'ghost'"),
        "error must name the missing payload, got: {msg}"
    );
}
/// `Op::attach_scheduler` must fail in this fixture, and the error must name
/// the op, mention staging, and include the scheduler name.
#[test]
fn apply_ops_attach_scheduler_bails_when_staged_binary_missing() {
    static SCHED: crate::test_support::Scheduler = crate::test_support::Scheduler::EEVDF;

    mock_setup_state!(ops_mock, topology, cx, step);
    // Install a snapshot bridge whose callback always returns `None`; the
    // guard is held for the rest of the test.
    let capture: crate::scenario::snapshot::CaptureCallback = std::sync::Arc::new(|_| None);
    let _bridge_guard = crate::scenario::snapshot::SnapshotBridge::new(capture).set_thread_local();

    let attach = [Op::attach_scheduler(&SCHED)];
    let err = apply_ops_test(&cx, &mut step, &attach).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("Op::AttachScheduler"),
        "error must name the op (catches copy-paste regression across the 4 arms): {msg}"
    );
    let mentions_staging = msg.contains("staging") || msg.contains("staged");
    assert!(
        mentions_staging,
        "error must point at the staging pipeline so the operator knows where to look: {msg}"
    );
    assert!(
        msg.contains("eevdf"),
        "error must include the scheduler name so the operator can identify which entry: {msg}"
    );
}
/// `Op::detach_scheduler` on a fresh state must fail with an error naming the
/// op and the no-scheduler condition.
#[test]
fn apply_ops_detach_scheduler_bails_when_no_scheduler_attached() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let detach = [Op::detach_scheduler()];

    let err = apply_ops_test(&cx, &mut step, &detach).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("Op::DetachScheduler"),
        "error must name the op: {msg}"
    );
    let names_failure_mode = msg.contains("no scheduler attached") || msg.contains("SCHED_PID");
    assert!(
        names_failure_mode,
        "error must name the no-scheduler failure mode: {msg}"
    );
}
/// `Op::restart_scheduler` on a fresh state must fail with an error naming the
/// op and the no-scheduler condition.
#[test]
fn apply_ops_restart_scheduler_bails_when_no_scheduler_attached() {
    mock_setup_state!(ops_mock, topology, cx, step);
    let restart = [Op::restart_scheduler()];

    let err = apply_ops_test(&cx, &mut step, &restart).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("Op::RestartScheduler"),
        "error must name the op: {msg}"
    );
    let names_failure_mode = msg.contains("no scheduler attached") || msg.contains("SCHED_PID");
    assert!(
        names_failure_mode,
        "error must name the no-scheduler failure mode: {msg}"
    );
}
/// `Op::replace_scheduler` on a fresh state must fail with an error naming the
/// op and the no-scheduler condition.
#[test]
fn apply_ops_replace_scheduler_bails_when_no_scheduler_attached() {
    static SCHED: crate::test_support::Scheduler = crate::test_support::Scheduler::EEVDF;

    mock_setup_state!(ops_mock, topology, cx, step);
    let replace = [Op::replace_scheduler(&SCHED)];

    let err = apply_ops_test(&cx, &mut step, &replace).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("Op::ReplaceScheduler"),
        "error must name the op: {msg}"
    );
    let names_failure_mode = msg.contains("no scheduler attached") || msg.contains("SCHED_PID");
    assert!(
        names_failure_mode,
        "error must name the no-scheduler failure mode (detach phase fails fast): {msg}"
    );
}
/// Checks the shape of paths returned by `staged_scheduler_log_path`:
/// `/tmp/sched_<name>_<seq>.log`, where `<seq>` is a non-empty run of ASCII
/// digits, two calls with the same name differ, and different names differ.
#[test]
fn staged_scheduler_log_path_is_per_name_and_seq_keyed() {
    let a1 = staged_scheduler_log_path("scx_mitosis_a");
    let a2 = staged_scheduler_log_path("scx_mitosis_a");
    let b1 = staged_scheduler_log_path("scx_mitosis_b");
    assert_ne!(a1, a2, "same-name consecutive calls must differ via seq");
    assert_ne!(a1, b1, "different names must produce distinct paths");
    assert!(
        a1.starts_with("/tmp/sched_scx_mitosis_a_"),
        "missing name + underscore prefix: {a1}"
    );
    assert!(a1.ends_with(".log"), "missing .log extension: {a1}");

    // Both affixes were asserted above, so stripping them cannot fail here.
    let seq_part = a1
        .strip_prefix("/tmp/sched_scx_mitosis_a_")
        .expect("prefix presence asserted above")
        .strip_suffix(".log")
        .expect("suffix presence asserted above");

    // `Iterator::all` is vacuously true on an empty iterator, so an empty
    // sequence component has to be rejected explicitly.
    assert!(
        !seq_part.is_empty() && seq_part.chars().all(|c| c.is_ascii_digit()),
        "seq suffix must be all digits: {seq_part:?}"
    );
}
/// Running a payload must store one handle under the payload's name; killing
/// it by name must remove that handle.
#[test]
fn apply_ops_run_then_kill_consumes_handle() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);

    let start = [Op::run_payload(&SLEEP, ["3600"])];
    apply_ops_test(&cx, &mut step, &start).expect("spawn /bin/sleep");
    assert_eq!(step.payload_handles.len(), 1, "one payload is live");
    let live_name = step.payload_handles[0].handle.payload_name();
    assert_eq!(live_name, "sleeper");

    let stop = [Op::kill_payload("sleeper")];
    apply_ops_test(&cx, &mut step, &stop).expect("kill the live payload");
    assert!(
        step.payload_handles.is_empty(),
        "handle must be consumed by KillPayload"
    );
}
/// Starting the same payload twice without a cgroup must fail the second
/// time. The error must flag the duplicate, name `Op::RunPayload`, render the
/// empty cgroup key as `(no cgroup)`, and leave only the first handle stored.
#[test]
fn apply_ops_run_duplicate_payload_name_bails() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);

    let first = [Op::run_payload(&SLEEP, ["3600"])];
    apply_ops_test(&cx, &mut step, &first).expect("first spawn");

    let second = [Op::run_payload(&SLEEP, ["3600"])];
    let err = apply_ops_test(&cx, &mut step, &second).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("payload 'sleeper' already running"),
        "error must flag the duplicate, got: {msg}"
    );
    assert!(
        msg.contains("Op::RunPayload"),
        "dup error must name the originating surface, got: {msg}"
    );
    assert!(
        msg.contains("(no cgroup)"),
        "empty-cgroup key must render as '(no cgroup)', got: {msg}"
    );
    let renders_quoted_empty = msg.contains("cgroup ''");
    assert!(
        !renders_quoted_empty,
        "empty-cgroup key must not render as quoted empty, got: {msg}"
    );
    assert_eq!(
        step.payload_handles.len(),
        1,
        "second spawn must not add a handle on failure"
    );

    // Teardown: stop the sleeper started by the first spawn.
    let teardown = [Op::kill_payload("sleeper")];
    apply_ops_test(&cx, &mut step, &teardown).expect("teardown kill");
}
/// If a handle with source `CgroupDefWorkload` already exists for a cgroup,
/// running the same payload in that cgroup must fail with an error naming
/// `CgroupDef::workload` and the cgroup, without adding a second handle.
#[test]
fn apply_ops_run_rejects_payload_already_owned_by_cgroup_def() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);

    // Hand-build the entry a def-sourced spawn would have left behind.
    let def_handle = crate::scenario::payload_run::PayloadRun::new(&cx, &SLEEP)
        .args(["3600".to_string()])
        .spawn()
        .expect("manual def-source spawn");
    let def_entry = PayloadEntry {
        cgroup: "def_cg".to_string(),
        source: PayloadSource::CgroupDefWorkload,
        handle: def_handle,
    };
    step.payload_handles.push(def_entry);

    let duplicate = [Op::run_payload_in_cgroup(&SLEEP, ["1"], "def_cg")];
    let err = apply_ops_test(&cx, &mut step, &duplicate).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("CgroupDef::workload"),
        "dup error must name the def-source surface, got: {msg}"
    );
    assert!(
        msg.contains("'def_cg'"),
        "dup error must name the cgroup the live handle is in, got: {msg}"
    );
    assert_eq!(step.payload_handles.len(), 1);

    // Teardown: stop the manually spawned sleeper.
    let teardown = [Op::kill_payload_in_cgroup("sleeper", "def_cg")];
    apply_ops_test(&cx, &mut step, &teardown).expect("teardown kill");
}
/// `render_cgroup_key` must map the empty key to `(no cgroup)` and wrap a
/// non-empty key in single quotes.
#[test]
fn render_cgroup_key_handles_empty_and_populated() {
    let empty_rendered = render_cgroup_key("");
    assert_eq!(empty_rendered, "(no cgroup)");

    let named_rendered = render_cgroup_key("cg_a");
    assert_eq!(named_rendered, "'cg_a'");
}
/// A step with `HoldSpec::Frac(0.0)` must be rejected by `execute_steps_with`
/// with a hold-validation error that mentions `Frac`.
#[test]
fn execute_steps_with_early_validation_err_has_nothing_to_drain() {
    use crate::cgroup::CgroupManager;

    let manager = CgroupManager::new("/nonexistent");
    let topology = mock_topo();
    let cx = crate::scenario::Ctx::builder(&manager, &topology).build();

    let invalid_step = Step::new(vec![], HoldSpec::Frac(0.0));
    let err = execute_steps_with(&cx, vec![invalid_step], None).unwrap_err();
    let msg = format!("{err:#}");
    let is_hold_validation = msg.contains("hold validation") && msg.contains("Frac");
    assert!(
        is_hold_validation,
        "expected pre-ops validation err, got: {msg}"
    );
}
/// After an `apply_ops` error the previously stored payload handle must still
/// be in the state, so `drain_all_payload_handles` can empty it.
#[test]
fn apply_ops_error_does_not_lose_live_payload_handles() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};

    static SLEEP: Payload = Payload {
        name: "sleeper_drain",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };

    mock_setup_state!(ops_mock, topology, cx, step);

    let start = [Op::run_payload(&SLEEP, ["3600"])];
    apply_ops_test(&cx, &mut step, &start).expect("spawn");
    assert_eq!(step.payload_handles.len(), 1);

    // Trigger an error with an op that names a payload that was never started.
    let bad_wait = [Op::wait_payload("never_spawned")];
    let err = apply_ops_test(&cx, &mut step, &bad_wait).unwrap_err();
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("no running payload named 'never_spawned'"),
        "expected wait-unknown-name err",
    );

    drain_all_payload_handles(&mut step.payload_handles);
    assert!(step.payload_handles.is_empty());
}
/// Step-local `RemoveCgroup` and `StopCgroup` ops, and a backdrop-targeted
/// `RemoveCgroup`, must all be accepted when the named cgroup is tracked by
/// the backdrop state. A removed name must be dropped from backdrop tracking
/// and be addable again.
#[test]
fn remove_and_stop_cgroup_permit_backdrop_target_from_step() {
    mock_setup_backdrop!(ops_mock, topology, cx, step, backdrop);

    // 1. Step-local remove of a backdrop cgroup.
    backdrop
        .cgroups
        .add_cgroup_no_cpuset("bd_cg")
        .expect("add backdrop cgroup");
    {
        let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
        let remove = [Op::remove_cgroup("bd_cg")];
        apply_ops(&cx, &mut scenario, &remove, false)
            .expect("step-local RemoveCgroup permitted against Backdrop target");
    }
    let calls = ops_mock.calls();
    let remove_reached_ops = calls
        .iter()
        .any(|call| matches!(call, CgroupCall::RemoveCgroup(name) if name == "bd_cg"));
    assert!(
        remove_reached_ops,
        "step-local remove must reach the cgroup ops, got: {calls:?}"
    );
    assert!(
        !backdrop.cgroups.names().iter().any(|n| n == "bd_cg"),
        "post-RemoveCgroup must drop backdrop tracking entry, got: {:?}",
        backdrop.cgroups.names()
    );

    // 2. The removed name can be added again.
    {
        let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
        let re_add = [Op::add_cgroup("bd_cg")];
        apply_ops(&cx, &mut scenario, &re_add, false)
            .expect("AddCgroup with previously-removed name must succeed");
    }

    // 3. Step-local stop of a backdrop cgroup.
    backdrop
        .cgroups
        .add_cgroup_no_cpuset("bd_cg_2")
        .expect("add second backdrop cgroup");
    {
        let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
        let stop = [Op::stop_cgroup("bd_cg_2")];
        apply_ops(&cx, &mut scenario, &stop, false)
            .expect("step-local StopCgroup permitted against Backdrop target");
    }

    // 4. Remove issued while the scenario targets the backdrop.
    backdrop
        .cgroups
        .add_cgroup_no_cpuset("bd_cg_3")
        .expect("add third backdrop cgroup");
    {
        let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
        scenario
            .with_target_backdrop(|s| apply_ops(&cx, s, &[Op::remove_cgroup("bd_cg_3")], false))
            .expect("backdrop-pass RemoveCgroup permitted against Backdrop target");
    }
    cleanup_state(&mut step);
}
/// Moving all tasks from a step-local cgroup into a backdrop cgroup must move
/// the worker handle from `step.handles` to `backdrop.handles` and re-key it
/// to the destination name.
#[test]
fn move_all_tasks_transfers_handle_ownership_step_to_backdrop() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};

    mock_setup_backdrop!(ops_mock, topology, cx, step, backdrop);
    backdrop.cgroups.add_cgroup_no_cpuset("bd_cg").unwrap();
    step.cgroups.add_cgroup_no_cpuset("step_cg").unwrap();

    // Spawn one default worker and register it under the step-local cgroup.
    let spec = WorkSpec::default();
    let config = WorkloadConfig::for_scenario_engine(
        &spec,
        1,
        crate::workload::AffinityIntent::Inherit,
        spec.work_type.clone(),
    )
    .expect(
        "test fixture: pcomm must stay None for scenario-engine dispatch — \
         if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
         instead of for_scenario_engine",
    );
    let worker = WorkloadHandle::spawn(&config).expect("spawn worker");
    step.handles.push(("step_cg".to_string(), worker));
    assert_eq!(step.handles.len(), 1);
    assert_eq!(backdrop.handles.len(), 0);

    {
        let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
        let transfer = [Op::move_all_tasks("step_cg", "bd_cg")];
        apply_ops(&cx, &mut scenario, &transfer, false).expect("move into backdrop");
    }

    assert_eq!(
        step.handles.len(),
        0,
        "step-local handle must leave the step slot after transfer",
    );
    assert_eq!(
        backdrop.handles.len(),
        1,
        "backdrop slot must receive the transferred handle",
    );
    let rekeyed_to = &backdrop.handles[0].0;
    assert_eq!(
        rekeyed_to, "bd_cg",
        "transferred handle must be re-keyed to `to`",
    );

    backdrop.handles.clear();
    step.handles.clear();
}
/// Moving all tasks between two step-local cgroups must keep the handle in
/// `step.handles`, re-keyed to the destination, and leave the backdrop's
/// handle list empty.
#[test]
fn move_all_tasks_step_to_step_keeps_step_ownership() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};

    mock_setup_backdrop!(ops_mock, topology, cx, step, backdrop);
    step.cgroups.add_cgroup_no_cpuset("src").unwrap();
    step.cgroups.add_cgroup_no_cpuset("dst").unwrap();

    // Spawn one default worker and register it under the source cgroup.
    let spec = WorkSpec::default();
    let config = WorkloadConfig::for_scenario_engine(
        &spec,
        1,
        crate::workload::AffinityIntent::Inherit,
        spec.work_type.clone(),
    )
    .expect(
        "test fixture: pcomm must stay None for scenario-engine dispatch — \
         if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
         instead of for_scenario_engine",
    );
    let worker = WorkloadHandle::spawn(&config).expect("spawn");
    step.handles.push(("src".to_string(), worker));

    {
        let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
        let transfer = [Op::move_all_tasks("src", "dst")];
        apply_ops(&cx, &mut scenario, &transfer, false).expect("step-to-step move");
    }

    assert_eq!(step.handles.len(), 1);
    assert_eq!(step.handles[0].0, "dst");
    assert_eq!(backdrop.handles.len(), 0);
    step.handles.clear();
}
/// Moving all tasks from a backdrop cgroup into a step-local cgroup must be
/// rejected with an error naming both sides, before any `move_tasks` call is
/// recorded.
#[test]
fn move_all_tasks_backdrop_to_step_rejected() {
    mock_setup_backdrop!(ops_mock, topology, cx, step, backdrop);
    backdrop.cgroups.add_cgroup_no_cpuset("bd").unwrap();
    step.cgroups.add_cgroup_no_cpuset("step").unwrap();

    let mut scenario = ScenarioState::new(&mut step, &mut backdrop);
    let wrong_direction = [Op::move_all_tasks("bd", "step")];
    let err = apply_ops(&cx, &mut scenario, &wrong_direction, false).unwrap_err();
    let msg = format!("{err:#}");
    let names_both_sides = msg.contains("Backdrop-owned 'bd'") && msg.contains("step-local 'step'");
    assert!(
        names_both_sides,
        "error must name both cgroups and the direction, got: {msg}"
    );

    let calls = ops_mock.calls();
    let no_move = calls
        .iter()
        .all(|call| !matches!(call, CgroupCall::MoveTasks(_, _)));
    assert!(
        no_move,
        "pre-bail path must not invoke move_tasks, got: {calls:?}"
    );
}
/// A `MoveAllTasks` whose source and destination are the same cgroup must
/// bail with a "self-move" error quoting the name, without any `move_tasks`
/// call recorded by the mock.
#[test]
fn move_all_tasks_self_move_bails() {
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    step_state.cgroups.add_cgroup_no_cpuset("cg").unwrap();

    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let ops = [Op::move_all_tasks("cg", "cg")];
    let err = apply_ops(&ctx, &mut scenario, &ops, false).unwrap_err();
    let msg = format!("{err:#}");

    let names_op = msg.contains("Op::MoveAllTasks");
    let names_shape = msg.contains("self-move");
    assert!(
        names_op && names_shape,
        "error must name the op + the self-move shape, got: {msg}"
    );
    assert!(
        msg.contains("'cg'"),
        "error must quote the colliding name so the operator sees which side to change, got: {msg}"
    );

    let calls = mock.calls();
    let any_move = calls
        .iter()
        .any(|c| matches!(c, CgroupCall::MoveTasks(..)));
    assert!(
        !any_move,
        "self-move bail must not invoke move_tasks, got: {calls:?}"
    );
}
/// Two empty cgroup names are still an identical pair, so the op must fail
/// with the same "self-move" error.
#[test]
fn move_all_tasks_self_move_bails_on_empty_string_pair() {
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let ops = [Op::move_all_tasks("", "")];
    let err = apply_ops(&ctx, &mut scenario, &ops, false).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("self-move"),
        "empty-string pair must hit the same self-move bail path, got: {msg}"
    );
}
/// Moving into an untracked destination name succeeds but emits exactly one
/// WARN event; that warn must name the op, the unknown destination, and the
/// tracked backdrop and step-local cgroup names.
#[test]
fn move_all_tasks_unknown_dst_warn_dumps_tracked_cgroups() {
    let captured = capture_tracing_events(|| {
        mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
        backdrop_state
            .cgroups
            .add_cgroup_no_cpuset("bd_real_name")
            .expect("add backdrop cgroup");
        step_state
            .cgroups
            .add_cgroup_no_cpuset("step_local_real")
            .expect("add step-local cgroup");
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        // "dts" is deliberately not a tracked cgroup name.
        let ops = [Op::move_all_tasks("src", "dts")];
        apply_ops(&ctx, &mut scenario, &ops, false)
            .expect("typo'd-dst MoveAllTasks must succeed (warn-then-attempt)");
    });

    let warns: Vec<&(tracing::Level, String)> = captured
        .iter()
        .filter(|entry| entry.0 == tracing::Level::WARN)
        .collect();
    assert_eq!(
        warns.len(),
        1,
        "exactly one typo-warn expected on unknown dst; got: {warns:?}",
    );

    let body = &warns[0].1;
    assert!(
        body.contains("matches no step-local"),
        "warn must use the typo-late-surfacing phrasing, got: {body:?}",
    );
    assert!(
        body.contains("Op::MoveAllTasks"),
        "warn must name the op so the operator can grep, got: {body:?}",
    );
    assert!(
        body.contains("dts"),
        "warn must include the typo'd destination name, got: {body:?}",
    );
    assert!(
        body.contains("bd_real_name"),
        "warn must dump backdrop_cgroups so the operator sees real names, got: {body:?}",
    );
    assert!(
        body.contains("step_local_real"),
        "warn must dump step_cgroups so the operator sees real names, got: {body:?}",
    );
}
/// When both cgroups are tracked, `MoveAllTasks` must not emit any WARN event
/// carrying the "matches no" typo phrasing for this op.
#[test]
fn move_all_tasks_emits_no_typo_warn_when_dst_tracked() {
    let captured = capture_tracing_events(|| {
        mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
        step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();
        step_state.cgroups.add_cgroup_no_cpuset("dst").unwrap();
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let ops = [Op::move_all_tasks("src", "dst")];
        apply_ops(&ctx, &mut scenario, &ops, false)
            .expect("tracked-dst MoveAllTasks must succeed without warn");
    });

    // Only WARN events that look like this op's typo warning count.
    let move_warns: Vec<&(tracing::Level, String)> = captured
        .iter()
        .filter(|(lvl, _)| *lvl == tracing::Level::WARN)
        .filter(|(_, body)| body.contains("Op::MoveAllTasks") && body.contains("matches no"))
        .collect();
    assert!(
        move_warns.is_empty(),
        "happy-path move must emit zero typo-warns, got: {move_warns:?}",
    );
}
/// Even with a live worker handle keyed under the cgroup, a self-move must
/// fail with the "self-move" error and record no `move_tasks` call on the
/// mock.
#[test]
fn move_all_tasks_self_move_bails_with_handles_present_no_kernel_call() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    step_state.cgroups.add_cgroup_no_cpuset("cg").unwrap();

    // Register one spawned worker under the cgroup being self-moved.
    let spec = WorkSpec::default();
    let config = WorkloadConfig::for_scenario_engine(
        &spec,
        1,
        crate::workload::AffinityIntent::Inherit,
        spec.work_type.clone(),
    )
    .expect(
        "test fixture: pcomm must stay None for scenario-engine dispatch — \
         if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
         instead of for_scenario_engine",
    );
    let worker = WorkloadHandle::spawn(&config).expect("spawn");
    step_state.handles.push(("cg".to_string(), worker));

    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let ops = [Op::move_all_tasks("cg", "cg")];
    let err = apply_ops(&ctx, &mut scenario, &ops, false).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("self-move"),
        "self-move bail must fire ahead of the handle walk, got: {msg}",
    );

    let calls = mock.calls();
    let any_move = calls
        .iter()
        .any(|c| matches!(c, CgroupCall::MoveTasks(..)));
    assert!(
        !any_move,
        "bail-before-handle-walk ordering must skip every move_tasks call \
         even with a matching handle present, got: {calls:?}"
    );
    step_state.handles.clear();
}
/// With a live handle under the source cgroup, a move to an untracked
/// destination still issues exactly one `move_tasks` call, addressed to the
/// destination name exactly as written.
#[test]
fn move_all_tasks_unknown_dst_with_handles_does_attempt_kernel_call() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    backdrop_state
        .cgroups
        .add_cgroup_no_cpuset("bd_real_name")
        .expect("add backdrop cgroup");
    step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();

    let spec = WorkSpec::default();
    let config = WorkloadConfig::for_scenario_engine(
        &spec,
        1,
        crate::workload::AffinityIntent::Inherit,
        spec.work_type.clone(),
    )
    .expect(
        "test fixture: pcomm must stay None for scenario-engine dispatch — \
         if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
         instead of for_scenario_engine",
    );
    let worker = WorkloadHandle::spawn(&config).expect("spawn");
    step_state.handles.push(("src".to_string(), worker));

    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    // "dts" is deliberately not a tracked cgroup name.
    let ops = [Op::move_all_tasks("src", "dts")];
    apply_ops(&ctx, &mut scenario, &ops, false)
        .expect("warn-then-attempt path must succeed against the mock");

    let calls = mock.calls();
    let move_calls: Vec<&CgroupCall> = calls
        .iter()
        .filter(|c| matches!(c, CgroupCall::MoveTasks(..)))
        .collect();
    assert_eq!(
        move_calls.len(),
        1,
        "warn-is-informational contract: exactly one move_tasks call \
         against the typo'd dst, got: {calls:?}",
    );
    if let CgroupCall::MoveTasks(dst, _) = move_calls[0] {
        assert_eq!(
            dst, "dts",
            "the warn doesn't intercept; move_tasks fires against the typo'd dst verbatim",
        );
    } else {
        // The filter above only admits `MoveTasks` entries.
        unreachable!()
    }
    step_state.handles.clear();
}
/// `execute_scenario_with` must reject a Backdrop that carries
/// `Payload::KERNEL_DEFAULT`, with an error mentioning "scheduler-kind" and
/// "Backdrop".
#[test]
fn run_scenario_rejects_scheduler_kind_backdrop_payload() {
    use crate::cgroup::CgroupManager;
    use crate::test_support::Payload;

    let cgroups = CgroupManager::new("/nonexistent");
    let topo = mock_topo();
    let ctx = crate::scenario::Ctx::builder(&cgroups, &topo).build();

    let backdrop =
        crate::scenario::backdrop::Backdrop::new().push_payload(&Payload::KERNEL_DEFAULT);
    // A single op-less step with a 1 ms hold.
    let steps = vec![Step::new(vec![], HoldSpec::fixed(Duration::from_millis(1)))];

    let err = execute_scenario_with(&ctx, backdrop, steps, None).unwrap_err();
    let msg = format!("{err:#}");
    let names_kind = msg.contains("scheduler-kind");
    let names_surface = msg.contains("Backdrop");
    assert!(
        names_kind && names_surface,
        "error must name the kind mismatch and the Backdrop surface, got: {msg}"
    );
}
/// `apply_setup` must fail when a `CgroupDef` reuses a name the backdrop
/// already tracks, and the error must mention "already tracked" and the name.
#[test]
fn apply_setup_rejects_name_collision_with_backdrop() {
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    backdrop_state
        .cgroups
        .add_cgroup_no_cpuset("shared")
        .unwrap();

    let defs = vec![CgroupDef::named("shared").workers(1)];
    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let err = apply_setup(&ctx, &mut scenario, &defs).unwrap_err();

    let msg = format!("{err:#}");
    let cites_collision = msg.contains("already tracked");
    let cites_name = msg.contains("shared");
    assert!(
        cites_collision && cites_name,
        "error must cite the collision and the offending name, got: {msg}"
    );
    cleanup_state(&mut step_state);
}
/// Test helper: spawns `payload` through `PayloadRun` with the single argument
/// `"3600"` and appends the resulting handle to `state.payload_handles`,
/// labelled with `cgroup` and `source`.
///
/// Panics if the spawn fails.
fn push_fake_payload_entry<'a>(
    ctx: &'a Ctx<'a>,
    state: &mut StepState<'a>,
    payload: &'static crate::test_support::Payload,
    cgroup: &str,
    source: PayloadSource,
) {
    let spawned = crate::scenario::payload_run::PayloadRun::new(ctx, payload)
        .args(["3600".to_string()])
        .spawn()
        .expect("manual spawn (no cgroup placement)");
    let entry = PayloadEntry {
        cgroup: cgroup.to_string(),
        source,
        handle: spawned,
    };
    state.payload_handles.push(entry);
}
/// With "sleeper" entries tracked in `cg_a` and `cg_b`,
/// `find_live_payload_with_cgroup` matches on the (name, cgroup) pair: `cg_c`
/// yields nothing, `cg_a` yields a hit.
#[test]
fn apply_ops_run_duplicate_name_different_cgroups_allowed() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    mock_setup_state!(mock, topo, ctx, state);
    // Same payload name tracked under two different cgroups.
    for cgroup in ["cg_a", "cg_b"] {
        push_fake_payload_entry(
            &ctx,
            &mut state,
            &SLEEP,
            cgroup,
            PayloadSource::OpRunPayload,
        );
    }

    let mut backdrop = BackdropState::empty(&ctx);
    let scenario = ScenarioState::new(&mut state, &mut backdrop);
    let fresh_pair = scenario.find_live_payload_with_cgroup("sleeper", "cg_c");
    assert!(
        fresh_pair.is_none(),
        "fresh (name, cgroup) pair must not collide with live entries in other cgroups",
    );
    let live_pair = scenario.find_live_payload_with_cgroup("sleeper", "cg_a");
    assert!(
        live_pair.is_some(),
        "same (name, cgroup) still matches — only the pair matters",
    );
    cleanup_state(&mut state);
}
/// `take_payload_by_name` with an explicit cgroup removes only the entry in
/// that cgroup and leaves the same-named entry in the other cgroup tracked.
#[test]
fn take_payload_by_composite_key_matches_exact_cgroup() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    mock_setup_state!(mock, topo, ctx, state);
    for cgroup in ["cg_a", "cg_b"] {
        push_fake_payload_entry(
            &ctx,
            &mut state,
            &SLEEP,
            cgroup,
            PayloadSource::OpRunPayload,
        );
    }

    let mut backdrop = BackdropState::empty(&ctx);
    let mut scenario = ScenarioState::new(&mut state, &mut backdrop);
    let lookup = scenario.take_payload_by_name("sleeper", Some("cg_a"));
    let taken = lookup
        .expect("composite lookup does not bail on ambiguity")
        .expect("one entry matches");

    assert_eq!(taken.cgroup, "cg_a");
    assert_eq!(state.payload_handles.len(), 1);
    assert_eq!(state.payload_handles[0].cgroup, "cg_b");

    // Tear down the remaining tracked entry first, then the one taken out.
    drain_all_payload_handles(&mut state.payload_handles);
    let _ = taken.handle.kill();
}
/// A bare-name `take_payload_by_name` over two same-named entries must return
/// `Err` listing both cgroups and leave both entries tracked.
#[test]
fn take_payload_by_bare_name_reports_ambiguous_cgroups() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    mock_setup_state!(mock, topo, ctx, state);
    for cgroup in ["cg_a", "cg_b"] {
        push_fake_payload_entry(
            &ctx,
            &mut state,
            &SLEEP,
            cgroup,
            PayloadSource::OpRunPayload,
        );
    }

    let mut backdrop = BackdropState::empty(&ctx);
    let mut scenario = ScenarioState::new(&mut state, &mut backdrop);
    // The error payload is the list of cgroups holding a matching entry.
    let err = match scenario.take_payload_by_name("sleeper", None) {
        Ok(_) => panic!("bare lookup over multi-copy must surface ambiguity"),
        Err(cgroups) => cgroups,
    };

    assert_eq!(err.len(), 2);
    assert!(err.contains(&"cg_a".to_string()) && err.contains(&"cg_b".to_string()));
    assert_eq!(state.payload_handles.len(), 2);
    drain_all_payload_handles(&mut state.payload_handles);
}
/// With only one "sleeper" entry tracked, a bare-name `take_payload_by_name`
/// returns that entry and empties the tracking list.
#[test]
fn take_payload_by_bare_name_succeeds_on_single_copy() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    mock_setup_state!(mock, topo, ctx, state);
    push_fake_payload_entry(
        &ctx,
        &mut state,
        &SLEEP,
        "cg_a",
        PayloadSource::OpRunPayload,
    );

    let mut backdrop = BackdropState::empty(&ctx);
    let mut scenario = ScenarioState::new(&mut state, &mut backdrop);
    let lookup = scenario.take_payload_by_name("sleeper", None);
    let taken = lookup
        .expect("single-copy bare lookup returns Ok")
        .expect("one entry matches");

    assert_eq!(taken.cgroup, "cg_a");
    assert!(state.payload_handles.is_empty());
    let _ = taken.handle.kill();
}
/// Bare `wait_payload` / `kill_payload` against two same-named entries must
/// fail, and each error must spell out the matching `*_in_cgroup` constructor.
/// The wait and kill halves each run against a freshly built state.
#[test]
fn apply_ops_bare_wait_and_kill_ambiguity_hint_names_full_constructor() {
    use crate::test_support::{OutputFormat, Payload, PayloadKind};
    static SLEEP: Payload = Payload {
        name: "sleeper",
        kind: PayloadKind::Binary("/bin/sleep"),
        output: OutputFormat::ExitCode,
        default_args: &[],
        default_checks: &[],
        metrics: &[],
        include_files: &[],
        uses_parent_pgrp: false,
        known_flags: None,
        metric_bounds: None,
    };
    mock_setup!(mock, topo, ctx);

    // Wait half.
    let mut state = StepState::empty(&ctx);
    for cgroup in ["cg_a", "cg_b"] {
        push_fake_payload_entry(
            &ctx,
            &mut state,
            &SLEEP,
            cgroup,
            PayloadSource::OpRunPayload,
        );
    }
    let wait_ops = [Op::wait_payload("sleeper")];
    let err = apply_ops_test(&ctx, &mut state, &wait_ops).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("ambiguous"),
        "wait ambiguity message must flag ambiguity, got: {msg}"
    );
    assert!(
        msg.contains("Op::wait_payload_in_cgroup(name, cgroup)"),
        "wait ambiguity hint must name the full snake_case constructor \
         so a copy-paste into source compiles, got: {msg}"
    );
    drain_all_payload_handles(&mut state.payload_handles);

    // Kill half.
    let mut state = StepState::empty(&ctx);
    for cgroup in ["cg_a", "cg_b"] {
        push_fake_payload_entry(
            &ctx,
            &mut state,
            &SLEEP,
            cgroup,
            PayloadSource::OpRunPayload,
        );
    }
    let kill_ops = [Op::kill_payload("sleeper")];
    let err = apply_ops_test(&ctx, &mut state, &kill_ops).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("Op::kill_payload_in_cgroup(name, cgroup)"),
        "kill ambiguity hint must name the full snake_case constructor, got: {msg}"
    );
    drain_all_payload_handles(&mut state.payload_handles);
}
/// Waiting on or killing an unknown payload must produce a not-found message
/// phrased with "before waiting" / "before killing".
#[test]
fn apply_ops_not_found_message_uses_gerund_verb() {
    mock_setup_state!(mock, topo, ctx, state);

    let wait_ops = [Op::wait_payload("ghost")];
    let err = apply_ops_test(&ctx, &mut state, &wait_ops).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("before waiting"),
        "wait not-found message must say 'before waiting', got: {msg}"
    );
    assert!(
        !msg.contains("before waitpayload"),
        "must not collapse 'wait payload' into 'waitpayload', got: {msg}"
    );

    let kill_ops = [Op::kill_payload("ghost")];
    let err = apply_ops_test(&ctx, &mut state, &kill_ops).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("before killing"),
        "kill not-found message must say 'before killing', got: {msg}"
    );
}
/// After `Op::RemoveCgroup`, the removed name must vanish from the tracked
/// names, and dropping the state must record exactly one `RemoveCgroup` call
/// per cgroup on the mock: no second call for the already-removed one.
#[test]
fn remove_cgroup_forgets_name_so_drop_does_not_double_rmdir() {
    mock_setup_state!(mock, topo, ctx, state);
    let adds = [Op::add_cgroup("cg_keep"), Op::add_cgroup("cg_drop")];
    apply_ops_test(&ctx, &mut state, &adds).unwrap();
    let removal = [Op::remove_cgroup("cg_drop")];
    apply_ops_test(&ctx, &mut state, &removal).unwrap();

    assert_eq!(
        state.cgroups.names(),
        &["cg_keep".to_string()],
        "Op::RemoveCgroup must prune the removed name from \
         CgroupGroup::names so a later AddCgroup with the same \
         name can re-create the cgroup without colliding against \
         a stale tracking entry",
    );

    // Dropping the state triggers teardown; inspect the mock afterwards.
    drop(state);
    let calls = mock.calls();
    let count_rmdirs = |target: &str| {
        calls
            .iter()
            .filter(|c| matches!(c, CgroupCall::RemoveCgroup(n) if n == target))
            .count()
    };

    assert_eq!(
        count_rmdirs("cg_drop"),
        1,
        "Op::RemoveCgroup must be the sole rmdir dispatcher for \
         cg_drop; Drop must not re-issue rmdir against a forgotten \
         name: {calls:?}",
    );
    assert_eq!(
        count_rmdirs("cg_keep"),
        1,
        "Drop must rmdir the surviving cg_keep exactly once: {calls:?}",
    );
}
/// Runs `f` with a recording subscriber installed as the default for the
/// duration of the call and returns every event seen as `(level, text)`.
///
/// The text is the concatenation of each field value rendered through
/// `record_debug` or `record_str`, each followed by one space; field names
/// are not recorded. All callsites are enabled and span bookkeeping is a
/// no-op.
fn capture_tracing_events<F: FnOnce()>(f: F) -> Vec<(tracing::Level, String)> {
    use std::fmt::Write;
    use std::sync::{Arc, Mutex};
    use tracing::field::{Field, Visit};
    use tracing::span::{Attributes, Id, Record};
    use tracing::{Event, Level, Metadata, Subscriber};

    type EventLog = Arc<Mutex<Vec<(Level, String)>>>;

    /// Subscriber that appends each event to a shared log.
    #[derive(Default)]
    struct CaptureSubscriber {
        events: EventLog,
    }

    /// Field visitor that flattens field values into one string.
    struct MessageVisitor<'a>(&'a mut String);

    impl<'a> Visit for MessageVisitor<'a> {
        fn record_debug(&mut self, _field: &Field, value: &dyn std::fmt::Debug) {
            let _ = write!(self.0, "{value:?} ");
        }

        fn record_str(&mut self, _field: &Field, value: &str) {
            self.0.push_str(value);
            self.0.push(' ');
        }
    }

    impl Subscriber for CaptureSubscriber {
        fn enabled(&self, _: &Metadata<'_>) -> bool {
            true
        }

        fn new_span(&self, _: &Attributes<'_>) -> Id {
            // Every span gets the same id; spans are not tracked.
            Id::from_u64(1)
        }

        fn record(&self, _: &Id, _: &Record<'_>) {}

        fn record_follows_from(&self, _: &Id, _: &Id) {}

        fn event(&self, event: &Event<'_>) {
            let mut text = String::new();
            event.record(&mut MessageVisitor(&mut text));
            let level = *event.metadata().level();
            self.events.lock().unwrap().push((level, text));
        }

        fn enter(&self, _: &Id) {}

        fn exit(&self, _: &Id) {}
    }

    let events: EventLog = Arc::new(Mutex::new(Vec::new()));
    let subscriber = CaptureSubscriber {
        events: events.clone(),
    };
    tracing::subscriber::with_default(subscriber, f);
    let snapshot = events.lock().unwrap().clone();
    snapshot
}
/// Removing a cgroup tracked by the backdrop succeeds but emits exactly one
/// WARN event containing the Backdrop-owned-removal text and the cgroup name.
#[test]
fn remove_cgroup_warn_branch_1_fires_on_backdrop_tracked_name() {
    let captured = capture_tracing_events(|| {
        mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
        backdrop_state
            .cgroups
            .add_cgroup_no_cpuset("bd_cg")
            .expect("add backdrop cgroup");
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let ops = [Op::remove_cgroup("bd_cg")];
        apply_ops(&ctx, &mut scenario, &ops, false)
            .expect("remove_cgroup of backdrop target must succeed");
    });

    let warns: Vec<&(tracing::Level, String)> = captured
        .iter()
        .filter(|entry| entry.0 == tracing::Level::WARN)
        .collect();
    assert_eq!(
        warns.len(),
        1,
        "exactly one warn expected from branch 1; got: {warns:?}",
    );

    let text = &warns[0].1;
    assert!(
        text.contains("removed a Backdrop-owned cgroup mid-scenario"),
        "warn must include branch-1 text identifying Backdrop-owned removal; got: {:?}",
        text,
    );
    assert!(
        text.contains("bd_cg"),
        "warn must include the cgroup name; got: {:?}",
        text,
    );
}
/// Removing a name tracked by neither the step nor the backdrop succeeds but
/// emits exactly one WARN event that carries the "matches no step-local"
/// text, the unknown name, and both tracked cgroup names.
#[test]
fn remove_cgroup_warn_branch_2_fires_on_unknown_typo_name() {
    let captured = capture_tracing_events(|| {
        mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
        backdrop_state
            .cgroups
            .add_cgroup_no_cpuset("bd_real_name")
            .expect("add backdrop cgroup");
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);

        let add = [Op::add_cgroup("step_local_real")];
        apply_ops(&ctx, &mut scenario, &add, false).expect("add step-local cgroup");

        // "bd_typoed_name" is tracked nowhere.
        let remove = [Op::remove_cgroup("bd_typoed_name")];
        apply_ops(&ctx, &mut scenario, &remove, false)
            .expect("remove_cgroup of unknown name must succeed (permissive)");
    });

    let warns: Vec<&(tracing::Level, String)> = captured
        .iter()
        .filter(|entry| entry.0 == tracing::Level::WARN)
        .collect();
    assert_eq!(
        warns.len(),
        1,
        "exactly one warn expected from branch 2; got: {warns:?}",
    );

    let text = &warns[0].1;
    assert!(
        text.contains("matches no step-local"),
        "warn must include branch-2 text identifying unknown-name typo; got: {:?}",
        text,
    );
    assert!(
        text.contains("bd_real_name"),
        "warn must dump backdrop_cgroups list including the real name; got: {:?}",
        text,
    );
    assert!(
        text.contains("step_local_real"),
        "warn must dump step_cgroups list including the step-local name; got: {:?}",
        text,
    );
    assert!(
        text.contains("bd_typoed_name"),
        "warn must include the typo'd cgroup target name; got: {:?}",
        text,
    );
}
/// Add a cgroup, then remove it twice: the whole sequence must emit exactly
/// one WARN event, and that warn must mention the second-remove case.
#[test]
fn remove_cgroup_warn_branch_2_fires_on_double_remove_already_forgotten() {
    let captured = capture_tracing_events(|| {
        mock_setup_state!(mock, topo, ctx, state);
        // Each op is applied in its own `apply_ops_test` call.
        let sequence = [
            Op::add_cgroup("cg_once"),
            Op::remove_cgroup("cg_once"),
            Op::remove_cgroup("cg_once"),
        ];
        for op in sequence {
            apply_ops_test(&ctx, &mut state, &[op]).unwrap();
        }
    });

    let warns: Vec<&(tracing::Level, String)> = captured
        .iter()
        .filter(|entry| entry.0 == tracing::Level::WARN)
        .collect();
    assert_eq!(
        warns.len(),
        1,
        "exactly one warn expected — first remove gated by in_step, second remove fires branch 2 once; got: {warns:?}",
    );

    let text = &warns[0].1;
    assert!(
        text.contains("second-remove of an already-forgotten name"),
        "branch-2 wording must acknowledge double-remove as legitimate cause alongside typo; got: {:?}",
        text,
    );
}
/// Adding and then removing a step-local cgroup must not emit any WARN event.
#[test]
fn remove_cgroup_emits_no_warn_on_happy_step_local_path() {
    let captured = capture_tracing_events(|| {
        mock_setup_state!(mock, topo, ctx, state);
        let add = [Op::add_cgroup("cg_local")];
        apply_ops_test(&ctx, &mut state, &add).unwrap();
        let remove = [Op::remove_cgroup("cg_local")];
        apply_ops_test(&ctx, &mut state, &remove).unwrap();
    });

    let warns: Vec<&(tracing::Level, String)> = captured
        .iter()
        .filter(|entry| entry.0 == tracing::Level::WARN)
        .collect();
    assert!(
        warns.is_empty(),
        "happy step-local add-then-remove path must emit zero warns; got: {warns:?}",
    );
}
/// A step-local `AddCgroup` reusing a backdrop-tracked name must be rejected:
/// the error names the cgroup, the step-local names stay free of it, and the
/// backdrop keeps its entry.
#[test]
fn op_add_cgroup_step_local_rejects_collision_with_backdrop() {
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    backdrop_state
        .cgroups
        .add_cgroup_no_cpuset("shared")
        .expect("add backdrop cgroup");

    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let ops = [Op::add_cgroup("shared")];
    let err = apply_ops(&ctx, &mut scenario, &ops, false).expect_err(
        "apply_ops must reject a step-local AddCgroup whose \
         name already lives in the Backdrop",
    );

    let msg = format!("{err:?}");
    let quotes_name = msg.contains("'shared'");
    let explains = msg.contains("collides");
    assert!(
        quotes_name && explains,
        "error must name the colliding cgroup and explain the collision; got: {msg}",
    );

    let leaked_into_step = step_state.cgroups.names().iter().any(|n| n == "shared");
    assert!(
        !leaked_into_step,
        "step-local names must not contain the rejected name; got: {:?}",
        step_state.cgroups.names(),
    );
    let backdrop_still_has = backdrop_state.cgroups.names().iter().any(|n| n == "shared");
    assert!(
        backdrop_still_has,
        "backdrop copy must survive the rejected op",
    );
}
/// Two `AddCgroup` ops with the same name in one batch: the batch must fail
/// with a collision error, and the name must be tracked exactly once.
#[test]
fn op_add_cgroup_duplicate_in_same_step_is_rejected() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [Op::add_cgroup("cg_dup"), Op::add_cgroup("cg_dup")];
    let err = apply_ops_test(&ctx, &mut state, &ops)
        .expect_err("second AddCgroup must fail against the same step-local name");

    let msg = format!("{err:?}");
    let quotes_name = msg.contains("'cg_dup'");
    let explains = msg.contains("collides");
    assert!(
        quotes_name && explains,
        "error must name the colliding cgroup and explain the collision; got: {msg}",
    );

    let names = state.cgroups.names();
    let occurrences = names.iter().filter(|n| n.as_str() == "cg_dup").count();
    assert_eq!(
        occurrences,
        1,
        "the first op must register the name exactly once; the second op \
         must not push a shadow entry; got: {names:?}",
    );
}
/// `Op::add_cgroup_def` must yield `Op::AddCgroupDef` whose inner def has the
/// same name, the same number of merged works, and `num_workers == Some(3)`
/// on the first work.
#[test]
fn op_add_cgroup_def_constructor_wraps_def_unmutated() {
    let def = CgroupDef::named("midstep").workers(3);
    match Op::add_cgroup_def(def.clone()) {
        Op::AddCgroupDef { def: out } => {
            let wrapped_works = out.merged_works();
            assert_eq!(out.name, def.name);
            assert_eq!(wrapped_works.len(), def.merged_works().len());
            assert_eq!(wrapped_works[0].num_workers, Some(3));
        }
        other => panic!("expected AddCgroupDef, got {other:?}"),
    }
}
/// Applying `AddCgroupDef` for a fresh name must succeed and record that name
/// in the step-local cgroup names.
#[test]
fn op_add_cgroup_def_creates_cgroup_through_apply_setup() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [Op::add_cgroup_def(CgroupDef::named("cg_midstep"))];
    apply_ops_test(&ctx, &mut state, &ops).expect("AddCgroupDef must succeed for a fresh name");

    let tracked = state.cgroups.names().iter().any(|n| n == "cg_midstep");
    assert!(
        tracked,
        "step-local tracking must record the AddCgroupDef name; got: {:?}",
        state.cgroups.names(),
    );
}
/// `AddCgroupDef` must be rejected with a collision error when its name is
/// already tracked by the backdrop.
#[test]
fn op_add_cgroup_def_rejects_collision_with_backdrop() {
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    backdrop_state
        .cgroups
        .add_cgroup_no_cpuset("persistent")
        .expect("add backdrop cgroup");

    let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let ops = [Op::add_cgroup_def(CgroupDef::named("persistent"))];
    let err = apply_ops(&ctx, &mut scenario, &ops, false)
        .expect_err("AddCgroupDef must reject a name already tracked by the Backdrop");

    let msg = format!("{err:?}");
    let quotes_name = msg.contains("'persistent'");
    let explains = msg.contains("collides");
    assert!(
        quotes_name && explains,
        "error must name the colliding cgroup and explain the collision; got: {msg}",
    );
}
/// `required_controllers` must include `Controller::Cpuset` when a step holds
/// an `AddCgroupDef` whose def carries a cpuset.
#[test]
fn required_controllers_absorbs_add_cgroup_def_cpuset() {
    use crate::cgroup::Controller;
    mock_setup!(mock, topo, ctx);

    let pinned = CgroupDef::named("cg_pinned").cpuset(CpusetSpec::disjoint(0, 2));
    let step = Step::new(
        vec![Op::add_cgroup_def(pinned)],
        HoldSpec::fixed(Duration::from_millis(1)),
    );
    let steps = vec![step];

    let needed = required_controllers(&ctx, &backdrop::Backdrop::new(), &steps);
    assert!(
        needed.contains(&Controller::Cpuset),
        "AddCgroupDef carrying a cpuset must require Cpuset controller; got: {needed:?}",
    );
}
/// `AddCgroupDef` with two workers must make the mock record
/// `MoveTasks("cg_workers", 2)`.
#[test]
fn op_add_cgroup_def_spawns_workers_and_moves_into_cgroup() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [Op::add_cgroup_def(
        CgroupDef::named("cg_workers").workers(2),
    )];
    apply_ops_test(&ctx, &mut state, &ops)
        .expect("AddCgroupDef with workers must succeed against mock");

    let calls = mock.calls();
    let expected = CgroupCall::MoveTasks("cg_workers".to_string(), 2);
    assert!(
        calls.contains(&expected),
        "AddCgroupDef must move 2 spawned worker pids into 'cg_workers'; got: {calls:?}",
    );
}
/// `AddCgroupDef` carrying a cpuset must make the mock record a `SetCpuset`
/// call for that cgroup.
#[test]
fn op_add_cgroup_def_writes_embedded_cpuset_to_mock() {
    mock_setup_state!(mock, topo, ctx, state);
    let pinned = CgroupDef::named("cg_pinned").cpuset(CpusetSpec::disjoint(0, 2));
    let ops = [Op::add_cgroup_def(pinned)];
    apply_ops_test(&ctx, &mut state, &ops)
        .expect("AddCgroupDef with cpuset must succeed against mock");

    let calls = mock.calls();
    assert!(
        calls
            .iter()
            .any(|c| matches!(c, CgroupCall::SetCpuset(name, _) if name == "cg_pinned")),
        "AddCgroupDef must emit SetCpuset for 'cg_pinned' via apply_setup; got: {calls:?}",
    );
}
/// `AddCgroupDef` combining `workers_pct` with an empty exact cpuset must
/// fail, and the error must name the cgroup and mention `workers_pct`.
#[test]
fn op_add_cgroup_def_workers_pct_empty_cpuset_bails() {
    mock_setup_state!(mock, topo, ctx, state);
    let def = CgroupDef::named("cg_pct")
        .cpuset(CpusetSpec::exact(std::iter::empty::<usize>()))
        .workers_pct(0.5);
    let ops = [Op::add_cgroup_def(def)];
    let err = apply_ops_test(&ctx, &mut state, &ops)
        .expect_err("workers_pct + empty cpuset must bail through AddCgroupDef");

    let msg = format!("{err:?}");
    let names_cgroup = msg.contains("cg_pct");
    let names_condition = msg.contains("workers_pct");
    assert!(
        names_cgroup && names_condition,
        "diagnostic must name the cgroup and the workers_pct condition; got: {msg}",
    );
}
/// An `AddCgroupDef` following an `AddCgroup` of the same name in one batch
/// must be rejected with a collision error.
#[test]
fn op_add_cgroup_def_collides_with_prior_add_cgroup_in_same_step() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [
        Op::add_cgroup("shared"),
        Op::add_cgroup_def(CgroupDef::named("shared")),
    ];
    let err = apply_ops_test(&ctx, &mut state, &ops)
        .expect_err("AddCgroupDef must reject a name already tracked by a prior AddCgroup");

    let msg = format!("{err:?}");
    let quotes_name = msg.contains("'shared'");
    let explains = msg.contains("collides");
    assert!(
        quotes_name && explains,
        "error must name the colliding cgroup and explain the collision; got: {msg}",
    );
}
/// Two `AddCgroupDef` ops with the same name in one batch: the batch must
/// fail with a collision error naming the duplicate.
#[test]
fn op_add_cgroup_def_collides_with_prior_add_cgroup_def_in_same_step() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [
        Op::add_cgroup_def(CgroupDef::named("dup")),
        Op::add_cgroup_def(CgroupDef::named("dup")),
    ];
    let err = apply_ops_test(&ctx, &mut state, &ops)
        .expect_err("second AddCgroupDef must reject the duplicated name");

    let msg = format!("{err:?}");
    let quotes_name = msg.contains("'dup'");
    let explains = msg.contains("collides");
    assert!(
        quotes_name && explains,
        "error must name the duplicated cgroup and explain the collision; got: {msg}",
    );
}
/// An `AddCgroup` following an `AddCgroupDef` of the same name in one batch
/// must be rejected with a collision error.
#[test]
fn op_add_cgroup_collides_with_prior_add_cgroup_def_in_same_step() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [
        Op::add_cgroup_def(CgroupDef::named("shared")),
        Op::add_cgroup("shared"),
    ];
    let err = apply_ops_test(&ctx, &mut state, &ops)
        .expect_err("AddCgroup must reject a name already tracked by a prior AddCgroupDef");

    let msg = format!("{err:?}");
    let quotes_name = msg.contains("'shared'");
    let explains = msg.contains("collides");
    assert!(
        quotes_name && explains,
        "error must name the colliding cgroup and explain the collision; got: {msg}",
    );
}
/// With three handles keyed under "src", `MoveAllTasks("src", "dst")` must
/// re-key all three to "dst" and record three `MoveTasks("dst", _)` calls on
/// the mock.
#[test]
fn move_all_tasks_renames_every_handle_keyed_under_from() {
    use crate::workload::{AffinityIntent, WorkType, WorkloadConfig, WorkloadHandle};
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();
    step_state.cgroups.add_cgroup_no_cpuset("dst").unwrap();

    // Three independent single-worker handles, all keyed under "src".
    for _ in 0..3 {
        let config = WorkloadConfig {
            num_workers: 1,
            affinity: AffinityIntent::Inherit,
            work_type: WorkType::SpinWait,
            ..Default::default()
        };
        let worker = WorkloadHandle::spawn(&config).expect("spawn worker");
        step_state.handles.push(("src".to_string(), worker));
    }
    assert_eq!(step_state.handles.len(), 3);

    {
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let ops = [Op::move_all_tasks("src", "dst")];
        apply_ops(&ctx, &mut scenario, &ops, false).expect("move");
    }

    assert_eq!(step_state.handles.len(), 3, "no handles lost");
    let keys: Vec<&str> = step_state
        .handles
        .iter()
        .map(|(n, _)| n.as_str())
        .collect();
    assert!(
        step_state.handles.iter().all(|(name, _)| name == "dst"),
        "every handle must be re-keyed to 'dst': {:?}",
        keys,
    );

    let calls = mock.calls();
    let dst_moves = calls
        .iter()
        .filter(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "dst"))
        .count();
    assert_eq!(
        dst_moves, 3,
        "expected 3 move_tasks(\"dst\", _) calls (one per handle), \
         got {dst_moves} in: {calls:?}"
    );
    step_state.handles.clear();
}
/// Dropping the step state must issue `RemoveCgroup` calls in the reverse of
/// the order the cgroups were added.
#[test]
fn per_step_teardown_removes_step_local_cgroups_in_reverse_order() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [
        Op::add_cgroup("cg_a"),
        Op::add_cgroup("cg_a/sub"),
        Op::add_cgroup("cg_b"),
    ];
    apply_ops_test(&ctx, &mut state, &ops).unwrap();

    // Teardown runs on drop; the mock keeps the resulting call log.
    drop(state);
    let calls = mock.calls();
    let removes: Vec<&str> = calls
        .iter()
        .filter_map(|c| {
            if let CgroupCall::RemoveCgroup(n) = c {
                Some(n.as_str())
            } else {
                None
            }
        })
        .collect();
    assert_eq!(
        removes,
        vec!["cg_b", "cg_a/sub", "cg_a"],
        "per-step teardown must rmdir in reverse addition order so a \
         child cgroup's directory is gone before its parent's rmdir \
         runs",
    );
}
/// `build_stimulus` publishes `step_index` as the scenario step index plus
/// one, capped at `u16::MAX`: 0 maps to 1, `u16::MAX - 1` maps to `u16::MAX`,
/// and anything larger stays at `u16::MAX`.
#[test]
fn build_stimulus_saturates_step_idx_at_u16_max() {
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    let scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
    let start = std::time::Instant::now();
    let ceiling = u16::MAX as usize;

    let first = build_stimulus(&start, 0, &[], &scenario);
    assert_eq!(
        first.step_index, 1,
        "scenario step_idx=0 publishes wire step_index=1 \
         per the 1-indexed phase encoding (BASELINE owns 0)",
    );

    let highest_fitting = build_stimulus(&start, ceiling - 1, &[], &scenario);
    assert_eq!(
        highest_fitting.step_index,
        u16::MAX,
        "scenario step_idx=u16::MAX - 1 publishes wire step_index=u16::MAX \
         without saturation (highest 1-indexed value that fits)",
    );

    let just_over = build_stimulus(&start, ceiling, &[], &scenario);
    assert_eq!(
        just_over.step_index,
        u16::MAX,
        "scenario step_idx=u16::MAX would publish wire step_index=u16::MAX+1 \
         after the 1-indexed +1, so the encoder must saturate to u16::MAX \
         rather than wrap to 0",
    );

    let far_over = build_stimulus(&start, u32::MAX as usize, &[], &scenario);
    assert_eq!(
        far_over.step_index,
        u16::MAX,
        "far-overflow step_idx must saturate to u16::MAX",
    );
}
/// `build_stimulus` must emit exactly one WARN event across a non-saturating
/// call (`step_idx = 0`) and a saturating one (`step_idx = u16::MAX`), and
/// that warn must name `step_index` and carry the overflow text.
///
/// Events are collected through the shared `capture_tracing_events` helper
/// rather than a private copy of the capture subscriber, so this test and the
/// other warn-capture tests cannot drift apart.
#[test]
fn build_stimulus_warns_on_step_idx_saturation() {
    let events = capture_tracing_events(|| {
        mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
        let scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let start = std::time::Instant::now();
        // First call fits in u16; only the second one saturates.
        let _ = build_stimulus(&start, 0, &[], &scenario);
        let _ = build_stimulus(&start, u16::MAX as usize, &[], &scenario);
    });

    let warn_hits: Vec<&String> = events
        .iter()
        .filter(|(lvl, _)| *lvl == tracing::Level::WARN)
        .map(|(_, msg)| msg)
        .collect();
    assert!(
        warn_hits
            .iter()
            .any(|m| m.contains("step_index")
                && m.contains("StimulusPayload step_index overflowed u16")),
        "saturation must emit a tracing::warn naming step_index; got warns: {warn_hits:?}",
    );
    assert_eq!(
        warn_hits.len(),
        1,
        "exactly one saturation warn expected; got: {warn_hits:?}",
    );
}
/// Payload fixture built with `Payload::binary("constructor-test", "/bin/true")`.
/// `op_constructor_coverage_is_exhaustive` passes it by reference to the
/// `Op::run_payload` and `Op::run_payload_in_cgroup` constructors.
static CONSTRUCTOR_TEST_PAYLOAD: crate::test_support::Payload =
crate::test_support::Payload::binary("constructor-test", "/bin/true");
/// Scheduler fixture (`Scheduler::EEVDF`).
/// `op_constructor_coverage_is_exhaustive` passes it by reference to the
/// `Op::attach_scheduler` and `Op::replace_scheduler` constructors.
static CONSTRUCTOR_TEST_SCHEDULER: crate::test_support::Scheduler =
crate::test_support::Scheduler::EEVDF;
/// Guards the convention that every `Op` variant is reachable through a constructor.
///
/// One `Op` is built through each constructor listed below. Every built value is mapped to
/// a slot by a `match` without a wildcard arm, and the test fails if any slot is never hit.
/// Because the `match` has no catch-all, a new variant also forces this test to be updated
/// before it compiles.
#[test]
fn op_constructor_coverage_is_exhaustive() {
    // One slot per arm of `slot_of`.
    const SLOT_COUNT: usize = 27;

    let spec = WorkSpec::default();
    let built: Vec<Op> = vec![
        Op::add_cgroup("a"),
        Op::add_cgroup_def(CgroupDef::named("midstep")),
        Op::remove_cgroup("a"),
        Op::set_cpuset("a", CpusetSpec::Llc(0)),
        Op::clear_cpuset("a"),
        Op::swap_cpusets("a", "b"),
        Op::stop_cgroup("a"),
        Op::set_affinity("a", AffinityIntent::Inherit),
        Op::move_all_tasks("a", "b"),
        Op::run_payload(&CONSTRUCTOR_TEST_PAYLOAD, [] as [&str; 0]),
        Op::run_payload_in_cgroup(&CONSTRUCTOR_TEST_PAYLOAD, [] as [&str; 0], "a"),
        Op::wait_payload("constructor-test"),
        Op::wait_payload_in_cgroup("constructor-test", "a"),
        Op::kill_payload("constructor-test"),
        Op::kill_payload_in_cgroup("constructor-test", "a"),
        Op::freeze_cgroup("a"),
        Op::unfreeze_cgroup("a"),
        Op::capture_snapshot("constructor-test"),
        Op::watch_snapshot("kernel.constructor_test"),
        Op::write_kernel_hot(
            KernelTarget::symbol("constructor_test_symbol"),
            KernelValue::u64(0),
        ),
        Op::write_kernel_cold(
            KernelTarget::symbol("constructor_test_symbol"),
            KernelValue::u64(0),
        ),
        Op::read_kernel_hot(
            "constructor-test-hot",
            KernelTarget::symbol("constructor_test_symbol"),
            KernelValueWidth::u64(),
        ),
        Op::read_kernel_cold(
            "constructor-test-cold",
            KernelTarget::symbol("constructor_test_symbol"),
            KernelValueWidth::u32(),
        ),
        Op::attach_scheduler(&CONSTRUCTOR_TEST_SCHEDULER),
        Op::detach_scheduler(),
        Op::restart_scheduler(),
        Op::replace_scheduler(&CONSTRUCTOR_TEST_SCHEDULER),
        Op::pin_bpf_map("constructor_test.bss"),
        Op::spawn(SpawnPlacement::cgroup("a"), spec.clone()),
        Op::capture_cgroup_procs("constructor-test", "a"),
    ];

    // Exhaustive variant -> slot mapping; intentionally has no wildcard arm.
    let slot_of = |op: &Op| -> usize {
        match op {
            Op::AddCgroup { .. } => 0,
            Op::AddCgroupDef { .. } => 1,
            Op::RemoveCgroup { .. } => 2,
            Op::SetCpuset { .. } => 3,
            Op::ClearCpuset { .. } => 4,
            Op::SwapCpusets { .. } => 5,
            Op::Spawn { .. } => 6,
            Op::StopCgroup { .. } => 7,
            Op::SetAffinity { .. } => 8,
            Op::MoveAllTasks { .. } => 9,
            Op::RunPayload { .. } => 10,
            Op::WaitPayload { .. } => 11,
            Op::KillPayload { .. } => 12,
            Op::FreezeCgroup { .. } => 13,
            Op::UnfreezeCgroup { .. } => 14,
            Op::CaptureSnapshot { .. } => 15,
            Op::WatchSnapshot { .. } => 16,
            Op::WriteKernelHot { .. } => 17,
            Op::WriteKernelCold { .. } => 18,
            Op::ReadKernelHot { .. } => 19,
            Op::ReadKernelCold { .. } => 20,
            Op::AttachScheduler { .. } => 21,
            Op::DetachScheduler => 22,
            Op::RestartScheduler => 23,
            Op::ReplaceScheduler { .. } => 24,
            Op::PinBpfMap { .. } => 25,
            Op::CaptureCgroupProcs { .. } => 26,
        }
    };

    let mut covered = [false; SLOT_COUNT];
    for op in &built {
        covered[slot_of(op)] = true;
    }

    // Collect the slots that no constructed op landed in.
    let missing: Vec<usize> = covered
        .iter()
        .enumerate()
        .filter_map(|(slot, hit)| if *hit { None } else { Some(slot) })
        .collect();
    assert!(
        missing.is_empty(),
        "Op variant discriminants with no constructor coverage: {missing:?}. \
         Every Op variant must have a public constructor under impl Op per the \
         non_exhaustive convention documented on the enum.",
    );
}
/// Checks that each `CpusetSpec` variant is produced by one of its lowercase constructors.
///
/// The variant-to-slot `match` has no wildcard arm, so it must list every variant.
#[test]
fn cpuset_spec_constructor_coverage_is_exhaustive() {
    let built = [
        CpusetSpec::llc(0),
        CpusetSpec::numa(0),
        CpusetSpec::range(0.0, 1.0),
        CpusetSpec::disjoint(0, 2),
        CpusetSpec::overlap(0, 2, 0.25),
        CpusetSpec::exact([0usize]),
    ];

    let mut seen = [false; 6];
    for slot in built.iter().map(|spec| match spec {
        CpusetSpec::Llc(_) => 0,
        CpusetSpec::Numa(_) => 1,
        CpusetSpec::Range { .. } => 2,
        CpusetSpec::Disjoint { .. } => 3,
        CpusetSpec::Overlap { .. } => 4,
        CpusetSpec::Exact(_) => 5,
    }) {
        seen[slot] = true;
    }

    assert!(
        !seen.contains(&false),
        "every CpusetSpec variant must have a matching constructor, seen={seen:?}",
    );
}
/// `cpu_quota_pct(50)` must yield a 50_000 us quota over a 100_000 us period and leave
/// the CPU weight unset.
#[test]
fn cgroup_def_cpu_quota_pct_uses_100ms_period_and_correct_quota() {
    let limits = CgroupDef::named("cg_a")
        .cpu_quota_pct(50)
        .cpu
        .expect("cpu_quota_pct must populate `cpu`");
    assert_eq!(limits.max_quota_us, Some(50_000));
    assert_eq!(limits.max_period_us, 100_000);
    assert!(limits.weight.is_none(), "weight must remain unset");
}
/// `cpu_quota(quota, period)` must store both durations converted to microseconds.
#[test]
fn cgroup_def_cpu_quota_accepts_explicit_durations() {
    let quota = Duration::from_micros(7_500);
    let period = Duration::from_millis(10);
    let limits = CgroupDef::named("cg_a").cpu_quota(quota, period).cpu.unwrap();
    assert_eq!(limits.max_quota_us, Some(7_500));
    assert_eq!(limits.max_period_us, 10_000);
}
/// `cpu_unlimited()` must drop the quota while keeping the 100_000 us period and the
/// previously configured weight.
#[test]
fn cgroup_def_cpu_unlimited_clears_quota_keeps_weight() {
    let built = CgroupDef::named("cg_a")
        .cpu_quota_pct(80)
        .cpu_weight(200)
        .cpu_unlimited();
    let limits = built.cpu.unwrap();
    assert!(limits.max_quota_us.is_none());
    assert_eq!(limits.max_period_us, 100_000);
    assert_eq!(limits.weight, Some(200));
}
/// Chaining `memory_max`, `memory_high` and `memory_low` must populate all three fields.
#[test]
fn cgroup_def_memory_builders_compose() {
    let mem = CgroupDef::named("cg_a")
        .memory_max(1_000_000)
        .memory_high(800_000)
        .memory_low(400_000)
        .memory
        .unwrap();
    assert_eq!(mem.max, Some(1_000_000));
    assert_eq!(mem.high, Some(800_000));
    assert_eq!(mem.low, Some(400_000));
}
/// `memory_unlimited()` must reset `max`, `high` and `low` after they were set.
#[test]
fn cgroup_def_memory_unlimited_clears_all_three() {
    let mem = CgroupDef::named("cg_a")
        .memory_max(1_000_000)
        .memory_high(800_000)
        .memory_low(400_000)
        .memory_unlimited()
        .memory
        .unwrap();
    assert!(mem.max.is_none());
    assert!(mem.high.is_none());
    assert!(mem.low.is_none());
}
/// `io_weight(750)` must populate the `io` section with that weight.
#[test]
fn cgroup_def_io_weight_populates() {
    let io = CgroupDef::named("cg_a").io_weight(750).io.unwrap();
    assert_eq!(io.weight, Some(750));
}
/// `cpuset_mems` must store the node set in its own field and leave `cpuset` untouched.
#[test]
fn cgroup_def_cpuset_mems_populates_independent_field() {
    let nodes: BTreeSet<usize> = BTreeSet::from([0usize, 1]);
    let built = CgroupDef::named("cg_a").cpuset_mems(nodes.clone());
    assert_eq!(built.cpuset_mems, Some(nodes));
    assert!(built.cpuset.is_none());
}
/// Applying a cgroup built with `cpu_quota_pct(75)` must record a `SetCpuMax` call with a
/// 75_000 quota and a 100_000 period on the mock.
#[test]
fn apply_setup_records_set_cpu_max_for_cpu_quota_pct_builder() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_cap").cpu_quota_pct(75)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");

    let calls = mock.calls();
    let wanted = CgroupCall::SetCpuMax("cg_cap".to_string(), Some(75_000), 100_000);
    assert!(
        calls.contains(&wanted),
        "expected SetCpuMax(cg_cap, Some(75000), 100000); got {calls:?}",
    );
    cleanup_state(&mut state);
}
/// Setting only `memory_max` must still record all three memory writes, in
/// (max, high, low) order, with `high` and `low` written as `None`.
#[test]
fn apply_setup_records_three_memory_writes_when_memory_field_set() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_mem").memory_max(1_000_000)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();

    // Index of the first recorded write of each kind for `cg_mem`.
    let max_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryMax(name, _) if name == "cg_mem"))
        .expect("SetMemoryMax must fire");
    let high_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryHigh(name, _) if name == "cg_mem"))
        .expect("SetMemoryHigh must fire");
    let low_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryLow(name, _) if name == "cg_mem"))
        .expect("SetMemoryLow must fire");
    let in_order = max_idx < high_idx && high_idx < low_idx;
    assert!(
        in_order,
        "memory writes must land in (max, high, low) order; got max={max_idx} high={high_idx} low={low_idx}",
    );

    // Exact payloads: the configured max, and explicit `None` for the unset knobs.
    let want_max = CgroupCall::SetMemoryMax("cg_mem".to_string(), Some(1_000_000));
    let want_high = CgroupCall::SetMemoryHigh("cg_mem".to_string(), None);
    let want_low = CgroupCall::SetMemoryLow("cg_mem".to_string(), None);
    assert!(calls.contains(&want_max));
    assert!(calls.contains(&want_high));
    assert!(calls.contains(&want_low));
    cleanup_state(&mut state);
}
/// Ensures every resource-limit write for a cgroup is issued before tasks are moved into it.
///
/// `cg_full` is configured with cpuset mems, a CPU quota, a CPU weight, a memory max and an
/// io weight. The mock's recorded calls must contain at least seven resource writes for
/// `cg_full`, and each of them must precede the first `MoveTasks` for `cg_full`.
///
/// `MoveTasks` is required to be present. Without that requirement the ordering check
/// would pass vacuously whenever no task move was recorded at all.
#[test]
fn apply_setup_resource_writes_land_before_move_tasks() {
    mock_setup_state!(mock, topo, ctx, state);
    let mems: BTreeSet<usize> = [0usize].into_iter().collect();
    let defs = vec![
        CgroupDef::named("cg_full")
            .cpuset_mems(mems)
            .cpu_quota_pct(40)
            .cpu_weight(200)
            .memory_max(2_000_000)
            .io_weight(150),
    ];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();

    // Position of the first task migration into `cg_full`, if any was recorded.
    let move_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "cg_full"));

    // Positions of every resource-limit write that targets `cg_full`.
    let kinds: Vec<usize> = calls
        .iter()
        .enumerate()
        .filter_map(|(i, c)| match c {
            CgroupCall::SetCpusetMems(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetCpuMax(n, _, _) if n == "cg_full" => Some(i),
            CgroupCall::SetCpuWeight(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetMemoryMax(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetMemoryHigh(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetMemoryLow(n, _) if n == "cg_full" => Some(i),
            CgroupCall::SetIoWeight(n, _) if n == "cg_full" => Some(i),
            _ => None,
        })
        .collect();
    assert!(
        kinds.len() >= 7,
        "expected at least 7 resource writes (mems + cpu.max + cpu.weight + 3 memory + io.weight); got {} ({calls:?})",
        kinds.len(),
    );

    // Fail loudly when no MoveTasks was recorded instead of silently skipping the check.
    let mi = move_idx.unwrap_or_else(|| {
        panic!(
            "MoveTasks for cg_full must be recorded so the ordering check is not vacuous; \
             got {calls:?}"
        )
    });
    assert!(
        kinds.iter().all(|k| *k < mi),
        "every resource write must precede MoveTasks; kinds={kinds:?} move_idx={mi}",
    );
    cleanup_state(&mut state);
}
/// A CPU weight of 0 must be rejected with an error naming the cgroup and `cpu.weight`.
#[test]
fn apply_setup_rejects_cpu_weight_out_of_range() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_bad").cpu_weight(0)];
    let msg = format!(
        "{:#}",
        apply_setup_test(&ctx, &mut state, &defs).expect_err("apply_setup must reject weight=0"),
    );
    let names_cgroup = msg.contains("cg_bad");
    let names_field = msg.contains("cpu.weight");
    assert!(
        names_cgroup && names_field,
        "error must name cgroup and field; got: {msg}",
    );
    cleanup_state(&mut state);
}
/// A zero CPU period must be rejected with an error naming the cgroup and the period.
#[test]
fn apply_setup_rejects_cpu_max_period_zero() {
    mock_setup_state!(mock, topo, ctx, state);
    let zero_period = CgroupDef::named("cg_bad").cpu_quota(Duration::from_millis(50), Duration::ZERO);
    let defs = [zero_period];
    let msg = format!(
        "{:#}",
        apply_setup_test(&ctx, &mut state, &defs).expect_err("apply_setup must reject period=0"),
    );
    let names_cgroup = msg.contains("cg_bad");
    let names_period = msg.contains("period");
    assert!(
        names_cgroup && names_period,
        "error must name cgroup and period; got: {msg}",
    );
    cleanup_state(&mut state);
}
/// `memory_swap_max` must store the byte limit and `memory_swap_unlimited` must clear it.
#[test]
fn cgroup_def_memory_swap_max_builder_round_trip() {
    let limited = CgroupDef::named("cg_a").memory_swap_max(2 * 1024 * 1024);
    let stored = limited.memory.as_ref().unwrap().swap_max;
    assert_eq!(stored, Some(2 * 1024 * 1024));

    let unlimited = limited.memory_swap_unlimited();
    let stored = unlimited.memory.as_ref().unwrap().swap_max;
    assert_eq!(stored, None);
}
/// Calling `memory_swap_unlimited()` on a fresh definition must not create a memory section.
#[test]
fn cgroup_def_memory_swap_unlimited_on_fresh_def_is_noop() {
    let fresh = CgroupDef::named("cg_a").memory_swap_unlimited();
    let untouched = fresh.memory.is_none();
    assert!(
        untouched,
        "memory_swap_unlimited() on a fresh CgroupDef must leave \
         self.memory == None; got: {:?}",
        fresh.memory,
    );
}
/// `memory_unlimited()` followed by `memory_swap_unlimited()` must leave a memory section
/// in which every limit is unset.
#[test]
fn cgroup_def_memory_unlimited_then_swap_unlimited_is_idempotent() {
    let mem = CgroupDef::named("cg_a")
        .memory_unlimited()
        .memory_swap_unlimited()
        .memory
        .expect("memory_unlimited installs Some(default)");
    assert!(mem.max.is_none());
    assert!(mem.high.is_none());
    assert!(mem.low.is_none());
    assert!(mem.swap_max.is_none());
}
/// Applying a definition that only called `memory_swap_unlimited()` must record no memory
/// write of any kind on the mock.
#[test]
fn apply_setup_memory_swap_unlimited_on_fresh_def_emits_no_memory_writes() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_swap_clear").memory_swap_unlimited()];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");

    let calls = mock.calls();
    let no_memory_writes = calls.iter().all(|call| {
        !matches!(
            call,
            CgroupCall::SetMemoryMax(_, _)
                | CgroupCall::SetMemoryHigh(_, _)
                | CgroupCall::SetMemoryLow(_, _)
                | CgroupCall::SetMemorySwapMax(_, _)
        )
    });
    assert!(
        no_memory_writes,
        "memory_swap_unlimited() on a fresh CgroupDef must emit zero memory writes; got: {calls:?}"
    );
    cleanup_state(&mut state);
}
/// `pids_max` must store the limit and `pids_unlimited` must clear it again.
#[test]
fn cgroup_def_pids_max_builder_round_trip() {
    let limited = CgroupDef::named("cg_a").pids_max(1024);
    let stored = limited.pids.as_ref().unwrap().max;
    assert_eq!(stored, Some(1024));

    let unlimited = limited.pids_unlimited();
    let stored = unlimited.pids.as_ref().unwrap().max;
    assert_eq!(stored, None);
}
/// Covers both sides of the swap-max opt-in.
///
/// First, a definition with `memory_swap_max` must record `SetMemorySwapMax(Some(bytes))`.
/// Then, against a fresh mock, a definition with only `memory_max` must record no
/// `SetMemorySwapMax` for that cgroup while still writing max, high and low in that order.
#[test]
fn apply_setup_records_set_memory_swap_max() {
    // Part 1: swap limit requested.
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_swap").memory_swap_max(4 * 1024 * 1024)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let wanted = CgroupCall::SetMemorySwapMax("cg_swap".to_string(), Some(4 * 1024 * 1024));
    assert!(
        calls.contains(&wanted),
        "swap_max with bytes must record SetMemorySwapMax(Some(N)), got: {calls:?}",
    );
    cleanup_state(&mut state);

    // Part 2: fresh mock and state, swap limit not requested.
    let mock = MockCgroupOps::new();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = [CgroupDef::named("cg_nosw").memory_max(1_000_000)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let swap_untouched = calls
        .iter()
        .all(|call| !matches!(call, CgroupCall::SetMemorySwapMax(name, _) if name == "cg_nosw"));
    assert!(
        swap_untouched,
        "memory_max-only must NOT record SetMemorySwapMax (would \
         ENOENT on CONFIG_SWAP=n kernels); got: {calls:?}",
    );

    let max_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryMax(name, _) if name == "cg_nosw"))
        .expect("SetMemoryMax must fire");
    let high_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryHigh(name, _) if name == "cg_nosw"))
        .expect("SetMemoryHigh must fire");
    let low_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryLow(name, _) if name == "cg_nosw"))
        .expect("SetMemoryLow must fire");
    let in_order = max_idx < high_idx && high_idx < low_idx;
    assert!(
        in_order,
        "memory writes must land in (max, high, low) order; \
         got max={max_idx} high={high_idx} low={low_idx}",
    );
    cleanup_state(&mut state);
}
/// With all four memory knobs set, the recorded writes must appear in
/// (max, high, low, swap_max) order.
#[test]
fn apply_setup_orders_memory_swap_max_after_low() {
    mock_setup_state!(mock, topo, ctx, state);
    let full_mem = CgroupDef::named("cg_full_mem")
        .memory_max(2_000_000)
        .memory_high(1_500_000)
        .memory_low(500_000)
        .memory_swap_max(8 * 1024 * 1024);
    let defs = [full_mem];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();

    // First recorded position of each memory write for `cg_full_mem`.
    let max_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryMax(name, _) if name == "cg_full_mem"))
        .expect("SetMemoryMax must fire");
    let high_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryHigh(name, _) if name == "cg_full_mem"))
        .expect("SetMemoryHigh must fire");
    let low_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemoryLow(name, _) if name == "cg_full_mem"))
        .expect("SetMemoryLow must fire");
    let swap_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetMemorySwapMax(name, _) if name == "cg_full_mem"))
        .expect("SetMemorySwapMax must fire when swap_max is opted in");

    let in_order = max_idx < high_idx && high_idx < low_idx && low_idx < swap_idx;
    assert!(
        in_order,
        "memory writes must land in (max, high, low, swap_max) order; \
         got max={max_idx} high={high_idx} low={low_idx} swap={swap_idx}",
    );
    cleanup_state(&mut state);
}
/// `SetPidsMax` must be recorded when `pids_max` was configured, and must not be recorded
/// (checked against a fresh mock) when the definition has no pids section.
#[test]
fn apply_setup_records_set_pids_max_only_when_set() {
    // Part 1: pids limit configured.
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_pids").pids_max(512)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let wanted = CgroupCall::SetPidsMax("cg_pids".to_string(), Some(512));
    assert!(
        calls.contains(&wanted),
        "pids_max(N) must record SetPidsMax(Some(N)), got: {calls:?}",
    );
    cleanup_state(&mut state);

    // Part 2: fresh mock and state, no pids limit configured.
    let mock = MockCgroupOps::new();
    let ctx = mock_ctx(&mock, &topo);
    let mut state = StepState::empty(&ctx);
    let defs = [CgroupDef::named("cg_nopids").memory_max(1_000_000)];
    apply_setup_test(&ctx, &mut state, &defs).expect("apply_setup must succeed");
    let calls = mock.calls();
    let pids_untouched = calls
        .iter()
        .all(|call| !matches!(call, CgroupCall::SetPidsMax(_, _)));
    assert!(
        pids_untouched,
        "no SetPidsMax expected when pids field is None, got: {calls:?}",
    );
    cleanup_state(&mut state);
}
/// `pids_max(0)` must be rejected. The error has to name the cgroup and knob, state the
/// constraint, and point at `pids_unlimited` as the alternative.
#[test]
fn apply_setup_rejects_pids_max_zero() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_zero").pids_max(0)];
    let msg = format!(
        "{:#}",
        apply_setup_test(&ctx, &mut state, &defs).expect_err("apply_setup must reject pids_max(0)"),
    );

    let names_target = msg.contains("cg_zero") && msg.contains("pids.max");
    assert!(
        names_target,
        "error must name cgroup and pids.max; got: {msg}",
    );
    let states_constraint = msg.contains("must be > 0");
    assert!(
        states_constraint,
        "error must spell out the constraint; got: {msg}",
    );
    let names_escape_hatch = msg.contains("pids_unlimited");
    assert!(
        names_escape_hatch,
        "error must name the escape hatch (pids_unlimited()); got: {msg}",
    );
    cleanup_state(&mut state);
}
/// Freezing a cgroup that was never declared in this test must still record
/// `SetFreeze(name, true)` on the mock.
#[test]
fn apply_ops_freeze_undefined_cgroup_dispatches_set_freeze() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [Op::freeze_cgroup("ghost_cg")];
    apply_ops_test(&ctx, &mut state, &ops)
        .expect("apply_ops must dispatch FreezeCgroup even for an undeclared name");

    let calls = mock.calls();
    let wanted = CgroupCall::SetFreeze("ghost_cg".to_string(), true);
    assert!(
        calls.contains(&wanted),
        "FreezeCgroup must reach set_freeze regardless of declaration state, got: {calls:?}"
    );
}
/// When the mock is told to fail its first call, the freeze op must return an error that
/// names the op and cgroup and still carries the injected cause text.
#[test]
fn apply_ops_freeze_propagates_set_freeze_error_with_context() {
    mock_setup_state!(mock, topo, ctx, state);
    mock.fail_call_at(0, "kernel ENOENT — cgroup directory does not exist");

    let ops = [Op::freeze_cgroup("ghost_cg")];
    let msg = format!(
        "{:#}",
        apply_ops_test(&ctx, &mut state, &ops).expect_err("set_freeze failure must surface as Err"),
    );

    let names_op_and_cgroup = msg.contains("Op::FreezeCgroup") && msg.contains("ghost_cg");
    assert!(
        names_op_and_cgroup,
        "error must name the op and the cgroup, got: {msg}"
    );
    let carries_cause = msg.contains("ENOENT");
    assert!(
        carries_cause,
        "error must propagate the underlying cause, got: {msg}"
    );
}
/// A freeze followed by an unfreeze must record `SetFreeze(true)` and then
/// `SetFreeze(false)` for the cgroup, in that order.
#[test]
fn apply_ops_freeze_and_unfreeze_record_set_freeze() {
    mock_setup_state!(mock, topo, ctx, state);
    let ops = [Op::freeze_cgroup("cg_x"), Op::unfreeze_cgroup("cg_x")];
    apply_ops_test(&ctx, &mut state, &ops).expect("freeze/unfreeze ops must succeed");
    let calls = mock.calls();

    let froze = CgroupCall::SetFreeze("cg_x".to_string(), true);
    assert!(
        calls.contains(&froze),
        "FreezeCgroup must dispatch SetFreeze(true), got: {calls:?}",
    );
    let thawed = CgroupCall::SetFreeze("cg_x".to_string(), false);
    assert!(
        calls.contains(&thawed),
        "UnfreezeCgroup must dispatch SetFreeze(false), got: {calls:?}",
    );

    // Compare the first occurrence of each to verify ordering.
    let true_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetFreeze(_, true)))
        .expect("found freeze");
    let false_idx = calls
        .iter()
        .position(|call| matches!(call, CgroupCall::SetFreeze(_, false)))
        .expect("found unfreeze");
    let freeze_first = true_idx < false_idx;
    assert!(
        freeze_first,
        "freeze (true) must come before unfreeze (false): {calls:?}",
    );
    cleanup_state(&mut state);
}
/// io weights 0 and 10_001 must both be rejected with an error naming the knob, the
/// constraint and the cgroup, and no `SetIoWeight` may be recorded for the cgroup.
#[test]
fn apply_setup_rejects_io_weight_out_of_range() {
    let out_of_range = [(0u16, "zero"), (10_001u16, "above-max")];
    for (weight, label) in out_of_range {
        // Fresh mock and state for every case.
        mock_setup_state!(mock, topo, ctx, state);
        let defs = [CgroupDef::named("cg_io").io_weight(weight)];
        let msg = format!(
            "{:#}",
            apply_setup_test(&ctx, &mut state, &defs)
                .expect_err(&format!("io.weight={weight} ({label}) must reject")),
        );

        let names_knob = msg.contains("io.weight") && msg.contains("out of range");
        assert!(
            names_knob,
            "error must name the offending knob and constraint; got: {msg}",
        );
        let names_cgroup = msg.contains("cg_io");
        assert!(
            names_cgroup,
            "error must name the offending cgroup; got: {msg}",
        );

        let calls = mock.calls();
        let never_written = calls
            .iter()
            .all(|call| !matches!(call, CgroupCall::SetIoWeight(name, _) if name == "cg_io"));
        assert!(
            never_written,
            "rejected weight must not reach the cgroupfs write: {calls:?}",
        );
        cleanup_state(&mut state);
    }
}
/// The boundary io weights 1 and 10_000 must be accepted and recorded as `SetIoWeight`.
#[test]
fn apply_setup_accepts_io_weight_range_endpoints() {
    let endpoints = [1u16, 10_000u16];
    for weight in endpoints {
        // Fresh mock and state for every boundary value.
        mock_setup_state!(mock, topo, ctx, state);
        let defs = [CgroupDef::named("cg_io").io_weight(weight)];
        if let Err(e) = apply_setup_test(&ctx, &mut state, &defs) {
            panic!("io.weight={weight} (boundary) must be accepted: {e:#}");
        }

        let calls = mock.calls();
        let wanted = CgroupCall::SetIoWeight("cg_io".to_string(), weight);
        assert!(
            calls.contains(&wanted),
            "boundary weight must reach the cgroupfs write; got: {calls:?}",
        );
        cleanup_state(&mut state);
    }
}
/// A definition with no explicit work must still produce a `MoveTasks` call with a
/// non-zero count for its cgroup.
#[test]
fn apply_setup_substitutes_default_workspec_when_works_empty() {
    mock_setup_state!(mock, topo, ctx, state);
    let bare = CgroupDef::named("cg_default_work");
    // Premise check: nothing has been added to `works` yet.
    assert!(
        bare.works.is_empty(),
        "test premise: CgroupDef without .work() must start with empty works",
    );
    apply_setup_test(&ctx, &mut state, &[bare])
        .expect("apply_setup with default-work substitution must succeed");

    let calls = mock.calls();
    let tasks_moved = calls.iter().any(|call| {
        matches!(
            call,
            CgroupCall::MoveTasks(name, count) if name == "cg_default_work" && *count > 0
        )
    });
    assert!(
        tasks_moved,
        "default-WorkSpec substitution must spawn workers and migrate them into the \
         cgroup; without it the empty `works` would leave the cgroup taskless. \
         Got: {calls:?}",
    );
    cleanup_state(&mut state);
}
/// Reads `/proc/<pid>/comm` and returns its contents with trailing newlines removed.
///
/// # Panics
///
/// Panics if the file cannot be read.
fn read_proc_comm(pid: libc::pid_t) -> String {
    let path = format!("/proc/{pid}/comm");
    let mut comm =
        std::fs::read_to_string(path).expect("/proc/<pid>/comm must be readable for live task");
    // Drop every trailing '\n' in place instead of copying the trimmed slice.
    let kept = comm.trim_end_matches('\n').len();
    comm.truncate(kept);
    comm
}
/// A `pcomm` group with two workers must record a `MoveTasks` call of exactly one PID
/// for its cgroup.
#[test]
fn apply_setup_pcomm_via_cgroup_def_forks_one_container() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_pcomm").pcomm("leader").workers(2)];
    apply_setup_test(&ctx, &mut state, &defs).expect("pcomm apply_setup must succeed");

    let calls = mock.calls();
    let moved_single_pid = calls
        .iter()
        .any(|call| matches!(call, CgroupCall::MoveTasks(name, 1) if name == "cg_pcomm"));
    assert!(
        moved_single_pid,
        "pcomm group must move exactly 1 PID (the container) into the cgroup; \
         got: {calls:?}",
    );
    cleanup_state(&mut state);
}
/// Verifies process and thread names for a `pcomm` + `comm` group by inspecting live
/// `/proc` entries.
///
/// After setup the single handle must report one pid whose `comm` is `leader`, and
/// `/proc/<leader>/task` must contain the leader entry plus exactly two other entries
/// whose `comm` is `worker`.
#[test]
fn apply_setup_pcomm_with_per_thread_comm() {
mock_setup_state!(mock, topo, ctx, state);
let defs = vec![
CgroupDef::named("cg_named")
.pcomm("leader")
.comm("worker")
.workers(2),
];
apply_setup_test(&ctx, &mut state, &defs).expect("pcomm + comm apply_setup must succeed");
// NOTE(review): fixed delay, presumably to let the spawned process apply its name before
// /proc is read; timing-based, so it may be flaky on a loaded host. Confirm intent.
std::thread::sleep(Duration::from_millis(200));
// Take ownership of the handles so the single handle can be popped and inspected;
// `state.handles` is left empty.
let mut handles = std::mem::take(&mut state.handles);
assert_eq!(handles.len(), 1, "one CgroupDef → one handle");
let (_name, handle) = handles
.pop()
.expect("apply_setup must have pushed a handle for cg_named");
let pids = handle.worker_pids();
assert_eq!(
pids.len(),
1,
"pcomm handle must report exactly 1 pid (the leader); got {}",
pids.len(),
);
let leader_pid = pids[0];
assert_eq!(
read_proc_comm(leader_pid),
"leader",
"/proc/<leader>/comm must equal pcomm",
);
// NOTE(review): second fixed delay, presumably to let the worker threads start and
// rename themselves before the task directory is listed. Confirm intent.
std::thread::sleep(Duration::from_millis(100));
let task_dir = format!("/proc/{leader_pid}/task");
// Collect every entry name under the task directory that parses as a pid; unreadable
// entries and non-numeric names are skipped.
let entries: Vec<libc::pid_t> = std::fs::read_dir(&task_dir)
.expect("/proc/<leader>/task must be readable for live container")
.flatten()
.filter_map(|e| e.file_name().to_str().and_then(|n| n.parse().ok()))
.collect();
assert!(
entries.len() >= 3,
"leader pid {leader_pid} must have leader + 2 worker threads in /proc/<leader>/task; \
observed {} entries: {entries:?}",
entries.len(),
);
let mut leader_seen = false;
let mut worker_seen = 0usize;
// Classify each task id: the entry equal to the leader pid must be named "leader",
// every other entry must be named "worker".
for tid in entries {
let tcomm = read_proc_comm(tid);
if tid == leader_pid {
assert_eq!(
tcomm, "leader",
"/proc/<leader>/task/<leader>/comm must equal pcomm; got {tcomm:?}",
);
leader_seen = true;
} else {
assert_eq!(
tcomm, "worker",
"/proc/<leader>/task/{tid}/comm must equal per-thread comm 'worker'; \
got {tcomm:?}",
);
worker_seen += 1;
}
}
assert!(
leader_seen,
"leader's own task entry must appear in /proc/<leader>/task",
);
assert_eq!(
worker_seen, 2,
"must observe exactly 2 worker threads with per-thread comm 'worker'; \
saw {worker_seen}",
);
// Release the popped handle explicitly before the remaining state is cleaned up.
drop(handle);
cleanup_state(&mut state);
}
/// With one `pcomm` group and one plain group, each with two workers, the `pcomm` cgroup
/// must receive a one-PID `MoveTasks` and the plain cgroup a two-PID `MoveTasks`.
#[test]
fn apply_setup_mixed_pcomm_and_non_pcomm_groups() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [
        CgroupDef::named("cg_pcomm").pcomm("threaded").workers(2),
        CgroupDef::named("cg_fork").workers(2),
    ];
    apply_setup_test(&ctx, &mut state, &defs).expect("mixed apply_setup must succeed");
    let calls = mock.calls();

    let pcomm_moved_one = calls
        .iter()
        .any(|call| matches!(call, CgroupCall::MoveTasks(name, 1) if name == "cg_pcomm"));
    assert!(
        pcomm_moved_one,
        "cg_pcomm must move 1 PID (container only) into its cgroup; \
         got: {calls:?}",
    );

    let fork_moved_two = calls
        .iter()
        .any(|call| matches!(call, CgroupCall::MoveTasks(name, 2) if name == "cg_fork"));
    assert!(
        fork_moved_two,
        "cg_fork must move 2 PIDs (one per fork worker) into its cgroup; \
         got: {calls:?}",
    );
    cleanup_state(&mut state);
}
/// A `pcomm` group with zero workers must be rejected with an error naming the cgroup and
/// `num_workers=0`, and no `MoveTasks` may be recorded for that cgroup.
#[test]
fn apply_setup_pcomm_with_zero_workers_is_rejected() {
    mock_setup_state!(mock, topo, ctx, state);
    let defs = [CgroupDef::named("cg_zero").pcomm("empty").workers(0)];
    let msg = format!(
        "{:#}",
        apply_setup_test(&ctx, &mut state, &defs)
            .expect_err("pcomm + 0 workers apply_setup must be rejected"),
    );

    let names_cgroup = msg.contains("cg_zero");
    assert!(names_cgroup, "rejection error must name the cgroup: {msg}");
    let names_field = msg.contains("num_workers=0");
    assert!(
        names_field,
        "rejection error must name the offending field: {msg}",
    );

    let calls = mock.calls();
    let any_move = calls
        .iter()
        .any(|call| matches!(call, CgroupCall::MoveTasks(name, _) if name == "cg_zero"));
    assert!(
        !any_move,
        "rejection must short-circuit before any move_tasks call \
         into cg_zero; got: {calls:?}",
    );
    cleanup_state(&mut state);
}
/// Without a cpuset, `workers_pct(0.5)` must spawn two workers in this mock setup.
#[test]
fn workers_pct_no_cpuset_resolves_against_usable_topology() {
    mock_setup_state!(mock, topo, ctx, state);
    let half = CgroupDef::named("cg_p").workers_pct(0.5);
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&half)).unwrap();

    let (_, handle) = state
        .handles
        .iter()
        .find(|(name, _)| name == "cg_p")
        .expect("workload spawned");
    let spawned = handle.worker_pids().len();
    assert_eq!(
        spawned, 2,
        "workers_pct(0.5) on usable=3 CPUs must resolve to ceil(3*0.5)=2 \
         workers; got {} workers",
        spawned,
    );
    cleanup_state(&mut state);
}
/// With an `Llc(0)` cpuset, `workers_pct(0.34)` must spawn two workers.
#[test]
fn workers_pct_with_cpuset_resolves_against_cpuset_size() {
    mock_setup_state!(mock, topo, ctx, state);
    let third = CgroupDef::named("cg_p")
        .cpuset(CpusetSpec::Llc(0))
        .workers_pct(0.34);
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&third)).unwrap();

    let (_, handle) = state
        .handles
        .iter()
        .find(|(name, _)| name == "cg_p")
        .expect("workload spawned");
    let spawned = handle.worker_pids().len();
    assert_eq!(
        spawned, 2,
        "workers_pct(0.34) on Llc(0)=4 CPUs must resolve to ceil(4*0.34)=2 \
         workers; got {} workers",
        spawned,
    );
    cleanup_state(&mut state);
}
/// A percentage above 1.0 is allowed: `workers_pct(2.0)` on `Llc(0)` must spawn eight workers.
#[test]
fn workers_pct_above_one_accepts_oversubscription() {
    mock_setup_state!(mock, topo, ctx, state);
    let doubled = CgroupDef::named("cg_p")
        .cpuset(CpusetSpec::Llc(0))
        .workers_pct(2.0);
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&doubled)).unwrap();

    let (_, handle) = state
        .handles
        .iter()
        .find(|(name, _)| name == "cg_p")
        .expect("workload spawned");
    let spawned = handle.worker_pids().len();
    assert_eq!(
        spawned, 8,
        "workers_pct(2.0) on Llc(0)=4 CPUs must resolve to ceil(4*2.0)=8 \
         workers (oversubscription); got {}",
        spawned,
    );
    cleanup_state(&mut state);
}
/// Setting `workers_pct` and then `workers` on the same definition must be rejected at
/// apply time with an error mentioning both.
#[test]
fn workers_pct_then_workers_rejected_at_apply() {
    mock_setup_state!(mock, topo, ctx, state);
    let conflicting = CgroupDef::named("cg_p")
        .cpuset(CpusetSpec::Llc(0))
        .workers_pct(0.5)
        .workers(2);
    let msg = apply_setup_test(&ctx, &mut state, std::slice::from_ref(&conflicting))
        .expect_err("workers_pct + workers must be rejected")
        .to_string();

    let names_both = msg.contains("workers_pct") && msg.contains("workers(2)");
    assert!(
        names_both,
        "error must name both workers and workers_pct: {msg}",
    );
    cleanup_state(&mut state);
}
/// Setting `workers` and then `workers_pct` on the same definition must be rejected at
/// apply time with an error mentioning both.
#[test]
fn workers_then_workers_pct_rejected_at_apply() {
    mock_setup_state!(mock, topo, ctx, state);
    let conflicting = CgroupDef::named("cg_p")
        .cpuset(CpusetSpec::Llc(0))
        .workers(2)
        .workers_pct(0.5);
    let msg = apply_setup_test(&ctx, &mut state, std::slice::from_ref(&conflicting))
        .expect_err("workers + workers_pct must be rejected")
        .to_string();

    let names_both = msg.contains("workers_pct") && msg.contains("workers(2)");
    assert!(
        names_both,
        "error must name both workers and workers_pct: {msg}",
    );
    cleanup_state(&mut state);
}
/// At construction, `workers_pct` must be stored as given on the first work entry and
/// `num_workers` must stay unset.
#[test]
fn workers_pct_construction_stores_pct_without_resolving() {
    let built = CgroupDef::named("cg_p").workers_pct(0.5);
    let first = &built.works[0];

    let stored_pct = first.workers_pct;
    assert_eq!(
        stored_pct,
        Some(0.5),
        "workers_pct must be stored verbatim at construction; got {:?}",
        stored_pct,
    );

    let stored_count = first.num_workers;
    assert_eq!(
        stored_count, None,
        "num_workers must be left unset at construction (apply-setup resolves); got {:?}",
        stored_count,
    );
}
/// Pins the rounding of `workers_pct` on an `Llc(0)` cpuset across four percentages.
///
/// Each case uses a fresh `StepState`, applies one cgroup, checks the number of worker
/// pids on its handle against the expected count, and cleans the state up again.
#[test]
fn workers_pct_rounding_is_ceil_not_round_or_floor() {
    mock_setup!(mock, topo, ctx);

    // (cgroup name, workers_pct, expected worker count, assertion message)
    let cases: [(&'static str, f64, usize, &'static str); 4] = [
        (
            "cg_exact",
            0.5,
            2,
            "workers_pct(0.5) on 4 CPUs (exact 2.0) must produce 2 workers",
        ),
        (
            "cg_mid",
            0.6,
            3,
            "workers_pct(0.6) on 4 CPUs (≈2.4) must ceil to 3 workers; round (2) or floor (2) would be wrong",
        ),
        (
            "cg_over",
            0.51,
            3,
            "workers_pct(0.51) on 4 CPUs (≈2.04) must ceil to 3 workers; round (2) or floor (2) would be wrong",
        ),
        (
            "cg_under",
            0.49,
            2,
            "workers_pct(0.49) on 4 CPUs (≈1.96) must ceil to 2 workers; floor (1) would be wrong",
        ),
    ];

    for (cgroup, pct, expected, why) in cases {
        let mut state = StepState::empty(&ctx);
        let def = CgroupDef::named(cgroup)
            .cpuset(CpusetSpec::Llc(0))
            .workers_pct(pct);
        apply_setup_test(&ctx, &mut state, std::slice::from_ref(&def)).unwrap();

        let (_, handle) = state
            .handles
            .iter()
            .find(|(name, _)| name == cgroup)
            .expect("workload spawned");
        assert_eq!(handle.worker_pids().len(), expected, "{why}");
        cleanup_state(&mut state);
    }
}
/// Narrowing a cgroup's cpuset through `Op::SetCpuset` after setup must update the recorded
/// cpuset but leave the number of already-spawned workers unchanged.
#[test]
fn workers_pct_setup_workers_survive_op_setcpuset_narrowing() {
    mock_setup_state!(mock, topo, ctx, state);
    let stable = CgroupDef::named("cg_stable")
        .cpuset(CpusetSpec::Llc(0))
        .workers_pct(0.5);
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&stable)).unwrap();

    // Worker count right after setup.
    let (_, before) = state
        .handles
        .iter()
        .find(|(name, _)| name == "cg_stable")
        .expect("workload spawned");
    let initial_count = before.worker_pids().len();
    assert_eq!(
        initial_count, 2,
        "baseline: workers_pct(0.5) on Llc(0)=4 CPUs → ceil(4*0.5)=2 workers",
    );

    // Shrink the cpuset to CPUs {0, 1}.
    let narrower = std::collections::BTreeSet::from([0usize, 1]);
    let shrink = Op::SetCpuset {
        cgroup: "cg_stable".into(),
        cpus: CpusetSpec::Exact(narrower.clone()),
    };
    apply_ops_test(&ctx, &mut state, &[shrink]).expect("Op::SetCpuset applies");

    let recorded = state
        .cpusets
        .get("cg_stable")
        .expect("cg_stable has recorded cpuset");
    assert_eq!(
        recorded, &narrower,
        "Op::SetCpuset must persist the narrower set in state.cpusets via record_cpuset",
    );

    // Worker count after the cpuset change.
    let (_, after) = state
        .handles
        .iter()
        .find(|(name, _)| name == "cg_stable")
        .expect("workload still present");
    let after_count = after.worker_pids().len();
    assert_eq!(
        after_count, initial_count,
        "Op::SetCpuset apply arm must NOT re-resolve workers_pct; \
         setup-spawned worker count must remain {initial_count}; got {after_count}",
    );
    cleanup_state(&mut state);
}
/// `CgroupDef::workers_pct` must panic at construction for NaN, infinity, a negative value
/// and zero.
#[test]
fn workers_pct_pathological_finite_rejected_at_construction() {
    // (cgroup name, rejected percentage, assertion message)
    let rejected: [(&'static str, f64, &'static str); 4] = [
        (
            "cg_nan",
            f64::NAN,
            "CgroupDef::workers_pct(NaN) must panic at construction",
        ),
        (
            "cg_inf",
            f64::INFINITY,
            "CgroupDef::workers_pct(INFINITY) must panic at construction",
        ),
        (
            "cg_neg",
            -1.0,
            "CgroupDef::workers_pct(-1.0) must panic at construction",
        ),
        (
            "cg_zero",
            0.0,
            "CgroupDef::workers_pct(0.0) must panic at construction",
        ),
    ];

    for (cgroup, pct, why) in rejected {
        // Run the builder under `catch_unwind` so the panic can be observed as an `Err`.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = CgroupDef::named(cgroup).workers_pct(pct);
        }));
        assert!(outcome.is_err(), "{why}");
    }
}
/// Pins the current behaviour for an absurdly large but finite
/// `workers_pct`: resolution succeeds and `num_workers` is `usize::MAX`.
#[test]
fn workers_pct_pathological_finite_large_saturates_usize() {
    let spec = crate::workload::WorkSpec::default().workers_pct(1e100);
    let resolved = spec
        .resolve_workers_pct(4, "cg_saturate")
        .expect("current framework does not gate against usize::MAX saturation");
    let got = resolved.num_workers;
    assert_eq!(
        got,
        Some(usize::MAX),
        "extreme pct saturates `num_workers` to `usize::MAX` per Rust's saturating \
         float-to-int `as` cast (RFC 2484); got {:?}",
        got,
    );
}
/// With several `workers_pct` values configured on a cgroup whose cpuset is
/// empty, setup must fail and the error text must mention every configured
/// fraction as well as the zero-sized cpuset.
#[test]
fn workers_pct_empty_cpuset_multi_workspec_lists_all_pcts() {
    mock_setup_state!(mock, topo, ctx, state);
    let no_cpus = std::collections::BTreeSet::new();
    let def = CgroupDef::named("cg_multi")
        .cpuset(CpusetSpec::Exact(no_cpus))
        .workers_pct(0.3)
        .work(crate::workload::WorkSpec::default().workers_pct(0.7))
        .work(crate::workload::WorkSpec::default().workers_pct(0.5));
    let err = apply_setup_test(&ctx, &mut state, std::slice::from_ref(&def))
        .expect_err("multi-workspec workers_pct on empty cpuset must reject");
    let msg = err.to_string();

    let names_every_pct = ["0.3", "0.7", "0.5"].iter().all(|pct| msg.contains(*pct));
    assert!(
        names_every_pct,
        "diagnostic must name ALL configured workers_pct values, not just the first: {msg}",
    );
    assert!(
        msg.contains("cpuset of 0"),
        "diagnostic must still name the empty cpuset size: {msg}",
    );
    cleanup_state(&mut state);
}
/// When both `workers` and `workers_pct` are set on a cgroup with an empty
/// cpuset, setup must fail with the dual-set error (including its trailing
/// context) rather than the empty-cpuset diagnostic.
#[test]
fn workers_pct_empty_cpuset_dual_set_bails_with_dedicated_error() {
    mock_setup_state!(mock, topo, ctx, state);
    let no_cpus = std::collections::BTreeSet::new();
    let def = CgroupDef::named("cg_both")
        .cpuset(CpusetSpec::Exact(no_cpus))
        .workers(2)
        .workers_pct(0.5);
    let err = apply_setup_test(&ctx, &mut state, std::slice::from_ref(&def))
        .expect_err("workers + workers_pct on empty cpuset must reject");
    let msg = err.to_string();

    assert!(
        msg.contains("BOTH workers"),
        "dual-set error must fire first; got the empty-cpuset diagnostic instead: {msg}",
    );
    let cites_empty_cpuset = msg.contains("cpuset of 0");
    assert!(
        !cites_empty_cpuset,
        "workers_pct-only empty-cpuset diagnostic must NOT preempt the more fundamental dual-set error: {msg}",
    );
    assert!(
        msg.contains("empty cpuset would otherwise mask"),
        "dual-set bail must include the case-(1)-specific trailing context that \
         explains why this fired at apply_setup rather than at the deeper resolve \
         path: {msg}",
    );
    cleanup_state(&mut state);
}
/// A cpuset spec that the test expects to resolve to zero CPUs must make
/// setup fail even when no `workers_pct` is configured, and the error must
/// name the cgroup and the zero-CPU resolution without mentioning
/// `workers_pct`.
#[test]
fn empty_resolved_cpuset_without_workers_pct_bails_in_apply_setup() {
    mock_setup_state!(mock, topo, ctx, state);
    let tiny_range = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 0.1,
    };
    let def = CgroupDef::named("cg_empty_range").cpuset(tiny_range);
    let err = apply_setup_test(&ctx, &mut state, std::slice::from_ref(&def))
        .expect_err("empty-resolved cpuset must reject even without workers_pct");
    let msg = err.to_string();

    assert!(
        msg.contains("cg_empty_range"),
        "diagnostic must name the cgroup: {msg}",
    );
    assert!(
        msg.contains("resolved to 0 CPU(s)"),
        "diagnostic must name the zero-CPU resolution: {msg}",
    );
    let cites_workers_pct = msg.contains("workers_pct");
    assert!(
        !cites_workers_pct,
        "diagnostic must NOT cite workers_pct when none is set; \
         that would mis-direct the operator to a knob they didn't \
         configure: {msg}",
    );
    cleanup_state(&mut state);
}
/// `Op::SetCpuset` with a spec the test expects to resolve to zero CPUs must
/// be rejected, and the error must name the cgroup, the zero-CPU resolution,
/// the `Op::SetCpuset` layer, and `Op::ClearCpuset` as the alternative.
#[test]
fn op_set_cpuset_narrow_to_empty_bails() {
    mock_setup_state!(mock, topo, ctx, state);
    let initial = CgroupDef::named("cg_narrow").cpuset(CpusetSpec::Llc(0));
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&initial)).unwrap();

    let narrow_to_nothing = Op::SetCpuset {
        cgroup: std::borrow::Cow::Borrowed("cg_narrow"),
        cpus: CpusetSpec::Range {
            start_frac: 0.0,
            end_frac: 0.1,
        },
    };
    let err = apply_ops_test(&ctx, &mut state, &[narrow_to_nothing])
        .expect_err("Op::SetCpuset narrowing to empty must reject");
    let msg = err.to_string();

    assert!(
        msg.contains("cg_narrow"),
        "diagnostic must name the target cgroup: {msg}",
    );
    assert!(
        msg.contains("resolved to 0 CPU(s)"),
        "diagnostic must name the zero-CPU resolution: {msg}",
    );
    assert!(
        msg.contains("Op::SetCpuset"),
        "diagnostic must identify the Op layer so the operator \
         knows this came from a mid-scenario narrow, not setup: \
         {msg}",
    );
    assert!(
        msg.contains("Op::ClearCpuset"),
        "diagnostic must point the operator at the right \
         primitive for the 'release cpuset restriction' intent \
         so a regression that drops the Op::ClearCpuset \
         direction (leading users to the workaround \
         `Range {{ 0.0, 1.0 }}` instead) is caught: {msg}",
    );
    cleanup_state(&mut state);
}
/// A single `workers_pct` on an explicitly empty cpuset must make setup fail
/// with an error that names both the requested fraction and the cpuset size.
#[test]
fn workers_pct_empty_cpuset_rejects_with_diagnostic() {
    mock_setup_state!(mock, topo, ctx, state);
    let no_cpus = std::collections::BTreeSet::new();
    let def = CgroupDef::named("cg_e")
        .cpuset(CpusetSpec::Exact(no_cpus))
        .workers_pct(0.9);
    let err = apply_setup_test(&ctx, &mut state, std::slice::from_ref(&def))
        .expect_err("workers_pct on empty cpuset must reject");
    let msg = err.to_string();

    let names_fraction = msg.contains("workers_pct(0.9)");
    assert!(
        names_fraction && msg.contains("cpuset of 0"),
        "diagnostic must name the requested fraction AND cpuset size: {msg}",
    );
    cleanup_state(&mut state);
}
/// `Op::Spawn` into a named cgroup with `workers_pct(0.5)` must resolve the
/// fraction against that cgroup's cpuset; the test expects 2 workers for
/// `Llc(0)`.
#[test]
fn op_spawn_cgroup_pct_resolves_against_cgroup_cpuset() {
    mock_setup_state!(mock, topo, ctx, state);
    let target = CgroupDef::named("cg_spawn").cpuset(CpusetSpec::Llc(0));
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&target)).unwrap();
    // Drop whatever setup registered so only the Op::Spawn handle is found below.
    state.handles.clear();

    let work = crate::workload::WorkSpec::default().workers_pct(0.5);
    let spawn_op = Op::spawn(SpawnPlacement::cgroup("cg_spawn"), work);
    apply_ops_test(&ctx, &mut state, &[spawn_op]).unwrap();

    let (_, handle) = state
        .handles
        .iter()
        .find(|(n, _)| n == "cg_spawn")
        .expect("Op::Spawn(Cgroup) workload registered");
    let spawned = handle.worker_pids().len();
    assert_eq!(
        spawned,
        2,
        "Op::Spawn(Cgroup) workers_pct(0.5) on Llc(0)=4 must resolve to 2 workers; \
         got {}",
        spawned,
    );
    cleanup_state(&mut state);
}
/// `Op::Spawn` into a named cgroup must reject a `WorkSpec` that sets both
/// `workers` and `workers_pct`, naming both knobs in the error.
#[test]
fn op_spawn_cgroup_pct_dual_set_rejected() {
    mock_setup_state!(mock, topo, ctx, state);
    let target = CgroupDef::named("cg_x").cpuset(CpusetSpec::Llc(0));
    apply_setup_test(&ctx, &mut state, std::slice::from_ref(&target)).unwrap();
    state.handles.clear();

    let conflicting = crate::workload::WorkSpec::default()
        .workers(2)
        .workers_pct(0.5);
    let spawn_op = Op::spawn(SpawnPlacement::cgroup("cg_x"), conflicting);
    let err = apply_ops_test(&ctx, &mut state, &[spawn_op])
        .expect_err("Op::Spawn(Cgroup) dual-set must reject");
    let msg = err.to_string();

    let names_pct = msg.contains("workers_pct");
    assert!(
        names_pct && msg.contains("workers(2)"),
        "Op::Spawn(Cgroup) diagnostic must name both knobs: {msg}",
    );
    cleanup_state(&mut state);
}
/// For a sample of cpuset specs, `ctx.cpuset_cpus(spec)` must equal the
/// length of `spec.resolve(&ctx)`.
#[test]
fn ctx_cpuset_cpus_matches_resolve_len() {
    mock_setup!(mock, topo, ctx);
    let half_range = CpusetSpec::Range {
        start_frac: 0.0,
        end_frac: 0.5,
    };
    let first_overlap = CpusetSpec::Overlap {
        index: 0,
        of: 2,
        frac: 0.5,
    };
    let specs = [
        CpusetSpec::Llc(0),
        CpusetSpec::Numa(0),
        half_range,
        CpusetSpec::Disjoint { index: 0, of: 2 },
        first_overlap,
        CpusetSpec::Exact((0usize..3).collect()),
    ];
    for spec in specs.iter() {
        let reported = ctx.cpuset_cpus(spec);
        let resolved_len = spec.resolve(&ctx).len();
        assert_eq!(reported, resolved_len, "ctx.cpuset_cpus drift on {spec:?}");
    }
}
/// `Op::write_kernel_hot` must reach the installed kernel-op callback as a
/// hot-mode write request carrying one entry with the symbol target and the
/// u64 value, and the bridge must then drain exactly one kernel op.
#[test]
fn apply_ops_write_kernel_hot_dispatches_via_bridge() {
    use crate::scenario::snapshot::{KernelOpCallback, SnapshotBridge};
    use crate::vmm::wire::{
        KernelOpDirection, KernelOpMode, KernelOpReplyPayload, KernelOpRequestPayload,
        KernelOpTarget, KernelOpValue,
    };
    use std::sync::Arc;

    // Callback stores the last request it saw and replies with success.
    let captured = Arc::new(std::sync::Mutex::new(None::<KernelOpRequestPayload>));
    let sink = Arc::clone(&captured);
    let kernel_op_cb: KernelOpCallback = Arc::new(move |req| {
        *sink.lock().unwrap() = Some(req.clone());
        KernelOpReplyPayload {
            request_id: req.request_id,
            success: true,
            reason: String::new(),
            read_values: vec![],
        }
    });
    let bridge = SnapshotBridge::new(Arc::new(|_| None)).with_kernel_op(kernel_op_cb);
    let bridge_clone = bridge.clone();
    let _bg = bridge.set_thread_local();
    mock_setup_state!(mock, topo, ctx, state);

    let ops = vec![Op::write_kernel_hot(
        KernelTarget::symbol("test_field"),
        KernelValue::u64(42),
    )];
    apply_ops_test(&ctx, &mut state, &ops).expect("WriteKernelHot must dispatch");

    let req = captured.lock().unwrap().take().expect("callback must fire");
    assert_eq!(req.mode, KernelOpMode::Hot);
    assert_eq!(req.direction, KernelOpDirection::Write);
    assert_eq!(req.entries.len(), 1);
    let entry = &req.entries[0];
    match &entry.target {
        KernelOpTarget::Symbol(s) => assert_eq!(s, "test_field"),
        other => panic!("unexpected target shape: {other:?}"),
    }
    match entry.value {
        KernelOpValue::U64(42) => {}
        ref other => panic!("unexpected value shape: {other:?}"),
    }
    assert_eq!(bridge_clone.drain_kernel_ops().len(), 1);
    cleanup_state(&mut state);
}
/// A two-entry `Op::write_kernel_cold_batch` must reach the kernel-op
/// callback as one cold-mode write request carrying both entries.
#[test]
fn apply_ops_write_kernel_cold_dispatches_with_cold_mode() {
    use crate::scenario::snapshot::{KernelOpCallback, SnapshotBridge};
    use crate::vmm::wire::{
        KernelOpDirection, KernelOpMode, KernelOpReplyPayload, KernelOpRequestPayload,
    };
    use std::sync::Arc;

    // Callback stores the last request it saw and replies with success.
    let captured = Arc::new(std::sync::Mutex::new(None::<KernelOpRequestPayload>));
    let sink = Arc::clone(&captured);
    let kernel_op_cb: KernelOpCallback = Arc::new(move |req| {
        *sink.lock().unwrap() = Some(req.clone());
        KernelOpReplyPayload {
            request_id: req.request_id,
            success: true,
            reason: String::new(),
            read_values: vec![],
        }
    });
    let bridge = SnapshotBridge::new(Arc::new(|_| None)).with_kernel_op(kernel_op_cb);
    let _bg = bridge.set_thread_local();
    mock_setup_state!(mock, topo, ctx, state);

    let batch = vec![
        (
            KernelTarget::per_cpu_field("runqueues", "clock", 0),
            KernelValue::u64(100),
        ),
        (
            KernelTarget::per_cpu_field("runqueues", "clock", 1),
            KernelValue::u64(200),
        ),
    ];
    let ops = vec![Op::write_kernel_cold_batch(batch)];
    apply_ops_test(&ctx, &mut state, &ops).expect("WriteKernelCold must dispatch");

    let req = captured.lock().unwrap().take().expect("callback must fire");
    assert_eq!(req.mode, KernelOpMode::Cold);
    assert_eq!(req.direction, KernelOpDirection::Write);
    assert_eq!(req.entries.len(), 2, "batch must carry both entries");
    cleanup_state(&mut state);
}
/// `Op::read_kernel_hot` with a u32 width must produce a hot-mode read
/// request tagged with the caller's tag and a `U32` value slot, and the
/// callback's reply value must be retrievable from the bridge by that tag.
#[test]
fn apply_ops_read_kernel_hot_dispatches_with_width_u32() {
    use crate::scenario::snapshot::{KernelOpCallback, SnapshotBridge};
    use crate::vmm::wire::{
        KernelOpDirection, KernelOpMode, KernelOpReplyPayload, KernelOpRequestPayload,
        KernelOpValue,
    };
    use std::sync::Arc;

    // Callback stores the last request it saw and replies with U32(7).
    let captured = Arc::new(std::sync::Mutex::new(None::<KernelOpRequestPayload>));
    let sink = Arc::clone(&captured);
    let kernel_op_cb: KernelOpCallback = Arc::new(move |req| {
        *sink.lock().unwrap() = Some(req.clone());
        KernelOpReplyPayload {
            request_id: req.request_id,
            success: true,
            reason: String::new(),
            read_values: vec![KernelOpValue::U32(7)],
        }
    });
    let bridge = SnapshotBridge::new(Arc::new(|_| None)).with_kernel_op(kernel_op_cb);
    let bridge_clone = bridge.clone();
    let _bg = bridge.set_thread_local();
    mock_setup_state!(mock, topo, ctx, state);

    let ops = vec![Op::read_kernel_hot(
        "scratch_u32",
        KernelTarget::symbol("some_u32"),
        KernelValueWidth::u32(),
    )];
    apply_ops_test(&ctx, &mut state, &ops).expect("ReadKernelHot must dispatch");

    let req = captured.lock().unwrap().take().expect("callback must fire");
    assert_eq!(req.mode, KernelOpMode::Hot);
    assert_eq!(req.direction, KernelOpDirection::Read);
    assert_eq!(req.tag, "scratch_u32");
    match req.entries[0].value {
        KernelOpValue::U32(_) => {}
        ref other => panic!("u32 width hint must emit U32 slot, got {other:?}"),
    }
    let stored = bridge_clone.kernel_op_value("scratch_u32");
    match stored {
        Some(KernelOpValue::U32(7)) => {}
        other => panic!("kernel_op_value lookup mismatch: {other:?}"),
    }
    cleanup_state(&mut state);
}
/// `Op::read_kernel_cold` with a 16-byte width must produce a cold-mode read
/// request whose first entry carries a 16-byte `Bytes` slot.
#[test]
fn apply_ops_read_kernel_cold_dispatches_with_width_bytes() {
    use crate::scenario::snapshot::{KernelOpCallback, SnapshotBridge};
    use crate::vmm::wire::{
        KernelOpDirection, KernelOpMode, KernelOpReplyPayload, KernelOpRequestPayload,
        KernelOpValue,
    };
    use std::sync::Arc;

    // Callback stores the last request it saw and replies with 16 bytes of 0xAA.
    let captured = Arc::new(std::sync::Mutex::new(None::<KernelOpRequestPayload>));
    let sink = Arc::clone(&captured);
    let kernel_op_cb: KernelOpCallback = Arc::new(move |req| {
        *sink.lock().unwrap() = Some(req.clone());
        KernelOpReplyPayload {
            request_id: req.request_id,
            success: true,
            reason: String::new(),
            read_values: vec![KernelOpValue::Bytes(vec![0xAA; 16])],
        }
    });
    let bridge = SnapshotBridge::new(Arc::new(|_| None)).with_kernel_op(kernel_op_cb);
    let _bg = bridge.set_thread_local();
    mock_setup_state!(mock, topo, ctx, state);

    let ops = vec![Op::read_kernel_cold(
        "scratch_bytes",
        KernelTarget::kva(0xffff_c900_0000_1000),
        KernelValueWidth::bytes(16),
    )];
    apply_ops_test(&ctx, &mut state, &ops).expect("ReadKernelCold must dispatch");

    let req = captured.lock().unwrap().take().expect("callback must fire");
    assert_eq!(req.mode, KernelOpMode::Cold);
    assert_eq!(req.direction, KernelOpDirection::Read);
    let slot = &req.entries[0].value;
    match slot {
        KernelOpValue::Bytes(b) => {
            assert_eq!(b.len(), 16, "Bytes(16) width hint must emit a 16-byte slot");
        }
        other => panic!("Bytes width hint must emit Bytes slot, got {other:?}"),
    }
    cleanup_state(&mut state);
}
/// Three adjacent single-entry `Op::write_kernel_cold` ops must be merged
/// into one cold-mode write dispatch that carries all three writes in the
/// order they were submitted.
///
/// The callback records every request it receives, so the test can check
/// both the number of dispatches and the contents of the merged batch.
#[test]
fn apply_ops_merges_three_adjacent_cold_write_singletons_into_one_dispatch() {
    use std::sync::Arc;
    let captured = Arc::new(std::sync::Mutex::new(Vec::<
        crate::vmm::wire::KernelOpRequestPayload,
    >::new()));
    let captured_clone = captured.clone();
    let kernel_op_cb: crate::scenario::snapshot::KernelOpCallback = Arc::new(move |req| {
        captured_clone.lock().unwrap().push(req.clone());
        crate::vmm::wire::KernelOpReplyPayload {
            request_id: req.request_id,
            success: true,
            reason: String::new(),
            read_values: vec![],
        }
    });
    let bridge = crate::scenario::snapshot::SnapshotBridge::new(Arc::new(|_| None))
        .with_kernel_op(kernel_op_cb);
    let _bg = bridge.set_thread_local();
    mock_setup_state!(mock, topo, ctx, state);
    let ops = vec![
        Op::write_kernel_cold(
            KernelTarget::per_cpu_field("runqueues", "clock", 0),
            KernelValue::u64(100),
        ),
        Op::write_kernel_cold(
            KernelTarget::per_cpu_field("runqueues", "clock", 1),
            KernelValue::u64(200),
        ),
        Op::write_kernel_cold(
            KernelTarget::per_cpu_field("runqueues", "clock", 2),
            KernelValue::u64(300),
        ),
    ];
    apply_ops_test(&ctx, &mut state, &ops).expect("merged cold-write batch must dispatch");
    let payloads = captured.lock().unwrap();
    assert_eq!(
        payloads.len(),
        1,
        "3 adjacent singletons must collapse into ONE bridge dispatch, got {} dispatches",
        payloads.len()
    );
    assert_eq!(payloads[0].mode, crate::vmm::wire::KernelOpMode::Cold);
    assert_eq!(
        payloads[0].direction,
        crate::vmm::wire::KernelOpDirection::Write
    );
    assert_eq!(
        payloads[0].entries.len(),
        3,
        "merged batch must carry all 3 writes in input order"
    );
    // The length check alone cannot detect a merge that reorders writes, so
    // pin the order through the distinct u64 value each input op carried.
    let merged_values: Vec<u64> = payloads[0]
        .entries
        .iter()
        .map(|entry| match &entry.value {
            crate::vmm::wire::KernelOpValue::U64(v) => *v,
            other => panic!("merged cold write must carry a U64 value, got {other:?}"),
        })
        .collect();
    assert_eq!(
        merged_values,
        vec![100, 200, 300],
        "merged batch must preserve the input order of the cold writes"
    );
    cleanup_state(&mut state);
}
/// An `Op::CaptureSnapshot` between two cold writes must stop
/// `merge_adjacent_cold_writes` from combining them: the output keeps three
/// ops, with a single-write cold op on each side of the snapshot.
#[test]
fn merge_adjacent_cold_writes_capture_snapshot_is_barrier() {
    use super::merge_adjacent_cold_writes;

    let before = Op::write_kernel_cold(KernelTarget::symbol("a"), KernelValue::u64(1));
    let barrier = Op::CaptureSnapshot { name: "mid".into() };
    let after = Op::write_kernel_cold(KernelTarget::symbol("b"), KernelValue::u64(2));
    let ops = vec![before, barrier, after];

    let merged = merge_adjacent_cold_writes(&ops);
    assert_eq!(merged.len(), 3, "CaptureSnapshot must split cold writes");
    assert!(matches!(merged[0], Op::WriteKernelCold { ref writes } if writes.len() == 1));
    assert!(matches!(merged[1], Op::CaptureSnapshot { .. }));
    assert!(matches!(merged[2], Op::WriteKernelCold { ref writes } if writes.len() == 1));
}
/// An `Op::AddCgroup` between two cold writes must stop
/// `merge_adjacent_cold_writes` from combining them: the output keeps three
/// ops, with a single-write cold op on each side of the cgroup op.
#[test]
fn merge_adjacent_cold_writes_non_kernel_op_is_barrier() {
    use super::merge_adjacent_cold_writes;

    let before = Op::write_kernel_cold(KernelTarget::symbol("a"), KernelValue::u64(1));
    let barrier = Op::AddCgroup {
        name: "cg_mid".into(),
    };
    let after = Op::write_kernel_cold(KernelTarget::symbol("b"), KernelValue::u64(2));
    let ops = vec![before, barrier, after];

    let merged = merge_adjacent_cold_writes(&ops);
    assert_eq!(
        merged.len(),
        3,
        "non-kernel cgroup op must split cold writes"
    );
    assert!(matches!(merged[0], Op::WriteKernelCold { ref writes } if writes.len() == 1));
    assert!(matches!(merged[1], Op::AddCgroup { .. }));
    assert!(matches!(merged[2], Op::WriteKernelCold { ref writes } if writes.len() == 1));
}
/// Spawning two workers into a named cgroup must record a `MoveTasks` call
/// for that cgroup with two pids on the mock.
#[test]
fn op_spawn_cgroup_moves_tasks_into_named_cgroup() {
    mock_setup_state!(mock, topo, ctx, state);
    state.cgroups.add_cgroup_no_cpuset("cg_test").unwrap();

    let work = WorkSpec::default().workers(2).work_type(WorkType::SpinWait);
    let spawn_op = Op::spawn_workers("cg_test", work);
    apply_ops_test(&ctx, &mut state, &[spawn_op]).expect("Op::Spawn(Cgroup) should succeed");

    let calls = mock.calls();
    let moved_two_into_target = calls
        .iter()
        .any(|c| matches!(c, CgroupCall::MoveTasks(n, 2) if n == "cg_test"));
    assert!(
        moved_two_into_target,
        "Op::Spawn(Cgroup) must call move_tasks(\"cg_test\", 2 pids), \
         got: {calls:?}"
    );
    cleanup_state(&mut state);
}
/// Spawning via `Op::spawn_host` must succeed without recording any call on
/// the mock cgroup ops.
#[test]
fn op_spawn_runner_cgroup_emits_zero_cgroup_calls() {
    mock_setup_state!(mock, topo, ctx, state);

    let work = WorkSpec::default().workers(1).work_type(WorkType::SpinWait);
    let spawn_op = Op::spawn_host(work);
    apply_ops_test(&ctx, &mut state, &[spawn_op])
        .expect("Op::Spawn(RunnerCgroup) should succeed");

    let calls = mock.calls();
    let untouched = calls.is_empty();
    assert!(
        untouched,
        "Op::Spawn(RunnerCgroup) must NOT touch the cgroup ops surface — \
         workers stay in the spawner's own cgroup. Got: {calls:?}"
    );
    cleanup_state(&mut state);
}
/// `Op::spawn_host` with `workers(0)` must fail, cite the zero-worker
/// diagnostic, label the path as `<runner>`, and leave the mock cgroup ops
/// untouched.
#[test]
fn op_spawn_runner_cgroup_workers_zero_bails_with_actionable_diagnostic() {
    mock_setup_state!(mock, topo, ctx, state);

    let no_workers = WorkSpec::default().workers(0).work_type(WorkType::SpinWait);
    let spawn_op = Op::spawn_host(no_workers);
    let err = apply_ops_test(&ctx, &mut state, &[spawn_op])
        .expect_err("Op::Spawn(RunnerCgroup) workers(0) must bail");
    let msg = format!("{err:#}");

    assert!(
        msg.contains("num_workers=0 is not allowed"),
        "error must cite the resolve_num_workers diagnostic: {msg}"
    );
    assert!(
        msg.contains("<runner>"),
        "error must label the runner-cgroup path as `<runner>` \
         (literal angle-brackets) so the operator can grep the \
         RunnerCgroup spawn call site from the error msg: {msg}"
    );
    let recorded = mock.calls();
    assert!(
        recorded.is_empty(),
        "bail path must not invoke cgroup ops: {:?}",
        recorded
    );
    cleanup_state(&mut state);
}
/// `Op::spawn_host` must reject a `WorkSpec` that uses `workers_pct`, name
/// both recovery options in the error, and record no cgroup calls.
#[test]
fn op_spawn_runner_cgroup_bails_when_workspec_workers_pct_set() {
    mock_setup_state!(mock, topo, ctx, state);

    let pct_work = WorkSpec::default()
        .workers_pct(0.5)
        .work_type(WorkType::SpinWait);
    let spawn_op = Op::spawn_host(pct_work);
    let err = apply_ops_test(&ctx, &mut state, &[spawn_op])
        .expect_err("Op::Spawn(RunnerCgroup) with WorkSpec::workers_pct must bail");
    let msg = format!("{err:#}");

    let names_op = msg.contains("Op::Spawn");
    assert!(
        names_op && msg.contains("workers_pct"),
        "error must name the op + the rejected field: {msg}"
    );
    assert!(
        msg.contains(".workers(N)"),
        "error must name the explicit-count recovery as `.workers(N)`: {msg}"
    );
    assert!(
        msg.contains("SpawnPlacement::Cgroup"),
        "error must name the named-cgroup recovery path: {msg}"
    );
    let recorded = mock.calls();
    assert!(
        recorded.is_empty(),
        "bail must skip every cgroup op: {:?}",
        recorded
    );
    cleanup_state(&mut state);
}
/// With a 4-CPU cpuset recorded for the cgroup, spawning with
/// `workers_pct(0.5)` must produce exactly one `MoveTasks` call for that
/// cgroup carrying two pids.
#[test]
fn op_spawn_cgroup_honors_workspec_workers_pct_against_cgroup_cpuset() {
    mock_setup_state!(mock, topo, ctx, state);
    let cpus: std::collections::BTreeSet<usize> = [0, 1, 2, 3].into_iter().collect();
    state.cgroups.add_cgroup_no_cpuset("cg").unwrap();
    // Record the cpuset directly in state; no cpuset is set through the cgroup ops.
    state.cpusets.insert("cg".to_string(), cpus);

    let pct_work = WorkSpec::default()
        .workers_pct(0.5)
        .work_type(WorkType::SpinWait);
    let spawn_op = Op::spawn_workers("cg", pct_work);
    apply_ops_test(&ctx, &mut state, &[spawn_op]).expect(
        "Op::Spawn(SpawnPlacement::Cgroup) + workers_pct(0.5) on 4-CPU cpuset must succeed",
    );

    let calls = mock.calls();
    let moves: Vec<&CgroupCall> = calls
        .iter()
        .filter(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "cg"))
        .collect();
    assert_eq!(
        moves.len(),
        1,
        "exactly one move_tasks expected for the spawn; got: {moves:?}",
    );
    let only_move = &moves[0];
    match only_move {
        CgroupCall::MoveTasks(_, n_pids) => assert_eq!(
            *n_pids, 2,
            "workers_pct(0.5) × 4 CPUs = ceil(2.0) = 2 workers; \
             a regression in resolve_workers_pct's ceil-then-write surfaces here",
        ),
        _ => unreachable!(),
    }
    cleanup_state(&mut state);
}
/// Spawning into a named cgroup must reject a `WorkSpec` that sets `pcomm`,
/// name both recovery paths in the error, and record no `MoveTasks` call.
#[test]
fn op_spawn_cgroup_bails_when_workspec_pcomm_set() {
    mock_setup_state!(mock, topo, ctx, state);
    state.cgroups.add_cgroup_no_cpuset("cg").unwrap();

    let pcomm_work = WorkSpec::default()
        .workers(1)
        .work_type(WorkType::SpinWait)
        .pcomm("chrome");
    let spawn_op = Op::spawn_workers("cg", pcomm_work);
    let err = apply_ops_test(&ctx, &mut state, &[spawn_op])
        .expect_err("Op::Spawn(SpawnPlacement::Cgroup) with WorkSpec::pcomm must bail");
    let msg = format!("{err:#}");

    assert!(
        msg.contains("pcomm"),
        "error must cite the pcomm rejection: {msg}"
    );
    assert!(
        msg.contains("CgroupDef::pcomm"),
        "error must name the CgroupDef::pcomm recovery path: {msg}"
    );
    assert!(
        msg.contains("spawn_pcomm_cgroup"),
        "error must name the WorkloadHandle::spawn_pcomm_cgroup recovery path: {msg}"
    );

    // Only MoveTasks is checked here; the add_cgroup above may have recorded other calls.
    let recorded = mock.calls();
    let move_calls: Vec<_> = recorded
        .iter()
        .filter(|c| matches!(c, CgroupCall::MoveTasks(_, _)))
        .cloned()
        .collect();
    assert!(
        move_calls.is_empty(),
        "construction-time bail must skip every move_tasks call: {move_calls:?}"
    );
    cleanup_state(&mut state);
}
/// `Op::spawn_host` must reject a `WorkSpec` that sets `pcomm`, name both
/// recovery paths in the error, and record no cgroup calls at all.
#[test]
fn op_spawn_runner_cgroup_bails_when_workspec_pcomm_set() {
    mock_setup_state!(mock, topo, ctx, state);

    let pcomm_work = WorkSpec::default()
        .workers(1)
        .work_type(WorkType::SpinWait)
        .pcomm("java");
    let spawn_op = Op::spawn_host(pcomm_work);
    let err = apply_ops_test(&ctx, &mut state, &[spawn_op])
        .expect_err("Op::Spawn(SpawnPlacement::RunnerCgroup) with WorkSpec::pcomm must bail");
    let msg = format!("{err:#}");

    assert!(
        msg.contains("pcomm"),
        "error must cite the pcomm rejection: {msg}"
    );
    assert!(
        msg.contains("CgroupDef::pcomm"),
        "error must name the CgroupDef::pcomm recovery path: {msg}"
    );
    assert!(
        msg.contains("spawn_pcomm_cgroup"),
        "error must name the WorkloadHandle::spawn_pcomm_cgroup recovery path: {msg}"
    );
    let recorded = mock.calls();
    assert!(
        recorded.is_empty(),
        "construction-time bail must skip every cgroup op: {:?}",
        recorded
    );
    cleanup_state(&mut state);
}
/// When the first `MoveTasks` call fails during `Op::move_all_tasks`, the
/// error must propagate, the handle must stay in step state keyed under
/// `src`, nothing must land in backdrop state, and the mock must have seen
/// `ClearSubtreeControl("dst")` followed by `MoveTasks("dst", _)`.
#[test]
fn move_all_tasks_preserves_state_when_move_tasks_fails() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();
    step_state.cgroups.add_cgroup_no_cpuset("dst").unwrap();

    // One real worker registered under "src".
    let spec = WorkSpec::default();
    let config = WorkloadConfig::for_scenario_engine(
        &spec,
        1,
        crate::workload::AffinityIntent::Inherit,
        spec.work_type.clone(),
    )
    .expect(
        "test fixture: pcomm must stay None for scenario-engine dispatch — \
         if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
         instead of for_scenario_engine",
    );
    let handle = WorkloadHandle::spawn(&config).expect("spawn worker");
    step_state.handles.push(("src".to_string(), handle));

    // Remember how many calls the fixture made so later checks skip them.
    let setup_call_count = mock.calls().len();
    mock.fail_nth_call_matching(
        0,
        |c| matches!(c, CgroupCall::MoveTasks(_, _)),
        "injected kernel ENOSPC mid-move",
    );

    {
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let move_op = Op::move_all_tasks("src", "dst");
        let err = apply_ops(&ctx, &mut scenario, &[move_op], false)
            .expect_err("move_tasks failure must propagate");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("injected kernel ENOSPC mid-move"),
            "error must surface the injected failure verbatim: {msg}"
        );
    }

    assert_eq!(
        step_state.handles.len(),
        1,
        "handle must survive the failed move; got: {n}",
        n = step_state.handles.len(),
    );
    assert_eq!(
        step_state.handles[0].0, "src",
        "partial-failure rollback contract: handle stays keyed under \
         `from` when move_tasks errored before rename_handles ran"
    );
    assert_eq!(
        backdrop_state.handles.len(),
        0,
        "step-to-backdrop ownership transfer must not have run on \
         the failure path; got: {n}",
        n = backdrop_state.handles.len(),
    );

    let recorded = mock.calls();
    let post_setup_calls: Vec<&CgroupCall> = recorded.iter().skip(setup_call_count).collect();
    let first_is_clear = matches!(
        post_setup_calls.first(),
        Some(CgroupCall::ClearSubtreeControl(n)) if n == "dst"
    );
    assert!(
        first_is_clear,
        "expected first post-setup call = clear_subtree_control(\"dst\"); \
         got: {post_setup_calls:?}"
    );
    let second_is_move = matches!(
        post_setup_calls.get(1),
        Some(CgroupCall::MoveTasks(n, _)) if n == "dst"
    );
    assert!(
        second_is_move,
        "expected second post-setup call = move_tasks(\"dst\", _) (the one \
         we targeted via fail_nth_call_matching for the 0th MoveTasks); \
         got: {post_setup_calls:?}"
    );
    step_state.handles.clear();
}
/// When `Op::move_all_tasks` targets a backdrop cgroup and the first
/// `MoveTasks` call fails, the error must propagate, the handle must remain
/// in step state keyed under `src`, and backdrop state must hold no handles.
#[test]
fn move_all_tasks_step_to_backdrop_failure_preserves_step_ownership() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    backdrop_state
        .cgroups
        .add_cgroup_no_cpuset("bd_dst")
        .unwrap();
    step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();

    // One real worker registered under "src" in step state.
    let spec = WorkSpec::default();
    let config = WorkloadConfig::for_scenario_engine(
        &spec,
        1,
        crate::workload::AffinityIntent::Inherit,
        spec.work_type.clone(),
    )
    .expect(
        "test fixture: pcomm must stay None for scenario-engine dispatch — \
         if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
         instead of for_scenario_engine",
    );
    let handle = WorkloadHandle::spawn(&config).expect("spawn worker");
    step_state.handles.push(("src".to_string(), handle));

    mock.fail_nth_call_matching(
        0,
        |c| matches!(c, CgroupCall::MoveTasks(_, _)),
        "injected kernel ENOSPC mid-move (step→backdrop)",
    );

    {
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let move_op = Op::move_all_tasks("src", "bd_dst");
        let err = apply_ops(&ctx, &mut scenario, &[move_op], false)
            .expect_err("step→backdrop move_tasks failure must propagate");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("ENOSPC mid-move (step→backdrop)"),
            "error must surface the injected failure verbatim: {msg}"
        );
    }

    assert_eq!(
        step_state.handles.len(),
        1,
        "handle must stay in step_state on failure; got: {n}",
        n = step_state.handles.len(),
    );
    assert_eq!(
        step_state.handles[0].0, "src",
        "handle must stay keyed under 'src' (no rename ran)"
    );
    let backdrop_is_empty = backdrop_state.handles.is_empty();
    assert!(
        backdrop_is_empty,
        "no ownership transfer on failure path; got: {:?}",
        backdrop_state
            .handles
            .iter()
            .map(|(n, _)| n.as_str())
            .collect::<Vec<_>>()
    );
    step_state.handles.clear();
}
/// With two handles under `src`, failing the second `MoveTasks` call during
/// `Op::move_all_tasks` must propagate the error and leave both handles in
/// step state, all still keyed under `src`.
#[test]
fn move_all_tasks_multi_handle_partial_failure_keeps_all_under_src() {
    use crate::workload::{WorkSpec, WorkloadConfig, WorkloadHandle};
    mock_setup_backdrop!(mock, topo, ctx, step_state, backdrop_state);
    step_state.cgroups.add_cgroup_no_cpuset("src").unwrap();
    step_state.cgroups.add_cgroup_no_cpuset("dst").unwrap();

    // Two independent single-worker handles, both registered under "src".
    for _ in 0..2 {
        let spec = WorkSpec::default();
        let config = WorkloadConfig::for_scenario_engine(
            &spec,
            1,
            crate::workload::AffinityIntent::Inherit,
            spec.work_type.clone(),
        )
        .expect(
            "test fixture: pcomm must stay None for scenario-engine dispatch — \
             if a future fixture variant sets pcomm, route via spawn_pcomm_cgroup \
             instead of for_scenario_engine",
        );
        let handle = WorkloadHandle::spawn(&config).expect("spawn worker");
        step_state.handles.push(("src".to_string(), handle));
    }

    // Index 1 targets the second MoveTasks call.
    mock.fail_nth_call_matching(
        1,
        |c| matches!(c, CgroupCall::MoveTasks(_, _)),
        "injected kernel ENOSPC on second handle's move_tasks",
    );

    {
        let mut scenario = ScenarioState::new(&mut step_state, &mut backdrop_state);
        let move_op = Op::move_all_tasks("src", "dst");
        let err = apply_ops(&ctx, &mut scenario, &[move_op], false)
            .expect_err("multi-handle partial-failure must propagate");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("ENOSPC on second handle"),
            "error must surface the injected failure verbatim: {msg}"
        );
    }

    assert_eq!(
        step_state.handles.len(),
        2,
        "no handle dropped on partial failure; got: {n}",
        n = step_state.handles.len(),
    );
    let all_under_src = step_state.handles.iter().all(|(n, _)| n == "src");
    assert!(
        all_under_src,
        "all handles MUST stay keyed under 'src' (no partial re-key); \
         got: {:?}",
        step_state
            .handles
            .iter()
            .map(|(n, _)| n.as_str())
            .collect::<Vec<_>>(),
    );
    step_state.handles.clear();
}
/// A successful `Op::move_all_tasks` must record
/// `ClearSubtreeControl("dst")` before `MoveTasks("dst", _)` on the mock.
#[test]
fn op_move_all_tasks_clears_subtree_control_then_moves_to_dst() {
    use crate::workload::{WorkloadConfig, WorkloadHandle};
    mock_setup_state!(mock, topo, ctx, state);
    state.cgroups.add_cgroup_no_cpuset("src").unwrap();
    state.cgroups.add_cgroup_no_cpuset("dst").unwrap();

    // One real worker registered under "src".
    let config = WorkloadConfig {
        num_workers: 1,
        affinity: AffinityIntent::Inherit,
        work_type: WorkType::SpinWait,
        ..Default::default()
    };
    let handle = WorkloadHandle::spawn(&config).expect("spawn worker");
    state.handles.push(("src".to_string(), handle));

    let move_op = Op::move_all_tasks("src", "dst");
    apply_ops_test(&ctx, &mut state, &[move_op]).expect("MoveAllTasks should succeed");

    let calls = mock.calls();
    let clear_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::ClearSubtreeControl(n) if n == "dst"))
        .expect("MoveAllTasks must call clear_subtree_control(\"dst\")");
    let move_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "dst"))
        .expect("MoveAllTasks must call move_tasks(\"dst\", _)");
    let clear_comes_first = clear_idx < move_idx;
    assert!(
        clear_comes_first,
        "clear_subtree_control must precede move_tasks for the same \
         cgroup or the kernel rejects the cgroup.procs write with \
         EBUSY (no-internal-process invariant). Got: {calls:?}"
    );
    cleanup_state(&mut state);
}
/// Applying `Op::AddCgroupDef` followed by a spawn into the same cgroup must
/// record `SetCpuset` for that cgroup before `MoveTasks` on the mock.
#[test]
fn op_spawn_cgroup_after_addcgroupdef_sets_cpuset_before_move_tasks() {
    mock_setup_state!(mock, topo, ctx, state);
    let cpus: BTreeSet<usize> = [0, 1].into_iter().collect();

    let add_def = Op::AddCgroupDef {
        def: CgroupDef::named("cg_ordered").cpuset(CpusetSpec::Exact(cpus.clone())),
    };
    let spawn_into_it = Op::spawn_workers(
        "cg_ordered",
        WorkSpec::default().workers(2).work_type(WorkType::SpinWait),
    );
    apply_ops_test(&ctx, &mut state, &[add_def, spawn_into_it])
        .expect("AddCgroupDef + Op::Spawn(SpawnPlacement::Cgroup) should succeed");

    let calls = mock.calls();
    let set_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::SetCpuset(n, _) if n == "cg_ordered"))
        .expect("set_cpuset for cg_ordered");
    let move_idx = calls
        .iter()
        .position(|c| matches!(c, CgroupCall::MoveTasks(n, _) if n == "cg_ordered"))
        .expect("move_tasks for cg_ordered");
    let cpuset_comes_first = set_idx < move_idx;
    assert!(
        cpuset_comes_first,
        "set_cpuset must precede move_tasks for the same cgroup \
         across the AddCgroupDef → Op::Spawn(SpawnPlacement::Cgroup) boundary: {calls:?}"
    );
    cleanup_state(&mut state);
}
/// With a bridge installed, `Op::capture_cgroup_procs` must issue exactly
/// one `ReadProcs` for the cgroup and record one snapshot carrying the tag,
/// the cgroup name, and the pids the mock was told to return.
#[test]
fn op_capture_cgroup_procs_records_snapshot_on_active_bridge() {
    use std::sync::Arc;
    mock_setup_state!(mock, topo, ctx, state);
    mock.set_procs("cg_x", vec![100, 200, 300]);
    let bridge = crate::scenario::snapshot::SnapshotBridge::new(Arc::new(|_| None));
    let bridge_for_drain = bridge.clone();
    let _guard = bridge.set_thread_local();

    let capture_op = Op::capture_cgroup_procs("snap_tag", "cg_x");
    apply_ops_test(&ctx, &mut state, &[capture_op])
        .expect("Op::CaptureCgroupProcs should succeed");

    // Names of every cgroup whose procs were read.
    let procs_calls: Vec<String> = mock
        .calls()
        .iter()
        .filter_map(|c| {
            if let CgroupCall::ReadProcs(name) = c {
                Some(name.clone())
            } else {
                None
            }
        })
        .collect();
    assert_eq!(
        procs_calls,
        vec!["cg_x".to_string()],
        "exactly one read_procs(\"cg_x\") expected; got: {procs_calls:?}",
    );

    let recorded = bridge_for_drain.drain_cgroup_procs();
    assert_eq!(recorded.len(), 1);
    let snap = &recorded[0];
    assert_eq!(snap.tag, "snap_tag");
    assert_eq!(snap.cgroup, "cg_x");
    assert_eq!(snap.pids, vec![100, 200, 300]);
    cleanup_state(&mut state);
}
/// When the mock's `read_procs` fails, `Op::capture_cgroup_procs` must
/// return an error that names the op, echoes the tag and cgroup, chains the
/// inner error text, and leaves no snapshot on the bridge.
#[test]
fn op_capture_cgroup_procs_propagates_read_procs_error() {
    use std::sync::Arc;
    mock_setup_state!(mock, topo, ctx, state);
    mock.fail_read_procs("cg_x", "injected ENOENT from cgroup.procs");
    let bridge = crate::scenario::snapshot::SnapshotBridge::new(Arc::new(|_| None));
    let bridge_for_drain = bridge.clone();
    let _guard = bridge.set_thread_local();

    let capture_op = Op::capture_cgroup_procs("snap_tag", "cg_x");
    let err = apply_ops_test(&ctx, &mut state, &[capture_op])
        .expect_err("read_procs Err must surface as apply_ops Err");
    let msg = format!("{err:#}");

    assert!(
        msg.contains("Op::CaptureCgroupProcs"),
        "diagnostic must name the op; got: {msg}",
    );
    let echoes_tag = msg.contains("snap_tag");
    assert!(
        echoes_tag && msg.contains("cg_x"),
        "diagnostic must echo tag + cgroup; got: {msg}",
    );
    assert!(
        msg.contains("injected ENOENT"),
        "diagnostic must chain the inner read_procs error; got: {msg}",
    );
    let leftover = bridge_for_drain.drain_cgroup_procs();
    assert!(
        leftover.is_empty(),
        "no snapshot must be recorded when the read fails",
    );
    cleanup_state(&mut state);
}
/// `Op::capture_cgroup_procs` with an empty tag must fail with the
/// empty-tag diagnostic and must not issue any `ReadProcs` call.
#[test]
fn op_capture_cgroup_procs_empty_tag_bails_before_read() {
    use std::sync::Arc;
    mock_setup_state!(mock, topo, ctx, state);
    let bridge = crate::scenario::snapshot::SnapshotBridge::new(Arc::new(|_| None));
    let _guard = bridge.set_thread_local();

    let capture_op = Op::capture_cgroup_procs("", "cg_x");
    let err = apply_ops_test(&ctx, &mut state, &[capture_op]).expect_err("empty tag must bail");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("tag is empty"),
        "diagnostic must cite the empty-tag bail; got: {msg}",
    );

    // Names of every cgroup whose procs were read; expected to be none.
    let procs_calls: Vec<String> = mock
        .calls()
        .iter()
        .filter_map(|c| {
            if let CgroupCall::ReadProcs(name) = c {
                Some(name.clone())
            } else {
                None
            }
        })
        .collect();
    assert!(
        procs_calls.is_empty(),
        "empty-tag bail must skip read_procs; got: {procs_calls:?}",
    );
    cleanup_state(&mut state);
}
/// `Op::capture_cgroup_procs` with an empty cgroup name must fail with the
/// empty-cgroup diagnostic and must not issue any `ReadProcs` call.
#[test]
fn op_capture_cgroup_procs_empty_cgroup_bails_before_read() {
    use std::sync::Arc;
    mock_setup_state!(mock, topo, ctx, state);
    let bridge = crate::scenario::snapshot::SnapshotBridge::new(Arc::new(|_| None));
    let _guard = bridge.set_thread_local();

    let capture_op = Op::capture_cgroup_procs("snap_tag", "");
    let err = apply_ops_test(&ctx, &mut state, &[capture_op])
        .expect_err("empty cgroup name must bail");
    let msg = format!("{err:#}");
    assert!(
        msg.contains("cgroup name is empty"),
        "diagnostic must cite the empty-cgroup bail; got: {msg}",
    );

    // Names of every cgroup whose procs were read; expected to be none.
    let procs_calls: Vec<String> = mock
        .calls()
        .iter()
        .filter_map(|c| {
            if let CgroupCall::ReadProcs(name) = c {
                Some(name.clone())
            } else {
                None
            }
        })
        .collect();
    assert!(
        procs_calls.is_empty(),
        "empty-cgroup bail must skip read_procs; got: {procs_calls:?}",
    );
    cleanup_state(&mut state);
}
/// With no bridge installed by this test, the capture op must fail.
///
/// The diagnostic has to mention both the missing bridge and
/// `set_thread_local`, and the mock must not have recorded any `ReadProcs`
/// call even though procs were configured for `cg_x`.
#[test]
fn op_capture_cgroup_procs_bails_when_no_bridge_installed() {
    mock_setup_state!(mock, topo, ctx, state);
    // Procs are configured so a read would succeed if it were attempted.
    mock.set_procs("cg_x", vec![42]);

    let ops = [Op::capture_cgroup_procs("snap_tag", "cg_x")];
    let err = apply_ops_test(&ctx, &mut state, &ops)
        .expect_err("missing bridge must surface as Err");
    let msg = format!("{err:#}");

    let cites_missing_bridge = msg.contains("no SnapshotBridge installed");
    assert!(
        cites_missing_bridge,
        "diagnostic must cite the missing bridge; got: {msg}",
    );
    let points_to_install_api = msg.contains("set_thread_local");
    assert!(
        points_to_install_api,
        "diagnostic must point to the install API; got: {msg}",
    );

    // Collect the cgroup names of every ReadProcs call the mock recorded.
    let read_calls: Vec<String> = mock
        .calls()
        .iter()
        .filter_map(|call| {
            if let CgroupCall::ReadProcs(name) = call {
                Some(name.clone())
            } else {
                None
            }
        })
        .collect();
    assert!(
        read_calls.is_empty(),
        "no-bridge bail must hoist above read_procs; got read_procs calls: {read_calls:?}",
    );

    cleanup_state(&mut state);
}
/// Two captures of the same cgroup under different tags must both be
/// recorded, in the order the ops were applied, each carrying the cgroup
/// name and the pids configured on the mock.
#[test]
fn op_capture_cgroup_procs_multiple_tags_same_cgroup_preserve_order() {
    use crate::scenario::snapshot::SnapshotBridge;
    use std::sync::Arc;

    mock_setup_state!(mock, topo, ctx, state);
    mock.set_procs("cg_x", vec![100, 200]);

    // Clone before installing so a handle is still available for draining.
    let installed = SnapshotBridge::new(Arc::new(|_| None));
    let bridge_for_drain = installed.clone();
    let _guard = installed.set_thread_local();

    let ops = [
        Op::capture_cgroup_procs("before", "cg_x"),
        Op::capture_cgroup_procs("after", "cg_x"),
    ];
    apply_ops_test(&ctx, &mut state, &ops).expect("two captures should succeed");

    let snaps = bridge_for_drain.drain_cgroup_procs();
    assert_eq!(snaps.len(), 2);
    // The length check above guarantees the zip covers every snapshot.
    for (snap, expected_tag) in snaps.iter().zip(["before", "after"]) {
        assert_eq!(snap.tag, expected_tag);
        assert_eq!(snap.cgroup, "cg_x");
        assert_eq!(snap.pids, vec![100, 200]);
    }

    cleanup_state(&mut state);
}
/// Capturing the same `(tag, cgroup)` pair twice must yield two recorded
/// snapshots, not one that overwrote the other.
#[test]
fn op_capture_cgroup_procs_same_tag_same_cgroup_appends_not_overwrites() {
    use crate::scenario::snapshot::SnapshotBridge;
    use std::sync::Arc;

    mock_setup_state!(mock, topo, ctx, state);
    mock.set_procs("cg_x", vec![42]);

    // Clone before installing so a handle is still available for draining.
    let installed = SnapshotBridge::new(Arc::new(|_| None));
    let bridge_for_drain = installed.clone();
    let _guard = installed.set_thread_local();

    let ops = [
        Op::capture_cgroup_procs("snap", "cg_x"),
        Op::capture_cgroup_procs("snap", "cg_x"),
    ];
    apply_ops_test(&ctx, &mut state, &ops)
        .expect("duplicate (tag, cgroup) captures should succeed");

    let snaps = bridge_for_drain.drain_cgroup_procs();
    assert_eq!(
        snaps.len(),
        2,
        "same (tag, cgroup) MUST append both captures; HashMap-style \
         overwrite would yield len=1 and silently drop a capture",
    );
    // Length is pinned to 2 above, so this covers exactly both entries.
    for snap in snaps.iter() {
        assert_eq!(snap.tag, "snap");
    }

    cleanup_state(&mut state);
}