Skip to main content

scirs2_core/memory_efficient/
numa_topology.rs

//! NUMA topology detection and management.
//!
//! This module provides utilities for detecting and working with NUMA (Non-Uniform Memory Access)
//! topologies on systems that support it. NUMA awareness can significantly improve performance
//! for memory-intensive operations by reducing cross-node memory access latency.
//!
//! # Supported Platforms
//!
//! - **Linux**: Full support via the `/sys/devices/system/node` interface
//! - **Windows**: Node and CPU topology via the Windows API (requires the `windows-sys` crate);
//!   per-node memory is approximated by splitting system memory evenly across nodes
//! - **macOS/BSD**: Graceful fallback (returns `None` - these systems don't typically have NUMA)
//!
//! # Optional Features
//!
//! - `numa`: Enable libnuma integration for advanced NUMA management on Linux
16
17use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
18use serde::{Deserialize, Serialize};
19
20/// NUMA node information
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
22pub struct NumaNode {
23    /// Node ID
24    pub node_id: usize,
25
26    /// CPU cores associated with this node
27    pub cpu_list: Vec<usize>,
28
29    /// Memory available on this node (in bytes)
30    pub memory_bytes: u64,
31
32    /// Memory free on this node (in bytes)
33    pub memory_free_bytes: u64,
34}
35
36impl NumaNode {
37    /// Create a new NUMA node
38    pub fn new(node_id: usize, cpu_list: Vec<usize>, memory_bytes: u64) -> Self {
39        Self {
40            node_id,
41            cpu_list,
42            memory_bytes,
43            memory_free_bytes: memory_bytes,
44        }
45    }
46
47    /// Get the number of CPUs in this node
48    pub fn num_cpus(&self) -> usize {
49        self.cpu_list.len()
50    }
51
52    /// Check if a CPU belongs to this node
53    pub fn contains_cpu(&self, cpu_id: usize) -> bool {
54        self.cpu_list.contains(&cpu_id)
55    }
56
57    /// Get memory utilization percentage
58    pub fn memory_utilization(&self) -> f64 {
59        if self.memory_bytes == 0 {
60            0.0
61        } else {
62            (self.memory_bytes - self.memory_free_bytes) as f64 / self.memory_bytes as f64
63        }
64    }
65}
66
67/// NUMA topology information for the system
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct NumaTopology {
70    /// All NUMA nodes in the system
71    pub nodes: Vec<NumaNode>,
72
73    /// Whether the system is NUMA-aware
74    pub is_numa: bool,
75}
76
77impl NumaTopology {
78    /// Create a new NUMA topology
79    pub fn new(nodes: Vec<NumaNode>, is_numa: bool) -> Self {
80        Self { nodes, is_numa }
81    }
82
83    /// Get the number of NUMA nodes
84    pub fn num_nodes(&self) -> usize {
85        self.nodes.len()
86    }
87
88    /// Get a specific NUMA node by ID
89    pub fn get_node(&self, node_id: usize) -> Option<&NumaNode> {
90        self.nodes.iter().find(|node| node.node_id == node_id)
91    }
92
93    /// Find which NUMA node contains a specific CPU
94    pub fn find_node_for_cpu(&self, cpu_id: usize) -> Option<&NumaNode> {
95        self.nodes.iter().find(|node| node.contains_cpu(cpu_id))
96    }
97
98    /// Get total memory across all NUMA nodes
99    pub fn total_memory(&self) -> u64 {
100        self.nodes.iter().map(|node| node.memory_bytes).sum()
101    }
102
103    /// Get total free memory across all NUMA nodes
104    pub fn total_free_memory(&self) -> u64 {
105        self.nodes.iter().map(|node| node.memory_free_bytes).sum()
106    }
107
108    /// Detect NUMA topology on the current system
109    ///
110    /// Returns `None` if NUMA is not supported or detection fails
111    pub fn detect() -> Option<Self> {
112        #[cfg(target_os = "linux")]
113        {
114            Self::detect_linux().ok()
115        }
116
117        #[cfg(target_os = "windows")]
118        {
119            Self::detect_windows().ok()
120        }
121
122        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
123        {
124            // macOS, BSD, and other systems - no NUMA support
125            None
126        }
127    }
128
129    /// Detect NUMA topology on Linux using sysfs
130    #[cfg(target_os = "linux")]
131    fn detect_linux() -> CoreResult<Self> {
132        use std::fs;
133        use std::path::Path;
134
135        let node_path = Path::new("/sys/devices/system/node");
136
137        if !node_path.exists() {
138            // No NUMA support - return single node with all CPUs
139            return Self::detect_non_numa();
140        }
141
142        let mut nodes = Vec::new();
143
144        // Iterate through node directories
145        let entries = fs::read_dir(node_path).map_err(|e| {
146            CoreError::IoError(
147                ErrorContext::new(format!("Failed to read NUMA node directory: {e}"))
148                    .with_location(ErrorLocation::new(file!(), line!())),
149            )
150        })?;
151
152        for entry in entries {
153            let entry = entry.map_err(|e| {
154                CoreError::IoError(
155                    ErrorContext::new(format!("Failed to read NUMA directory entry: {e}"))
156                        .with_location(ErrorLocation::new(file!(), line!())),
157                )
158            })?;
159
160            let path = entry.path();
161            let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
162
163            // Check if this is a node directory (e.g., node0, node1)
164            if let Some(node_id_str) = filename.strip_prefix("node") {
165                if let Ok(node_id) = node_id_str.parse::<usize>() {
166                    // Read CPU list
167                    let cpulist_path = path.join("cpulist");
168                    let cpu_list = if cpulist_path.exists() {
169                        let cpulist_str = fs::read_to_string(&cpulist_path).map_err(|e| {
170                            CoreError::IoError(
171                                ErrorContext::new(format!("Failed to read cpulist: {e}"))
172                                    .with_location(ErrorLocation::new(file!(), line!())),
173                            )
174                        })?;
175
176                        Self::parse_cpu_list(&cpulist_str.trim())?
177                    } else {
178                        Vec::new()
179                    };
180
181                    // Read memory info
182                    let meminfo_path = path.join("meminfo");
183                    let (memory_bytes, memory_free_bytes) = if meminfo_path.exists() {
184                        Self::parse_node_meminfo(&meminfo_path)?
185                    } else {
186                        (0, 0)
187                    };
188
189                    let mut node = NumaNode::new(node_id, cpu_list, memory_bytes);
190                    node.memory_free_bytes = memory_free_bytes;
191
192                    nodes.push(node);
193                }
194            }
195        }
196
197        // Sort nodes by ID
198        nodes.sort_by_key(|node| node.node_id);
199
200        if nodes.is_empty() {
201            return Self::detect_non_numa();
202        }
203
204        Ok(Self::new(nodes, true))
205    }
206
207    /// Parse Linux CPU list format (e.g., "0-3,5,7-9")
208    #[cfg(target_os = "linux")]
209    fn parse_cpu_list(cpulist: &str) -> CoreResult<Vec<usize>> {
210        let mut cpus = Vec::new();
211
212        for range in cpulist.split(',') {
213            let range = range.trim();
214            if range.is_empty() {
215                continue;
216            }
217
218            if range.contains('-') {
219                // Range format (e.g., "0-3")
220                let parts: Vec<&str> = range.split('-').collect();
221                if parts.len() == 2 {
222                    let start = parts[0].parse::<usize>().map_err(|e| {
223                        CoreError::InvalidArgument(
224                            ErrorContext::new(format!("Invalid CPU range start: {e}"))
225                                .with_location(ErrorLocation::new(file!(), line!())),
226                        )
227                    })?;
228                    let end = parts[1].parse::<usize>().map_err(|e| {
229                        CoreError::InvalidArgument(
230                            ErrorContext::new(format!("Invalid CPU range end: {e}"))
231                                .with_location(ErrorLocation::new(file!(), line!())),
232                        )
233                    })?;
234
235                    cpus.extend(start..=end);
236                }
237            } else {
238                // Single CPU
239                let cpu = range.parse::<usize>().map_err(|e| {
240                    CoreError::InvalidArgument(
241                        ErrorContext::new(format!("Invalid CPU ID: {e}"))
242                            .with_location(ErrorLocation::new(file!(), line!())),
243                    )
244                })?;
245                cpus.push(cpu);
246            }
247        }
248
249        Ok(cpus)
250    }
251
252    /// Parse NUMA node meminfo file
253    #[cfg(target_os = "linux")]
254    fn parse_node_meminfo(meminfo_path: &std::path::Path) -> CoreResult<(u64, u64)> {
255        use std::fs;
256
257        let contents = fs::read_to_string(meminfo_path).map_err(|e| {
258            CoreError::IoError(
259                ErrorContext::new(format!("Failed to read meminfo: {e}"))
260                    .with_location(ErrorLocation::new(file!(), line!())),
261            )
262        })?;
263
264        let mut total_kb = 0u64;
265        let mut free_kb = 0u64;
266
267        for line in contents.lines() {
268            let parts: Vec<&str> = line.split_whitespace().collect();
269            if parts.len() >= 4 {
270                if parts[2] == "MemTotal:" {
271                    total_kb = parts[3].parse::<u64>().unwrap_or(0);
272                } else if parts[2] == "MemFree:" {
273                    free_kb = parts[3].parse::<u64>().unwrap_or(0);
274                }
275            }
276        }
277
278        // Convert KB to bytes
279        Ok((total_kb * 1024, free_kb * 1024))
280    }
281
282    /// Detect NUMA topology on Windows using `GetLogicalProcessorInformationEx`.
283    ///
284    /// Queries the OS for NUMA node topology via `RelationNumaNode` processor
285    /// relationship records, extracting per-node CPU affinity masks.  Memory
286    /// information is obtained from `GlobalMemoryStatusEx` and distributed
287    /// evenly across all detected nodes as an approximation.
288    ///
289    /// Record layout within `SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX` (64-bit):
290    /// ```text
291    /// [0..4]  Relationship (i32)
292    /// [4..8]  Size (u32)
293    /// [8..12] NodeNumber (u32)                 ← NUMA node ID
294    /// [12..30] Reserved ([u8;18])
295    /// [30..32] GroupCount (u16)
296    /// [32..40] GROUP_AFFINITY::Mask (usize)    ← CPU affinity mask (first field)
297    /// ```
298    #[cfg(target_os = "windows")]
299    fn detect_windows() -> CoreResult<Self> {
300        use windows_sys::Win32::Foundation::FALSE;
301        use windows_sys::Win32::System::SystemInformation::{
302            GetLogicalProcessorInformationEx, GlobalMemoryStatusEx, RelationNumaNode,
303            MEMORYSTATUSEX, SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX,
304        };
305
306        // ----------------------------------------------------------------
307        // 1. Obtain total / available system memory
308        // ----------------------------------------------------------------
309        let (total_mem, free_mem): (u64, u64) = unsafe {
310            let mut mem_status: MEMORYSTATUSEX = std::mem::zeroed();
311            mem_status.dwLength = std::mem::size_of::<MEMORYSTATUSEX>() as u32;
312            if GlobalMemoryStatusEx(&mut mem_status) == FALSE {
313                (0u64, 0u64)
314            } else {
315                (mem_status.ullTotalPhys, mem_status.ullAvailPhys)
316            }
317        };
318
319        // ----------------------------------------------------------------
320        // 2. Query required buffer size (first call returns FALSE with size)
321        // ----------------------------------------------------------------
322        let mut buf_len: u32 = 0;
323        unsafe {
324            GetLogicalProcessorInformationEx(RelationNumaNode, std::ptr::null_mut(), &mut buf_len);
325        }
326        if buf_len == 0 {
327            return Self::detect_non_numa();
328        }
329
330        // ----------------------------------------------------------------
331        // 3. Allocate buffer and retrieve NUMA information
332        // ----------------------------------------------------------------
333        let mut buf: Vec<u8> = vec![0u8; buf_len as usize];
334        let success = unsafe {
335            GetLogicalProcessorInformationEx(
336                RelationNumaNode,
337                buf.as_mut_ptr() as *mut SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX,
338                &mut buf_len,
339            )
340        };
341        if success == FALSE {
342            return Self::detect_non_numa();
343        }
344
345        // ----------------------------------------------------------------
346        // 4. Walk variable-length records
347        // ----------------------------------------------------------------
348        // Offsets (all relative to record start):
349        //   [0..4]  Relationship (i32)
350        //   [4..8]  Size (u32)
351        //   [8..12] NodeNumber (u32)  → node_id
352        //   [32..]  GROUP_AFFINITY::Mask (usize) = CPU affinity mask
353        // Derivation: 8 (header) + 4 (NodeNumber) + 18 (Reserved) + 2 (GroupCount) = 32
354        const NODE_NUMBER_OFFSET: usize = 8;
355        const MASK_OFFSET: usize = 8 + 4 + 18 + 2; // = 32
356        const MASK_SIZE: usize = std::mem::size_of::<usize>();
357        const RELATION_NUMA_NODE: u32 = 1;
358
359        let mut nodes: Vec<NumaNode> = Vec::new();
360        let mut offset: usize = 0;
361
362        while offset + 8 <= buf_len as usize {
363            let record_size = u32::from_ne_bytes([
364                buf[offset + 4],
365                buf[offset + 5],
366                buf[offset + 6],
367                buf[offset + 7],
368            ]) as usize;
369
370            if record_size == 0 || offset + record_size > buf_len as usize {
371                break;
372            }
373
374            let relationship = u32::from_ne_bytes([
375                buf[offset],
376                buf[offset + 1],
377                buf[offset + 2],
378                buf[offset + 3],
379            ]);
380
381            if relationship == RELATION_NUMA_NODE
382                && offset + MASK_OFFSET + MASK_SIZE <= offset + record_size
383            {
384                let node_number = u32::from_ne_bytes([
385                    buf[offset + NODE_NUMBER_OFFSET],
386                    buf[offset + NODE_NUMBER_OFFSET + 1],
387                    buf[offset + NODE_NUMBER_OFFSET + 2],
388                    buf[offset + NODE_NUMBER_OFFSET + 3],
389                ]) as usize;
390
391                let abs_mask_start = offset + MASK_OFFSET;
392                let abs_mask_end = abs_mask_start + MASK_SIZE;
393                if abs_mask_end <= buf.len() {
394                    let mut mask_arr = [0u8; 8];
395                    mask_arr[..MASK_SIZE].copy_from_slice(&buf[abs_mask_start..abs_mask_end]);
396                    let mask = usize::from_ne_bytes(mask_arr);
397
398                    let cpu_list: Vec<usize> = (0..usize::BITS as usize)
399                        .filter(|&bit| (mask >> bit) & 1 == 1)
400                        .collect();
401
402                    // Memory will be balanced after all nodes are collected
403                    let node = NumaNode::new(node_number, cpu_list, 0);
404                    nodes.push(node);
405                }
406            }
407
408            offset += record_size;
409        }
410
411        if nodes.is_empty() {
412            return Self::detect_non_numa();
413        }
414
415        // Distribute memory evenly across detected nodes
416        let node_count = nodes.len() as u64;
417        let per_node_total = if node_count > 0 {
418            total_mem / node_count
419        } else {
420            0
421        };
422        let per_node_free = if node_count > 0 {
423            free_mem / node_count
424        } else {
425            0
426        };
427        for node in &mut nodes {
428            node.memory_bytes = per_node_total;
429            node.memory_free_bytes = per_node_free;
430        }
431
432        nodes.sort_by_key(|node| node.node_id);
433        let is_numa = nodes.len() > 1;
434        Ok(Self::new(nodes, is_numa))
435    }
436
437    /// Fallback for non-NUMA systems
438    fn detect_non_numa() -> CoreResult<Self> {
439        use crate::memory_efficient::platform_memory::PlatformMemoryInfo;
440
441        // Create a single node with all available CPUs and memory
442        let num_cpus = std::thread::available_parallelism()
443            .map(|n| n.get())
444            .unwrap_or(1);
445
446        let cpu_list: Vec<usize> = (0..num_cpus).collect();
447
448        // Get total system memory
449        let memory_info = PlatformMemoryInfo::detect();
450        let (memory_bytes, memory_free_bytes) = if let Some(info) = memory_info {
451            (info.total_memory as u64, info.available_memory as u64)
452        } else {
453            (0, 0)
454        };
455
456        let mut node = NumaNode::new(0, cpu_list, memory_bytes);
457        node.memory_free_bytes = memory_free_bytes;
458
459        Ok(Self::new(vec![node], false))
460    }
461}
462
463/// NUMA-aware memory allocator hint
464#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
465pub enum NumaPolicy {
466    /// Default system policy
467    Default,
468    /// Bind to specific node
469    Bind(usize),
470    /// Interleave across all nodes
471    Interleave,
472    /// Prefer specific node but allow fallback
473    Preferred(usize),
474}
475
/// Unit tests for node/topology accessors, CPU-list parsing (Linux only), and a
/// smoke test of platform detection.
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created node exposes its ID and CPU membership.
    #[test]
    fn test_numa_node_creation() {
        let node = NumaNode::new(0, vec![0, 1, 2, 3], 8 * 1024 * 1024 * 1024);

        assert_eq!(node.node_id, 0);
        assert_eq!(node.num_cpus(), 4);
        assert!(node.contains_cpu(2));
        assert!(!node.contains_cpu(4));
    }

    /// A topology reports its node count, NUMA flag, and summed memory.
    #[test]
    fn test_numa_topology_creation() {
        let node0 = NumaNode::new(0, vec![0, 1], 4 * 1024 * 1024 * 1024);
        let node1 = NumaNode::new(1, vec![2, 3], 4 * 1024 * 1024 * 1024);

        let topology = NumaTopology::new(vec![node0, node1], true);

        assert_eq!(topology.num_nodes(), 2);
        assert!(topology.is_numa);
        assert_eq!(topology.total_memory(), 8 * 1024 * 1024 * 1024);
    }

    /// A CPU is resolved to the node that lists it.
    #[test]
    fn test_find_node_for_cpu() {
        let node0 = NumaNode::new(0, vec![0, 1], 4 * 1024 * 1024 * 1024);
        let node1 = NumaNode::new(1, vec![2, 3], 4 * 1024 * 1024 * 1024);

        let topology = NumaTopology::new(vec![node0, node1], true);

        let node = topology.find_node_for_cpu(2);
        assert!(node.is_some());
        assert_eq!(node.expect("Node not found").node_id, 1);
    }

    /// Well-formed CPU lists parse into the expected IDs.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_parse_cpu_list() {
        // Test single CPU
        let cpus = NumaTopology::parse_cpu_list("0").expect("Parse failed");
        assert_eq!(cpus, vec![0]);

        // Test range
        let cpus = NumaTopology::parse_cpu_list("0-3").expect("Parse failed");
        assert_eq!(cpus, vec![0, 1, 2, 3]);

        // Test complex list
        let cpus = NumaTopology::parse_cpu_list("0-2,5,7-9").expect("Parse failed");
        assert_eq!(cpus, vec![0, 1, 2, 5, 7, 8, 9]);

        // Test with whitespace
        let cpus = NumaTopology::parse_cpu_list(" 0-1, 3 ").expect("Parse failed");
        assert_eq!(cpus, vec![0, 1, 3]);
    }

    /// Detection either yields nothing or a structurally consistent topology.
    #[test]
    fn test_numa_detection() {
        // `detect()` returns `None` on platforms without a detector and when the
        // platform detector fails, so only a successful result is inspected.
        // Memory totals are not asserted: they may legitimately be 0 in test
        // environments.
        if let Some(topo) = NumaTopology::detect() {
            assert!(topo.num_nodes() > 0);

            // Every reported node must be retrievable by its own ID.
            for node in &topo.nodes {
                assert!(topo.get_node(node.node_id).is_some());
            }
        }
    }

    /// Utilization is the used fraction of total memory.
    #[test]
    fn test_memory_utilization() {
        let mut node = NumaNode::new(0, vec![0, 1], 1000);
        node.memory_free_bytes = 600;

        let utilization = node.memory_utilization();
        assert!((utilization - 0.4).abs() < 1e-10);
    }
}