use crate::{Result, TensorError};
use scirs2_core::memory::{BufferPool, GlobalBufferPool, ChunkProcessor};
use scirs2_core::memory_efficient::{MemoryMappedArray, LazyArray, ChunkedArray};
use scirs2_core::memory_efficient::{ZeroCopyOps, AdaptiveChunking, DiskBackedArray};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
/// Thread-safe memory pool that routes each allocation to a strategy chosen
/// by its size class: small/medium requests go to per-class `BufferPool`s,
/// large requests to chunked or global-pool storage, and huge requests to
/// disk-backed storage.
pub struct UltraEfficientMemoryPool {
// Lazily-created per-(element size, size class, alignment) pools.
pools: RwLock<HashMap<PoolKey, Arc<BufferPool>>>,
// Shared fallback pool used for large, non-chunked allocations.
global_pool: Arc<GlobalBufferPool>,
// Drives chunked allocation/deallocation for large buffers.
chunk_processor: Arc<ChunkProcessor>,
// Counters and ratios; Mutex-guarded since updates are short and frequent.
stats: Arc<Mutex<MemoryStats>>,
// Immutable after construction.
config: PoolConfig,
}
/// Tuning knobs for [`UltraEfficientMemoryPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
/// Upper bound, in bytes, used both to size per-class pools and as the
/// denominator of the memory-pressure ratio.
pub max_pool_size: usize,
/// Smallest allocation the pool is tuned for, in bytes.
/// NOTE(review): not referenced by the visible code — confirm it is consumed elsewhere.
pub min_allocation_size: usize,
/// Byte budget handed to the chunk processor for a single allocation.
pub max_memory_allocation: usize,
/// Allow `create_zero_copy` to wrap caller-owned slices without copying.
pub enable_zero_copy: bool,
/// Use `ChunkedArray`-backed storage for large allocations.
pub enable_adaptive_chunking: bool,
/// Fraction of `max_pool_size` (0.0–1.0) above which the pool is
/// considered under memory pressure.
pub memory_pressure_threshold: f64,
/// When under pressure, run cleanup/aggressive-mode optimizations before
/// allocating.
pub aggressive_optimization: bool,
/// Intended cadence for cleanup passes.
/// NOTE(review): no timer in the visible code uses this — confirm callers honor it.
pub cleanup_interval: Duration,
}
impl Default for PoolConfig {
    /// Conservative defaults: 1 GiB pool cap, 512 MiB single-allocation cap,
    /// 4 KiB minimum allocation, every optimization enabled, cleanup every
    /// 30 seconds.
    fn default() -> Self {
        // 1 GiB expressed as a named constant instead of a magic literal.
        const GIB: usize = 1024 * 1024 * 1024;
        Self {
            max_pool_size: GIB, // 1_073_741_824 bytes
            min_allocation_size: 4096,
            max_memory_allocation: GIB / 2, // 536_870_912 bytes
            enable_zero_copy: true,
            enable_adaptive_chunking: true,
            memory_pressure_threshold: 0.85,
            aggressive_optimization: true,
            cleanup_interval: Duration::from_secs(30),
        }
    }
}
/// Key identifying one per-class buffer pool: buffers are only shared between
/// allocations with identical element size, size class, and alignment.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct PoolKey {
// `std::mem::size_of::<T>()` of the element type.
element_size: usize,
// Coarse bucket derived from the total byte count.
size_class: SizeClass,
// `std::mem::align_of::<T>()` of the element type.
alignment: usize,
}
/// Coarse allocation-size bucket; see `classify_size` for the byte boundaries
/// (<= 64 KiB small, <= 16 MiB medium, <= 512 MiB large, else huge).
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum SizeClass {
// Up to 64 KiB — served from per-class pools.
Small,
// Up to 16 MiB — served from per-class pools.
Medium,
// Up to 512 MiB — chunked or global-pool storage.
Large,
// Anything larger — disk-backed storage.
Huge,
}
/// Cumulative counters for one pool instance (all byte counts are raw bytes,
/// all counts are since pool creation).
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
/// Total bytes ever handed out.
pub total_allocated: usize,
/// Total bytes ever returned via `deallocate`.
pub total_freed: usize,
/// Bytes currently outstanding (allocated minus freed, saturating).
pub current_usage: usize,
/// High-water mark of `current_usage`.
pub peak_usage: usize,
/// Number of successful `allocate` calls.
pub allocation_count: u64,
/// Number of `deallocate` calls.
pub deallocation_count: u64,
/// Pool-reuse events (existing per-class pool served the request).
pub cache_hits: u64,
/// Pool-creation events (no matching per-class pool existed).
pub cache_misses: u64,
/// Fraction of ever-allocated bytes still outstanding; refreshed by `cleanup`.
pub fragmentation_ratio: f64,
/// Count of zero-copy buffer creations.
pub zero_copy_operations: u64,
/// Count of adaptive-chunking allocations/deallocations.
pub adaptive_chunking_ops: u64,
/// Count of disk-backed allocations/deallocations.
pub disk_backed_ops: u64,
/// When `cleanup` last ran, if ever.
pub last_cleanup: Option<Instant>,
}
impl UltraEfficientMemoryPool {
    /// Creates a pool with the given configuration.
    ///
    /// The backing global pool is sized at 4x `max_pool_size` (saturating to
    /// avoid overflow on extreme configurations) and the chunk processor is
    /// budgeted by `max_memory_allocation`.
    ///
    /// # Errors
    /// Propagates construction errors from the underlying `GlobalBufferPool`
    /// or `ChunkProcessor`.
    pub fn new(config: PoolConfig) -> Result<Self> {
        // `saturating_mul` guards against overflow for very large configured sizes.
        let global_pool = Arc::new(GlobalBufferPool::new(
            config.max_pool_size.saturating_mul(4),
        )?);
        let chunk_processor = Arc::new(ChunkProcessor::new(
            config.max_memory_allocation,
            config.enable_adaptive_chunking,
        )?);
        Ok(Self {
            pools: RwLock::new(HashMap::new()),
            global_pool,
            chunk_processor,
            stats: Arc::new(Mutex::new(MemoryStats::default())),
            config,
        })
    }

    /// Allocates a buffer of `size` elements of `T`, routed by size class:
    /// small/medium -> per-class pool, large -> chunked or global pool,
    /// huge -> disk-backed storage.
    ///
    /// # Errors
    /// Returns an error if the byte count overflows `usize` or if the chosen
    /// backing store fails to allocate.
    pub fn allocate<T>(&self, size: usize) -> Result<UltraEfficientBuffer<T>>
    where
        T: Clone + Send + Sync + 'static,
    {
        let element_size = std::mem::size_of::<T>();
        // Reject byte counts that overflow rather than silently wrapping.
        let total_bytes = size.checked_mul(element_size).ok_or_else(|| {
            TensorError::unsupported_operation_simple(
                "requested allocation size overflows usize".to_string(),
            )
        })?;
        let size_class = Self::classify_size(total_bytes);
        let alignment = std::mem::align_of::<T>();
        // Relieve pressure *before* committing the new allocation, accounting
        // for the bytes that are about to be added.
        if self.config.aggressive_optimization && self.is_memory_pressure(total_bytes) {
            self.apply_memory_pressure_optimizations()?;
        }
        let buffer = match size_class {
            SizeClass::Huge => self.allocate_huge(size, total_bytes),
            SizeClass::Large => self.allocate_large(size, total_bytes),
            SizeClass::Medium | SizeClass::Small => {
                self.allocate_from_pool(size, element_size, size_class, alignment)
            }
        }?;
        // Record stats only after success so a failed attempt cannot
        // permanently inflate `current_usage` (the original pre-incremented).
        {
            let mut stats = self.stats.lock().expect("lock should not be poisoned");
            stats.allocation_count += 1;
            stats.total_allocated += total_bytes;
            stats.current_usage += total_bytes;
            stats.peak_usage = stats.peak_usage.max(stats.current_usage);
        }
        Ok(buffer)
    }

    /// Returns a buffer to its backing store and updates the counters.
    ///
    /// # Errors
    /// Propagates failures from returning the buffer to its pool or from the
    /// chunk processor.
    pub fn deallocate<T>(&self, buffer: UltraEfficientBuffer<T>) -> Result<()>
    where
        T: Clone + Send + Sync + 'static,
    {
        let total_bytes = buffer.capacity() * std::mem::size_of::<T>();
        {
            let mut stats = self.stats.lock().expect("lock should not be poisoned");
            stats.deallocation_count += 1;
            stats.total_freed += total_bytes;
            stats.current_usage = stats.current_usage.saturating_sub(total_bytes);
        }
        match &buffer.storage {
            BufferStorage::Pool { pool, .. } => {
                // Clone the Arc first: the original called `into_raw_parts`
                // (which consumes `buffer`) while `pool` still borrowed it,
                // which does not pass the borrow checker.
                let pool = Arc::clone(pool);
                let parts = buffer.into_raw_parts();
                pool.return_buffer(parts)?;
                let mut stats = self.stats.lock().expect("lock should not be poisoned");
                stats.cache_hits += 1;
            }
            BufferStorage::DiskBacked { .. } => {
                let mut stats = self.stats.lock().expect("lock should not be poisoned");
                stats.disk_backed_ops += 1;
            }
            BufferStorage::ZeroCopy { .. } => {
                let mut stats = self.stats.lock().expect("lock should not be poisoned");
                stats.zero_copy_operations += 1;
            }
            BufferStorage::AdaptiveChunked { .. } => {
                let parts = buffer.into_raw_parts();
                self.chunk_processor.process_deallocation(parts)?;
                let mut stats = self.stats.lock().expect("lock should not be poisoned");
                stats.adaptive_chunking_ops += 1;
            }
        }
        Ok(())
    }

    /// Returns a snapshot of the current statistics.
    pub fn get_stats(&self) -> MemoryStats {
        self.stats.lock().expect("lock should not be poisoned").clone()
    }

    /// Drops empty per-class pools, cleans the global pool and chunk
    /// processor, and refreshes the fragmentation estimate.
    pub fn cleanup(&self) -> Result<()> {
        let start_time = Instant::now();
        {
            let mut pools = self.pools.write().expect("write lock should not be poisoned");
            // Drop per-class pools that no longer hold any buffers.
            pools.retain(|_, pool| !pool.is_empty());
        }
        self.global_pool.cleanup()?;
        self.chunk_processor.cleanup()?;
        {
            let mut stats = self.stats.lock().expect("lock should not be poisoned");
            stats.last_cleanup = Some(start_time);
            if stats.total_allocated > 0 {
                // Fraction of ever-allocated bytes still outstanding;
                // `saturating_sub` guards against freed > allocated drift.
                stats.fragmentation_ratio = stats.total_allocated.saturating_sub(stats.total_freed)
                    as f64
                    / stats.total_allocated as f64;
            }
        }
        Ok(())
    }

    /// True when current usage plus `pending_bytes` exceeds the configured
    /// fraction of `max_pool_size`.
    fn is_memory_pressure(&self, pending_bytes: usize) -> bool {
        let stats = self.stats.lock().expect("lock should not be poisoned");
        let projected = stats.current_usage.saturating_add(pending_bytes) as f64;
        projected / self.config.max_pool_size as f64 > self.config.memory_pressure_threshold
    }

    /// Runs a cleanup pass, force-cleans every per-class pool, and switches
    /// the chunk processor to aggressive mode.
    fn apply_memory_pressure_optimizations(&self) -> Result<()> {
        self.cleanup()?;
        {
            let pools = self.pools.read().expect("read lock should not be poisoned");
            for pool in pools.values() {
                pool.force_cleanup()?;
            }
        }
        self.chunk_processor.enable_aggressive_mode()?;
        Ok(())
    }

    /// Buckets a byte count into a size class.
    /// Boundaries: <= 64 KiB small, <= 16 MiB medium, <= 512 MiB large, else huge.
    fn classify_size(bytes: usize) -> SizeClass {
        const SMALL_MAX: usize = 64 * 1024; // 65_536
        const MEDIUM_MAX: usize = 16 * 1024 * 1024; // 16_777_216
        const LARGE_MAX: usize = 512 * 1024 * 1024; // 536_870_912
        if bytes <= SMALL_MAX {
            SizeClass::Small
        } else if bytes <= MEDIUM_MAX {
            SizeClass::Medium
        } else if bytes <= LARGE_MAX {
            SizeClass::Large
        } else {
            SizeClass::Huge
        }
    }

    /// Serves a small/medium allocation from the matching per-class pool,
    /// creating the pool on first use.
    fn allocate_from_pool<T>(
        &self,
        size: usize,
        element_size: usize,
        size_class: SizeClass,
        alignment: usize,
    ) -> Result<UltraEfficientBuffer<T>>
    where
        T: Clone + Send + Sync + 'static,
    {
        let pool_key = PoolKey {
            element_size,
            size_class,
            alignment,
        };
        // Fast path: look up under the shared read lock.
        let existing = {
            let pools = self.pools.read().expect("read lock should not be poisoned");
            pools.get(&pool_key).cloned()
        };
        let (pool, created) = match existing {
            Some(pool) => (pool, false),
            None => {
                let mut pools = self.pools.write().expect("write lock should not be poisoned");
                // Re-check under the write lock: another thread may have
                // created this pool between our read and write acquisitions
                // (the original inserted blindly, racing and discarding a
                // concurrently-created pool).
                if let Some(pool) = pools.get(&pool_key) {
                    (pool.clone(), false)
                } else {
                    let pool = Arc::new(BufferPool::new(
                        self.config.max_pool_size / 16,
                        element_size,
                    )?);
                    pools.insert(pool_key, pool.clone());
                    (pool, true)
                }
            }
        };
        let raw_buffer = pool.allocate(size * element_size)?;
        {
            let mut stats = self.stats.lock().expect("lock should not be poisoned");
            // Reusing an existing pool is a hit; creating one is a miss (the
            // original never incremented `cache_misses`, so the reported hit
            // rate was always 100%).
            if created {
                stats.cache_misses += 1;
            } else {
                stats.cache_hits += 1;
            }
        }
        Ok(UltraEfficientBuffer {
            storage: BufferStorage::Pool { pool, raw_buffer },
            size,
            capacity: size,
        })
    }

    /// Serves a large allocation: adaptive chunking when enabled, otherwise a
    /// contiguous buffer carved from the global pool.
    fn allocate_large<T>(&self, size: usize, total_bytes: usize) -> Result<UltraEfficientBuffer<T>>
    where
        T: Clone + Send + Sync + 'static,
    {
        if self.config.enable_adaptive_chunking {
            let chunked_array = ChunkedArray::new(size, self.chunk_processor.optimal_chunk_size())?;
            Ok(UltraEfficientBuffer {
                storage: BufferStorage::AdaptiveChunked {
                    array: chunked_array,
                },
                size,
                capacity: size,
            })
        } else {
            let raw_buffer = self.global_pool.allocate(total_bytes)?;
            Ok(UltraEfficientBuffer {
                storage: BufferStorage::Pool {
                    pool: Arc::new(BufferPool::from_global(&self.global_pool)?),
                    raw_buffer,
                },
                size,
                capacity: size,
            })
        }
    }

    /// Serves a huge allocation from disk-backed storage.
    /// (`_total_bytes` is kept for call-site symmetry with `allocate_large`.)
    fn allocate_huge<T>(&self, size: usize, _total_bytes: usize) -> Result<UltraEfficientBuffer<T>>
    where
        T: Clone + Send + Sync + 'static,
    {
        let disk_backed = DiskBackedArray::new(size)?;
        {
            let mut stats = self.stats.lock().expect("lock should not be poisoned");
            stats.disk_backed_ops += 1;
        }
        Ok(UltraEfficientBuffer {
            storage: BufferStorage::DiskBacked {
                array: disk_backed,
            },
            size,
            capacity: size,
        })
    }

    /// Wraps `data` in a zero-copy buffer without taking ownership of the
    /// underlying bytes.
    ///
    /// # Errors
    /// Fails when zero-copy is disabled in the configuration or when the
    /// underlying `ZeroCopyOps` construction fails.
    pub fn create_zero_copy<T>(&self, data: &[T]) -> Result<UltraEfficientBuffer<T>>
    where
        T: Clone + Send + Sync + 'static,
    {
        if !self.config.enable_zero_copy {
            return Err(TensorError::unsupported_operation_simple(
                "Zero-copy operations disabled in pool configuration".to_string()
            ));
        }
        let zero_copy_ops = ZeroCopyOps::from_slice(data)?;
        {
            let mut stats = self.stats.lock().expect("lock should not be poisoned");
            stats.zero_copy_operations += 1;
        }
        Ok(UltraEfficientBuffer {
            storage: BufferStorage::ZeroCopy {
                ops: zero_copy_ops,
            },
            size: data.len(),
            capacity: data.len(),
        })
    }

    /// Suggests a rounded-up element count for `requested_size` that aligns
    /// with the allocation strategy of its size class.
    pub fn optimal_size_for<T>(&self, requested_size: usize) -> usize
    where
        T: 'static,
    {
        let element_size = std::mem::size_of::<T>();
        // Saturating: classification only needs a coarse bucket.
        let total_bytes = requested_size.saturating_mul(element_size);
        match Self::classify_size(total_bytes) {
            // Power-of-two growth, with a floor of 64 elements.
            SizeClass::Small => requested_size.next_power_of_two().max(64),
            // Round up to a multiple of 1024 elements.
            SizeClass::Medium => ((requested_size + 1023) / 1024) * 1024,
            // Round up to a multiple of 256 Ki elements.
            SizeClass::Large => ((requested_size + 262143) / 262144) * 262144,
            // Defer to the chunk processor's preferred chunk size.
            SizeClass::Huge => self.chunk_processor.optimal_chunk_size(),
        }
    }

    /// Runs the pressure optimizations, defragments every per-class pool, and
    /// optimizes the global pool.
    pub fn optimize(&self) -> Result<()> {
        self.apply_memory_pressure_optimizations()?;
        {
            let pools = self.pools.read().expect("read lock should not be poisoned");
            for pool in pools.values() {
                pool.defragment()?;
            }
        }
        self.global_pool.optimize()?;
        Ok(())
    }
}
/// Handle to memory obtained from [`UltraEfficientMemoryPool`]; the variant in
/// `storage` records which backing strategy owns the bytes.
pub struct UltraEfficientBuffer<T> {
// Backing storage; determines how the buffer is returned on deallocation.
storage: BufferStorage<T>,
// Number of valid elements exposed via the slice accessors.
size: usize,
// Allocated element capacity (currently always equal to `size` at creation).
capacity: usize,
}
/// Backing storage variants for [`UltraEfficientBuffer`].
enum BufferStorage<T> {
// Memory carved from a `BufferPool`. `raw_buffer` is a raw pointer into
// pool-owned memory — NOTE(review): lifetime/ownership of this pointer is
// managed by the pool, not by Rust's borrow checker; confirm the pool keeps
// the memory alive for the buffer's lifetime.
Pool {
pool: Arc<BufferPool>,
raw_buffer: *mut T,
},
// Memory spilled to disk for huge allocations.
DiskBacked {
array: DiskBackedArray<T>,
},
// View over caller-provided data; no copy was made.
ZeroCopy {
ops: ZeroCopyOps<T>,
},
// Chunked storage managed by the chunk processor.
AdaptiveChunked {
array: ChunkedArray<T>,
},
}
impl<T> UltraEfficientBuffer<T> {
/// Number of valid elements.
pub fn size(&self) -> usize {
self.size
}
/// Allocated element capacity.
pub fn capacity(&self) -> usize {
self.capacity
}
/// Raw pointer to the first element, whichever storage variant backs it.
///
/// # Safety
/// The returned pointer is only valid while `self` is alive and the backing
/// storage has not been invalidated; the caller must not read past `size()`
/// elements.
pub unsafe fn as_ptr(&self) -> *const T {
match &self.storage {
BufferStorage::Pool { raw_buffer, .. } => *raw_buffer,
BufferStorage::DiskBacked { array } => array.as_ptr(),
BufferStorage::ZeroCopy { ops } => ops.as_ptr(),
BufferStorage::AdaptiveChunked { array } => array.as_ptr(),
}
}
/// Mutable raw pointer to the first element.
///
/// # Safety
/// Same validity requirements as [`Self::as_ptr`]; additionally the caller
/// must ensure no other references to the buffer contents exist while
/// writing through the pointer.
pub unsafe fn as_mut_ptr(&mut self) -> *mut T {
match &mut self.storage {
BufferStorage::Pool { raw_buffer, .. } => *raw_buffer,
BufferStorage::DiskBacked { array } => array.as_mut_ptr(),
BufferStorage::ZeroCopy { ops } => ops.as_mut_ptr(),
BufferStorage::AdaptiveChunked { array } => array.as_mut_ptr(),
}
}
/// Immutable view of the `size` valid elements.
pub fn as_slice(&self) -> &[T] {
// SAFETY: relies on the storage pointer being valid for `size` contiguous,
// initialized elements — NOTE(review): not locally provable for the
// chunked/disk-backed variants (contiguity depends on scirs2_core);
// confirm their `as_ptr` returns a contiguous view.
unsafe {
std::slice::from_raw_parts(self.as_ptr(), self.size)
}
}
/// Mutable view of the `size` valid elements.
pub fn as_mut_slice(&mut self) -> &mut [T] {
// SAFETY: same contiguity/validity assumptions as `as_slice`; `&mut self`
// guarantees exclusive access through this handle.
unsafe {
std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.size)
}
}
/// Decomposes the buffer into `(ptr, size, capacity)` without running Drop.
// NOTE(review): `mem::forget` skips Drop for the storage variant, so calling
// this on a DiskBacked/ZeroCopy buffer would leak the underlying resource.
// The visible callers (`deallocate`) only use it for Pool and
// AdaptiveChunked, where the pool/chunk processor reclaims the memory —
// confirm no other callers exist.
fn into_raw_parts(self) -> (*mut T, usize, usize) {
// SAFETY: pointer is read before `self` is forgotten, so the storage is
// still alive at that point.
let ptr = unsafe { self.as_ptr() as *mut T };
let size = self.size;
let capacity = self.capacity;
std::mem::forget(self); (ptr, size, capacity)
}
/// True when backed by a zero-copy view.
pub fn is_zero_copy(&self) -> bool {
matches!(self.storage, BufferStorage::ZeroCopy { .. })
}
/// True when backed by disk storage.
pub fn is_disk_backed(&self) -> bool {
matches!(self.storage, BufferStorage::DiskBacked { .. })
}
/// True when backed by adaptive chunked storage.
pub fn is_adaptive_chunked(&self) -> bool {
matches!(self.storage, BufferStorage::AdaptiveChunked { .. })
}
}
static GLOBAL_MEMORY_POOL: std::sync::OnceLock<UltraEfficientMemoryPool> = std::sync::OnceLock::new();

/// Returns the process-wide memory pool, lazily built with the default
/// configuration on first use.
///
/// # Panics
/// Panics if the pool cannot be constructed on first access.
pub fn global_memory_pool() -> &'static UltraEfficientMemoryPool {
    GLOBAL_MEMORY_POOL.get_or_init(|| {
        let default_config = PoolConfig::default();
        UltraEfficientMemoryPool::new(default_config)
            .expect("Failed to initialize global memory pool")
    })
}
pub mod profiling {
    //! Lightweight before/after profiling of the global memory pool.
    use super::*;
    use std::time::Instant;

    /// Captures a stats snapshot at `start()` and pairs it with a second
    /// snapshot at `finish()` to describe one profiled window.
    pub struct MemoryProfiler {
        start_time: Instant,
        initial_stats: MemoryStats,
    }

    impl MemoryProfiler {
        /// Snapshots the global pool's statistics and starts the clock.
        pub fn start() -> Self {
            let pool = global_memory_pool();
            Self {
                start_time: Instant::now(),
                initial_stats: pool.get_stats(),
            }
        }

        /// Stops the clock and returns a report pairing the initial snapshot
        /// with a fresh one.
        pub fn finish(self) -> MemoryProfileReport {
            let pool = global_memory_pool();
            let final_stats = pool.get_stats();
            let duration = self.start_time.elapsed();
            MemoryProfileReport {
                duration,
                initial_stats: self.initial_stats,
                final_stats,
            }
        }
    }

    /// Before/after statistics for one profiled window.
    pub struct MemoryProfileReport {
        /// Wall-clock length of the profiled window.
        pub duration: Duration,
        /// Snapshot taken at `MemoryProfiler::start`.
        pub initial_stats: MemoryStats,
        /// Snapshot taken at `MemoryProfiler::finish`.
        pub final_stats: MemoryStats,
    }

    impl MemoryProfileReport {
        /// Prints a human-readable summary to stdout.
        pub fn print_report(&self) {
            println!("🚀 === ULTRA-EFFICIENT MEMORY POOL REPORT ===");
            println!("Duration: {:.3}ms", self.duration.as_secs_f64() * 1000.0);
            // saturating_sub: counters are monotonic, but guard against a
            // reset pool producing an underflow panic.
            let alloc_delta = self
                .final_stats
                .allocation_count
                .saturating_sub(self.initial_stats.allocation_count);
            let dealloc_delta = self
                .final_stats
                .deallocation_count
                .saturating_sub(self.initial_stats.deallocation_count);
            let memory_delta =
                self.final_stats.current_usage as i64 - self.initial_stats.current_usage as i64;
            println!("Allocations: {} (+{})", self.final_stats.allocation_count, alloc_delta);
            println!("Deallocations: {} (+{})", self.final_stats.deallocation_count, dealloc_delta);
            println!("Memory Delta: {:+} bytes", memory_delta);
            println!("Peak Usage: {} bytes", self.final_stats.peak_usage);
            // Guard the division: the original printed NaN when no cache
            // lookups had happened yet.
            let total_lookups = self.final_stats.cache_hits + self.final_stats.cache_misses;
            let hit_rate_pct = if total_lookups > 0 {
                100.0 * self.final_stats.cache_hits as f64 / total_lookups as f64
            } else {
                0.0
            };
            println!("Cache Hit Rate: {:.2}%", hit_rate_pct);
            println!("Fragmentation: {:.2}%", self.final_stats.fragmentation_ratio * 100.0);
            println!("Zero-Copy Operations: {}", self.final_stats.zero_copy_operations);
            println!("Adaptive Chunking Operations: {}", self.final_stats.adaptive_chunking_ops);
            println!("Disk-Backed Operations: {}", self.final_stats.disk_backed_ops);
        }

        /// Derives throughput and efficiency metrics for the profiled window.
        ///
        /// Counter-based metrics use *deltas* between the two snapshots: the
        /// original divided lifetime-cumulative counters by the window's
        /// duration, which inflated ops/sec on a long-lived global pool.
        pub fn performance_metrics(&self) -> MemoryPerformanceMetrics {
            let alloc_delta = self
                .final_stats
                .allocation_count
                .saturating_sub(self.initial_stats.allocation_count);
            let dealloc_delta = self
                .final_stats
                .deallocation_count
                .saturating_sub(self.initial_stats.deallocation_count);
            let total_operations = alloc_delta + dealloc_delta;
            let ops_per_second = if self.duration.as_secs_f64() > 0.0 {
                total_operations as f64 / self.duration.as_secs_f64()
            } else {
                0.0
            };
            let hits = self
                .final_stats
                .cache_hits
                .saturating_sub(self.initial_stats.cache_hits);
            let misses = self
                .final_stats
                .cache_misses
                .saturating_sub(self.initial_stats.cache_misses);
            let cache_hit_rate = if hits + misses > 0 {
                hits as f64 / (hits + misses) as f64
            } else {
                0.0
            };
            let zero_copy_delta = self
                .final_stats
                .zero_copy_operations
                .saturating_sub(self.initial_stats.zero_copy_operations);
            MemoryPerformanceMetrics {
                operations_per_second: ops_per_second,
                cache_hit_rate,
                fragmentation_ratio: self.final_stats.fragmentation_ratio,
                // `.max(1)` keeps the ratio well-defined for an empty window.
                zero_copy_ratio: zero_copy_delta as f64 / total_operations.max(1) as f64,
                memory_efficiency: 1.0 - self.final_stats.fragmentation_ratio,
            }
        }
    }

    /// Derived metrics for one profiled window; rates are in [0, 1].
    #[derive(Debug, Clone)]
    pub struct MemoryPerformanceMetrics {
        pub operations_per_second: f64,
        pub cache_hit_rate: f64,
        pub fragmentation_ratio: f64,
        pub zero_copy_ratio: f64,
        pub memory_efficiency: f64,
    }
}
// SAFETY: the buffer logically owns its storage, and the raw pointer in the
// `Pool` variant is treated as uniquely owned by this handle, so moving the
// buffer across threads is sound when `T: Send`. NOTE(review): this also
// assumes the scirs2_core storage types (DiskBackedArray, ZeroCopyOps,
// ChunkedArray) are safe to move between threads — confirm upstream, since
// the raw pointer field suppresses the auto-derived Send.
unsafe impl<T: Send> Send for UltraEfficientBuffer<T> {}
// SAFETY: shared access only flows through `&self` methods that read the
// storage; mutation requires `&mut self`. Sound when `T: Sync`, under the
// same assumption about the storage types as above.
unsafe impl<T: Sync> Sync for UltraEfficientBuffer<T> {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocate, inspect, and return a pooled f32 buffer.
    #[test]
    fn test_memory_pool_basic_allocation() {
        let pool = UltraEfficientMemoryPool::new(PoolConfig::default())
            .expect("test: operation should succeed");
        let buf: UltraEfficientBuffer<f32> =
            pool.allocate(1000).expect("test: allocate should succeed");
        assert_eq!(buf.size(), 1000);
        assert!(buf.capacity() >= 1000);
        pool.deallocate(buf).expect("test: deallocate should succeed");
    }

    /// Each byte count lands in the expected size class.
    #[test]
    fn test_size_classification() {
        let cases = [
            (1000_usize, SizeClass::Small),
            (100_000, SizeClass::Medium),
            (20_000_000, SizeClass::Large),
            (1_000_000_000, SizeClass::Huge),
        ];
        for (bytes, expected) in cases {
            assert_eq!(UltraEfficientMemoryPool::classify_size(bytes), expected);
        }
    }

    /// Counters move in the right direction across an alloc/dealloc cycle.
    #[test]
    fn test_memory_stats() {
        let pool = UltraEfficientMemoryPool::new(PoolConfig::default())
            .expect("test: operation should succeed");
        let before = pool.get_stats();
        let buf: UltraEfficientBuffer<i32> =
            pool.allocate(500).expect("test: allocate should succeed");
        let after_alloc = pool.get_stats();
        assert!(after_alloc.allocation_count > before.allocation_count);
        assert!(after_alloc.current_usage > before.current_usage);
        pool.deallocate(buf).expect("test: deallocate should succeed");
        let after_dealloc = pool.get_stats();
        assert!(after_dealloc.deallocation_count > before.deallocation_count);
    }

    /// Suggested sizes never shrink below the request.
    #[test]
    fn test_optimal_size_calculation() {
        let pool = UltraEfficientMemoryPool::new(PoolConfig::default())
            .expect("test: operation should succeed");
        let small = pool.optimal_size_for::<f32>(100);
        let medium = pool.optimal_size_for::<f32>(50_000);
        assert!(small >= 100);
        assert!(medium >= 50_000);
        assert!(small.is_power_of_two() || small >= 64);
    }

    /// A tiny pool with a hair-trigger pressure threshold still cleans up.
    #[test]
    fn test_memory_pressure_handling() {
        let config = PoolConfig {
            memory_pressure_threshold: 0.1,
            max_pool_size: 1024,
            ..PoolConfig::default()
        };
        let pool = UltraEfficientMemoryPool::new(config).expect("test: new should succeed");
        let _held: Vec<UltraEfficientBuffer<u8>> = (0..100)
            .map(|_| pool.allocate(100).expect("test: map should succeed"))
            .collect();
        assert!(pool.cleanup().is_ok());
    }

    /// Profiling a cycle on the global pool yields sane metrics.
    #[test]
    fn test_profiling() {
        let profiler = profiling::MemoryProfiler::start();
        let pool = global_memory_pool();
        let buf: UltraEfficientBuffer<f64> =
            pool.allocate(1000).expect("test: allocate should succeed");
        pool.deallocate(buf).expect("test: deallocate should succeed");
        let metrics = profiler.finish().performance_metrics();
        assert!(metrics.operations_per_second >= 0.0);
        assert!((0.0..=1.0).contains(&metrics.cache_hit_rate));
        assert!((0.0..=1.0).contains(&metrics.memory_efficiency));
    }
}