use crate::error::{CoreError, CoreResult, ErrorContext};
/// One NUMA node as reported by the platform: its id, its attached CPUs,
/// and its memory totals in MiB (0 when the size could not be determined).
#[derive(Debug, Clone)]
pub struct NumaNode {
    /// Platform node id. NOTE(review): kernel node ids are not guaranteed
    /// contiguous, so this may differ from the node's index in a list.
    pub id: usize,
    /// Logical CPU ids that belong to this node.
    pub cpu_ids: Vec<usize>,
    /// Total node memory in MiB (0 when not reported).
    pub memory_mb: usize,
    /// Free node memory in MiB at discovery time (0 when not reported).
    pub free_memory_mb: usize,
}
/// The set of NUMA nodes discovered on this machine.
///
/// Constructed via [`NumaTopology::discover`], which always yields at
/// least one node (a synthetic single-node fallback if need be).
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Discovered nodes, in discovery order.
    pub nodes: Vec<NumaNode>,
    /// Number of entries in `nodes`; kept in sync by the constructors.
    pub num_nodes: usize,
}
impl NumaTopology {
    /// Discover the machine's NUMA topology.
    ///
    /// Tries libnuma first (Linux with the `libnuma` feature), then the
    /// sysfs interface, and finally falls back to a synthetic single-node
    /// topology, so callers always receive at least one node.
    pub fn discover() -> Self {
        #[cfg(all(target_os = "linux", feature = "libnuma"))]
        {
            if let Some(topo) = Self::discover_libnuma() {
                return topo;
            }
        }
        #[cfg(target_os = "linux")]
        {
            if let Some(topo) = Self::discover_linux() {
                return topo;
            }
        }
        Self::single_node_fallback()
    }

    /// Query the topology via libnuma's C API.
    ///
    /// Returns `None` when libnuma reports the machine as not
    /// NUMA-capable or when no node with CPUs is found, letting
    /// `discover` fall through to the sysfs path.
    #[cfg(all(target_os = "linux", feature = "libnuma"))]
    fn discover_libnuma() -> Option<Self> {
        use std::os::raw::{c_int, c_longlong, c_uint, c_ulong};
        use std::ffi::c_void;
        #[link(name = "numa")]
        extern "C" {
            fn numa_available() -> c_int;
            fn numa_max_node() -> c_int;
            fn numa_node_size64(node: c_int, freep: *mut c_longlong) -> c_longlong;
            fn numa_allocate_cpumask() -> *mut c_void;
            fn numa_node_to_cpus(node: c_int, mask: *mut c_void) -> c_int;
            fn numa_bitmask_isbitset(bmp: *const c_void, n: c_uint) -> c_int;
            fn numa_num_configured_cpus() -> c_int;
            fn numa_bitmask_free(bmp: *mut c_void);
        }
        // numa_available() returns -1 when the kernel lacks NUMA support;
        // all other libnuma calls are undefined before this check.
        if unsafe { numa_available() } == -1 {
            return None;
        }
        let max_node = unsafe { numa_max_node() };
        // NOTE(review): `<= 0` also bails on a valid single-node system
        // (max_node == 0). The sysfs/fallback paths then cover that case,
        // but confirm this early-out is intentional rather than `< 0`.
        if max_node <= 0 {
            return None;
        }
        let total_cpus = unsafe { numa_num_configured_cpus() }.max(0) as usize;
        let mut nodes: Vec<NumaNode> = Vec::new();
        for raw_node in 0..=max_node {
            let mut cpu_ids: Vec<usize> = Vec::new();
            // SAFETY: the cpumask is allocated and freed through libnuma's
            // own API, checked for null, and only read after a successful
            // numa_node_to_cpus fill; it is freed exactly once below.
            unsafe {
                let mask_ptr = numa_allocate_cpumask();
                if !mask_ptr.is_null() {
                    if numa_node_to_cpus(raw_node, mask_ptr) == 0 {
                        for cpu in 0..total_cpus {
                            if numa_bitmask_isbitset(mask_ptr as *const c_void, cpu as c_uint) != 0
                            {
                                cpu_ids.push(cpu);
                            }
                        }
                    }
                    numa_bitmask_free(mask_ptr);
                }
            }
            let mut free_bytes: c_longlong = 0;
            // numa_node_size64 returns the node's total size in bytes
            // (negative on error) and writes the free bytes through `freep`.
            let total_bytes =
                unsafe { numa_node_size64(raw_node, &mut free_bytes as *mut c_longlong) };
            let memory_mb = if total_bytes > 0 {
                total_bytes as usize / (1024 * 1024)
            } else {
                0
            };
            let free_memory_mb = if free_bytes > 0 {
                free_bytes as usize / (1024 * 1024)
            } else {
                0
            };
            // Memory-only nodes (no CPUs) are skipped on this path.
            if !cpu_ids.is_empty() {
                nodes.push(NumaNode {
                    id: raw_node as usize,
                    cpu_ids,
                    memory_mb,
                    free_memory_mb,
                });
            }
        }
        if nodes.is_empty() {
            return None;
        }
        let num_nodes = nodes.len();
        Some(NumaTopology { nodes, num_nodes })
    }

    /// Synthetic topology: one node owning every visible CPU, with
    /// unknown (zero) memory sizes. Used when platform discovery fails.
    fn single_node_fallback() -> Self {
        let cpu_count = num_cpus_count();
        let node = NumaNode {
            id: 0,
            cpu_ids: (0..cpu_count).collect(),
            memory_mb: 0,
            free_memory_mb: 0,
        };
        NumaTopology {
            num_nodes: 1,
            nodes: vec![node],
        }
    }

    /// Read the topology from `/sys/devices/system/node`.
    ///
    /// NOTE(review): nodes are probed as node0, node1, ... and the scan
    /// stops at the first gap, so sparse (non-contiguous) node ids would
    /// be missed — confirm whether target machines can have such gaps.
    /// Nodes with unreadable `cpulist` are kept with empty `cpu_ids`.
    #[cfg(target_os = "linux")]
    fn discover_linux() -> Option<Self> {
        use std::fs;
        use std::path::Path;
        let node_base = Path::new("/sys/devices/system/node");
        if !node_base.exists() {
            return None;
        }
        let mut nodes: Vec<NumaNode> = Vec::new();
        let mut idx = 0usize;
        loop {
            let node_dir = node_base.join(format!("node{idx}"));
            if !node_dir.exists() {
                break;
            }
            let cpu_ids = fs::read_to_string(node_dir.join("cpulist"))
                .map(|s| parse_cpu_list(s.trim()))
                .unwrap_or_default();
            let (memory_mb, free_memory_mb) = fs::read_to_string(node_dir.join("meminfo"))
                .map(|s| parse_meminfo(&s))
                .unwrap_or((0, 0));
            nodes.push(NumaNode {
                id: idx,
                cpu_ids,
                memory_mb,
                free_memory_mb,
            });
            idx += 1;
        }
        if nodes.is_empty() {
            return None;
        }
        let num_nodes = nodes.len();
        Some(NumaTopology { nodes, num_nodes })
    }

    /// Id of the node that owns `cpu_id`, or `None` if no node lists it.
    pub fn node_for_cpu(&self, cpu_id: usize) -> Option<usize> {
        self.nodes
            .iter()
            .find(|n| n.cpu_ids.contains(&cpu_id))
            .map(|n| n.id)
    }

    /// Node of the CPU the calling thread is currently running on.
    /// Best effort: falls back to node 0 when the CPU cannot be mapped.
    pub fn current_node(&self) -> usize {
        let cpu = current_cpu_id();
        self.node_for_cpu(cpu).unwrap_or(0)
    }
}
/// Logical CPU count available to this process, defaulting to 1 when the
/// platform cannot report its parallelism.
fn num_cpus_count() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
/// Best-effort id of the CPU the calling thread is executing on.
///
/// Returns 0 on non-Linux platforms or when `sched_getcpu` fails.
fn current_cpu_id() -> usize {
    #[cfg(target_os = "linux")]
    {
        extern "C" {
            fn sched_getcpu() -> std::os::raw::c_int;
        }
        // SAFETY: sched_getcpu takes no arguments and has no
        // preconditions; a negative return signals failure.
        match unsafe { sched_getcpu() } {
            cpu if cpu >= 0 => return cpu as usize,
            _ => {}
        }
    }
    0
}
/// Parse a kernel "cpulist" string (e.g. `"0-3,8,10-11"`) into CPU ids.
///
/// Fragments that fail to parse are skipped entirely. The previous
/// version coerced malformed range endpoints to 0 via `unwrap_or(0)`,
/// so input like `"x-3"` produced spurious CPUs `[0, 1, 2, 3]`.
#[cfg(target_os = "linux")]
fn parse_cpu_list(s: &str) -> Vec<usize> {
    let mut cpus = Vec::new();
    for part in s.split(',').map(str::trim).filter(|p| !p.is_empty()) {
        if let Some((lo, hi)) = part.split_once('-') {
            // Only accept a range when both endpoints are valid numbers.
            if let (Ok(lo), Ok(hi)) = (lo.trim().parse::<usize>(), hi.trim().parse::<usize>()) {
                cpus.extend(lo..=hi);
            }
        } else if let Ok(c) = part.parse::<usize>() {
            cpus.push(c);
        }
    }
    cpus
}
/// Parse a sysfs per-node `meminfo` file into `(total_mb, free_mb)`.
///
/// Per-node lines look like `Node 0 MemTotal:  16090604 kB`.
#[cfg(target_os = "linux")]
fn parse_meminfo(s: &str) -> (usize, usize) {
    let mut total = 0usize;
    let mut free = 0usize;
    for line in s.lines() {
        if line.contains("MemTotal") {
            total = extract_kb(line) / 1024;
        } else if line.contains("MemFree") {
            free = extract_kb(line) / 1024;
        }
    }
    (total, free)
}

/// Extract the kB value from a meminfo line.
///
/// The value is the LAST integer on the line. The previous version took
/// the FIRST parseable integer, which on per-node lines like
/// `Node 0 MemTotal: 16090604 kB` is the node number from the `Node <N>`
/// prefix — so every node's memory size was reported as its node id.
#[cfg(target_os = "linux")]
fn extract_kb(line: &str) -> usize {
    line.split_whitespace()
        .rev()
        .find_map(|w| w.parse::<usize>().ok())
        .unwrap_or(0)
}
/// A typed buffer tagged with the NUMA node it is intended to live on.
///
/// NOTE(review): the node id is descriptive metadata only — the backing
/// storage is an ordinary `Vec` and is not pinned to that node's memory.
pub struct NumaBuffer<T> {
    data: Vec<T>,
    node_id: usize,
}

impl<T: Default + Clone> NumaBuffer<T> {
    /// Build a buffer of `size` default-initialized elements tagged with
    /// `node_id`.
    pub fn new(size: usize, node_id: usize) -> Self {
        let mut data = Vec::with_capacity(size);
        data.resize(size, T::default());
        Self { data, node_id }
    }

    /// Immutable view of the contents.
    pub fn as_slice(&self) -> &[T] {
        self.data.as_slice()
    }

    /// Mutable view of the contents.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        self.data.as_mut_slice()
    }

    /// NUMA node this buffer was tagged with at construction.
    pub fn node_id(&self) -> usize {
        self.node_id
    }

    /// Number of elements held.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Whether the buffer holds zero elements.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// A free-list of fixed-size blocks kept per NUMA node so blocks can be
/// recycled on (approximately) the node that released them.
pub struct NumaAwarePool<T: Default + Clone + Send> {
    per_node_pools: Vec<Vec<Vec<T>>>,
    block_size: usize,
    topology: NumaTopology,
}

impl<T: Default + Clone + Send> NumaAwarePool<T> {
    /// Create an empty pool of `block_size`-element blocks, with one
    /// free-list per discovered NUMA node.
    pub fn new(block_size: usize) -> Self {
        let topology = NumaTopology::discover();
        let per_node_pools = vec![Vec::new(); topology.num_nodes];
        NumaAwarePool {
            per_node_pools,
            block_size,
            topology,
        }
    }

    /// Hand out a block for the given node (or the caller's current node
    /// when `None`), recycling a pooled block when one is available.
    pub fn allocate(&mut self, node_id: Option<usize>) -> Vec<T> {
        let node = self.resolve_node(node_id);
        match self.per_node_pools[node].pop() {
            Some(recycled) => recycled,
            None => vec![T::default(); self.block_size],
        }
    }

    /// Return a block to the pool. Blocks whose length does not match
    /// `block_size` are silently dropped rather than pooled.
    pub fn deallocate(&mut self, block: Vec<T>, node_id: Option<usize>) {
        if block.len() == self.block_size {
            let node = self.resolve_node(node_id);
            self.per_node_pools[node].push(block);
        }
    }

    /// Snapshot of `(node_index, pooled_block_count)` per node.
    pub fn stats(&self) -> Vec<(usize, usize)> {
        let mut counts = Vec::with_capacity(self.per_node_pools.len());
        for (idx, pool) in self.per_node_pools.iter().enumerate() {
            counts.push((idx, pool.len()));
        }
        counts
    }

    /// Map an optional node hint to a valid pool index, clamping
    /// out-of-range hints to the last node.
    fn resolve_node(&self, hint: Option<usize>) -> usize {
        let requested = hint.unwrap_or_else(|| self.topology.current_node());
        let last = self.topology.num_nodes.saturating_sub(1);
        requested.min(last)
    }
}
/// Ensure `node_id` names a node that exists in `topology`.
///
/// Returns `CoreError::InvalidArgument` when the id is out of range.
pub fn validate_node_id(topology: &NumaTopology, node_id: usize) -> CoreResult<()> {
    if node_id >= topology.num_nodes {
        return Err(CoreError::InvalidArgument(ErrorContext::new(format!(
            "NUMA node_id {node_id} is out of range (topology has {} nodes)",
            topology.num_nodes
        ))));
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    // Discovery must always yield a usable topology, even on non-NUMA hosts
    // (the single-node fallback guarantees >= 1 node).
    #[test]
    fn test_topology_discover_returns_at_least_one_node() {
        let topo = NumaTopology::discover();
        assert!(topo.num_nodes >= 1);
        assert_eq!(topo.nodes.len(), topo.num_nodes);
        for node in &topo.nodes {
            // NOTE(review): sysfs discovery keeps nodes whose cpulist could
            // not be read (empty cpu_ids), and memory-only NUMA nodes exist;
            // this assertion could fail on such hosts — confirm targets.
            assert!(!node.cpu_ids.is_empty());
        }
    }

    // current_node() must always map into the discovered node range.
    #[test]
    fn test_current_node_within_bounds() {
        let topo = NumaTopology::discover();
        let cur = topo.current_node();
        assert!(cur < topo.num_nodes);
    }

    // Basic construction, accessor, and mutation round-trip for NumaBuffer.
    #[test]
    fn test_numa_buffer_basic() {
        let mut buf: NumaBuffer<f64> = NumaBuffer::new(1024, 0);
        assert_eq!(buf.len(), 1024);
        assert!(!buf.is_empty());
        assert_eq!(buf.node_id(), 0);
        // Elements are default-initialized (0.0 for f64).
        assert!(buf.as_slice().iter().all(|&v| v == 0.0));
        buf.as_mut_slice()[0] = 3.15;
        assert_eq!(buf.as_slice()[0], 3.15);
    }

    // Zero-length buffers are valid and report empty.
    #[test]
    fn test_numa_buffer_empty() {
        let buf: NumaBuffer<u8> = NumaBuffer::new(0, 0);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }

    // A deallocated block is pooled and handed back on the next allocate.
    #[test]
    fn test_pool_allocate_deallocate() {
        let mut pool: NumaAwarePool<u64> = NumaAwarePool::new(64);
        let block = pool.allocate(Some(0));
        assert_eq!(block.len(), 64);
        assert!(block.iter().all(|&v| v == 0));
        pool.deallocate(block, Some(0));
        let stats = pool.stats();
        assert_eq!(stats[0].1, 1);
        let _block2 = pool.allocate(Some(0));
        let stats2 = pool.stats();
        assert_eq!(stats2[0].1, 0);
    }

    // Blocks whose size doesn't match block_size are dropped, not pooled.
    #[test]
    fn test_pool_wrong_size_discarded() {
        let mut pool: NumaAwarePool<u32> = NumaAwarePool::new(32);
        pool.deallocate(vec![0u32; 16], Some(0));
        let stats = pool.stats();
        assert_eq!(stats[0].1, 0);
    }

    // Passing None routes through current_node() without panicking.
    #[test]
    fn test_pool_current_node_allocation() {
        let mut pool: NumaAwarePool<i32> = NumaAwarePool::new(8);
        let block = pool.allocate(None);
        assert_eq!(block.len(), 8);
        pool.deallocate(block, None);
    }

    // node 0 always validates; num_nodes itself is one past the end.
    #[test]
    fn test_validate_node_id() {
        let topo = NumaTopology::discover();
        assert!(validate_node_id(&topo, 0).is_ok());
        assert!(validate_node_id(&topo, topo.num_nodes).is_err());
    }

    // CPU 0 should belong to some node; an absurd CPU id maps to none.
    #[test]
    fn test_node_for_cpu() {
        let topo = NumaTopology::discover();
        let node = topo.node_for_cpu(0);
        assert!(node.is_some());
        assert!(topo.node_for_cpu(usize::MAX).is_none());
    }
}