use ::ndarray::{ArrayBase, Data, Dimension, Ix2, IxDyn, ViewRepr};
use std::alloc::{alloc, dealloc, Layout};
use std::any::TypeId;
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Mutex,
};
use std::time::{Duration, Instant};

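/// Iterates over an array in fixed-shape chunks without copying the data.
///
/// A minimal usage sketch (assumes this crate re-exports `ndarray` and that
/// the import paths are adjusted to wherever this module lives):
///
/// ```ignore
/// use ndarray::{Array2, Ix2};
///
/// let data = Array2::<f64>::zeros((100, 100));
/// let mut processor = ChunkProcessor::new(&data, Ix2(32, 32));
/// assert_eq!(processor.num_chunks(), 16);
/// processor.process_chunks_dyn(|chunk, origin| {
///     // `origin` is the index of the chunk's first element in the parent array.
///     println!("chunk at {:?} has {} elements", origin.slice(), chunk.len());
/// });
/// ```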
pub struct ChunkProcessor<'a, A, S, D>
where
    S: Data<Elem = A>,
    D: Dimension,
{
    array: &'a ArrayBase<S, D>,
    chunkshape: D,
    #[allow(dead_code)]
    position: D,
}

impl<'a, A, S, D> ChunkProcessor<'a, A, S, D>
where
    S: Data<Elem = A>,
    D: Dimension,
{
    pub fn new(array: &'a ArrayBase<S, D>, chunkshape: D) -> Self {
        let position = D::zeros(array.ndim());
        Self {
            array,
            chunkshape,
            position,
        }
    }

    pub fn process_chunks_dyn<F>(&mut self, mut f: F)
    where
        F: FnMut(&ArrayBase<ViewRepr<&A>, IxDyn>, IxDyn),
    {
        use ::ndarray::{IntoDimension, Slice};

        let arrayshape = self.array.shape();
        let chunkshape = self.chunkshape.slice();

        // Number of chunks along each axis; partial chunks at the edges count.
        let mut num_chunks_per_dim = vec![];
        for i in 0..arrayshape.len() {
            let n_chunks = arrayshape[i].div_ceil(chunkshape[i]);
            num_chunks_per_dim.push(n_chunks);
        }

        // Walk the chunk grid with an odometer-style multi-index.
        let mut chunk_indices = vec![0; arrayshape.len()];
        loop {
            let mut slices = vec![];
            let mut position_vec = vec![];

            for i in 0..arrayshape.len() {
                let start = chunk_indices[i] * chunkshape[i];
                let end = ((chunk_indices[i] + 1) * chunkshape[i]).min(arrayshape[i]);
                slices.push(Slice::from(start..end));
                position_vec.push(start);
            }

            let position = position_vec.into_dimension();

            let dyn_array = self.array.view().into_dyn();

            use ndarray::{SliceInfo, SliceInfoElem};
            let slice_elems: Vec<SliceInfoElem> = slices
                .into_iter()
                .map(|s| SliceInfoElem::Slice {
                    start: s.start,
                    end: s.end,
                    step: s.step,
                })
                .collect();

            let slice_info = unsafe {
                SliceInfo::<Vec<SliceInfoElem>, IxDyn, IxDyn>::new(slice_elems)
                    .expect("Failed to create slice info")
            };

            let view = dyn_array.slice(slice_info);
            f(&view, position);

            // Advance the odometer: increment the first axis and carry into
            // the next one whenever an axis wraps around.
            let mut carry = true;
            for i in 0..chunk_indices.len() {
                if carry {
                    chunk_indices[i] += 1;
                    if chunk_indices[i] >= num_chunks_per_dim[i] {
                        chunk_indices[i] = 0;
                    } else {
                        carry = false;
                    }
                }
            }

            // A carry out of the last axis means every chunk has been visited.
            if carry {
                break;
            }
        }
    }

    pub fn num_chunks(&self) -> usize {
        let arrayshape = self.array.shape();
        let chunkshape = self.chunkshape.slice();

        let mut total_chunks = 1;
        for i in 0..arrayshape.len() {
            let n_chunks = arrayshape[i].div_ceil(chunkshape[i]);
            total_chunks *= n_chunks;
        }

        total_chunks
    }
}

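/// Two-dimensional convenience variant of [`ChunkProcessor`] that visits an
/// `Ix2` array block by block.
///
/// Minimal sketch (assumes the crate re-exports `ndarray`; adjust paths as needed):
///
/// ```ignore
/// use ndarray::Array2;
///
/// let data = Array2::<f32>::ones((64, 48));
/// let mut processor = ChunkProcessor2D::new(&data, (16, 16));
/// processor.process_chunks(|block, (row, col)| {
///     // `block` is a view into `data`; `(row, col)` is its top-left corner.
///     assert!(block.nrows() <= 16 && block.ncols() <= 16);
///     let _ = (row, col);
/// });
/// ```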
pub struct ChunkProcessor2D<'a, A, S>
where
    S: Data<Elem = A>,
{
    array: &'a ArrayBase<S, Ix2>,
    chunkshape: (usize, usize),
    #[allow(dead_code)]
    current_row: usize,
    #[allow(dead_code)]
    current_col: usize,
}

impl<'a, A, S> ChunkProcessor2D<'a, A, S>
where
    S: Data<Elem = A>,
{
    pub fn new(array: &'a ArrayBase<S, Ix2>, chunkshape: (usize, usize)) -> Self {
        Self {
            array,
            chunkshape,
            current_row: 0,
            current_col: 0,
        }
    }

    pub fn process_chunks<F>(&mut self, mut f: F)
    where
        F: FnMut(&ArrayBase<ViewRepr<&A>, Ix2>, (usize, usize)),
    {
        let (rows, cols) = self.array.dim();
        let (chunk_rows, chunk_cols) = self.chunkshape;

        for row_start in (0..rows).step_by(chunk_rows) {
            for col_start in (0..cols).step_by(chunk_cols) {
                let row_end = (row_start + chunk_rows).min(rows);
                let col_end = (col_start + chunk_cols).min(cols);

                let chunk = self
                    .array
                    .slice(crate::s![row_start..row_end, col_start..col_end]);

                f(&chunk, (row_start, col_start));
            }
        }
    }
}

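/// Strategy used by the pools and allocators in this module when obtaining
/// backing memory for buffers.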
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AllocationStrategy {
    /// Plain allocations through the system allocator.
    System,
    /// Reuse buffers from size-classed pools.
    Pool,
    /// Bump allocation out of pre-allocated arena chunks.
    Arena,
    /// Stack-style allocation for short-lived buffers.
    Stack,
    /// Prefer memory local to the current NUMA node.
    NumaAware,
    /// Align buffers to cache-line boundaries.
    CacheAligned,
    /// Back allocations with huge pages.
    HugePage,
    /// Back allocations with memory-mapped files.
    MemoryMapped,
}

/// Coarse classification of how close current usage is to the configured limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MemoryPressure {
    Low,
    Medium,
    High,
    Critical,
}

/// Expected memory access pattern of a workload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AccessPattern {
    Sequential,
    Random,
    Temporal,
    Spatial,
    WriteOnce,
    ReadOnly,
    Streaming,
}

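/// Tuning knobs for the buffer pools and allocators in this module.
///
/// A minimal sketch of overriding a couple of fields (grounded in the
/// `Default` impl below):
///
/// ```ignore
/// let config = MemoryConfig {
///     strategy: AllocationStrategy::CacheAligned,
///     max_memory: Some(512 * 1024 * 1024),
///     ..Default::default()
/// };
/// let mut pool = AdvancedBufferPool::<f64>::with_config(config);
/// let buf = pool.acquire_vec_advanced(1024);
/// assert_eq!(buf.len(), 1024);
/// ```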
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// How buffers are obtained and recycled.
    pub strategy: AllocationStrategy,
    /// Expected access pattern of the workload.
    pub access_pattern: AccessPattern,
    /// Issue software prefetch hints where supported.
    pub enable_prefetch: bool,
    /// Requested alignment in bytes.
    pub alignment: usize,
    /// Prefer NUMA-local memory when available.
    pub numa_aware: bool,
    /// Soft upper bound on pool memory, in bytes.
    pub max_memory: Option<usize>,
    /// Enable buffer compression where supported.
    pub enable_compression: bool,
    /// Usage fraction treated as the memory-pressure threshold.
    pub pressure_threshold: f64,
}

impl Default for MemoryConfig {
    fn default() -> Self {
        Self {
            strategy: AllocationStrategy::Pool,
            access_pattern: AccessPattern::Sequential,
            enable_prefetch: true,
            alignment: 64,
            numa_aware: false,
            max_memory: None,
            enable_compression: false,
            pressure_threshold: 0.8,
        }
    }
}

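/// Size-classed buffer pool with pluggable allocation strategies and
/// allocation statistics.
///
/// Minimal sketch of the acquire/release cycle (mirrors the unit tests below):
///
/// ```ignore
/// let mut pool = AdvancedBufferPool::<f64>::new();
/// let buf = pool.acquire_vec_advanced(1_000);
/// assert_eq!(buf.len(), 1_000);
/// pool.release_vec_advanced(buf);
///
/// let report = pool.memory_report();
/// assert!(report.pool_efficiency >= 0.0);
/// ```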
pub struct AdvancedBufferPool<T: Clone + Default> {
    vectors: Vec<Vec<T>>,
    arrays: Vec<crate::ndarray::Array1<T>>,
    config: MemoryConfig,
    stats: PoolStatistics,
    size_classes: Vec<SizeClass<T>>,
    arena_allocator: Option<ArenaAllocator>,
    numa_topology: NumaTopology,
}

/// Counters describing pool activity; all fields are updated atomically.
#[derive(Debug, Default)]
pub struct PoolStatistics {
    pub total_allocations: AtomicUsize,
    pub total_deallocations: AtomicUsize,
    pub pool_hits: AtomicUsize,
    pub pool_misses: AtomicUsize,
    pub bytes_allocated: AtomicUsize,
    pub bytes_deallocated: AtomicUsize,
    pub peak_memory: AtomicUsize,
}

impl Clone for PoolStatistics {
    fn clone(&self) -> Self {
        Self {
            total_allocations: AtomicUsize::new(self.total_allocations.load(Ordering::SeqCst)),
            total_deallocations: AtomicUsize::new(self.total_deallocations.load(Ordering::SeqCst)),
            pool_hits: AtomicUsize::new(self.pool_hits.load(Ordering::SeqCst)),
            pool_misses: AtomicUsize::new(self.pool_misses.load(Ordering::SeqCst)),
            bytes_allocated: AtomicUsize::new(self.bytes_allocated.load(Ordering::SeqCst)),
            bytes_deallocated: AtomicUsize::new(self.bytes_deallocated.load(Ordering::SeqCst)),
            peak_memory: AtomicUsize::new(self.peak_memory.load(Ordering::SeqCst)),
        }
    }
}

/// A bucket of reusable buffers whose capacity is at least `size` elements.
#[derive(Debug)]
struct SizeClass<T> {
    size: usize,
    buffers: VecDeque<Vec<T>>,
    max_buffers: usize,
}

/// Simple bump allocator backed by a list of fixed-size chunks.
#[derive(Debug)]
struct ArenaAllocator {
    chunks: Vec<ArenaChunk>,
    current_chunk: usize,
    chunk_size: usize,
}

#[derive(Debug)]
struct ArenaChunk {
    ptr: NonNull<u8>,
    size: usize,
    offset: usize,
}

// SAFETY: an `ArenaChunk` exclusively owns the allocation behind `ptr`, and all
// mutation goes through the owning `ArenaAllocator`, which requires `&mut self`.
unsafe impl Send for ArenaChunk {}
unsafe impl Sync for ArenaChunk {}

/// Best-effort description of the NUMA layout of the machine.
#[derive(Debug, Clone)]
struct NumaTopology {
    nodes: Vec<NumaNode>,
    current_node: usize,
}

#[derive(Debug, Clone)]
struct NumaNode {
    id: usize,
    memory_size: usize,
    cpu_cores: Vec<usize>,
}

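/// Simplified wrapper around [`AdvancedBufferPool`] that keeps the older
/// `acquire_vec`/`release_vec` style API.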
pub struct BufferPool<T: Clone + Default> {
    inner: AdvancedBufferPool<T>,
}

impl<T: Clone + Default> AdvancedBufferPool<T> {
    pub fn new() -> Self {
        Self::with_config(MemoryConfig::default())
    }

    pub fn with_config(config: MemoryConfig) -> Self {
        let size_classes = Self::create_size_classes();
        let numa_topology = Self::detect_numa_topology();

        Self {
            vectors: Vec::new(),
            arrays: Vec::new(),
            config,
            stats: PoolStatistics::default(),
            size_classes,
            arena_allocator: None,
            numa_topology,
        }
    }

    fn create_size_classes() -> Vec<SizeClass<T>> {
        // Power-of-two size classes from 64 up to 65536 elements.
        let sizes = [
            64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536,
        ];
        sizes
            .iter()
            .map(|&size| SizeClass {
                size,
                buffers: VecDeque::new(),
                max_buffers: 16,
            })
            .collect()
    }

    fn detect_numa_topology() -> NumaTopology {
        // Placeholder topology: a single node assumed to hold 16 GiB and all
        // available cores.
        NumaTopology {
            nodes: vec![NumaNode {
                id: 0,
                memory_size: 16 * 1024 * 1024 * 1024,
                cpu_cores: (0..std::thread::available_parallelism()
                    .expect("Operation failed")
                    .get())
                    .collect(),
            }],
            current_node: 0,
        }
    }

    pub fn memory_pressure(&self) -> MemoryPressure {
        let used_memory = self.stats.bytes_allocated.load(Ordering::Relaxed);
        let max_memory = self.config.max_memory.unwrap_or(usize::MAX);

        let pressure_ratio = used_memory as f64 / max_memory as f64;

        if pressure_ratio > 0.9 {
            MemoryPressure::Critical
        } else if pressure_ratio > 0.7 {
            MemoryPressure::High
        } else if pressure_ratio > 0.5 {
            MemoryPressure::Medium
        } else {
            MemoryPressure::Low
        }
    }

    pub fn acquire_vec_advanced(&mut self, capacity: usize) -> Vec<T> {
        self.stats.total_allocations.fetch_add(1, Ordering::Relaxed);

        match self.config.strategy {
            AllocationStrategy::Pool => self.acquire_from_pool(capacity),
            AllocationStrategy::Arena => self.acquire_from_arena(capacity),
            AllocationStrategy::NumaAware => self.acquire_numa_aware(capacity),
            AllocationStrategy::CacheAligned => self.acquire_cache_aligned(capacity),
            _ => self.acquire_system(capacity),
        }
    }

    fn acquire_from_pool(&mut self, capacity: usize) -> Vec<T> {
        // Reuse a buffer from the smallest size class that can hold `capacity`.
        for size_class in &mut self.size_classes {
            if size_class.size >= capacity {
                if let Some(mut vec) = size_class.buffers.pop_front() {
                    self.stats.pool_hits.fetch_add(1, Ordering::Relaxed);
                    vec.clear();
                    vec.resize(capacity, T::default());
                    return vec;
                }
                break;
            }
        }

        self.stats.pool_misses.fetch_add(1, Ordering::Relaxed);
        vec![T::default(); capacity]
    }

    fn acquire_from_arena(&mut self, capacity: usize) -> Vec<T> {
        if self.arena_allocator.is_none() {
            self.arena_allocator = Some(ArenaAllocator::new(1024 * 1024));
        }

        // The arena currently only backs raw allocations; vectors still come
        // from the system allocator.
        vec![T::default(); capacity]
    }

    fn acquire_numa_aware(&mut self, capacity: usize) -> Vec<T> {
        // NUMA-aware placement is not wired up yet; fall back to the system allocator.
        vec![T::default(); capacity]
    }

    fn acquire_cache_aligned(&mut self, capacity: usize) -> Vec<T> {
        // Over-allocate by one alignment's worth of elements so the useful
        // region can start on an aligned boundary.
        let mut vec =
            Vec::with_capacity(capacity + self.config.alignment / std::mem::size_of::<T>());
        vec.resize(capacity, T::default());
        vec
    }

    fn acquire_system(&mut self, capacity: usize) -> Vec<T> {
        vec![T::default(); capacity]
    }

    pub fn release_vec_advanced(&mut self, vec: Vec<T>) {
        self.stats
            .total_deallocations
            .fetch_add(1, Ordering::Relaxed);

        match self.config.strategy {
            AllocationStrategy::Pool => self.release_to_pool(vec),
            AllocationStrategy::Arena => self.release_to_arena(vec),
            _ => {}
        }
    }

    fn release_to_pool(&mut self, vec: Vec<T>) {
        let capacity = vec.capacity();

        // Return the buffer to the smallest size class that can accommodate
        // it, unless that class is already full.
        for size_class in &mut self.size_classes {
            if size_class.size >= capacity {
                if size_class.buffers.len() < size_class.max_buffers {
                    size_class.buffers.push_back(vec);
                }
                break;
            }
        }
    }

    fn release_to_arena(&mut self, _vec: Vec<T>) {
        // Arena memory is reclaimed in bulk via `reset_arena`, so individual
        // releases are a no-op.
    }

    /// # Safety
    ///
    /// `ptr` must point to an allocation containing at least `size` elements.
    pub unsafe fn prefetch_memory(&self, ptr: *const T, size: usize) {
        if self.config.enable_prefetch {
            unsafe {
                for i in 0..size {
                    let addr = ptr.add(i) as *const u8;
                    #[cfg(target_arch = "x86_64")]
                    {
                        use std::arch::x86_64::*;
                        _mm_prefetch(addr as *const i8, _MM_HINT_T0);
                    }
                }
            }
        }
    }

    pub fn get_statistics(&self) -> PoolStatistics {
        self.stats.clone()
    }

    pub fn reset_arena(&mut self) {
        if let Some(ref mut arena) = self.arena_allocator {
            arena.reset();
        }
    }

    pub fn compact(&mut self) {
        for size_class in &mut self.size_classes {
            let target_size = size_class.max_buffers / 2;
            while size_class.buffers.len() > target_size {
                size_class.buffers.pop_back();
            }
        }
    }

    pub fn memory_report(&self) -> MemoryReport {
        let allocated = self.stats.bytes_allocated.load(Ordering::Relaxed);
        let deallocated = self.stats.bytes_deallocated.load(Ordering::Relaxed);
        let pool_efficiency = {
            let hits = self.stats.pool_hits.load(Ordering::Relaxed);
            let total = hits + self.stats.pool_misses.load(Ordering::Relaxed);
            if total > 0 {
                hits as f64 / total as f64
            } else {
                0.0
            }
        };

        MemoryReport {
            current_usage: allocated.saturating_sub(deallocated),
            peak_usage: self.stats.peak_memory.load(Ordering::Relaxed),
            pool_efficiency,
            pressure_level: self.memory_pressure(),
            fragmentation_ratio: self.calculate_fragmentation(),
        }
    }

    fn calculate_fragmentation(&self) -> f64 {
        let mut total_pooled = 0;
        let mut total_capacity = 0;

        for size_class in &self.size_classes {
            total_pooled += size_class.buffers.len() * size_class.size;
            total_capacity += size_class.max_buffers * size_class.size;
        }

        if total_capacity > 0 {
            1.0 - (total_pooled as f64 / total_capacity as f64)
        } else {
            0.0
        }
    }
}

impl<T: Clone + Default> Default for AdvancedBufferPool<T> {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug, Clone)]
pub struct MemoryReport {
    pub current_usage: usize,
    pub peak_usage: usize,
    pub pool_efficiency: f64,
    pub pressure_level: MemoryPressure,
    pub fragmentation_ratio: f64,
}

impl<T: Clone + Default> BufferPool<T> {
    pub fn new() -> Self {
        Self {
            inner: AdvancedBufferPool::new(),
        }
    }

    pub fn with_config(config: MemoryConfig) -> Self {
        Self {
            inner: AdvancedBufferPool::with_config(config),
        }
    }

    pub fn acquire_vec(&mut self, capacity: usize) -> Vec<T> {
        self.inner.acquire_vec_advanced(capacity)
    }

    pub fn release_vec(&mut self, vec: Vec<T>) {
        self.inner.release_vec_advanced(vec);
    }

    pub fn acquire_array(&mut self, size: usize) -> crate::ndarray::Array1<T> {
        for i in 0..self.inner.arrays.len() {
            if self.inner.arrays[i].len() >= size {
                let mut array = self.inner.arrays.swap_remove(i);
                // A pooled array of a different length cannot be resized in
                // place, so allocate a fresh one of the requested size.
                if array.len() != size {
                    array = crate::ndarray::Array1::from_elem(size, T::default());
                }
                return array;
            }
        }

        crate::ndarray::Array1::from_elem(size, T::default())
    }

    pub fn release_array(&mut self, array: crate::ndarray::Array1<T>) {
        self.inner.arrays.push(array);
    }

    pub fn clear(&mut self) {
        self.inner.vectors.clear();
        self.inner.arrays.clear();
    }

    pub fn get_statistics(&self) -> PoolStatistics {
        self.inner.get_statistics()
    }

    pub fn memory_report(&self) -> MemoryReport {
        self.inner.memory_report()
    }

    pub fn compact(&mut self) {
        self.inner.compact();
    }
}

impl<T: Clone + Default> Default for BufferPool<T> {
    fn default() -> Self {
        Self::new()
    }
}

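/// Type-indexed registry of [`BufferPool`]s shared across the process.
///
/// Minimal sketch of looking up the pool for one element type (paths assumed;
/// adjust to your crate layout):
///
/// ```ignore
/// let pool = global_buffer_pool().get_pool::<f64>();
/// let mut pool = pool.lock().expect("pool mutex poisoned");
/// let buf = pool.acquire_vec(256);
/// assert_eq!(buf.len(), 256);
/// ```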
pub struct GlobalBufferPool {
    pools: Mutex<HashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>>,
}

impl GlobalBufferPool {
    pub fn new() -> Self {
        Self {
            pools: Mutex::new(HashMap::new()),
        }
    }

    pub fn get_pool<T: Clone + Default + 'static + Send + Sync>(
        &self,
    ) -> Arc<Mutex<BufferPool<T>>> {
        let type_id = TypeId::of::<T>();
        let mut pools = self.pools.lock().expect("Operation failed");

        use std::collections::hash_map::Entry;
        match pools.entry(type_id) {
            Entry::Vacant(entry) => {
                let pool = Arc::new(Mutex::new(BufferPool::<T>::new()));
                entry.insert(Box::new(pool.clone()));
                pool
            }
            Entry::Occupied(entry) => {
                match entry.get().downcast_ref::<Arc<Mutex<BufferPool<T>>>>() {
                    Some(pool) => pool.clone(),
                    None => panic!("Type mismatch in global buffer pool"),
                }
            }
        }
    }

    pub fn clear_all(&self) {
        let mut pools = self.pools.lock().expect("Operation failed");
        pools.clear();
    }
}

impl Default for GlobalBufferPool {
    fn default() -> Self {
        Self::new()
    }
}

#[allow(dead_code)]
pub fn global_buffer_pool() -> &'static GlobalBufferPool {
    use once_cell::sync::Lazy;
    static GLOBAL_POOL: Lazy<GlobalBufferPool> = Lazy::new(GlobalBufferPool::new);
    &GLOBAL_POOL
}

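/// Borrowed, read-only view over an owned array, used to pass data around
/// without copying it.
///
/// Minimal sketch (assumes the crate re-exports `ndarray`):
///
/// ```ignore
/// use ndarray::Array1;
///
/// let data = Array1::from(vec![1.0_f64, 2.0, 3.0]);
/// let view = ZeroCopyView::new(&data);
/// let doubled = view.transform(|x| x * 2.0);
/// assert_eq!(doubled[1], 4.0);
/// ```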
pub struct ZeroCopyView<'a, T, D>
where
    D: Dimension,
{
    phantom: PhantomData<T>,
    inner: crate::ndarray::ArrayView<'a, T, D>,
}

impl<'a, T, D> ZeroCopyView<'a, T, D>
where
    D: Dimension,
{
    pub fn new(array: &'a crate::ndarray::Array<T, D>) -> Self {
        Self {
            phantom: PhantomData,
            inner: array.view(),
        }
    }

    pub fn view(&self) -> crate::ndarray::ArrayView<'a, T, D> {
        self.inner.clone()
    }

    pub fn transform<F, U>(&self, f: F) -> crate::ndarray::Array<U, D>
    where
        F: Fn(&T) -> U,
        U: Clone,
    {
        self.inner.map(f)
    }
}

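/// Tracks per-component memory usage by name.
///
/// Minimal sketch of the bookkeeping calls (the label is an arbitrary example):
///
/// ```ignore
/// let tracker = MemoryTracker::new();
/// tracker.track_allocation("fft_buffers", 4096);
/// tracker.track_deallocation("fft_buffers", 1024);
/// assert_eq!(tracker.get_usage("fft_buffers"), 3072);
/// assert_eq!(tracker.get_total_usage(), 3072);
/// ```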
pub struct MemoryTracker {
    allocations: Mutex<HashMap<String, usize>>,
}

impl MemoryTracker {
    pub fn new() -> Self {
        Self {
            allocations: Mutex::new(HashMap::new()),
        }
    }
}

impl Default for MemoryTracker {
    fn default() -> Self {
        Self::new()
    }
}

impl MemoryTracker {
    pub fn track_allocation(&self, name: &str, size: usize) {
        let mut allocations = self.allocations.lock().expect("Operation failed");
        *allocations.entry(name.to_string()).or_insert(0) += size;
    }

    pub fn track_deallocation(&self, name: &str, size: usize) {
        let mut allocations = self.allocations.lock().expect("Operation failed");
        if let Some(current) = allocations.get_mut(name) {
            *current = current.saturating_sub(size);
        }
    }

    pub fn get_usage(&self, name: &str) -> usize {
        let allocations = self.allocations.lock().expect("Operation failed");
        allocations.get(name).copied().unwrap_or_default()
    }

    pub fn get_total_usage(&self) -> usize {
        let allocations = self.allocations.lock().expect("Operation failed");
        allocations.values().sum()
    }

    pub fn reset(&self) {
        let mut allocations = self.allocations.lock().expect("Operation failed");
        allocations.clear();
    }
}

#[allow(dead_code)]
pub fn global_memory_tracker() -> &'static MemoryTracker {
    use once_cell::sync::Lazy;
    static GLOBAL_TRACKER: Lazy<MemoryTracker> = Lazy::new(MemoryTracker::new);
    &GLOBAL_TRACKER
}

impl ArenaAllocator {
    fn new(chunk_size: usize) -> Self {
        Self {
            chunks: Vec::new(),
            current_chunk: 0,
            chunk_size,
        }
    }

    fn allocate(&mut self, size: usize, align: usize) -> Option<NonNull<u8>> {
        if self.chunks.is_empty() || !self.can_allocate_in_current_chunk(size, align) {
            self.add_chunk();
        }

        if let Some(chunk) = self.chunks.get_mut(self.current_chunk) {
            // Round the bump pointer up to the requested alignment. Offsets are
            // aligned relative to the chunk base, which is itself 64-byte
            // aligned, so requests above 64 bytes are not guaranteed.
            let aligned_offset = (chunk.offset + align - 1) & !(align - 1);
            if aligned_offset + size <= chunk.size {
                let ptr = unsafe { chunk.ptr.as_ptr().add(aligned_offset) };
                chunk.offset = aligned_offset + size;
                NonNull::new(ptr)
            } else {
                None
            }
        } else {
            None
        }
    }

    fn can_allocate_in_current_chunk(&self, size: usize, align: usize) -> bool {
        if let Some(chunk) = self.chunks.get(self.current_chunk) {
            let aligned_offset = (chunk.offset + align - 1) & !(align - 1);
            aligned_offset + size <= chunk.size
        } else {
            false
        }
    }

    fn add_chunk(&mut self) {
        let layout = Layout::from_size_align(self.chunk_size, 64).expect("Operation failed");
        if let Some(ptr) = NonNull::new(unsafe { alloc(layout) }) {
            self.chunks.push(ArenaChunk {
                ptr,
                size: self.chunk_size,
                offset: 0,
            });
            self.current_chunk = self.chunks.len() - 1;
        }
    }

    fn reset(&mut self) {
        // Mark every chunk empty for reuse; memory is only returned to the
        // system when the allocator is dropped.
        for chunk in &mut self.chunks {
            chunk.offset = 0;
        }
        self.current_chunk = 0;
    }
}

impl Drop for ArenaAllocator {
    fn drop(&mut self) {
        for chunk in &self.chunks {
            let layout = Layout::from_size_align(chunk.size, 64).expect("Operation failed");
            unsafe {
                dealloc(chunk.ptr.as_ptr(), layout);
            }
        }
    }
}

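/// Allocator that records recent allocation sizes and timing and adapts its
/// [`AllocationStrategy`] to the observed workload.
///
/// Minimal sketch (mirrors the unit tests below):
///
/// ```ignore
/// let mut allocator = SmartAllocator::new(MemoryConfig::default());
/// let buffer = allocator.allocate(1024);
/// assert_eq!(buffer.len(), 1024);
/// assert_eq!(allocator.get_metrics().total_allocations, 1);
/// ```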
pub struct SmartAllocator {
    config: MemoryConfig,
    usage_history: VecDeque<AllocationRecord>,
    current_strategy: AllocationStrategy,
    performance_metrics: AllocationMetrics,
}

#[derive(Debug, Clone)]
struct AllocationRecord {
    size: usize,
    timestamp: Instant,
    access_pattern: AccessPattern,
    lifetime: Option<Duration>,
}

#[derive(Debug, Clone, Default)]
pub struct AllocationMetrics {
    pub total_allocations: usize,
    pub average_allocation_time: Duration,
    pub memory_efficiency: f64,
    pub cache_hit_ratio: f64,
}

impl SmartAllocator {
    pub fn new(config: MemoryConfig) -> Self {
        Self {
            current_strategy: config.strategy,
            config,
            usage_history: VecDeque::with_capacity(1000),
            performance_metrics: AllocationMetrics::default(),
        }
    }

    pub fn allocate(&mut self, size: usize) -> Vec<u8> {
        let start_time = Instant::now();

        self.usage_history.push_back(AllocationRecord {
            size,
            timestamp: start_time,
            access_pattern: self.config.access_pattern,
            lifetime: None,
        });

        self.adapt_strategy();

        let result = match self.current_strategy {
            AllocationStrategy::Pool => self.allocate_pooled(size),
            AllocationStrategy::Arena => self.allocate_arena(size),
            AllocationStrategy::CacheAligned => self.allocate_aligned(size),
            _ => vec![0; size],
        };

        let allocation_time = start_time.elapsed();
        self.update_metrics(allocation_time);

        result
    }

    fn adapt_strategy(&mut self) {
        // Require a minimal history before adapting.
        if self.usage_history.len() < 10 {
            return;
        }

        let recent_allocations: Vec<_> = self.usage_history.iter().rev().take(100).collect();

        let avg_size: usize =
            recent_allocations.iter().map(|r| r.size).sum::<usize>() / recent_allocations.len();
        let has_repeating_sizes = self.has_repeating_sizes(&recent_allocations);
        let is_temporal_locality = self.has_temporal_locality(&recent_allocations);

        // Heuristics: repeated small sizes favour pooling, tight temporal
        // locality favours an arena, and very large requests favour huge pages.
        if has_repeating_sizes && avg_size < 4096 {
            self.current_strategy = AllocationStrategy::Pool;
        } else if is_temporal_locality {
            self.current_strategy = AllocationStrategy::Arena;
        } else if avg_size > 1024 * 1024 {
            self.current_strategy = AllocationStrategy::HugePage;
        } else {
            self.current_strategy = AllocationStrategy::CacheAligned;
        }
    }

    fn has_repeating_sizes(&self, records: &[&AllocationRecord]) -> bool {
        let mut size_counts = HashMap::new();
        for record in records {
            *size_counts.entry(record.size).or_insert(0) += 1;
        }
        size_counts.values().any(|&count| count > records.len() / 4)
    }

    fn has_temporal_locality(&self, records: &[&AllocationRecord]) -> bool {
        if records.len() < 5 {
            return false;
        }

        let mut intervals = Vec::new();
        for window in records.windows(2) {
            if let Some(interval) = window[0]
                .timestamp
                .checked_duration_since(window[1].timestamp)
            {
                intervals.push(interval);
            }
        }

        if intervals.is_empty() {
            return false;
        }

        let avg_interval = intervals.iter().sum::<Duration>() / intervals.len() as u32;
        intervals
            .iter()
            .all(|&interval| interval < avg_interval * 2)
    }

    fn allocate_pooled(&mut self, size: usize) -> Vec<u8> {
        // Placeholder: delegates to the system allocator for now.
        vec![0; size]
    }

    fn allocate_arena(&mut self, size: usize) -> Vec<u8> {
        // Placeholder: delegates to the system allocator for now.
        vec![0; size]
    }

    fn allocate_aligned(&mut self, size: usize) -> Vec<u8> {
        // Round the length up to a multiple of the configured alignment; the
        // buffer itself still comes from the system allocator.
        let aligned_size = (size + self.config.alignment - 1) & !(self.config.alignment - 1);
        vec![0; aligned_size]
    }

    fn update_metrics(&mut self, allocation_time: Duration) {
        self.performance_metrics.total_allocations += 1;

        // Maintain a running mean of allocation latency.
        let total_time = self.performance_metrics.average_allocation_time
            * (self.performance_metrics.total_allocations - 1) as u32
            + allocation_time;
        self.performance_metrics.average_allocation_time =
            total_time / self.performance_metrics.total_allocations as u32;
    }

    pub fn get_metrics(&self) -> &AllocationMetrics {
        &self.performance_metrics
    }
}

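/// Records measured memory bandwidth per access pattern and allocation
/// strategy, and uses it to recommend strategies for registered workloads.
///
/// Minimal sketch (workload names are arbitrary labels):
///
/// ```ignore
/// let mut optimizer = BandwidthOptimizer::new();
/// optimizer.register_pattern("vector_sum", AccessPattern::Sequential);
/// optimizer.record_bandwidth(AccessPattern::Sequential, AllocationStrategy::CacheAligned, 25.0);
/// assert!(optimizer.get_optimal_strategy("vector_sum").is_some());
/// ```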
pub struct BandwidthOptimizer {
    access_patterns: HashMap<String, AccessPattern>,
    bandwidth_measurements: VecDeque<BandwidthMeasurement>,
    optimal_strategies: HashMap<AccessPattern, AllocationStrategy>,
}

#[derive(Debug, Clone)]
struct BandwidthMeasurement {
    pattern: AccessPattern,
    strategy: AllocationStrategy,
    bandwidth_gbps: f64,
    timestamp: Instant,
}

impl BandwidthOptimizer {
    pub fn new() -> Self {
        let mut optimal_strategies = HashMap::new();
        optimal_strategies.insert(AccessPattern::Sequential, AllocationStrategy::CacheAligned);
        optimal_strategies.insert(AccessPattern::Random, AllocationStrategy::Pool);
        optimal_strategies.insert(AccessPattern::Streaming, AllocationStrategy::HugePage);

        Self {
            access_patterns: HashMap::new(),
            bandwidth_measurements: VecDeque::with_capacity(1000),
            optimal_strategies,
        }
    }

    pub fn register_pattern(&mut self, workload: &str, pattern: AccessPattern) {
        self.access_patterns.insert(workload.to_string(), pattern);
    }

    pub fn get_optimal_strategy(&self, workload: &str) -> Option<AllocationStrategy> {
        self.access_patterns
            .get(workload)
            .and_then(|pattern| self.optimal_strategies.get(pattern))
            .copied()
    }

    pub fn record_bandwidth(
        &mut self,
        pattern: AccessPattern,
        strategy: AllocationStrategy,
        bandwidth_gbps: f64,
    ) {
        self.bandwidth_measurements.push_back(BandwidthMeasurement {
            pattern,
            strategy,
            bandwidth_gbps,
            timestamp: Instant::now(),
        });

        // Keep only the most recent 1000 measurements.
        if self.bandwidth_measurements.len() > 1000 {
            self.bandwidth_measurements.pop_front();
        }

        self.update_optimal_strategies();
    }

    fn update_optimal_strategies(&mut self) {
        let mut pattern_performance: HashMap<AccessPattern, HashMap<AllocationStrategy, f64>> =
            HashMap::new();

        for measurement in &self.bandwidth_measurements {
            pattern_performance
                .entry(measurement.pattern)
                .or_insert_with(HashMap::new)
                .entry(measurement.strategy)
                // Blend with the running value (a cheap smoothing, not an exact mean).
                .and_modify(|avg| *avg = (*avg + measurement.bandwidth_gbps) / 2.0)
                .or_insert(measurement.bandwidth_gbps);
        }

        for (pattern, strategies) in pattern_performance {
            if let Some((&best_strategy, _)) = strategies
                .iter()
                .max_by(|(_, a), (_, b)| a.partial_cmp(b).expect("Operation failed"))
            {
                self.optimal_strategies.insert(pattern, best_strategy);
            }
        }
    }

    /// Returns `(average, min, max)` bandwidth in GB/s for `pattern`, if any
    /// measurements have been recorded.
    pub fn get_bandwidth_stats(&self, pattern: AccessPattern) -> Option<(f64, f64, f64)> {
        let measurements: Vec<f64> = self
            .bandwidth_measurements
            .iter()
            .filter(|m| m.pattern == pattern)
            .map(|m| m.bandwidth_gbps)
            .collect();

        if measurements.is_empty() {
            return None;
        }

        let sum: f64 = measurements.iter().sum();
        let avg = sum / measurements.len() as f64;
        let min = measurements.iter().fold(f64::INFINITY, |a, &b| a.min(b));
        let max = measurements
            .iter()
            .fold(f64::NEG_INFINITY, |a, &b| a.max(b));

        Some((avg, min, max))
    }
}

impl Default for BandwidthOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

pub mod metrics;

#[cfg(feature = "gpu")]
pub mod cross_device;

pub mod out_of_core;

#[cfg(feature = "memory_compression")]
pub mod compressed_buffers;

pub mod safety;

#[cfg(feature = "memory_management")]
pub mod leak_detection;

pub use metrics::{
    format_memory_report, generate_memory_report, track_allocation, track_deallocation,
    track_resize,
};

#[cfg(feature = "memory_management")]
pub use leak_detection::{
    LeakCheckGuard, LeakDetectionConfig, LeakDetector, LeakReport, LeakType, MemoryCheckpoint,
    MemoryLeak, ProfilerTool, ValgrindIntegration,
};

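/// Builds an [`AdvancedBufferPool`] tuned for general-purpose workloads
/// (pooled buffers, sequential access, 64-byte alignment).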
pub fn create_optimized_pool<T: Clone + Default + 'static>() -> AdvancedBufferPool<T> {
    let config = MemoryConfig {
        strategy: AllocationStrategy::Pool,
        access_pattern: AccessPattern::Sequential,
        enable_prefetch: true,
        alignment: 64,
        numa_aware: true,
        max_memory: None,
        enable_compression: false,
        pressure_threshold: 0.8,
    };
    AdvancedBufferPool::with_config(config)
}

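/// Builds an [`AdvancedBufferPool`] tuned for scientific computing
/// (cache-aligned buffers, higher pressure threshold).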
pub fn create_scientific_pool<T: Clone + Default + 'static>() -> AdvancedBufferPool<T> {
    let config = MemoryConfig {
        strategy: AllocationStrategy::CacheAligned,
        access_pattern: AccessPattern::Sequential,
        enable_prefetch: true,
        alignment: 64,
        numa_aware: true,
        max_memory: None,
        enable_compression: false,
        pressure_threshold: 0.9,
    };
    AdvancedBufferPool::with_config(config)
}

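/// Builds an [`AdvancedBufferPool`] tuned for very large, streaming data sets
/// (huge pages, 2 MiB alignment, compression enabled).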
pub fn create_large_data_pool<T: Clone + Default + 'static>() -> AdvancedBufferPool<T> {
    let config = MemoryConfig {
        strategy: AllocationStrategy::HugePage,
        access_pattern: AccessPattern::Streaming,
        enable_prefetch: true,
        alignment: 2 * 1024 * 1024, // matches the typical 2 MiB huge-page size
        numa_aware: true,
        max_memory: None,
        enable_compression: true,
        pressure_threshold: 0.7,
    };
    AdvancedBufferPool::with_config(config)
}

static GLOBAL_SMART_ALLOCATOR: std::sync::LazyLock<Arc<Mutex<SmartAllocator>>> =
    std::sync::LazyLock::new(|| Arc::new(Mutex::new(SmartAllocator::new(MemoryConfig::default()))));

/// Returns a shared handle to the process-wide [`SmartAllocator`].
pub fn global_smart_allocator() -> Arc<Mutex<SmartAllocator>> {
    GLOBAL_SMART_ALLOCATOR.clone()
}

static GLOBAL_BANDWIDTH_OPTIMIZER: std::sync::LazyLock<Arc<Mutex<BandwidthOptimizer>>> =
    std::sync::LazyLock::new(|| Arc::new(Mutex::new(BandwidthOptimizer::new())));

/// Returns a shared handle to the process-wide [`BandwidthOptimizer`].
pub fn global_bandwidth_optimizer() -> Arc<Mutex<BandwidthOptimizer>> {
    GLOBAL_BANDWIDTH_OPTIMIZER.clone()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_advanced_buffer_pool() {
        let mut pool = AdvancedBufferPool::<f64>::new();

        let vec1 = pool.acquire_vec_advanced(1000);
        assert_eq!(vec1.len(), 1000);

        pool.release_vec_advanced(vec1);

        let vec2 = pool.acquire_vec_advanced(800);
        assert_eq!(vec2.len(), 800);

        let stats = pool.get_statistics();
        assert_eq!(stats.total_allocations.load(Ordering::Relaxed), 2);
        assert_eq!(stats.total_deallocations.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_memory_config() {
        let config = MemoryConfig {
            strategy: AllocationStrategy::CacheAligned,
            access_pattern: AccessPattern::Random,
            enable_prefetch: true,
            alignment: 128,
            numa_aware: true,
            max_memory: Some(1024 * 1024 * 1024),
            enable_compression: false,
            pressure_threshold: 0.7,
        };

        let mut pool = AdvancedBufferPool::<i32>::with_config(config.clone());
        assert_eq!(pool.config.alignment, 128);
        assert_eq!(pool.config.strategy, AllocationStrategy::CacheAligned);

        let vec = pool.acquire_vec_advanced(256);
        assert_eq!(vec.len(), 256);
    }

    #[test]
    fn test_memory_pressure() {
        let config = MemoryConfig {
            max_memory: Some(1024),
            ..Default::default()
        };
        let pool = AdvancedBufferPool::<u8>::with_config(config);

        let pressure = pool.memory_pressure();
        assert_eq!(pressure, MemoryPressure::Low);
    }

    #[test]
    fn test_pool_statistics() {
        let mut pool = AdvancedBufferPool::<f32>::new();

        let _vec1 = pool.acquire_vec_advanced(100);
        let _vec2 = pool.acquire_vec_advanced(200);
        let _vec3 = pool.acquire_vec_advanced(50);

        let stats = pool.get_statistics();
        assert_eq!(stats.total_allocations.load(Ordering::Relaxed), 3);
        assert_eq!(stats.total_deallocations.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_memory_report() {
        let mut pool = AdvancedBufferPool::<u64>::new();
        let _vec = pool.acquire_vec_advanced(500);

        let report = pool.memory_report();
        assert!(report.pool_efficiency >= 0.0 && report.pool_efficiency <= 1.0);
        assert!(report.fragmentation_ratio >= 0.0 && report.fragmentation_ratio <= 1.0);
    }

    #[test]
    fn test_pool_compaction() {
        let mut pool = AdvancedBufferPool::<i64>::new();

        for _ in 0..20 {
            let vec = pool.acquire_vec_advanced(128);
            pool.release_vec_advanced(vec);
        }

        pool.compact();

        let vec = pool.acquire_vec_advanced(128);
        assert_eq!(vec.len(), 128);
    }

    #[test]
    fn test_smart_allocator() {
        let mut allocator = SmartAllocator::new(MemoryConfig::default());

        let buffer1 = allocator.allocate(1024);
        assert_eq!(buffer1.len(), 1024);

        let buffer2 = allocator.allocate(2048);
        assert_eq!(buffer2.len(), 2048);

        let metrics = allocator.get_metrics();
        assert_eq!(metrics.total_allocations, 2);
    }

    #[test]
    fn test_smart_allocator_adaptation() {
        let mut allocator = SmartAllocator::new(MemoryConfig::default());
        let _initial_strategy = allocator.current_strategy;

        for _ in 0..15 {
            let _buffer = allocator.allocate(512);
        }

        let _final_strategy = allocator.current_strategy;
    }

    #[test]
    fn test_bandwidth_optimizer() {
        let mut optimizer = BandwidthOptimizer::new();

        optimizer.register_pattern("matrix_multiply", AccessPattern::Spatial);
        optimizer.register_pattern("vector_sum", AccessPattern::Sequential);

        let strategy = optimizer.get_optimal_strategy("vector_sum");
        assert!(strategy.is_some());

        optimizer.record_bandwidth(
            AccessPattern::Sequential,
            AllocationStrategy::CacheAligned,
            25.0,
        );
        optimizer.record_bandwidth(AccessPattern::Random, AllocationStrategy::Pool, 12.0);

        let stats = optimizer.get_bandwidth_stats(AccessPattern::Sequential);
        assert!(stats.is_some());
        let (avg, min, max) = stats.expect("Operation failed");
        assert_eq!(avg, 25.0);
        assert_eq!(min, 25.0);
        assert_eq!(max, 25.0);
    }

    #[test]
    fn test_convenience_functions() {
        let _optimized_pool = create_optimized_pool::<f64>();
        let _scientific_pool = create_scientific_pool::<f32>();
        let _large_data_pool = create_large_data_pool::<u8>();

        let _smart_allocator = global_smart_allocator();
        let _bandwidth_optimizer = global_bandwidth_optimizer();
    }

    #[test]
    fn test_legacy_buffer_pool_compatibility() {
        let mut pool = BufferPool::<i32>::new();

        let vec = pool.acquire_vec(100);
        assert_eq!(vec.len(), 100);

        pool.release_vec(vec);

        let stats = pool.get_statistics();
        assert!(stats.total_allocations.load(Ordering::Relaxed) > 0);

        let report = pool.memory_report();
        assert!(report.fragmentation_ratio >= 0.0);

        pool.compact();
    }

    #[test]
    fn test_allocation_strategies() {
        let strategies = [
            AllocationStrategy::System,
            AllocationStrategy::Pool,
            AllocationStrategy::Arena,
            AllocationStrategy::CacheAligned,
            AllocationStrategy::NumaAware,
        ];

        for strategy in &strategies {
            let config = MemoryConfig {
                strategy: *strategy,
                ..Default::default()
            };
            let mut pool = AdvancedBufferPool::<u32>::with_config(config);
            let vec = pool.acquire_vec_advanced(256);
            assert_eq!(vec.len(), 256);
        }
    }

    #[test]
    fn test_access_patterns() {
        let patterns = [
            AccessPattern::Sequential,
            AccessPattern::Random,
            AccessPattern::Temporal,
            AccessPattern::Spatial,
            AccessPattern::WriteOnce,
            AccessPattern::ReadOnly,
            AccessPattern::Streaming,
        ];

        for pattern in &patterns {
            let config = MemoryConfig {
                access_pattern: *pattern,
                ..Default::default()
            };
            let pool = AdvancedBufferPool::<f64>::with_config(config);
            assert_eq!(pool.config.access_pattern, *pattern);
        }
    }

    #[test]
    fn test_memory_pressure_levels() {
        assert!(MemoryPressure::Low < MemoryPressure::Medium);
        assert!(MemoryPressure::Medium < MemoryPressure::High);
        assert!(MemoryPressure::High < MemoryPressure::Critical);
    }

    #[test]
    fn test_concurrent_pool_access() {
        let pool = Arc::new(Mutex::new(AdvancedBufferPool::<i32>::new()));
        let mut handles = vec![];

        for i in 0..4 {
            let pool_clone = Arc::clone(&pool);
            let handle = thread::spawn(move || {
                let mut pool = pool_clone.lock().expect("Operation failed");
                let vec = pool.acquire_vec_advanced(100 + i * 50);
                thread::sleep(Duration::from_millis(10));
                pool.release_vec_advanced(vec);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Operation failed");
        }

        let pool = pool.lock().expect("Operation failed");
        let stats = pool.get_statistics();
        assert_eq!(stats.total_allocations.load(Ordering::Relaxed), 4);
        assert_eq!(stats.total_deallocations.load(Ordering::Relaxed), 4);
    }

    #[test]
    fn test_arena_allocator_safety() {
        let mut arena = ArenaAllocator::new(4096);

        let ptr1 = arena.allocate(64, 8);
        assert!(ptr1.is_some());

        let ptr2 = arena.allocate(128, 16);
        assert!(ptr2.is_some());

        arena.reset();

        let ptr3 = arena.allocate(32, 4);
        assert!(ptr3.is_some());
    }

    #[test]
    fn test_bandwidth_optimizer_pattern_learning() {
        let mut optimizer = BandwidthOptimizer::new();

        optimizer.record_bandwidth(
            AccessPattern::Sequential,
            AllocationStrategy::CacheAligned,
            30.0,
        );
        optimizer.record_bandwidth(AccessPattern::Sequential, AllocationStrategy::Pool, 20.0);
        optimizer.record_bandwidth(AccessPattern::Sequential, AllocationStrategy::Arena, 25.0);

        let stats = optimizer.get_bandwidth_stats(AccessPattern::Sequential);
        assert!(stats.is_some());
        let (avg, min, max) = stats.expect("Operation failed");
        assert!(avg > 20.0);
        assert_eq!(min, 20.0);
        assert_eq!(max, 30.0);
    }
}