use std::collections::BTreeSet;
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{Context, Result};
use crate::scenario::Ctx;
use crate::workload::{ResolvedAffinity, WorkloadConfig, WorkloadHandle};
use super::setup::{append_placement_log, apply_setup};
use super::{
KernelTarget, KernelValue, Op, PayloadEntry, PayloadSource, ScenarioState, SpawnPlacement,
validate_known_flags, validate_mempolicy_cpuset,
};
/// Process-wide latch set to `true` when a guest-side snapshot request
/// (`Op::CaptureSnapshot` / `Op::WatchSnapshot`) returns
/// `SnapshotRequestResult::TransportError`. Once set, later snapshot ops
/// bail immediately instead of issuing another request; nothing in this
/// file ever resets it. Loads/stores use `Ordering::Relaxed` because the
/// flag is consulted on its own and does not publish any other data.
static SNAPSHOT_TRANSPORT_DEAD: AtomicBool = AtomicBool::new(false);
/// Apply `ops` in order against `state`.
///
/// Runs of adjacent `Op::WriteKernelCold` are first coalesced by
/// `merge_adjacent_cold_writes`, so each run is dispatched as a single
/// kernel-op request. The ops then execute sequentially; the first op that
/// returns an error aborts the remaining ops and the error propagates.
///
/// * `ctx` - cgroup operations, topology, per-cgroup worker default and the
///   current step index consulted by individual ops.
/// * `state` - tracked cgroups, recorded cpusets, workload/payload handles
///   and pinned BPF maps that ops read and mutate.
/// * `ops` - the ops to apply, in order.
/// * `in_loop` - `true` when running inside a `HoldSpec::Loop`; only used to
///   reject `Op::CaptureSnapshot` there.
///
/// # Errors
/// Validation failures (name collisions, empty resolved cpusets, empty or
/// untracked spawn cgroups, self-moves, duplicate payloads, ...) and failures
/// from the cgroup/workload/payload/snapshot/kernel-op/scheduler helpers.
/// Some failures are deliberately only logged: non-ENOENT `remove_cgroup`
/// errors, per-worker `set_affinity` failures, and `clear_subtree_control`
/// failures before a task move.
pub(super) fn apply_ops(
ctx: &Ctx,
state: &mut ScenarioState<'_, '_>,
ops: &[Op],
in_loop: bool,
) -> Result<()> {
// Coalesce adjacent cold kernel writes so each run becomes one request.
let merged = merge_adjacent_cold_writes(ops);
for op in &merged {
match op {
// Create a cgroup without a cpuset. Names already tracked by the
// Backdrop or the step are refused so a name is declared exactly once.
Op::AddCgroup { name } => {
if state.cgroup_name_is_tracked(name) {
anyhow::bail!(
"Op::AddCgroup '{}' collides with a cgroup already \
tracked (by a prior Backdrop or step-local CgroupDef) — \
declare it in exactly one place; use a fresh name for \
the step-local cgroup",
name,
);
}
state.target_cgroups().add_cgroup_no_cpuset(name)?;
}
// Reuse the setup path for a single CgroupDef.
Op::AddCgroupDef { def } => {
apply_setup(ctx, state, std::slice::from_ref(def))?;
}
// Tear down everything attached to the cgroup (payloads, workload
// handles, recorded cpuset), warn when the name is Backdrop-owned or
// unknown, forget it from both tracking sets, then remove it from the
// kernel. ENOENT from `remove_cgroup` is ignored; any other error is
// only logged, never returned.
Op::RemoveCgroup { cgroup } => {
state.drain_payloads_for_cgroup(cgroup);
state.drop_handles_for_cgroup(cgroup);
state.forget_cpuset(cgroup);
let in_backdrop = state
.backdrop
.cgroups
.names()
.iter()
.any(|n| n == &**cgroup);
let in_step = state.step.cgroups.names().iter().any(|n| n == &**cgroup);
if in_backdrop {
tracing::warn!(
cgroup = %cgroup,
"Op::RemoveCgroup removed a Backdrop-owned cgroup mid-scenario; \
unless this name is re-added by a later Op::AddCgroup, \
downstream ops referencing it will see kernel-level \
`cgroup missing` errors. If this removal was unintended \
(e.g. typo'd cgroup name that coincidentally matched a \
Backdrop entry), check the test source for the intended \
Backdrop cgroup.",
);
} else if !in_step {
tracing::warn!(
cgroup = %cgroup,
backdrop_cgroups = ?state.backdrop.cgroups.names(),
step_cgroups = ?state.step.cgroups.names(),
"Op::RemoveCgroup target '{cgroup}' matches no step-local \
or Backdrop-owned cgroup — could be a typo or a \
second-remove of an already-forgotten name. Compare \
against the listed Backdrop and step cgroups; if a \
downstream op later hits kernel-level `cgroup missing` \
on a similar name, the typo here is the probable source.",
);
}
state.step.cgroups.forget(cgroup);
state.backdrop.cgroups.forget(cgroup);
if let Err(err) = ctx.cgroups.remove_cgroup(cgroup)
&& !crate::scenario::is_io_not_found(&err)
{
let hint = crate::scenario::remove_cgroup_errno_hint(&err).unwrap_or("");
tracing::warn!(
cgroup = %cgroup,
err = %format!("{err:#}"),
hint,
"Op::RemoveCgroup: remove_cgroup returned non-ENOENT error",
);
}
}
// Validate and resolve the spec against this topology. An empty result
// is rejected rather than narrowing a live cgroup to zero CPUs. The
// resolved set is recorded in `state` only after the write succeeds.
Op::SetCpuset { cgroup, cpus } => {
if let Err(reason) = cpus.validate(ctx) {
anyhow::bail!(
"cgroup '{}': CpusetSpec validation failed: {}",
cgroup,
reason
);
}
let resolved = cpus.resolve_quiet(ctx);
if resolved.is_empty() {
anyhow::bail!(
"cgroup '{}': Op::SetCpuset spec {:?} \
resolved to 0 CPU(s); narrowing a live \
cgroup to empty would leave running \
workers without CPUs and downstream \
assertions would vacuously pass — adjust \
the spec so it resolves to a non-empty \
cpuset on this topology, or use \
Op::ClearCpuset if the intent was to \
release the cpuset restriction (allow all \
CPUs)",
cgroup,
cpus,
);
}
ctx.cgroups.set_cpuset(cgroup, &resolved)?;
state.record_cpuset(cgroup, resolved);
}
// Drop the cpuset restriction and forget its recorded value.
Op::ClearCpuset { cgroup } => {
ctx.cgroups.clear_cpuset(cgroup)?;
state.forget_cpuset(cgroup);
}
// Both cpusets are read from `cpuset.cpus` before either is written.
// A side whose file is unreadable or empty reads as `None` and is not
// copied across, so in that case the swap is one-directional (or a
// no-op) rather than an error.
Op::SwapCpusets { a, b } => {
let cpus_a = read_cpuset(ctx, a);
let cpus_b = read_cpuset(ctx, b);
if let Some(ca) = cpus_a {
ctx.cgroups.set_cpuset(b, &ca)?;
state.record_cpuset(b, ca);
}
if let Some(cb) = cpus_b {
ctx.cgroups.set_cpuset(a, &cb)?;
state.record_cpuset(a, cb);
}
}
Op::Spawn { placement, work } => match placement {
// Spawn workers in the runner's own cgroup. `workers_pct` is
// rejected (there is no managed cpuset to scale against); the
// handle is tracked under an empty cgroup name.
SpawnPlacement::RunnerCgroup => {
if let Err(reason) = work.mem_policy.validate() {
anyhow::bail!("Op::Spawn(RunnerCgroup): {}", reason);
}
if work.workers_pct.is_some() {
anyhow::bail!(
"Op::Spawn with SpawnPlacement::RunnerCgroup does not support \
`WorkSpec::workers_pct` — RunnerCgroup spawns workers in the \
test runner's own cgroup, with no managed cgroup whose cpuset \
would scale the fraction against (workers_pct = `ceil(cpuset_cpus \
* pct)`). Either set an explicit `.workers(N)` count, or switch \
to SpawnPlacement::Cgroup(name) against a cgroup whose cpuset \
gives `workers_pct` a denominator.",
);
}
let n = crate::scenario::resolve_num_workers(
work,
ctx.workers_per_cgroup,
"<runner>",
)?;
let affinity =
crate::scenario::intent_for_spawn(&work.affinity, None, ctx.topo)?;
let wl = WorkloadConfig::for_scenario_engine(
work,
n,
affinity,
work.work_type.clone(),
)?;
let mut h = WorkloadHandle::spawn(&wl)?;
h.start();
state.target_handles().push((String::new(), h));
}
// Spawn workers into a tracked cgroup. `workers_pct` scales against
// the recorded cpuset, falling back to the topology's usable CPUs
// when none is recorded. Workers are moved into the cgroup before
// `start()` is called on the handle.
SpawnPlacement::Cgroup(cgroup) => {
if cgroup.is_empty() {
anyhow::bail!(
"Op::Spawn(SpawnPlacement::Cgroup): cgroup name is empty — \
use SpawnPlacement::runner_cgroup() to spawn workers in \
the test runner's own cgroup, or pass a non-empty name \
via SpawnPlacement::cgroup(name)",
);
}
if !state.cgroup_name_is_tracked(cgroup) {
anyhow::bail!(
"Op::Spawn(SpawnPlacement::Cgroup('{cgroup}')): \
cgroup '{cgroup}' is not tracked by the scenario state — \
declare it via CgroupDef in Step.setup, \
Op::add_cgroup / Op::add_cgroup_def earlier in the \
same step, or on the persistent Backdrop. Tracked \
step-local cgroups: {step:?}; tracked Backdrop \
cgroups: {backdrop:?}",
step = state.step.cgroups.names(),
backdrop = state.backdrop.cgroups.names(),
);
}
if let Err(reason) = work.mem_policy.validate() {
anyhow::bail!("Op::Spawn(Cgroup '{}'): {}", cgroup, reason);
}
let cgroup_cpuset: Option<BTreeSet<usize>> =
state.lookup_cpuset(cgroup).cloned();
let cpuset_size = cgroup_cpuset
.as_ref()
.map_or_else(|| ctx.topo.usable_cpuset().len(), |s| s.len());
let work = work.clone().resolve_workers_pct(cpuset_size, cgroup)?;
let n = crate::scenario::resolve_num_workers(
&work,
ctx.workers_per_cgroup,
cgroup,
)?;
// The mempolicy/cpuset cross-check only applies when a cpuset
// has been recorded for this cgroup.
if let Some(ref resolved) = cgroup_cpuset {
validate_mempolicy_cpuset(
&work.mem_policy,
work.mpol_flags,
resolved,
ctx,
cgroup,
)?;
}
let affinity = crate::scenario::intent_for_spawn(
&work.affinity,
cgroup_cpuset.as_ref(),
ctx.topo,
)?;
let wl = WorkloadConfig::for_scenario_engine(
&work,
n,
affinity,
work.work_type.clone(),
)?;
let mut h = WorkloadHandle::spawn(&wl)?;
ctx.cgroups.move_tasks(cgroup, &h.worker_pids())?;
h.start();
state.target_handles().push((cgroup.to_string(), h));
}
},
// Drain payloads and drop workload handles associated with the cgroup;
// the cgroup itself is not removed here.
Op::StopCgroup { cgroup } => {
state.drain_payloads_for_cgroup(cgroup);
state.drop_handles_for_cgroup(cgroup);
}
// Re-pin every worker of every handle tracked under `cgroup`. The
// affinity is resolved once against the cgroup's recorded cpuset (if
// any); per-worker `set_affinity` failures are logged and skipped.
Op::SetAffinity { cgroup, affinity } => {
let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(cgroup).cloned();
let resolved = crate::scenario::resolve_affinity_for_cgroup(
affinity,
cgroup_cpuset.as_ref(),
ctx.topo,
)?;
// Materialise the Random pool once so `sample` can draw from a slice.
let random_pool: Vec<usize> =
if let ResolvedAffinity::Random { from, .. } = &resolved {
from.iter().copied().collect()
} else {
Vec::new()
};
for (name, handle) in state.all_handles() {
if name.as_str() == *cgroup {
match &resolved {
ResolvedAffinity::None => {}
ResolvedAffinity::Fixed(cpus) => {
for idx in 0..handle.worker_pids().len() {
if let Err(e) = handle.set_affinity(idx, cpus) {
tracing::warn!(
cgroup = %cgroup,
idx,
err = %format!("{e:#}"),
"Op::SetAffinity Fixed: handle.set_affinity failed; \
worker keeps prior affinity"
);
}
}
}
ResolvedAffinity::Random { from, count } => {
if from.is_empty() || *count == 0 {
unreachable!(
"ResolvedAffinity::Random {{ from={from:?}, count={count} }} \
reached Op::SetAffinity with empty pool or count==0 — \
resolve_affinity_for_cgroup is supposed to bail on those \
cases (no-silent-drops invariant). Audit the new caller \
that constructed it.",
);
}
use rand::seq::IndexedRandom;
// Each worker draws its own independent sample of
// `count` CPUs from the pool.
for idx in 0..handle.worker_pids().len() {
let chosen: BTreeSet<usize> = random_pool
.sample(&mut rand::rng(), *count)
.copied()
.collect();
if let Err(e) = handle.set_affinity(idx, &chosen) {
tracing::warn!(
cgroup = %cgroup,
idx,
err = %format!("{e:#}"),
"Op::SetAffinity Random: handle.set_affinity failed; \
worker keeps prior affinity"
);
}
}
}
ResolvedAffinity::SingleCpu(cpu) => {
let cpus: BTreeSet<usize> = [*cpu].into_iter().collect();
for idx in 0..handle.worker_pids().len() {
if let Err(e) = handle.set_affinity(idx, &cpus) {
tracing::warn!(
cgroup = %cgroup,
idx,
cpu = *cpu,
err = %format!("{e:#}"),
"Op::SetAffinity SingleCpu: handle.set_affinity failed; \
worker keeps prior affinity"
);
}
}
}
}
}
}
}
// Move every worker tracked under `from` into `to`, then re-key the
// handles from `from` to `to`. Self-moves are rejected. Backdrop-to-
// step-local moves are rejected unless `state.target_backdrop` is set.
// An untracked destination only warns. Source/destination
// `cgroup.procs` are written to the placement log before and after
// the move for diagnostics.
Op::MoveAllTasks { from, to } => {
if from == to {
anyhow::bail!(
"Op::MoveAllTasks from '{}' to '{}' is a self-move \
and a silent no-op (cgroup.procs is idempotent on \
same-cgroup writes); remove the op or correct the \
typo on whichever side was intended to differ",
from,
to,
);
}
if !state.target_backdrop
&& state.cgroup_name_is_backdrop(from)
&& !state.cgroup_name_is_backdrop(to)
{
anyhow::bail!(
"Op::MoveAllTasks from Backdrop-owned '{}' to step-local '{}' \
would leave persistent workers in a cgroup that disappears \
at step boundary; declare `{}` in the Backdrop too, or \
move the workers back into a Backdrop-owned cgroup",
from,
to,
to,
);
}
if !state.cgroup_name_is_tracked(to) {
tracing::warn!(
cgroup = %to,
backdrop_cgroups = ?state.backdrop.cgroups.names(),
step_cgroups = ?state.step.cgroups.names(),
"Op::MoveAllTasks destination '{to}' matches no \
step-local or Backdrop-owned cgroup — could be a \
typo or a move into an externally-managed cgroup. \
Compare against the listed Backdrop and step \
cgroups; if the subsequent move fails with a \
kernel-level `cgroup.procs ENOENT`, the typo here \
is the probable source.",
);
}
// Best effort: a failure is logged, not returned. Presumably this
// satisfies cgroup v2's no-internal-process rule for the
// destination (NOTE(review): confirm).
if let Err(e) = ctx.cgroups.clear_subtree_control(to) {
tracing::warn!(
cgroup = to.as_ref(),
err = %e,
"failed to clear subtree_control before task move"
);
}
let pid_batches: Vec<Vec<libc::pid_t>> = state
.all_handles()
.filter(|(name, _)| name.as_str() == *from)
.map(|(_, handle)| handle.worker_pids())
.collect();
let from_procs_path = ctx
.cgroups
.parent_path()
.join(from.as_ref())
.join("cgroup.procs");
let from_procs_pre = std::fs::read_to_string(&from_procs_path)
.unwrap_or_else(|e| format!("<read {}: {e}>", from_procs_path.display()));
append_placement_log(&format!(
"Op::MoveAllTasks pre-move from={} to={} batches={:?} src_cgroup.procs={:?}",
from,
to,
pid_batches,
from_procs_pre.trim(),
));
for pids in &pid_batches {
ctx.cgroups.move_tasks(to, pids)?;
}
let to_procs_path = ctx
.cgroups
.parent_path()
.join(to.as_ref())
.join("cgroup.procs");
let to_procs_post = std::fs::read_to_string(&to_procs_path)
.unwrap_or_else(|e| format!("<read {}: {e}>", to_procs_path.display()));
append_placement_log(&format!(
"Op::MoveAllTasks post-move to={} dest_cgroup.procs={:?}",
to,
to_procs_post.trim(),
));
state.rename_handles(from, to);
}
// Spawn a binary (non-scheduler) payload, optionally inside `cgroup`.
// The empty string is the key for "no cgroup"; a live payload with the
// same name under the same cgroup key is rejected.
Op::RunPayload {
payload,
args,
cgroup,
} => {
if payload.is_scheduler() {
anyhow::bail!(
"Op::RunPayload called with scheduler-kind Payload ('{}'); \
only PayloadKind::Binary payloads can be spawned by step ops",
payload.name,
);
}
validate_known_flags(payload, args)?;
let cgroup_key = cgroup.as_ref().map(|c| c.to_string()).unwrap_or_default();
if let Some(existing) =
state.find_live_payload_with_cgroup(payload.name, &cgroup_key)
{
anyhow::bail!(
"Op::RunPayload: payload '{}' already running in cgroup {} (spawned by {}) — \
WaitPayload/KillPayload it before spawning another with the same name in the same cgroup",
payload.name,
render_cgroup_key(&existing.cgroup),
existing.source.describe(),
);
}
let mut run = crate::scenario::payload_run::PayloadRun::new(ctx, payload);
if !args.is_empty() {
run = run.args(args.iter().cloned());
}
if let Some(c) = cgroup {
run = run.in_cgroup(c.clone());
}
let handle = run.spawn().with_context(|| {
format!(
"Op::RunPayload: spawn payload '{}' in cgroup {}",
payload.name,
render_cgroup_key(&cgroup_key),
)
})?;
state.target_payload_handles().push(PayloadEntry {
cgroup: cgroup_key,
source: PayloadSource::OpRunPayload,
handle,
});
}
// Remove the named payload from state and wait for it; the wait result
// is discarded (only a wait error is propagated).
Op::WaitPayload { name, cgroup } => {
let entry = take_payload_for_op(
state,
"Op::WaitPayload",
"waiting",
"Op::wait_payload_in_cgroup",
name,
cgroup.as_deref(),
)?;
let _result = entry
.handle
.wait()
.with_context(|| format!("Op::WaitPayload: wait payload '{name}'"))?;
}
// Remove the named payload from state and kill it; the kill result is
// discarded (only a kill error is propagated).
Op::KillPayload { name, cgroup } => {
let entry = take_payload_for_op(
state,
"Op::KillPayload",
"killing",
"Op::kill_payload_in_cgroup",
name,
cgroup.as_deref(),
)?;
let _result = entry
.handle
.kill()
.with_context(|| format!("Op::KillPayload: kill payload '{name}'"))?;
}
Op::FreezeCgroup { cgroup } => {
ctx.cgroups
.set_freeze(cgroup, true)
.with_context(|| format!("Op::FreezeCgroup: cgroup '{cgroup}'"))?;
}
Op::UnfreezeCgroup { cgroup } => {
ctx.cgroups
.set_freeze(cgroup, false)
.with_context(|| format!("Op::UnfreezeCgroup: cgroup '{cgroup}'"))?;
}
// Rejected inside a Loop. A thread-local SnapshotBridge is preferred.
// Without one, a guest asks the host over the TLV stream (30s timeout)
// unless the transport latch is already set, and host_only mode errors.
// NOTE(review): when a bridge is installed but `capture_with_step`
// returns false, nothing is logged and no error is raised.
Op::CaptureSnapshot { name } => {
if in_loop {
anyhow::bail!(
"Op::CaptureSnapshot('{name}') inside HoldSpec::Loop forces a freeze \
rendezvous every loop iteration, freezing every vCPU on each iteration \
so the workload no longer runs at the rate you wrote; move the capture \
into a non-Loop Step before or after the Loop step"
);
}
let phase = ctx.current_step.load(std::sync::atomic::Ordering::Acquire);
let invoked = crate::scenario::snapshot::with_active_bridge(|b| {
let captured = b.capture_with_step(name, phase);
if captured {
tracing::info!(
name = %name,
stored = b.len(),
step_index = phase,
"Op::CaptureSnapshot: captured diagnostic snapshot"
);
}
captured
});
// `None` means no bridge is installed on this thread.
if invoked.is_none() {
if crate::vmm::guest_comms::is_guest() {
if SNAPSHOT_TRANSPORT_DEAD.load(Ordering::Relaxed) {
anyhow::bail!(
"Op::CaptureSnapshot('{name}'): snapshot transport latched dead; \
a prior request observed TransportError and the latch only flips \
on transport failure (host-side coordinator unreachable until \
process restart)"
);
} else {
let timeout = std::time::Duration::from_secs(30);
match crate::vmm::guest_comms::request_snapshot(
crate::vmm::wire::SNAPSHOT_KIND_CAPTURE,
name,
timeout,
) {
crate::vmm::wire::SnapshotRequestResult::Ok => {
tracing::info!(
name = %name,
"Op::CaptureSnapshot: host captured diagnostic snapshot via TLV stream"
);
}
crate::vmm::wire::SnapshotRequestResult::HostError { reason } => {
anyhow::bail!(
"Op::CaptureSnapshot('{name}'): host rejected capture: {reason}"
);
}
// Latch the transport dead so later snapshot ops fail fast.
crate::vmm::wire::SnapshotRequestResult::TransportError {
reason,
} => {
SNAPSHOT_TRANSPORT_DEAD.store(true, Ordering::Relaxed);
anyhow::bail!(
"Op::CaptureSnapshot('{name}'): port-1 transport failure: {reason}"
);
}
}
}
} else {
anyhow::bail!(
"Op::CaptureSnapshot('{name}'): not supported in host_only mode \
(no guest VM, no test-fixture SnapshotBridge installed); \
snapshot capture is mutually exclusive with host_only — \
either drop the snapshot op or convert the test to non-host_only",
);
}
}
}
// Same bridge -> guest TLV -> host_only-error fallback chain as
// CaptureSnapshot, registering a watch on `symbol` instead.
Op::WatchSnapshot { symbol } => {
let registered =
crate::scenario::snapshot::with_active_bridge(|b| b.register_watch(symbol));
match registered {
Some(Ok(())) => {
tracing::info!(
symbol = %symbol,
"Op::WatchSnapshot: registered hardware-watchpoint snapshot"
);
}
Some(Err(err)) => {
anyhow::bail!(
"Op::WatchSnapshot: register watch on '{symbol}' failed: {err}",
);
}
None => {
if crate::vmm::guest_comms::is_guest() {
if SNAPSHOT_TRANSPORT_DEAD.load(Ordering::Relaxed) {
anyhow::bail!(
"Op::WatchSnapshot('{symbol}'): snapshot transport latched \
dead; a prior request observed TransportError and the latch \
only flips on transport failure (host-side coordinator \
unreachable until process restart)"
);
} else {
let timeout = std::time::Duration::from_secs(30);
match crate::vmm::guest_comms::request_snapshot(
crate::vmm::wire::SNAPSHOT_KIND_WATCH,
symbol,
timeout,
) {
crate::vmm::wire::SnapshotRequestResult::Ok => {
tracing::info!(
symbol = %symbol,
"Op::WatchSnapshot: host armed hardware-watchpoint via TLV stream"
);
}
crate::vmm::wire::SnapshotRequestResult::HostError {
reason,
} => {
anyhow::bail!(
"Op::WatchSnapshot('{symbol}'): host rejected: {reason}"
);
}
crate::vmm::wire::SnapshotRequestResult::TransportError {
reason,
} => {
SNAPSHOT_TRANSPORT_DEAD.store(true, Ordering::Relaxed);
anyhow::bail!(
"Op::WatchSnapshot('{symbol}'): port-1 transport failure: {reason}"
);
}
}
}
} else {
anyhow::bail!(
"Op::WatchSnapshot('{symbol}'): not supported in host_only mode \
(no guest VM, no test-fixture SnapshotBridge installed); \
hardware-watchpoint snapshots are mutually exclusive with \
host_only — either drop the watch op or convert the test to \
non-host_only",
);
}
}
}
}
// Kernel-op dispatch. Writes carry an empty tag; reads carry the
// caller's tag and a single entry whose `value` is built from `width`.
Op::WriteKernelHot { writes } => {
let payload = build_kernel_op_request(
crate::vmm::wire::KernelOpMode::Hot,
crate::vmm::wire::KernelOpDirection::Write,
String::new(),
write_entries_from_writes(writes),
);
dispatch_kernel_op_request("Op::WriteKernelHot", payload)?;
}
Op::WriteKernelCold { writes } => {
let payload = build_kernel_op_request(
crate::vmm::wire::KernelOpMode::Cold,
crate::vmm::wire::KernelOpDirection::Write,
String::new(),
write_entries_from_writes(writes),
);
dispatch_kernel_op_request("Op::WriteKernelCold", payload)?;
}
Op::ReadKernelHot { tag, target, width } => {
let payload = build_kernel_op_request(
crate::vmm::wire::KernelOpMode::Hot,
crate::vmm::wire::KernelOpDirection::Read,
tag.to_string(),
vec![crate::vmm::wire::KernelOpEntry {
target: target.into(),
value: width.into(),
}],
);
dispatch_kernel_op_request("Op::ReadKernelHot", payload)?;
}
Op::ReadKernelCold { tag, target, width } => {
let payload = build_kernel_op_request(
crate::vmm::wire::KernelOpMode::Cold,
crate::vmm::wire::KernelOpDirection::Read,
tag.to_string(),
vec![crate::vmm::wire::KernelOpEntry {
target: target.into(),
value: width.into(),
}],
);
dispatch_kernel_op_request("Op::ReadKernelCold", payload)?;
}
// Scheduler lifecycle: the current-scheduler record is updated only
// after the dispatch helper succeeds.
Op::AttachScheduler { scheduler } => {
dispatch_attach_scheduler(scheduler)?;
crate::vmm::rust_init::set_current_scheduler(Some(&scheduler.binary));
}
Op::DetachScheduler => {
dispatch_detach_scheduler()?;
crate::vmm::rust_init::set_current_scheduler(None);
}
// NOTE(review): dispatch_restart_scheduler respawns the boot scheduler
// binary, but the current-scheduler record is left unchanged here —
// confirm that is intended after an earlier Op::AttachScheduler.
Op::RestartScheduler => {
dispatch_restart_scheduler()?;
}
Op::ReplaceScheduler { scheduler } => {
dispatch_replace_scheduler(scheduler)?;
crate::vmm::rust_init::set_current_scheduler(Some(&scheduler.binary));
}
// Idempotent per map name: a name already present in the Backdrop's
// pinned set is not reopened. The opened fd is stored on the Backdrop.
Op::PinBpfMap { name } => {
let name_key = name.as_ref().to_string();
if let std::collections::hash_map::Entry::Vacant(slot) =
state.backdrop.pinned_bpf_maps.entry(name_key)
{
let fd = crate::scenario::bpf_pin::open_bpf_map_fd_by_name(name.as_ref())
.map_err(|e| {
anyhow::anyhow!(
"Op::PinBpfMap({name:?}): {e:#}\n\
\n\
Common causes:\n \
(a) Target scheduler's BPF object hasn't finished \
loading. Place this op AFTER a hold long enough for \
the scheduler to attach (typically ~100ms for the \
small scx-ktstr fixture, longer for heavyweight \
schedulers).\n \
(b) Step ran before any `Op::AttachScheduler` or \
before the boot scheduler started; pin must come \
after the scheduler that owns the map is up.\n \
(c) Name exceeds the 15-char usable cap of \
`BPF_OBJ_NAME_LEN` and was truncated by libbpf when \
loaded — compare against the observed names in the \
error above; the kernel-visible name is the \
truncated form."
)
})?;
slot.insert(fd);
}
}
// Record the cgroup's current PIDs into the active SnapshotBridge under
// `tag`. Empty tag/cgroup and a missing bridge are rejected before
// `cgroup.procs` is read.
Op::CaptureCgroupProcs { tag, cgroup } => {
if tag.is_empty() {
anyhow::bail!(
"Op::CaptureCgroupProcs: tag is empty; the tag is the \
snapshot key consumers use to find the capture in \
`SnapshotBridge::drain_cgroup_procs` — supply a \
non-empty identifier (e.g. \"after_spawn\", \
\"post_migrate\")"
);
}
if cgroup.is_empty() {
anyhow::bail!(
"Op::CaptureCgroupProcs(tag={tag:?}): cgroup name is empty. \
Provide a cgroup name registered via `Op::AddCgroup`, a \
`CgroupDef` in setup, or pushed on the Backdrop; an empty \
name would resolve to the runner's own cgroup, which is \
almost certainly not the test author's intent"
);
}
if crate::scenario::snapshot::with_active_bridge(|_| ()).is_none() {
anyhow::bail!(
"Op::CaptureCgroupProcs(tag={tag:?}, cgroup={cgroup:?}): \
no SnapshotBridge installed for this thread; bailing \
before the cgroup.procs read so the misconfiguration \
surfaces without burning a syscall (a silent record \
would also leave subsequent `drain_cgroup_procs` empty, \
masking the missing-bridge bug). Install a bridge via \
`SnapshotBridge::set_thread_local` (RAII via \
`BridgeGuard`) before `execute_scenario` runs the op"
);
}
let pids = ctx.cgroups.read_procs(cgroup).with_context(|| {
format!(
"Op::CaptureCgroupProcs(tag={tag:?}, cgroup={cgroup:?}): \
CgroupOps::read_procs failed",
)
})?;
let pid_count = pids.len();
crate::scenario::snapshot::with_active_bridge(|b| {
b.record_cgroup_procs(tag.to_string(), cgroup.to_string(), pids);
});
tracing::info!(
tag = %tag,
cgroup = %cgroup,
pid_count,
"Op::CaptureCgroupProcs: captured cgroup.procs snapshot"
);
}
}
}
Ok(())
}
/// Collapse every run of consecutive `Op::WriteKernelCold` ops into one op.
///
/// The writes of a run are concatenated, preserving their order, into a
/// single `Op::WriteKernelCold` placed where the run started. Every other op
/// is cloned through unchanged and keeps its relative position. A run ends at
/// the next non-cold-write op or at the end of `ops`; a run made only of empty
/// write lists still yields one (empty) `Op::WriteKernelCold`.
pub(super) fn merge_adjacent_cold_writes(ops: &[Op]) -> Vec<Op> {
    let mut merged: Vec<Op> = Vec::with_capacity(ops.len());
    // `Some` while a run of cold writes is open (its buffer may be empty).
    let mut open_run: Option<Vec<(KernelTarget, KernelValue)>> = None;

    for op in ops {
        if let Op::WriteKernelCold { writes } = op {
            open_run
                .get_or_insert_with(Vec::new)
                .extend(writes.iter().cloned());
            continue;
        }
        // Any other op closes the open run before being forwarded as-is.
        if let Some(batch) = open_run.take() {
            merged.push(Op::WriteKernelCold { writes: batch });
        }
        merged.push(op.clone());
    }

    // Flush a run that extends to the end of the input.
    merged.extend(open_run.map(|batch| Op::WriteKernelCold { writes: batch }));
    merged
}
/// Assemble a kernel-op request payload from its parts.
///
/// `request_id` is always left at `0` here. NOTE(review): presumably a
/// later transport layer assigns the real id — not visible in this file.
pub(super) fn build_kernel_op_request(
    mode: crate::vmm::wire::KernelOpMode,
    direction: crate::vmm::wire::KernelOpDirection,
    tag: String,
    entries: Vec<crate::vmm::wire::KernelOpEntry>,
) -> crate::vmm::wire::KernelOpRequestPayload {
    crate::vmm::wire::KernelOpRequestPayload {
        entries,
        tag,
        direction,
        mode,
        request_id: 0,
    }
}
/// Convert `(target, value)` write pairs into wire-level kernel-op entries.
///
/// One entry is produced per pair, in input order; both halves are
/// converted through their `Into` impls for the wire types.
pub(super) fn write_entries_from_writes(
    writes: &[(KernelTarget, KernelValue)],
) -> Vec<crate::vmm::wire::KernelOpEntry> {
    let mut entries = Vec::with_capacity(writes.len());
    for (target, value) in writes {
        entries.push(crate::vmm::wire::KernelOpEntry {
            target: target.into(),
            value: value.into(),
        });
    }
    entries
}
/// Return the next value of a process-wide counter that starts at 0 and
/// increases by one per call; used to make scheduler log paths unique.
fn next_sched_spawn_seq() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    // Only uniqueness matters, so no cross-thread ordering is required.
    static SPAWN_COUNTER: AtomicU64 = AtomicU64::new(0);
    SPAWN_COUNTER.fetch_add(1, Ordering::Relaxed)
}
/// Build a fresh `/tmp/sched_<name>_<seq>.log` path for a scheduler spawn.
///
/// The sequence number comes from `next_sched_spawn_seq`, so repeated spawns
/// of the same scheduler name within one process get distinct log files.
pub(super) fn staged_scheduler_log_path(name: &str) -> String {
    let seq = next_sched_spawn_seq();
    format!("/tmp/sched_{name}_{seq}.log")
}
/// How long `kill_current_scheduler` waits (via `wait_for_scx_disabled`) for
/// sched_ext to report `disabled` after sending SIGTERM to the scheduler.
const SCHED_LIFECYCLE_KILL_GRACE: std::time::Duration = std::time::Duration::from_secs(10);
/// Deadline, in seconds, for the worker-state-not-trying gate that
/// `dispatch_replace_scheduler` runs after spawning the replacement.
pub(super) const REPLACE_NOT_TRYING_DEADLINE_S: u64 = 5;
/// sysfs file exposing the kernel's sched_ext state; `wait_for_scx_disabled`
/// re-reads it until it contains `disabled`.
const SCX_STATE_SYSFS: &str = "/sys/kernel/sched_ext/state";
/// Wait until the kernel's sched_ext state file reads `disabled`.
///
/// Returns `Ok(Duration::ZERO)` immediately when `SCX_STATE_SYSFS` does not
/// exist. Otherwise the file is re-read by `kernfs_evented_wait` until it
/// contains `disabled` or `timeout` elapses. The helper is given an inotify
/// `IN_DELETE` watch on the sched_ext sysfs directory, the state file itself
/// as its second wake source, and a 50 ms cadence (presumably a fallback poll
/// interval — its exact semantics live in that helper). On success the time
/// spent waiting is returned.
///
/// Both the watched file and its directory are tied to constants here, so the
/// file being polled and the file being watched cannot drift apart.
///
/// # Errors
/// - the state did not reach `disabled` within `timeout` (the last observed
///   state is included in the message);
/// - no evented wake source could be subscribed.
fn wait_for_scx_disabled(timeout: std::time::Duration) -> Result<std::time::Duration> {
    use crate::vmm::freeze_coord::evented_wait::{KernfsWaitOutcome, kernfs_evented_wait};
    use nix::sys::inotify::AddWatchFlags;
    // Directory that contains `SCX_STATE_SYSFS`; the inotify watch is armed here.
    const SCX_SYSFS_DIR: &str = "/sys/kernel/sched_ext/";
    let start = std::time::Instant::now();
    let path = std::path::Path::new(SCX_STATE_SYSFS);
    if !path.exists() {
        return Ok(std::time::Duration::ZERO);
    }
    // Reused read buffer plus the last state seen, for the timeout message.
    let mut buf = String::with_capacity(32);
    let mut last_state = String::new();
    let check_done = || -> Option<()> {
        buf.clear();
        // A failed open/read leaves `buf` empty, which counts as "not disabled yet".
        let _ = std::fs::File::open(SCX_STATE_SYSFS).and_then(|mut f| {
            use std::io::Read;
            f.read_to_string(&mut buf)
        });
        let state = buf.trim_end();
        last_state.clear();
        last_state.push_str(state);
        if state == "disabled" { Some(()) } else { None }
    };
    let cadence = std::time::Duration::from_millis(50);
    let outcome = kernfs_evented_wait(
        SCX_SYSFS_DIR,
        AddWatchFlags::IN_DELETE,
        Some(SCX_STATE_SYSFS),
        cadence,
        start + timeout,
        check_done,
    );
    match outcome {
        KernfsWaitOutcome::Done(()) => Ok(start.elapsed()),
        KernfsWaitOutcome::Timeout => {
            anyhow::bail!(
                "wait_for_scx_disabled: state '{last_state}' did not reach 'disabled' \
                within {timeout:?}; the kernel scx state machine is stuck — \
                the next scheduler spawn will hit -EBUSY at the enable path. \
                Inspect /sys/kernel/sched_ext/state + dmesg for the stuck \
                disable transition.",
            );
        }
        KernfsWaitOutcome::NoEventedSource => {
            anyhow::bail!(
                "wait_for_scx_disabled: could not subscribe to evented wake \
                sources (state fd open failed AND inotify_add_watch on \
                /sys/kernel/sched_ext/ failed). Diagnose: \
                (1) does '/sys/kernel/sched_ext/state' exist AND contain \
                'disabled' or 'enabled'? If absent/garbage the kernel was \
                built without CONFIG_SCHED_CLASS_EXT — rebuild with that \
                config. \
                (2) zcat /proc/config.gz | grep CONFIG_INOTIFY_USER must be \
                =y — without it the framework's evented wake can't \
                subscribe; rebuild with CONFIG_INOTIFY_USER=y."
            );
        }
    }
}
/// Stop the currently attached scheduler and wait for sched_ext to disable.
///
/// Steps, in order:
/// 1. Look up the scheduler PID; error if none is attached.
/// 2. Stop the scheduler exit monitor *before* signalling (presumably so the
///    monitor does not treat this intentional termination as a crash).
/// 3. Write `S` to `/proc/sysrq-trigger` (best effort; the result is
///    ignored). NOTE(review): the reason for this sysrq is not stated here.
/// 4. Send SIGTERM to the PID.
/// 5. Wait up to `SCHED_LIFECYCLE_KILL_GRACE` for the sched_ext state to read
///    `disabled`.
/// 6. Reset the recorded scheduler PID to 0.
///
/// Returns the PID that was signalled.
///
/// # Errors
/// No scheduler attached, `kill(2)` failure, or the sched_ext state not
/// reaching `disabled` in time. On any error after step 2 the recorded PID is
/// not reset and the exit monitor remains stopped.
fn kill_current_scheduler(op_label: &str) -> Result<libc::pid_t> {
let pid = crate::vmm::rust_init::sched_pid().ok_or_else(|| {
anyhow::anyhow!(
"{op_label}: no scheduler attached (SCHED_PID is 0); \
attach a scheduler via boot-time `scheduler` field or \
`Op::AttachScheduler` before invoking this Op"
)
})?;
crate::vmm::rust_init::stop_sched_exit_monitor();
debug_assert!(
crate::vmm::rust_init::sched_exit_monitor_slot_is_empty(),
"kill_current_scheduler did not clear sched_exit_monitor slot — \
stop_sched_exit_monitor() must precede the kill (called from {op_label})",
);
let _ = std::fs::write("/proc/sysrq-trigger", "S");
// SAFETY: kill(2) takes only plain integer arguments and touches no
// Rust-managed memory. Signalling a stale PID would be a logic error,
// not a memory-safety one.
let r = unsafe { libc::kill(pid, libc::SIGTERM) };
if r != 0 {
let errno = std::io::Error::last_os_error();
anyhow::bail!("{op_label}: SIGTERM to pid {pid} failed: {errno}");
}
let elapsed = wait_for_scx_disabled(SCHED_LIFECYCLE_KILL_GRACE).map_err(|e| {
anyhow::anyhow!("{op_label}: wait_for_scx_disabled(pid={pid}) failed: {e:#}")
})?;
tracing::debug!(
op = op_label,
pid = pid,
elapsed_ms = elapsed.as_millis() as u64,
"scx state reached 'disabled' after SIGTERM",
);
crate::vmm::rust_init::set_sched_pid(0);
Ok(pid)
}
/// Spawn a staged scheduler binary on behalf of a scheduler-lifecycle Op.
///
/// Delegates to `crate::vmm::rust_init::try_spawn_scheduler` and maps its
/// outcomes onto Op-level results:
/// - `Ok(Some(_))`: the scheduler was spawned; returns `Ok(())`.
/// - `Ok(None)`: the binary is missing at `binary_path`; returns an error
///   pointing at the staging pipeline.
/// - `Err(e)`: returns an error naming `expected_scheduler_name`.
///
/// The underlying error is rendered with `{e:#}`, matching the rest of this
/// file. If `e` is an `anyhow::Error` (its type is not visible here), this
/// keeps the full context chain instead of only the outermost message. For
/// `Display` types that ignore the alternate flag the output is unchanged.
///
/// # Errors
/// See the `Ok(None)` and `Err(e)` cases above.
fn spawn_scheduler_for_op(
    op_label: &str,
    binary_path: &str,
    args_path: &str,
    log_path: &str,
    expected_scheduler_name: &str,
) -> Result<()> {
    match crate::vmm::rust_init::try_spawn_scheduler(binary_path, args_path, log_path) {
        Ok(Some(_)) => Ok(()),
        Ok(None) => anyhow::bail!(
            "{op_label}: scheduler binary for '{expected_scheduler_name}' is missing at \
            {binary_path}. The staging cpio pack at initramfs build time should have \
            materialised it via staged_scheduler_binary_path — check that \
            KtstrTestEntry.staged_schedulers contains the named entry and the host-side \
            resolve_staged_schedulers_strict found its binary."
        ),
        Err(e) => anyhow::bail!(
            "{op_label}: scheduler '{expected_scheduler_name}' spawn failed: {e:#}. The boot \
            path would force_reboot on this; the Op dispatch path surfaces it as a typed \
            test-failure so the operator sees the specific failure mode (spawn vs \
            startup-died vs not-attached) instead of a bare reboot signal."
        ),
    }
}
/// Spawn the staged binary for `scheduler` and wait for it to become live.
///
/// Sequence (order matters):
/// 1. Wait up to 60s for the bridge's worker state to leave "trying",
///    *before* spawning.
/// 2. Capture the accessor-publish seqno baseline (0 when no bridge is
///    installed).
/// 3. Spawn the staged binary with its staged args and a fresh `/tmp` log,
///    then restart the scheduler exit monitor on that log.
/// 4. Wait up to 30s for the publish seqno to advance past the baseline.
///
/// In guest mode without a bridge, the `*_or_bail` helpers skip steps 1 and
/// 4 and return `Ok(())`.
///
/// # Errors
/// Propagates failures from the not-trying gate, the spawn, and the
/// publish wait.
pub(super) fn dispatch_attach_scheduler(
scheduler: &'static crate::test_support::Scheduler,
) -> Result<()> {
let boot_deadline = std::time::Instant::now() + std::time::Duration::from_secs(60);
wait_for_worker_state_not_trying_or_bail("Op::AttachScheduler", boot_deadline)?;
// Baseline taken before the spawn so only a publish from the new
// scheduler can satisfy the wait in step 4.
let seqno_before =
crate::scenario::snapshot::with_active_bridge(|b| b.accessor_publish_seqno()).unwrap_or(0);
let binary = crate::test_support::staged::staged_scheduler_binary_path(scheduler.name);
let args = crate::test_support::staged::staged_scheduler_args_path(scheduler.name);
let log = staged_scheduler_log_path(scheduler.name);
spawn_scheduler_for_op("Op::AttachScheduler", &binary, &args, &log, scheduler.name)?;
crate::vmm::rust_init::restart_sched_exit_monitor_with_log(Some(&log));
wait_for_accessor_publish_or_bail(
"Op::AttachScheduler",
seqno_before,
std::time::Duration::from_secs(30),
)?;
tracing::info!(
op = "AttachScheduler",
scheduler_name = scheduler.name,
binary_path = %binary,
log_path = %log,
"scheduler attached",
);
Ok(())
}
/// Kill the currently attached scheduler without starting a replacement.
///
/// # Errors
/// Propagates `kill_current_scheduler` failures: no scheduler attached, the
/// SIGTERM failing, or sched_ext not reaching `disabled` in time.
pub(super) fn dispatch_detach_scheduler() -> Result<()> {
    const OP_LABEL: &str = "Op::DetachScheduler";
    let killed_pid = kill_current_scheduler(OP_LABEL)?;
    tracing::info!(op = "DetachScheduler", killed_pid, "scheduler detached");
    Ok(())
}
/// Kill the current scheduler and respawn the boot scheduler in its place.
///
/// The boot scheduler is launched from the fixed `/scheduler` binary and
/// `/sched_args` args file, logging to a fresh `/tmp/sched_boot_<seq>.log`.
/// The exit monitor is then restarted on that log. Unlike attach/replace,
/// no worker-state or accessor-publish wait is performed here.
///
/// # Errors
/// Propagates failures from `kill_current_scheduler` and
/// `spawn_scheduler_for_op`.
pub(super) fn dispatch_restart_scheduler() -> Result<()> {
    const OP_LABEL: &str = "Op::RestartScheduler";
    const BOOT_NAME: &str = "boot";
    const BOOT_BINARY: &str = "/scheduler";
    const BOOT_ARGS: &str = "/sched_args";

    let prev_pid = kill_current_scheduler(OP_LABEL)?;
    let log_path = staged_scheduler_log_path(BOOT_NAME);
    spawn_scheduler_for_op(OP_LABEL, BOOT_BINARY, BOOT_ARGS, &log_path, BOOT_NAME)?;
    crate::vmm::rust_init::restart_sched_exit_monitor_with_log(Some(&log_path));
    tracing::info!(op = "RestartScheduler", prev_pid, "boot scheduler restarted");
    Ok(())
}
/// Kill the current scheduler and start the staged binary for `scheduler`
/// in its place.
///
/// Sequence (order matters):
/// 1. Kill the current scheduler (`kill_current_scheduler`).
/// 2. Spawn the replacement with its staged args and a fresh `/tmp` log,
///    then restart the exit monitor on that log.
/// 3. Wait up to `REPLACE_NOT_TRYING_DEADLINE_S` seconds for the worker
///    state to leave "trying".
/// 4. Capture the accessor-publish seqno baseline (0 without a bridge).
/// 5. Wait up to 10s for the seqno to advance past that baseline.
///
/// Unlike `dispatch_attach_scheduler`, the not-trying gate and the seqno
/// baseline are taken *after* the replacement has been spawned.
///
/// # Errors
/// Propagates failures from the kill, the spawn, the not-trying gate and the
/// publish wait.
pub(super) fn dispatch_replace_scheduler(
scheduler: &'static crate::test_support::Scheduler,
) -> Result<()> {
let prev_pid = kill_current_scheduler("Op::ReplaceScheduler")?;
let binary = crate::test_support::staged::staged_scheduler_binary_path(scheduler.name);
let args = crate::test_support::staged::staged_scheduler_args_path(scheduler.name);
let log = staged_scheduler_log_path(scheduler.name);
spawn_scheduler_for_op("Op::ReplaceScheduler", &binary, &args, &log, scheduler.name)?;
crate::vmm::rust_init::restart_sched_exit_monitor_with_log(Some(&log));
let not_trying_deadline =
std::time::Instant::now() + std::time::Duration::from_secs(REPLACE_NOT_TRYING_DEADLINE_S);
wait_for_worker_state_not_trying_or_bail("Op::ReplaceScheduler", not_trying_deadline)?;
let seqno_before =
crate::scenario::snapshot::with_active_bridge(|b| b.accessor_publish_seqno()).unwrap_or(0);
wait_for_accessor_publish_or_bail(
"Op::ReplaceScheduler",
seqno_before,
std::time::Duration::from_secs(10),
)?;
tracing::info!(
op = "ReplaceScheduler",
prev_pid = prev_pid,
new_scheduler_name = scheduler.name,
binary_path = %binary,
log_path = %log,
"scheduler replaced",
);
Ok(())
}
/// Wait (until `deadline`) for the active bridge's worker state to leave
/// "trying".
///
/// With a bridge installed, its own result is returned (success value
/// discarded). Without one, guest mode skips the gate with a debug log and
/// returns `Ok(())`; on the host, an error explains how to install a bridge.
///
/// # Errors
/// The bridge's wait error, or the missing-bridge error outside guest mode.
pub(super) fn wait_for_worker_state_not_trying_or_bail(
    op_label: &str,
    deadline: std::time::Instant,
) -> Result<()> {
    let outcome = crate::scenario::snapshot::with_active_bridge(|bridge| {
        bridge.wait_for_worker_state_not_trying(deadline, op_label)
    });
    if let Some(waited) = outcome {
        return waited.map(|_| ());
    }
    // No bridge on this thread from here on.
    if !crate::vmm::guest_comms::is_guest() {
        anyhow::bail!(
            "{op_label}: no SnapshotBridge installed on the executor's thread; \
            cannot observe worker-state-not-trying gate — the pre-spawn quiesce \
            would be silently skipped, letting a stale in-flight publish from \
            the prior scheduler corrupt the post-spawn seqno baseline. Recovery \
            options: (1) install a SnapshotBridge on the executor thread \
            (test-fixture path via `SnapshotBridge::new(cb).set_thread_local()`), \
            (2) drop the scheduler op, or (3) convert host_only tests to \
            non-host_only so the VM-orchestrated bridge install applies \
            (snapshot mutex per the host-only-mode contract)",
        );
    }
    tracing::debug!(
        op = %op_label,
        "wait_for_worker_state_not_trying skipped: guest mode (host-side accessor not installable in guest executor thread; attach correctness verified by poll_scx_attached)",
    );
    Ok(())
}
/// Wait until the active bridge's accessor-publish seqno advances past
/// `seqno_before`, allowing at most `budget` measured from the call.
///
/// With a bridge installed, its own result is returned (success value
/// discarded). Without one, guest mode skips the check with a debug log and
/// returns `Ok(())`; on the host, an error explains how to install a bridge.
///
/// # Errors
/// The bridge's wait error, or the missing-bridge error outside guest mode.
pub(super) fn wait_for_accessor_publish_or_bail(
    op_label: &str,
    seqno_before: u64,
    budget: std::time::Duration,
) -> Result<()> {
    let deadline = std::time::Instant::now() + budget;
    let outcome = crate::scenario::snapshot::with_active_bridge(|bridge| {
        bridge.wait_for_accessor_publish_advance(seqno_before, deadline, op_label)
    });
    if let Some(waited) = outcome {
        return waited.map(|_| ());
    }
    // No bridge on this thread from here on.
    if !crate::vmm::guest_comms::is_guest() {
        anyhow::bail!(
            "{op_label}: no SnapshotBridge installed on the executor's thread; \
            cannot observe accessor-publish seqno advance — scheduler-liveness \
            verification would be silently skipped. Recovery options: (1) \
            install a SnapshotBridge on the executor thread (test-fixture \
            path via `SnapshotBridge::new(cb).set_thread_local()`), (2) drop \
            the scheduler-attach op, or (3) convert host_only tests to \
            non-host_only so the VM-orchestrated bridge install applies \
            (snapshot mutex per the host-only-mode contract)",
        );
    }
    tracing::debug!(
        op = %op_label,
        "wait_for_accessor_publish skipped: guest mode (host-side accessor not installable in guest executor thread; scheduler-liveness verified by poll_scx_attached)",
    );
    Ok(())
}
/// Dispatch a kernel-op request and validate the reply.
///
/// Resolution order:
/// 1. a kernel-op callback on the thread-local SnapshotBridge, if it yields
///    a reply;
/// 2. inside a guest VM, the port-1 wire path with a 30s timeout;
/// 3. otherwise an error describing how to install a callback.
///
/// # Errors
/// Missing dispatcher outside a guest, port-1 transport failure, or a reply
/// reporting failure (see `check_kernel_op_reply`).
pub(super) fn dispatch_kernel_op_request(
    op_label: &str,
    payload: crate::vmm::wire::KernelOpRequestPayload,
) -> Result<()> {
    let via_bridge = crate::scenario::snapshot::with_active_bridge(|bridge| {
        bridge.dispatch_kernel_op(&payload)
    })
    .flatten();

    match via_bridge {
        Some(reply) => check_kernel_op_reply(op_label, &payload, &reply),
        None if crate::vmm::guest_comms::is_guest() => {
            let timeout = std::time::Duration::from_secs(30);
            let wire_result = crate::vmm::guest_comms::request_kernel_op(payload.clone(), timeout);
            match wire_result {
                crate::vmm::wire::KernelOpRequestResult::Ok(reply) => {
                    check_kernel_op_reply(op_label, &payload, &reply)
                }
                crate::vmm::wire::KernelOpRequestResult::TransportError { reason } => {
                    anyhow::bail!(
                        "{op_label}('{}'): port-1 transport failure: {reason}",
                        payload.tag,
                    );
                }
            }
        }
        None => {
            anyhow::bail!(
                "{op_label}('{}'): no SnapshotBridge kernel-op callback is installed on this \
                thread and not running in a guest VM. Install a callback via \
                SnapshotBridge::new(...).with_kernel_op(...).set_thread_local() for host-side \
                tests, or run the scenario inside a ktstr guest VM where the port-1 wire path \
                provides dispatch.",
                payload.tag,
            );
        }
    }
}
/// Turn a kernel-op reply into a `Result`, logging a summary on success.
///
/// The success log records the request's tag, mode, direction and entry
/// count, plus the number of values the host read back.
///
/// # Errors
/// When `reply.success` is false; the host's `reason` is included.
fn check_kernel_op_reply(
    op_label: &str,
    request: &crate::vmm::wire::KernelOpRequestPayload,
    reply: &crate::vmm::wire::KernelOpReplyPayload,
) -> Result<()> {
    if reply.success {
        tracing::info!(
            op = op_label,
            tag = %request.tag,
            mode = ?request.mode,
            direction = ?request.direction,
            entries = request.entries.len(),
            read_values = reply.read_values.len(),
            "{op_label}: host completed kernel-op batch",
        );
        return Ok(());
    }
    anyhow::bail!(
        "{op_label}('{}'): host reported failure: {}",
        request.tag,
        reply.reason,
    )
}
fn take_payload_for_op(
state: &mut ScenarioState<'_, '_>,
op_tag: &str,
verb_ing: &str,
ctor_path: &str,
name: &str,
cgroup: Option<&str>,
) -> Result<PayloadEntry> {
match state.take_payload_by_name(name, cgroup) {
Ok(Some(entry)) => Ok(entry),
Ok(None) => match cgroup {
Some(c) => anyhow::bail!(
"{op_tag}: no running payload named '{name}' in cgroup {} \
(spawn it via Op::RunPayload or CgroupDef::workload before {verb_ing})",
render_cgroup_key(c),
),
None => anyhow::bail!(
"{op_tag}: no running payload named '{name}' \
(spawn it via Op::RunPayload or CgroupDef::workload before {verb_ing})",
),
},
Err(cgroups) => {
let rendered: Vec<String> = cgroups.iter().map(|c| render_cgroup_key(c)).collect();
anyhow::bail!(
"{op_tag}: payload '{name}' is ambiguous — {} live copies in cgroups {} — \
use {ctor_path}(name, cgroup) to disambiguate",
rendered.len(),
rendered.join(", "),
)
}
}
}
/// Read the CPUs configured in `<parent>/<name>/cpuset.cpus`.
///
/// Returns `None` when the file cannot be read or contains only whitespace;
/// otherwise the list is parsed with `parse_cpu_list_lenient` into a sorted
/// set.
fn read_cpuset(ctx: &Ctx, name: &str) -> Option<BTreeSet<usize>> {
    let cpus_file = ctx.cgroups.parent_path().join(name).join("cpuset.cpus");
    let raw = std::fs::read_to_string(&cpus_file).ok()?;
    let list = raw.trim();
    (!list.is_empty()).then(|| {
        crate::topology::parse_cpu_list_lenient(list)
            .into_iter()
            .collect::<BTreeSet<usize>>()
    })
}
/// Render a payload cgroup key for user-facing messages.
///
/// The empty key means "no cgroup" and is shown as `(no cgroup)`; any other
/// name is wrapped in single quotes.
pub(super) fn render_cgroup_key(cgroup: &str) -> String {
    match cgroup {
        "" => String::from("(no cgroup)"),
        name => format!("'{name}'"),
    }
}