use std::sync::atomic::{AtomicBool, Ordering};
static NUMA_DETECTED: AtomicBool = AtomicBool::new(false);
#[inline]
pub fn is_numa_available() -> bool {
NUMA_DETECTED.load(Ordering::Relaxed)
}
#[cfg(target_os = "linux")]
mod inner {
use super::*;
use std::fs;
use std::path::Path;
#[derive(Debug, Clone)]
pub struct NumaNode {
pub id: u32,
pub cpus: Vec<u32>,
}
static TOPOLOGY: std::sync::OnceLock<Vec<NumaNode>> = std::sync::OnceLock::new();
fn parse_cpu_list(s: &str) -> Vec<u32> {
let mut cpus = Vec::new();
for part in s.split(',') {
let part = part.trim();
if part.is_empty() {
continue;
}
if let Some((start, end)) = part.split_once('-') {
if let (Ok(s), Ok(e)) = (start.trim().parse::<u32>(), end.trim().parse::<u32>()) {
cpus.extend(s..=e);
}
} else if let Ok(cpu) = part.parse::<u32>() {
cpus.push(cpu);
}
}
cpus
}
fn detect_topology() -> Option<Vec<NumaNode>> {
let node_dir = Path::new("/sys/devices/system/node");
if !node_dir.exists() {
return None;
}
let mut nodes = Vec::new();
let entries = fs::read_dir(node_dir).ok()?;
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if !name_str.starts_with("node") {
continue;
}
let id: u32 = name_str[4..].parse().ok()?;
let cpulist_path = entry.path().join("cpulist");
let cpulist = fs::read_to_string(cpulist_path).ok()?;
let cpus = parse_cpu_list(&cpulist);
if !cpus.is_empty() {
nodes.push(NumaNode { id, cpus });
}
}
nodes.sort_by_key(|n| n.id);
if nodes.len() >= 2 {
Some(nodes)
} else {
None
}
}
pub fn init_numa() {
TOPOLOGY.get_or_init(|| match detect_topology() {
Some(topo) => {
log::info!(
"NUMA topology detected: {} nodes, {} total CPUs",
topo.len(),
topo.iter().map(|n| n.cpus.len()).sum::<usize>()
);
for node in &topo {
log::debug!(" node {}: cpus {:?}", node.id, node.cpus);
}
NUMA_DETECTED.store(true, Ordering::Relaxed);
topo
}
None => {
log::debug!("Single NUMA node or sysfs unavailable — pinning disabled");
Vec::new()
}
});
}
pub fn topology() -> &'static [NumaNode] {
TOPOLOGY.get().map(|v| v.as_slice()).unwrap_or(&[])
}
pub fn node_count() -> usize {
topology().len()
}
pub fn pin_thread_to_node(node_id: usize) -> Result<(), std::io::Error> {
let topo = topology();
if topo.is_empty() || node_id >= topo.len() {
return Ok(());
}
let node = &topo[node_id];
pin_thread_to_cpus(&node.cpus)
}
pub fn pin_thread_to_cpus(cpus: &[u32]) -> Result<(), std::io::Error> {
if cpus.is_empty() {
return Ok(());
}
unsafe {
let mut cpuset: libc::cpu_set_t = std::mem::zeroed();
for &cpu in cpus {
libc::CPU_SET(cpu as usize, &mut cpuset);
}
let ret = libc::sched_setaffinity(
0, std::mem::size_of::<libc::cpu_set_t>(),
&cpuset,
);
if ret == 0 {
Ok(())
} else {
Err(std::io::Error::last_os_error())
}
}
}
pub fn pin_worker(worker_index: usize) {
let topo = topology();
if topo.is_empty() {
return;
}
let node_id = worker_index % topo.len();
if let Err(e) = pin_thread_to_node(node_id) {
log::warn!(
"Failed to pin worker {} to NUMA node {}: {}",
worker_index,
node_id,
e
);
} else {
log::trace!("Worker {} pinned to NUMA node {}", worker_index, node_id);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_cpu_list_range() {
assert_eq!(parse_cpu_list("0-3"), vec![0, 1, 2, 3]);
}
#[test]
fn test_parse_cpu_list_mixed() {
assert_eq!(parse_cpu_list("0-2,5,8-9"), vec![0, 1, 2, 5, 8, 9]);
}
#[test]
fn test_parse_cpu_list_single() {
assert_eq!(parse_cpu_list("7"), vec![7]);
}
#[test]
fn test_parse_cpu_list_empty() {
assert!(parse_cpu_list("").is_empty());
}
#[test]
fn test_parse_cpu_list_whitespace() {
assert_eq!(parse_cpu_list(" 1 - 3 , 5 "), vec![1, 2, 3, 5]);
}
#[test]
fn test_init_numa_idempotent() {
init_numa();
init_numa(); }
#[test]
fn test_topology_returns_slice() {
init_numa();
let topo = topology();
let _ = topo.len();
}
#[test]
fn test_pin_worker_no_panic() {
init_numa();
pin_worker(0);
pin_worker(999);
}
#[test]
fn test_pin_thread_to_node_invalid() {
init_numa();
assert!(pin_thread_to_node(9999).is_ok());
}
#[test]
fn test_pin_thread_to_cpus_empty() {
assert!(pin_thread_to_cpus(&[]).is_ok());
}
#[test]
fn test_node_count() {
init_numa();
let _ = node_count();
}
}
}
#[cfg(not(target_os = "linux"))]
mod inner {
#[inline]
pub fn init_numa() {}
#[inline]
pub fn node_count() -> usize {
0
}
#[inline]
pub fn pin_worker(_worker_index: usize) {}
#[inline]
pub fn pin_thread_to_node(_node_id: usize) -> Result<(), std::io::Error> {
Ok(())
}
#[inline]
pub fn pin_thread_to_cpus(_cpus: &[u32]) -> Result<(), std::io::Error> {
Ok(())
}
}
pub use inner::*;