mod types;
pub use types::*;
mod setup;
pub use setup::PLACEMENT_LOG_PATH;
use setup::apply_setup;
mod dispatch;
#[cfg(test)]
use dispatch::{
REPLACE_NOT_TRYING_DEADLINE_S, build_kernel_op_request, dispatch_kernel_op_request,
merge_adjacent_cold_writes, staged_scheduler_log_path, wait_for_accessor_publish_or_bail,
wait_for_worker_state_not_trying_or_bail, write_entries_from_writes,
};
use dispatch::{apply_ops, render_cgroup_key};
use std::collections::BTreeSet;
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result};
use crate::assert::AssertResult;
use crate::scenario::backdrop;
use crate::scenario::{CgroupGroup, Ctx, process_alive};
use crate::vmm::guest_comms;
use crate::vmm::wire::StimulusPayload;
use crate::workload::{MemPolicy, WorkloadHandle};
/// Scenario-wide state for the Backdrop.
///
/// `run_scenario` creates a single instance before the step loop and hands
/// it to `collect_backdrop` on every exit path, so everything recorded here
/// spans all steps of the scenario.
struct BackdropState<'a> {
/// Cgroup group built from `ctx.cgroups`; its `names()` are what
/// `ScenarioState::cgroup_name_is_backdrop` matches against.
cgroups: CgroupGroup<'a>,
/// `(cgroup name, worker handle)` pairs; the name is used to look up the
/// matching entry in `cpusets` when the handles are collected.
handles: Vec<(String, WorkloadHandle)>,
/// CPU set recorded per cgroup name.
cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
/// Payload entries that are killed when the backdrop is drained.
payload_handles: Vec<PayloadEntry>,
/// Owned file descriptors keyed by string. Not read anywhere in this part
/// of the file — presumably held so the pinned maps stay open while the
/// backdrop lives (TODO confirm against the writer of this field).
pinned_bpf_maps: std::collections::HashMap<String, std::os::fd::OwnedFd>,
}
impl<'a> BackdropState<'a> {
    /// Builds a backdrop state that tracks nothing yet: the cgroup group is
    /// created from `ctx.cgroups` and every collection starts out empty.
    fn empty(ctx: &'a Ctx) -> Self {
        let cgroups = CgroupGroup::new(ctx.cgroups);
        Self {
            cgroups,
            handles: Default::default(),
            cpusets: Default::default(),
            payload_handles: Default::default(),
            pinned_bpf_maps: Default::default(),
        }
    }
}
/// State belonging to a single step.
///
/// `run_scenario` creates a fresh instance for every step and passes it to
/// `collect_step` once the step has run.
struct StepState<'a> {
/// Cgroup group built from `ctx.cgroups` for this step.
cgroups: CgroupGroup<'a>,
/// `(cgroup name, worker handle)` pairs; the name is used to look up the
/// matching entry in `cpusets` when the handles are collected.
handles: Vec<(String, WorkloadHandle)>,
/// CPU set recorded per cgroup name.
cpusets: std::collections::HashMap<String, BTreeSet<usize>>,
/// Payload entries that are killed when the step is collected.
payload_handles: Vec<PayloadEntry>,
/// Optional stall monitor; `collect_step` takes it, drains its reports and
/// records each one as a `WorkerStalled` failure.
stall_monitor: Option<crate::scenario::host_stall::StallMonitorHandle>,
}
impl<'a> StepState<'a> {
    /// Builds a step state that tracks nothing yet: the cgroup group is
    /// created from `ctx.cgroups`, the collections are empty and no stall
    /// monitor is attached.
    fn empty(ctx: &'a Ctx) -> Self {
        let cgroups = CgroupGroup::new(ctx.cgroups);
        Self {
            cgroups,
            handles: Default::default(),
            cpusets: Default::default(),
            payload_handles: Default::default(),
            stall_monitor: None,
        }
    }
}
/// Combined mutable view over the current step's state and the backdrop's
/// state, plus a switch selecting which of the two receives new entries.
struct ScenarioState<'a, 'b> {
/// State of the step currently being executed.
step: &'b mut StepState<'a>,
/// Scenario-wide backdrop state.
backdrop: &'b mut BackdropState<'a>,
/// When `true`, the `target_*` accessors resolve to the backdrop's
/// collections; when `false`, to the step's.
target_backdrop: bool,
}
impl<'a, 'b> ScenarioState<'a, 'b> {
    /// Pairs a step state with the backdrop state. New entries target the
    /// step until `with_target_backdrop` says otherwise.
    fn new(step: &'b mut StepState<'a>, backdrop: &'b mut BackdropState<'a>) -> Self {
        let target_backdrop = false;
        Self {
            step,
            backdrop,
            target_backdrop,
        }
    }

    /// Runs `f` with the backdrop selected as the target, then restores the
    /// previous selection and returns whatever `f` returned.
    fn with_target_backdrop<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
        let saved = std::mem::replace(&mut self.target_backdrop, true);
        let outcome = f(self);
        self.target_backdrop = saved;
        outcome
    }

    /// Cgroup group of the currently selected target.
    fn target_cgroups(&mut self) -> &mut CgroupGroup<'a> {
        if !self.target_backdrop {
            return &mut self.step.cgroups;
        }
        &mut self.backdrop.cgroups
    }

    /// Worker handle list of the currently selected target.
    fn target_handles(&mut self) -> &mut Vec<(String, WorkloadHandle)> {
        if !self.target_backdrop {
            return &mut self.step.handles;
        }
        &mut self.backdrop.handles
    }

    /// Cpuset map of the currently selected target.
    fn target_cpusets(&mut self) -> &mut std::collections::HashMap<String, BTreeSet<usize>> {
        if !self.target_backdrop {
            return &mut self.step.cpusets;
        }
        &mut self.backdrop.cpusets
    }

    /// Payload entry list of the currently selected target.
    fn target_payload_handles(&mut self) -> &mut Vec<PayloadEntry> {
        if !self.target_backdrop {
            return &mut self.step.payload_handles;
        }
        &mut self.backdrop.payload_handles
    }

    /// Looks up the recorded cpuset for `name`, preferring the step's record
    /// over the backdrop's.
    fn lookup_cpuset(&self, name: &str) -> Option<&BTreeSet<usize>> {
        match self.step.cpusets.get(name) {
            Some(cpus) => Some(cpus),
            None => self.backdrop.cpusets.get(name),
        }
    }

    /// Finds the first tracked payload entry with the given payload name and
    /// cgroup key, searching step entries before backdrop entries.
    fn find_live_payload_with_cgroup(
        &self,
        payload_name: &str,
        cgroup_key: &str,
    ) -> Option<&PayloadEntry> {
        self.step
            .payload_handles
            .iter()
            .chain(self.backdrop.payload_handles.iter())
            .find(|e| e.handle.payload_name() == payload_name && e.cgroup == cgroup_key)
    }

    /// Removes and returns a tracked payload entry by payload name.
    ///
    /// With `cgroup` given, the first entry matching both name and cgroup is
    /// taken (step first, then backdrop); `Ok(None)` when there is none.
    ///
    /// Without `cgroup`, the name alone must identify at most one entry
    /// across step and backdrop. If more than one matches, nothing is removed
    /// and `Err` carries the cgroup of every match (step matches first).
    fn take_payload_by_name(
        &mut self,
        name: &str,
        cgroup: Option<&str>,
    ) -> std::result::Result<Option<PayloadEntry>, Vec<String>> {
        if let Some(key) = cgroup {
            let hit = |e: &PayloadEntry| e.handle.payload_name() == name && e.cgroup == key;
            if let Some(pos) = self.step.payload_handles.iter().position(hit) {
                return Ok(Some(self.step.payload_handles.swap_remove(pos)));
            }
            if let Some(pos) = self.backdrop.payload_handles.iter().position(hit) {
                return Ok(Some(self.backdrop.payload_handles.swap_remove(pos)));
            }
            return Ok(None);
        }

        let mut owners: Vec<String> = Vec::new();
        let mut first_in_step: Option<usize> = None;
        for (pos, entry) in self.step.payload_handles.iter().enumerate() {
            if entry.handle.payload_name() == name {
                first_in_step = first_in_step.or(Some(pos));
                owners.push(entry.cgroup.clone());
            }
        }
        let mut first_in_backdrop: Option<usize> = None;
        for (pos, entry) in self.backdrop.payload_handles.iter().enumerate() {
            if entry.handle.payload_name() == name {
                // A backdrop position is only of interest when the step had no match.
                if first_in_step.is_none() {
                    first_in_backdrop = first_in_backdrop.or(Some(pos));
                }
                owners.push(entry.cgroup.clone());
            }
        }

        if owners.len() > 1 {
            return Err(owners);
        }
        match (first_in_step, first_in_backdrop) {
            (Some(pos), _) => Ok(Some(self.step.payload_handles.swap_remove(pos))),
            (None, Some(pos)) => Ok(Some(self.backdrop.payload_handles.swap_remove(pos))),
            (None, None) => Ok(None),
        }
    }

    /// Kills and forgets every tracked payload, step entries first.
    fn drain_all_payloads(&mut self) {
        for list in [
            &mut self.step.payload_handles,
            &mut self.backdrop.payload_handles,
        ] {
            drain_all_payload_handles(list);
        }
    }

    /// Kills and forgets every tracked payload whose cgroup is `cgroup`,
    /// step entries first.
    fn drain_payloads_for_cgroup(&mut self, cgroup: &str) {
        for list in [
            &mut self.step.payload_handles,
            &mut self.backdrop.payload_handles,
        ] {
            drain_payload_handles_for_cgroup(list, cgroup);
        }
    }

    /// Drops every worker handle recorded under `cgroup`.
    fn drop_handles_for_cgroup(&mut self, cgroup: &str) {
        let keep = |pair: &(String, WorkloadHandle)| pair.0.as_str() != cgroup;
        self.step.handles.retain(keep);
        self.backdrop.handles.retain(keep);
    }

    /// Removes the cpuset record for `cgroup` from both step and backdrop.
    fn forget_cpuset(&mut self, cgroup: &str) {
        for map in [&mut self.step.cpusets, &mut self.backdrop.cpusets] {
            map.remove(cgroup);
        }
    }

    /// Records `cpuset` for `cgroup`. An existing record is overwritten where
    /// it already lives (step checked before backdrop); otherwise the record
    /// is created in the currently selected target.
    fn record_cpuset(&mut self, cgroup: &str, cpuset: BTreeSet<usize>) {
        if let Some(slot) = self.step.cpusets.get_mut(cgroup) {
            *slot = cpuset;
            return;
        }
        if let Some(slot) = self.backdrop.cpusets.get_mut(cgroup) {
            *slot = cpuset;
            return;
        }
        self.target_cpusets().insert(cgroup.to_string(), cpuset);
    }

    /// Re-labels worker handles recorded under `from` so they are recorded
    /// under `to`.
    ///
    /// When `to` names a backdrop cgroup, matching step handles are moved
    /// into the backdrop's list; otherwise they are renamed in place.
    /// Matching backdrop handles are always renamed in place.
    fn rename_handles(&mut self, from: &str, to: &str) {
        fn relabel(handles: &mut [(String, WorkloadHandle)], from: &str, to: &str) {
            for (owner, _) in handles.iter_mut() {
                if owner.as_str() == from {
                    *owner = to.to_string();
                }
            }
        }

        if self.cgroup_name_is_backdrop(to) {
            // Walk from the back so `swap_remove` only ever pulls in elements
            // that were already examined.
            for idx in (0..self.step.handles.len()).rev() {
                if self.step.handles[idx].0.as_str() != from {
                    continue;
                }
                let (_, worker) = self.step.handles.swap_remove(idx);
                self.backdrop.handles.push((to.to_string(), worker));
            }
        } else {
            relabel(&mut self.step.handles, from, to);
        }
        relabel(&mut self.backdrop.handles, from, to);
    }

    /// Iterates over every worker handle: the step's, then the backdrop's.
    fn all_handles(&self) -> impl Iterator<Item = &(String, WorkloadHandle)> {
        let step_handles = self.step.handles.iter();
        step_handles.chain(&self.backdrop.handles)
    }

    /// Whether `name` is a cgroup name known to the step or the backdrop.
    fn cgroup_name_is_tracked(&self, name: &str) -> bool {
        let in_step = self.step.cgroups.names().iter().any(|n| n == name);
        in_step || self.cgroup_name_is_backdrop(name)
    }

    /// Whether `name` is a cgroup name known to the backdrop.
    fn cgroup_name_is_backdrop(&self, name: &str) -> bool {
        let known = self.backdrop.cgroups.names();
        known.iter().any(|n| n == name)
    }
}
/// Identifies which declaration introduced a tracked payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PayloadSource {
    /// Described as `CgroupDef::workload`.
    CgroupDefWorkload,
    /// Described as `Op::RunPayload`.
    OpRunPayload,
}

impl PayloadSource {
    /// Returns the API spelling of the source, for use in messages.
    fn describe(self) -> &'static str {
        match self {
            Self::OpRunPayload => "Op::RunPayload",
            Self::CgroupDefWorkload => "CgroupDef::workload",
        }
    }
}
/// One tracked payload process together with where it runs and where it
/// came from.
struct PayloadEntry {
/// Cgroup key of the payload; compared against cgroup names when payloads
/// are looked up or drained, and rendered through `render_cgroup_key` in
/// teardown messages.
cgroup: String,
/// Which declaration introduced the payload.
source: PayloadSource,
/// Handle to the running payload; exposes `payload_name()` and is consumed
/// by `kill()` during teardown.
handle: crate::scenario::payload_run::PayloadHandle,
}
/// Maps the scheduler's exit classification, as reported by
/// `sched_exit_kind()`, to the `DetailKind` used for the failure record.
fn sched_died_detail_kind() -> crate::assert::DetailKind {
    use crate::assert::DetailKind as Kind;
    use crate::probe::process::{SchedExitKind as Exit, sched_exit_kind};

    let exit = sched_exit_kind();
    match exit {
        Exit::Unknown => Kind::SchedulerDiedUnknownReason,
        Exit::Clean => Kind::SchedulerExitedCleanly,
        Exit::Crashed => Kind::SchedulerCrashed,
    }
}
/// Runs `defs` as a scenario made of one step that holds for
/// `HoldSpec::FULL`, with an empty backdrop and the context's default checks.
pub fn execute_defs(ctx: &Ctx, defs: Vec<CgroupDef>) -> Result<AssertResult> {
    let only_step = Step::with_defs(defs, HoldSpec::FULL);
    execute_steps(ctx, vec![only_step])
}
/// When running as a guest, waits up to 60 seconds on the accessor-ready
/// latch and logs a warning if the wait times out. Does nothing otherwise.
pub fn await_accessor_ready() {
    if !guest_comms::is_guest() {
        return;
    }
    let latch = crate::vmm::rust_init::accessor_ready_latch();
    let signalled = latch.wait_timeout(Duration::from_secs(60));
    if signalled {
        return;
    }
    tracing::warn!(
        "await_accessor_ready timed out after 60s — host freeze \
         coordinator did not signal accessor adoption; a dump from a \
         stall after this point may render placeholder map values"
    );
}
/// Runs `steps` with an empty backdrop and no explicit checks override
/// (the context's own checks apply).
pub fn execute_steps(ctx: &Ctx, steps: Vec<Step>) -> Result<AssertResult> {
    let no_override = None;
    execute_steps_with(ctx, steps, no_override)
}
/// Runs `steps` on top of `backdrop` with no explicit checks override
/// (the context's own checks apply).
pub fn execute_scenario(
    ctx: &Ctx,
    backdrop: backdrop::Backdrop,
    steps: Vec<Step>,
) -> Result<AssertResult> {
    let no_override = None;
    execute_scenario_with(ctx, backdrop, steps, no_override)
}
/// Runs `steps` on top of `backdrop`.
///
/// `checks`, when given, replaces the context's own checks for the whole
/// scenario. This is the public entry point that forwards to `run_scenario`.
pub fn execute_scenario_with(
    ctx: &Ctx,
    backdrop: backdrop::Backdrop,
    steps: Vec<Step>,
    checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
    // Thin public facade: all validation and execution live in `run_scenario`.
    run_scenario(ctx, backdrop, steps, checks)
}
/// Runs `steps` with an empty backdrop.
///
/// `checks`, when given, replaces the context's own checks.
pub fn execute_steps_with(
    ctx: &Ctx,
    steps: Vec<Step>,
    checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
    let empty_backdrop = backdrop::Backdrop::new();
    execute_scenario_with(ctx, empty_backdrop, steps, checks)
}
/// Computes the set of cgroup controllers the scenario needs, by scanning
/// the backdrop's cgroup definitions and ops and every step's resolved
/// setup and ops.
fn required_controllers(
    ctx: &Ctx,
    backdrop: &backdrop::Backdrop,
    steps: &[Step],
) -> BTreeSet<crate::cgroup::Controller> {
    use crate::cgroup::Controller;

    /// Adds the controller for every resource knob `def` sets.
    fn absorb_def(set: &mut BTreeSet<Controller>, def: &CgroupDef) {
        let wanted = [
            (
                def.cpuset.is_some() || def.cpuset_mems.is_some(),
                Controller::Cpuset,
            ),
            (def.cpu.is_some(), Controller::Cpu),
            (def.memory.is_some(), Controller::Memory),
            (def.io.is_some(), Controller::Io),
            (def.pids.is_some(), Controller::Pids),
        ];
        set.extend(
            wanted
                .into_iter()
                .filter(|(needed, _)| *needed)
                .map(|(_, controller)| controller),
        );
    }

    /// Adds the controllers implied by `op`: cpuset for the cpuset/affinity
    /// ops, and whatever an embedded cgroup definition asks for.
    fn absorb_op(set: &mut BTreeSet<Controller>, op: &Op) {
        match op {
            Op::SetCpuset { .. }
            | Op::ClearCpuset { .. }
            | Op::SwapCpusets { .. }
            | Op::SetAffinity { .. } => {
                set.insert(Controller::Cpuset);
            }
            Op::AddCgroupDef { def } => absorb_def(set, def),
            _ => {}
        }
    }

    let mut needed = BTreeSet::new();

    for def in &backdrop.cgroups {
        absorb_def(&mut needed, def);
    }
    for op in &backdrop.ops {
        absorb_op(&mut needed, op);
    }

    for step in steps {
        let resolved = step.setup.resolve(ctx);
        for def in resolved {
            absorb_def(&mut needed, &def);
        }
        for op in &step.ops {
            absorb_op(&mut needed, op);
        }
    }

    needed
}
/// Executes a whole scenario: validates the inputs, sets up the backdrop,
/// runs every step in order, and merges all collected results.
///
/// `checks`, when given, replaces `ctx.assert` for every collection call.
///
/// Returns `Err` only for up-front problems (invalid hold spec,
/// scheduler-kind payload in the backdrop, failure to enable the cgroup
/// controllers). Failures that happen later — backdrop setup error, step
/// error, scheduler death — are recorded as failed details on the returned
/// `AssertResult`, and the function returns `Ok` right after recording them.
fn run_scenario(
ctx: &Ctx,
backdrop: backdrop::Backdrop,
steps: Vec<Step>,
checks: Option<&crate::assert::Assert>,
) -> Result<AssertResult> {
// Reject invalid hold specs before anything is created.
for (i, step) in steps.iter().enumerate() {
if let Err(reason) = step.hold.validate() {
anyhow::bail!("step {i} hold validation: {reason}");
}
}
// Scheduler-kind payloads are not allowed in the backdrop, whether pushed
// directly as payloads ...
for p in &backdrop.payloads {
if p.is_scheduler() {
anyhow::bail!(
"Backdrop::push_payload received scheduler-kind Payload '{}' — \
only PayloadKind::Binary payloads run in the Backdrop; \
place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
attribute instead",
p.name,
);
}
}
// ... or wrapped in an `Op::RunPayload`.
for op in &backdrop.ops {
if let Op::RunPayload { payload, .. } = op
&& payload.is_scheduler()
{
anyhow::bail!(
"Backdrop::push_op(Op::RunPayload) received scheduler-kind Payload '{}' — \
only PayloadKind::Binary payloads run in the Backdrop; \
place scheduler-kind payloads on the #[ktstr_test(scheduler = ...)] \
attribute instead",
payload.name,
);
}
}
// An explicit override wins over the context's own checks.
let effective_checks = checks.unwrap_or(&ctx.assert);
// Enable every controller the backdrop and steps are going to touch.
let required = required_controllers(ctx, &backdrop, &steps);
ctx.cgroups
.setup(&required)
.context("enable cgroup controllers in subtree_control")?;
let mut backdrop_state = BackdropState::empty(ctx);
let mut result = AssertResult::pass();
let scenario_start = std::time::Instant::now();
if guest_comms::is_guest() {
crate::vmm::guest_comms::send_scenario_start();
}
// Optionally wait (bounded to 60s) for the map-write latch; a timeout only
// produces a warning and the scenario continues.
if ctx.wait_for_map_write && guest_comms::is_guest() {
let latch = crate::vmm::rust_init::bpf_map_write_done_latch();
if !latch.wait_timeout(Duration::from_secs(60)) {
tracing::warn!(
"wait_for_map_write timed out after 60s — host bpf-map-write \
thread may have failed to resolve a queued map; proceeding \
with the workload regardless"
);
}
}
// Backdrop setup: applied through a scratch ScenarioState whose target is
// switched to the backdrop, in the order cgroups, ops, payloads.
if !backdrop.is_empty() {
let mut step_staging = StepState::empty(ctx);
let mut scratch = ScenarioState::new(&mut step_staging, &mut backdrop_state);
let setup_res = scratch.with_target_backdrop(|s| {
if !backdrop.cgroups.is_empty() {
apply_setup(ctx, s, &backdrop.cgroups)?;
}
if !backdrop.ops.is_empty() {
apply_ops(ctx, s, &backdrop.ops, false)?;
}
if !backdrop.payloads.is_empty() {
// Each backdrop payload is started as a run-payload op with no arguments.
let ops: Vec<Op> = backdrop
.payloads
.iter()
.map(|p| Op::run_payload(p, Vec::<String>::new()))
.collect();
apply_ops(ctx, s, &ops, false)?;
}
Ok::<(), anyhow::Error>(())
});
// A setup failure is reported through the result: collect whatever was
// created in both states, add the failure detail, and stop.
if let Err(err) = setup_res {
let mut r =
collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
let staging_result =
collect_step(&mut step_staging, effective_checks, ctx.topo, ctx.cgroups);
r.merge(staging_result);
r.merge(result);
r.record_fail(crate::assert::AssertDetail::new(
crate::assert::DetailKind::Other,
format!("Backdrop setup failed: {err:#}"),
));
return Ok(r);
}
// Kill any payload that still ended up in the staging step state.
drain_all_payload_handles(&mut step_staging.payload_handles);
}
for (step_idx, step) in steps.iter().enumerate() {
// Before every step but the first, stop early if the scheduler process is
// known and no longer alive.
if step_idx > 0
&& let Some(pid) = crate::vmm::rust_init::sched_pid()
&& !process_alive(pid)
{
let mut r =
collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
r.merge(result);
r.record_fail(crate::assert::AssertDetail::new(
sched_died_detail_kind(),
crate::assert::format_sched_died_after_step(
step_idx,
steps.len(),
scenario_start.elapsed().as_secs_f64(),
),
));
return Ok(r);
}
let mut step_state = StepState::empty(ctx);
let mut sched_died_during_hold = false;
// Published step index is 1-based and saturates at u16::MAX.
let phase_step_index = u16::try_from(step_idx)
.ok()
.and_then(|i| i.checked_add(1))
.unwrap_or(u16::MAX);
ctx.current_step
.store(phase_step_index, std::sync::atomic::Ordering::Release);
// NOTE(review): `step_idx as u16` wraps for step_idx > u16::MAX, unlike the
// saturating `phase_step_index` above — confirm this is what
// `PhaseGuard::install_step` expects.
let _phase_guard = crate::assert::PhaseGuard::install_step(step_idx as u16);
let step_res = run_step(
ctx,
step,
step_idx,
&mut step_state,
&mut backdrop_state,
scenario_start,
effective_checks,
&mut sched_died_during_hold,
);
if guest_comms::is_guest() {
crate::vmm::guest_comms::send_scenario_pause();
}
// The step's own state is always collected and merged, even when the step
// failed, before the failure is reported.
let step_result = collect_step(&mut step_state, effective_checks, ctx.topo, ctx.cgroups);
result.merge(step_result);
if let Err(err) = step_res {
let mut r =
collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
r.merge(result);
r.record_fail(crate::assert::AssertDetail::new(
crate::assert::DetailKind::Other,
format!("step {step_idx} failed: {err:#}"),
));
return Ok(r);
}
if sched_died_during_hold {
let mut r =
collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
r.merge(result);
r.record_fail(crate::assert::AssertDetail::new(
sched_died_detail_kind(),
crate::assert::format_sched_died_during_workload(
scenario_start.elapsed().as_secs_f64(),
),
));
return Ok(r);
}
}
if guest_comms::is_guest() {
let elapsed = scenario_start.elapsed().as_millis() as u64;
crate::vmm::guest_comms::send_scenario_end(elapsed);
}
// Sample scheduler liveness before the backdrop is collected.
let sched_dead = crate::vmm::rust_init::sched_pid().is_some_and(|pid| !process_alive(pid));
let backdrop_result =
collect_backdrop(&mut backdrop_state, effective_checks, ctx.topo, ctx.cgroups);
result.merge(backdrop_result);
if sched_dead {
result.record_fail(crate::assert::AssertDetail::new(
sched_died_detail_kind(),
crate::assert::format_sched_died_after_all_steps(
steps.len(),
scenario_start.elapsed().as_secs_f64(),
),
));
}
Ok(result)
}
/// Panics with a uniform message for a failed syscall-level operation inside
/// `hold_or_sched_died`.
///
/// `op` names the failed operation, `pid` is the watched process, `err` the
/// error to show and `advice` a hint for whoever reads the panic. The panic
/// location is attributed to the caller.
#[cold]
#[track_caller]
fn panic_evented_hold_defect(
    op: &str,
    pid: libc::pid_t,
    err: impl std::fmt::Display,
    advice: &str,
) -> ! {
    let site = "ktstr::scenario::hold_or_sched_died";
    panic!("{site}: {op} failed (pid={pid}): {err} — {advice}")
}
/// Waits for up to `dur` while watching the scheduler process.
///
/// Returns `true` when the scheduler is found to be gone, `false` when the
/// hold elapsed with the scheduler still alive (or with no scheduler to
/// watch).
///
/// * `dur` is zero: nothing is waited for; the result is a single liveness
///   probe of `sched_pid` (`false` when there is no pid).
/// * `sched_pid` is `None`: the thread simply sleeps for `dur`.
/// * otherwise: a pidfd for the scheduler is registered with an epoll
///   instance and the thread blocks until either the pidfd becomes readable
///   (the process terminated → `true`) or the deadline passes, at which point
///   one final liveness probe decides the result.
///
/// # Panics
///
/// Panics through `panic_evented_hold_defect` when `pidfd_open` fails with
/// anything but `ESRCH`, or when creating, arming or waiting on the epoll
/// instance fails with anything but `EINTR`.
fn hold_or_sched_died(dur: Duration, sched_pid: Option<libc::pid_t>) -> bool {
    use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
    use std::os::fd::{AsFd, FromRawFd, OwnedFd};

    if dur.is_zero() {
        return sched_pid.is_some_and(|pid| !process_alive(pid));
    }
    let Some(pid) = sched_pid else {
        thread::sleep(dur);
        return false;
    };

    // SAFETY: plain syscall taking two integer arguments by value; no memory
    // is handed to the kernel.
    let pidfd_raw = unsafe { libc::syscall(libc::SYS_pidfd_open, pid, 0i32) };
    if pidfd_raw < 0 {
        let err = std::io::Error::last_os_error();
        // ESRCH: there is no such process, i.e. the scheduler is already gone.
        if err.raw_os_error() == Some(libc::ESRCH) {
            return true;
        }
        panic_evented_hold_defect(
            "pidfd_open",
            pid,
            format_args!("{err} (errno {:?})", err.raw_os_error()),
            "pidfd_open is unconditional from Linux 5.3; failure on a \
             5.3+ kernel = env defect — check ulimit -n / memory pressure / \
             cgroup pids.max",
        );
    }
    // SAFETY: `pidfd_raw` is non-negative here, so it is a descriptor that
    // pidfd_open just returned and that nothing else owns.
    let pidfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(pidfd_raw as i32) };

    let epoll = match Epoll::new(EpollCreateFlags::EPOLL_CLOEXEC) {
        Ok(e) => e,
        Err(e) => {
            let fd = std::os::fd::AsRawFd::as_raw_fd(&pidfd);
            panic_evented_hold_defect(
                "epoll_create1(EPOLL_CLOEXEC)",
                pid,
                format_args!("{e} (pidfd={fd})"),
                "epoll has been universally available since 2.6; failure = \
                 env defect — check ulimit -n and CONFIG_EPOLL",
            );
        }
    };
    let event = EpollEvent::new(EpollFlags::EPOLLIN, 0);
    if let Err(e) = epoll.add(pidfd.as_fd(), event) {
        panic_evented_hold_defect(
            "epoll_ctl(ADD)",
            pid,
            e,
            "epoll_ctl(ADD) on a freshly-opened pidfd should never fail \
             in a healthy kernel; documented errors (EBADF/EEXIST/ENOMEM) \
             are env defects — likely fd exhaustion or memory pressure",
        );
    }

    let deadline = std::time::Instant::now() + dur;
    let mut events = [EpollEvent::empty()];
    loop {
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            return !process_alive(pid);
        }
        // Round UP to whole milliseconds. Truncating would turn the last
        // sub-millisecond slice into a 0 ms timeout, which makes epoll_wait
        // return immediately and this loop spin until the deadline.
        let ms_i32 = remaining
            .as_nanos()
            .div_ceil(1_000_000)
            .min(i32::MAX as u128) as i32;
        let timeout_param = match EpollTimeout::try_from(ms_i32) {
            Ok(t) => t,
            Err(e) => {
                panic_evented_hold_defect(
                    "EpollTimeout::try_from",
                    pid,
                    format_args!("{e} (input={ms_i32})"),
                    "input was pre-clamped to fit i32; failure indicates an \
                     upstream nix EpollTimeout API change requiring code update",
                );
            }
        };
        match epoll.wait(&mut events, timeout_param) {
            // Timed out with nothing ready: loop to re-check the deadline.
            Ok(0) => {}
            // The pidfd became readable: the watched process terminated.
            Ok(_) => {
                return true;
            }
            // Interrupted by a signal: retry with the recomputed remainder.
            Err(nix::errno::Errno::EINTR) => {}
            Err(e) => {
                panic_evented_hold_defect(
                    "epoll_wait",
                    pid,
                    e,
                    "epoll_wait on a freshly-created epoll with a single \
                     valid pidfd cannot legitimately fail outside EINTR; \
                     documented errors (EBADF/EFAULT/EINVAL) are framework- \
                     internal memory-safety defects — investigate concurrent \
                     fd mutation or stack-frame corruption",
                );
            }
        }
    }
}
#[allow(clippy::too_many_arguments)]
fn run_step<'a>(
ctx: &Ctx,
step: &Step,
step_idx: usize,
step_state: &mut StepState<'a>,
backdrop_state: &mut BackdropState<'a>,
scenario_start: std::time::Instant,
_effective_checks: &crate::assert::Assert,
sched_died_during_hold: &mut bool,
) -> Result<()> {
let mut scenario = ScenarioState::new(step_state, backdrop_state);
macro_rules! drain_on_err {
($scenario:expr, $e:expr) => {
match $e {
Ok(v) => v,
Err(err) => {
$scenario.drain_all_payloads();
return Err(err);
}
}
};
}
match step.hold {
HoldSpec::Loop { interval } => {
if !step.setup.is_empty() {
let defs = step.setup.resolve(ctx);
drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
}
let deadline = scenario_start + ctx.duration;
while std::time::Instant::now() < deadline {
drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops, true));
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
if hold_or_sched_died(remaining.min(interval), crate::vmm::rust_init::sched_pid()) {
*sched_died_during_hold = true;
return Ok(());
}
}
}
_ => {
drain_on_err!(scenario, apply_ops(ctx, &mut scenario, &step.ops, false));
if !step.setup.is_empty() {
let defs = step.setup.resolve(ctx);
drain_on_err!(scenario, apply_setup(ctx, &mut scenario, &defs));
}
if guest_comms::is_guest() {
let payload = build_stimulus(&scenario_start, step_idx, &step.ops, &scenario);
crate::vmm::guest_comms::send_stimulus(zerocopy::IntoBytes::as_bytes(&payload));
}
if guest_comms::is_guest() {
crate::vmm::guest_comms::send_scenario_resume();
}
let hold_dur = match step.hold {
HoldSpec::Frac(f) => Duration::from_secs_f64(ctx.duration.as_secs_f64() * f),
HoldSpec::Fixed(d) => d,
HoldSpec::Loop { .. } => unreachable!(),
};
let remaining = (scenario_start + ctx.duration)
.saturating_duration_since(std::time::Instant::now());
let hold_dur = hold_dur.min(remaining);
if hold_or_sched_died(hold_dur, crate::vmm::rust_init::sched_pid()) {
*sched_died_during_hold = true;
return Ok(());
}
}
}
Ok(())
}
/// Assembles the `StimulusPayload` describing a step.
///
/// The record carries the elapsed milliseconds since `scenario_start`, the
/// 1-based step index, the number of ops and a bitmask of their
/// discriminants, the cgroup and worker counts across step and backdrop, and
/// the sum of all workers' iteration snapshots. Counters that do not fit
/// their field saturate to the field's maximum and log a warning.
fn build_stimulus(
    scenario_start: &std::time::Instant,
    step_idx: usize,
    ops: &[Op],
    state: &ScenarioState<'_, '_>,
) -> StimulusPayload {
    // One bit per op discriminant present in this step.
    // NOTE(review): assumes every discriminant is below 32 — a larger value
    // would overflow the shift; confirm against `Op::discriminant`.
    let mut op_kinds: u32 = 0;
    for op in ops.iter() {
        op_kinds |= 1 << op.discriminant();
    }

    let total_iterations: u64 = state
        .all_handles()
        .flat_map(|(_, worker)| worker.snapshot_iterations())
        .sum();

    let step_cgroups = state.step.cgroups.names().len();
    let backdrop_cgroups = state.backdrop.cgroups.names().len();
    let cgroup_count = step_cgroups + backdrop_cgroups;
    let worker_count = state.step.handles.len() + state.backdrop.handles.len();

    let to_u32 = |field: &str, v: u128| -> u32 {
        match u32::try_from(v) {
            Ok(narrow) => narrow,
            Err(_) => {
                tracing::warn!(
                    field,
                    value = %v,
                    "StimulusPayload field overflowed u32; saturating to u32::MAX",
                );
                u32::MAX
            }
        }
    };
    let to_u16 = |field: &str, v: usize| -> u16 {
        match u16::try_from(v) {
            Ok(narrow) => narrow,
            Err(_) => {
                tracing::warn!(
                    field,
                    value = v,
                    "StimulusPayload field overflowed u16; saturating to u16::MAX",
                );
                u16::MAX
            }
        }
    };

    // The wire format counts steps from 1.
    let one_based = u16::try_from(step_idx)
        .ok()
        .and_then(|i| i.checked_add(1));
    let phase_step_index: u16 = match one_based {
        Some(index) => index,
        None => {
            tracing::warn!(
                field = "step_index",
                value = step_idx,
                "StimulusPayload step_index overflowed u16 after 1-indexed encoding; saturating to u16::MAX",
            );
            u16::MAX
        }
    };

    let elapsed_ms = to_u32("elapsed_ms", scenario_start.elapsed().as_millis());
    let op_count = to_u16("op_count", ops.len());
    let cgroup_count = to_u16("cgroup_count", cgroup_count);
    let worker_count = to_u16("worker_count", worker_count);

    StimulusPayload {
        elapsed_ms,
        step_index: phase_step_index,
        op_count,
        op_kinds,
        cgroup_count,
        worker_count,
        total_iterations,
    }
}
/// Checks `args` against the payload's flag allowlist.
///
/// Only arguments starting with `--` are considered; the flag name is the
/// text between the `--` and the first `=` (or the end of the argument).
/// Arguments with an empty flag name are ignored. A payload without an
/// allowlist accepts everything.
///
/// # Errors
///
/// Fails on the first flag whose name is not in the allowlist.
fn validate_known_flags(payload: &crate::test_support::Payload, args: &[String]) -> Result<()> {
    let Some(allowlist) = payload.known_flags else {
        return Ok(());
    };

    let mut flag_names = args
        .iter()
        .filter_map(|arg| arg.strip_prefix("--"))
        .map(|body| body.split_once('=').map_or(body, |(head, _)| head))
        .filter(|flag| !flag.is_empty());

    if let Some(name) = flag_names.find(|flag| !allowlist.contains(flag)) {
        anyhow::bail!(
            "Op::RunPayload: payload '{}' received unknown flag \
             '--{name}' — not in its known_flags allowlist \
             {allowlist:?}. Check the spelling against the \
             payload's declared flags; if '--{name}' is a new \
             legitimate flag, add it to `Payload::known_flags`.",
            payload.name,
        );
    }
    Ok(())
}
/// Validates a memory policy and its flags against the cgroup's cpuset.
///
/// Checks, in order:
/// 1. `flags` contains no bit other than `STATIC_NODES`, `RELATIVE_NODES`
///    and `NUMA_BALANCING`;
/// 2. `STATIC_NODES` and `RELATIVE_NODES` are not both set;
/// 3. a policy with an empty node set passes;
/// 4. with `STATIC_NODES`, every policy node must be one of the host's NUMA
///    node ids;
/// 5. with `RELATIVE_NODES`, no further check is made;
/// 6. otherwise every policy node must be among the NUMA nodes covered by
///    `cpuset`.
///
/// # Errors
///
/// Fails with a message naming `cgroup_name` on the first violated check.
fn validate_mempolicy_cpuset(
    policy: &MemPolicy,
    flags: crate::workload::MpolFlags,
    cpuset: &BTreeSet<usize>,
    ctx: &Ctx,
    cgroup_name: &str,
) -> Result<()> {
    use crate::workload::MpolFlags;

    let static_bit = MpolFlags::STATIC_NODES.bits();
    let relative_bit = MpolFlags::RELATIVE_NODES.bits();
    let balancing_bit = MpolFlags::NUMA_BALANCING.bits();
    let unknown_bits = flags.bits() & !(static_bit | relative_bit | balancing_bit);
    if unknown_bits != 0 {
        anyhow::bail!(
            "cgroup '{}': MpolFlags contains unknown bit(s) {:#x} (known bits: \
             STATIC_NODES={:#x}, RELATIVE_NODES={:#x}, NUMA_BALANCING={:#x}); \
             refusing to forward to the kernel — update MpolFlags to model the \
             new bit before using it, or clear the bit at the call site",
            cgroup_name,
            unknown_bits,
            static_bit,
            relative_bit,
            balancing_bit,
        );
    }

    let wants_static = flags.contains(MpolFlags::STATIC_NODES);
    let wants_relative = flags.contains(MpolFlags::RELATIVE_NODES);
    if wants_static && wants_relative {
        anyhow::bail!(
            "cgroup '{}': MpolFlags::STATIC_NODES and MpolFlags::RELATIVE_NODES are \
             mutually exclusive (the kernel will reject the set_mempolicy syscall with \
             EINVAL); pick whichever matches the intended semantics — STATIC_NODES \
             for absolute node ids that survive cpuset changes, RELATIVE_NODES for \
             cpuset-relative indices",
            cgroup_name,
        );
    }

    let policy_nodes = policy.node_set();
    if policy_nodes.is_empty() {
        return Ok(());
    }

    if wants_static {
        let host_nodes = ctx.topo.numa_node_ids();
        let missing: Vec<usize> = policy_nodes
            .iter()
            .copied()
            .filter(|node| !host_nodes.contains(node))
            .collect();
        if missing.is_empty() {
            return Ok(());
        }
        anyhow::bail!(
            "cgroup '{}': MemPolicy with MpolFlags::STATIC_NODES references \
             NUMA node(s) {:?} that do not exist on this host (host nodes: {:?}); \
             the kernel will reject or silently drop the policy (Preferred can \
             silently fall back to local allocation; Bind/Interleave reject with \
             EINVAL) — fix the MemPolicy or pick a host with the required nodes",
            cgroup_name,
            missing,
            host_nodes,
        );
    }

    if wants_relative {
        return Ok(());
    }

    let cpuset_numa = ctx.topo.numa_nodes_for_cpuset(cpuset);
    let uncovered: Vec<usize> = policy_nodes
        .iter()
        .copied()
        .filter(|node| !cpuset_numa.contains(node))
        .collect();
    if uncovered.is_empty() {
        return Ok(());
    }
    anyhow::bail!(
        "cgroup '{}': MemPolicy references NUMA node(s) {:?} \
         outside the cpuset's coverage (cpuset covers node(s) \
         {:?}) — some or all of the worker's allocations would \
         live on NUMA nodes its CPUs cannot reach locally, \
         producing cross-socket allocation traffic that is \
         almost certainly unintended. Two fixes: \
         (a) add .mpol_flags(MpolFlags::STATIC_NODES) to \
         declare the cross-node placement intentional (the \
         flag survives cpuset rebinds; see MpolFlags doc), or \
         (b) widen the cpuset to cover the policy's nodes \
         (e.g. CpusetSpec::Numa(N) for each referenced N, or \
         a CpusetSpec::Exact set that spans both).",
        cgroup_name,
        uncovered,
        cpuset_numa,
    );
}
/// Tears down a step's state and turns it into an `AssertResult`.
///
/// In order: unfreezes every cgroup of the step (failures are only logged),
/// kills all of the step's payloads, drains the stall monitor if one is
/// attached, collects the worker handles (each paired with the cpuset
/// recorded for its cgroup name, if any), and records one `WorkerStalled`
/// failure per stall report.
fn collect_step(
    step_state: &mut StepState<'_>,
    checks: &crate::assert::Assert,
    topo: &crate::topology::TestTopology,
    cgroups: &dyn crate::cgroup::CgroupOps,
) -> AssertResult {
    for name in step_state.cgroups.names() {
        let Err(e) = cgroups.set_freeze(name, false) else {
            continue;
        };
        tracing::warn!(
            cgroup = %name,
            err = %format!("{e:#}"),
            "collect_step: pre-teardown unfreeze failed; rmdir may EBUSY"
        );
    }

    drain_all_payload_handles(&mut step_state.payload_handles);

    let stall_reports = step_state
        .stall_monitor
        .take()
        .map(|monitor| {
            let reports = monitor.drain();
            drop(monitor);
            reports
        })
        .unwrap_or_default();

    let workers = std::mem::take(&mut step_state.handles);
    let with_cpusets = workers
        .into_iter()
        .map(|(name, worker)| (worker, step_state.cpusets.get(&name)));
    let mut result = crate::scenario::collect_handles(with_cpusets, checks, Some(topo));

    let stall_details = stall_reports.into_iter().map(|report| {
        crate::assert::AssertDetail::new(
            crate::assert::DetailKind::WorkerStalled,
            format_stall_report(&report),
        )
    });
    for detail in stall_details {
        result.record_fail(detail);
    }
    result
}
/// Renders a stall report as multi-line text: a header with pid and comm, a
/// sample-window line (only when the report has samples) with the first and
/// last counters and their deltas, the diagnostic fields, and the kernel
/// stack when one was captured.
fn format_stall_report(report: &crate::scenario::host_stall::StallReport) -> String {
    use std::fmt::Write as _;

    // Writing into a `String` cannot fail, so the write results are discarded.
    let mut out = String::new();
    let _ = writeln!(
        out,
        "worker stall detected: pid={} comm={:?} (host-mode /proc/<pid>/sched polling)",
        report.pid, report.comm,
    );

    let window = report.samples.first().zip(report.samples.last());
    if let Some((first, last)) = window {
        let nr_delta = last.nr_switches.saturating_sub(first.nr_switches);
        let rt_delta = last
            .sum_exec_runtime_ns
            .saturating_sub(first.sum_exec_runtime_ns);
        let _ = writeln!(
            out,
            " sample window: nr_switches {} -> {} (delta {nr_delta}), sum_exec_runtime_ns {} -> {} (delta {rt_delta}), {} samples",
            first.nr_switches,
            last.nr_switches,
            first.sum_exec_runtime_ns,
            last.sum_exec_runtime_ns,
            report.samples.len(),
        );
    }

    let diag = &report.diagnostic;
    let _ = writeln!(out, " state: {}", diag.state);
    let _ = writeln!(out, " wchan: {}", diag.wchan);
    let _ = writeln!(out, " syscall: {}", diag.syscall);
    let _ = writeln!(out, " cgroup: {}", diag.cgroup);
    let _ = writeln!(out, " host loadavg: {}", diag.host_loadavg);
    match &diag.stack {
        Some(stack) => {
            let _ = writeln!(out, " kernel stack:\n{stack}");
        }
        None => {}
    }
    out
}
/// Tears down the backdrop's state and turns it into an `AssertResult`.
///
/// In order: unfreezes every backdrop cgroup (failures are only logged),
/// kills all backdrop payloads, and collects the backdrop's worker handles,
/// each paired with the cpuset recorded for its cgroup name, if any.
fn collect_backdrop(
    backdrop_state: &mut BackdropState<'_>,
    checks: &crate::assert::Assert,
    topo: &crate::topology::TestTopology,
    cgroups: &dyn crate::cgroup::CgroupOps,
) -> AssertResult {
    for name in backdrop_state.cgroups.names() {
        let Err(e) = cgroups.set_freeze(name, false) else {
            continue;
        };
        tracing::warn!(
            cgroup = %name,
            err = %format!("{e:#}"),
            "collect_backdrop: pre-teardown unfreeze failed; rmdir may EBUSY"
        );
    }

    drain_all_payload_handles(&mut backdrop_state.payload_handles);

    let workers = std::mem::take(&mut backdrop_state.handles);
    let with_cpusets = workers
        .into_iter()
        .map(|(name, worker)| (worker, backdrop_state.cpusets.get(&name)));
    crate::scenario::collect_handles(with_cpusets, checks, Some(topo))
}
/// Removes every entry whose cgroup equals `cgroup` from `handles` and kills
/// its payload, working from the back of the list towards the front. The
/// remaining entries keep their relative order. A failed kill is reported on
/// stderr and does not stop the sweep.
fn drain_payload_handles_for_cgroup(handles: &mut Vec<PayloadEntry>, cgroup: &str) {
    for idx in (0..handles.len()).rev() {
        if handles[idx].cgroup.as_str() != cgroup {
            continue;
        }
        let victim = handles.remove(idx);
        match victim.handle.kill() {
            Ok(_) => {}
            Err(e) => eprintln!("ktstr: kill payload in cgroup '{cgroup}': {e:#}"),
        }
    }
}
/// Empties `handles`, killing each payload from the last entry to the first.
/// A failed kill is reported on stderr and does not stop the teardown.
fn drain_all_payload_handles(handles: &mut Vec<PayloadEntry>) {
    for entry in handles.drain(..).rev() {
        let Err(e) = entry.handle.kill() else {
            continue;
        };
        eprintln!(
            "ktstr: teardown kill payload in cgroup {}: {e:#}",
            render_cgroup_key(&entry.cgroup),
        );
    }
}
#[cfg(test)]
mod tests;
#[cfg(test)]
mod workers_pct_construction_tests;
#[cfg(test)]
mod kernel_op_dispatch_tests;