use scirs2_core::ndarray::{s, Array2, ArrayViewMut1, ArrayViewMut2};
use std::alloc::{GlobalAlloc, Layout, System};
use std::collections::VecDeque;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

#[cfg(any(target_os = "linux", target_os = "android"))]
use libc;
#[cfg(target_os = "linux")]
use std::fs;

#[cfg(test)]
use num_cpus;

/// Minimal stand-in for the `num_cpus` crate outside of tests.
#[cfg(not(test))]
mod num_cpus {
    pub fn get() -> usize {
        std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4)
    }
}

/// Configuration for the clustering memory pools.
#[derive(Debug, Clone)]
pub struct MemoryPoolConfig {
    /// Maximum number of buffers retained per pool.
    pub max_pool_size: usize,
    /// Cache line size (bytes) used for aligned allocations.
    pub cache_line_size: usize,
    /// Attempt NUMA-aware allocation and placement.
    pub numa_aware: bool,
    /// Prefetch distance hint, in elements.
    pub prefetch_distance: usize,
    /// Size of each arena block, in bytes.
    pub arena_block_size: usize,
    /// Preferred NUMA node, or `-1` to auto-detect.
    pub numa_node_hint: i32,
    /// Discover the NUMA topology automatically.
    pub auto_numa_discovery: bool,
    /// Bind worker threads to the pool's NUMA node.
    pub enable_thread_affinity: bool,
    /// Touch freshly allocated pages to warm them.
    pub enable_memory_warming: bool,
    /// Allocations above this size (bytes) use the large-object pool.
    pub large_object_threshold: usize,
    /// Soft limit on tracked memory usage, in bytes.
    pub max_memory_usage: usize,
}

impl Default for MemoryPoolConfig {
    fn default() -> Self {
        Self {
            max_pool_size: 1000,
            cache_line_size: 64,
            numa_aware: true,
            prefetch_distance: 8,
            arena_block_size: 1024 * 1024,
            numa_node_hint: -1,
            auto_numa_discovery: true,
            enable_thread_affinity: true,
            enable_memory_warming: true,
            large_object_threshold: 64 * 1024,
            max_memory_usage: 1024 * 1024 * 1024,
        }
    }
}

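/// Memory pool for distance buffers, index buffers, and distance matrices.
///
/// Buffers are recycled to avoid repeated allocation in hot clustering loops.
/// A minimal usage sketch (module paths are illustrative):
///
/// ```ignore
/// let pool = DistancePool::new(100);
/// let mut distances = pool.get_distance_buffer(256);
/// distances.as_mut_slice()[0] = 1.5;
/// assert_eq!(distances.len(), 256);
/// drop(distances); // the buffer is returned to the pool for reuse
/// ```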
pub struct DistancePool {
    config: MemoryPoolConfig,
    distance_buffers: Mutex<VecDeque<Box<[f64]>>>,
    index_buffers: Mutex<VecDeque<Box<[usize]>>>,
    matrix_buffers: Mutex<VecDeque<Array2<f64>>>,
    large_buffers: Mutex<VecDeque<Box<[f64]>>>,
    stats: PoolStatistics,
    memory_usage: std::sync::atomic::AtomicUsize,
    numa_node: std::sync::atomic::AtomicI32,
}

impl DistancePool {
    /// Create a pool with the given capacity and the default configuration.
    pub fn new(capacity: usize) -> Self {
        Self::with_config(capacity, MemoryPoolConfig::default())
    }

    /// Create a pool with the given capacity and an explicit configuration.
    pub fn with_config(capacity: usize, config: MemoryPoolConfig) -> Self {
        let numa_node = if config.numa_aware && config.numa_node_hint >= 0 {
            config.numa_node_hint
        } else {
            Self::detect_numa_node()
        };

        Self {
            config,
            distance_buffers: Mutex::new(VecDeque::with_capacity(capacity)),
            index_buffers: Mutex::new(VecDeque::with_capacity(capacity)),
            matrix_buffers: Mutex::new(VecDeque::with_capacity(capacity / 4)),
            large_buffers: Mutex::new(VecDeque::with_capacity(capacity / 10)),
            stats: PoolStatistics::new(),
            memory_usage: std::sync::atomic::AtomicUsize::new(0),
            numa_node: std::sync::atomic::AtomicI32::new(numa_node),
        }
    }

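    /// Borrow a cache-line-aligned `f64` buffer of at least `size` elements.
    ///
    /// A pooled buffer is reused when its length falls in `[size, 2 * size]`;
    /// otherwise a fresh aligned buffer is allocated. A minimal sketch:
    ///
    /// ```ignore
    /// let pool = DistancePool::new(10);
    /// {
    ///     let mut distances = pool.get_distance_buffer(64);
    ///     distances.as_mut_slice().fill(0.0);
    /// } // dropping the guard returns the buffer to the pool
    /// assert!(pool.pool_info().distance_buffer_count >= 1);
    /// ```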
    pub fn get_distance_buffer(&self, size: usize) -> DistanceBuffer {
        let buffer_size_bytes = size * std::mem::size_of::<f64>();
        let is_large = buffer_size_bytes > self.config.large_object_threshold;

        // Trim the pools if this request would push tracked usage over the soft limit.
        let current_usage = self.memory_usage.load(std::sync::atomic::Ordering::Relaxed);
        if current_usage + buffer_size_bytes > self.config.max_memory_usage {
            self.cleanup_excess_memory();
        }

        let buffer = if is_large {
            self.get_large_buffer(size)
        } else {
            let mut buffers = self.distance_buffers.lock().expect("Operation failed");

            // Reuse a pooled buffer whose length is close to the requested size.
            let mut reused = None;
            for i in 0..buffers.len() {
                if buffers[i].len() >= size && buffers[i].len() <= size * 2 {
                    reused = Some(buffers.remove(i).expect("Operation failed"));
                    self.stats.record_hit();
                    break;
                }
            }

            match reused {
                Some(buffer) => buffer,
                None => {
                    self.stats.record_miss();
                    self.create_aligned_buffer(size)
                }
            }
        };

        // Track the actual buffer length so the subtraction on return balances.
        self.memory_usage.fetch_add(
            buffer.len() * std::mem::size_of::<f64>(),
            std::sync::atomic::Ordering::Relaxed,
        );

        DistanceBuffer::new(buffer, self)
    }

    fn get_large_buffer(&self, size: usize) -> Box<[f64]> {
        let mut buffers = self.large_buffers.lock().expect("Operation failed");

        for i in 0..buffers.len() {
            if buffers[i].len() == size {
                let buffer = buffers.remove(i).expect("Operation failed");
                self.stats.record_hit();
                return buffer;
            }
        }

        self.stats.record_miss();
        if self.config.numa_aware {
            self.create_numa_aligned_buffer(size)
        } else {
            self.create_aligned_buffer(size)
        }
    }

    /// Borrow a `usize` index buffer of at least `size` elements.
    pub fn get_index_buffer(&self, size: usize) -> IndexBuffer {
        let mut buffers = self.index_buffers.lock().expect("Operation failed");

        for i in 0..buffers.len() {
            if buffers[i].len() >= size && buffers[i].len() <= size * 2 {
                let buffer = buffers.remove(i).expect("Operation failed");
                self.stats.record_hit();
                return IndexBuffer::new(buffer, self);
            }
        }

        self.stats.record_miss();
        let new_buffer = vec![0usize; size].into_boxed_slice();
        IndexBuffer::new(new_buffer, self)
    }

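    /// Borrow an `f64` matrix buffer of exactly `rows × cols`.
    ///
    /// A minimal sketch, assuming the default configuration:
    ///
    /// ```ignore
    /// let pool = DistancePool::new(10);
    /// let mut m = pool.get_matrix_buffer(4, 4);
    /// m.fill(0.0);
    /// assert_eq!(m.dim(), (4, 4));
    /// ```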
    pub fn get_matrix_buffer(&self, rows: usize, cols: usize) -> MatrixBuffer {
        let mut buffers = self.matrix_buffers.lock().expect("Operation failed");

        for i in 0..buffers.len() {
            let (r, c) = buffers[i].dim();
            if r >= rows && c >= cols && r <= rows * 2 && c <= cols * 2 {
                let mut matrix = buffers.remove(i).expect("Operation failed");
                // Trim the recycled matrix down to the requested shape.
                matrix = matrix.slice_mut(s![..rows, ..cols]).to_owned();
                self.stats.record_hit();
                return MatrixBuffer::new(matrix, self);
            }
        }

        self.stats.record_miss();
        let matrix = Array2::zeros((rows, cols));
        MatrixBuffer::new(matrix, self)
    }

    fn create_aligned_buffer(&self, size: usize) -> Box<[f64]> {
        let layout = Layout::from_size_align(
            size * std::mem::size_of::<f64>(),
            self.config.cache_line_size,
        )
        .expect("Operation failed");

        unsafe {
            let ptr = System.alloc(layout) as *mut f64;
            if ptr.is_null() {
                panic!("Failed to allocate aligned memory");
            }

            // Zero-initialize: the memory must hold valid `f64` values before it
            // is exposed as a slice, and touching every page also warms it.
            std::ptr::write_bytes(ptr, 0, size);

            Box::from_raw(std::slice::from_raw_parts_mut(ptr, size))
        }
    }

    fn create_numa_aligned_buffer(&self, size: usize) -> Box<[f64]> {
        let numa_node = self.numa_node.load(Ordering::Relaxed);

        #[cfg(target_os = "linux")]
        {
            if self.config.numa_aware && numa_node >= 0 {
                match Self::allocate_on_numa_node_linux(size, numa_node as u32) {
                    Ok(buffer) => {
                        if self.config.enable_memory_warming {
                            Self::warm_memory(&buffer);
                        }
                        return buffer;
                    }
                    Err(_) => {
                        // Fall through to the generic aligned allocator below.
                    }
                }
            }
        }

        #[cfg(target_os = "windows")]
        {
            if self.config.numa_aware && numa_node >= 0 {
                match Self::allocate_on_numa_node_windows(size, numa_node as u32) {
                    Ok(buffer) => {
                        if self.config.enable_memory_warming {
                            Self::warm_memory(&buffer);
                        }
                        return buffer;
                    }
                    Err(_) => {
                        // Fall through to the generic aligned allocator below.
                    }
                }
            }
        }

        let buffer = self.create_aligned_buffer(size);

        if self.config.enable_memory_warming {
            Self::warm_memory(&buffer);
        }

        buffer
    }

    /// Allocate a buffer intended for the given NUMA node (Linux).
    ///
    /// The current implementation allocates with the system allocator and
    /// immediately touches the pages, relying on the kernel's first-touch
    /// policy for placement rather than an explicit node binding.
    #[cfg(target_os = "linux")]
    fn allocate_on_numa_node_linux(
        size: usize,
        _node: u32,
    ) -> Result<Box<[f64]>, Box<dyn std::error::Error>> {
        let total_size = size * std::mem::size_of::<f64>();
        let layout = Layout::from_size_align(total_size, 64)?;

        unsafe {
            let ptr = System.alloc(layout) as *mut f64;
            if ptr.is_null() {
                return Err("Failed to allocate memory".into());
            }

            std::ptr::write_bytes(ptr, 0, size);

            Ok(Box::from_raw(std::slice::from_raw_parts_mut(ptr, size)))
        }
    }

    #[cfg(target_os = "windows")]
    fn allocate_on_numa_node_windows(
        _size: usize,
        _node: u32,
    ) -> Result<Box<[f64]>, Box<dyn std::error::Error>> {
        Err("Windows NUMA allocation not implemented".into())
    }

    /// Bind the calling thread to the CPUs of the given NUMA node, if supported.
    pub fn bind_thread_to_numa_node(node: u32) -> Result<(), Box<dyn std::error::Error>> {
        #[cfg(target_os = "linux")]
        {
            Self::bind_thread_to_numa_node_linux(node)
        }
        #[cfg(target_os = "windows")]
        {
            Self::bind_thread_to_numa_node_windows(node)
        }
        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
        {
            let _ = node;
            Ok(())
        }
    }

    #[cfg(target_os = "linux")]
    fn bind_thread_to_numa_node_linux(node: u32) -> Result<(), Box<dyn std::error::Error>> {
        if let Some(_cpu_count) = Self::get_node_cpu_count(node) {
            let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };

            // Parse the node's CPU list (e.g. "0-3,8-11") from sysfs.
            let cpulist_path = format!("/sys/devices/system/node/node{}/cpulist", node);
            if let Ok(cpulist) = fs::read_to_string(&cpulist_path) {
                for range in cpulist.trim().split(',') {
                    if let Some((start, end)) = range.split_once('-') {
                        if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
                            for cpu in s..=e {
                                unsafe { libc::CPU_SET(cpu as usize, &mut cpu_set) };
                            }
                        }
                    } else if let Ok(cpu) = range.parse::<u32>() {
                        unsafe { libc::CPU_SET(cpu as usize, &mut cpu_set) };
                    }
                }

                unsafe {
                    libc::sched_setaffinity(
                        0, // 0 = calling thread
                        std::mem::size_of::<libc::cpu_set_t>(),
                        &cpu_set,
                    );
                }
            }
        }

        Ok(())
    }

    #[cfg(target_os = "windows")]
    fn bind_thread_to_numa_node_windows(_node: u32) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }

    /// Touch one element per page so the OS faults the pages in ahead of use.
    fn warm_memory(buffer: &[f64]) {
        if buffer.is_empty() {
            return;
        }

        let page_size = 4096;
        let elements_per_page = page_size / std::mem::size_of::<f64>();

        for i in (0..buffer.len()).step_by(elements_per_page) {
            unsafe {
                std::ptr::read_volatile(&buffer[i]);
            }
        }
    }

    fn detect_numa_node() -> i32 {
        #[cfg(target_os = "linux")]
        {
            Self::detect_numa_node_linux().unwrap_or(0)
        }
        #[cfg(target_os = "windows")]
        {
            Self::detect_numa_node_windows().unwrap_or(0)
        }
        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
        {
            0
        }
    }

    #[cfg(target_os = "linux")]
    fn detect_numa_node_linux() -> Option<i32> {
        let _tid = unsafe { libc::gettid() };

        match Self::get_current_numa_node_linux() {
            Ok(node) => Some(node),
            Err(_) => Self::detect_numa_from_cpu_linux(),
        }
    }

    #[cfg(target_os = "linux")]
    fn get_current_numa_node_linux() -> Result<i32, Box<dyn std::error::Error>> {
        let mut cpu: u32 = 0;
        let mut node: u32 = 0;

        let result = unsafe {
            libc::syscall(
                libc::SYS_getcpu,
                &mut cpu as *mut u32,
                &mut node as *mut u32,
                std::ptr::null_mut::<libc::c_void>(),
            )
        };

        if result == 0 {
            Ok(node as i32)
        } else {
            Err("getcpu syscall failed".into())
        }
    }

    #[cfg(target_os = "linux")]
    fn detect_numa_from_cpu_linux() -> Option<i32> {
        if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
            for entry in entries.flatten() {
                let name = entry.file_name();
                if let Some(name_str) = name.to_str() {
                    if let Some(stripped) = name_str.strip_prefix("node") {
                        if let Ok(node_num) = stripped.parse::<i32>() {
                            return Some(node_num);
                        }
                    }
                }
            }
        }
        None
    }

    #[cfg(target_os = "windows")]
    fn detect_numa_node_windows() -> Option<i32> {
        Some(0)
    }

    pub fn get_numa_topology() -> NumaTopology {
        #[cfg(target_os = "linux")]
        {
            Self::get_numa_topology_linux()
        }
        #[cfg(target_os = "windows")]
        {
            Self::get_numa_topology_windows()
        }
        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
        {
            NumaTopology::default()
        }
    }

    #[cfg(target_os = "linux")]
    fn get_numa_topology_linux() -> NumaTopology {
        let mut topology = NumaTopology::default();

        if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
            for entry in entries.flatten() {
                let name = entry.file_name();
                if let Some(name_str) = name.to_str() {
                    if let Some(stripped) = name_str.strip_prefix("node") {
                        if let Ok(node_id) = stripped.parse::<u32>() {
                            let meminfo_path =
                                format!("/sys/devices/system/node/{name_str}/meminfo");
                            if let Ok(meminfo) = fs::read_to_string(&meminfo_path) {
                                if let Some(total_kb) = Self::parse_meminfo_total(&meminfo) {
                                    topology.nodes.push(NumaNode {
                                        id: node_id,
                                        total_memory_bytes: total_kb * 1024,
                                        available_memory_bytes: total_kb * 1024,
                                        cpu_count: Self::get_node_cpu_count(node_id).unwrap_or(1),
                                    });
                                }
                            }
                        }
                    }
                }
            }
        }

        // Fall back to a single-node view if sysfs did not describe any nodes.
        if topology.nodes.is_empty() {
            topology.nodes.push(NumaNode {
                id: 0,
                total_memory_bytes: Self::get_total_system_memory()
                    .unwrap_or(8 * 1024 * 1024 * 1024),
                available_memory_bytes: Self::get_available_system_memory()
                    .unwrap_or(4 * 1024 * 1024 * 1024),
                cpu_count: num_cpus::get() as u32,
            });
        }

        topology
    }

    #[cfg(target_os = "linux")]
    fn parse_meminfo_total(meminfo: &str) -> Option<u64> {
        for line in meminfo.lines() {
            if line.starts_with("Node") && line.contains("MemTotal:") {
                let parts: Vec<&str> = line.split_whitespace().collect();
                if parts.len() >= 3 {
                    return parts[2].parse().ok();
                }
            }
        }
        None
    }

    #[cfg(target_os = "linux")]
    fn get_node_cpu_count(node_id: u32) -> Option<u32> {
        let cpulist_path = format!("/sys/devices/system/node/node{}/cpulist", node_id);
        if let Ok(cpulist) = fs::read_to_string(&cpulist_path) {
            let mut count = 0;
            for range in cpulist.trim().split(',') {
                if let Some((start, end)) = range.split_once('-') {
                    if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
                        count += e - s + 1;
                    }
                } else if range.parse::<u32>().is_ok() {
                    count += 1;
                }
            }
            Some(count)
        } else {
            None
        }
    }

    #[cfg(target_os = "linux")]
    fn get_total_system_memory() -> Option<u64> {
        if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
            for line in meminfo.lines() {
                if line.starts_with("MemTotal:") {
                    let parts: Vec<&str> = line.split_whitespace().collect();
                    if parts.len() >= 2 {
                        return parts[1].parse::<u64>().ok().map(|kb| kb * 1024);
                    }
                }
            }
        }
        None
    }

    #[cfg(target_os = "linux")]
    fn get_available_system_memory() -> Option<u64> {
        if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
            for line in meminfo.lines() {
                if line.starts_with("MemAvailable:") {
                    let parts: Vec<&str> = line.split_whitespace().collect();
                    if parts.len() >= 2 {
                        return parts[1].parse::<u64>().ok().map(|kb| kb * 1024);
                    }
                }
            }
        }
        None
    }

    #[cfg(target_os = "windows")]
    fn get_numa_topology_windows() -> NumaTopology {
        NumaTopology::default()
    }

    /// Drop a fraction of the pooled buffers to relieve memory pressure.
    ///
    /// Only idle (pooled) buffers are freed here; they were already removed
    /// from the `memory_usage` counter when they were returned to the pool,
    /// so the counter is not adjusted again.
    fn cleanup_excess_memory(&self) {
        let cleanup_ratio = 0.25;

        {
            let mut buffers = self.distance_buffers.lock().expect("Operation failed");
            let cleanup_count = (buffers.len() as f64 * cleanup_ratio) as usize;
            for _ in 0..cleanup_count {
                buffers.pop_back();
            }
        }

        {
            let mut buffers = self.large_buffers.lock().expect("Operation failed");
            let cleanup_count = (buffers.len() as f64 * cleanup_ratio) as usize;
            for _ in 0..cleanup_count {
                buffers.pop_back();
            }
        }
    }

    fn return_distance_buffer(&self, buffer: Box<[f64]>) {
        let buffer_size_bytes = buffer.len() * std::mem::size_of::<f64>();
        let is_large = buffer_size_bytes > self.config.large_object_threshold;

        // The buffer is no longer outstanding; stop counting it.
        self.memory_usage
            .fetch_sub(buffer_size_bytes, std::sync::atomic::Ordering::Relaxed);

        if is_large {
            let mut buffers = self.large_buffers.lock().expect("Operation failed");
            if buffers.len() < self.config.max_pool_size / 10 {
                buffers.push_back(buffer);
            }
        } else {
            let mut buffers = self.distance_buffers.lock().expect("Operation failed");
            if buffers.len() < self.config.max_pool_size {
                buffers.push_back(buffer);
            }
        }
    }

    fn return_index_buffer(&self, buffer: Box<[usize]>) {
        let mut buffers = self.index_buffers.lock().expect("Operation failed");
        if buffers.len() < self.config.max_pool_size {
            buffers.push_back(buffer);
        }
    }

    fn return_matrix_buffer(&self, matrix: Array2<f64>) {
        let mut buffers = self.matrix_buffers.lock().expect("Operation failed");
        if buffers.len() < self.config.max_pool_size / 4 {
            buffers.push_back(matrix);
        }
    }

    /// Snapshot of the pool's hit/miss statistics.
    pub fn statistics(&self) -> PoolStatistics {
        self.stats.clone()
    }

    /// Approximate number of bytes in distance buffers currently checked out.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// NUMA node this pool is associated with.
    pub fn current_numa_node(&self) -> i32 {
        self.numa_node.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Summary of pooled buffer counts, memory usage, and hit rate.
    pub fn pool_info(&self) -> PoolInfo {
        let distance_count = self
            .distance_buffers
            .lock()
            .expect("Operation failed")
            .len();
        let index_count = self.index_buffers.lock().expect("Operation failed").len();
        let matrix_count = self.matrix_buffers.lock().expect("Operation failed").len();
        let large_count = self.large_buffers.lock().expect("Operation failed").len();

        PoolInfo {
            distance_buffer_count: distance_count,
            index_buffer_count: index_count,
            matrix_buffer_count: matrix_count,
            large_buffer_count: large_count,
            total_memory_usage: self.memory_usage(),
            numa_node: self.current_numa_node(),
            hit_rate: self.stats.hit_rate(),
        }
    }

    /// Drop all pooled buffers and reset usage tracking and statistics.
    pub fn clear(&self) {
        self.distance_buffers
            .lock()
            .expect("Operation failed")
            .clear();
        self.index_buffers.lock().expect("Operation failed").clear();
        self.matrix_buffers
            .lock()
            .expect("Operation failed")
            .clear();
        self.large_buffers.lock().expect("Operation failed").clear();
        self.memory_usage
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.stats.reset();
    }
}

/// RAII handle to a pooled distance buffer; returns it to the pool on drop.
pub struct DistanceBuffer<'a> {
    buffer: Option<Box<[f64]>>,
    pool: &'a DistancePool,
}

impl<'a> DistanceBuffer<'a> {
    fn new(buffer: Box<[f64]>, pool: &'a DistancePool) -> Self {
        Self {
            buffer: Some(buffer),
            pool,
        }
    }

    pub fn as_mut_slice(&mut self) -> &mut [f64] {
        self.buffer.as_mut().expect("Operation failed").as_mut()
    }

    pub fn as_slice(&self) -> &[f64] {
        self.buffer.as_ref().expect("Operation failed").as_ref()
    }

    pub fn len(&self) -> usize {
        self.buffer.as_ref().expect("Operation failed").len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn as_array_mut(&mut self) -> ArrayViewMut1<f64> {
        ArrayViewMut1::from(self.as_mut_slice())
    }
}

impl Drop for DistanceBuffer<'_> {
    fn drop(&mut self) {
        if let Some(buffer) = self.buffer.take() {
            self.pool.return_distance_buffer(buffer);
        }
    }
}

/// RAII handle to a pooled index buffer; returns it to the pool on drop.
pub struct IndexBuffer<'a> {
    buffer: Option<Box<[usize]>>,
    pool: &'a DistancePool,
}

impl<'a> IndexBuffer<'a> {
    fn new(buffer: Box<[usize]>, pool: &'a DistancePool) -> Self {
        Self {
            buffer: Some(buffer),
            pool,
        }
    }

    pub fn as_mut_slice(&mut self) -> &mut [usize] {
        self.buffer.as_mut().expect("Operation failed").as_mut()
    }

    pub fn as_slice(&self) -> &[usize] {
        self.buffer.as_ref().expect("Operation failed").as_ref()
    }

    pub fn len(&self) -> usize {
        self.buffer.as_ref().expect("Operation failed").len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl Drop for IndexBuffer<'_> {
    fn drop(&mut self) {
        if let Some(buffer) = self.buffer.take() {
            self.pool.return_index_buffer(buffer);
        }
    }
}

/// RAII handle to a pooled distance matrix; returns it to the pool on drop.
pub struct MatrixBuffer<'a> {
    matrix: Option<Array2<f64>>,
    pool: &'a DistancePool,
}

impl<'a> MatrixBuffer<'a> {
    fn new(matrix: Array2<f64>, pool: &'a DistancePool) -> Self {
        Self {
            matrix: Some(matrix),
            pool,
        }
    }

    pub fn as_mut(&mut self) -> ArrayViewMut2<f64> {
        self.matrix.as_mut().expect("Operation failed").view_mut()
    }

    pub fn dim(&self) -> (usize, usize) {
        self.matrix.as_ref().expect("Operation failed").dim()
    }

    pub fn fill(&mut self, value: f64) {
        self.matrix.as_mut().expect("Operation failed").fill(value);
    }
}

impl Drop for MatrixBuffer<'_> {
    fn drop(&mut self) {
        if let Some(matrix) = self.matrix.take() {
            self.pool.return_matrix_buffer(matrix);
        }
    }
}
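/// Bump-allocation arena for short-lived clustering scratch buffers.
///
/// A minimal lifecycle sketch:
///
/// ```ignore
/// let arena = ClusteringArena::new();
/// {
///     let mut scratch = arena.alloc_temp_vec::<f64>(128);
///     scratch.as_mut_slice().fill(1.0);
/// }
/// // Reclaim all arena memory at once between iterations.
/// arena.reset();
/// ```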
pub struct ClusteringArena {
    config: MemoryPoolConfig,
    current_block: Mutex<Option<ArenaBlock>>,
    full_blocks: Mutex<Vec<ArenaBlock>>,
    stats: ArenaStatistics,
}

impl ClusteringArena {
    /// Create an arena with the default configuration.
    pub fn new() -> Self {
        Self::with_config(MemoryPoolConfig::default())
    }

    /// Create an arena with an explicit configuration.
    pub fn with_config(config: MemoryPoolConfig) -> Self {
        Self {
            config,
            current_block: Mutex::new(None),
            full_blocks: Mutex::new(Vec::new()),
            stats: ArenaStatistics::new(),
        }
    }

    /// Allocate a default-initialized temporary vector from the arena.
    ///
    /// Elements are never dropped individually, so this is intended for
    /// plain-data types such as `f64` or `usize`.
    pub fn alloc_temp_vec<T: Default + Clone>(&self, size: usize) -> ArenaVec<T> {
        let layout = Layout::array::<T>(size).expect("Operation failed");
        let ptr = self.allocate_raw(layout);

        unsafe {
            // Advance in units of `T`, not bytes, when initializing elements.
            let typed_ptr = ptr.as_ptr() as *mut T;
            for i in 0..size {
                std::ptr::write(typed_ptr.add(i), T::default());
            }

            ArenaVec::new(typed_ptr, size)
        }
    }

    fn allocate_raw(&self, layout: Layout) -> NonNull<u8> {
        let mut current = self.current_block.lock().expect("Operation failed");

        if current.is_none()
            || !current
                .as_ref()
                .expect("Operation failed")
                .can_allocate(layout)
        {
            if let Some(old_block) = current.take() {
                self.full_blocks
                    .lock()
                    .expect("Operation failed")
                    .push(old_block);
            }
            *current = Some(ArenaBlock::new(self.config.arena_block_size));
        }

        current.as_mut().expect("Operation failed").allocate(layout)
    }

    /// Reset all blocks so their memory can be reused.
    ///
    /// Callers must ensure no `ArenaVec` handed out before the reset is still
    /// in use, since subsequent allocations may reuse the same memory.
    pub fn reset(&self) {
        let mut current = self.current_block.lock().expect("Operation failed");
        let mut full_blocks = self.full_blocks.lock().expect("Operation failed");

        if let Some(block) = current.take() {
            full_blocks.push(block);
        }

        for block in full_blocks.iter_mut() {
            block.reset();
        }

        if let Some(block) = full_blocks.pop() {
            *current = Some(block);
        }

        self.stats.reset();
    }

    /// Snapshot of the arena's statistics.
    pub fn statistics(&self) -> ArenaStatistics {
        self.stats.clone()
    }
}

impl Default for ClusteringArena {
    fn default() -> Self {
        Self::new()
    }
}

struct ArenaBlock {
    memory: NonNull<u8>,
    size: usize,
    offset: usize,
}

unsafe impl Send for ArenaBlock {}
unsafe impl Sync for ArenaBlock {}

impl ArenaBlock {
    fn new(size: usize) -> Self {
        let layout = Layout::from_size_align(size, 64).expect("Operation failed");
        let memory =
            unsafe { NonNull::new(System.alloc(layout)).expect("Failed to allocate arena block") };

        Self {
            memory,
            size,
            offset: 0,
        }
    }

    fn can_allocate(&self, layout: Layout) -> bool {
        let aligned_offset = (self.offset + layout.align() - 1) & !(layout.align() - 1);
        aligned_offset + layout.size() <= self.size
    }

    fn allocate(&mut self, layout: Layout) -> NonNull<u8> {
        assert!(self.can_allocate(layout));

        self.offset = (self.offset + layout.align() - 1) & !(layout.align() - 1);

        let ptr = unsafe { NonNull::new_unchecked(self.memory.as_ptr().add(self.offset)) };
        self.offset += layout.size();

        ptr
    }

    fn reset(&mut self) {
        self.offset = 0;
    }
}

impl Drop for ArenaBlock {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.size, 64).expect("Operation failed");
        unsafe {
            System.dealloc(self.memory.as_ptr(), layout);
        }
    }
}

/// Typed view over memory allocated from a `ClusteringArena`.
pub struct ArenaVec<T> {
    ptr: *mut T,
    len: usize,
    phantom: std::marker::PhantomData<T>,
}

impl<T> ArenaVec<T> {
    fn new(ptr: *mut T, len: usize) -> Self {
        Self {
            ptr,
            len,
            phantom: std::marker::PhantomData,
        }
    }

    pub fn as_mut_slice(&mut self) -> &mut [T] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }

    pub fn as_slice(&self) -> &[T] {
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}

/// Summary of pool contents returned by [`DistancePool::pool_info`].
#[derive(Debug, Clone)]
pub struct PoolInfo {
    pub distance_buffer_count: usize,
    pub index_buffer_count: usize,
    pub matrix_buffer_count: usize,
    pub large_buffer_count: usize,
    pub total_memory_usage: usize,
    pub numa_node: i32,
    pub hit_rate: f64,
}

/// Hit/miss counters for a `DistancePool`.
#[derive(Debug)]
pub struct PoolStatistics {
    hits: std::sync::atomic::AtomicUsize,
    misses: std::sync::atomic::AtomicUsize,
    total_allocations: std::sync::atomic::AtomicUsize,
}

impl PoolStatistics {
    fn new() -> Self {
        Self {
            hits: std::sync::atomic::AtomicUsize::new(0),
            misses: std::sync::atomic::AtomicUsize::new(0),
            total_allocations: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    fn record_hit(&self) {
        self.hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    fn record_miss(&self) {
        self.misses
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.total_allocations
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    fn reset(&self) {
        self.hits.store(0, std::sync::atomic::Ordering::Relaxed);
        self.misses.store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_allocations
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// Pool hit rate as a percentage in `[0, 100]`.
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hits.load(std::sync::atomic::Ordering::Relaxed);
        let total = hits + self.misses.load(std::sync::atomic::Ordering::Relaxed);
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64 * 100.0
        }
    }

    /// Total number of buffer requests (hits plus misses).
    pub fn total_requests(&self) -> usize {
        self.hits.load(std::sync::atomic::Ordering::Relaxed)
            + self.misses.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Number of requests that required a fresh allocation.
    pub fn total_allocations(&self) -> usize {
        self.total_allocations
            .load(std::sync::atomic::Ordering::Relaxed)
    }
}

impl Clone for PoolStatistics {
    fn clone(&self) -> Self {
        Self {
            hits: std::sync::atomic::AtomicUsize::new(
                self.hits.load(std::sync::atomic::Ordering::Relaxed),
            ),
            misses: std::sync::atomic::AtomicUsize::new(
                self.misses.load(std::sync::atomic::Ordering::Relaxed),
            ),
            total_allocations: std::sync::atomic::AtomicUsize::new(
                self.total_allocations
                    .load(std::sync::atomic::Ordering::Relaxed),
            ),
        }
    }
}

#[derive(Debug)]
pub struct ArenaStatistics {
    blocks_allocated: std::sync::atomic::AtomicUsize,
    total_memory: std::sync::atomic::AtomicUsize,
    active_objects: std::sync::atomic::AtomicUsize,
}

impl ArenaStatistics {
    fn new() -> Self {
        Self {
            blocks_allocated: std::sync::atomic::AtomicUsize::new(0),
            total_memory: std::sync::atomic::AtomicUsize::new(0),
            active_objects: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    fn reset(&self) {
        self.blocks_allocated
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_memory
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.active_objects
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    pub fn blocks_allocated(&self) -> usize {
        self.blocks_allocated
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    pub fn total_memory(&self) -> usize {
        self.total_memory.load(std::sync::atomic::Ordering::Relaxed)
    }

    pub fn active_objects(&self) -> usize {
        self.active_objects
            .load(std::sync::atomic::Ordering::Relaxed)
    }
}

impl Clone for ArenaStatistics {
    fn clone(&self) -> Self {
        Self {
            blocks_allocated: std::sync::atomic::AtomicUsize::new(
                self.blocks_allocated
                    .load(std::sync::atomic::Ordering::Relaxed),
            ),
            total_memory: std::sync::atomic::AtomicUsize::new(
                self.total_memory.load(std::sync::atomic::Ordering::Relaxed),
            ),
            active_objects: std::sync::atomic::AtomicUsize::new(
                self.active_objects
                    .load(std::sync::atomic::Ordering::Relaxed),
            ),
        }
    }
}
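/// Description of the host's NUMA layout.
///
/// A small sketch of how it is typically queried:
///
/// ```ignore
/// let topology = DistancePool::get_numa_topology();
/// if let Some(node) = topology.get_node_with_most_memory() {
///     println!("preferred NUMA node: {node}");
/// }
/// ```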
#[derive(Debug, Clone)]
pub struct NumaTopology {
    pub nodes: Vec<NumaNode>,
}

/// A single NUMA node as reported by the operating system.
#[derive(Debug, Clone)]
pub struct NumaNode {
    pub id: u32,
    pub total_memory_bytes: u64,
    pub available_memory_bytes: u64,
    pub cpu_count: u32,
}

impl Default for NumaTopology {
    fn default() -> Self {
        Self {
            // Conservative single-node placeholder used when detection is unavailable.
            nodes: vec![NumaNode {
                id: 0,
                total_memory_bytes: 8 * 1024 * 1024 * 1024,
                available_memory_bytes: 4 * 1024 * 1024 * 1024,
                cpu_count: 4,
            }],
        }
    }
}

impl NumaTopology {
    /// Node to prefer for new allocations (currently the first node listed).
    pub fn get_optimal_node(&self) -> u32 {
        if !self.nodes.is_empty() {
            self.nodes[0].id
        } else {
            0
        }
    }

    /// Node with the largest amount of available memory, if any.
    pub fn get_node_with_most_memory(&self) -> Option<u32> {
        self.nodes
            .iter()
            .max_by_key(|node| node.available_memory_bytes)
            .map(|node| node.id)
    }

    /// Total memory across all nodes, in bytes.
    pub fn total_system_memory(&self) -> u64 {
        self.nodes.iter().map(|node| node.total_memory_bytes).sum()
    }

    /// Total available memory across all nodes, in bytes.
    pub fn total_available_memory(&self) -> u64 {
        self.nodes
            .iter()
            .map(|node| node.available_memory_bytes)
            .sum()
    }

    /// Whether a node with the given id exists in the topology.
    pub fn has_node(&self, node_id: u32) -> bool {
        self.nodes.iter().any(|node| node.id == node_id)
    }

    /// Details for the node with the given id, if present.
    pub fn get_node_info(&self, node_id: u32) -> Option<&NumaNode> {
        self.nodes.iter().find(|node| node.id == node_id)
    }
}

static GLOBAL_DISTANCE_POOL: std::sync::OnceLock<DistancePool> = std::sync::OnceLock::new();
static GLOBAL_CLUSTERING_ARENA: std::sync::OnceLock<ClusteringArena> = std::sync::OnceLock::new();

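/// Access the process-wide distance pool, creating it lazily on first use.
///
/// A minimal sketch:
///
/// ```ignore
/// let pool = global_distance_pool();
/// let buffer = pool.get_distance_buffer(32);
/// assert_eq!(buffer.len(), 32);
/// ```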
#[allow(dead_code)]
pub fn global_distance_pool() -> &'static DistancePool {
    GLOBAL_DISTANCE_POOL.get_or_init(|| DistancePool::new(1000))
}

/// Access the process-wide clustering arena, creating it lazily on first use.
#[allow(dead_code)]
pub fn global_clustering_arena() -> &'static ClusteringArena {
    GLOBAL_CLUSTERING_ARENA.get_or_init(ClusteringArena::new)
}

/// Create a `DistancePool` with NUMA awareness and thread affinity enabled.
#[allow(dead_code)]
pub fn create_numa_optimized_pool(capacity: usize) -> DistancePool {
    let config = MemoryPoolConfig {
        numa_aware: true,
        auto_numa_discovery: true,
        enable_thread_affinity: true,
        ..Default::default()
    };

    DistancePool::with_config(capacity, config)
}

/// Convenience wrapper around [`DistancePool::get_numa_topology`].
#[allow(dead_code)]
pub fn get_numa_topology() -> NumaTopology {
    DistancePool::get_numa_topology()
}

/// Probe the platform's NUMA capabilities.
#[allow(dead_code)]
pub fn test_numa_capabilities() -> NumaCapabilities {
    NumaCapabilities::detect()
}

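/// Summary of the NUMA features detected on the current platform.
///
/// A minimal probe sketch:
///
/// ```ignore
/// let caps = NumaCapabilities::detect();
/// if caps.should_enable_numa() {
///     println!("strategy: {}", caps.recommended_memory_strategy());
/// }
/// ```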
#[derive(Debug, Clone)]
pub struct NumaCapabilities {
    /// Whether the platform exposes NUMA information.
    pub numa_available: bool,
    /// Number of NUMA nodes detected.
    pub num_nodes: u32,
    /// Whether memory can be bound to a specific node.
    pub memory_binding_supported: bool,
    /// Whether threads can be pinned to a node's CPUs.
    pub thread_affinity_supported: bool,
    /// Human-readable description of the detected platform.
    pub platform_details: String,
}

impl NumaCapabilities {
    /// Detect the NUMA capabilities of the current platform.
    pub fn detect() -> Self {
        #[cfg(target_os = "linux")]
        {
            Self::detect_linux()
        }
        #[cfg(target_os = "windows")]
        {
            Self::detect_windows()
        }
        #[cfg(not(any(target_os = "linux", target_os = "windows")))]
        {
            Self {
                numa_available: false,
                num_nodes: 1,
                memory_binding_supported: false,
                thread_affinity_supported: false,
                platform_details: "Unsupported platform".to_string(),
            }
        }
    }

    #[cfg(target_os = "linux")]
    fn detect_linux() -> Self {
        let numa_available = std::path::Path::new("/sys/devices/system/node").exists();
        let num_nodes = if numa_available {
            DistancePool::get_numa_topology().nodes.len() as u32
        } else {
            1
        };

        Self {
            numa_available,
            num_nodes,
            memory_binding_supported: numa_available,
            thread_affinity_supported: true,
            platform_details: format!("Linux with {num_nodes} NUMA nodes"),
        }
    }

    #[cfg(target_os = "windows")]
    fn detect_windows() -> Self {
        // Placeholder values: detailed Windows NUMA detection is not implemented.
        Self {
            numa_available: true,
            num_nodes: 1,
            memory_binding_supported: true,
            thread_affinity_supported: true,
            platform_details: "Windows NUMA support".to_string(),
        }
    }

    /// Whether NUMA-aware allocation is worth enabling on this machine.
    pub fn should_enable_numa(&self) -> bool {
        self.numa_available && self.num_nodes > 1
    }

    /// Recommended memory strategy based on the detected capabilities.
    pub fn recommended_memory_strategy(&self) -> &'static str {
        if self.should_enable_numa() {
            "NUMA-aware"
        } else {
            "Standard"
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_distance_pool() {
        let pool = DistancePool::new(10);

        let mut buffer1 = pool.get_distance_buffer(100);
        assert_eq!(buffer1.len(), 100);

        buffer1.as_mut_slice()[0] = 42.0;
        assert_eq!(buffer1.as_slice()[0], 42.0);

        let buffer2 = pool.get_distance_buffer(50);
        assert_eq!(buffer2.len(), 50);

        drop(buffer1);

        let buffer3 = pool.get_distance_buffer(100);
        assert_eq!(buffer3.len(), 100);
    }

    #[test]
    fn test_arena_allocator() {
        let arena = ClusteringArena::new();

        let mut vec1 = arena.alloc_temp_vec::<f64>(100);
        let mut vec2 = arena.alloc_temp_vec::<usize>(50);

        vec1.as_mut_slice()[0] = std::f64::consts::PI;
        vec2.as_mut_slice()[0] = 42;

        assert_eq!(vec1.as_slice()[0], std::f64::consts::PI);
        assert_eq!(vec2.as_slice()[0], 42);

        arena.reset();

        let mut vec3 = arena.alloc_temp_vec::<f64>(200);
        vec3.as_mut_slice()[0] = 2.71;
        assert_eq!(vec3.as_slice()[0], 2.71);
    }

    #[test]
    fn test_pool_statistics() {
        let pool = DistancePool::new(2);

        let stats = pool.statistics();
        assert_eq!(stats.total_requests(), 0);
        assert_eq!(stats.total_allocations(), 0);

        let _buffer1 = pool.get_distance_buffer(100);
        let stats = pool.statistics();
        assert_eq!(stats.total_requests(), 1);
        assert_eq!(stats.total_allocations(), 1);
        assert!(stats.hit_rate() < 1.0);

        drop(_buffer1);
        let _buffer2 = pool.get_distance_buffer(100);
        let stats = pool.statistics();
        assert_eq!(stats.total_requests(), 2);
        assert_eq!(stats.total_allocations(), 1);
        assert!(stats.hit_rate() > 0.0);
    }

    #[test]
    fn test_matrix_buffer() {
        let pool = DistancePool::new(5);

        let mut matrix = pool.get_matrix_buffer(10, 10);
        assert_eq!(matrix.dim(), (10, 10));

        matrix.fill(42.0);
        drop(matrix);

        let matrix2 = pool.get_matrix_buffer(8, 8);
        assert_eq!(matrix2.dim(), (8, 8));
    }

    #[test]
    fn test_global_pools() {
        let pool = global_distance_pool();
        let arena = global_clustering_arena();

        let _buffer = pool.get_distance_buffer(10);
        let _vec = arena.alloc_temp_vec::<f64>(10);
    }
}