1use scirs2_core::ndarray::{Array2, ArrayViewMut1, ArrayViewMut2};
33use std::alloc::{GlobalAlloc, Layout, System};
34use std::collections::VecDeque;
35use std::ptr::NonNull;
36use std::sync::Mutex;
37
38#[cfg(any(target_os = "linux", target_os = "android"))]
40use libc;
41#[cfg(target_os = "linux")]
42use std::fs;
43
44use std::sync::atomic::Ordering;
46
47#[cfg(test)]
50use num_cpus;
51
#[cfg(not(test))]
mod num_cpus {
    /// Number of CPUs this process may run on, or `4` if the platform cannot tell us.
    pub fn get() -> usize {
        match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 4,
        }
    }
}
61
/// Tuning knobs shared by [`DistancePool`] and [`ClusteringArena`].
#[derive(Debug, Clone)]
pub struct MemoryPoolConfig {
    /// Cap on cached small distance/index buffers; the large-buffer and matrix caches are capped
    /// at a tenth and a quarter of it (default 1000).
    pub max_pool_size: usize,
    /// Cache-line size in bytes (default 64).
    pub cache_line_size: usize,
    /// Prefer NUMA-local allocation for large distance buffers (default `true`).
    pub numa_aware: bool,
    /// Prefetch distance hint (default 8).
    pub prefetch_distance: usize,
    /// Size in bytes of each [`ClusteringArena`] block (default 1 MiB).
    pub arena_block_size: usize,
    /// Preferred NUMA node, or `-1` for no preference (default `-1`).
    pub numa_node_hint: i32,
    /// Whether the NUMA node should be discovered automatically (default `true`).
    pub auto_numa_discovery: bool,
    /// Thread-pinning preference (default `true`).
    pub enable_thread_affinity: bool,
    /// Touch newly allocated buffers page by page (default `true`).
    pub enable_memory_warming: bool,
    /// Distance requests larger than this many bytes use the large-buffer cache (default 64 KiB).
    pub large_object_threshold: usize,
    /// Soft budget in bytes; exceeding it evicts cached buffers (default 1 GiB).
    pub max_memory_usage: usize,
}

impl Default for MemoryPoolConfig {
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        const GIB: usize = 1024 * MIB;

        MemoryPoolConfig {
            max_memory_usage: GIB,
            large_object_threshold: 64 * KIB,
            enable_memory_warming: true,
            enable_thread_affinity: true,
            auto_numa_discovery: true,
            numa_node_hint: -1,
            arena_block_size: MIB,
            prefetch_distance: 8,
            numa_aware: true,
            cache_line_size: 64,
            max_pool_size: 1000,
        }
    }
}
106
107pub struct DistancePool {
109 config: MemoryPoolConfig,
110 distance_buffers: Mutex<VecDeque<Box<[f64]>>>,
111 index_buffers: Mutex<VecDeque<Box<[usize]>>>,
112 matrix_buffers: Mutex<VecDeque<Array2<f64>>>,
113 large_buffers: Mutex<VecDeque<Box<[f64]>>>, stats: PoolStatistics,
115 memory_usage: std::sync::atomic::AtomicUsize, numa_node: std::sync::atomic::AtomicI32, }
118
119impl DistancePool {
120 pub fn new(capacity: usize) -> Self {
122 Self::with_config(capacity, MemoryPoolConfig::default())
123 }
124
125 pub fn with_config(capacity: usize, config: MemoryPoolConfig) -> Self {
127 let numa_node = if config.numa_aware && config.numa_node_hint >= 0 {
128 config.numa_node_hint
129 } else {
130 Self::detect_numa_node()
131 };
132
133 Self {
134 config,
135 distance_buffers: Mutex::new(VecDeque::with_capacity(capacity)),
136 index_buffers: Mutex::new(VecDeque::with_capacity(capacity)),
137 matrix_buffers: Mutex::new(VecDeque::with_capacity(capacity / 4)), large_buffers: Mutex::new(VecDeque::with_capacity(capacity / 10)), stats: PoolStatistics::new(),
140 memory_usage: std::sync::atomic::AtomicUsize::new(0),
141 numa_node: std::sync::atomic::AtomicI32::new(numa_node),
142 }
143 }
144
145 pub fn get_distance_buffer(&self, size: usize) -> DistanceBuffer {
147 let buffer_size_bytes = size * std::mem::size_of::<f64>();
149 let is_large = buffer_size_bytes > self.config.large_object_threshold;
150
151 let current_usage = self.memory_usage.load(std::sync::atomic::Ordering::Relaxed);
153 if current_usage + buffer_size_bytes > self.config.max_memory_usage {
154 self.cleanup_excess_memory();
155 }
156
157 let buffer = if is_large {
158 self.get_large_buffer(size)
159 } else {
160 let mut buffers = self.distance_buffers.lock().expect("Operation failed");
161
162 for i in 0..buffers.len() {
164 if buffers[i].len() >= size && buffers[i].len() <= size * 2 {
165 let buffer = buffers.remove(i).expect("Operation failed");
166 self.stats.record_hit();
167 return DistanceBuffer::new(buffer, self);
168 }
169 }
170
171 self.stats.record_miss();
173 self.create_aligned_buffer(size)
174 };
175
176 self.memory_usage
178 .fetch_add(buffer_size_bytes, std::sync::atomic::Ordering::Relaxed);
179
180 DistanceBuffer::new(buffer, self)
181 }
182
183 fn get_large_buffer(&self, size: usize) -> Box<[f64]> {
185 let mut buffers = self.large_buffers.lock().expect("Operation failed");
186
187 for i in 0..buffers.len() {
189 if buffers[i].len() == size {
190 let buffer = buffers.remove(i).expect("Operation failed");
191 self.stats.record_hit();
192 return buffer;
193 }
194 }
195
196 self.stats.record_miss();
198 if self.config.numa_aware {
199 self.create_numa_aligned_buffer(size)
200 } else {
201 self.create_aligned_buffer(size)
202 }
203 }
204
205 pub fn get_index_buffer(&self, size: usize) -> IndexBuffer {
207 let mut buffers = self.index_buffers.lock().expect("Operation failed");
208
209 for i in 0..buffers.len() {
211 if buffers[i].len() >= size && buffers[i].len() <= size * 2 {
212 let buffer = buffers.remove(i).expect("Operation failed");
213 self.stats.record_hit();
214 return IndexBuffer::new(buffer, self);
215 }
216 }
217
218 self.stats.record_miss();
220 let new_buffer = vec![0usize; size].into_boxed_slice();
221 IndexBuffer::new(new_buffer, self)
222 }
223
224 pub fn get_matrix_buffer(&self, rows: usize, cols: usize) -> MatrixBuffer {
226 let mut buffers = self.matrix_buffers.lock().expect("Operation failed");
227
228 for i in 0..buffers.len() {
230 let (r, c) = buffers[i].dim();
231 if r >= rows && c >= cols && r <= rows * 2 && c <= cols * 2 {
232 let mut matrix = buffers.remove(i).expect("Operation failed");
233 matrix = matrix.slice_mut(s![..rows, ..cols]).to_owned();
235 self.stats.record_hit();
236 return MatrixBuffer::new(matrix, self);
237 }
238 }
239
240 self.stats.record_miss();
242 let matrix = Array2::zeros((rows, cols));
243 MatrixBuffer::new(matrix, self)
244 }
245
246 fn create_aligned_buffer(&self, size: usize) -> Box<[f64]> {
248 let layout = Layout::from_size_align(
249 size * std::mem::size_of::<f64>(),
250 self.config.cache_line_size,
251 )
252 .expect("Operation failed");
253
254 unsafe {
255 let ptr = System.alloc(layout) as *mut f64;
256 if ptr.is_null() {
257 panic!("Failed to allocate aligned memory");
258 }
259
260 if self.config.enable_memory_warming {
262 std::ptr::write_bytes(ptr, 0, size);
263 }
264
265 Box::from_raw(std::ptr::slice_from_raw_parts_mut(ptr, size))
267 }
268 }
269
270 fn create_numa_aligned_buffer(&self, size: usize) -> Box<[f64]> {
272 let numa_node = self.numa_node.load(Ordering::Relaxed);
273
274 #[cfg(target_os = "linux")]
275 {
276 if self.config.numa_aware && numa_node >= 0 {
277 match Self::allocate_on_numa_node_linux(size, numa_node as u32) {
278 Ok(buffer) => {
279 if self.config.enable_memory_warming {
280 Self::warm_memory(&buffer);
281 }
282 return buffer;
283 }
284 Err(_) => {
285 }
287 }
288 }
289 }
290
291 #[cfg(target_os = "windows")]
292 {
293 if self.config.numa_aware && numa_node >= 0 {
294 match Self::allocate_on_numa_node_windows(size, numa_node as u32) {
295 Ok(buffer) => {
296 if self.config.enable_memory_warming {
297 Self::warm_memory(&buffer);
298 }
299 return buffer;
300 }
301 Err(_) => {
302 }
304 }
305 }
306 }
307
308 let buffer = self.create_aligned_buffer(size);
310
311 if self.config.enable_memory_warming {
313 Self::warm_memory(&buffer);
314 }
315
316 buffer
317 }
318
319 #[cfg(target_os = "linux")]
321 fn allocate_on_numa_node_linux(
322 size: usize,
323 node: u32,
324 ) -> Result<Box<[f64]>, Box<dyn std::error::Error>> {
325 let total_size = size * std::mem::size_of::<f64>();
326 let layout = Layout::from_size_align(total_size, 64)?;
327
328 unsafe {
329 let ptr = System.alloc(layout) as *mut f64;
331 if ptr.is_null() {
332 return Err("Failed to allocate memory".into());
333 }
334
335 std::ptr::write_bytes(ptr, 0, size);
337
338 Ok(Box::from_raw(std::ptr::slice_from_raw_parts_mut(ptr, size)))
339 }
340 }
341
342 #[cfg(target_os = "windows")]
344 fn allocate_on_numa_node_windows(
345 size: usize,
346 _node: u32,
347 ) -> Result<Box<[f64]>, Box<dyn std::error::Error>> {
348 Ok(vec![0.0_f64; size].into_boxed_slice())
352 }
353
354 pub fn bind_thread_to_numa_node(node: u32) -> Result<(), Box<dyn std::error::Error>> {
356 #[cfg(target_os = "linux")]
357 {
358 Self::bind_thread_to_numa_node_linux(node)
359 }
360 #[cfg(target_os = "windows")]
361 {
362 Self::bind_thread_to_numa_node_windows(node)
363 }
364 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
365 {
366 Ok(()) }
368 }
369
370 #[cfg(target_os = "linux")]
371 fn bind_thread_to_numa_node_linux(node: u32) -> Result<(), Box<dyn std::error::Error>> {
372 if let Some(_cpu_count) = Self::get_node_cpu_count(node) {
377 let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
378
379 let cpulist_path = format!("/sys/devices/system/node/node{}/cpulist", node);
381 if let Ok(cpulist) = fs::read_to_string(&cpulist_path) {
382 for range in cpulist.trim().split(',') {
383 if let Some((start, end)) = range.split_once('-') {
384 if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
385 for cpu in s..=e {
386 unsafe { libc::CPU_SET(cpu as usize, &mut cpu_set) };
387 }
388 }
389 } else if let Ok(cpu) = range.parse::<u32>() {
390 unsafe { libc::CPU_SET(cpu as usize, &mut cpu_set) };
391 }
392 }
393
394 unsafe {
396 libc::sched_setaffinity(
397 0, std::mem::size_of::<libc::cpu_set_t>(),
399 &cpu_set,
400 );
401 }
402 }
403 }
404
405 Ok(())
406 }
407
408 #[cfg(target_os = "windows")]
409 fn bind_thread_to_numa_node_windows(node: u32) -> Result<(), Box<dyn std::error::Error>> {
410 Ok(())
412 }
413
414 fn warm_memory(buffer: &[f64]) {
416 if buffer.is_empty() {
417 return;
418 }
419
420 let page_size = 4096; let elements_per_page = page_size / std::mem::size_of::<f64>();
423
424 for i in (0..buffer.len()).step_by(elements_per_page) {
425 unsafe {
427 std::ptr::read_volatile(&buffer[i]);
428 }
429 }
430 }
431
432 fn detect_numa_node() -> i32 {
434 #[cfg(target_os = "linux")]
435 {
436 Self::detect_numa_node_linux().unwrap_or(0)
437 }
438 #[cfg(target_os = "windows")]
439 {
440 Self::detect_numa_node_windows().unwrap_or(0)
441 }
442 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
443 {
444 0 }
446 }
447
448 #[cfg(target_os = "linux")]
450 fn detect_numa_node_linux() -> Option<i32> {
451 let _tid = unsafe { libc::gettid() };
453
454 match Self::get_current_numa_node_linux() {
456 Ok(node) => Some(node),
457 Err(_) => {
458 Self::detect_numa_from_cpu_linux()
460 }
461 }
462 }
463
464 #[cfg(target_os = "linux")]
465 fn get_current_numa_node_linux() -> Result<i32, Box<dyn std::error::Error>> {
466 let mut cpu: u32 = 0;
468 let mut node: u32 = 0;
469
470 let result = unsafe {
471 libc::syscall(
472 libc::SYS_getcpu,
473 &mut cpu as *mut u32,
474 &mut node as *mut u32,
475 std::ptr::null_mut::<libc::c_void>(),
476 )
477 };
478
479 if result == 0 {
480 Ok(node as i32)
481 } else {
482 Err("getcpu syscall failed".into())
483 }
484 }
485
486 #[cfg(target_os = "linux")]
487 fn detect_numa_from_cpu_linux() -> Option<i32> {
488 if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
490 for entry in entries.flatten() {
491 let name = entry.file_name();
492 if let Some(name_str) = name.to_str() {
493 if let Some(stripped) = name_str.strip_prefix("node") {
494 if let Ok(node_num) = stripped.parse::<i32>() {
495 return Some(node_num);
497 }
498 }
499 }
500 }
501 }
502 None
503 }
504
505 #[cfg(target_os = "windows")]
507 fn detect_numa_node_windows() -> Option<i32> {
508 Some(0)
512 }
513
514 pub fn get_numa_topology() -> NumaTopology {
516 #[cfg(target_os = "linux")]
517 {
518 Self::get_numa_topology_linux()
519 }
520 #[cfg(target_os = "windows")]
521 {
522 Self::get_numa_topology_windows()
523 }
524 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
525 {
526 NumaTopology::default()
527 }
528 }
529
530 #[cfg(target_os = "linux")]
531 fn get_numa_topology_linux() -> NumaTopology {
532 let mut topology = NumaTopology::default();
533
534 if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
536 for entry in entries.flatten() {
537 let name = entry.file_name();
538 if let Some(name_str) = name.to_str() {
539 if let Some(stripped) = name_str.strip_prefix("node") {
540 if let Ok(_nodeid) = stripped.parse::<u32>() {
541 let meminfo_path =
543 format!("/sys/devices/system/node/{name_str}/meminfo");
544 if let Ok(meminfo) = fs::read_to_string(&meminfo_path) {
545 if let Some(total_kb) = Self::parse_meminfo_total(&meminfo) {
546 topology.nodes.push(NumaNode {
547 id: _nodeid,
548 total_memory_bytes: total_kb * 1024,
549 available_memory_bytes: total_kb * 1024, cpu_count: Self::get_node_cpu_count(_nodeid).unwrap_or(1),
551 });
552 }
553 }
554 }
555 }
556 }
557 }
558 }
559
560 if topology.nodes.is_empty() {
562 topology.nodes.push(NumaNode {
563 id: 0,
564 total_memory_bytes: Self::get_total_system_memory()
565 .unwrap_or(8 * 1024 * 1024 * 1024), available_memory_bytes: Self::get_available_system_memory()
567 .unwrap_or(4 * 1024 * 1024 * 1024), cpu_count: num_cpus::get() as u32,
569 });
570 }
571
572 topology
573 }
574
575 #[cfg(target_os = "linux")]
576 fn parse_meminfo_total(meminfo: &str) -> Option<u64> {
577 for line in meminfo.lines() {
578 if line.starts_with("Node") && line.contains("MemTotal:") {
579 let parts: Vec<&str> = line.split_whitespace().collect();
580 if parts.len() >= 3 {
581 return parts[2].parse().ok();
582 }
583 }
584 }
585 None
586 }
587
588 #[cfg(target_os = "linux")]
589 fn get_node_cpu_count(_nodeid: u32) -> Option<u32> {
590 let cpulist_path = format!("/sys/devices/system/node/node{}/cpulist", _nodeid);
591 if let Ok(cpulist) = fs::read_to_string(&cpulist_path) {
592 let mut count = 0;
594 for range in cpulist.trim().split(',') {
595 if let Some((start, end)) = range.split_once('-') {
596 if let (Ok(s), Ok(e)) = (start.parse::<u32>(), end.parse::<u32>()) {
597 count += e - s + 1;
598 }
599 } else if range.parse::<u32>().is_ok() {
600 count += 1;
601 }
602 }
603 Some(count)
604 } else {
605 None
606 }
607 }
608
609 #[cfg(target_os = "linux")]
610 fn get_total_system_memory() -> Option<u64> {
611 if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
612 for line in meminfo.lines() {
613 if line.starts_with("MemTotal:") {
614 let parts: Vec<&str> = line.split_whitespace().collect();
615 if parts.len() >= 2 {
616 return parts[1].parse::<u64>().ok().map(|kb| kb * 1024);
617 }
618 }
619 }
620 }
621 None
622 }
623
624 #[cfg(target_os = "linux")]
625 fn get_available_system_memory() -> Option<u64> {
626 if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
627 for line in meminfo.lines() {
628 if line.starts_with("MemAvailable:") {
629 let parts: Vec<&str> = line.split_whitespace().collect();
630 if parts.len() >= 2 {
631 return parts[1].parse::<u64>().ok().map(|kb| kb * 1024);
632 }
633 }
634 }
635 }
636 None
637 }
638
639 #[cfg(target_os = "windows")]
640 fn get_numa_topology_windows() -> NumaTopology {
641 NumaTopology::default()
644 }
645
646 fn cleanup_excess_memory(&self) {
648 let cleanup_ratio = 0.25; {
652 let mut buffers = self.distance_buffers.lock().expect("Operation failed");
653 let cleanup_count = (buffers.len() as f64 * cleanup_ratio) as usize;
654 for _ in 0..cleanup_count {
655 if let Some(buffer) = buffers.pop_back() {
656 let freed_bytes = buffer.len() * std::mem::size_of::<f64>();
657 self.memory_usage
658 .fetch_sub(freed_bytes, std::sync::atomic::Ordering::Relaxed);
659 }
660 }
661 }
662
663 {
664 let mut buffers = self.large_buffers.lock().expect("Operation failed");
665 let cleanup_count = (buffers.len() as f64 * cleanup_ratio) as usize;
666 for _ in 0..cleanup_count {
667 if let Some(buffer) = buffers.pop_back() {
668 let freed_bytes = buffer.len() * std::mem::size_of::<f64>();
669 self.memory_usage
670 .fetch_sub(freed_bytes, std::sync::atomic::Ordering::Relaxed);
671 }
672 }
673 }
674 }
675
676 fn return_distance_buffer(&self, buffer: Box<[f64]>) {
678 let buffer_size_bytes = buffer.len() * std::mem::size_of::<f64>();
679 let is_large = buffer_size_bytes > self.config.large_object_threshold;
680
681 self.memory_usage
683 .fetch_sub(buffer_size_bytes, std::sync::atomic::Ordering::Relaxed);
684
685 if is_large {
686 let mut buffers = self.large_buffers.lock().expect("Operation failed");
687 if buffers.len() < self.config.max_pool_size / 10 {
688 buffers.push_back(buffer);
689 }
690 } else {
692 let mut buffers = self.distance_buffers.lock().expect("Operation failed");
693 if buffers.len() < self.config.max_pool_size {
694 buffers.push_back(buffer);
695 }
696 }
698 }
699
700 fn return_index_buffer(&self, buffer: Box<[usize]>) {
702 let mut buffers = self.index_buffers.lock().expect("Operation failed");
703 if buffers.len() < self.config.max_pool_size {
704 buffers.push_back(buffer);
705 }
706 }
707
708 fn return_matrix_buffer(&self, matrix: Array2<f64>) {
710 let mut buffers = self.matrix_buffers.lock().expect("Operation failed");
711 if buffers.len() < self.config.max_pool_size / 4 {
712 buffers.push_back(matrix);
714 }
715 }
716
717 pub fn statistics(&self) -> PoolStatistics {
719 self.stats.clone()
720 }
721
722 pub fn memory_usage(&self) -> usize {
724 self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
725 }
726
727 pub fn current_numa_node(&self) -> i32 {
729 self.numa_node.load(std::sync::atomic::Ordering::Relaxed)
730 }
731
732 pub fn pool_info(&self) -> PoolInfo {
734 let distance_count = self
735 .distance_buffers
736 .lock()
737 .expect("Operation failed")
738 .len();
739 let index_count = self.index_buffers.lock().expect("Operation failed").len();
740 let matrix_count = self.matrix_buffers.lock().expect("Operation failed").len();
741 let large_count = self.large_buffers.lock().expect("Operation failed").len();
742
743 PoolInfo {
744 distance_buffer_count: distance_count,
745 index_buffer_count: index_count,
746 matrix_buffer_count: matrix_count,
747 large_buffer_count: large_count,
748 total_memory_usage: self.memory_usage(),
749 numa_node: self.current_numa_node(),
750 hit_rate: self.stats.hit_rate(),
751 }
752 }
753
754 pub fn clear(&self) {
756 self.distance_buffers
757 .lock()
758 .expect("Operation failed")
759 .clear();
760 self.index_buffers.lock().expect("Operation failed").clear();
761 self.matrix_buffers
762 .lock()
763 .expect("Operation failed")
764 .clear();
765 self.large_buffers.lock().expect("Operation failed").clear();
766 self.memory_usage
767 .store(0, std::sync::atomic::Ordering::Relaxed);
768 self.stats.reset();
769 }
770}
771
772use scirs2_core::ndarray::s;
774
775pub struct DistanceBuffer<'a> {
777 buffer: Option<Box<[f64]>>,
778 pool: &'a DistancePool,
779}
780
781impl<'a> DistanceBuffer<'a> {
782 fn new(buffer: Box<[f64]>, pool: &'a DistancePool) -> Self {
783 Self {
784 buffer: Some(buffer),
785 pool,
786 }
787 }
788
789 pub fn as_mut_slice(&mut self) -> &mut [f64] {
791 self.buffer.as_mut().expect("Operation failed").as_mut()
792 }
793
794 pub fn as_slice(&self) -> &[f64] {
796 self.buffer.as_ref().expect("Operation failed").as_ref()
797 }
798
799 pub fn len(&self) -> usize {
801 self.buffer.as_ref().expect("Operation failed").len()
802 }
803
804 pub fn is_empty(&self) -> bool {
806 self.len() == 0
807 }
808
809 pub fn as_array_mut(&mut self) -> ArrayViewMut1<f64> {
811 ArrayViewMut1::from(self.as_mut_slice())
812 }
813}
814
815impl Drop for DistanceBuffer<'_> {
816 fn drop(&mut self) {
817 if let Some(buffer) = self.buffer.take() {
818 self.pool.return_distance_buffer(buffer);
819 }
820 }
821}
822
823pub struct IndexBuffer<'a> {
825 buffer: Option<Box<[usize]>>,
826 pool: &'a DistancePool,
827}
828
829impl<'a> IndexBuffer<'a> {
830 fn new(buffer: Box<[usize]>, pool: &'a DistancePool) -> Self {
831 Self {
832 buffer: Some(buffer),
833 pool,
834 }
835 }
836
837 pub fn as_mut_slice(&mut self) -> &mut [usize] {
839 self.buffer.as_mut().expect("Operation failed").as_mut()
840 }
841
842 pub fn as_slice(&self) -> &[usize] {
844 self.buffer.as_ref().expect("Operation failed").as_ref()
845 }
846
847 pub fn len(&self) -> usize {
849 self.buffer.as_ref().expect("Operation failed").len()
850 }
851
852 pub fn is_empty(&self) -> bool {
854 self.len() == 0
855 }
856}
857
858impl Drop for IndexBuffer<'_> {
859 fn drop(&mut self) {
860 if let Some(buffer) = self.buffer.take() {
861 self.pool.return_index_buffer(buffer);
862 }
863 }
864}
865
866pub struct MatrixBuffer<'a> {
868 matrix: Option<Array2<f64>>,
869 pool: &'a DistancePool,
870}
871
872impl<'a> MatrixBuffer<'a> {
873 fn new(matrix: Array2<f64>, pool: &'a DistancePool) -> Self {
874 Self {
875 matrix: Some(matrix),
876 pool,
877 }
878 }
879
880 pub fn as_mut(&mut self) -> ArrayViewMut2<f64> {
882 self.matrix.as_mut().expect("Operation failed").view_mut()
883 }
884
885 pub fn dim(&mut self) -> (usize, usize) {
887 self.matrix.as_ref().expect("Operation failed").dim()
888 }
889
890 pub fn fill(&mut self, value: f64) {
892 self.matrix.as_mut().expect("Operation failed").fill(value);
893 }
894}
895
896impl Drop for MatrixBuffer<'_> {
897 fn drop(&mut self) {
898 if let Some(matrix) = self.matrix.take() {
899 self.pool.return_matrix_buffer(matrix);
900 }
901 }
902}
903
904pub struct ClusteringArena {
906 config: MemoryPoolConfig,
907 current_block: Mutex<Option<ArenaBlock>>,
908 full_blocks: Mutex<Vec<ArenaBlock>>,
909 stats: ArenaStatistics,
910}
911
912impl ClusteringArena {
913 pub fn new() -> Self {
915 Self::with_config(MemoryPoolConfig::default())
916 }
917
918 pub fn with_config(config: MemoryPoolConfig) -> Self {
920 Self {
921 config,
922 current_block: Mutex::new(None),
923 full_blocks: Mutex::new(Vec::new()),
924 stats: ArenaStatistics::new(),
925 }
926 }
927
928 pub fn alloc_temp_vec<T: Default + Clone>(&self, size: usize) -> ArenaVec<T> {
930 let layout = Layout::array::<T>(size).expect("Operation failed");
931 let ptr = self.allocate_raw(layout);
932
933 unsafe {
934 for i in 0..size {
936 std::ptr::write(ptr.as_ptr().add(i) as *mut T, T::default());
937 }
938
939 ArenaVec::new(ptr.as_ptr() as *mut T, size)
940 }
941 }
942
943 fn allocate_raw(&self, layout: Layout) -> NonNull<u8> {
945 let mut current = self.current_block.lock().expect("Operation failed");
946
947 if current.is_none()
948 || !current
949 .as_ref()
950 .expect("Operation failed")
951 .can_allocate(layout)
952 {
953 if let Some(old_block) = current.take() {
955 self.full_blocks
956 .lock()
957 .expect("Operation failed")
958 .push(old_block);
959 }
960 *current = Some(ArenaBlock::new(self.config.arena_block_size));
961 }
962
963 current.as_mut().expect("Operation failed").allocate(layout)
964 }
965
966 pub fn reset(&self) {
968 let mut current = self.current_block.lock().expect("Operation failed");
969 let mut full_blocks = self.full_blocks.lock().expect("Operation failed");
970
971 if let Some(block) = current.take() {
972 full_blocks.push(block);
973 }
974
975 for block in full_blocks.iter_mut() {
977 block.reset();
978 }
979
980 if let Some(block) = full_blocks.pop() {
982 *current = Some(block);
983 }
984
985 self.stats.reset();
986 }
987
988 pub fn statistics(&self) -> ArenaStatistics {
990 self.stats.clone()
991 }
992}
993
994impl Default for ClusteringArena {
995 fn default() -> Self {
996 Self::new()
997 }
998}
999
/// One contiguous chunk of memory, 64-byte aligned at its base, that is bump-allocated from.
struct ArenaBlock {
    memory: NonNull<u8>,
    size: usize,
    offset: usize,
}

// SAFETY: an `ArenaBlock` exclusively owns its allocation (freed only in `Drop`), so moving it
// to another thread is sound.
unsafe impl Send for ArenaBlock {}
// SAFETY: through `&ArenaBlock` only `can_allocate` is reachable, which reads plain fields;
// every mutation requires `&mut ArenaBlock`.
unsafe impl Sync for ArenaBlock {}

impl ArenaBlock {
    /// Alignment of each block's base address.
    const BLOCK_ALIGN: usize = 64;

    /// Allocates a block of `size` bytes (at least one: `GlobalAlloc::alloc` must never be
    /// called with a zero-sized layout).
    fn new(size: usize) -> Self {
        let size = size.max(1);
        let layout = Self::block_layout(size);
        // SAFETY: `layout` has a non-zero size.
        let raw = unsafe { System.alloc(layout) };
        let memory = NonNull::new(raw).expect("Failed to allocate arena block");

        Self {
            memory,
            size,
            offset: 0,
        }
    }

    /// Layout used for both allocating and freeing a block of `size` bytes.
    fn block_layout(size: usize) -> Layout {
        Layout::from_size_align(size, Self::BLOCK_ALIGN).expect("Operation failed")
    }

    /// Offset at which `layout` would be placed, or `None` if it does not fit.
    ///
    /// Alignment is computed on the absolute address, so alignments above the block's own
    /// 64-byte base alignment are honoured too.
    fn aligned_start(&self, layout: Layout) -> Option<usize> {
        let base = self.memory.as_ptr() as usize;
        let cursor = base.checked_add(self.offset)?;
        // `Layout` guarantees `align` is a non-zero power of two.
        let mask = layout.align() - 1;
        let aligned = cursor.checked_add(mask)? & !mask;
        let start = aligned - base;
        let end = start.checked_add(layout.size())?;
        (end <= self.size).then_some(start)
    }

    fn can_allocate(&self, layout: Layout) -> bool {
        self.aligned_start(layout).is_some()
    }

    /// Bump-allocates `layout` from this block.
    ///
    /// # Panics
    /// If the block cannot satisfy `layout` (check [`Self::can_allocate`] first).
    fn allocate(&mut self, layout: Layout) -> NonNull<u8> {
        let start = self
            .aligned_start(layout)
            .expect("arena block cannot satisfy the requested layout");
        self.offset = start + layout.size();

        // SAFETY: `start + layout.size() <= self.size`, so the pointer stays within (or one past
        // the end of) this block's allocation, and it is derived from a non-null base.
        unsafe { NonNull::new_unchecked(self.memory.as_ptr().add(start)) }
    }

    /// Makes the whole block available again (previous allocations become invalid).
    fn reset(&mut self) {
        self.offset = 0;
    }
}

impl Drop for ArenaBlock {
    fn drop(&mut self) {
        // SAFETY: `memory` came from `System.alloc` with exactly this layout in `new`.
        unsafe {
            System.dealloc(self.memory.as_ptr(), Self::block_layout(self.size));
        }
    }
}
1054
/// Fixed-length vector whose storage lives inside a [`ClusteringArena`] block.
///
/// `ArenaVec` does not own its memory and has no `Drop` impl, so element destructors never run.
/// It is meant for plain data such as `f64` or `usize`.
///
/// NOTE(review): the raw pointer has no lifetime tied to the arena. The vector must not be used
/// after the arena is reset or dropped.
pub struct ArenaVec<T> {
    ptr: *mut T,
    len: usize,
    phantom: std::marker::PhantomData<T>,
}

impl<T> ArenaVec<T> {
    /// Wraps `len` initialised elements starting at `ptr`.
    fn new(ptr: *mut T, len: usize) -> Self {
        Self {
            ptr,
            len,
            phantom: std::marker::PhantomData,
        }
    }

    /// Mutable access to the elements.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        // SAFETY: `ptr` points to `len` initialised, aligned `T`s (see `new`), and `&mut self`
        // gives exclusive access through this handle.
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }

    /// Shared access to the elements; only needs `&self`.
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: `ptr` points to `len` initialised, aligned `T`s (see `new`).
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }

    /// Number of elements; only needs `&self`.
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` if the vector has no elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
1091
/// Point-in-time summary of a [`DistancePool`], as returned by `DistancePool::pool_info`.
#[derive(Clone, Debug)]
pub struct PoolInfo {
    /// Cached small distance buffers.
    pub distance_buffer_count: usize,
    /// Cached index buffers.
    pub index_buffer_count: usize,
    /// Cached matrices.
    pub matrix_buffer_count: usize,
    /// Cached large distance buffers.
    pub large_buffer_count: usize,
    /// Value of `DistancePool::memory_usage` at snapshot time, in bytes.
    pub total_memory_usage: usize,
    /// NUMA node recorded by the pool.
    pub numa_node: i32,
    /// Cache hit rate as a percentage (0–100).
    pub hit_rate: f64,
}
1112
/// Hit/miss counters for a [`DistancePool`], updated with relaxed atomics.
///
/// Every miss also counts as an allocation.
#[derive(Debug)]
pub struct PoolStatistics {
    hits: std::sync::atomic::AtomicUsize,
    misses: std::sync::atomic::AtomicUsize,
    total_allocations: std::sync::atomic::AtomicUsize,
}

impl PoolStatistics {
    fn new() -> Self {
        let zero = || std::sync::atomic::AtomicUsize::new(0);
        Self {
            hits: zero(),
            misses: zero(),
            total_allocations: zero(),
        }
    }

    fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
        self.total_allocations.fetch_add(1, Ordering::Relaxed);
    }

    fn reset(&self) {
        for counter in [&self.hits, &self.misses, &self.total_allocations] {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Percentage (0–100) of requests served from the cache; `0.0` before any request.
    pub fn hit_rate(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        match hits + self.misses.load(Ordering::Relaxed) {
            0 => 0.0,
            total => hits as f64 / total as f64 * 100.0,
        }
    }

    /// Hits plus misses.
    pub fn total_requests(&self) -> usize {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        hits + misses
    }

    /// Number of fresh allocations (one per miss).
    pub fn total_allocations(&self) -> usize {
        self.total_allocations.load(Ordering::Relaxed)
    }
}

impl Clone for PoolStatistics {
    /// Takes a snapshot of the current counter values.
    fn clone(&self) -> Self {
        let snapshot = |counter: &std::sync::atomic::AtomicUsize| {
            std::sync::atomic::AtomicUsize::new(counter.load(Ordering::Relaxed))
        };
        Self {
            hits: snapshot(&self.hits),
            misses: snapshot(&self.misses),
            total_allocations: snapshot(&self.total_allocations),
        }
    }
}
1188
/// Counters describing a [`ClusteringArena`], stored as relaxed atomics.
#[derive(Debug)]
pub struct ArenaStatistics {
    blocks_allocated: std::sync::atomic::AtomicUsize,
    total_memory: std::sync::atomic::AtomicUsize,
    active_objects: std::sync::atomic::AtomicUsize,
}

impl ArenaStatistics {
    fn new() -> Self {
        Self {
            blocks_allocated: Default::default(),
            total_memory: Default::default(),
            active_objects: Default::default(),
        }
    }

    fn reset(&self) {
        for counter in [
            &self.blocks_allocated,
            &self.total_memory,
            &self.active_objects,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Number of blocks counted.
    pub fn blocks_allocated(&self) -> usize {
        self.blocks_allocated.load(Ordering::Relaxed)
    }

    /// Total block memory counted, in bytes.
    pub fn total_memory(&self) -> usize {
        self.total_memory.load(Ordering::Relaxed)
    }

    /// Number of arena objects counted.
    pub fn active_objects(&self) -> usize {
        self.active_objects.load(Ordering::Relaxed)
    }
}

impl Clone for ArenaStatistics {
    /// Takes a snapshot of the current counter values.
    fn clone(&self) -> Self {
        let snapshot = |counter: &std::sync::atomic::AtomicUsize| {
            std::sync::atomic::AtomicUsize::new(counter.load(Ordering::Relaxed))
        };
        Self {
            blocks_allocated: snapshot(&self.blocks_allocated),
            total_memory: snapshot(&self.total_memory),
            active_objects: snapshot(&self.active_objects),
        }
    }
}
1250
/// NUMA layout of the machine: one entry per known node.
#[derive(Debug, Clone)]
pub struct NumaTopology {
    /// Known NUMA nodes.
    pub nodes: Vec<NumaNode>,
}

/// Memory and CPU summary for a single NUMA node.
#[derive(Debug, Clone)]
pub struct NumaNode {
    /// Node id.
    pub id: u32,
    /// Total memory on the node, in bytes.
    pub total_memory_bytes: u64,
    /// Memory reported as available on the node, in bytes.
    pub available_memory_bytes: u64,
    /// Number of CPUs attached to the node.
    pub cpu_count: u32,
}

impl Default for NumaTopology {
    /// A single placeholder node: id 0, 8 GiB total, 4 GiB available, 4 CPUs.
    fn default() -> Self {
        const GIB: u64 = 1024 * 1024 * 1024;
        let placeholder = NumaNode {
            id: 0,
            total_memory_bytes: 8 * GIB,
            available_memory_bytes: 4 * GIB,
            cpu_count: 4,
        };
        NumaTopology {
            nodes: vec![placeholder],
        }
    }
}

impl NumaTopology {
    /// Node preferred for allocations: the first listed node, or `0` when there are none.
    pub fn get_optimal_node(&self) -> u32 {
        self.nodes.first().map_or(0, |node| node.id)
    }

    /// Id of the node with the most available memory (the last one on ties), if any.
    pub fn get_node_with_most_memory(&self) -> Option<u32> {
        let richest = self
            .nodes
            .iter()
            .max_by_key(|candidate| candidate.available_memory_bytes)?;
        Some(richest.id)
    }

    /// Sum of `total_memory_bytes` over all nodes.
    pub fn total_system_memory(&self) -> u64 {
        self.nodes
            .iter()
            .map(|node| node.total_memory_bytes)
            .sum()
    }

    /// Sum of `available_memory_bytes` over all nodes.
    pub fn total_available_memory(&self) -> u64 {
        self.nodes
            .iter()
            .map(|node| node.available_memory_bytes)
            .sum()
    }

    /// Whether a node with id `_nodeid` is present.
    pub fn has_node(&self, _nodeid: u32) -> bool {
        self.get_node_info(_nodeid).is_some()
    }

    /// The node with id `_nodeid`, if present.
    pub fn get_node_info(&self, _nodeid: u32) -> Option<&NumaNode> {
        self.nodes.iter().find(|candidate| candidate.id == _nodeid)
    }
}
1327
1328static GLOBAL_DISTANCE_POOL: std::sync::OnceLock<DistancePool> = std::sync::OnceLock::new();
1330static GLOBAL_CLUSTERING_ARENA: std::sync::OnceLock<ClusteringArena> = std::sync::OnceLock::new();
1331
1332#[allow(dead_code)]
1334pub fn global_distance_pool() -> &'static DistancePool {
1335 GLOBAL_DISTANCE_POOL.get_or_init(|| DistancePool::new(1000))
1336}
1337
1338#[allow(dead_code)]
1340pub fn global_clustering_arena() -> &'static ClusteringArena {
1341 GLOBAL_CLUSTERING_ARENA.get_or_init(ClusteringArena::new)
1342}
1343
1344#[allow(dead_code)]
1346pub fn create_numa_optimized_pool(capacity: usize) -> DistancePool {
1347 let config = MemoryPoolConfig {
1348 numa_aware: true,
1349 auto_numa_discovery: true,
1350 enable_thread_affinity: true,
1351 ..Default::default()
1352 };
1353
1354 DistancePool::with_config(capacity, config)
1355}
1356
1357#[allow(dead_code)]
1359pub fn get_numa_topology() -> NumaTopology {
1360 DistancePool::get_numa_topology()
1361}
1362
1363#[allow(dead_code)]
1365pub fn test_numa_capabilities() -> NumaCapabilities {
1366 NumaCapabilities::detect()
1367}
1368
1369#[derive(Debug, Clone)]
1371pub struct NumaCapabilities {
1372 pub numa_available: bool,
1374 pub num_nodes: u32,
1376 pub memory_binding_supported: bool,
1378 pub thread_affinity_supported: bool,
1380 pub platform_details: String,
1382}
1383
1384impl NumaCapabilities {
1385 pub fn detect() -> Self {
1387 #[cfg(target_os = "linux")]
1388 {
1389 Self::detect_linux()
1390 }
1391 #[cfg(target_os = "windows")]
1392 {
1393 Self::detect_windows()
1394 }
1395 #[cfg(not(any(target_os = "linux", target_os = "windows")))]
1396 {
1397 Self {
1398 numa_available: false,
1399 num_nodes: 1,
1400 memory_binding_supported: false,
1401 thread_affinity_supported: false,
1402 platform_details: "Unsupported platform".to_string(),
1403 }
1404 }
1405 }
1406
1407 #[cfg(target_os = "linux")]
1408 fn detect_linux() -> Self {
1409 let numa_available = std::path::Path::new("/sys/devices/system/node").exists();
1410 let num_nodes = if numa_available {
1411 DistancePool::get_numa_topology().nodes.len() as u32
1412 } else {
1413 1
1414 };
1415
1416 Self {
1417 numa_available,
1418 num_nodes,
1419 memory_binding_supported: numa_available,
1420 thread_affinity_supported: true, platform_details: format!("Linux with {num_nodes} NUMA nodes"),
1422 }
1423 }
1424
1425 #[cfg(target_os = "windows")]
1426 fn detect_windows() -> Self {
1427 Self {
1428 numa_available: true, num_nodes: 1, memory_binding_supported: true,
1431 thread_affinity_supported: true,
1432 platform_details: "Windows NUMA support".to_string(),
1433 }
1434 }
1435
1436 pub fn should_enable_numa(&self) -> bool {
1438 self.numa_available && self.num_nodes > 1
1439 }
1440
1441 pub fn recommended_memory_strategy(&self) -> &'static str {
1443 if self.should_enable_numa() {
1444 "NUMA-aware"
1445 } else {
1446 "Standard"
1447 }
1448 }
1449}
1450
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_distance_pool() {
        let pool = DistancePool::new(10);

        let mut buffer1 = pool.get_distance_buffer(100);
        assert_eq!(buffer1.len(), 100);

        buffer1.as_mut_slice()[0] = 42.0;
        assert_eq!(buffer1.as_slice()[0], 42.0);

        let buffer2 = pool.get_distance_buffer(50);
        assert_eq!(buffer2.len(), 50);

        // Returning buffer1 makes it available to the next same-sized request.
        drop(buffer1);

        let buffer3 = pool.get_distance_buffer(100);
        assert_eq!(buffer3.len(), 100);
    }

    #[test]
    fn test_arena_allocator() {
        let arena = ClusteringArena::new();

        let mut vec1 = arena.alloc_temp_vec::<f64>(100);
        let mut vec2 = arena.alloc_temp_vec::<usize>(50);

        vec1.as_mut_slice()[0] = std::f64::consts::PI;
        vec2.as_mut_slice()[0] = 42;

        assert_eq!(vec1.as_slice()[0], std::f64::consts::PI);
        assert_eq!(vec2.as_slice()[0], 42);

        // vec1/vec2 are not used past this point: reset recycles their memory.
        arena.reset();

        let mut vec3 = arena.alloc_temp_vec::<f64>(200);
        vec3.as_mut_slice()[0] = 2.71;
        assert_eq!(vec3.as_slice()[0], 2.71);
    }

    #[test]
    fn test_pool_statistics() {
        let pool = DistancePool::new(2);

        let stats = pool.statistics();
        assert_eq!(stats.total_requests(), 0);
        assert_eq!(stats.total_allocations(), 0);

        let buffer1 = pool.get_distance_buffer(100);
        let stats = pool.statistics();
        assert_eq!(stats.total_requests(), 1);
        assert_eq!(stats.total_allocations(), 1);
        assert!(stats.hit_rate() < 1.0);

        // The second request of the same size is served from the cache.
        drop(buffer1);
        let _buffer2 = pool.get_distance_buffer(100);
        let stats = pool.statistics();
        assert_eq!(stats.total_requests(), 2);
        assert_eq!(stats.total_allocations(), 1);
        assert!(stats.hit_rate() > 0.0);
    }

    #[test]
    fn test_matrix_buffer() {
        let pool = DistancePool::new(5);

        let mut matrix = pool.get_matrix_buffer(10, 10);
        assert_eq!(matrix.dim(), (10, 10));

        matrix.fill(42.0);
        drop(matrix);

        // A cached 10x10 matrix can serve an 8x8 request.
        let mut matrix2 = pool.get_matrix_buffer(8, 8);
        assert_eq!(matrix2.dim(), (8, 8));
        matrix2.fill(0.0);
        assert_eq!(matrix2.dim(), (8, 8));
    }

    #[test]
    fn test_global_pools() {
        let pool = global_distance_pool();
        let arena = global_clustering_arena();

        let buffer = pool.get_distance_buffer(10);
        let vec = arena.alloc_temp_vec::<f64>(10);

        assert_eq!(buffer.len(), 10);
        assert!(!vec.is_empty());
        assert!(std::ptr::eq(pool, global_distance_pool()));
        assert!(std::ptr::eq(arena, global_clustering_arena()));
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_windows_numa_fallback_returns_ok() {
        let result = DistancePool::allocate_on_numa_node_windows(1024, 0);
        assert!(result.is_ok());
        let buf = result.expect("allocation should succeed");
        assert_eq!(buf.len(), 1024);
    }
}