use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::info;
/// Top-level configuration for NUMA-aware stream processing.
///
/// NOTE(review): several fields are carried here but never read by this
/// module (marked below); confirm they are consumed by callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaConfig {
    /// Master switch for NUMA-aware behavior (not consulted in this module).
    pub enabled: bool,
    /// Node favored by `WorkerDistributionStrategy::Concentrated`; defaults to 0 when `None`.
    pub preferred_node: Option<usize>,
    /// When false, topology detection is skipped and a single synthetic node is used.
    pub auto_detect_topology: bool,
    /// Prefer allocating memory on the local node (not consulted in this module).
    pub local_memory_allocation: bool,
    /// Memory allocation strategy (not interpreted in this module).
    pub allocation_strategy: NumaAllocationStrategy,
    /// CPU pinning policy (not interpreted in this module).
    pub affinity_mode: CpuAffinityMode,
    /// Settings forwarded to `NumaBufferPool`.
    pub buffer_pool_config: NumaBufferPoolConfig,
    /// Enable cross-socket optimizations (not consulted in this module).
    pub cross_socket_optimization: bool,
    /// Memory interleave policy (not interpreted in this module).
    pub interleave_policy: MemoryInterleavePolicy,
    /// Controls how many workers `NumaThreadPool` creates per node.
    pub worker_distribution: WorkerDistributionStrategy,
    /// Bandwidth threshold in MB/s (not consulted in this module).
    pub bandwidth_threshold_mbps: u64,
    /// Allow migrating memory between nodes (not consulted in this module).
    pub enable_memory_migration: bool,
}
impl Default for NumaConfig {
    /// Conservative defaults: NUMA enabled with auto-detected topology,
    /// strict affinity, local-first allocation, no interleaving, balanced
    /// worker distribution, and memory migration disabled.
    fn default() -> Self {
        Self {
            enabled: true,
            preferred_node: None,
            auto_detect_topology: true,
            local_memory_allocation: true,
            allocation_strategy: NumaAllocationStrategy::LocalFirst,
            affinity_mode: CpuAffinityMode::Strict,
            buffer_pool_config: NumaBufferPoolConfig::default(),
            cross_socket_optimization: true,
            interleave_policy: MemoryInterleavePolicy::None,
            worker_distribution: WorkerDistributionStrategy::Balanced,
            // 10 GB/s threshold before cross-socket traffic is considered a concern.
            bandwidth_threshold_mbps: 10000,
            enable_memory_migration: false,
        }
    }
}
/// Strategy for choosing which NUMA node memory is allocated on.
///
/// NOTE(review): only `LocalFirst` (the default) appears in this module;
/// the other variants are not interpreted here — confirm the consumer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NumaAllocationStrategy {
    /// Allocate on the local node first, fall back elsewhere.
    LocalFirst,
    /// Spread allocations across nodes.
    Interleave,
    /// Always favor the given node id.
    Preferred(usize),
    /// Rotate target node per allocation.
    RoundRobin,
    /// Pick by available memory bandwidth.
    BandwidthAware,
    /// Pick by access latency.
    LatencyOptimized,
}
/// CPU pinning policy for workers.
///
/// NOTE(review): not interpreted anywhere in this module — confirm the consumer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CpuAffinityMode {
    /// Hard-pin to specific CPUs.
    Strict,
    /// Prefer specific CPUs but allow migration.
    Soft,
    /// No affinity.
    None,
    /// Pin to any CPU of the local NUMA node.
    NumaLocal,
    /// Pin with cache topology in mind.
    CacheAware,
}
/// Memory interleaving policy across NUMA nodes.
///
/// NOTE(review): not interpreted anywhere in this module — confirm the consumer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MemoryInterleavePolicy {
    /// No interleaving (module default).
    None,
    /// Interleave across all nodes.
    All,
    /// Interleave across the listed node ids only.
    Specific(Vec<usize>),
    /// Interleave at page granularity.
    PageLevel,
    /// Interleave at cache-line granularity.
    CacheLineLevel,
}
/// How `NumaThreadPool` distributes workers across nodes.
///
/// Only `Balanced` and `Concentrated` have distinct behavior in
/// `NumaThreadPool::initialize_workers`; all other variants currently fall
/// through to one-worker-per-CPU (same as `Balanced`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WorkerDistributionStrategy {
    /// One worker per CPU on every node.
    Balanced,
    /// Extra workers on the preferred node, one elsewhere.
    Concentrated,
    /// Placeholder; currently treated like `Balanced`.
    Dynamic,
    /// Placeholder; currently treated like `Balanced`.
    BandwidthAware,
    /// Placeholder; currently treated like `Balanced`.
    LatencyOptimized,
}
/// Configuration for the per-node buffer pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaBufferPoolConfig {
    /// Size of each buffer in bytes.
    pub buffer_size: usize,
    /// Number of buffers created per node by `pre_allocate`.
    pub buffers_per_node: usize,
    /// Allow buffer migration between nodes (not consulted in this module).
    pub enable_migration: bool,
    /// Cap on outstanding buffers (not enforced in this module).
    pub max_in_flight: usize,
    /// When true, `NumaBufferPool::pre_allocate` eagerly fills the free lists.
    pub pre_allocate: bool,
    /// Back buffers with huge pages (not implemented in this module).
    pub use_huge_pages: bool,
    /// Huge page size to request (not consulted in this module).
    pub huge_page_size: HugePageSize,
}
impl Default for NumaBufferPoolConfig {
fn default() -> Self {
Self {
buffer_size: 64 * 1024, buffers_per_node: 1024,
enable_migration: false,
max_in_flight: 4096,
pre_allocate: true,
use_huge_pages: false,
huge_page_size: HugePageSize::Size2MB,
}
}
}
/// Supported huge-page sizes (x86-64 conventions: 2 MiB and 1 GiB pages).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum HugePageSize {
    /// 2 MiB huge pages.
    Size2MB,
    /// 1 GiB huge pages.
    Size1GB,
}
/// Description of a single NUMA node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaNode {
    /// Node id (matches the `nodeN` index under sysfs on Linux).
    pub id: usize,
    /// Logical CPU ids belonging to this node. May be empty for
    /// memory-only nodes (the Linux detection path allows this).
    pub cpus: Vec<usize>,
    /// Total memory on this node in bytes.
    pub total_memory: u64,
    /// Free memory on this node in bytes (as of detection time).
    pub free_memory: u64,
    /// Estimated memory bandwidth in MB/s (hard-coded to 50000 by detection).
    pub memory_bandwidth_mbps: u64,
    /// Relative distance to other nodes, keyed by node id
    /// (10 = local, per the sysfs distance convention).
    pub distances: HashMap<usize, u32>,
    /// Whether the node is online (always true in this module's detection).
    pub online: bool,
}
/// Snapshot of the machine's NUMA layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaTopology {
    /// Number of NUMA nodes (== `nodes.len()`).
    pub num_nodes: usize,
    /// Per-node descriptions, sorted by id by the Linux detection path.
    pub nodes: Vec<NumaNode>,
    /// Total logical CPUs across all nodes.
    pub total_cpus: usize,
    /// Total memory across all nodes in bytes.
    pub total_memory: u64,
    /// `distance_matrix[i][j]` = relative distance from node i to node j.
    pub distance_matrix: Vec<Vec<u32>>,
    /// Reverse lookup from logical CPU id to owning node id.
    pub cpu_to_node: HashMap<usize, usize>,
}
/// A pooled byte buffer attributed to a NUMA node.
///
/// NOTE(review): the attribution is bookkeeping only — `Vec` allocates from
/// the global allocator; nothing here binds the memory to a physical node.
#[derive(Debug)]
pub struct NumaBuffer {
    // Backing storage, zero-filled at creation.
    data: Vec<u8>,
    // Node this buffer is attributed to (used by the pool's free lists).
    node_id: usize,
    // Unique id assigned by the pool's monotonically increasing counter.
    id: u64,
    // Creation timestamp; never read in this module.
    allocated_at: Instant,
    // Set at creation and never updated afterwards (no accessor mutates it).
    last_accessed: Instant,
    // Incremented by data()/data_mut(); relaxed ordering (statistics only).
    access_count: AtomicU64,
    // Exclusive-use flag managed via acquire()/release().
    in_use: AtomicBool,
}
impl NumaBuffer {
pub fn new(size: usize, node_id: usize, id: u64) -> Self {
Self {
data: vec![0u8; size],
node_id,
id,
allocated_at: Instant::now(),
last_accessed: Instant::now(),
access_count: AtomicU64::new(0),
in_use: AtomicBool::new(false),
}
}
pub fn data(&self) -> &[u8] {
self.access_count.fetch_add(1, Ordering::Relaxed);
&self.data
}
pub fn data_mut(&mut self) -> &mut [u8] {
self.access_count.fetch_add(1, Ordering::Relaxed);
&mut self.data
}
pub fn size(&self) -> usize {
self.data.len()
}
pub fn node_id(&self) -> usize {
self.node_id
}
pub fn id(&self) -> u64 {
self.id
}
pub fn acquire(&self) -> bool {
self.in_use
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
}
pub fn release(&self) {
self.in_use.store(false, Ordering::Release);
}
pub fn is_in_use(&self) -> bool {
self.in_use.load(Ordering::Acquire)
}
}
/// Pool of reusable buffers with per-node free lists.
pub struct NumaBufferPool {
    // Free lists keyed by node id; buffers currently handed out are not here.
    buffers: Arc<RwLock<HashMap<usize, VecDeque<NumaBuffer>>>>,
    // Sizing/behavior knobs (buffer size, per-node count, pre-allocation).
    config: NumaBufferPoolConfig,
    // Monotonic source of buffer ids.
    next_id: AtomicU64,
    // Aggregate and per-node statistics.
    stats: Arc<RwLock<NumaBufferPoolStats>>,
    // Topology used to enumerate nodes for pre-allocation and fallback search.
    topology: Arc<NumaTopology>,
}
/// Aggregate buffer-pool statistics.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaBufferPoolStats {
    /// Buffers ever created (pre-allocation + misses).
    pub total_allocations: u64,
    /// Allocations satisfied on the requested node.
    pub local_allocations: u64,
    /// Acquisitions served from another node's free list.
    pub remote_allocations: u64,
    /// Acquisitions served from a free list.
    pub buffer_hits: u64,
    /// Acquisitions that required a fresh allocation.
    pub buffer_misses: u64,
    /// Buffers currently sitting in free lists.
    pub current_buffers: u64,
    /// Buffers currently handed out to callers.
    pub buffers_in_use: u64,
    /// Total bytes ever allocated by the pool.
    pub total_memory_bytes: u64,
    /// Per-node breakdown keyed by node id.
    pub per_node_stats: HashMap<usize, NodeBufferStats>,
}
/// Buffer statistics for one NUMA node.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeBufferStats {
    /// Buffers created and attributed to this node.
    pub allocations: u64,
    /// Buffers currently in this node's free list.
    pub current_buffers: u64,
    /// Bytes allocated and attributed to this node.
    pub memory_bytes: u64,
    /// Average access latency in ns (never updated in this module).
    pub avg_access_latency_ns: f64,
}
impl NumaBufferPool {
    /// Creates an empty pool bound to `topology`. Buffers are created lazily
    /// on acquire misses, or eagerly via [`pre_allocate`](Self::pre_allocate).
    pub fn new(config: NumaBufferPoolConfig, topology: Arc<NumaTopology>) -> Self {
        Self {
            buffers: Arc::new(RwLock::new(HashMap::new())),
            config,
            next_id: AtomicU64::new(0),
            stats: Arc::new(RwLock::new(NumaBufferPoolStats::default())),
            topology,
        }
    }

    /// Eagerly fills every node's free list with `buffers_per_node` buffers.
    /// No-op when `config.pre_allocate` is false.
    pub async fn pre_allocate(&self) -> Result<()> {
        if !self.config.pre_allocate {
            return Ok(());
        }
        let mut buffers = self.buffers.write().await;
        let mut stats = self.stats.write().await;
        for node in &self.topology.nodes {
            let node_buffers = buffers.entry(node.id).or_insert_with(VecDeque::new);
            for _ in 0..self.config.buffers_per_node {
                let id = self.next_id.fetch_add(1, Ordering::SeqCst);
                let buffer = NumaBuffer::new(self.config.buffer_size, node.id, id);
                node_buffers.push_back(buffer);
                stats.total_allocations += 1;
                stats.local_allocations += 1;
                stats.current_buffers += 1;
                stats.total_memory_bytes += self.config.buffer_size as u64;
                let node_stats = stats.per_node_stats.entry(node.id).or_default();
                node_stats.allocations += 1;
                node_stats.current_buffers += 1;
                node_stats.memory_bytes += self.config.buffer_size as u64;
            }
        }
        info!(
            "Pre-allocated {} buffers across {} nodes",
            stats.current_buffers, self.topology.num_nodes
        );
        Ok(())
    }

    /// Hands out a buffer, preferring `preferred_node`'s free list, then any
    /// other node's (counted as a remote allocation), and finally allocating
    /// a fresh buffer on a miss. Infallible today; `Result` is kept for API
    /// stability.
    pub async fn acquire(&self, preferred_node: usize) -> Result<NumaBuffer> {
        let mut buffers = self.buffers.write().await;
        let mut stats = self.stats.write().await;
        if let Some(node_buffers) = buffers.get_mut(&preferred_node) {
            if let Some(buffer) = node_buffers.pop_front() {
                stats.buffer_hits += 1;
                stats.buffers_in_use += 1;
                // Bug fix: the buffer just left the free list, so the pooled
                // count must drop; release() re-increments it. Previously only
                // the per-node counter was decremented, so `current_buffers`
                // drifted upward by one on every acquire/release cycle.
                stats.current_buffers = stats.current_buffers.saturating_sub(1);
                let node_stats = stats.per_node_stats.entry(preferred_node).or_default();
                node_stats.current_buffers = node_stats.current_buffers.saturating_sub(1);
                return Ok(buffer);
            }
        }
        // Preferred node exhausted: steal from any other node's free list.
        for node in &self.topology.nodes {
            if node.id == preferred_node {
                continue;
            }
            if let Some(node_buffers) = buffers.get_mut(&node.id) {
                if let Some(buffer) = node_buffers.pop_front() {
                    stats.buffer_hits += 1;
                    stats.buffers_in_use += 1;
                    stats.remote_allocations += 1;
                    // Same accounting fix as the preferred-node path above.
                    stats.current_buffers = stats.current_buffers.saturating_sub(1);
                    let node_stats = stats.per_node_stats.entry(node.id).or_default();
                    node_stats.current_buffers = node_stats.current_buffers.saturating_sub(1);
                    return Ok(buffer);
                }
            }
        }
        // Miss: allocate a fresh buffer attributed to the preferred node.
        // `current_buffers` is deliberately NOT incremented here — the buffer
        // is in use, not pooled; release() will pool it and bump the counter.
        stats.buffer_misses += 1;
        stats.total_allocations += 1;
        stats.buffers_in_use += 1;
        stats.total_memory_bytes += self.config.buffer_size as u64;
        let node_stats = stats.per_node_stats.entry(preferred_node).or_default();
        node_stats.allocations += 1;
        node_stats.memory_bytes += self.config.buffer_size as u64;
        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
        Ok(NumaBuffer::new(self.config.buffer_size, preferred_node, id))
    }

    /// Returns a buffer to its home node's free list and updates counters.
    pub async fn release(&self, buffer: NumaBuffer) {
        let mut buffers = self.buffers.write().await;
        let mut stats = self.stats.write().await;
        stats.buffers_in_use = stats.buffers_in_use.saturating_sub(1);
        let node_buffers = buffers.entry(buffer.node_id).or_insert_with(VecDeque::new);
        let node_stats = stats.per_node_stats.entry(buffer.node_id).or_default();
        node_stats.current_buffers += 1;
        stats.current_buffers += 1;
        node_buffers.push_back(buffer);
    }

    /// Snapshot of the pool's statistics.
    pub async fn get_stats(&self) -> NumaBufferPoolStats {
        self.stats.read().await.clone()
    }
}
/// Descriptor for a worker bound to a NUMA node and a CPU set.
///
/// NOTE(review): nothing in this module ever sets `running` to true or
/// spawns an actual task for a worker — this is bookkeeping only.
pub struct NumaWorker {
    // Pool-wide worker id.
    id: usize,
    // Node the worker is assigned to.
    node_id: usize,
    // CPUs the worker should be pinned to (affinity is not applied here).
    cpu_affinity: Vec<usize>,
    // Run flag; never flipped to true in this module.
    running: Arc<AtomicBool>,
    // Task latency and locality statistics.
    stats: Arc<RwLock<NumaWorkerStats>>,
}
/// Per-worker statistics updated by `NumaWorker::record_task`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaWorkerStats {
    /// Tasks recorded so far.
    pub tasks_processed: u64,
    /// Running (incremental) mean task latency in microseconds.
    pub avg_task_latency_us: f64,
    /// Largest task latency seen, in microseconds.
    pub max_task_latency_us: u64,
    /// Tasks that touched a remote node.
    pub cross_node_accesses: u64,
    /// Tasks that stayed node-local.
    pub local_accesses: u64,
    /// CPU time in microseconds (never updated in this module).
    pub cpu_time_us: u64,
}
impl NumaWorker {
pub fn new(id: usize, node_id: usize, cpu_affinity: Vec<usize>) -> Self {
Self {
id,
node_id,
cpu_affinity,
running: Arc::new(AtomicBool::new(false)),
stats: Arc::new(RwLock::new(NumaWorkerStats::default())),
}
}
pub fn id(&self) -> usize {
self.id
}
pub fn node_id(&self) -> usize {
self.node_id
}
pub fn cpu_affinity(&self) -> &[usize] {
&self.cpu_affinity
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Acquire)
}
pub async fn get_stats(&self) -> NumaWorkerStats {
self.stats.read().await.clone()
}
pub async fn record_task(&self, latency_us: u64, is_local: bool) {
let mut stats = self.stats.write().await;
stats.tasks_processed += 1;
stats.avg_task_latency_us =
(stats.avg_task_latency_us * (stats.tasks_processed - 1) as f64 + latency_us as f64)
/ stats.tasks_processed as f64;
stats.max_task_latency_us = stats.max_task_latency_us.max(latency_us);
if is_local {
stats.local_accesses += 1;
} else {
stats.cross_node_accesses += 1;
}
}
}
/// NUMA-aware worker pool: worker descriptors grouped by node id.
///
/// NOTE(review): start()/stop() only flip a flag; no OS threads or tasks are
/// actually spawned in this module.
pub struct NumaThreadPool {
    // Worker descriptors keyed by node id.
    workers: Arc<RwLock<HashMap<usize, Vec<NumaWorker>>>>,
    // Distribution strategy and preferred node come from here.
    config: NumaConfig,
    // Topology used to size the worker lists.
    topology: Arc<NumaTopology>,
    // Run flag toggled by start()/stop().
    running: Arc<AtomicBool>,
    // Aggregate counts populated during initialization.
    stats: Arc<RwLock<NumaThreadPoolStats>>,
    // Counter backing get_next_worker()'s round-robin selection.
    round_robin_index: AtomicUsize,
}
/// Thread-pool statistics; counts are filled during initialization, the
/// task/queue fields are never updated in this module.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaThreadPoolStats {
    /// Total workers created.
    pub total_workers: usize,
    /// Worker count per node id.
    pub workers_per_node: HashMap<usize, usize>,
    /// Tasks submitted (never updated here).
    pub tasks_submitted: u64,
    /// Tasks completed (never updated here).
    pub tasks_completed: u64,
    /// Average queue depth (never updated here).
    pub avg_queue_depth: f64,
    /// Load imbalance ratio (never updated here).
    pub load_imbalance_ratio: f64,
}
impl NumaThreadPool {
    /// Creates the pool and immediately builds worker descriptors for every
    /// node according to `config.worker_distribution`.
    pub async fn new(config: NumaConfig, topology: Arc<NumaTopology>) -> Result<Self> {
        let pool = Self {
            workers: Arc::new(RwLock::new(HashMap::new())),
            config,
            topology,
            running: Arc::new(AtomicBool::new(false)),
            stats: Arc::new(RwLock::new(NumaThreadPoolStats::default())),
            round_robin_index: AtomicUsize::new(0),
        };
        pool.initialize_workers().await?;
        Ok(pool)
    }

    /// Populates the per-node worker lists.
    ///
    /// Worker counts per node:
    /// * `Balanced` — one worker per CPU on every node.
    /// * `Concentrated` — 2× CPUs on the preferred node, one worker elsewhere.
    /// * every other strategy — one worker per CPU (same as `Balanced`).
    async fn initialize_workers(&self) -> Result<()> {
        let mut workers = self.workers.write().await;
        let mut stats = self.stats.write().await;
        let workers_per_node = match &self.config.worker_distribution {
            WorkerDistributionStrategy::Balanced => self
                .topology
                .nodes
                .iter()
                .map(|n| (n.id, n.cpus.len()))
                .collect::<HashMap<_, _>>(),
            WorkerDistributionStrategy::Concentrated => {
                let preferred = self.config.preferred_node.unwrap_or(0);
                self.topology
                    .nodes
                    .iter()
                    .map(|n| {
                        if n.id == preferred {
                            (n.id, n.cpus.len() * 2)
                        } else {
                            (n.id, 1)
                        }
                    })
                    .collect()
            }
            _ => self
                .topology
                .nodes
                .iter()
                .map(|n| (n.id, n.cpus.len()))
                .collect(),
        };
        let mut worker_id = 0;
        for node in &self.topology.nodes {
            let count = workers_per_node.get(&node.id).copied().unwrap_or(1);
            let node_workers = workers.entry(node.id).or_insert_with(Vec::new);
            for i in 0..count {
                // Bug fix: `i % node.cpus.len()` panics (remainder by zero)
                // for a node with no CPUs — possible on Linux where a
                // memory-only node yields an empty `cpulist`, and the
                // Concentrated strategy still assigns it one worker. Fall
                // back to CPU 0 in that case.
                let cpu = if node.cpus.is_empty() {
                    0
                } else {
                    node.cpus[i % node.cpus.len()]
                };
                let worker = NumaWorker::new(worker_id, node.id, vec![cpu]);
                node_workers.push(worker);
                worker_id += 1;
            }
            stats.workers_per_node.insert(node.id, node_workers.len());
        }
        stats.total_workers = worker_id;
        info!(
            "Initialized NUMA thread pool with {} workers across {} nodes",
            stats.total_workers, self.topology.num_nodes
        );
        Ok(())
    }

    /// Snapshot of pool statistics.
    pub async fn get_stats(&self) -> NumaThreadPoolStats {
        self.stats.read().await.clone()
    }

    /// Sets the run flag. No threads are spawned here.
    pub async fn start(&self) -> Result<()> {
        self.running.store(true, Ordering::Release);
        info!("NUMA thread pool started");
        Ok(())
    }

    /// Clears the run flag.
    pub async fn stop(&self) -> Result<()> {
        self.running.store(false, Ordering::Release);
        info!("NUMA thread pool stopped");
        Ok(())
    }

    /// Whether the run flag is set.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }

    /// Returns a round-robin index in `0..total_workers`, or `None` if there
    /// are no workers.
    ///
    /// NOTE(review): this is a flat index, not a (node, worker) pair; mapping
    /// it back to a concrete worker is left to the caller — confirm intended
    /// use.
    pub async fn get_next_worker(&self) -> Option<usize> {
        let workers = self.workers.read().await;
        let total_workers: usize = workers.values().map(|v| v.len()).sum();
        if total_workers == 0 {
            return None;
        }
        let index = self.round_robin_index.fetch_add(1, Ordering::SeqCst) % total_workers;
        Some(index)
    }
}
/// NUMA-aware stream processor tying together topology detection, the
/// buffer pool, and the thread pool.
pub struct NumaStreamProcessor {
    // Full configuration used at construction time.
    config: NumaConfig,
    // Detected (or synthetic) machine topology.
    topology: Arc<NumaTopology>,
    // Per-node buffer pool used by process_event().
    buffer_pool: Arc<NumaBufferPool>,
    // Worker pool (bookkeeping only; see NumaThreadPool).
    thread_pool: Arc<NumaThreadPool>,
    // Run flag toggled by start()/stop().
    running: Arc<AtomicBool>,
    // Processing statistics updated by process_event().
    stats: Arc<RwLock<NumaProcessorStats>>,
}
/// Aggregate processing statistics.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaProcessorStats {
    /// Events handled by `process_event`.
    pub events_processed: u64,
    /// Incremental mean of per-event latency in microseconds.
    pub avg_processing_latency_us: f64,
    /// Largest per-event latency seen, in microseconds.
    pub max_processing_latency_us: u64,
    /// Bandwidth utilization ratio (never updated in this module).
    pub memory_bandwidth_utilization: f64,
    /// Events whose buffer came from a non-preferred node.
    pub cross_node_transfers: u64,
    /// Events whose buffer came from the preferred node.
    pub local_node_hits: u64,
    /// Cache miss rate (never updated in this module).
    pub cache_miss_rate: f64,
    /// Per-node breakdown keyed by preferred node id.
    pub per_node_stats: HashMap<usize, NodeProcessorStats>,
}
/// Processing statistics for one NUMA node.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeProcessorStats {
    /// Events processed with this node preferred.
    pub events_processed: u64,
    /// Incremental mean latency in microseconds.
    pub avg_latency_us: f64,
    /// Memory usage in bytes (never updated in this module).
    pub memory_usage_bytes: u64,
    /// CPU utilization ratio (never updated in this module).
    pub cpu_utilization: f64,
}
impl NumaStreamProcessor {
    /// Builds a processor: detects (or synthesizes) the topology, creates
    /// and optionally pre-allocates the buffer pool, and sets up the
    /// per-node thread pool.
    pub async fn new(config: NumaConfig) -> Result<Self> {
        let topology = Arc::new(Self::detect_topology(&config).await?);
        let buffer_pool = Arc::new(NumaBufferPool::new(
            config.buffer_pool_config.clone(),
            topology.clone(),
        ));
        buffer_pool.pre_allocate().await?;
        let thread_pool = Arc::new(NumaThreadPool::new(config.clone(), topology.clone()).await?);
        Ok(Self {
            config,
            topology,
            buffer_pool,
            thread_pool,
            running: Arc::new(AtomicBool::new(false)),
            stats: Arc::new(RwLock::new(NumaProcessorStats::default())),
        })
    }

    /// Resolves the topology to use. With auto-detection disabled a single
    /// synthetic node covering all CPUs is returned; memory figures are
    /// hard-coded placeholders (8 GiB total / 4 GiB free, 50 GB/s).
    async fn detect_topology(config: &NumaConfig) -> Result<NumaTopology> {
        if !config.auto_detect_topology {
            return Ok(NumaTopology {
                num_nodes: 1,
                nodes: vec![NumaNode {
                    id: 0,
                    cpus: (0..num_cpus::get()).collect(),
                    total_memory: 8 * 1024 * 1024 * 1024,
                    free_memory: 4 * 1024 * 1024 * 1024,
                    memory_bandwidth_mbps: 50000,
                    // Distance 10 = "local" per the sysfs convention.
                    distances: HashMap::from([(0, 10)]),
                    online: true,
                }],
                total_cpus: num_cpus::get(),
                total_memory: 8 * 1024 * 1024 * 1024,
                distance_matrix: vec![vec![10]],
                cpu_to_node: (0..num_cpus::get()).map(|cpu| (cpu, 0)).collect(),
            });
        }
        #[cfg(target_os = "linux")]
        {
            Self::detect_linux_numa_topology().await
        }
        #[cfg(not(target_os = "linux"))]
        {
            Self::detect_fallback_topology().await
        }
    }

    /// Reads the NUMA topology from sysfs (`/sys/devices/system/node`).
    /// Falls back to a synthetic single-node topology when sysfs is absent
    /// or no `nodeN` entries are found.
    #[cfg(target_os = "linux")]
    async fn detect_linux_numa_topology() -> Result<NumaTopology> {
        use std::fs;
        use std::path::Path;
        let numa_path = Path::new("/sys/devices/system/node");
        if !numa_path.exists() {
            return Self::detect_fallback_topology().await;
        }
        let mut nodes = Vec::new();
        let mut cpu_to_node = HashMap::new();
        for entry in fs::read_dir(numa_path)? {
            let entry = entry?;
            let name = entry.file_name().to_string_lossy().to_string();
            // Only `nodeN` directories describe nodes; skip siblings like
            // `online` / `possible`.
            if !name.starts_with("node") {
                continue;
            }
            // Strip the "node" prefix; non-numeric suffixes collapse to id 0.
            let node_id: usize = name[4..].parse().unwrap_or(0);
            let node_path = entry.path();
            let cpulist_path = node_path.join("cpulist");
            // `cpulist` may be missing; a node can then have no CPUs
            // (e.g. memory-only nodes).
            let cpus = if cpulist_path.exists() {
                let content = fs::read_to_string(cpulist_path)?;
                Self::parse_cpu_list(&content)
            } else {
                vec![]
            };
            for &cpu in &cpus {
                cpu_to_node.insert(cpu, node_id);
            }
            let meminfo_path = node_path.join("meminfo");
            let (total_memory, free_memory) = if meminfo_path.exists() {
                let content = fs::read_to_string(meminfo_path)?;
                Self::parse_meminfo(&content)
            } else {
                // Placeholder: 8 GiB total / 4 GiB free when meminfo is absent.
                (8 * 1024 * 1024 * 1024, 4 * 1024 * 1024 * 1024)
            };
            nodes.push(NumaNode {
                id: node_id,
                cpus,
                total_memory,
                free_memory,
                // Bandwidth is not read from sysfs; fixed placeholder.
                memory_bandwidth_mbps: 50000,
                // Filled in from the distance matrix below.
                distances: HashMap::new(),
                online: true,
            });
        }
        if nodes.is_empty() {
            return Self::detect_fallback_topology().await;
        }
        // Sort so that `nodes[i]` lines up with row i of the distance matrix.
        nodes.sort_by_key(|n| n.id);
        let distance_path = numa_path.join("node0/distance");
        let distance_matrix = if distance_path.exists() {
            Self::read_distance_matrix(&nodes).await?
        } else {
            // All-local fallback: every pairwise distance set to 10.
            vec![vec![10; nodes.len()]; nodes.len()]
        };
        for (i, node) in nodes.iter_mut().enumerate() {
            for (j, &dist) in distance_matrix[i].iter().enumerate() {
                node.distances.insert(j, dist);
            }
        }
        let total_cpus = nodes.iter().map(|n| n.cpus.len()).sum();
        let total_memory = nodes.iter().map(|n| n.total_memory).sum();
        let num_nodes = nodes.len();
        info!(
            "Detected NUMA topology: {} nodes, {} CPUs, {} MB total memory",
            num_nodes,
            total_cpus,
            total_memory / (1024 * 1024)
        );
        Ok(NumaTopology {
            num_nodes,
            nodes,
            total_cpus,
            total_memory,
            distance_matrix,
            cpu_to_node,
        })
    }

    /// Parses a sysfs CPU list such as `0-3,8,10-11` into individual CPU ids.
    /// Malformed fragments are silently skipped.
    #[cfg(target_os = "linux")]
    fn parse_cpu_list(content: &str) -> Vec<usize> {
        let mut cpus = Vec::new();
        for part in content.trim().split(',') {
            if part.contains('-') {
                // Inclusive range, e.g. "0-3".
                let range: Vec<&str> = part.split('-').collect();
                if range.len() == 2 {
                    if let (Ok(start), Ok(end)) =
                        (range[0].parse::<usize>(), range[1].parse::<usize>())
                    {
                        cpus.extend(start..=end);
                    }
                }
            } else if let Ok(cpu) = part.parse::<usize>() {
                cpus.push(cpu);
            }
        }
        cpus
    }

    /// Extracts (total, free) memory in bytes from a per-node meminfo dump.
    ///
    /// Assumes the sysfs per-node format `Node <id> MemTotal: <kB> kB`, so
    /// the value is whitespace token index 3 and is given in kB — TODO
    /// confirm against the target kernel's layout.
    #[cfg(target_os = "linux")]
    fn parse_meminfo(content: &str) -> (u64, u64) {
        let mut total = 0u64;
        let mut free = 0u64;
        for line in content.lines() {
            if line.contains("MemTotal:") {
                if let Some(val) = line.split_whitespace().nth(3) {
                    // kB -> bytes.
                    total = val.parse().unwrap_or(0) * 1024;
                }
            } else if line.contains("MemFree:") {
                if let Some(val) = line.split_whitespace().nth(3) {
                    // kB -> bytes.
                    free = val.parse().unwrap_or(0) * 1024;
                }
            }
        }
        (total, free)
    }

    /// Reads each node's `distance` file into a square matrix; rows that
    /// cannot be read keep the default distance of 10.
    #[cfg(target_os = "linux")]
    async fn read_distance_matrix(nodes: &[NumaNode]) -> Result<Vec<Vec<u32>>> {
        use std::fs;
        let mut matrix = vec![vec![10u32; nodes.len()]; nodes.len()];
        for (i, node) in nodes.iter().enumerate() {
            let path = format!("/sys/devices/system/node/node{}/distance", node.id);
            if let Ok(content) = fs::read_to_string(&path) {
                let distances: Vec<u32> = content
                    .split_whitespace()
                    .filter_map(|s| s.parse().ok())
                    .collect();
                for (j, &dist) in distances.iter().enumerate() {
                    // Guard against files listing more nodes than we found.
                    if j < nodes.len() {
                        matrix[i][j] = dist;
                    }
                }
            }
        }
        Ok(matrix)
    }

    /// Synthetic single-node topology used on non-Linux targets or when
    /// sysfs detection fails; memory figures are hard-coded placeholders.
    async fn detect_fallback_topology() -> Result<NumaTopology> {
        let num_cpus = num_cpus::get();
        Ok(NumaTopology {
            num_nodes: 1,
            nodes: vec![NumaNode {
                id: 0,
                cpus: (0..num_cpus).collect(),
                total_memory: 8 * 1024 * 1024 * 1024,
                free_memory: 4 * 1024 * 1024 * 1024,
                memory_bandwidth_mbps: 50000,
                distances: HashMap::from([(0, 10)]),
                online: true,
            }],
            total_cpus: num_cpus,
            total_memory: 8 * 1024 * 1024 * 1024,
            distance_matrix: vec![vec![10]],
            cpu_to_node: (0..num_cpus).map(|cpu| (cpu, 0)).collect(),
        })
    }

    /// Sets the run flag and starts the thread pool (flag-only; see
    /// `NumaThreadPool::start`).
    pub async fn start(&self) -> Result<()> {
        self.running.store(true, Ordering::Release);
        self.thread_pool.start().await?;
        info!("NUMA stream processor started");
        Ok(())
    }

    /// Clears the run flag and stops the thread pool.
    pub async fn stop(&self) -> Result<()> {
        self.running.store(false, Ordering::Release);
        self.thread_pool.stop().await?;
        info!("NUMA stream processor stopped");
        Ok(())
    }

    /// Copies `data` through a pooled buffer and returns the copy, updating
    /// latency and locality statistics.
    ///
    /// NOTE(review): input longer than the pool's `buffer_size` is silently
    /// truncated to the buffer length — confirm this is acceptable to callers.
    pub async fn process_event(
        &self,
        data: &[u8],
        preferred_node: Option<usize>,
    ) -> Result<Vec<u8>> {
        let start_time = Instant::now();
        let node_id = preferred_node.unwrap_or(0);
        let mut buffer = self.buffer_pool.acquire(node_id).await?;
        // Clamp to buffer capacity; longer input is truncated here.
        let len = data.len().min(buffer.size());
        buffer.data_mut()[..len].copy_from_slice(&data[..len]);
        let result = buffer.data()[..len].to_vec();
        let latency_us = start_time.elapsed().as_micros() as u64;
        // "Local" = the pool satisfied the request on the preferred node.
        let is_local = buffer.node_id() == node_id;
        let mut stats = self.stats.write().await;
        stats.events_processed += 1;
        // Incremental mean: (old_mean * n + x) / (n + 1).
        stats.avg_processing_latency_us = (stats.avg_processing_latency_us
            * (stats.events_processed - 1) as f64
            + latency_us as f64)
            / stats.events_processed as f64;
        stats.max_processing_latency_us = stats.max_processing_latency_us.max(latency_us);
        if is_local {
            stats.local_node_hits += 1;
        } else {
            stats.cross_node_transfers += 1;
        }
        let node_stats = stats.per_node_stats.entry(node_id).or_default();
        node_stats.events_processed += 1;
        node_stats.avg_latency_us = (node_stats.avg_latency_us
            * (node_stats.events_processed - 1) as f64
            + latency_us as f64)
            / node_stats.events_processed as f64;
        self.buffer_pool.release(buffer).await;
        Ok(result)
    }

    /// Processes events sequentially via `process_event`; stops at the first
    /// error.
    pub async fn process_batch(
        &self,
        events: Vec<Vec<u8>>,
        preferred_node: Option<usize>,
    ) -> Result<Vec<Vec<u8>>> {
        let mut results = Vec::with_capacity(events.len());
        for event in events {
            let result = self.process_event(&event, preferred_node).await?;
            results.push(result);
        }
        Ok(results)
    }

    /// Snapshot of processing statistics.
    pub async fn get_stats(&self) -> NumaProcessorStats {
        self.stats.read().await.clone()
    }

    /// Snapshot of buffer-pool statistics.
    pub async fn get_buffer_pool_stats(&self) -> NumaBufferPoolStats {
        self.buffer_pool.get_stats().await
    }

    /// Snapshot of thread-pool statistics.
    pub async fn get_thread_pool_stats(&self) -> NumaThreadPoolStats {
        self.thread_pool.get_stats().await
    }

    /// The detected (or synthetic) topology.
    pub fn get_topology(&self) -> &NumaTopology {
        &self.topology
    }

    /// The configuration this processor was built with.
    pub fn get_config(&self) -> &NumaConfig {
        &self.config
    }

    /// Node owning the given logical CPU, if known.
    pub fn get_node_for_cpu(&self, cpu: usize) -> Option<usize> {
        self.topology.cpu_to_node.get(&cpu).copied()
    }

    /// Distance between two nodes; out-of-range ids yield the local
    /// distance (10) rather than panicking.
    pub fn get_node_distance(&self, from: usize, to: usize) -> u32 {
        if from < self.topology.distance_matrix.len()
            && to < self.topology.distance_matrix[from].len()
        {
            self.topology.distance_matrix[from][to]
        } else {
            10
        }
    }

    /// Picks a node by score (distance reduced by pooled-buffer credit).
    ///
    /// NOTE(review): the early `break` means that whenever `from` itself is
    /// present in the topology it is always returned, regardless of buffer
    /// availability on it — confirm that is the intended semantics.
    pub async fn find_closest_available_node(&self, from: usize) -> usize {
        let stats = self.buffer_pool.get_stats().await;
        let mut best_node = from;
        let mut best_score = u32::MAX;
        for node in &self.topology.nodes {
            if node.id == from {
                best_node = node.id;
                break;
            }
            let distance = self.get_node_distance(from, node.id);
            let buffer_count = stats
                .per_node_stats
                .get(&node.id)
                .map(|s| s.current_buffers)
                .unwrap_or(0);
            // Every 100 pooled buffers shaves one distance unit off the score.
            let score = distance.saturating_sub(buffer_count as u32 / 100);
            if score < best_score {
                best_score = score;
                best_node = node.id;
            }
        }
        best_node
    }
}
// Per-node sample history: node id -> queue of (timestamp, bytes transferred).
type BandwidthSamples = Arc<RwLock<HashMap<usize, VecDeque<(Instant, u64)>>>>;
/// Sliding-window tracker of per-node memory transfer rates.
pub struct MemoryBandwidthMonitor {
    // Sample queues keyed by node id.
    samples: BandwidthSamples,
    // Samples older than this are pruned on insert.
    window_size: Duration,
    // Hard cap on samples retained per node (set to 1000 in new()).
    max_samples: usize,
}
impl MemoryBandwidthMonitor {
    /// Creates a monitor keeping up to 1000 samples per node inside
    /// `window_size`.
    pub fn new(window_size: Duration) -> Self {
        Self {
            samples: Arc::new(RwLock::new(HashMap::new())),
            window_size,
            max_samples: 1000,
        }
    }

    /// Records one transfer sample for `node_id` and prunes entries that are
    /// over the count cap or older than the window.
    pub async fn record_sample(&self, node_id: usize, bytes_transferred: u64) {
        let mut samples = self.samples.write().await;
        let node_samples = samples.entry(node_id).or_insert_with(VecDeque::new);
        let now = Instant::now();
        node_samples.push_back((now, bytes_transferred));
        while node_samples.len() > self.max_samples {
            node_samples.pop_front();
        }
        // Bug fix: `now - self.window_size` panics when the subtraction
        // underflows the Instant's epoch (documented panic of
        // `Instant - Duration`). Use checked_sub and skip age-based pruning
        // when the full window is not representable yet.
        if let Some(cutoff) = now.checked_sub(self.window_size) {
            while let Some((time, _)) = node_samples.front() {
                if *time < cutoff {
                    node_samples.pop_front();
                } else {
                    break;
                }
            }
        }
    }

    /// Approximate bandwidth for `node_id` in MB/s over the retained
    /// samples; 0.0 with fewer than two samples or a zero time span.
    ///
    /// NOTE(review): the numerator includes the first sample's bytes while
    /// the denominator starts at the first sample's timestamp, which biases
    /// the estimate upward for sparse histories — confirm acceptable.
    pub async fn get_bandwidth(&self, node_id: usize) -> f64 {
        let samples = self.samples.read().await;
        if let Some(node_samples) = samples.get(&node_id) {
            if node_samples.len() < 2 {
                return 0.0;
            }
            let first = node_samples
                .front()
                .expect("node_samples validated to have at least 2 elements");
            let last = node_samples
                .back()
                .expect("node_samples validated to have at least 2 elements");
            let total_bytes: u64 = node_samples.iter().map(|(_, b)| b).sum();
            let duration = last.0.duration_since(first.0);
            if duration.as_secs_f64() > 0.0 {
                (total_bytes as f64 / duration.as_secs_f64()) / (1024.0 * 1024.0)
            } else {
                0.0
            }
        } else {
            0.0
        }
    }

    /// Bandwidth per node for every node that has samples. The read lock is
    /// dropped before re-acquiring it inside `get_bandwidth` to avoid
    /// holding it across the per-node calls.
    pub async fn get_all_bandwidth(&self) -> HashMap<usize, f64> {
        let samples = self.samples.read().await;
        let node_ids: Vec<usize> = samples.keys().copied().collect();
        drop(samples);
        let mut result = HashMap::new();
        for node_id in node_ids {
            let bandwidth = self.get_bandwidth(node_id).await;
            result.insert(node_id, bandwidth);
        }
        result
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Default config should enable NUMA, auto-detection, and local allocation.
    #[tokio::test]
    async fn test_numa_config_default() {
        let config = NumaConfig::default();
        assert!(config.enabled);
        assert!(config.auto_detect_topology);
        assert!(config.local_memory_allocation);
    }

    // Buffer accessors and the acquire/release in-use protocol.
    #[tokio::test]
    async fn test_numa_buffer() {
        let buffer = NumaBuffer::new(1024, 0, 1);
        assert_eq!(buffer.size(), 1024);
        assert_eq!(buffer.node_id(), 0);
        assert_eq!(buffer.id(), 1);
        assert!(!buffer.is_in_use());
        assert!(buffer.acquire());
        assert!(buffer.is_in_use());
        // Second acquire on a busy buffer must fail.
        assert!(!buffer.acquire());
        buffer.release();
        assert!(!buffer.is_in_use());
    }

    // With auto-detection off the processor gets the synthetic topology.
    #[tokio::test]
    async fn test_numa_topology_detection() {
        let config = NumaConfig {
            auto_detect_topology: false,
            ..Default::default()
        };
        let processor = NumaStreamProcessor::new(config).await.unwrap();
        let topology = processor.get_topology();
        assert!(topology.num_nodes >= 1);
        assert!(topology.total_cpus >= 1);
        assert!(!topology.nodes.is_empty());
    }

    // Pre-allocation fills the single node's free list; acquire stays local.
    #[tokio::test]
    async fn test_numa_buffer_pool() {
        let topology = Arc::new(NumaTopology {
            num_nodes: 1,
            nodes: vec![NumaNode {
                id: 0,
                cpus: vec![0, 1, 2, 3],
                total_memory: 8 * 1024 * 1024 * 1024,
                free_memory: 4 * 1024 * 1024 * 1024,
                memory_bandwidth_mbps: 50000,
                distances: HashMap::from([(0, 10)]),
                online: true,
            }],
            total_cpus: 4,
            total_memory: 8 * 1024 * 1024 * 1024,
            distance_matrix: vec![vec![10]],
            cpu_to_node: (0..4).map(|cpu| (cpu, 0)).collect(),
        });
        let config = NumaBufferPoolConfig {
            buffer_size: 1024,
            buffers_per_node: 10,
            pre_allocate: true,
            ..Default::default()
        };
        let pool = NumaBufferPool::new(config, topology);
        pool.pre_allocate().await.unwrap();
        let stats = pool.get_stats().await;
        assert_eq!(stats.current_buffers, 10);
        let buffer = pool.acquire(0).await.unwrap();
        assert_eq!(buffer.node_id(), 0);
        pool.release(buffer).await;
    }

    // End-to-end single event: output equals input; stats count one event.
    #[tokio::test]
    async fn test_numa_stream_processor() {
        let config = NumaConfig {
            auto_detect_topology: false,
            buffer_pool_config: NumaBufferPoolConfig {
                buffer_size: 1024,
                buffers_per_node: 10,
                pre_allocate: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let processor = NumaStreamProcessor::new(config).await.unwrap();
        processor.start().await.unwrap();
        let data = vec![1u8, 2, 3, 4, 5];
        let result = processor.process_event(&data, Some(0)).await.unwrap();
        assert_eq!(result, data);
        let stats = processor.get_stats().await;
        assert_eq!(stats.events_processed, 1);
        processor.stop().await.unwrap();
    }

    // Batch processing preserves order and content of every event.
    #[tokio::test]
    async fn test_numa_batch_processing() {
        let config = NumaConfig {
            auto_detect_topology: false,
            ..Default::default()
        };
        let processor = NumaStreamProcessor::new(config).await.unwrap();
        processor.start().await.unwrap();
        let events = vec![vec![1u8, 2, 3], vec![4u8, 5, 6], vec![7u8, 8, 9]];
        let results = processor
            .process_batch(events.clone(), Some(0))
            .await
            .unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results, events);
        let stats = processor.get_stats().await;
        assert_eq!(stats.events_processed, 3);
        processor.stop().await.unwrap();
    }

    // Bandwidth over three spaced samples must be non-negative.
    #[tokio::test]
    async fn test_memory_bandwidth_monitor() {
        let monitor = MemoryBandwidthMonitor::new(Duration::from_secs(10));
        monitor.record_sample(0, 1024 * 1024).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        monitor.record_sample(0, 2 * 1024 * 1024).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        monitor.record_sample(0, 3 * 1024 * 1024).await;
        let bandwidth = monitor.get_bandwidth(0).await;
        assert!(bandwidth >= 0.0);
    }

    // Balanced distribution over 2 nodes x 2 CPUs yields 4 workers.
    #[tokio::test]
    async fn test_numa_thread_pool() {
        let topology = Arc::new(NumaTopology {
            num_nodes: 2,
            nodes: vec![
                NumaNode {
                    id: 0,
                    cpus: vec![0, 1],
                    total_memory: 4 * 1024 * 1024 * 1024,
                    free_memory: 2 * 1024 * 1024 * 1024,
                    memory_bandwidth_mbps: 50000,
                    distances: HashMap::from([(0, 10), (1, 20)]),
                    online: true,
                },
                NumaNode {
                    id: 1,
                    cpus: vec![2, 3],
                    total_memory: 4 * 1024 * 1024 * 1024,
                    free_memory: 2 * 1024 * 1024 * 1024,
                    memory_bandwidth_mbps: 50000,
                    distances: HashMap::from([(0, 20), (1, 10)]),
                    online: true,
                },
            ],
            total_cpus: 4,
            total_memory: 8 * 1024 * 1024 * 1024,
            distance_matrix: vec![vec![10, 20], vec![20, 10]],
            cpu_to_node: HashMap::from([(0, 0), (1, 0), (2, 1), (3, 1)]),
        });
        let config = NumaConfig {
            worker_distribution: WorkerDistributionStrategy::Balanced,
            ..Default::default()
        };
        let pool = NumaThreadPool::new(config, topology).await.unwrap();
        pool.start().await.unwrap();
        let stats = pool.get_stats().await;
        assert_eq!(stats.total_workers, 4);
        assert!(pool.is_running());
        pool.stop().await.unwrap();
        assert!(!pool.is_running());
    }

    // Worker accessors and record_task's counters.
    #[tokio::test]
    async fn test_numa_worker() {
        let worker = NumaWorker::new(0, 0, vec![0, 1]);
        assert_eq!(worker.id(), 0);
        assert_eq!(worker.node_id(), 0);
        assert_eq!(worker.cpu_affinity(), &[0, 1]);
        assert!(!worker.is_running());
        worker.record_task(100, true).await;
        let stats = worker.get_stats().await;
        assert_eq!(stats.tasks_processed, 1);
        assert_eq!(stats.local_accesses, 1);
    }
}