use bytes::BytesMut;
use parking_lot::{Mutex, RwLock};
use std::alloc::{GlobalAlloc, Layout, System};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Debug, Clone, Default)]
pub struct NumaTopology {
pub numa_nodes: Vec<NumaNode>,
pub cpu_to_node: HashMap<usize, usize>,
pub current_node: usize,
}
#[derive(Debug, Clone)]
pub struct NumaNode {
pub id: usize,
pub cpus: Vec<usize>,
pub memory_gb: usize,
pub available_memory: usize,
}
impl NumaTopology {
pub fn detect() -> Self {
let cpu_count = num_cpus::get();
let numa_node_count = (cpu_count / 4).max(1);
let mut numa_nodes = Vec::new();
let mut cpu_to_node = HashMap::new();
for node_id in 0..numa_node_count {
let cpus_per_node = cpu_count / numa_node_count;
let start_cpu = node_id * cpus_per_node;
let end_cpu = if node_id == numa_node_count - 1 {
cpu_count
} else {
start_cpu + cpus_per_node
};
let cpus: Vec<usize> = (start_cpu..end_cpu).collect();
for &cpu in &cpus {
cpu_to_node.insert(cpu, node_id);
}
numa_nodes.push(NumaNode {
id: node_id,
cpus,
memory_gb: 8, available_memory: 8 * 1024 * 1024 * 1024, });
}
let current_node = Self::get_current_numa_node(&cpu_to_node);
Self {
numa_nodes,
cpu_to_node,
current_node,
}
}
fn get_current_numa_node(_cpu_to_node: &HashMap<usize, usize>) -> usize {
0
}
pub fn get_local_node(&self) -> &NumaNode {
&self.numa_nodes[self.current_node]
}
pub fn get_optimal_node_for_cpu(&self, cpu_id: usize) -> Option<&NumaNode> {
self.cpu_to_node
.get(&cpu_id)
.and_then(|&node_id| self.numa_nodes.get(node_id))
}
}
pub struct NumaAwareAllocator {
topology: NumaTopology,
node_pools: Vec<Arc<NodeMemoryPool>>,
allocation_stats: Arc<RwLock<InternalAllocationStats>>,
global_fallback: System,
}
#[derive(Debug)]
struct NodeMemoryPool {
node_id: usize,
small_pool: Mutex<Vec<*mut u8>>, medium_pool: Mutex<Vec<*mut u8>>, large_pool: Mutex<Vec<*mut u8>>, huge_pool: Mutex<Vec<*mut u8>>,
allocated_bytes: AtomicUsize,
deallocated_bytes: AtomicUsize,
pool_hits: AtomicUsize,
pool_misses: AtomicUsize,
}
impl NodeMemoryPool {
fn new(node_id: usize) -> Self {
Self {
node_id,
small_pool: Mutex::new(Vec::with_capacity(1000)),
medium_pool: Mutex::new(Vec::with_capacity(500)),
large_pool: Mutex::new(Vec::with_capacity(100)),
huge_pool: Mutex::new(Vec::with_capacity(50)),
allocated_bytes: AtomicUsize::new(0),
deallocated_bytes: AtomicUsize::new(0),
pool_hits: AtomicUsize::new(0),
pool_misses: AtomicUsize::new(0),
}
}
fn get_pool_for_size(&self, size: usize) -> &Mutex<Vec<*mut u8>> {
if size <= 1024 {
&self.small_pool
} else if size <= 65536 {
&self.medium_pool
} else if size <= 1048576 {
&self.large_pool
} else {
&self.huge_pool
}
}
fn allocate_from_pool(&self, layout: Layout) -> Option<*mut u8> {
let pool = self.get_pool_for_size(layout.size());
let mut pool_vec = pool.lock();
if let Some(ptr) = pool_vec.pop() {
self.pool_hits.fetch_add(1, Ordering::Relaxed);
self.allocated_bytes
.fetch_add(layout.size(), Ordering::Relaxed);
Some(ptr)
} else {
self.pool_misses.fetch_add(1, Ordering::Relaxed);
None
}
}
fn deallocate_to_pool(&self, ptr: *mut u8, layout: Layout) {
let pool = self.get_pool_for_size(layout.size());
let mut pool_vec = pool.lock();
let max_pool_size = match layout.size() {
s if s <= 1024 => 1000,
s if s <= 65536 => 500,
s if s <= 1048576 => 100,
_ => 50,
};
if pool_vec.len() < max_pool_size {
pool_vec.push(ptr);
self.deallocated_bytes
.fetch_add(layout.size(), Ordering::Relaxed);
} else {
unsafe {
std::alloc::dealloc(ptr, layout);
}
}
}
fn get_stats(&self) -> NodePoolStats {
NodePoolStats {
node_id: self.node_id,
allocated_bytes: self.allocated_bytes.load(Ordering::Relaxed),
deallocated_bytes: self.deallocated_bytes.load(Ordering::Relaxed),
pool_hits: self.pool_hits.load(Ordering::Relaxed),
pool_misses: self.pool_misses.load(Ordering::Relaxed),
pool_sizes: PoolSizes {
small: self.small_pool.lock().len(),
medium: self.medium_pool.lock().len(),
large: self.large_pool.lock().len(),
huge: self.huge_pool.lock().len(),
},
}
}
}
#[derive(Debug, Clone, Default)]
pub struct NodePoolStats {
pub node_id: usize,
pub allocated_bytes: usize,
pub deallocated_bytes: usize,
pub pool_hits: usize,
pub pool_misses: usize,
pub pool_sizes: PoolSizes,
}
#[derive(Debug, Clone, Default)]
pub struct PoolSizes {
pub small: usize,
pub medium: usize,
pub large: usize,
pub huge: usize,
}
impl NodePoolStats {
pub fn hit_rate(&self) -> f64 {
let total = self.pool_hits + self.pool_misses;
if total == 0 {
0.0
} else {
self.pool_hits as f64 / total as f64
}
}
pub fn net_allocated(&self) -> i64 {
self.allocated_bytes as i64 - self.deallocated_bytes as i64
}
}
#[derive(Debug, Default, Clone)]
struct InternalAllocationStats {
total_allocations: usize,
total_deallocations: usize,
numa_local_allocs: usize,
numa_remote_allocs: usize,
fallback_allocs: usize,
bytes_allocated: usize,
bytes_deallocated: usize,
}
impl NumaAwareAllocator {
pub fn new() -> Self {
let topology = NumaTopology::detect();
let mut node_pools = Vec::new();
for node in &topology.numa_nodes {
node_pools.push(Arc::new(NodeMemoryPool::new(node.id)));
}
Self {
topology,
node_pools,
allocation_stats: Arc::new(RwLock::new(InternalAllocationStats::default())),
global_fallback: System,
}
}
pub fn allocate_numa_local(&self, layout: Layout) -> *mut u8 {
let current_node = self.topology.current_node;
if let Some(pool) = self.node_pools.get(current_node) {
if let Some(ptr) = pool.allocate_from_pool(layout) {
self.update_stats(|stats| {
stats.total_allocations += 1;
stats.numa_local_allocs += 1;
stats.bytes_allocated += layout.size();
});
return ptr;
}
}
let ptr = unsafe { self.global_fallback.alloc(layout) };
if !ptr.is_null() {
self.update_stats(|stats| {
stats.total_allocations += 1;
stats.fallback_allocs += 1;
stats.bytes_allocated += layout.size();
});
}
ptr
}
pub fn deallocate_numa_aware(&self, ptr: *mut u8, layout: Layout) {
let current_node = self.topology.current_node;
if let Some(pool) = self.node_pools.get(current_node) {
pool.deallocate_to_pool(ptr, layout);
self.update_stats(|stats| {
stats.total_deallocations += 1;
stats.bytes_deallocated += layout.size();
});
} else {
unsafe { self.global_fallback.dealloc(ptr, layout) };
self.update_stats(|stats| {
stats.total_deallocations += 1;
stats.bytes_deallocated += layout.size();
});
}
}
fn update_stats<F>(&self, f: F)
where
F: FnOnce(&mut InternalAllocationStats),
{
let mut stats = self.allocation_stats.write();
f(&mut stats);
}
pub fn get_topology(&self) -> &NumaTopology {
&self.topology
}
pub fn get_allocation_stats(&self) -> AllocationStats {
let stats = self.allocation_stats.read().clone();
AllocationStats {
total_allocations: stats.total_allocations,
total_deallocations: stats.total_deallocations,
numa_local_allocs: stats.numa_local_allocs,
numa_remote_allocs: stats.numa_remote_allocs,
fallback_allocs: stats.fallback_allocs,
bytes_allocated: stats.bytes_allocated,
bytes_deallocated: stats.bytes_deallocated,
}
}
pub fn get_node_stats(&self) -> Vec<NodePoolStats> {
self.node_pools
.iter()
.map(|pool| pool.get_stats())
.collect()
}
pub fn get_comprehensive_stats(&self) -> NumaStats {
NumaStats {
topology: self.topology.clone(),
allocation_stats: self.get_allocation_stats(),
node_stats: self.get_node_stats(),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct AllocationStats {
pub total_allocations: usize,
pub total_deallocations: usize,
pub numa_local_allocs: usize,
pub numa_remote_allocs: usize,
pub fallback_allocs: usize,
pub bytes_allocated: usize,
pub bytes_deallocated: usize,
}
impl AllocationStats {
pub fn numa_locality_rate(&self) -> f64 {
if self.total_allocations == 0 {
0.0
} else {
self.numa_local_allocs as f64 / self.total_allocations as f64
}
}
pub fn pool_efficiency(&self) -> f64 {
let pool_allocs = self.numa_local_allocs + self.numa_remote_allocs;
if self.total_allocations == 0 {
0.0
} else {
pool_allocs as f64 / self.total_allocations as f64
}
}
pub fn net_allocated_bytes(&self) -> i64 {
self.bytes_allocated as i64 - self.bytes_deallocated as i64
}
}
#[derive(Debug, Clone, Default)]
pub struct NumaStats {
pub topology: NumaTopology,
pub allocation_stats: AllocationStats,
pub node_stats: Vec<NodePoolStats>,
}
impl NumaStats {
pub fn report(&self) -> String {
let mut report = String::new();
report.push_str("NUMA Allocation Statistics:\n");
report.push_str(&format!(
"Topology: {} NUMA nodes, {} CPUs\n",
self.topology.numa_nodes.len(),
self.topology
.numa_nodes
.iter()
.map(|n| n.cpus.len())
.sum::<usize>()
));
report.push_str(&format!(
"NUMA Locality: {:.1}% ({}/{} allocations)\n",
self.allocation_stats.numa_locality_rate() * 100.0,
self.allocation_stats.numa_local_allocs,
self.allocation_stats.total_allocations
));
report.push_str(&format!(
"Pool Efficiency: {:.1}% (hit rate across all nodes)\n",
self.allocation_stats.pool_efficiency() * 100.0
));
report.push_str(&format!(
"Memory Usage: {}MB allocated, {}MB net\n",
self.allocation_stats.bytes_allocated / (1024 * 1024),
self.allocation_stats.net_allocated_bytes() / (1024 * 1024)
));
report.push_str("\nPer-Node Statistics:\n");
for node_stat in &self.node_stats {
report.push_str(&format!(
"Node {}: {:.1}% hit rate, {}MB net, pools: S:{} M:{} L:{} H:{}\n",
node_stat.node_id,
node_stat.hit_rate() * 100.0,
node_stat.net_allocated() / (1024 * 1024),
node_stat.pool_sizes.small,
node_stat.pool_sizes.medium,
node_stat.pool_sizes.large,
node_stat.pool_sizes.huge
));
}
report
}
}
pub struct NumaBytesMutAllocator {
numa_allocator: Arc<NumaAwareAllocator>,
cache_line_size: usize,
}
impl NumaBytesMutAllocator {
pub fn new(numa_allocator: Arc<NumaAwareAllocator>) -> Self {
Self {
numa_allocator,
cache_line_size: 64, }
}
pub fn allocate_aligned(&self, capacity: usize) -> BytesMut {
let aligned_capacity = self.align_to_cache_line(capacity);
let layout = Layout::from_size_align(aligned_capacity, self.cache_line_size)
.expect("Invalid layout");
let ptr = self.numa_allocator.allocate_numa_local(layout);
if ptr.is_null() {
BytesMut::with_capacity(capacity)
} else {
BytesMut::with_capacity(capacity)
}
}
fn align_to_cache_line(&self, size: usize) -> usize {
(size + self.cache_line_size - 1) & !(self.cache_line_size - 1)
}
pub fn get_stats(&self) -> NumaStats {
Default::default()
}
}
thread_local! {
static LOCAL_NUMA_ALLOCATOR: std::cell::RefCell<Option<Arc<NumaAwareAllocator>>> =
std::cell::RefCell::new(None);
}
pub fn init_thread_local_numa_allocator() -> Arc<NumaAwareAllocator> {
let allocator = Arc::new(NumaAwareAllocator::new());
LOCAL_NUMA_ALLOCATOR.with(|local| {
*local.borrow_mut() = Some(allocator.clone());
});
allocator
}
pub fn get_thread_local_numa_allocator() -> Option<Arc<NumaAwareAllocator>> {
LOCAL_NUMA_ALLOCATOR.with(|local| local.borrow().clone())
}
pub fn numa_alloc_fast(size: usize) -> Option<*mut u8> {
get_thread_local_numa_allocator().and_then(|allocator| {
let layout = Layout::from_size_align(size, 8).ok()?;
let ptr = allocator.allocate_numa_local(layout);
if ptr.is_null() {
None
} else {
Some(ptr)
}
})
}
pub fn numa_dealloc_fast(ptr: *mut u8, size: usize) {
if let Some(allocator) = get_thread_local_numa_allocator() {
if let Ok(layout) = Layout::from_size_align(size, 8) {
allocator.deallocate_numa_aware(ptr, layout);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_numa_topology_detection() {
let topology = NumaTopology::detect();
assert!(!topology.numa_nodes.is_empty());
assert!(!topology.cpu_to_node.is_empty());
println!("Detected NUMA topology: {:?}", topology);
}
#[test]
fn test_numa_allocator() {
let allocator = NumaAwareAllocator::new();
let layout = Layout::from_size_align(1024, 8).unwrap();
let ptr = allocator.allocate_numa_local(layout);
assert!(!ptr.is_null());
allocator.deallocate_numa_aware(ptr, layout);
let stats = allocator.get_comprehensive_stats();
println!("NUMA stats: {}", stats.report());
}
#[test]
fn test_thread_local_allocator() {
let allocator = init_thread_local_numa_allocator();
let ptr = numa_alloc_fast(2048).expect("Allocation failed");
numa_dealloc_fast(ptr, 2048);
let stats = allocator.get_comprehensive_stats();
assert!(stats.allocation_stats.total_allocations > 0);
}
}