use crate::numa::{NumaNode, NumaTopology};
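/// A single hardware cache as reported by the active backend.
///
/// `size_kb` and `associativity` are zero when the backend cannot determine
/// them; `shared_cpus` lists the logical CPUs sharing this cache.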
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheInfo {
pub level: u8,
pub size_kb: u64,
pub line_size: u32,
pub associativity: u32,
pub shared_cpus: Vec<usize>,
}
impl CacheInfo {
    pub fn is_unified(&self) -> bool {
        // L1 is typically split (L1i/L1d); L2 and above are unified.
        self.level >= 2
    }
}
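/// A physical CPU package (socket), its CPUs, and the NUMA nodes it spans.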
#[derive(Debug, Clone)]
pub struct PackageInfo {
pub id: usize,
pub cpus: Vec<usize>,
pub numa_nodes: Vec<usize>,
}
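/// Error produced when binding a thread to a CPU set fails.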
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BindError(pub String);
impl std::fmt::Display for BindError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CPU binding error: {}", self.0)
}
}
impl std::error::Error for BindError {}
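/// Backend-agnostic view of the machine's hardware topology.
///
/// Implemented by the hwloc2 probe (behind the `hwloc` feature) and by the
/// sysfs fallback; `TopologyProvider` selects one at runtime.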
pub trait HwTopology: Send + Sync {
fn numa_nodes(&self) -> &[NumaNode];
fn cpu_count(&self) -> usize;
fn packages(&self) -> Vec<PackageInfo>;
fn cache_info(&self) -> Vec<CacheInfo>;
fn bind_thread(&self, cpus: &[usize]) -> Result<(), BindError>;
fn numa_node_for_cpu(&self, cpu: usize) -> Option<usize>;
fn is_hwloc_backed(&self) -> bool;
fn backend_name(&self) -> &'static str;
}
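/// Topology probe backed by the `hwloc2` crate; compiled only when the
/// `hwloc` feature is enabled.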
#[cfg(feature = "hwloc")]
mod hwloc_backend {
use super::*;
use std::collections::HashMap;
use hwloc2::{Topology, ObjectType, CpuBindFlags};
    pub struct HwlocBackend {
        // `Topology::set_cpubind` requires `&mut self`, while the trait's
        // `bind_thread` takes `&self`, so the topology lives behind a mutex.
        topology: std::sync::Mutex<Topology>,
        numa_nodes: Vec<NumaNode>,
        cpu_to_node: HashMap<usize, usize>,
    }
impl HwlocBackend {
pub fn new() -> Result<Self, String> {
let topology = Topology::new()
.ok_or_else(|| "hwloc: failed to create topology object".to_string())?;
let (numa_nodes, cpu_to_node) = Self::extract_numa(&topology);
            Ok(Self { topology: std::sync::Mutex::new(topology), numa_nodes, cpu_to_node })
}
fn extract_numa(topo: &Topology) -> (Vec<NumaNode>, HashMap<usize, usize>) {
let mut numa_nodes = Vec::new();
let mut cpu_to_node = HashMap::new();
if let Ok(objects) = topo.objects_with_type(&ObjectType::NUMANode) {
for obj in &objects {
let id = obj.logical_index() as usize;
let cpus: Vec<usize> = obj.cpuset()
.iter()
.flat_map(|cs| cs.iter().map(|c| c as usize))
.collect();
for &cpu in &cpus {
cpu_to_node.insert(cpu, id);
}
let memory_bytes = Some(obj.memory().local_memory());
numa_nodes.push(NumaNode { id, cpus, memory_bytes });
}
}
if numa_nodes.is_empty() {
let cpus = Self::all_pu_cpus(topo);
for &c in &cpus { cpu_to_node.insert(c, 0); }
numa_nodes.push(NumaNode { id: 0, cpus, memory_bytes: None });
}
(numa_nodes, cpu_to_node)
}
fn all_pu_cpus(topo: &Topology) -> Vec<usize> {
topo.objects_with_type(&ObjectType::PU)
.map(|objs| objs.iter().map(|o| o.logical_index() as usize).collect())
.unwrap_or_else(|_| (0..num_cpus::get()).collect())
}
fn extract_packages(topo: &Topology, numa_nodes: &[NumaNode]) -> Vec<PackageInfo> {
let objects = match topo.objects_with_type(&ObjectType::Package) {
Ok(o) if !o.is_empty() => o,
_ => return Vec::new(),
};
objects.iter().map(|obj| {
let id = obj.logical_index() as usize;
let cpus: Vec<usize> = obj.cpuset()
.iter()
.flat_map(|cs| cs.iter().map(|c| c as usize))
.collect();
let numa_node_ids = numa_nodes.iter()
.filter(|n| n.cpus.iter().any(|c| cpus.contains(c)))
.map(|n| n.id)
.collect();
PackageInfo { id, cpus, numa_nodes: numa_node_ids }
}).collect()
}
fn extract_caches(topo: &Topology) -> Vec<CacheInfo> {
let mut caches = Vec::new();
let cache_levels: &[(u8, ObjectType)] = &[
(1, ObjectType::L1Cache),
(2, ObjectType::L2Cache),
(3, ObjectType::L3Cache),
];
for &(level, ref obj_type) in cache_levels {
let objects = match topo.objects_with_type(obj_type) {
Ok(o) => o,
Err(_) => continue,
};
for obj in &objects {
let shared_cpus: Vec<usize> = obj.cpuset()
.iter()
.flat_map(|cs| cs.iter().map(|c| c as usize))
.collect();
                        let (size_kb, line_size, associativity) = extract_cache_attrs(obj);
caches.push(CacheInfo {
level,
size_kb,
line_size,
associativity,
shared_cpus,
});
}
}
caches
}
}
    fn extract_cache_attrs(obj: &hwloc2::TopologyObject) -> (u64, u32, u32) {
        if let Some(ca) = obj.cache_attributes() {
            let size_kb = ca.size() / 1024;
            let line = ca.line_size() as u32;
            // hwloc reports -1 for fully associative and 0 for unknown;
            // clamp negatives to 0 rather than letting abs() turn the
            // fully-associative marker into "1-way".
            let assoc = ca.associativity().max(0) as u32;
            (size_kb, line, assoc)
        } else {
            // Attributes unavailable: unknown size/associativity, assume the
            // ubiquitous 64-byte line.
            (0, 64, 0)
        }
    }
impl HwTopology for HwlocBackend {
fn numa_nodes(&self) -> &[NumaNode] { &self.numa_nodes }
        fn cpu_count(&self) -> usize {
            self.topology
                .lock()
                .expect("hwloc topology mutex poisoned")
                .objects_with_type(&ObjectType::PU)
                .map(|v| v.len())
                .unwrap_or_else(|_| num_cpus::get())
        }
        fn packages(&self) -> Vec<PackageInfo> {
            let topo = self.topology.lock().expect("hwloc topology mutex poisoned");
            Self::extract_packages(&topo, &self.numa_nodes)
        }
        fn cache_info(&self) -> Vec<CacheInfo> {
            let topo = self.topology.lock().expect("hwloc topology mutex poisoned");
            Self::extract_caches(&topo)
        }
        fn bind_thread(&self, cpus: &[usize]) -> Result<(), BindError> {
            if cpus.is_empty() {
                return Ok(());
            }
            let mut cpuset = hwloc2::CpuSet::new();
            for &cpu in cpus {
                cpuset.set(cpu as u32);
            }
            // `set_cpubind` needs `&mut Topology`, hence the mutex.
            let mut topo = self
                .topology
                .lock()
                .map_err(|_| BindError("hwloc topology mutex poisoned".into()))?;
            topo.set_cpubind(cpuset, CpuBindFlags::CPUBIND_THREAD)
                .map_err(|e| BindError(format!("hwloc bind_thread failed: {:?}", e)))
        }
fn numa_node_for_cpu(&self, cpu: usize) -> Option<usize> {
self.cpu_to_node.get(&cpu).copied()
}
fn is_hwloc_backed(&self) -> bool { true }
        fn backend_name(&self) -> &'static str { "hwloc2" }
}
}
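/// Fallback backend that reads the topology from Linux sysfs; on other
/// platforms it degrades to conservative synthetic defaults.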
mod sysfs_backend {
use super::*;
pub struct SysfsBackend {
numa: NumaTopology,
caches: Vec<CacheInfo>,
packages: Vec<PackageInfo>,
}
impl SysfsBackend {
pub fn detect() -> Self {
let numa = NumaTopology::detect();
let packages = Self::build_packages(&numa);
let caches = Self::probe_caches();
Self { numa, caches, packages }
}
fn build_packages(numa: &NumaTopology) -> Vec<PackageInfo> {
numa.nodes.iter().map(|n| PackageInfo {
id: n.id,
cpus: n.cpus.clone(),
numa_nodes: vec![n.id],
}).collect()
}
fn probe_caches() -> Vec<CacheInfo> {
let mut caches = Vec::new();
#[cfg(target_os = "linux")]
{
let cpu_count = num_cpus::get();
for cpu in 0..cpu_count {
let base = format!("/sys/devices/system/cpu/cpu{}/cache", cpu);
if let Ok(entries) = std::fs::read_dir(&base) {
                        for entry in entries.flatten() {
                            let path = entry.path();
                            // Skip instruction caches so split L1 is not
                            // double-counted (L1d and L1i share a level and
                            // shared_cpu_list but are distinct entries).
                            let cache_type = std::fs::read_to_string(path.join("type"))
                                .unwrap_or_default();
                            if cache_type.trim() == "Instruction" { continue; }
                            let level = read_u64_sysfs(&path.join("level"))
                                .map(|v| v as u8)
                                .unwrap_or(0);
                            if level == 0 { continue; }
                            let size_kb = read_size_kb(&path.join("size"))
                                .unwrap_or(0);
                            let line_size = read_u64_sysfs(&path.join("coherency_line_size"))
                                .map(|v| v as u32)
                                .unwrap_or(64);
                            let shared_str = std::fs::read_to_string(path.join("shared_cpu_list"))
                                .unwrap_or_default();
                            let shared_cpus = parse_cpu_list(&shared_str);
                            // Record each cache once: only the first CPU in its
                            // shared set reports it.
                            if shared_cpus.first() != Some(&cpu) { continue; }
caches.push(CacheInfo {
level, size_kb, line_size,
associativity: 0,
shared_cpus,
});
}
}
}
caches.sort_by_key(|c| (c.level, c.shared_cpus.first().copied().unwrap_or(0)));
}
            if caches.is_empty() {
                // Nothing detected (non-Linux build or unreadable sysfs):
                // synthesize one shared L3 so callers always see a cache domain.
                caches.push(CacheInfo {
                    level: 3,
                    size_kb: 8192,
                    line_size: 64,
                    associativity: 0,
                    shared_cpus: (0..num_cpus::get()).collect(),
                });
            }
caches
}
}
impl HwTopology for SysfsBackend {
fn numa_nodes(&self) -> &[NumaNode] { &self.numa.nodes }
fn cpu_count(&self) -> usize { num_cpus::get() }
fn packages(&self) -> Vec<PackageInfo> { self.packages.clone() }
fn cache_info(&self) -> Vec<CacheInfo> { self.caches.clone() }
fn bind_thread(&self, cpus: &[usize]) -> Result<(), BindError> {
bind_thread_sysfs(cpus)
}
fn numa_node_for_cpu(&self, cpu: usize) -> Option<usize> {
self.numa.node_for_cpu(cpu)
}
fn is_hwloc_backed(&self) -> bool { false }
fn backend_name(&self) -> &'static str { "sysfs-fallback" }
}
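    /// Reads a file expected to contain a single integer (e.g. sysfs `level`).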
fn read_u64_sysfs(path: &std::path::Path) -> Option<u64> {
std::fs::read_to_string(path).ok()
.and_then(|s| s.trim().parse::<u64>().ok())
}
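    /// Parses a sysfs cache `size` value ("32K", "8M", or plain bytes) into KiB.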
fn read_size_kb(path: &std::path::Path) -> Option<u64> {
let s = std::fs::read_to_string(path).ok()?;
let s = s.trim();
if let Some(stripped) = s.strip_suffix('K') {
stripped.parse().ok()
} else if let Some(stripped) = s.strip_suffix('M') {
stripped.parse::<u64>().ok().map(|v| v * 1024)
} else {
s.parse::<u64>().ok().map(|v| v / 1024)
}
}
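    /// Parses a kernel CPU list such as "0-3,8,10-11" into explicit indices.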
fn parse_cpu_list(s: &str) -> Vec<usize> {
let mut cpus = Vec::new();
for part in s.trim().split(',') {
let part = part.trim();
if let Some((a, b)) = part.split_once('-') {
if let (Ok(lo), Ok(hi)) = (a.parse::<usize>(), b.parse::<usize>()) {
cpus.extend(lo..=hi);
}
} else if let Ok(n) = part.parse::<usize>() {
cpus.push(n);
}
}
cpus
}
}
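/// Pins the calling thread via `pthread_setaffinity_np` on Linux; affinity is
/// unsupported on macOS and unimplemented elsewhere, so those report errors.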
fn bind_thread_sysfs(cpus: &[usize]) -> Result<(), BindError> {
if cpus.is_empty() {
return Ok(());
}
#[cfg(target_os = "linux")]
{
use std::mem;
        unsafe {
            let mut cpuset: libc::cpu_set_t = mem::zeroed();
            for &cpu in cpus {
                // cpu_set_t cannot represent indices >= CPU_SETSIZE.
                if cpu >= libc::CPU_SETSIZE as usize {
                    return Err(BindError(format!("CPU index {} exceeds CPU_SETSIZE", cpu)));
                }
                libc::CPU_SET(cpu, &mut cpuset);
            }
let tid = libc::pthread_self();
let rc = libc::pthread_setaffinity_np(
tid,
mem::size_of::<libc::cpu_set_t>(),
&cpuset,
);
if rc != 0 {
return Err(BindError(format!(
"pthread_setaffinity_np failed: errno={}", rc
)));
}
}
return Ok(());
}
#[cfg(target_os = "macos")]
{
let _ = cpus;
return Err(BindError("Thread affinity not supported on macOS".into()));
}
#[allow(unreachable_code)]
Err(BindError("Thread affinity not implemented on this platform".into()))
}
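/// Runtime-selected topology backend: hwloc2 when the feature is enabled and
/// initialization succeeds, otherwise the sysfs fallback.
///
/// A minimal usage sketch (paths and setup are illustrative):
///
/// ```ignore
/// let topo = TopologyProvider::detect();
/// println!("{} CPUs via {}", topo.cpu_count(), topo.backend_name());
/// ```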
pub struct TopologyProvider {
inner: Box<dyn HwTopology>,
}
impl TopologyProvider {
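    /// Probes the machine, preferring hwloc2 and falling back to sysfs when
    /// hwloc is unavailable or fails to initialize.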
pub fn detect() -> Self {
#[cfg(feature = "hwloc")]
{
match hwloc_backend::HwlocBackend::new() {
Ok(backend) => {
log::info!("Hardware topology: using hwloc2 backend");
return Self { inner: Box::new(backend) };
}
Err(e) => {
log::warn!("hwloc2 init failed ({}); falling back to sysfs", e);
}
}
}
log::info!("Hardware topology: using sysfs fallback backend");
Self { inner: Box::new(sysfs_backend::SysfsBackend::detect()) }
}
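    /// Forces the sysfs fallback, bypassing hwloc (useful in tests).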
pub fn sysfs() -> Self {
Self { inner: Box::new(sysfs_backend::SysfsBackend::detect()) }
}
}
impl HwTopology for TopologyProvider {
fn numa_nodes(&self) -> &[NumaNode] { self.inner.numa_nodes() }
fn cpu_count(&self) -> usize { self.inner.cpu_count() }
fn packages(&self) -> Vec<PackageInfo> { self.inner.packages() }
fn cache_info(&self) -> Vec<CacheInfo> { self.inner.cache_info() }
fn bind_thread(&self, cpus: &[usize]) -> Result<(), BindError> { self.inner.bind_thread(cpus) }
fn numa_node_for_cpu(&self, cpu: usize) -> Option<usize> { self.inner.numa_node_for_cpu(cpu) }
fn is_hwloc_backed(&self) -> bool { self.inner.is_hwloc_backed() }
fn backend_name(&self) -> &'static str { self.inner.backend_name() }
}
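/// Maps worker threads to CPU sets according to an `AffinityStrategy` and
/// pins them through the underlying topology backend.
///
/// A minimal sketch of pinning workers (thread spawning elided):
///
/// ```ignore
/// let topo = TopologyProvider::detect();
/// let affinity = HwlocWorkerAffinity::new(topo, AffinityStrategy::NUMADense, 4);
/// // inside worker `w`'s thread:
/// affinity.pin_current_thread(w)?;
/// ```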
pub struct HwlocWorkerAffinity {
topology: TopologyProvider,
strategy: AffinityStrategy,
num_workers: usize,
}
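/// Placement policy for `HwlocWorkerAffinity::cpus_for_worker`: `None` leaves
/// threads unpinned, `NUMARoundRobin` spreads workers across NUMA nodes,
/// `NUMADense` fills each node before moving to the next, `PhysicalCores`
/// pins one worker per (heuristically chosen) physical core, and
/// `L3CacheDomain` binds each worker to the CPUs sharing one L3 slice.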
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityStrategy {
None,
NUMARoundRobin,
NUMADense,
PhysicalCores,
L3CacheDomain,
}
impl HwlocWorkerAffinity {
pub fn new(topology: TopologyProvider, strategy: AffinityStrategy, num_workers: usize) -> Self {
Self { topology, strategy, num_workers }
}
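    /// Returns the CPU set for `worker_id`; an empty vector means "leave
    /// the thread unpinned".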
pub fn cpus_for_worker(&self, worker_id: usize) -> Vec<usize> {
match self.strategy {
AffinityStrategy::None => Vec::new(),
AffinityStrategy::NUMARoundRobin => {
let nodes = self.topology.numa_nodes();
if nodes.is_empty() { return Vec::new(); }
let node_idx = worker_id % nodes.len();
let node = &nodes[node_idx];
let local_idx = worker_id / nodes.len();
node.cpus.get(local_idx % node.cpus.len().max(1))
.map(|&cpu| vec![cpu])
.unwrap_or_default()
}
AffinityStrategy::NUMADense => {
let nodes = self.topology.numa_nodes();
let mut offset = 0;
for node in nodes {
if worker_id < offset + node.cpus.len() {
let local = worker_id - offset;
return node.cpus.get(local).map(|&c| vec![c]).unwrap_or_default();
}
offset += node.cpus.len();
}
Vec::new()
}
            AffinityStrategy::PhysicalCores => {
                // Heuristic: assume SMT siblings are the odd logical CPUs, so
                // even indices approximate one worker per physical core. Linux
                // numbering varies (siblings are often the upper half), so this
                // is best-effort rather than topology-derived.
                let all_cpus: Vec<usize> = (0..self.topology.cpu_count()).collect();
                let physical: Vec<usize> = all_cpus.iter().step_by(2).copied().collect();
                physical
                    .get(worker_id % physical.len().max(1))
                    .map(|&c| vec![c])
                    .unwrap_or_default()
            }
AffinityStrategy::L3CacheDomain => {
let caches: Vec<_> = self.topology.cache_info()
.into_iter()
.filter(|c| c.level == 3)
.collect();
if caches.is_empty() { return Vec::new(); }
let domain = &caches[worker_id % caches.len()];
domain.shared_cpus.clone()
}
}
}
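    /// Pins the calling thread to the CPUs chosen for `worker_id`; a no-op
    /// when the strategy yields no CPUs.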
pub fn pin_current_thread(&self, worker_id: usize) -> Result<(), BindError> {
let cpus = self.cpus_for_worker(worker_id);
if cpus.is_empty() {
return Ok(());
}
self.topology.bind_thread(&cpus)
}
pub fn describe(&self) -> String {
format!(
"HwlocWorkerAffinity {{ backend={}, strategy={:?}, workers={}, numa_nodes={}, cpus={} }}",
self.topology.backend_name(),
self.strategy,
self.num_workers,
self.topology.numa_nodes().len(),
self.topology.cpu_count(),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detect_returns_valid_topology() {
let topo = TopologyProvider::detect();
assert!(topo.cpu_count() >= 1);
assert!(!topo.numa_nodes().is_empty());
let caches = topo.cache_info();
assert!(!caches.is_empty(), "expected at least one cache entry");
}
#[test]
fn sysfs_backend_sane() {
let topo = TopologyProvider::sysfs();
assert_eq!(topo.backend_name(), "sysfs-fallback");
assert!(!topo.is_hwloc_backed());
assert!(topo.cpu_count() >= 1);
}
#[test]
fn packages_non_empty() {
let topo = TopologyProvider::detect();
let pkgs = topo.packages();
assert!(!pkgs.is_empty());
for pkg in &pkgs {
assert!(!pkg.cpus.is_empty(), "package {} has no CPUs", pkg.id);
}
}
#[test]
fn cache_levels_sane() {
let topo = TopologyProvider::detect();
for cache in topo.cache_info() {
assert!(cache.level >= 1 && cache.level <= 4,
"unexpected cache level {}", cache.level);
assert!(cache.line_size >= 16,
"suspiciously small line size {}", cache.line_size);
assert!(!cache.shared_cpus.is_empty(),
"cache L{} has no associated CPUs", cache.level);
}
}
#[test]
fn numa_node_for_cpu_consistent() {
let topo = TopologyProvider::detect();
for node in topo.numa_nodes() {
for &cpu in &node.cpus {
let result = topo.numa_node_for_cpu(cpu);
assert_eq!(result, Some(node.id),
"cpu {} should map to node {} but got {:?}", cpu, node.id, result);
}
}
}
#[test]
fn worker_affinity_numa_round_robin() {
let topo = TopologyProvider::detect();
let affinity = HwlocWorkerAffinity::new(topo, AffinityStrategy::NUMARoundRobin, 8);
        for w in 0..8 {
            let cpus = affinity.cpus_for_worker(w);
            // Round-robin pins each worker to at most one CPU.
            assert!(cpus.len() <= 1, "worker {} got {} CPUs", w, cpus.len());
        }
}
#[test]
fn worker_affinity_l3_domain() {
let topo = TopologyProvider::detect();
let affinity = HwlocWorkerAffinity::new(topo, AffinityStrategy::L3CacheDomain, 4);
for w in 0..4 {
let cpus = affinity.cpus_for_worker(w);
if !cpus.is_empty() {
for &cpu in &cpus {
assert!(cpu < affinity.topology.cpu_count(),
"CPU index {} out of range", cpu);
}
}
}
}
#[test]
fn bind_thread_none_strategy_is_noop() {
let topo = TopologyProvider::detect();
let affinity = HwlocWorkerAffinity::new(topo, AffinityStrategy::None, 4);
assert!(affinity.pin_current_thread(0).is_ok());
}
}