use anyhow::{Context, Result};
use crate::flock::{FlockMode, try_flock};
/// True when the `KTSTR_CARGO_TEST_MODE` environment variable is set to a
/// non-empty value; used to skip real lock acquisition under `cargo test`.
fn cargo_test_mode_active() -> bool {
    matches!(std::env::var("KTSTR_CARGO_TEST_MODE"), Ok(v) if !v.is_empty())
}
/// Error signalling that a host resource (CPU window, LLC lock, …) is
/// currently held by someone else; callers can downcast an `anyhow::Error`
/// to this type to distinguish contention from hard failures.
#[derive(Debug)]
pub struct ResourceContention {
    /// Human-readable explanation, usually including a remediation hint.
    pub reason: String,
}
impl std::fmt::Display for ResourceContention {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.reason)
    }
}
impl std::error::Error for ResourceContention {}
/// One last-level-cache domain: the host CPUs that share a single LLC.
#[derive(Debug, Clone)]
pub struct LlcGroup {
    pub cpus: Vec<usize>,
}
/// Snapshot of the host CPU / LLC / NUMA topology, normally read from sysfs.
#[derive(Debug, Clone)]
pub struct HostTopology {
    /// All LLC domains; the position in this Vec is the "LLC index" used
    /// throughout this module (lock paths, plans, …).
    pub llc_groups: Vec<LlcGroup>,
    /// Every online CPU id.
    pub online_cpus: Vec<usize>,
    /// CPU id -> NUMA node id.
    pub cpu_to_node: std::collections::HashMap<usize, usize>,
    /// NUMA node -> sorted LLC indices predominantly located on that node
    /// (see `compute_host_node_llcs`).
    pub(crate) host_node_llcs: std::collections::BTreeMap<usize, Vec<usize>>,
}
/// Result of `HostTopology::compute_pinning`: vCPU-to-host-CPU placement.
#[derive(Debug)]
pub struct PinningPlan {
    /// `(vcpu_id, host_cpu)` pairs; each host CPU appears at most once.
    pub assignments: Vec<(u32, usize)>,
    /// Host CPU reserved for service threads, if one was requested.
    pub service_cpu: Option<usize>,
    /// Sorted, deduplicated host LLC indices the plan uses.
    pub llc_indices: Vec<usize>,
    // Held open so the flock()s stay alive for the plan's lifetime.
    #[allow(dead_code)] pub(crate) locks: Vec<std::os::fd::OwnedFd>,
}
// Process-wide cache so sysfs is parsed at most once per process.
static CACHED_HOST_TOPOLOGY: std::sync::OnceLock<HostTopology> = std::sync::OnceLock::new();
impl HostTopology {
/// Builds the host topology by parsing sysfs (via `TestTopology`).
///
/// # Errors
/// Fails when the sysfs topology cannot be read or parsed.
pub fn from_sysfs() -> Result<Self> {
    let topo = crate::topology::TestTopology::from_system()
        .context("read host topology from sysfs")?;
    let online_cpus = topo.all_cpus().to_vec();
    let llc_groups: Vec<LlcGroup> = topo
        .llcs()
        .iter()
        .map(|llc| LlcGroup {
            cpus: llc.cpus().to_vec(),
        })
        .collect();
    // Invert the per-LLC CPU lists into a cpu -> NUMA-node map.
    let cpu_to_node: std::collections::HashMap<usize, usize> = topo
        .llcs()
        .iter()
        .flat_map(|llc| llc.cpus().iter().map(|&cpu| (cpu, llc.numa_node())))
        .collect();
    let host_node_llcs = Self::compute_host_node_llcs(&llc_groups, &cpu_to_node);
    Ok(Self {
        llc_groups,
        online_cpus,
        cpu_to_node,
        host_node_llcs,
    })
}
/// Returns a clone of the process-wide cached topology, reading sysfs on
/// first use. Racing first callers may each read sysfs; only one result is
/// retained in the cache, but every caller gets a valid topology.
pub fn cached() -> Result<Self> {
    match CACHED_HOST_TOPOLOGY.get() {
        Some(topo) => Ok(topo.clone()),
        None => {
            let fresh = Self::from_sysfs()?;
            // set() fails when another thread won the race; either way the
            // cache is populated and we return our own copy.
            let _ = CACHED_HOST_TOPOLOGY.set(fresh.clone());
            Ok(fresh)
        }
    }
}
/// Test-only constructor: each `(cpus, node)` pair becomes one LLC group
/// whose CPUs all map to `node`.
#[cfg(test)]
pub(crate) fn new_for_tests(groups: &[(Vec<usize>, usize)]) -> Self {
    let mut llc_groups = Vec::with_capacity(groups.len());
    let mut cpu_to_node = std::collections::HashMap::new();
    let mut online_cpus = Vec::new();
    // Single pass builds all three views of the synthetic topology.
    for (cpus, node) in groups {
        llc_groups.push(LlcGroup { cpus: cpus.clone() });
        for &cpu in cpus {
            cpu_to_node.insert(cpu, *node);
            online_cpus.push(cpu);
        }
    }
    let host_node_llcs = Self::compute_host_node_llcs(&llc_groups, &cpu_to_node);
    Self {
        llc_groups,
        online_cpus,
        cpu_to_node,
        host_node_llcs,
    }
}
/// Assigns every LLC group to the NUMA node owning the majority of its CPUs
/// (CPUs missing from the map count toward node 0; an empty group lands on
/// node 0). Returns node -> ascending LLC indices.
fn compute_host_node_llcs(
    llc_groups: &[LlcGroup],
    cpu_to_node: &std::collections::HashMap<usize, usize>,
) -> std::collections::BTreeMap<usize, Vec<usize>> {
    let mut node_llcs = std::collections::BTreeMap::<usize, Vec<usize>>::new();
    for (idx, group) in llc_groups.iter().enumerate() {
        // Tally how many of this group's CPUs land on each node.
        let mut tally = std::collections::HashMap::<usize, usize>::new();
        for &cpu in &group.cpus {
            let node = cpu_to_node.get(&cpu).copied().unwrap_or(0);
            *tally.entry(node).or_default() += 1;
        }
        let majority_node = tally
            .into_iter()
            .max_by_key(|&(_, count)| count)
            .map_or(0, |(node, _)| node);
        node_llcs.entry(majority_node).or_default().push(idx);
    }
    for llcs in node_llcs.values_mut() {
        llcs.sort_unstable();
    }
    node_llcs
}
/// Size of the largest LLC group, or 0 when there are no groups.
pub fn max_cores_per_llc(&self) -> usize {
    self.llc_groups
        .iter()
        .fold(0, |acc, group| acc.max(group.cpus.len()))
}
/// Number of online host CPUs.
pub fn total_cpus(&self) -> usize {
    self.online_cpus.len()
}
/// Read-only view of the node -> LLC-indices map.
#[allow(dead_code)]
pub(crate) fn host_llcs_by_numa_node(&self) -> &std::collections::BTreeMap<usize, Vec<usize>> {
    &self.host_node_llcs
}
/// NUMA nodes holding at least `min_llcs` LLC groups, in ascending node
/// order, paired with their LLC index lists.
pub(crate) fn numa_nodes_with_capacity(&self, min_llcs: usize) -> Vec<(usize, &Vec<usize>)> {
    let mut eligible = Vec::new();
    for (&node, llcs) in &self.host_node_llcs {
        if llcs.len() >= min_llcs {
            eligible.push((node, llcs));
        }
    }
    eligible
}
/// Known NUMA nodes ordered by `distance_fn(anchor, node)` ascending, with
/// unreachable nodes (distance 255) pushed to the back. The sort is stable,
/// so equally-distant nodes keep ascending node-id order. `distance_fn` is
/// invoked exactly once per node.
pub(crate) fn numa_nodes_sorted_by_distance(
    &self,
    anchor: usize,
    distance_fn: impl Fn(usize, usize) -> u8,
) -> Vec<usize> {
    let mut keyed: Vec<(usize, u8)> = self
        .host_node_llcs
        .keys()
        .map(|&node| (node, distance_fn(anchor, node)))
        .collect();
    // Key (unreachable?, distance): false sorts before true, so 255s go last;
    // within each group, plain distance order applies.
    keyed.sort_by_key(|&(_, d)| (d == 255, d));
    keyed.into_iter().map(|(node, _)| node).collect()
}
/// NUMA node owning the majority of `llc_idx`'s CPUs (unmapped CPUs count
/// toward node 0; an empty group also yields node 0).
pub fn llc_numa_node(&self, llc_idx: usize) -> usize {
    let mut tally: std::collections::HashMap<usize, usize> = std::collections::HashMap::new();
    for &cpu in &self.llc_groups[llc_idx].cpus {
        let node = self.cpu_to_node.get(&cpu).copied().unwrap_or(0);
        *tally.entry(node).or_default() += 1;
    }
    match tally.into_iter().max_by_key(|&(_, count)| count) {
        Some((node, _)) => node,
        None => 0,
    }
}
/// Maps every guest vCPU of `topo` onto a distinct host CPU so that all
/// vCPUs of one virtual LLC share a single host LLC group.
///
/// `llc_offset` rotates which host LLCs back which virtual LLCs (see
/// `numa_aware_llc_order`) so concurrent runs spread across the host. With
/// `reserve_service_cpu`, one extra host CPU outside the vCPU set is picked
/// for service threads.
///
/// # Errors
/// Fails when the host has too few CPUs overall, too few LLC groups, too
/// few unclaimed CPUs inside a chosen group, or no spare service CPU.
pub fn compute_pinning(
    &self,
    topo: &super::topology::Topology,
    reserve_service_cpu: bool,
    llc_offset: usize,
) -> Result<PinningPlan> {
    let cores = topo.cores_per_llc;
    let threads = topo.threads_per_core;
    let llcs = topo.llcs;
    let vcpus_per_llc = cores * threads;
    let total_vcpus = llcs * vcpus_per_llc;
    let total_needed = total_vcpus as usize + if reserve_service_cpu { 1 } else { 0 };
    anyhow::ensure!(
        total_needed <= self.total_cpus(),
        "performance_mode: need {} CPUs ({} vCPUs + {} service) \
         but only {} host CPUs available",
        total_needed,
        total_vcpus,
        if reserve_service_cpu { 1 } else { 0 },
        self.total_cpus(),
    );
    // The host must have at least one LLC group per virtual LLC.
    let num_llcs = self.llc_groups.len();
    anyhow::ensure!(
        llcs as usize <= num_llcs,
        "performance_mode: need {} LLCs for {} virtual LLCs, \
         but host has {} LLC groups",
        llcs,
        llcs,
        num_llcs,
    );
    // Pick which host LLC backs each virtual LLC (NUMA-aware, rotated).
    let llc_order = self.numa_aware_llc_order(topo.numa_nodes, llcs, llc_offset);
    let mut assignments = Vec::with_capacity(total_vcpus as usize);
    let mut used_cpus = std::collections::HashSet::new();
    for llc in 0..llcs {
        let llc_idx = llc_order[llc as usize];
        let group = &self.llc_groups[llc_idx];
        // CPUs of this group not yet claimed by an earlier virtual LLC
        // (llc_order may repeat a host LLC index).
        let available: Vec<usize> = group
            .cpus
            .iter()
            .copied()
            .filter(|c| !used_cpus.contains(c))
            .collect();
        anyhow::ensure!(
            available.len() >= vcpus_per_llc as usize,
            "performance_mode: LLC group {} has {} available CPUs, \
             need {} for virtual LLC {}",
            llc_idx,
            available.len(),
            vcpus_per_llc,
            llc,
        );
        for vcpu_in_llc in 0..vcpus_per_llc {
            // vCPU ids are dense: virtual LLC n owns [n*per_llc, (n+1)*per_llc).
            let vcpu_id = llc * vcpus_per_llc + vcpu_in_llc;
            let host_cpu = available[vcpu_in_llc as usize];
            used_cpus.insert(host_cpu);
            assignments.push((vcpu_id, host_cpu));
        }
    }
    // Service CPU: the first online CPU not claimed by any vCPU.
    let service_cpu = if reserve_service_cpu {
        let cpu = self
            .online_cpus
            .iter()
            .copied()
            .find(|c| !used_cpus.contains(c));
        anyhow::ensure!(
            cpu.is_some(),
            "performance_mode: no free host CPU for service threads \
             after assigning {} vCPUs",
            total_vcpus,
        );
        cpu
    } else {
        None
    };
    let mut llc_indices = llc_order;
    llc_indices.sort_unstable();
    llc_indices.dedup();
    Ok(PinningPlan {
        assignments,
        service_cpu,
        llc_indices,
        locks: Vec::new(),
    })
}
/// Chooses, for each virtual LLC, the backing host LLC index, distributing
/// the guest's `numa_nodes` across distinct host NUMA nodes when possible.
///
/// Falls back to a rotated sequential order when: the guest has ≤ 1 node,
/// the host has no CPU->node data, there are fewer virtual LLCs than guest
/// nodes, or too few host nodes can hold the largest per-node share.
/// `llc_offset` rotates both the node choice and the LLCs within a node.
/// The returned Vec has one entry per virtual LLC and may repeat host LLC
/// indices.
pub(crate) fn numa_aware_llc_order(
    &self,
    numa_nodes: u32,
    llcs: u32,
    llc_offset: usize,
) -> Vec<usize> {
    let num_host_llcs = self.llc_groups.len();
    let sequential_fallback = || -> Vec<usize> {
        (0..llcs as usize)
            .map(|i| (i + llc_offset) % num_host_llcs)
            .collect()
    };
    if numa_nodes == 0 || numa_nodes == 1 || self.cpu_to_node.is_empty() {
        return sequential_fallback();
    }
    if llcs < numa_nodes {
        return sequential_fallback();
    }
    // Split virtual LLCs across guest nodes; the first `remainder` guest
    // nodes get one extra LLC each.
    let base_per_node = (llcs / numa_nodes) as usize;
    let remainder = (llcs % numa_nodes) as usize;
    let max_per_node = base_per_node + if remainder > 0 { 1 } else { 0 };
    // Only host nodes with enough LLCs for the largest share qualify.
    let eligible_nodes = self.numa_nodes_with_capacity(max_per_node);
    if eligible_nodes.len() < numa_nodes as usize {
        return sequential_fallback();
    }
    let mut order = Vec::with_capacity(llcs as usize);
    // Derive a node-level rotation from the LLC-level offset.
    let node_offset = llc_offset / max_per_node.max(1);
    for guest_node in 0..numa_nodes as usize {
        let host_idx = (guest_node + node_offset) % eligible_nodes.len();
        let (_, host_llcs) = &eligible_nodes[host_idx];
        let within_offset = llc_offset % host_llcs.len();
        let count = if guest_node < remainder {
            base_per_node + 1
        } else {
            base_per_node
        };
        for i in 0..count {
            let llc_idx = host_llcs[(i + within_offset) % host_llcs.len()];
            order.push(llc_idx);
        }
    }
    order
}
}
/// How LLC lockfiles are flocked: `Exclusive` claims whole LLCs outright;
/// `Shared` allows co-tenancy, with per-CPU exclusive locks providing the
/// finer-grained exclusion (see `try_acquire_all`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LlcLockMode {
    Exclusive,
    #[allow(dead_code)]
    Shared,
}
/// Result of a lock-acquisition attempt; contention is reported as a value,
/// not as an `Err`.
#[derive(Debug)]
pub enum LockOutcome {
    /// All locks taken; dropping `locks` releases them.
    Acquired {
        #[allow(dead_code)]
        llc_offset: usize,
        locks: Vec<std::os::fd::OwnedFd>,
    },
    /// Some resource was busy; the string names which one.
    Unavailable(#[allow(dead_code)] String),
}
/// Attempts to lock the plan's LLCs (and, in shared mode, its CPUs).
/// Contention becomes `LockOutcome::Unavailable`, never an `Err`; under
/// cargo-test mode no real locks are taken at all.
pub fn acquire_resource_locks(
    plan: &PinningPlan,
    llc_indices: &[usize],
    llc_mode: LlcLockMode,
) -> Result<LockOutcome> {
    let offset = llc_indices.first().copied().unwrap_or(0);
    if cargo_test_mode_active() {
        return Ok(LockOutcome::Acquired {
            llc_offset: offset,
            locks: Vec::new(),
        });
    }
    let outcome = match try_acquire_all(plan, llc_indices, llc_mode) {
        Ok(locks) => LockOutcome::Acquired {
            llc_offset: offset,
            locks,
        },
        Err(reason) => LockOutcome::Unavailable(reason),
    };
    Ok(outcome)
}
/// Path prefix for per-LLC lockfiles under the resolved lock directory.
fn llc_lock_prefix() -> String {
    format!("{}/ktstr-llc-", crate::cache::resolve_lock_dir().display())
}
/// Path prefix for per-CPU lockfiles under the resolved lock directory.
fn cpu_lock_prefix() -> String {
    format!("{}/ktstr-cpu-", crate::cache::resolve_lock_dir().display())
}
// Test hooks: redirect lockfile paths to per-test locations (thread-local so
// parallel test threads don't collide).
#[cfg(test)]
thread_local! {
    static LLC_LOCK_PREFIX_OVERRIDE: std::cell::RefCell<Option<String>> =
        const { std::cell::RefCell::new(None) };
    static CPU_LOCK_PREFIX_OVERRIDE: std::cell::RefCell<Option<String>> =
        const { std::cell::RefCell::new(None) };
}
/// Lockfile path for LLC `llc_idx` (honours the test override when set).
fn llc_lock_path(llc_idx: usize) -> String {
    #[cfg(test)]
    {
        if let Some(p) = LLC_LOCK_PREFIX_OVERRIDE.with(|p| p.borrow().clone()) {
            return format!("{p}{llc_idx}.lock");
        }
    }
    format!("{}{llc_idx}.lock", llc_lock_prefix())
}
/// Lockfile path for host CPU `cpu` (honours the test override when set).
fn cpu_lock_path(cpu: usize) -> String {
    #[cfg(test)]
    {
        if let Some(p) = CPU_LOCK_PREFIX_OVERRIDE.with(|p| p.borrow().clone()) {
            return format!("{p}{cpu}.lock");
        }
    }
    format!("{}{cpu}.lock", cpu_lock_prefix())
}
/// Takes every lock the plan needs: first the per-LLC locks in the requested
/// mode, then — only when LLC mode is `Shared` — an exclusive per-CPU lock
/// for each assigned host CPU plus the service CPU. (With `Exclusive` LLC
/// locks the whole LLC is already ours, so per-CPU locks are skipped.)
///
/// On any busy or failed lock, everything acquired so far is released (the
/// `OwnedFd`s drop on early return) and a human-readable reason is returned.
fn try_acquire_all(
    plan: &PinningPlan,
    llc_indices: &[usize],
    llc_mode: LlcLockMode,
) -> std::result::Result<Vec<std::os::fd::OwnedFd>, String> {
    let flock_mode = match llc_mode {
        LlcLockMode::Exclusive => FlockMode::Exclusive,
        LlcLockMode::Shared => FlockMode::Shared,
    };
    let mut locks = Vec::new();
    for &llc_idx in llc_indices {
        let path = llc_lock_path(llc_idx);
        match try_flock(&path, flock_mode) {
            Ok(Some(fd)) => locks.push(fd),
            Ok(None) => return Err(format!("LLC {llc_idx} busy")),
            Err(e) => return Err(format!("LLC {llc_idx}: {e}")),
        }
    }
    // Shared LLC mode: per-CPU exclusive locks provide the actual exclusion.
    if llc_mode != LlcLockMode::Exclusive {
        for &(_vcpu, host_cpu) in &plan.assignments {
            let path = cpu_lock_path(host_cpu);
            match try_flock(&path, FlockMode::Exclusive) {
                Ok(Some(fd)) => locks.push(fd),
                Ok(None) => return Err(format!("CPU {host_cpu} busy")),
                Err(e) => return Err(format!("CPU {host_cpu}: {e}")),
            }
        }
        if let Some(cpu) = plan.service_cpu {
            let path = cpu_lock_path(cpu);
            match try_flock(&path, FlockMode::Exclusive) {
                Ok(Some(fd)) => locks.push(fd),
                Ok(None) => return Err(format!("service CPU {cpu} busy")),
                Err(e) => return Err(format!("service CPU {cpu}: {e}")),
            }
        }
    }
    Ok(locks)
}
/// Deterministic per-PID starting offset in `[0, max_start)`, used to spread
/// concurrent processes across CPU windows. Fixed ahash seeds keep the value
/// stable for a given PID across runs.
///
/// Returns 0 when `max_start` is 0 (previously `% 0` would panic).
pub(crate) fn pid_window_offset(pid: u32, max_start: usize) -> usize {
    use std::hash::{BuildHasher, Hasher};
    // Guard the modulo below: an empty window range has only offset 0.
    if max_start == 0 {
        return 0;
    }
    let mut hasher = ahash::RandomState::with_seeds(0, 0, 0, 0).build_hasher();
    hasher.write(&pid.to_le_bytes());
    (hasher.finish() as usize) % max_start
}
/// CPUs reserved by `acquire_cpu_locks`, plus the flocks keeping them held.
#[derive(Debug)]
#[allow(dead_code)]
pub struct CpuLockResult {
    // Dropping these releases the reservation.
    pub locks: Vec<std::os::fd::OwnedFd>,
    pub cpus: Vec<usize>,
}
/// Reserves `count` consecutive host CPUs via per-CPU exclusive flocks,
/// starting at a PID-derived offset and probing every window cyclically.
/// When `host_topo` is given, shared LLC locks covering the window are also
/// taken; if they can't be, the window is released and the next one tried.
///
/// # Errors
/// `ResourceContention` when `count` exceeds the host CPU count or no
/// window could be fully locked.
#[allow(dead_code)]
pub fn acquire_cpu_locks(
    count: usize,
    total_host_cpus: usize,
    host_topo: Option<&HostTopology>,
) -> Result<CpuLockResult> {
    // Zero CPUs requested: trivially satisfied, nothing to lock.
    if count == 0 {
        return Ok(CpuLockResult {
            locks: Vec::new(),
            cpus: Vec::new(),
        });
    }
    if count > total_host_cpus {
        return Err(anyhow::Error::new(ResourceContention {
            reason: format!(
                "no {count} consecutive CPUs available on a {total_host_cpus}-CPU host\n \
                 hint: pass --no-perf-mode or set KTSTR_NO_PERF_MODE=1 to run without CPU reservation"
            ),
        }));
    }
    // Number of valid window start positions; ≥ 1 since count ≤ total.
    let max_start = total_host_cpus - count + 1;
    let start_offset = pid_window_offset(std::process::id(), max_start);
    for step in 0..max_start {
        let offset = (start_offset + step) % max_start;
        match try_acquire_cpu_window(offset, count) {
            Ok(mut locks) => {
                let cpus: Vec<usize> = (offset..offset + count).collect();
                if let Some(topo) = host_topo {
                    match acquire_llc_shared_locks(topo, &cpus) {
                        Ok(llc_locks) => locks.extend(llc_locks),
                        Err(_) => {
                            // An LLC is exclusively held elsewhere: release
                            // this window and keep probing.
                            drop(locks);
                            continue;
                        }
                    }
                }
                return Ok(CpuLockResult { locks, cpus });
            }
            Err(_) => continue,
        }
    }
    Err(anyhow::Error::new(ResourceContention {
        reason: format!(
            "no {count} consecutive CPUs available\n \
             hint: pass --no-perf-mode or set KTSTR_NO_PERF_MODE=1 to run without CPU reservation"
        ),
    }))
}
/// CPUs this process may run on: the test override when set, else
/// `read_affinity(0)` (sched_getaffinity), else the `Cpus_allowed_list`
/// line of /proc/self/status. Returns an empty Vec when all sources fail.
pub(crate) fn host_allowed_cpus() -> Vec<usize> {
    #[cfg(test)]
    {
        if let Some(override_set) = ALLOWED_CPUS_OVERRIDE.with(|p| p.borrow().clone()) {
            return override_set;
        }
    }
    if let Some(cpus) = crate::cpu_util::read_affinity(0) {
        return cpus.into_iter().map(|c| c as usize).collect();
    }
    // Fallback: parse the affinity list out of /proc/self/status.
    if let Ok(raw) = std::fs::read_to_string("/proc/self/status") {
        for line in raw.lines() {
            if let Some(v) = line.strip_prefix("Cpus_allowed_list:")
                && let Some(parsed) = crate::cpu_util::parse_cpu_list(v.trim())
            {
                return parsed.into_iter().map(|c| c as usize).collect();
            }
        }
    }
    Vec::new()
}
// Test hook: fake the allowed-CPU set (thread-local so parallel test threads
// don't interfere with each other).
#[cfg(test)]
thread_local! {
    pub(crate) static ALLOWED_CPUS_OVERRIDE: std::cell::RefCell<Option<Vec<usize>>> =
        const { std::cell::RefCell::new(None) };
}
/// Default CPU budget: 30% of the allowed set, rounded up, never below 1.
fn default_cpu_budget(allowed_cpus: usize) -> usize {
    let thirty_percent = allowed_cpus.saturating_mul(30).div_ceil(100);
    if thirty_percent < 1 { 1 } else { thirty_percent }
}
/// A user-requested upper bound on CPUs to reserve (always ≥ 1).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CpuCap {
    n: std::num::NonZeroUsize,
}
impl CpuCap {
    /// Validates `n ≥ 1`.
    ///
    /// # Errors
    /// Fails for `n == 0`.
    pub fn new(n: usize) -> Result<Self> {
        std::num::NonZeroUsize::new(n)
            .map(|n| CpuCap { n })
            .ok_or_else(|| anyhow::anyhow!("--cpu-cap must be ≥ 1 CPU (got 0)"))
    }
    /// Resolves the cap: CLI flag wins, then the `KTSTR_CPU_CAP` env var.
    /// `Ok(None)` means "no cap configured" (flag absent, env var unset or
    /// set to the empty string).
    ///
    /// # Errors
    /// Fails when the configured value is 0, non-integer, or non-UTF-8.
    pub fn resolve(cli_flag: Option<usize>) -> Result<Option<CpuCap>> {
        if let Some(n) = cli_flag {
            return Ok(Some(CpuCap::new(n)?));
        }
        match std::env::var("KTSTR_CPU_CAP") {
            // Empty string is treated like "unset", not an error.
            Ok(s) if s.is_empty() => Ok(None),
            Ok(s) => {
                let n: usize = s
                    .parse()
                    .with_context(|| format!("KTSTR_CPU_CAP is not a valid integer: {s:?}"))?;
                Ok(Some(CpuCap::new(n)?))
            }
            Err(std::env::VarError::NotPresent) => Ok(None),
            Err(std::env::VarError::NotUnicode(raw)) => {
                anyhow::bail!(
                    "KTSTR_CPU_CAP contains non-UTF-8 bytes ({} bytes): {raw:?}. \
                     Set an integer value or unset.",
                    raw.len(),
                )
            }
        }
    }
    /// Checks the cap against the process's allowed-CPU count and returns
    /// the effective number of CPUs to use.
    ///
    /// # Errors
    /// `ResourceContention` when the cap exceeds `allowed_cpus`.
    pub fn effective_count(&self, allowed_cpus: usize) -> Result<usize> {
        let n = self.n.get();
        if n > allowed_cpus {
            return Err(anyhow::Error::new(ResourceContention {
                reason: format!(
                    "--cpu-cap N = {n} exceeds the {allowed_cpus} CPUs this \
                     process is allowed on (from sched_getaffinity / \
                     Cpus_allowed_list). Pick a cap ≤ {allowed_cpus}, release \
                     the cgroup/taskset constraint restricting this process, \
                     or omit --cpu-cap to use the 30% default of the allowed \
                     set."
                ),
            }));
        }
        Ok(n)
    }
}
/// Point-in-time view of one LLC's lockfile and its current holders.
#[derive(Debug, Clone)]
pub(crate) struct LlcSnapshot {
    pub(crate) llc_idx: usize,
    pub(crate) lockfile_path: std::path::PathBuf,
    // Best-effort: empty when holder discovery failed.
    pub(crate) holders: Vec<crate::flock::HolderInfo>,
    pub(crate) holder_count: usize,
}
/// A reserved set of LLCs plus the concrete CPUs and memory nodes to use.
#[derive(Debug)]
pub struct LlcPlan {
    /// LLC indices whose locks are held, ascending.
    pub locked_llcs: Vec<usize>,
    /// CPUs to run on (inside the locked LLCs and the allowed set).
    pub cpus: Vec<usize>,
    /// NUMA nodes backing `cpus`, for memory binding.
    pub mems: std::collections::BTreeSet<usize>,
    // Diagnostic snapshot taken when the plan was made.
    #[allow(dead_code)]
    pub(crate) snapshot: Vec<LlcSnapshot>,
    // Keeps the shared flocks alive for the plan's lifetime.
    #[allow(dead_code)] pub(crate) locks: Vec<std::os::fd::OwnedFd>,
}
// How many times to retry when the plan/acquire race (TOCTOU) is lost.
const ACQUIRE_MAX_TOCTOU_RETRIES: u32 = 3;
// Backoff before each retry, indexed by the 0-based attempt number.
const TOCTOU_RETRY_DELAYS: [std::time::Duration; ACQUIRE_MAX_TOCTOU_RETRIES as usize] = [
    std::time::Duration::from_millis(10),
    std::time::Duration::from_millis(50),
    std::time::Duration::from_millis(200),
];
/// Reads lock-holder state for every LLC that overlaps the `allowed` CPU
/// set, creating missing lockfiles on the way. LLCs without any allowed CPU
/// are skipped, so the result can be sparse in LLC index.
///
/// # Errors
/// Fails when a lockfile cannot be created (`materialize`).
fn discover_llc_snapshots(
    topo: &HostTopology,
    allowed: &std::collections::BTreeSet<usize>,
    mountinfo: &str,
) -> Result<Vec<LlcSnapshot>> {
    let mut snapshots: Vec<LlcSnapshot> = Vec::with_capacity(topo.llc_groups.len());
    for llc_idx in 0..topo.llc_groups.len() {
        // Ignore LLCs whose CPUs this process may not run on.
        if !topo.llc_groups[llc_idx]
            .cpus
            .iter()
            .any(|c| allowed.contains(c))
        {
            continue;
        }
        let path = std::path::PathBuf::from(llc_lock_path(llc_idx));
        crate::flock::materialize(&path)?;
        // Holder discovery is best-effort: unreadable state = "no holders".
        let holders =
            crate::flock::read_holders_with_mountinfo(&path, mountinfo).unwrap_or_default();
        let holder_count = holders.len();
        snapshots.push(LlcSnapshot {
            llc_idx,
            lockfile_path: path,
            holders,
            holder_count,
        });
    }
    Ok(snapshots)
}
/// Chooses which LLCs (by LLC index) to reserve so their allowed CPUs cover
/// `target_cpus`, preferring LLCs that already have lock holders
/// ("consolidation") and staying NUMA-close to the first pick.
///
/// Returns a sorted list of LLC indices; empty when nothing is eligible.
fn plan_from_snapshots(
    snapshots: &[LlcSnapshot],
    target_cpus: usize,
    topo: &HostTopology,
    allowed: &std::collections::BTreeSet<usize>,
    distance_fn: impl Fn(usize, usize) -> u8,
) -> Vec<usize> {
    if target_cpus == 0 {
        return Vec::new();
    }
    // How many CPUs of LLC `idx` this process may actually use.
    let llc_allowed_cpus = |idx: usize| -> usize {
        topo.llc_groups[idx]
            .cpus
            .iter()
            .filter(|c| allowed.contains(c))
            .count()
    };
    // BUGFIX: iterate over the snapshots' real LLC indices. The previous code
    // used positional indices 0..snapshots.len(), which diverge from LLC
    // indices whenever discover_llc_snapshots() skipped an LLC outside the
    // allowed set — miscounting capacity against the wrong groups and, in the
    // take-everything branch, returning indices that later hit the
    // `.expect(...)` in try_acquire_llc_plan_locks().
    let total_allowed_in_llcs: usize = snapshots
        .iter()
        .map(|s| llc_allowed_cpus(s.llc_idx))
        .sum();
    if target_cpus >= total_allowed_in_llcs {
        // Budget covers everything: take every LLC with a usable CPU.
        let mut all: Vec<usize> = snapshots
            .iter()
            .map(|s| s.llc_idx)
            .filter(|&idx| llc_allowed_cpus(idx) > 0)
            .collect();
        all.sort_unstable();
        return all;
    }
    let eligible = |s: &&LlcSnapshot| -> bool { llc_allowed_cpus(s.llc_idx) > 0 };
    // LLCs that already have holders rank first (most holders first), then
    // fresh LLCs; both groups tie-break by ascending LLC index.
    let mut consolidation: Vec<&LlcSnapshot> = snapshots
        .iter()
        .filter(|s| s.holder_count > 0)
        .filter(eligible)
        .collect();
    let mut fresh: Vec<&LlcSnapshot> = snapshots
        .iter()
        .filter(|s| s.holder_count == 0)
        .filter(eligible)
        .collect();
    consolidation.sort_by(|a, b| {
        b.holder_count
            .cmp(&a.holder_count)
            .then(a.llc_idx.cmp(&b.llc_idx))
    });
    fresh.sort_by_key(|s| s.llc_idx);
    let ranked: Vec<&LlcSnapshot> = consolidation.into_iter().chain(fresh).collect();
    if ranked.is_empty() {
        return Vec::new();
    }
    // Walk NUMA nodes outward from the top-ranked LLC's node, taking ranked
    // LLCs on each node until the CPU budget is met.
    let seed = ranked[0];
    let seed_node = topo.llc_numa_node(seed.llc_idx);
    let node_order = topo.numa_nodes_sorted_by_distance(seed_node, distance_fn);
    let mut selected: Vec<usize> = Vec::new();
    let mut picked: std::collections::HashSet<usize> = std::collections::HashSet::new();
    let mut accumulated: usize = 0;
    for node in node_order {
        if accumulated >= target_cpus {
            break;
        }
        for snap in &ranked {
            if accumulated >= target_cpus {
                break;
            }
            if picked.contains(&snap.llc_idx) {
                continue;
            }
            if topo.llc_numa_node(snap.llc_idx) != node {
                continue;
            }
            selected.push(snap.llc_idx);
            picked.insert(snap.llc_idx);
            accumulated += llc_allowed_cpus(snap.llc_idx);
        }
    }
    selected.sort_unstable();
    selected
}
/// Shared-flocks every selected LLC's lockfile. `Ok(None)` means some lock
/// was busy (locks already taken are released by the drop); `Err` is a real
/// I/O failure.
fn try_acquire_llc_plan_locks(
    selected: &[usize],
    snapshots: &[LlcSnapshot],
) -> Result<Option<Vec<std::os::fd::OwnedFd>>> {
    let mut locks: Vec<std::os::fd::OwnedFd> = Vec::with_capacity(selected.len());
    for &idx in selected {
        // `selected` is derived from `snapshots`, so this lookup must succeed.
        let snap = snapshots
            .iter()
            .find(|s| s.llc_idx == idx)
            .expect("selected index must come from snapshots — plan invariant");
        match crate::flock::try_flock(&snap.lockfile_path, FlockMode::Shared)? {
            Some(fd) => locks.push(fd),
            None => {
                drop(locks);
                return Ok(None);
            }
        }
    }
    Ok(Some(locks))
}
/// Reserves a set of LLCs (shared locks) sized to the CPU budget and returns
/// the resulting plan.
///
/// Under cargo-test mode no locks are taken: the plan simply spans every LLC
/// overlapping the allowed CPU set, with `mems` derived from each group's
/// first CPU.
///
/// # Errors
/// `ResourceContention` when the allowed CPU set cannot be determined, the
/// budget resolves to zero, nothing overlaps, or locks stay contended after
/// all retries.
pub fn acquire_llc_plan(
    topo: &HostTopology,
    test_topo: &crate::topology::TestTopology,
    cpu_cap: Option<CpuCap>,
) -> Result<LlcPlan> {
    if cargo_test_mode_active() {
        let allowed = host_allowed_cpus();
        if allowed.is_empty() {
            return Err(ResourceContention {
                reason: "could not determine allowed CPU set \
                         (sched_getaffinity and /proc/self/status both failed)"
                    .into(),
            }
            .into());
        }
        // These inputs are unused in test mode; discard them explicitly.
        let _ = test_topo;
        let _ = cpu_cap;
        let allowed_set: std::collections::BTreeSet<usize> = allowed.iter().copied().collect();
        let locked_llcs: Vec<usize> = topo
            .llc_groups
            .iter()
            .enumerate()
            .filter_map(|(idx, group)| {
                if group.cpus.iter().any(|c| allowed_set.contains(c)) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect();
        // Memory nodes: the node of each selected group's first CPU.
        let mems: std::collections::BTreeSet<usize> = locked_llcs
            .iter()
            .filter_map(|&idx| {
                topo.llc_groups
                    .get(idx)
                    .and_then(|g| g.cpus.first().copied())
                    .and_then(|c| topo.cpu_to_node.get(&c).copied())
            })
            .collect();
        return Ok(LlcPlan {
            locked_llcs,
            cpus: allowed,
            mems,
            snapshot: Vec::new(),
            locks: Vec::new(),
        });
    }
    acquire_llc_plan_with_acquire_fn(topo, test_topo, cpu_cap, try_acquire_llc_plan_locks)
}
/// Core of `acquire_llc_plan`, with lock acquisition injected for testing.
///
/// Loop: snapshot holder state -> plan a set of LLCs covering the CPU budget
/// -> try to lock them. A busy lock means another process raced us (TOCTOU);
/// back off per `TOCTOU_RETRY_DELAYS` and re-plan, failing with a
/// holder-listing `ResourceContention` after the final attempt.
fn acquire_llc_plan_with_acquire_fn<F>(
    topo: &HostTopology,
    test_topo: &crate::topology::TestTopology,
    cpu_cap: Option<CpuCap>,
    mut acquire_fn: F,
) -> Result<LlcPlan>
where
    F: FnMut(&[usize], &[LlcSnapshot]) -> Result<Option<Vec<std::os::fd::OwnedFd>>>,
{
    let allowed_vec = host_allowed_cpus();
    if allowed_vec.is_empty() {
        return Err(ResourceContention {
            reason: "could not determine allowed CPU set \
                     (sched_getaffinity and /proc/self/status both failed)"
                .into(),
        }
        .into());
    }
    let allowed: std::collections::BTreeSet<usize> = allowed_vec.iter().copied().collect();
    let allowed_cpus = allowed.len();
    // Explicit cap wins; otherwise the 30% default budget applies.
    let target_cpus = match cpu_cap {
        Some(cap) => cap.effective_count(allowed_cpus)?,
        None => default_cpu_budget(allowed_cpus),
    };
    if target_cpus == 0 {
        return Err(ResourceContention {
            reason: "CPU budget resolved to zero".into(),
        }
        .into());
    }
    // Read mountinfo once; holder discovery reuses it for every snapshot.
    let mountinfo = crate::flock::read_mountinfo().map_err(|e| ResourceContention {
        reason: format!("read /proc/self/mountinfo: {e}"),
    })?;
    let mut attempt: u32 = 0;
    loop {
        let snapshots =
            discover_llc_snapshots(topo, &allowed, &mountinfo).map_err(|e| ResourceContention {
                reason: format!("discover LLC snapshots: {e}"),
            })?;
        let selected = plan_from_snapshots(&snapshots, target_cpus, topo, &allowed, |from, to| {
            test_topo.numa_distance(from, to)
        });
        if selected.is_empty() {
            return Err(ResourceContention {
                reason: format!(
                    "no host LLC overlaps the process's \
                     {allowed_cpus}-CPU allowed set — sysfs LLC groups \
                     and sched_getaffinity disagree"
                ),
            }
            .into());
        }
        match acquire_fn(&selected, &snapshots).map_err(|e| ResourceContention {
            reason: format!("acquire LLC locks: {e}"),
        })? {
            Some(locks) => {
                // Locked: collect up to target_cpus allowed CPUs from the
                // selected LLCs, recording each CPU's NUMA node in `mems`.
                let mut cpus: Vec<usize> = Vec::new();
                let mut mems: std::collections::BTreeSet<usize> = std::collections::BTreeSet::new();
                'outer: for &idx in &selected {
                    let group = &topo.llc_groups[idx];
                    for &cpu in &group.cpus {
                        if !allowed.contains(&cpu) {
                            continue;
                        }
                        if cpus.len() >= target_cpus {
                            break 'outer;
                        }
                        cpus.push(cpu);
                        let node = topo.cpu_to_node.get(&cpu).copied().unwrap_or(0);
                        mems.insert(node);
                    }
                }
                return Ok(LlcPlan {
                    locked_llcs: selected,
                    cpus,
                    mems,
                    snapshot: snapshots,
                    locks,
                });
            }
            None => {
                if attempt >= ACQUIRE_MAX_TOCTOU_RETRIES {
                    // Give up: re-snapshot so the error can name the holders.
                    // NOTE(review): this `?` propagates a plain anyhow error,
                    // unlike the ResourceContention wrapping used elsewhere in
                    // this function — confirm callers don't rely on downcasting
                    // this path.
                    let final_snapshots = discover_llc_snapshots(topo, &allowed, &mountinfo)?;
                    let holders: Vec<String> = final_snapshots
                        .iter()
                        .filter(|s| !s.holders.is_empty())
                        .map(|s| {
                            format!(
                                "LLC {}: {}",
                                s.llc_idx,
                                crate::flock::format_holder_list(&s.holders)
                            )
                        })
                        .collect();
                    let holder_text = if holders.is_empty() {
                        "<none recorded>".to_string()
                    } else {
                        holders.join("; ")
                    };
                    return Err(anyhow::Error::new(ResourceContention {
                        reason: format!(
                            "acquire_llc_plan: could not reserve {target_cpus} \
                             CPU(s) after {attempts} attempts; holders: \
                             {holder_text}. Run `ktstr locks --json` to see \
                             every ktstr lock on this host.",
                            attempts = ACQUIRE_MAX_TOCTOU_RETRIES + 1,
                        ),
                    }));
                }
                std::thread::sleep(TOCTOU_RETRY_DELAYS[attempt as usize]);
                attempt += 1;
            }
        }
    }
}
/// Parallel job count for a plan: one job per reserved CPU, minimum one.
pub fn make_jobs_for_plan(plan: &LlcPlan) -> usize {
    std::cmp::max(plan.cpus.len(), 1)
}
/// Renders locked LLC indices like `[0 (node 0), 2 (node 1)]`; the node
/// annotation is omitted when the topology has no CPU->node data.
pub fn format_llc_list(locked: &[usize], topo: &HostTopology) -> String {
    let mut parts = Vec::with_capacity(locked.len());
    for &idx in locked {
        if topo.cpu_to_node.is_empty() {
            parts.push(idx.to_string());
        } else {
            parts.push(format!("{idx} (node {node})", node = topo.llc_numa_node(idx)));
        }
    }
    format!("[{}]", parts.join(", "))
}
/// Prints a one-line stderr warning when the plan spans several NUMA nodes.
pub fn warn_if_cross_node_spill(plan: &LlcPlan, topo: &HostTopology) {
    if !should_warn_cross_node(&plan.mems) {
        return;
    }
    eprintln!(
        "ktstr: reserving LLCs {list} across {n} NUMA nodes \
         (preferred single-node contiguous unavailable). Build \
         will run; memory-access latency may be higher.",
        list = format_llc_list(&plan.locked_llcs, topo),
        n = plan.mems.len(),
    );
}
/// True when the reserved memory set spans more than one NUMA node.
fn should_warn_cross_node(mems: &std::collections::BTreeSet<usize>) -> bool {
    mems.iter().nth(1).is_some()
}
/// Takes a shared flock on every LLC containing one of `cpus`; on failure
/// returns a reason string (locks already taken are released by drop).
#[allow(dead_code)]
fn acquire_llc_shared_locks(
    topo: &HostTopology,
    cpus: &[usize],
) -> std::result::Result<Vec<std::os::fd::OwnedFd>, String> {
    // Distinct LLC indices covering `cpus`, kept in first-seen (CPU) order
    // so lock acquisition order matches the original CPU ordering.
    let mut llc_indices: Vec<usize> = Vec::new();
    for &cpu in cpus {
        for (idx, group) in topo.llc_groups.iter().enumerate() {
            if group.cpus.contains(&cpu) && !llc_indices.contains(&idx) {
                llc_indices.push(idx);
            }
        }
    }
    let mut locks = Vec::with_capacity(llc_indices.len());
    for &llc_idx in &llc_indices {
        match try_flock(&llc_lock_path(llc_idx), FlockMode::Shared) {
            Ok(Some(fd)) => locks.push(fd),
            Ok(None) => return Err(format!("LLC {llc_idx} exclusively held")),
            Err(e) => return Err(format!("LLC {llc_idx}: {e}")),
        }
    }
    Ok(locks)
}
/// Exclusively flocks every CPU in `[offset, offset + count)`; the first
/// busy or failing CPU aborts the attempt (locks taken so far are released
/// when the partial Vec is dropped by the short-circuiting collect).
#[allow(dead_code)]
fn try_acquire_cpu_window(
    offset: usize,
    count: usize,
) -> std::result::Result<Vec<std::os::fd::OwnedFd>, String> {
    (offset..offset + count)
        .map(|cpu| match try_flock(&cpu_lock_path(cpu), FlockMode::Exclusive) {
            Ok(Some(fd)) => Ok(fd),
            Ok(None) => Err(format!("CPU {cpu} busy")),
            Err(e) => Err(format!("CPU {cpu}: {e}")),
        })
        .collect()
}
/// Binds the memory range `[addr, addr + len)` to the given NUMA nodes via
/// the raw `mbind(2)` syscall with `MPOL_BIND`. Logs the outcome to stderr;
/// failure is a warning only, never fatal. No-op for an empty node list or
/// zero length.
///
/// # Safety
/// `addr..addr + len` must be a valid mapping owned by this process for the
/// duration of the call; the kernel addresses the range directly.
pub unsafe fn mbind_to_nodes(addr: *mut u8, len: usize, nodes: &[usize]) {
    if nodes.is_empty() || len == 0 {
        return;
    }
    let node_set: std::collections::BTreeSet<usize> = nodes.iter().copied().collect();
    let (nodemask, maxnode) = crate::workload::build_nodemask(&node_set);
    // SAFETY: the caller guarantees addr/len describe valid memory; nodemask
    // and maxnode come from build_nodemask and outlive the syscall.
    let rc = unsafe {
        libc::syscall(
            libc::SYS_mbind,
            addr as *mut libc::c_void,
            len,
            libc::MPOL_BIND,
            nodemask.as_ptr(),
            maxnode,
            0u32,
        )
    };
    if rc == 0 {
        eprintln!(
            "performance_mode: mbind {} MB to NUMA node(s) {:?}",
            len >> 20,
            nodes,
        );
    } else {
        let err = std::io::Error::last_os_error();
        eprintln!(
            "performance_mode: WARNING: mbind to node(s) {:?} failed: {err}",
            nodes,
        );
    }
}
use crate::topology::parse_cpu_list_lenient;
/// Free 2 MiB hugepages reported by sysfs, or 0 when the file is missing or
/// unparsable.
pub fn hugepages_free() -> u64 {
    let raw =
        std::fs::read_to_string("/sys/kernel/mm/hugepages/hugepages-2048kB/free_hugepages");
    match raw {
        Ok(s) => s.trim().parse().unwrap_or(0),
        Err(_) => 0,
    }
}
/// Number of 2 MiB hugepages required to back `memory_mb` MiB (rounds up).
pub fn hugepages_needed(memory_mb: u32) -> u64 {
    let mb = memory_mb as u64;
    (mb + 1) / 2
}
/// Rough host load: (`procs_running` from /proc/stat, online CPU count from
/// sysfs). `None` when either file is missing or malformed.
pub fn host_load_estimate() -> Option<(usize, usize)> {
    let stat = std::fs::read_to_string("/proc/stat").ok()?;
    let running_line = stat.lines().find(|l| l.starts_with("procs_running "))?;
    let procs_running: usize = running_line.split_whitespace().nth(1)?.parse().ok()?;
    let online = std::fs::read_to_string("/sys/devices/system/cpu/online").ok()?;
    let total = parse_cpu_list_lenient(&online).len();
    Some((procs_running, total))
}
#[cfg(test)]
mod tests;