use std::collections::BTreeSet;
use anyhow::Result;
use crate::scenario::Ctx;
use crate::vmm::guest_comms;
use crate::workload::{WorkloadConfig, WorkloadHandle};
use super::{CgroupDef, PayloadEntry, PayloadSource, ScenarioState, validate_mempolicy_cpuset};
/// Append-only diagnostic log of worker placement.
///
/// `apply_setup` records every spawned worker batch (cgroup, optional
/// pcomm, count, pids) here. Nothing in this module truncates the file, so
/// it accumulates across calls until removed externally.
pub const PLACEMENT_LOG_PATH: &str = "/tmp/ktstr-placement.log";

/// Best-effort append of one `msg` line to [`PLACEMENT_LOG_PATH`].
///
/// The file is created if missing and opened in append mode. Errors opening
/// or writing it are ignored, so logging never changes the caller's control
/// flow.
///
/// The full line (message plus trailing newline) is assembled first and
/// handed to a single `write_all`. On an append-mode descriptor it is then
/// normally added by one `write(2)`. By contrast, `writeln!` on an
/// unbuffered `File` writes the message and the newline separately, which
/// lets concurrent appenders to this shared path interleave partial lines.
pub(super) fn append_placement_log(msg: &str) {
    use std::io::Write;

    let Ok(mut file) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(PLACEMENT_LOG_PATH)
    else {
        return;
    };
    let mut line = String::with_capacity(msg.len() + 1);
    line.push_str(msg);
    line.push('\n');
    // Deliberately ignored: the placement log is a diagnostic aid only.
    let _ = file.write_all(line.as_bytes());
}
/// Materialize every [`CgroupDef`] in `defs` for the current scenario step.
///
/// Each definition is processed fully, in order, before the next:
/// 1. The name is rejected if `state` already tracks a cgroup with it.
/// 2. The cgroup is created. Then the optional cpuset, `cpuset.mems`,
///    `cpu.weight`/`cpu.max`, memory limits, `io.weight` and `pids.max`
///    are validated and written. A cpuset spec that resolves to zero CPUs,
///    or fails `CpusetSpec::validate`, is an error.
/// 3. Every merged WorkSpec's memory policy is validated on its own. It is
///    also validated against the cgroup's resolved cpuset when one was set.
/// 4. Worker counts, work type and affinity are resolved per WorkSpec.
///    WorkSpecs without a (non-empty) `pcomm` are spawned one handle each.
///    WorkSpecs sharing a `pcomm` are coalesced into one spawn per `pcomm`,
///    in first-seen order. Spawned pids are moved into the cgroup before
///    the handle is started, and the handle is recorded on `state`.
/// 5. The optional `payload` is spawned inside the cgroup. Spawning is
///    refused if a live payload of the same name already runs in it.
///
/// After all definitions are applied, [`maybe_start_stall_monitor`] may
/// start the host stall monitor.
///
/// # Errors
///
/// Returns the first validation failure, cgroup write error or spawn error.
/// Nothing done for earlier definitions (or earlier steps of the failing
/// definition) is rolled back here.
pub(super) fn apply_setup(
    ctx: &Ctx,
    state: &mut ScenarioState<'_, '_>,
    defs: &[CgroupDef],
) -> Result<()> {
    for def in defs {
        // Refuse to redefine a cgroup that `state` already tracks.
        if state.cgroup_name_is_tracked(&def.name) {
            anyhow::bail!(
                "CgroupDef '{}' collides with a cgroup already tracked \
                 (by a prior Backdrop or step-local CgroupDef) — declare it \
                 in exactly one place; use a fresh name for the step-local cgroup",
                def.name,
            );
        }
        state.target_cgroups().add_cgroup_no_cpuset(&def.name)?;
        // Merged once; reused by the empty-cpuset diagnostics, the
        // mempolicy checks and the spawn path below.
        let effective_works = def.merged_works();

        if let Some(ref cpuset_spec) = def.cpuset {
            let resolved = cpuset_spec.resolve_quiet(ctx);
            if resolved.is_empty() {
                // Every branch here bails. They differ only in how specific
                // the diagnostic is, and WorkSpec mistakes are reported first
                // so the empty cpuset does not hide them.
                if let Some((n, pct)) = effective_works
                    .iter()
                    .find_map(|w| w.num_workers.zip(w.workers_pct))
                {
                    anyhow::bail!(
                        "cgroup '{}': WorkSpec sets BOTH workers({n}) \
                         and workers_pct({pct}); pick one — \
                         workers_pct resolves the cpuset fraction at \
                         apply-setup time and is incompatible with an \
                         explicit count. The empty cpuset would \
                         otherwise mask this conflict; resolve the \
                         workers/workers_pct conflict first",
                        def.name,
                    );
                }
                let pcts: Vec<(usize, f64)> = effective_works
                    .iter()
                    .enumerate()
                    .filter_map(|(i, w)| w.workers_pct.map(|p| (i, p)))
                    .collect();
                if !pcts.is_empty() {
                    let pct_display = if pcts.len() == 1 {
                        format!("workers_pct({})", pcts[0].1)
                    } else {
                        let list = pcts
                            .iter()
                            .map(|(i, p)| format!("works[{i}]={p}"))
                            .collect::<Vec<_>>()
                            .join(", ");
                        format!("workers_pct values [{list}]")
                    };
                    anyhow::bail!(
                        "cgroup '{}': {pct_display} on a cpuset of 0 \
                         CPU(s) would resolve to 0 workers; the cgroup \
                         would have no workers and downstream \
                         assertions would vacuously pass — narrow the \
                         cpuset, raise the fraction, or use \
                         `workers(N)` instead",
                        def.name,
                    );
                }
                anyhow::bail!(
                    "cgroup '{}': cpuset_spec {:?} resolved to 0 \
                     CPU(s); the cgroup would have no CPUs assigned \
                     and downstream worker spawns would fail or \
                     produce vacuous assertions — adjust the spec \
                     so it resolves to a non-empty cpuset on this \
                     topology",
                    def.name,
                    cpuset_spec,
                );
            }
            if let Err(reason) = cpuset_spec.validate(ctx) {
                anyhow::bail!(
                    "cgroup '{}': CpusetSpec validation failed: {}",
                    def.name,
                    reason
                );
            }
            ctx.cgroups.set_cpuset(&def.name, &resolved)?;
            // Recorded on `state`; read back via `lookup_cpuset` below for
            // the mempolicy, workers_pct and affinity resolution.
            state.record_cpuset(&def.name, resolved);
        }
        if let Some(ref nodes) = def.cpuset_mems {
            ctx.cgroups.set_cpuset_mems(&def.name, nodes)?;
        }
        if let Some(ref cpu) = def.cpu {
            if let Some(w) = cpu.weight {
                if !(1..=10_000).contains(&w) {
                    anyhow::bail!(
                        "cgroup '{}': cpu.weight {w} out of range 1..=10000",
                        def.name,
                    );
                }
                ctx.cgroups.set_cpu_weight(&def.name, w)?;
            }
            if cpu.max_period_us == 0 {
                anyhow::bail!("cgroup '{}': cpu.max period must be > 0 (got 0)", def.name,);
            }
            if let Some(0) = cpu.max_quota_us {
                anyhow::bail!(
                    "cgroup '{}': cpu.max quota must be > 0 when set; \
                     use cpu_unlimited() to remove the cap",
                    def.name,
                );
            }
            ctx.cgroups
                .set_cpu_max(&def.name, cpu.max_quota_us, cpu.max_period_us)?;
        }
        if let Some(ref mem) = def.memory {
            ctx.cgroups.set_memory_max(&def.name, mem.max)?;
            ctx.cgroups.set_memory_high(&def.name, mem.high)?;
            ctx.cgroups.set_memory_low(&def.name, mem.low)?;
            // swap.max is only written when explicitly requested.
            if mem.swap_max.is_some() {
                ctx.cgroups.set_memory_swap_max(&def.name, mem.swap_max)?;
            }
        }
        if let Some(ref io) = def.io
            && let Some(w) = io.weight
        {
            if !(1..=10_000).contains(&w) {
                anyhow::bail!(
                    "cgroup '{}': io.weight {w} out of range 1..=10000",
                    def.name,
                );
            }
            ctx.cgroups.set_io_weight(&def.name, w)?;
        }
        if let Some(ref pids) = def.pids {
            if let Some(0) = pids.max {
                anyhow::bail!(
                    "cgroup '{}': pids.max must be > 0; use \
                     pids_unlimited() to remove the cap",
                    def.name,
                );
            }
            ctx.cgroups.set_pids_max(&def.name, pids.max)?;
        }

        // Memory-policy checks: standalone first, then against the cgroup's
        // cpuset when one was recorded above.
        for work in &effective_works {
            if let Err(reason) = work.mem_policy.validate() {
                anyhow::bail!("cgroup '{}': {}", def.name, reason);
            }
        }
        let cgroup_cpuset: Option<BTreeSet<usize>> = state.lookup_cpuset(&def.name).cloned();
        if let Some(ref resolved) = cgroup_cpuset {
            for work in &effective_works {
                validate_mempolicy_cpuset(
                    &work.mem_policy,
                    work.mpol_flags,
                    resolved,
                    ctx,
                    &def.name,
                )?;
            }
        }

        // `resolve_workers_pct` scales against the cgroup's cpuset, or every
        // usable CPU when the cgroup has none. The size is the same for every
        // WorkSpec, so it is computed once.
        let cpuset_size = cgroup_cpuset
            .as_ref()
            .map_or_else(|| ctx.topo.usable_cpuset().len(), |s| s.len());
        let mut resolved_works: Vec<crate::workload::WorkSpec> =
            Vec::with_capacity(effective_works.len());
        for work in &effective_works {
            let work = work.clone().resolve_workers_pct(cpuset_size, &def.name)?;
            let n = crate::scenario::resolve_num_workers(&work, ctx.workers_per_cgroup, &def.name)?;
            let effective_work_type = crate::workload::resolve_work_type(
                &work.work_type,
                ctx.work_type_override.as_ref(),
                def.swappable,
                n,
            );
            let affinity = crate::scenario::intent_for_spawn(
                &work.affinity,
                cgroup_cpuset.as_ref(),
                ctx.topo,
            )?;
            // Fully resolved copy: explicit worker count, no pending
            // workers_pct, effective work type and spawn affinity.
            resolved_works.push(crate::workload::WorkSpec {
                work_type: effective_work_type,
                sched_policy: work.sched_policy,
                num_workers: Some(n),
                affinity,
                mem_policy: work.mem_policy.clone(),
                mpol_flags: work.mpol_flags,
                nice: work.nice,
                comm: work.comm.clone(),
                pcomm: work.pcomm.clone(),
                uid: work.uid,
                gid: work.gid,
                numa_node: work.numa_node,
                workers_pct: None,
            });
        }

        // Partition: WorkSpecs with a non-empty `pcomm` are grouped by that
        // name (first-seen order kept in `pcomm_order`). All others are
        // spawned individually.
        let mut pcomm_groups: std::collections::HashMap<String, Vec<crate::workload::WorkSpec>> =
            std::collections::HashMap::new();
        let mut pcomm_order: Vec<String> = Vec::new();
        let mut non_pcomm_works: Vec<crate::workload::WorkSpec> = Vec::new();
        for work in resolved_works {
            match &work.pcomm {
                Some(value) if !value.is_empty() => {
                    let key = value.to_string();
                    match pcomm_groups.entry(key) {
                        std::collections::hash_map::Entry::Vacant(slot) => {
                            pcomm_order.push(slot.key().clone());
                            slot.insert(vec![work]);
                        }
                        std::collections::hash_map::Entry::Occupied(mut slot) => {
                            slot.get_mut().push(work);
                        }
                    }
                }
                _ => non_pcomm_works.push(work),
            }
        }

        // One handle per non-pcomm WorkSpec. Pids are moved into the cgroup
        // before `h.start()` is called.
        for work in non_pcomm_works {
            let n = work.num_workers.expect("num_workers resolved above");
            let wl = WorkloadConfig::for_scenario_engine(
                &work,
                n,
                work.affinity.clone(),
                work.work_type.clone(),
            )?;
            tracing::debug!(
                cgroup = %def.name,
                expected_workers = n,
                comm = ?work.comm,
                work_type = ?work.work_type,
                "apply_setup: about to spawn non-pcomm workers"
            );
            let mut h = WorkloadHandle::spawn(&wl)?;
            let pids = h.worker_pids();
            append_placement_log(&format!(
                "apply_setup: spawned non-pcomm workers cgroup={} count={} pids={:?}",
                def.name,
                pids.len(),
                pids,
            ));
            ctx.cgroups.move_tasks(&def.name, &pids)?;
            h.start();
            state.target_handles().push((def.name.to_string(), h));
        }

        // One coalesced spawn per pcomm, in first-seen order.
        for pcomm in pcomm_order {
            let works_for_pcomm = pcomm_groups
                .remove(&pcomm)
                .expect("pcomm key inserted during partition pass");
            // Coalesced WorkSpecs must agree on uid/gid; comparing each one
            // against WorkSpec[0] is sufficient.
            if works_for_pcomm.len() > 1 {
                let first_uid = works_for_pcomm[0].uid;
                let first_gid = works_for_pcomm[0].gid;
                for (i, w) in works_for_pcomm.iter().enumerate().skip(1) {
                    if w.uid != first_uid {
                        anyhow::bail!(
                            "cgroup '{}' pcomm '{}': WorkSpec[0].uid={:?} differs from \
                             WorkSpec[{}].uid={:?}; pcomm-coalesced WorkSpecs must \
                             agree on uid (NPTL setresuid is broadcast to every thread \
                             in the tgid)",
                            def.name,
                            pcomm,
                            first_uid,
                            i,
                            w.uid,
                        );
                    }
                    if w.gid != first_gid {
                        anyhow::bail!(
                            "cgroup '{}' pcomm '{}': WorkSpec[0].gid={:?} differs from \
                             WorkSpec[{}].gid={:?}; pcomm-coalesced WorkSpecs must \
                             agree on gid (NPTL setresgid is broadcast to every thread \
                             in the tgid)",
                            def.name,
                            pcomm,
                            first_gid,
                            i,
                            w.gid,
                        );
                    }
                }
            }
            // The cgroup-level default wins; otherwise use the (shared)
            // uid/gid of the first WorkSpec.
            let container_uid = def
                .default_uid
                .or_else(|| works_for_pcomm.first().and_then(|w| w.uid));
            let container_gid = def
                .default_gid
                .or_else(|| works_for_pcomm.first().and_then(|w| w.gid));
            tracing::debug!(
                cgroup = %def.name,
                pcomm = %pcomm,
                workspec_count = works_for_pcomm.len(),
                "apply_setup: about to spawn pcomm-coalesced workers"
            );
            let mut h = WorkloadHandle::spawn_pcomm_cgroup(
                &pcomm,
                container_uid,
                container_gid,
                &works_for_pcomm,
            )?;
            let pids = h.worker_pids();
            append_placement_log(&format!(
                "apply_setup: spawned pcomm workers cgroup={} pcomm={} count={} pids={:?}",
                def.name,
                pcomm,
                pids.len(),
                pids,
            ));
            ctx.cgroups.move_tasks(&def.name, &pids)?;
            tracing::debug!(
                cgroup = %def.name,
                pcomm = %pcomm,
                "apply_setup: move_tasks succeeded; about to h.start()"
            );
            h.start();
            // Push before logging so the "handle pushed" message is true
            // when it is emitted.
            state.target_handles().push((def.name.to_string(), h));
            tracing::debug!(
                cgroup = %def.name,
                pcomm = %pcomm,
                "apply_setup: h.start() returned; handle pushed"
            );
        }

        // Optional payload, spawned directly inside this cgroup. A payload
        // of the same name must not already be live in the same cgroup.
        if let Some(payload) = def.payload {
            if let Some(existing) =
                state.find_live_payload_with_cgroup(payload.name, def.name.as_ref())
            {
                anyhow::bail!(
                    "CgroupDef::workload: payload '{}' already running in cgroup '{}' (spawned by {}) — \
                     declare it in exactly one place per cgroup",
                    payload.name,
                    def.name,
                    existing.source.describe(),
                );
            }
            let handle = crate::scenario::payload_run::PayloadRun::new(ctx, payload)
                .in_cgroup(def.name.clone())
                .spawn()
                .map_err(|e| {
                    anyhow::anyhow!(
                        "cgroup '{}': spawn payload '{}': {:#}",
                        def.name,
                        payload.name,
                        e,
                    )
                })?;
            state.target_payload_handles().push(PayloadEntry {
                cgroup: def.name.to_string(),
                source: PayloadSource::CgroupDefWorkload,
                handle,
            });
        }
    }
    maybe_start_stall_monitor(state);
    Ok(())
}
/// Start the host stall monitor for the current step, at most once.
///
/// Does nothing in any of these cases:
/// - `state.target_backdrop` is set (presumably Backdrop setup; confirm).
/// - The step already owns a monitor.
/// - The code runs inside the guest or in cargo-test mode.
/// - No worker pids exist yet.
///
/// Otherwise the monitor is given every worker pid from both the step's
/// and the Backdrop's handles. A spawn failure is logged at `warn` and
/// leaves the step without a monitor instead of failing it.
pub(super) fn maybe_start_stall_monitor(state: &mut ScenarioState<'_, '_>) {
    // Short-circuits in the same order as individual early returns would.
    let ineligible = state.target_backdrop
        || state.step.stall_monitor.is_some()
        || guest_comms::is_guest()
        || crate::cargo_test_mode::cargo_test_mode_active();
    if ineligible {
        return;
    }

    let mut watched: Vec<libc::pid_t> = Vec::new();
    for (_, handle) in state.step.handles.iter().chain(state.backdrop.handles.iter()) {
        watched.extend(handle.worker_pids());
    }
    if watched.is_empty() {
        return;
    }

    match crate::scenario::host_stall::spawn_monitor(&watched) {
        Ok(monitor) => {
            state.step.stall_monitor = Some(monitor);
        }
        Err(failure) => {
            tracing::warn!(err = %format!("{failure:#}"), "host_stall::spawn_monitor failed; stall monitor disabled for this step");
        }
    }
}