use std::alloc::{Layout, alloc, dealloc};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Identifier of a NUMA node.
pub type NumaNode = u32;

/// Shorthand for results whose error type is [`NumaError`].
pub type NumaResult<T> = Result<T, NumaError>;

/// Failures reported by the NUMA helpers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NumaError {
    /// The requested NUMA facility is not available on this platform.
    NotAvailable,
    /// The given node is not usable; carries the rejected node id.
    InvalidNode(NumaNode),
    /// Memory could not be obtained or the request could not be satisfied.
    AllocationFailed,
    /// The thread affinity could not be changed.
    PinningFailed,
    /// Free-form description of an underlying system failure.
    SystemError(String),
}

impl std::fmt::Display for NumaError {
    /// Writes the human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotAvailable => f.write_str("NUMA not available"),
            Self::InvalidNode(node) => f.write_fmt(format_args!("invalid NUMA node: {}", node)),
            Self::AllocationFailed => f.write_str("NUMA allocation failed"),
            Self::PinningFailed => f.write_str("thread pinning failed"),
            Self::SystemError(detail) => f.write_fmt(format_args!("system error: {}", detail)),
        }
    }
}

impl std::error::Error for NumaError {}
/// Snapshot of the machine's NUMA layout.
///
/// Every per-node vector is indexed by *position*: entry `i` describes the
/// `i`-th node in ascending node-id order. Positions equal kernel node ids
/// only when the ids are contiguous and start at 0.
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Number of nodes described by this topology.
    pub num_nodes: usize,
    /// CPU ids that belong to each node.
    pub cpus_per_node: Vec<Vec<usize>>,
    /// Total memory of each node in bytes; 0 when it could not be determined.
    pub memory_per_node: Vec<usize>,
    /// `distances[a][b]` is the relative access cost from node `a` to node `b`.
    pub distances: Vec<Vec<u32>>,
}

impl NumaTopology {
    /// Distance a node has to itself.
    const LOCAL_DISTANCE: u32 = 10;
    /// Distance assumed between two different nodes when the platform reports none.
    #[cfg(target_os = "linux")]
    const DEFAULT_REMOTE_DISTANCE: u32 = 20;

    /// Detects the topology of the current machine.
    ///
    /// On Linux the layout is read from sysfs; if that fails, or on any other
    /// platform, the result of [`NumaTopology::single_node`] is returned.
    pub fn detect() -> Self {
        #[cfg(target_os = "linux")]
        {
            Self::detect_linux().unwrap_or_else(Self::single_node)
        }
        #[cfg(not(target_os = "linux"))]
        {
            Self::single_node()
        }
    }

    /// Builds a topology with one node that owns CPUs `0..available_parallelism`
    /// (a single CPU when the parallelism cannot be queried).
    pub fn single_node() -> Self {
        let num_cpus = std::thread::available_parallelism().map_or(1, |n| n.get());
        Self {
            num_nodes: 1,
            cpus_per_node: vec![(0..num_cpus).collect()],
            memory_per_node: vec![0],
            distances: vec![vec![Self::LOCAL_DISTANCE]],
        }
    }

    /// Reads the node list, per-node CPUs, memory and distances from
    /// `/sys/devices/system/node`.
    ///
    /// Returns `None` when the directory is unreadable, contains no `nodeN`
    /// entry, or a node's `cpulist` cannot be read. A missing `meminfo` yields
    /// 0 bytes, and a missing or malformed `distance` file falls back to the
    /// default row (10 to itself, 20 to every other node).
    #[cfg(target_os = "linux")]
    fn detect_linux() -> Option<Self> {
        use std::fs;

        const NODE_ROOT: &str = "/sys/devices/system/node";

        let mut nodes: Vec<usize> = fs::read_dir(NODE_ROOT)
            .ok()?
            .flatten()
            .filter_map(|entry| {
                let name = entry.file_name();
                let name = name.to_string_lossy();
                name.strip_prefix("node")?.parse::<usize>().ok()
            })
            .collect();
        if nodes.is_empty() {
            return None;
        }
        nodes.sort_unstable();
        let num_nodes = nodes.len();

        let mut cpus_per_node = Vec::with_capacity(num_nodes);
        let mut memory_per_node = Vec::with_capacity(num_nodes);
        let mut distances = Vec::with_capacity(num_nodes);
        for (index, node) in nodes.iter().enumerate() {
            let node_dir = format!("{}/node{}", NODE_ROOT, node);

            // A node without a readable CPU list makes the whole detection unreliable.
            let cpulist = fs::read_to_string(format!("{}/cpulist", node_dir)).ok()?;
            cpus_per_node.push(Self::parse_cpulist(&cpulist));

            let meminfo =
                fs::read_to_string(format!("{}/meminfo", node_dir)).unwrap_or_default();
            memory_per_node.push(Self::parse_meminfo(&meminfo));

            // Prefer the distances the kernel exposes; only accept a row that
            // has exactly one value per detected node so the matrix stays square.
            let row = fs::read_to_string(format!("{}/distance", node_dir))
                .ok()
                .and_then(|text| Self::parse_distance_row(&text, num_nodes))
                .unwrap_or_else(|| Self::default_distance_row(index, num_nodes));
            distances.push(row);
        }

        Some(Self {
            num_nodes,
            cpus_per_node,
            memory_per_node,
            distances,
        })
    }

    /// Expands a CPU list such as `"0-3,8,10-11"` into individual CPU ids.
    ///
    /// Whitespace around each comma-separated field is ignored; fields that
    /// are not a number or a `start-end` pair of numbers are skipped.
    #[cfg(target_os = "linux")]
    fn parse_cpulist(cpulist: &str) -> Vec<usize> {
        let mut cpus = Vec::new();
        for part in cpulist.split(',').map(str::trim) {
            match part.split_once('-') {
                Some((first, last)) => {
                    if let (Ok(first), Ok(last)) = (first.parse::<usize>(), last.parse::<usize>())
                    {
                        cpus.extend(first..=last);
                    }
                }
                None => {
                    if let Ok(cpu) = part.parse::<usize>() {
                        cpus.push(cpu);
                    }
                }
            }
        }
        cpus
    }

    /// Extracts the node's total memory in bytes from a per-node `meminfo` text.
    ///
    /// Looks for a line of the form `Node <id> MemTotal: <kB> kB` and returns
    /// the kB value times 1024 (saturating at `usize::MAX`), or 0 when no such
    /// line parses.
    #[cfg(target_os = "linux")]
    fn parse_meminfo(meminfo: &str) -> usize {
        meminfo
            .lines()
            .filter(|line| line.starts_with("Node") && line.contains("MemTotal:"))
            .find_map(|line| line.split_whitespace().nth(3)?.parse::<usize>().ok())
            .map_or(0, |kb| kb.saturating_mul(1024))
    }

    /// Parses one whitespace-separated distance row.
    ///
    /// Returns `None` unless every field is a number and the row holds exactly
    /// `expected` values.
    #[cfg(target_os = "linux")]
    fn parse_distance_row(text: &str, expected: usize) -> Option<Vec<u32>> {
        let row = text
            .split_whitespace()
            .map(|field| field.parse::<u32>().ok())
            .collect::<Option<Vec<u32>>>()?;
        if row.len() == expected {
            Some(row)
        } else {
            None
        }
    }

    /// Builds the fallback distance row for the node at position `index`.
    #[cfg(target_os = "linux")]
    fn default_distance_row(index: usize, num_nodes: usize) -> Vec<u32> {
        (0..num_nodes)
            .map(|other| {
                if other == index {
                    Self::LOCAL_DISTANCE
                } else {
                    Self::DEFAULT_REMOTE_DISTANCE
                }
            })
            .collect()
    }

    /// Returns the CPUs of `node`, or an empty slice when `node` is out of range.
    pub fn local_cpus(&self, node: NumaNode) -> &[usize] {
        self.cpus_per_node
            .get(node as usize)
            .map_or(&[], |cpus| cpus.as_slice())
    }

    /// Returns the distance from `from` to `to`, or `u32::MAX` when either
    /// index is missing from the matrix.
    pub fn distance(&self, from: NumaNode, to: NumaNode) -> u32 {
        self.distances
            .get(from as usize)
            .and_then(|row| row.get(to as usize))
            .copied()
            .unwrap_or(u32::MAX)
    }

    /// Lists all nodes ordered by increasing distance from `node`.
    ///
    /// Ties keep ascending node order because the sort is stable.
    pub fn nearest_nodes(&self, node: NumaNode) -> Vec<NumaNode> {
        let mut order: Vec<NumaNode> = (0..self.num_nodes as NumaNode).collect();
        order.sort_by_key(|&candidate| self.distance(node, candidate));
        order
    }
}
/// Owned, fixed-size byte region tagged with an optional NUMA node.
///
/// The region is released with `dealloc` using the stored layout when the
/// buffer is dropped.
pub struct NumaBuffer {
    /// Start of the region.
    ptr: NonNull<u8>,
    /// Length of the region in bytes.
    size: usize,
    /// Layout handed back to `dealloc` on drop.
    layout: Layout,
    /// Node recorded for this buffer, if any.
    node: Option<NumaNode>,
    /// Whether the buffer has been marked as pre-faulted.
    faulted: bool,
}

// SAFETY: the struct holds the only pointer to its region; shared references
// expose read access only and every mutating accessor takes `&mut self`.
unsafe impl Send for NumaBuffer {}
unsafe impl Sync for NumaBuffer {}

impl NumaBuffer {
    /// Read-only pointer to the first byte.
    #[inline]
    pub fn as_ptr(&self) -> *const u8 {
        self.ptr.as_ptr() as *const u8
    }

    /// Mutable pointer to the first byte.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        let NumaBuffer { ptr, .. } = self;
        ptr.as_ptr()
    }

    /// Length of the buffer in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.size
    }

    /// `true` when the buffer holds no bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Node recorded for this buffer, if any.
    #[inline]
    pub fn node(&self) -> Option<NumaNode> {
        self.node
    }

    /// `true` once the buffer has been marked as pre-faulted.
    #[inline]
    pub fn is_faulted(&self) -> bool {
        self.faulted
    }

    /// Views the whole buffer as a byte slice.
    #[inline]
    pub fn as_slice(&self) -> &[u8] {
        let (start, length) = (self.ptr.as_ptr(), self.size);
        // SAFETY: relies on `ptr` addressing `size` bytes owned by this buffer
        // for as long as `self` is borrowed.
        unsafe { std::slice::from_raw_parts(start, length) }
    }

    /// Views the whole buffer as a mutable byte slice.
    #[inline]
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        let (start, length) = (self.ptr.as_ptr(), self.size);
        // SAFETY: same region as `as_slice`; `&mut self` guarantees exclusivity.
        unsafe { std::slice::from_raw_parts_mut(start, length) }
    }
}

impl Drop for NumaBuffer {
    /// Returns the region to the global allocator.
    fn drop(&mut self) {
        let raw = self.ptr.as_ptr();
        // SAFETY: relies on `ptr` having been obtained from the global
        // allocator with exactly `layout`.
        unsafe { dealloc(raw, self.layout) }
    }
}
/// Page-granular allocator that tags buffers with a NUMA node and keeps
/// simple usage counters.
pub struct NumaAllocator {
    /// Topology used to validate node ids.
    topology: NumaTopology,
    /// Page size used for both rounding and alignment of every allocation.
    page_size: usize,
    /// Running total of bytes handed out (never decremented).
    allocated: AtomicUsize,
    /// Number of `allocate_on_node` calls that succeeded, per node.
    allocations_per_node: Vec<AtomicUsize>,
}

impl NumaAllocator {
    /// Creates an allocator for the topology detected on this machine.
    pub fn new() -> NumaResult<Self> {
        Self::with_topology(NumaTopology::detect())
    }

    /// Creates an allocator for an explicit topology.
    ///
    /// One allocation counter is created per node in `topology`.
    pub fn with_topology(topology: NumaTopology) -> NumaResult<Self> {
        let allocations_per_node = (0..topology.num_nodes)
            .map(|_| AtomicUsize::new(0))
            .collect();
        Ok(Self {
            page_size: Self::get_page_size(),
            allocated: AtomicUsize::new(0),
            allocations_per_node,
            topology,
        })
    }

    /// Returns the system page size, or 4096 when it cannot be queried.
    fn get_page_size() -> usize {
        #[cfg(unix)]
        {
            // SAFETY: `sysconf` only reads a configuration value.
            let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
            if page_size > 0 {
                return page_size as usize;
            }
        }
        4096
    }

    /// Computes the page-aligned layout for a request of `size` bytes.
    ///
    /// The size is rounded up to a whole number of pages. A request of 0 bytes
    /// is treated as 1 byte so the layout is never zero-sized, and a request
    /// that cannot be rounded without overflow is rejected.
    fn page_layout(&self, size: usize) -> NumaResult<Layout> {
        let page = self.page_size;
        let rounded = size
            .max(1)
            .checked_add(page - 1)
            .map(|padded| padded / page * page)
            .ok_or(NumaError::AllocationFailed)?;
        Layout::from_size_align(rounded, page).map_err(|_| NumaError::AllocationFailed)
    }

    /// Allocates the pages for `size` bytes and wraps them in a buffer tagged
    /// with `node`. Updates the byte counter on success.
    fn allocate_pages(&self, size: usize, node: Option<NumaNode>) -> NumaResult<NumaBuffer> {
        let layout = self.page_layout(size)?;
        // SAFETY: `page_layout` never returns a zero-sized layout, which is the
        // precondition of `alloc`.
        let raw = unsafe { alloc(layout) };
        let ptr = NonNull::new(raw).ok_or(NumaError::AllocationFailed)?;
        self.allocated.fetch_add(layout.size(), Ordering::Relaxed);
        Ok(NumaBuffer {
            ptr,
            size: layout.size(),
            layout,
            node,
            faulted: false,
        })
    }

    /// Allocates at least `size` bytes (rounded up to whole pages, minimum one
    /// page) tagged with `node`.
    ///
    /// This function performs no node binding itself: it records `node` on the
    /// buffer and bumps that node's allocation counter. The memory is not
    /// initialized; write to it before reading.
    ///
    /// # Errors
    /// * [`NumaError::InvalidNode`] when `node` is not part of the topology.
    /// * [`NumaError::AllocationFailed`] when the size cannot be represented
    ///   or the allocator returns null.
    pub fn allocate_on_node(&self, size: usize, node: NumaNode) -> NumaResult<NumaBuffer> {
        if node as usize >= self.topology.num_nodes {
            return Err(NumaError::InvalidNode(node));
        }
        let buffer = self.allocate_pages(size, Some(node))?;
        self.allocations_per_node[node as usize].fetch_add(1, Ordering::Relaxed);
        Ok(buffer)
    }

    /// Allocates at least `size` bytes (rounded up to whole pages, minimum one
    /// page) without a node tag.
    ///
    /// The memory is not initialized; write to it before reading.
    ///
    /// # Errors
    /// [`NumaError::AllocationFailed`] when the size cannot be represented or
    /// the allocator returns null.
    pub fn allocate(&self, size: usize) -> NumaResult<NumaBuffer> {
        self.allocate_pages(size, None)
    }

    /// Touches every page of `buffer` with a write so the pages are backed
    /// before first use, then marks the buffer as faulted.
    ///
    /// The existing contents are preserved: each touched byte is read and
    /// written back unchanged. Calling this on an already-faulted buffer is a
    /// no-op.
    pub fn prefault(&self, buffer: &mut NumaBuffer) {
        if buffer.faulted {
            return;
        }
        let page_size = self.page_size;
        let size = buffer.len();
        let base = buffer.as_mut_ptr();
        for offset in (0..size).step_by(page_size) {
            // SAFETY: `offset < size`, so the byte lies inside the region that
            // `buffer` exclusively borrows. `MaybeUninit<u8>` makes the copy
            // valid even if the byte has never been written.
            unsafe {
                let cell = base.add(offset) as *mut std::mem::MaybeUninit<u8>;
                let current = std::ptr::read_volatile(cell);
                std::ptr::write_volatile(cell, current);
            }
        }
        std::sync::atomic::fence(Ordering::SeqCst);
        buffer.faulted = true;
    }

    /// Topology this allocator validates nodes against.
    pub fn topology(&self) -> &NumaTopology {
        &self.topology
    }

    /// Total bytes handed out so far; dropped buffers are not subtracted.
    pub fn total_allocated(&self) -> usize {
        self.allocated.load(Ordering::Relaxed)
    }

    /// Page size used for rounding and alignment.
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
impl Default for NumaAllocator {
    /// Builds an allocator for the detected topology, retrying with the
    /// single-node topology if that construction reports an error.
    fn default() -> Self {
        match Self::new() {
            Ok(allocator) => allocator,
            Err(_) => Self::with_topology(NumaTopology::single_node()).unwrap(),
        }
    }
}
/// Pins the calling thread to CPUs or NUMA nodes of a given topology.
pub struct ThreadPinner {
    /// Topology used to map nodes to CPUs and CPUs back to nodes.
    topology: NumaTopology,
}

impl ThreadPinner {
    /// Creates a pinner that resolves nodes against `topology`.
    pub fn new(topology: NumaTopology) -> Self {
        Self { topology }
    }

    /// Restricts the calling thread to the given CPUs.
    ///
    /// CPU ids that do not fit into `cpu_set_t` are ignored; if none of the
    /// ids fit, or the kernel rejects the mask, `PinningFailed` is returned.
    #[cfg(target_os = "linux")]
    fn set_affinity(cpus: &[usize]) -> NumaResult<()> {
        use std::mem::size_of;

        // `cpu_set_t` is a fixed-size bitmask and `CPU_SET` indexes straight
        // into it, so ids beyond its capacity must be filtered out first.
        let capacity = 8 * size_of::<libc::cpu_set_t>();
        let usable: Vec<usize> = cpus.iter().copied().filter(|&cpu| cpu < capacity).collect();
        if usable.is_empty() {
            return Err(NumaError::PinningFailed);
        }

        // SAFETY: an all-zero `cpu_set_t` is a valid empty mask, every id was
        // checked against the mask capacity, and the pointer/size pair passed
        // to `sched_setaffinity` describes the live local `cpuset`. A pid of 0
        // addresses the calling thread.
        let status = unsafe {
            let mut cpuset: libc::cpu_set_t = std::mem::zeroed();
            libc::CPU_ZERO(&mut cpuset);
            for &cpu in &usable {
                libc::CPU_SET(cpu, &mut cpuset);
            }
            libc::sched_setaffinity(0, size_of::<libc::cpu_set_t>(), &cpuset)
        };
        if status == 0 {
            Ok(())
        } else {
            Err(NumaError::PinningFailed)
        }
    }

    /// Thread affinity is only implemented for Linux.
    #[cfg(not(target_os = "linux"))]
    fn set_affinity(_cpus: &[usize]) -> NumaResult<()> {
        Err(NumaError::NotAvailable)
    }

    /// Pins the calling thread to a single CPU.
    ///
    /// # Errors
    /// * [`NumaError::PinningFailed`] when `cpu` does not fit into the
    ///   affinity mask or the kernel rejects the request (Linux).
    /// * [`NumaError::NotAvailable`] on every other platform.
    pub fn pin_to_cpu(&self, cpu: usize) -> NumaResult<()> {
        Self::set_affinity(&[cpu])
    }

    /// Restricts the calling thread to the CPUs of `node`.
    ///
    /// The thread may run on any CPU of the node, so several threads pinned to
    /// the same node are not forced onto one core.
    ///
    /// # Errors
    /// * [`NumaError::InvalidNode`] when the topology lists no CPU for `node`.
    /// * Otherwise the same errors as [`ThreadPinner::pin_to_cpu`].
    pub fn pin_to_node(&self, node: NumaNode) -> NumaResult<()> {
        let cpus = self.topology.local_cpus(node);
        if cpus.is_empty() {
            return Err(NumaError::InvalidNode(node));
        }
        Self::set_affinity(cpus)
    }

    /// Returns the CPU the calling thread is currently running on, if the
    /// platform can report it.
    #[cfg(target_os = "linux")]
    pub fn current_cpu(&self) -> Option<usize> {
        // SAFETY: `sched_getcpu` takes no arguments and only returns a value.
        let cpu = unsafe { libc::sched_getcpu() };
        if cpu >= 0 {
            Some(cpu as usize)
        } else {
            None
        }
    }

    /// The current CPU cannot be queried on this platform.
    #[cfg(not(target_os = "linux"))]
    pub fn current_cpu(&self) -> Option<usize> {
        None
    }

    /// Returns the first node whose CPU list contains the current CPU, or
    /// `None` when the CPU is unknown or not listed in the topology.
    pub fn current_node(&self) -> Option<NumaNode> {
        let cpu = self.current_cpu()?;
        self.topology
            .cpus_per_node
            .iter()
            .position(|cpus| cpus.contains(&cpu))
            .map(|node| node as NumaNode)
    }
}
/// Policy selector for choosing the NUMA node of an allocation.
///
/// NOTE(review): nothing in the code visible here consumes this enum, so the
/// per-variant notes below are inferred from the variant names only — confirm
/// against the callers before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllocationStrategy {
/// Carries an explicit node id; presumably "always use this node".
Fixed(NumaNode),
/// Presumably "use the node of the calling thread" — TODO confirm.
Local,
/// Presumably "rotate through the nodes, one allocation at a time" — TODO confirm.
RoundRobin,
/// Presumably "spread a single allocation across nodes" — TODO confirm.
Interleave,
}
/// Fixed-capacity, append-only storage for `Copy` values kept in a
/// node-tagged buffer.
pub struct NumaVectorStorage<T> {
    /// Backing buffers; only the first one is used by the methods below.
    buffers: Vec<NumaBuffer>,
    /// Number of initialized elements.
    len: usize,
    /// Maximum number of elements.
    capacity: usize,
    /// `size_of::<T>()`, cached at construction.
    element_size: usize,
    // Stored alongside the buffer it produced; not read by any method here.
    #[allow(dead_code)]
    allocator: NumaAllocator,
    _phantom: std::marker::PhantomData<T>,
}

impl<T: Copy> NumaVectorStorage<T> {
    /// Creates storage for up to `capacity` elements in a pre-faulted buffer
    /// tagged with `node`.
    ///
    /// # Errors
    /// * [`NumaError::AllocationFailed`] when `capacity * size_of::<T>()`
    ///   overflows, when `T` needs an alignment larger than the page size, or
    ///   when the allocation itself fails.
    /// * [`NumaError::InvalidNode`] when the allocator rejects `node`.
    pub fn with_capacity_on_node(capacity: usize, node: NumaNode) -> NumaResult<Self> {
        let allocator = NumaAllocator::new()?;
        let element_size = std::mem::size_of::<T>();

        // The buffer is aligned to the page size; a type demanding more than
        // that could not be read or written through a `*T` soundly.
        if std::mem::align_of::<T>() > allocator.page_size() {
            return Err(NumaError::AllocationFailed);
        }

        // A wrapped product would produce a buffer smaller than `capacity`
        // elements and let `push` write past its end.
        let byte_size = capacity
            .checked_mul(element_size)
            .ok_or(NumaError::AllocationFailed)?;

        // Never request zero bytes (zero capacity or zero-sized `T`): the
        // allocator hands the size to the global allocator.
        let mut buffer = allocator.allocate_on_node(byte_size.max(1), node)?;
        allocator.prefault(&mut buffer);

        Ok(Self {
            buffers: vec![buffer],
            len: 0,
            capacity,
            element_size,
            allocator,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Returns the element at `index`, or `None` when `index` is not below
    /// [`NumaVectorStorage::len`].
    pub fn get(&self, index: usize) -> Option<&T> {
        if index >= self.len {
            return None;
        }
        let buffer = self.buffers.first()?;
        let offset = index.checked_mul(self.element_size)?;
        let end = offset.checked_add(self.element_size)?;
        if end > buffer.len() {
            return None;
        }
        // SAFETY: `offset..end` lies inside the buffer, the slot was written by
        // `push` because `index < len`, and the page-aligned base plus a
        // multiple of `size_of::<T>()` is aligned for `T`.
        unsafe { Some(&*(buffer.as_ptr().add(offset) as *const T)) }
    }

    /// Appends `value`.
    ///
    /// # Errors
    /// [`NumaError::AllocationFailed`] when the storage is full or the slot
    /// would fall outside the backing buffer.
    pub fn push(&mut self, value: T) -> NumaResult<()> {
        if self.len >= self.capacity {
            return Err(NumaError::AllocationFailed);
        }
        let element_size = self.element_size;
        let end = self
            .len
            .checked_mul(element_size)
            .and_then(|offset| offset.checked_add(element_size))
            .ok_or(NumaError::AllocationFailed)?;
        let offset = end - element_size;
        let buffer = self
            .buffers
            .first_mut()
            .ok_or(NumaError::AllocationFailed)?;
        if end > buffer.len() {
            return Err(NumaError::AllocationFailed);
        }
        // SAFETY: `offset..end` was just checked to lie inside the buffer, and
        // the page-aligned base plus a multiple of `size_of::<T>()` is aligned
        // for `T`. `T: Copy`, so overwriting the slot needs no drop.
        unsafe {
            std::ptr::write(buffer.as_mut_ptr().add(offset) as *mut T, value);
        }
        self.len += 1;
        Ok(())
    }

    /// Number of stored elements.
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` when no element has been pushed yet.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Maximum number of elements the storage can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The fallback topology has one node that owns at least one CPU.
    #[test]
    fn test_topology_single_node() {
        let fallback = NumaTopology::single_node();
        assert_eq!(fallback.num_nodes, 1);
        assert!(!fallback.cpus_per_node[0].is_empty());
        assert_eq!(fallback.distance(0, 0), 10);
    }

    /// Detection always yields at least one node.
    #[test]
    fn test_topology_detect() {
        let detected = NumaTopology::detect();
        assert!(detected.num_nodes >= 1);
        assert!(!detected.cpus_per_node.is_empty());
    }

    /// A plain allocation is large enough and starts out un-faulted.
    #[test]
    fn test_allocator_basic() {
        let buffer = NumaAllocator::default().allocate(4096).unwrap();
        assert!(buffer.len() >= 4096);
        assert!(!buffer.is_faulted());
    }

    /// A node-tagged allocation remembers its node.
    #[test]
    fn test_allocator_on_node() {
        let buffer = NumaAllocator::default().allocate_on_node(8192, 0).unwrap();
        assert!(buffer.len() >= 8192);
        assert_eq!(buffer.node(), Some(0));
    }

    /// Pre-faulting flips the faulted flag.
    #[test]
    fn test_prefault() {
        let allocator = NumaAllocator::default();
        let mut buffer = allocator.allocate(65536).unwrap();
        assert!(!buffer.is_faulted());
        allocator.prefault(&mut buffer);
        assert!(buffer.is_faulted());
    }

    /// Bytes written through the mutable view are read back unchanged.
    #[test]
    fn test_buffer_read_write() {
        let allocator = NumaAllocator::default();
        let mut buffer = allocator.allocate(4096).unwrap();
        allocator.prefault(&mut buffer);
        for (position, byte) in buffer.as_mut_slice().iter_mut().enumerate() {
            *byte = (position % 256) as u8;
        }
        for (position, &byte) in buffer.as_slice().iter().enumerate() {
            assert_eq!(byte, (position % 256) as u8);
        }
    }

    /// An out-of-range node is rejected with the offending id.
    #[test]
    fn test_invalid_node() {
        let outcome = NumaAllocator::default().allocate_on_node(4096, 999);
        assert!(matches!(outcome, Err(NumaError::InvalidNode(999))));
    }

    /// The byte counter grows by at least the requested sizes.
    #[test]
    fn test_total_allocated() {
        let allocator = NumaAllocator::default();
        let before = allocator.total_allocated();
        let _first = allocator.allocate(4096).unwrap();
        let _second = allocator.allocate(8192).unwrap();
        let after = allocator.total_allocated();
        assert!(after >= before + 4096 + 8192);
    }

    /// Nodes are ordered by increasing distance from the origin.
    #[test]
    fn test_nearest_nodes() {
        let three_nodes = NumaTopology {
            num_nodes: 3,
            cpus_per_node: vec![vec![0, 1], vec![2, 3], vec![4, 5]],
            memory_per_node: vec![0, 0, 0],
            distances: vec![vec![10, 20, 30], vec![20, 10, 20], vec![30, 20, 10]],
        };
        let order = three_nodes.nearest_nodes(0);
        assert_eq!(order[0], 0);
        assert_eq!(order[1], 1);
        assert_eq!(order[2], 2);
    }

    /// Fresh storage is empty and reports the requested capacity.
    #[test]
    fn test_vector_storage() {
        let storage: NumaVectorStorage<f32> =
            NumaVectorStorage::with_capacity_on_node(100, 0).unwrap();
        assert_eq!(storage.len(), 0);
        assert_eq!(storage.capacity(), 100);
    }

    /// Pushed values are retrievable by index; unused slots are not.
    #[test]
    fn test_vector_storage_push_get() {
        let mut storage: NumaVectorStorage<u64> =
            NumaVectorStorage::with_capacity_on_node(10, 0).unwrap();
        storage.push(42).unwrap();
        storage.push(123).unwrap();
        assert_eq!(storage.len(), 2);
        assert_eq!(storage.get(0), Some(&42));
        assert_eq!(storage.get(1), Some(&123));
        assert_eq!(storage.get(2), None);
    }

    /// The query methods can be called on any platform.
    #[test]
    fn test_thread_pinner() {
        let pinner = ThreadPinner::new(NumaTopology::detect());
        let _ = pinner.current_cpu();
        let _ = pinner.current_node();
    }

    /// Pinning to the first detected CPU succeeds on Linux only.
    #[test]
    fn test_thread_pinner_pin_to_cpu() {
        let topo = NumaTopology::detect();
        let pinner = ThreadPinner::new(topo.clone());
        let first_cpu = topo.cpus_per_node.first().and_then(|cpus| cpus.first());
        if let Some(&cpu) = first_cpu {
            let result = pinner.pin_to_cpu(cpu);
            #[cfg(target_os = "linux")]
            assert!(result.is_ok(), "Pin should succeed on Linux");
            #[cfg(not(target_os = "linux"))]
            assert!(matches!(result, Err(NumaError::NotAvailable)));
        }
    }

    /// Pinning to node 0 succeeds on Linux when that node has CPUs.
    #[test]
    fn test_thread_pinner_pin_to_node() {
        let topo = NumaTopology::detect();
        let pinner = ThreadPinner::new(topo.clone());
        let result = pinner.pin_to_node(0);
        #[cfg(target_os = "linux")]
        {
            let node_zero_has_cpus = topo
                .cpus_per_node
                .first()
                .map_or(false, |cpus| !cpus.is_empty());
            if node_zero_has_cpus {
                assert!(result.is_ok());
            }
        }
        #[cfg(not(target_os = "linux"))]
        assert!(matches!(result, Err(NumaError::NotAvailable)));
    }

    /// The current CPU is reported on Linux and absent elsewhere.
    #[test]
    fn test_thread_pinner_current_cpu_libc() {
        let pinner = ThreadPinner::new(NumaTopology::detect());
        #[cfg(target_os = "linux")]
        {
            let cpu = pinner.current_cpu();
            assert!(cpu.is_some(), "sched_getcpu should work on Linux");
        }
        #[cfg(not(target_os = "linux"))]
        {
            let cpu = pinner.current_cpu();
            assert!(cpu.is_none());
        }
    }

    /// The cached page size is a power of two and matches `sysconf` on Unix.
    #[test]
    fn test_page_size_libc() {
        let allocator = NumaAllocator::default();
        let page_size = allocator.page_size;
        assert!(page_size >= 4096);
        assert!(page_size.is_power_of_two());
        #[cfg(unix)]
        {
            let reported = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
            if reported > 0 {
                assert_eq!(page_size, reported as usize);
            }
        }
    }
}