1use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use std::collections::{HashMap, VecDeque};
23use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::sync::RwLock;
27use tracing::info;
28
/// Top-level configuration for NUMA-aware stream processing.
///
/// Consumed by [`NumaStreamProcessor`] (topology detection) and
/// [`NumaThreadPool`] (worker placement). Several fields are policy knobs
/// that are not consulted by the code in this module — presumably read by
/// allocation/affinity plumbing elsewhere; TODO confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaConfig {
    /// Master switch for NUMA-aware behavior.
    pub enabled: bool,
    /// Node to favor; used by the `Concentrated` worker-distribution
    /// strategy (falls back to node 0 when `None`).
    pub preferred_node: Option<usize>,
    /// When `true`, read the topology from the OS (Linux sysfs); otherwise a
    /// synthetic single-node topology is used.
    pub auto_detect_topology: bool,
    /// Prefer allocating memory on the local node (policy knob).
    pub local_memory_allocation: bool,
    /// How buffer allocations choose a node (policy knob).
    pub allocation_strategy: NumaAllocationStrategy,
    /// How strictly workers are pinned to CPUs (policy knob).
    pub affinity_mode: CpuAffinityMode,
    /// Configuration for the per-node buffer pool.
    pub buffer_pool_config: NumaBufferPoolConfig,
    /// Enable optimizations for cross-socket traffic (policy knob).
    pub cross_socket_optimization: bool,
    /// Memory interleaving policy across nodes (policy knob).
    pub interleave_policy: MemoryInterleavePolicy,
    /// Strategy for distributing workers across nodes; see
    /// `NumaThreadPool::initialize_workers`.
    pub worker_distribution: WorkerDistributionStrategy,
    /// Bandwidth threshold in MB/s (policy knob; not read in this module).
    pub bandwidth_threshold_mbps: u64,
    /// Allow migrating memory between nodes (policy knob).
    pub enable_memory_migration: bool,
}
57
58impl Default for NumaConfig {
59 fn default() -> Self {
60 Self {
61 enabled: true,
62 preferred_node: None,
63 auto_detect_topology: true,
64 local_memory_allocation: true,
65 allocation_strategy: NumaAllocationStrategy::LocalFirst,
66 affinity_mode: CpuAffinityMode::Strict,
67 buffer_pool_config: NumaBufferPoolConfig::default(),
68 cross_socket_optimization: true,
69 interleave_policy: MemoryInterleavePolicy::None,
70 worker_distribution: WorkerDistributionStrategy::Balanced,
71 bandwidth_threshold_mbps: 10000,
72 enable_memory_migration: false,
73 }
74 }
75}
76
/// Node-selection strategy for memory allocations.
///
/// NOTE(review): only carried in `NumaConfig` within this module; the
/// variants are presumably interpreted by allocation code elsewhere —
/// TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NumaAllocationStrategy {
    /// Try the local node first.
    LocalFirst,
    /// Interleave allocations across nodes.
    Interleave,
    /// Always prefer the given node id.
    Preferred(usize),
    /// Rotate through nodes.
    RoundRobin,
    /// Pick nodes by available memory bandwidth.
    BandwidthAware,
    /// Pick nodes to minimize access latency.
    LatencyOptimized,
}
93
/// How worker threads are bound to CPUs.
///
/// NOTE(review): carried in `NumaConfig` only; no code in this module
/// applies affinity — presumably consumed by thread-spawning code
/// elsewhere. TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CpuAffinityMode {
    /// Hard-pin threads to specific CPUs.
    Strict,
    /// Prefer but do not require specific CPUs.
    Soft,
    /// No affinity at all.
    None,
    /// Pin to any CPU on the local NUMA node.
    NumaLocal,
    /// Pin with cache topology in mind.
    CacheAware,
}
108
/// Memory interleaving policy across NUMA nodes (policy knob carried in
/// `NumaConfig`; not interpreted in this module).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MemoryInterleavePolicy {
    /// No interleaving.
    None,
    /// Interleave across all nodes.
    All,
    /// Interleave across the listed node ids.
    Specific(Vec<usize>),
    /// Interleave at page granularity.
    PageLevel,
    /// Interleave at cache-line granularity.
    CacheLineLevel,
}
123
/// How `NumaThreadPool` spreads workers across NUMA nodes; interpreted in
/// `NumaThreadPool::initialize_workers`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WorkerDistributionStrategy {
    /// One worker per CPU on every node.
    Balanced,
    /// Concentrate workers (2× CPUs) on the preferred node; one worker on
    /// each other node.
    Concentrated,
    /// Dynamic placement (currently handled like the fallback: one worker
    /// per CPU).
    Dynamic,
    /// Bandwidth-aware placement (currently the same fallback).
    BandwidthAware,
    /// Latency-optimized placement (currently the same fallback).
    LatencyOptimized,
}
138
/// Configuration for `NumaBufferPool`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaBufferPoolConfig {
    /// Size of each pooled buffer in bytes.
    pub buffer_size: usize,
    /// Number of buffers pre-allocated per NUMA node.
    pub buffers_per_node: usize,
    /// Allow migrating buffers between nodes (not read in this module).
    pub enable_migration: bool,
    /// Cap on simultaneously outstanding buffers (not enforced in this
    /// module — NOTE(review): `acquire` allocates unboundedly on miss).
    pub max_in_flight: usize,
    /// Fill the pool eagerly in `NumaBufferPool::pre_allocate`.
    pub pre_allocate: bool,
    /// Back buffers with huge pages (not read in this module).
    pub use_huge_pages: bool,
    /// Huge page size to use when `use_huge_pages` is set.
    pub huge_page_size: HugePageSize,
}
157
158impl Default for NumaBufferPoolConfig {
159 fn default() -> Self {
160 Self {
161 buffer_size: 64 * 1024, buffers_per_node: 1024,
163 enable_migration: false,
164 max_in_flight: 4096,
165 pre_allocate: true,
166 use_huge_pages: false,
167 huge_page_size: HugePageSize::Size2MB,
168 }
169 }
170}
171
/// Supported huge-page sizes for buffer backing.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum HugePageSize {
    /// 2 MiB huge pages.
    Size2MB,
    /// 1 GiB huge pages.
    Size1GB,
}
180
/// Description of a single NUMA node, as detected from the OS or
/// synthesized for the fallback topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaNode {
    /// Node id (matches the `nodeN` sysfs directory on Linux).
    pub id: usize,
    /// Logical CPU ids belonging to this node.
    pub cpus: Vec<usize>,
    /// Total memory on the node, in bytes.
    pub total_memory: u64,
    /// Free memory on the node, in bytes.
    pub free_memory: u64,
    /// Estimated memory bandwidth in MB/s (placeholder 50000 in detection
    /// code; sysfs does not expose this).
    pub memory_bandwidth_mbps: u64,
    /// SLIT-style distance to other nodes, keyed by node index
    /// (10 = local by convention).
    pub distances: HashMap<usize, u32>,
    /// Whether the node is online (always `true` in detection code here).
    pub online: bool,
}
199
/// Complete NUMA topology of the machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NumaTopology {
    /// Number of NUMA nodes (== `nodes.len()`).
    pub num_nodes: usize,
    /// Per-node descriptions, sorted by node id.
    pub nodes: Vec<NumaNode>,
    /// Total logical CPU count across all nodes.
    pub total_cpus: usize,
    /// Total memory across all nodes, in bytes.
    pub total_memory: u64,
    /// Pairwise node distances (SLIT convention; 10 = local).
    pub distance_matrix: Vec<Vec<u32>>,
    /// Reverse map from logical CPU id to owning node id.
    pub cpu_to_node: HashMap<usize, usize>,
}
216
/// A fixed-size byte buffer tagged with the NUMA node it belongs to.
///
/// `acquire`/`release` implement a lightweight in-use flag so a shared
/// buffer can be claimed without external locking.
#[derive(Debug)]
pub struct NumaBuffer {
    data: Vec<u8>,
    node_id: usize,
    id: u64,
    allocated_at: Instant,
    last_accessed: Instant,
    access_count: AtomicU64,
    in_use: AtomicBool,
}

impl NumaBuffer {
    /// Allocates a zero-filled buffer of `size` bytes tagged with
    /// `node_id` and a pool-assigned `id`.
    pub fn new(size: usize, node_id: usize, id: u64) -> Self {
        NumaBuffer {
            data: vec![0u8; size],
            node_id,
            id,
            allocated_at: Instant::now(),
            last_accessed: Instant::now(),
            access_count: AtomicU64::new(0),
            in_use: AtomicBool::new(false),
        }
    }

    /// Read-only view of the contents; bumps the access counter.
    pub fn data(&self) -> &[u8] {
        self.access_count.fetch_add(1, Ordering::Relaxed);
        self.data.as_slice()
    }

    /// Mutable view of the contents; bumps the access counter.
    pub fn data_mut(&mut self) -> &mut [u8] {
        self.access_count.fetch_add(1, Ordering::Relaxed);
        self.data.as_mut_slice()
    }

    /// Capacity of the buffer in bytes.
    pub fn size(&self) -> usize {
        self.data.len()
    }

    /// Unique id assigned at allocation time.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// NUMA node this buffer is associated with.
    pub fn node_id(&self) -> usize {
        self.node_id
    }

    /// Attempts to claim the buffer; returns `false` if it is already
    /// claimed. Success uses SeqCst ordering.
    pub fn acquire(&self) -> bool {
        self.in_use
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Releases a previously claimed buffer.
    pub fn release(&self) {
        self.in_use.store(false, Ordering::Release);
    }

    /// Whether the buffer is currently claimed.
    pub fn is_in_use(&self) -> bool {
        self.in_use.load(Ordering::Acquire)
    }
}
294
/// Per-NUMA-node pool of reusable `NumaBuffer`s.
pub struct NumaBufferPool {
    /// Free buffers, keyed by node id.
    buffers: Arc<RwLock<HashMap<usize, VecDeque<NumaBuffer>>>>,
    /// Pool sizing/behavior configuration.
    config: NumaBufferPoolConfig,
    /// Monotonic id source for newly created buffers.
    next_id: AtomicU64,
    /// Aggregate and per-node counters.
    stats: Arc<RwLock<NumaBufferPoolStats>>,
    /// Machine topology used to enumerate nodes.
    topology: Arc<NumaTopology>,
}
308
/// Counters maintained by `NumaBufferPool`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaBufferPoolStats {
    /// Buffers ever created (pre-allocation + misses).
    pub total_allocations: u64,
    /// Allocations satisfied on the requested node.
    pub local_allocations: u64,
    /// Acquisitions satisfied from a different node's pool.
    pub remote_allocations: u64,
    /// Acquisitions served from the pool.
    pub buffer_hits: u64,
    /// Acquisitions that required a fresh allocation.
    pub buffer_misses: u64,
    /// Buffers currently sitting idle in the pool.
    pub current_buffers: u64,
    /// Buffers currently handed out to callers.
    pub buffers_in_use: u64,
    /// Total bytes ever allocated by the pool.
    pub total_memory_bytes: u64,
    /// Per-node breakdown of the above.
    pub per_node_stats: HashMap<usize, NodeBufferStats>,
}
331
/// Per-node slice of `NumaBufferPoolStats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeBufferStats {
    /// Buffers created for this node.
    pub allocations: u64,
    /// Buffers currently idle in this node's pool.
    pub current_buffers: u64,
    /// Bytes allocated for this node.
    pub memory_bytes: u64,
    /// Average access latency in nanoseconds (never written in this
    /// module — NOTE(review): appears unused; confirm before relying on it).
    pub avg_access_latency_ns: f64,
}
344
345impl NumaBufferPool {
346 pub fn new(config: NumaBufferPoolConfig, topology: Arc<NumaTopology>) -> Self {
348 Self {
349 buffers: Arc::new(RwLock::new(HashMap::new())),
350 config,
351 next_id: AtomicU64::new(0),
352 stats: Arc::new(RwLock::new(NumaBufferPoolStats::default())),
353 topology,
354 }
355 }
356
357 pub async fn pre_allocate(&self) -> Result<()> {
359 if !self.config.pre_allocate {
360 return Ok(());
361 }
362
363 let mut buffers = self.buffers.write().await;
364 let mut stats = self.stats.write().await;
365
366 for node in &self.topology.nodes {
367 let node_buffers = buffers.entry(node.id).or_insert_with(VecDeque::new);
368
369 for _ in 0..self.config.buffers_per_node {
370 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
371 let buffer = NumaBuffer::new(self.config.buffer_size, node.id, id);
372 node_buffers.push_back(buffer);
373
374 stats.total_allocations += 1;
375 stats.local_allocations += 1;
376 stats.current_buffers += 1;
377 stats.total_memory_bytes += self.config.buffer_size as u64;
378
379 let node_stats = stats.per_node_stats.entry(node.id).or_default();
380 node_stats.allocations += 1;
381 node_stats.current_buffers += 1;
382 node_stats.memory_bytes += self.config.buffer_size as u64;
383 }
384 }
385
386 info!(
387 "Pre-allocated {} buffers across {} nodes",
388 stats.current_buffers, self.topology.num_nodes
389 );
390
391 Ok(())
392 }
393
394 pub async fn acquire(&self, preferred_node: usize) -> Result<NumaBuffer> {
396 let mut buffers = self.buffers.write().await;
397 let mut stats = self.stats.write().await;
398
399 if let Some(node_buffers) = buffers.get_mut(&preferred_node) {
401 if let Some(buffer) = node_buffers.pop_front() {
402 stats.buffer_hits += 1;
403 stats.buffers_in_use += 1;
404 let node_stats = stats.per_node_stats.entry(preferred_node).or_default();
405 node_stats.current_buffers = node_stats.current_buffers.saturating_sub(1);
406 return Ok(buffer);
407 }
408 }
409
410 for node in &self.topology.nodes {
412 if node.id == preferred_node {
413 continue;
414 }
415 if let Some(node_buffers) = buffers.get_mut(&node.id) {
416 if let Some(buffer) = node_buffers.pop_front() {
417 stats.buffer_hits += 1;
418 stats.buffers_in_use += 1;
419 stats.remote_allocations += 1;
420 let node_stats = stats.per_node_stats.entry(node.id).or_default();
421 node_stats.current_buffers = node_stats.current_buffers.saturating_sub(1);
422 return Ok(buffer);
423 }
424 }
425 }
426
427 stats.buffer_misses += 1;
429 stats.total_allocations += 1;
430 stats.buffers_in_use += 1;
431 stats.total_memory_bytes += self.config.buffer_size as u64;
432
433 let node_stats = stats.per_node_stats.entry(preferred_node).or_default();
434 node_stats.allocations += 1;
435 node_stats.memory_bytes += self.config.buffer_size as u64;
436
437 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
438 Ok(NumaBuffer::new(self.config.buffer_size, preferred_node, id))
439 }
440
441 pub async fn release(&self, buffer: NumaBuffer) {
443 let mut buffers = self.buffers.write().await;
444 let mut stats = self.stats.write().await;
445
446 stats.buffers_in_use = stats.buffers_in_use.saturating_sub(1);
447
448 let node_buffers = buffers.entry(buffer.node_id).or_insert_with(VecDeque::new);
449 let node_stats = stats.per_node_stats.entry(buffer.node_id).or_default();
450 node_stats.current_buffers += 1;
451 stats.current_buffers += 1;
452
453 node_buffers.push_back(buffer);
454 }
455
456 pub async fn get_stats(&self) -> NumaBufferPoolStats {
458 self.stats.read().await.clone()
459 }
460}
461
/// A logical worker bound to a NUMA node with a suggested CPU affinity.
///
/// NOTE(review): no thread is spawned here; this is bookkeeping only —
/// `running` is never set by code in this module.
pub struct NumaWorker {
    /// Pool-wide worker id.
    id: usize,
    /// NUMA node this worker belongs to.
    node_id: usize,
    /// CPUs this worker should be pinned to.
    cpu_affinity: Vec<usize>,
    /// Running flag (shared so an external runner could flip it).
    running: Arc<AtomicBool>,
    /// Accumulated task statistics.
    stats: Arc<RwLock<NumaWorkerStats>>,
}
475
/// Task counters for one `NumaWorker`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaWorkerStats {
    /// Tasks recorded via `record_task`.
    pub tasks_processed: u64,
    /// Running mean of task latency in microseconds.
    pub avg_task_latency_us: f64,
    /// Worst task latency in microseconds.
    pub max_task_latency_us: u64,
    /// Tasks that touched a remote node.
    pub cross_node_accesses: u64,
    /// Tasks that stayed node-local.
    pub local_accesses: u64,
    /// CPU time in microseconds (never written in this module —
    /// NOTE(review): appears unused; confirm before relying on it).
    pub cpu_time_us: u64,
}
492
493impl NumaWorker {
494 pub fn new(id: usize, node_id: usize, cpu_affinity: Vec<usize>) -> Self {
496 Self {
497 id,
498 node_id,
499 cpu_affinity,
500 running: Arc::new(AtomicBool::new(false)),
501 stats: Arc::new(RwLock::new(NumaWorkerStats::default())),
502 }
503 }
504
505 pub fn id(&self) -> usize {
507 self.id
508 }
509
510 pub fn node_id(&self) -> usize {
512 self.node_id
513 }
514
515 pub fn cpu_affinity(&self) -> &[usize] {
517 &self.cpu_affinity
518 }
519
520 pub fn is_running(&self) -> bool {
522 self.running.load(Ordering::Acquire)
523 }
524
525 pub async fn get_stats(&self) -> NumaWorkerStats {
527 self.stats.read().await.clone()
528 }
529
530 pub async fn record_task(&self, latency_us: u64, is_local: bool) {
532 let mut stats = self.stats.write().await;
533 stats.tasks_processed += 1;
534 stats.avg_task_latency_us =
535 (stats.avg_task_latency_us * (stats.tasks_processed - 1) as f64 + latency_us as f64)
536 / stats.tasks_processed as f64;
537 stats.max_task_latency_us = stats.max_task_latency_us.max(latency_us);
538
539 if is_local {
540 stats.local_accesses += 1;
541 } else {
542 stats.cross_node_accesses += 1;
543 }
544 }
545}
546
/// NUMA-aware worker pool: places `NumaWorker`s on nodes according to the
/// configured `WorkerDistributionStrategy`.
pub struct NumaThreadPool {
    /// Workers keyed by node id.
    workers: Arc<RwLock<HashMap<usize, Vec<NumaWorker>>>>,
    /// Pool configuration (distribution strategy, preferred node, …).
    config: NumaConfig,
    /// Machine topology used for worker placement.
    topology: Arc<NumaTopology>,
    /// Pool-level running flag.
    running: Arc<AtomicBool>,
    /// Aggregate pool statistics.
    stats: Arc<RwLock<NumaThreadPoolStats>>,
    /// Counter backing `get_next_worker`'s round-robin.
    round_robin_index: AtomicUsize,
}
562
/// Counters for `NumaThreadPool`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaThreadPoolStats {
    /// Total workers created at initialization.
    pub total_workers: usize,
    /// Worker count per node id.
    pub workers_per_node: HashMap<usize, usize>,
    /// Tasks submitted (never written in this module — NOTE(review):
    /// appears unused; confirm against callers).
    pub tasks_submitted: u64,
    /// Tasks completed (same caveat as above).
    pub tasks_completed: u64,
    /// Average queue depth (same caveat as above).
    pub avg_queue_depth: f64,
    /// Load imbalance ratio (same caveat as above).
    pub load_imbalance_ratio: f64,
}
579
580impl NumaThreadPool {
581 pub async fn new(config: NumaConfig, topology: Arc<NumaTopology>) -> Result<Self> {
583 let pool = Self {
584 workers: Arc::new(RwLock::new(HashMap::new())),
585 config,
586 topology,
587 running: Arc::new(AtomicBool::new(false)),
588 stats: Arc::new(RwLock::new(NumaThreadPoolStats::default())),
589 round_robin_index: AtomicUsize::new(0),
590 };
591
592 pool.initialize_workers().await?;
593
594 Ok(pool)
595 }
596
597 async fn initialize_workers(&self) -> Result<()> {
599 let mut workers = self.workers.write().await;
600 let mut stats = self.stats.write().await;
601
602 let workers_per_node = match &self.config.worker_distribution {
603 WorkerDistributionStrategy::Balanced => {
604 let _total_cpus: usize = self.topology.nodes.iter().map(|n| n.cpus.len()).sum();
606 let workers_per_cpu = 1;
607 self.topology
608 .nodes
609 .iter()
610 .map(|n| (n.id, n.cpus.len() * workers_per_cpu))
611 .collect::<HashMap<_, _>>()
612 }
613 WorkerDistributionStrategy::Concentrated => {
614 let preferred = self.config.preferred_node.unwrap_or(0);
616 self.topology
617 .nodes
618 .iter()
619 .map(|n| {
620 if n.id == preferred {
621 (n.id, n.cpus.len() * 2)
622 } else {
623 (n.id, 1)
624 }
625 })
626 .collect()
627 }
628 _ => {
629 self.topology
631 .nodes
632 .iter()
633 .map(|n| (n.id, n.cpus.len()))
634 .collect()
635 }
636 };
637
638 let mut worker_id = 0;
639 for node in &self.topology.nodes {
640 let count = workers_per_node.get(&node.id).copied().unwrap_or(1);
641 let node_workers = workers.entry(node.id).or_insert_with(Vec::new);
642
643 for i in 0..count {
644 let cpu = node.cpus.get(i % node.cpus.len()).copied().unwrap_or(0);
645 let worker = NumaWorker::new(worker_id, node.id, vec![cpu]);
646 node_workers.push(worker);
647 worker_id += 1;
648 }
649
650 stats.workers_per_node.insert(node.id, node_workers.len());
651 }
652
653 stats.total_workers = worker_id;
654
655 info!(
656 "Initialized NUMA thread pool with {} workers across {} nodes",
657 stats.total_workers, self.topology.num_nodes
658 );
659
660 Ok(())
661 }
662
663 pub async fn get_stats(&self) -> NumaThreadPoolStats {
665 self.stats.read().await.clone()
666 }
667
668 pub async fn start(&self) -> Result<()> {
670 self.running.store(true, Ordering::Release);
671 info!("NUMA thread pool started");
672 Ok(())
673 }
674
675 pub async fn stop(&self) -> Result<()> {
677 self.running.store(false, Ordering::Release);
678 info!("NUMA thread pool stopped");
679 Ok(())
680 }
681
682 pub fn is_running(&self) -> bool {
684 self.running.load(Ordering::Acquire)
685 }
686
687 pub async fn get_next_worker(&self) -> Option<usize> {
689 let workers = self.workers.read().await;
690 let total_workers: usize = workers.values().map(|v| v.len()).sum();
691
692 if total_workers == 0 {
693 return None;
694 }
695
696 let index = self.round_robin_index.fetch_add(1, Ordering::SeqCst) % total_workers;
697 Some(index)
698 }
699}
700
/// Facade tying together topology detection, the per-node buffer pool and
/// the NUMA-aware thread pool for event processing.
pub struct NumaStreamProcessor {
    /// Processor configuration.
    config: NumaConfig,
    /// Detected (or synthetic) machine topology.
    topology: Arc<NumaTopology>,
    /// Per-node buffer pool used by `process_event`.
    buffer_pool: Arc<NumaBufferPool>,
    /// Worker pool (bookkeeping only; see `NumaThreadPool`).
    thread_pool: Arc<NumaThreadPool>,
    /// Processor-level running flag.
    running: Arc<AtomicBool>,
    /// Aggregate processing statistics.
    stats: Arc<RwLock<NumaProcessorStats>>,
}
716
/// Counters maintained by `NumaStreamProcessor::process_event`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NumaProcessorStats {
    /// Events processed so far.
    pub events_processed: u64,
    /// Running mean of per-event latency in microseconds.
    pub avg_processing_latency_us: f64,
    /// Worst per-event latency in microseconds.
    pub max_processing_latency_us: u64,
    /// Bandwidth utilization ratio (never written in this module —
    /// NOTE(review): appears unused; confirm against callers).
    pub memory_bandwidth_utilization: f64,
    /// Events whose buffer came from a non-preferred node.
    pub cross_node_transfers: u64,
    /// Events whose buffer came from the preferred node.
    pub local_node_hits: u64,
    /// Cache miss rate (same caveat as `memory_bandwidth_utilization`).
    pub cache_miss_rate: f64,
    /// Per-node breakdown.
    pub per_node_stats: HashMap<usize, NodeProcessorStats>,
}
737
/// Per-node slice of `NumaProcessorStats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeProcessorStats {
    /// Events attributed to this node.
    pub events_processed: u64,
    /// Running mean latency in microseconds for this node.
    pub avg_latency_us: f64,
    /// Memory usage in bytes (never written in this module —
    /// NOTE(review): appears unused).
    pub memory_usage_bytes: u64,
    /// CPU utilization ratio (same caveat as above).
    pub cpu_utilization: f64,
}
750
impl NumaStreamProcessor {
    /// Builds a processor: resolves the topology, creates (and optionally
    /// pre-fills) the buffer pool, and lays out the thread pool.
    pub async fn new(config: NumaConfig) -> Result<Self> {
        let topology = Arc::new(Self::detect_topology(&config).await?);

        let buffer_pool = Arc::new(NumaBufferPool::new(
            config.buffer_pool_config.clone(),
            topology.clone(),
        ));

        buffer_pool.pre_allocate().await?;

        let thread_pool = Arc::new(NumaThreadPool::new(config.clone(), topology.clone()).await?);

        Ok(Self {
            config,
            topology,
            buffer_pool,
            thread_pool,
            running: Arc::new(AtomicBool::new(false)),
            stats: Arc::new(RwLock::new(NumaProcessorStats::default())),
        })
    }

    /// Resolves the topology to use: a synthetic single-node topology when
    /// auto-detection is off, sysfs detection on Linux, and a single-node
    /// fallback on every other OS.
    async fn detect_topology(config: &NumaConfig) -> Result<NumaTopology> {
        if !config.auto_detect_topology {
            // Synthetic single-node topology. Memory figures (8 GiB total /
            // 4 GiB free) and bandwidth are placeholders, not measurements.
            return Ok(NumaTopology {
                num_nodes: 1,
                nodes: vec![NumaNode {
                    id: 0,
                    cpus: (0..num_cpus::get()).collect(),
                    total_memory: 8 * 1024 * 1024 * 1024,
                    free_memory: 4 * 1024 * 1024 * 1024,
                    memory_bandwidth_mbps: 50000,
                    // SLIT convention: 10 == local-access distance.
                    distances: HashMap::from([(0, 10)]),
                    online: true,
                }],
                total_cpus: num_cpus::get(),
                total_memory: 8 * 1024 * 1024 * 1024,
                distance_matrix: vec![vec![10]],
                cpu_to_node: (0..num_cpus::get()).map(|cpu| (cpu, 0)).collect(),
            });
        }

        #[cfg(target_os = "linux")]
        {
            Self::detect_linux_numa_topology().await
        }

        #[cfg(not(target_os = "linux"))]
        {
            Self::detect_fallback_topology().await
        }
    }

    /// Reads the topology from `/sys/devices/system/node` (Linux only).
    /// Falls back to the synthetic topology when the directory is missing
    /// or contains no `nodeN` entries.
    #[cfg(target_os = "linux")]
    async fn detect_linux_numa_topology() -> Result<NumaTopology> {
        use std::fs;
        use std::path::Path;

        let numa_path = Path::new("/sys/devices/system/node");

        if !numa_path.exists() {
            return Self::detect_fallback_topology().await;
        }

        let mut nodes = Vec::new();
        let mut cpu_to_node = HashMap::new();

        for entry in fs::read_dir(numa_path)? {
            let entry = entry?;
            let name = entry.file_name().to_string_lossy().to_string();

            // Only `nodeN` directories describe NUMA nodes; skip the rest
            // (e.g. `possible`, `online`).
            if !name.starts_with("node") {
                continue;
            }

            let node_id: usize = name[4..].parse().unwrap_or(0);
            let node_path = entry.path();

            let cpulist_path = node_path.join("cpulist");
            let cpus = if cpulist_path.exists() {
                let content = fs::read_to_string(cpulist_path)?;
                Self::parse_cpu_list(&content)
            } else {
                vec![]
            };

            for &cpu in &cpus {
                cpu_to_node.insert(cpu, node_id);
            }

            let meminfo_path = node_path.join("meminfo");
            let (total_memory, free_memory) = if meminfo_path.exists() {
                let content = fs::read_to_string(meminfo_path)?;
                Self::parse_meminfo(&content)
            } else {
                // Placeholder: 8 GiB total / 4 GiB free when missing.
                (8 * 1024 * 1024 * 1024, 4 * 1024 * 1024 * 1024)
            };

            nodes.push(NumaNode {
                id: node_id,
                cpus,
                total_memory,
                free_memory,
                // Placeholder — sysfs does not expose bandwidth.
                memory_bandwidth_mbps: 50000,
                distances: HashMap::new(),
                online: true,
            });
        }

        if nodes.is_empty() {
            return Self::detect_fallback_topology().await;
        }

        nodes.sort_by_key(|n| n.id);

        let distance_path = numa_path.join("node0/distance");
        let distance_matrix = if distance_path.exists() {
            Self::read_distance_matrix(&nodes).await?
        } else {
            // SLIT convention: 10 == local distance everywhere.
            vec![vec![10; nodes.len()]; nodes.len()]
        };

        // NOTE(review): `j` is the position in the sorted node vector, not
        // necessarily a node id — this assumes dense ids starting at 0.
        // TODO confirm on machines with sparse node numbering.
        for (i, node) in nodes.iter_mut().enumerate() {
            for (j, &dist) in distance_matrix[i].iter().enumerate() {
                node.distances.insert(j, dist);
            }
        }

        let total_cpus = nodes.iter().map(|n| n.cpus.len()).sum();
        let total_memory = nodes.iter().map(|n| n.total_memory).sum();
        let num_nodes = nodes.len();

        info!(
            "Detected NUMA topology: {} nodes, {} CPUs, {} MB total memory",
            num_nodes,
            total_cpus,
            total_memory / (1024 * 1024)
        );

        Ok(NumaTopology {
            num_nodes,
            nodes,
            total_cpus,
            total_memory,
            distance_matrix,
            cpu_to_node,
        })
    }

    /// Parses a sysfs cpulist such as `0-3,8,10-11` into CPU ids.
    /// Malformed pieces are silently skipped.
    #[cfg(target_os = "linux")]
    fn parse_cpu_list(content: &str) -> Vec<usize> {
        let mut cpus = Vec::new();

        for part in content.trim().split(',') {
            if part.contains('-') {
                // Inclusive range, e.g. "0-3".
                let range: Vec<&str> = part.split('-').collect();
                if range.len() == 2 {
                    if let (Ok(start), Ok(end)) =
                        (range[0].parse::<usize>(), range[1].parse::<usize>())
                    {
                        cpus.extend(start..=end);
                    }
                }
            } else if let Ok(cpu) = part.parse::<usize>() {
                cpus.push(cpu);
            }
        }

        cpus
    }

    /// Extracts (total, free) bytes from a per-node meminfo file, whose
    /// lines look like `Node 0 MemTotal: 16384 kB` — hence `nth(3)` for the
    /// kB value. Returns zeros for fields that are missing or unparsable.
    #[cfg(target_os = "linux")]
    fn parse_meminfo(content: &str) -> (u64, u64) {
        let mut total = 0u64;
        let mut free = 0u64;

        for line in content.lines() {
            if line.contains("MemTotal:") {
                if let Some(val) = line.split_whitespace().nth(3) {
                    // Values are reported in kB; convert to bytes.
                    total = val.parse().unwrap_or(0) * 1024;
                }
            } else if line.contains("MemFree:") {
                if let Some(val) = line.split_whitespace().nth(3) {
                    free = val.parse().unwrap_or(0) * 1024;
                }
            }
        }

        (total, free)
    }

    /// Reads each node's SLIT distance row from sysfs. Missing or short
    /// rows leave the default local distance (10) in place.
    #[cfg(target_os = "linux")]
    async fn read_distance_matrix(nodes: &[NumaNode]) -> Result<Vec<Vec<u32>>> {
        use std::fs;

        let mut matrix = vec![vec![10u32; nodes.len()]; nodes.len()];

        for (i, node) in nodes.iter().enumerate() {
            let path = format!("/sys/devices/system/node/node{}/distance", node.id);
            if let Ok(content) = fs::read_to_string(&path) {
                let distances: Vec<u32> = content
                    .split_whitespace()
                    .filter_map(|s| s.parse().ok())
                    .collect();

                for (j, &dist) in distances.iter().enumerate() {
                    if j < nodes.len() {
                        matrix[i][j] = dist;
                    }
                }
            }
        }

        Ok(matrix)
    }

    /// Synthetic single-node topology used on non-Linux platforms or when
    /// Linux detection finds nothing. Memory figures are placeholders.
    async fn detect_fallback_topology() -> Result<NumaTopology> {
        let num_cpus = num_cpus::get();

        Ok(NumaTopology {
            num_nodes: 1,
            nodes: vec![NumaNode {
                id: 0,
                cpus: (0..num_cpus).collect(),
                total_memory: 8 * 1024 * 1024 * 1024,
                free_memory: 4 * 1024 * 1024 * 1024,
                memory_bandwidth_mbps: 50000,
                distances: HashMap::from([(0, 10)]),
                online: true,
            }],
            total_cpus: num_cpus,
            total_memory: 8 * 1024 * 1024 * 1024,
            distance_matrix: vec![vec![10]],
            cpu_to_node: (0..num_cpus).map(|cpu| (cpu, 0)).collect(),
        })
    }

    /// Sets the running flag and starts the thread pool.
    pub async fn start(&self) -> Result<()> {
        self.running.store(true, Ordering::Release);
        self.thread_pool.start().await?;
        info!("NUMA stream processor started");
        Ok(())
    }

    /// Clears the running flag and stops the thread pool.
    pub async fn stop(&self) -> Result<()> {
        self.running.store(false, Ordering::Release);
        self.thread_pool.stop().await?;
        info!("NUMA stream processor stopped");
        Ok(())
    }

    /// Copies `data` through a node-local buffer and returns the copy,
    /// updating latency and locality statistics.
    ///
    /// NOTE(review): input longer than the configured buffer size is
    /// silently truncated to `buffer.size()` bytes — confirm this is the
    /// intended contract for oversized events.
    pub async fn process_event(
        &self,
        data: &[u8],
        preferred_node: Option<usize>,
    ) -> Result<Vec<u8>> {
        let start_time = Instant::now();
        let node_id = preferred_node.unwrap_or(0);

        let mut buffer = self.buffer_pool.acquire(node_id).await?;

        let len = data.len().min(buffer.size());
        buffer.data_mut()[..len].copy_from_slice(&data[..len]);

        let result = buffer.data()[..len].to_vec();

        let latency_us = start_time.elapsed().as_micros() as u64;
        // Local == the pool satisfied the request on the preferred node.
        let is_local = buffer.node_id() == node_id;

        let mut stats = self.stats.write().await;
        stats.events_processed += 1;
        // Incremental running mean; `events_processed` was just bumped, so
        // the previous count is `events_processed - 1`.
        stats.avg_processing_latency_us = (stats.avg_processing_latency_us
            * (stats.events_processed - 1) as f64
            + latency_us as f64)
            / stats.events_processed as f64;
        stats.max_processing_latency_us = stats.max_processing_latency_us.max(latency_us);

        if is_local {
            stats.local_node_hits += 1;
        } else {
            stats.cross_node_transfers += 1;
        }

        let node_stats = stats.per_node_stats.entry(node_id).or_default();
        node_stats.events_processed += 1;
        node_stats.avg_latency_us = (node_stats.avg_latency_us
            * (node_stats.events_processed - 1) as f64
            + latency_us as f64)
            / node_stats.events_processed as f64;

        self.buffer_pool.release(buffer).await;

        Ok(result)
    }

    /// Processes events sequentially via `process_event`; stops at the
    /// first error.
    pub async fn process_batch(
        &self,
        events: Vec<Vec<u8>>,
        preferred_node: Option<usize>,
    ) -> Result<Vec<Vec<u8>>> {
        let mut results = Vec::with_capacity(events.len());

        for event in events {
            let result = self.process_event(&event, preferred_node).await?;
            results.push(result);
        }

        Ok(results)
    }

    /// Snapshot of the processor counters.
    pub async fn get_stats(&self) -> NumaProcessorStats {
        self.stats.read().await.clone()
    }

    /// Snapshot of the buffer-pool counters.
    pub async fn get_buffer_pool_stats(&self) -> NumaBufferPoolStats {
        self.buffer_pool.get_stats().await
    }

    /// Snapshot of the thread-pool counters.
    pub async fn get_thread_pool_stats(&self) -> NumaThreadPoolStats {
        self.thread_pool.get_stats().await
    }

    /// The topology this processor was built with.
    pub fn get_topology(&self) -> &NumaTopology {
        &self.topology
    }

    /// The configuration this processor was built with.
    pub fn get_config(&self) -> &NumaConfig {
        &self.config
    }

    /// Node owning `cpu`, if known.
    pub fn get_node_for_cpu(&self, cpu: usize) -> Option<usize> {
        self.topology.cpu_to_node.get(&cpu).copied()
    }

    /// Distance between two nodes; out-of-range indices yield the SLIT
    /// local distance (10).
    pub fn get_node_distance(&self, from: usize, to: usize) -> u32 {
        if from < self.topology.distance_matrix.len()
            && to < self.topology.distance_matrix[from].len()
        {
            self.topology.distance_matrix[from][to]
        } else {
            10
        }
    }

    /// Picks a node by a score of distance minus pooled-buffer credit.
    ///
    /// NOTE(review): the loop `break`s as soon as it reaches `from`, so
    /// whenever `from` exists in the node list this returns `from`
    /// unconditionally — only nodes ordered before `from` are ever scored.
    /// Confirm whether "available" was meant to consider buffer counts for
    /// the local node too.
    pub async fn find_closest_available_node(&self, from: usize) -> usize {
        let stats = self.buffer_pool.get_stats().await;

        let mut best_node = from;
        let mut best_score = u32::MAX;

        for node in &self.topology.nodes {
            if node.id == from {
                best_node = node.id;
                break;
            }

            let distance = self.get_node_distance(from, node.id);
            let buffer_count = stats
                .per_node_stats
                .get(&node.id)
                .map(|s| s.current_buffers)
                .unwrap_or(0);

            // Every 100 pooled buffers offsets one unit of distance.
            let score = distance.saturating_sub(buffer_count as u32 / 100);

            if score < best_score {
                best_score = score;
                best_node = node.id;
            }
        }

        best_node
    }
}
1161
/// Per-node sample history: (timestamp, bytes transferred) pairs.
type BandwidthSamples = Arc<RwLock<HashMap<usize, VecDeque<(Instant, u64)>>>>;

/// Sliding-window estimator of per-node memory bandwidth.
pub struct MemoryBandwidthMonitor {
    /// Recorded samples, keyed by node id.
    samples: BandwidthSamples,
    /// Samples older than this are discarded on insert.
    window_size: Duration,
    /// Hard cap on retained samples per node.
    max_samples: usize,
}
1174
1175impl MemoryBandwidthMonitor {
1176 pub fn new(window_size: Duration) -> Self {
1178 Self {
1179 samples: Arc::new(RwLock::new(HashMap::new())),
1180 window_size,
1181 max_samples: 1000,
1182 }
1183 }
1184
1185 pub async fn record_sample(&self, node_id: usize, bytes_transferred: u64) {
1187 let mut samples = self.samples.write().await;
1188 let node_samples = samples.entry(node_id).or_insert_with(VecDeque::new);
1189
1190 let now = Instant::now();
1191 node_samples.push_back((now, bytes_transferred));
1192
1193 while node_samples.len() > self.max_samples {
1195 node_samples.pop_front();
1196 }
1197
1198 let cutoff = now - self.window_size;
1200 while let Some((time, _)) = node_samples.front() {
1201 if *time < cutoff {
1202 node_samples.pop_front();
1203 } else {
1204 break;
1205 }
1206 }
1207 }
1208
1209 pub async fn get_bandwidth(&self, node_id: usize) -> f64 {
1211 let samples = self.samples.read().await;
1212
1213 if let Some(node_samples) = samples.get(&node_id) {
1214 if node_samples.len() < 2 {
1215 return 0.0;
1216 }
1217
1218 let first = node_samples.front().unwrap();
1219 let last = node_samples.back().unwrap();
1220
1221 let total_bytes: u64 = node_samples.iter().map(|(_, b)| b).sum();
1222 let duration = last.0.duration_since(first.0);
1223
1224 if duration.as_secs_f64() > 0.0 {
1225 (total_bytes as f64 / duration.as_secs_f64()) / (1024.0 * 1024.0)
1226 } else {
1227 0.0
1228 }
1229 } else {
1230 0.0
1231 }
1232 }
1233
1234 pub async fn get_all_bandwidth(&self) -> HashMap<usize, f64> {
1236 let samples = self.samples.read().await;
1237 let node_ids: Vec<usize> = samples.keys().copied().collect();
1238 drop(samples);
1239
1240 let mut result = HashMap::new();
1241 for node_id in node_ids {
1242 let bandwidth = self.get_bandwidth(node_id).await;
1243 result.insert(node_id, bandwidth);
1244 }
1245
1246 result
1247 }
1248}
1249
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults should enable NUMA handling with auto-detection.
    #[tokio::test]
    async fn test_numa_config_default() {
        let config = NumaConfig::default();
        assert!(config.enabled);
        assert!(config.auto_detect_topology);
        assert!(config.local_memory_allocation);
    }

    // Buffer accessors plus the acquire/release in-use flag.
    #[tokio::test]
    async fn test_numa_buffer() {
        let buffer = NumaBuffer::new(1024, 0, 1);
        assert_eq!(buffer.size(), 1024);
        assert_eq!(buffer.node_id(), 0);
        assert_eq!(buffer.id(), 1);
        assert!(!buffer.is_in_use());

        assert!(buffer.acquire());
        assert!(buffer.is_in_use());
        // A second acquire on a held buffer must fail.
        assert!(!buffer.acquire());
        buffer.release();
        assert!(!buffer.is_in_use());
    }

    // With auto-detection disabled the synthetic topology is used.
    #[tokio::test]
    async fn test_numa_topology_detection() {
        let config = NumaConfig {
            auto_detect_topology: false,
            ..Default::default()
        };

        let processor = NumaStreamProcessor::new(config).await.unwrap();
        let topology = processor.get_topology();

        assert!(topology.num_nodes >= 1);
        assert!(topology.total_cpus >= 1);
        assert!(!topology.nodes.is_empty());
    }

    // Pre-allocation fills the pool; acquire returns a buffer tagged with
    // the requested node.
    #[tokio::test]
    async fn test_numa_buffer_pool() {
        let topology = Arc::new(NumaTopology {
            num_nodes: 1,
            nodes: vec![NumaNode {
                id: 0,
                cpus: vec![0, 1, 2, 3],
                total_memory: 8 * 1024 * 1024 * 1024,
                free_memory: 4 * 1024 * 1024 * 1024,
                memory_bandwidth_mbps: 50000,
                distances: HashMap::from([(0, 10)]),
                online: true,
            }],
            total_cpus: 4,
            total_memory: 8 * 1024 * 1024 * 1024,
            distance_matrix: vec![vec![10]],
            cpu_to_node: (0..4).map(|cpu| (cpu, 0)).collect(),
        });

        let config = NumaBufferPoolConfig {
            buffer_size: 1024,
            buffers_per_node: 10,
            pre_allocate: true,
            ..Default::default()
        };

        let pool = NumaBufferPool::new(config, topology);
        pool.pre_allocate().await.unwrap();

        let stats = pool.get_stats().await;
        assert_eq!(stats.current_buffers, 10);

        let buffer = pool.acquire(0).await.unwrap();
        assert_eq!(buffer.node_id(), 0);

        pool.release(buffer).await;
    }

    // End-to-end: a processed event round-trips unchanged and is counted.
    #[tokio::test]
    async fn test_numa_stream_processor() {
        let config = NumaConfig {
            auto_detect_topology: false,
            buffer_pool_config: NumaBufferPoolConfig {
                buffer_size: 1024,
                buffers_per_node: 10,
                pre_allocate: true,
                ..Default::default()
            },
            ..Default::default()
        };

        let processor = NumaStreamProcessor::new(config).await.unwrap();
        processor.start().await.unwrap();

        let data = vec![1u8, 2, 3, 4, 5];
        let result = processor.process_event(&data, Some(0)).await.unwrap();
        assert_eq!(result, data);

        let stats = processor.get_stats().await;
        assert_eq!(stats.events_processed, 1);

        processor.stop().await.unwrap();
    }

    // Batch processing preserves order and content of every event.
    #[tokio::test]
    async fn test_numa_batch_processing() {
        let config = NumaConfig {
            auto_detect_topology: false,
            ..Default::default()
        };

        let processor = NumaStreamProcessor::new(config).await.unwrap();
        processor.start().await.unwrap();

        let events = vec![vec![1u8, 2, 3], vec![4u8, 5, 6], vec![7u8, 8, 9]];

        let results = processor
            .process_batch(events.clone(), Some(0))
            .await
            .unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results, events);

        let stats = processor.get_stats().await;
        assert_eq!(stats.events_processed, 3);

        processor.stop().await.unwrap();
    }

    // Bandwidth estimate is non-negative after a few spaced samples.
    #[tokio::test]
    async fn test_memory_bandwidth_monitor() {
        let monitor = MemoryBandwidthMonitor::new(Duration::from_secs(10));

        monitor.record_sample(0, 1024 * 1024).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        monitor.record_sample(0, 2 * 1024 * 1024).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        monitor.record_sample(0, 3 * 1024 * 1024).await;

        let bandwidth = monitor.get_bandwidth(0).await;
        assert!(bandwidth >= 0.0);
    }

    // Balanced distribution on a 2-node / 4-CPU topology yields 4 workers.
    #[tokio::test]
    async fn test_numa_thread_pool() {
        let topology = Arc::new(NumaTopology {
            num_nodes: 2,
            nodes: vec![
                NumaNode {
                    id: 0,
                    cpus: vec![0, 1],
                    total_memory: 4 * 1024 * 1024 * 1024,
                    free_memory: 2 * 1024 * 1024 * 1024,
                    memory_bandwidth_mbps: 50000,
                    distances: HashMap::from([(0, 10), (1, 20)]),
                    online: true,
                },
                NumaNode {
                    id: 1,
                    cpus: vec![2, 3],
                    total_memory: 4 * 1024 * 1024 * 1024,
                    free_memory: 2 * 1024 * 1024 * 1024,
                    memory_bandwidth_mbps: 50000,
                    distances: HashMap::from([(0, 20), (1, 10)]),
                    online: true,
                },
            ],
            total_cpus: 4,
            total_memory: 8 * 1024 * 1024 * 1024,
            distance_matrix: vec![vec![10, 20], vec![20, 10]],
            cpu_to_node: HashMap::from([(0, 0), (1, 0), (2, 1), (3, 1)]),
        });

        let config = NumaConfig {
            worker_distribution: WorkerDistributionStrategy::Balanced,
            ..Default::default()
        };

        let pool = NumaThreadPool::new(config, topology).await.unwrap();
        pool.start().await.unwrap();

        let stats = pool.get_stats().await;
        assert_eq!(stats.total_workers, 4);
        assert!(pool.is_running());

        pool.stop().await.unwrap();
        assert!(!pool.is_running());
    }

    // Worker accessors and per-task statistics accumulation.
    #[tokio::test]
    async fn test_numa_worker() {
        let worker = NumaWorker::new(0, 0, vec![0, 1]);
        assert_eq!(worker.id(), 0);
        assert_eq!(worker.node_id(), 0);
        assert_eq!(worker.cpu_affinity(), &[0, 1]);
        assert!(!worker.is_running());

        worker.record_task(100, true).await;
        let stats = worker.get_stats().await;
        assert_eq!(stats.tasks_processed, 1);
        assert_eq!(stats.local_accesses, 1);
    }
}