#![allow(unused_variables)]
use super::allocation::{
size_class, AllocationMetadata, AllocationRequest, AllocationStats, AllocationType,
CudaAllocation,
};
use crate::cuda::cuda_sys_compat as cuda_sys;
use crate::cuda::error::{CudaError, CudaResult, CustResultExt};
use cust::device::Device as CustDevice;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};
#[cfg(debug_assertions)]
use std::collections::HashSet;
/// Debug-build-only registry of live device pointers (stored as `usize`).
/// Pointers are inserted in `allocate_new_block` and removed in
/// `free_allocation` when `DeviceMemoryConfig::debug_tracking` is enabled,
/// so leftover entries indicate leaked allocations.
#[cfg(debug_assertions)]
static ALLOCATION_TRACKER: once_cell::sync::Lazy<Mutex<HashSet<usize>>> =
    once_cell::sync::Lazy::new(|| Mutex::new(HashSet::new()));
/// Per-device CUDA memory manager: validates and accounts allocations,
/// optionally caches freed blocks in size-class pools, and runs cleanup when
/// usage crosses a pressure threshold.
#[derive(Debug)]
pub struct CudaMemoryManager {
    /// Index of the CUDA device this manager serves.
    device_id: usize,
    /// Size class -> pool of cached/live blocks (used only when pooling is on).
    pools: Mutex<HashMap<usize, DeviceMemoryPool>>,
    /// Bytes currently handed out (counted in size-class units).
    total_allocated: AtomicUsize,
    /// High-water mark of `total_allocated`.
    peak_allocated: AtomicUsize,
    /// Hard cap on outstanding bytes, derived from `max_memory_fraction`.
    memory_limit: AtomicUsize,
    /// Usage above this triggers `handle_memory_pressure`.
    pressure_threshold: AtomicUsize,
    /// Aggregate statistics returned by `get_statistics`.
    stats: Mutex<AllocationStats>,
    /// Behavior knobs, fixed at construction.
    config: DeviceMemoryConfig,
    /// Timestamp of the last `cleanup_and_compact` run.
    last_cleanup: Mutex<Instant>,
    /// Properties queried (or defaulted) at construction.
    device_properties: DeviceProperties,
}
/// A cache of device blocks of one fixed size class: freed blocks go on
/// `free_blocks` for reuse; newly created blocks are tracked in
/// `allocated_blocks`.
#[derive(Debug)]
pub struct DeviceMemoryPool {
    /// Block size (bytes) every entry in this pool shares.
    size_class: usize,
    /// Blocks available for immediate reuse.
    free_blocks: Vec<CudaAllocation>,
    /// Blocks registered as live via `add_allocation`.
    allocated_blocks: Vec<CudaAllocation>,
    /// Hit/miss and utilization counters.
    pool_stats: PoolStatistics,
    /// Last time this pool was touched (allocate/deallocate).
    last_access: Instant,
    /// Per-pool tunables.
    config: PoolConfig,
}
/// Tunables for a [`CudaMemoryManager`], applied at construction.
#[derive(Debug, Clone)]
pub struct DeviceMemoryConfig {
    /// Fraction of total device memory usable before allocations fail.
    pub max_memory_fraction: f32,
    /// Fraction of total device memory at which cleanup is triggered.
    pub pressure_threshold_fraction: f32,
    /// Cache freed blocks in size-class pools for reuse.
    pub enable_pooling: bool,
    /// Upper bound on pool size. NOTE(review): not read anywhere in this file.
    pub max_pool_size: usize,
    /// Allow `handle_memory_pressure` to run `cleanup_and_compact`.
    pub enable_compaction: bool,
    /// Maximum age of cached free blocks before cleanup drops them.
    pub cleanup_interval: Duration,
    /// Track live pointers in the debug-build allocation tracker.
    pub debug_tracking: bool,
    /// Requested alignment. NOTE(review): not read anywhere in this file.
    pub allocation_alignment: usize,
    /// Async-allocation toggle. NOTE(review): not read anywhere in this file.
    pub enable_async_alloc: bool,
}
/// Per-pool tunables for a [`DeviceMemoryPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum cached free blocks; returns beyond this are dropped.
    pub max_free_blocks: usize,
    /// Blocks younger than this are protected from cleanup.
    pub min_age_for_cleanup: Duration,
    /// Intended growth behavior. NOTE(review): not consulted in this file.
    pub growth_strategy: GrowthStrategy,
    /// Statistics toggle. NOTE(review): stats are updated unconditionally here.
    pub track_statistics: bool,
}
/// Strategy hint for how a pool should grow its capacity.
///
/// NOTE(review): only stored in [`PoolConfig`]; no growth logic in this file
/// consults it, so variant semantics below are by naming intent only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GrowthStrategy {
    /// Keep a fixed capacity.
    Fixed,
    /// Grow by a constant step.
    Linear,
    /// Grow multiplicatively, bounded by `max_size`.
    Exponential { max_size: usize },
    /// Grow based on observed demand.
    Adaptive,
}
/// Running counters and derived metrics for one [`DeviceMemoryPool`].
#[derive(Debug, Clone)]
pub struct PoolStatistics {
    /// Blocks ever registered with the pool via `add_allocation`.
    pub total_allocations: u64,
    /// `allocate` calls satisfied from the free list.
    pub cache_hits: u64,
    /// `allocate` calls that found the free list empty.
    pub cache_misses: u64,
    /// Current utilization. NOTE(review): never updated in this file.
    pub utilization: f32,
    /// Peak utilization. NOTE(review): never updated in this file.
    pub peak_utilization: f32,
    /// Average block lifetime. NOTE(review): never updated in this file.
    pub average_lifetime: Duration,
    /// Memory efficiency (starts at 1.0). NOTE(review): never updated here.
    pub memory_efficiency: f32,
}
/// Static description of a CUDA device.
///
/// NOTE(review): apart from `total_memory` (queried via `cust` when possible),
/// every value in this file is a hard-coded fallback, not a queried property.
#[derive(Debug, Clone)]
pub struct DeviceProperties {
    /// Total device memory in bytes.
    pub total_memory: usize,
    /// Compute capability as (major, minor).
    pub compute_capability: (i32, i32),
    /// Memory bus width in bits.
    pub memory_bus_width: i32,
    /// Memory clock rate (units per CUDA convention — confirm kHz).
    pub memory_clock_rate: i32,
    /// L2 cache size in bytes.
    pub l2_cache_size: i32,
    /// Maximum threads per block.
    pub max_threads_per_block: i32,
    /// Whether host and device share a unified address space.
    pub unified_addressing: bool,
    /// Whether the device can run kernels concurrently.
    pub concurrent_kernels: bool,
}
/// Bundled description of one allocation: the request, its metadata, and the
/// conditions under which it was served.
///
/// NOTE(review): none of these three context types are constructed or consumed
/// anywhere in this file — presumably part of the public API for callers.
#[derive(Debug, Clone)]
pub struct AllocationContext {
    /// The original allocation request.
    pub request: AllocationRequest,
    /// Metadata attached to the resulting allocation.
    pub metadata: AllocationMetadata,
    /// Timing/pressure details of how the allocation was served.
    pub performance: AllocationPerformance,
    /// Device-side state at allocation time.
    pub device_context: DeviceContext,
}
/// Timing and pressure details for a single allocation.
#[derive(Debug, Clone)]
pub struct AllocationPerformance {
    /// Wall-clock time the allocation took.
    pub allocation_time: Duration,
    /// Whether the allocation was served from a pool.
    pub cache_hit: bool,
    /// Memory pressure level at allocation time.
    pub memory_pressure: f32,
    /// Fragmentation level at allocation time.
    pub fragmentation_level: f32,
    /// Number of retries before success.
    pub retry_count: u32,
}
/// Snapshot of device-side state.
#[derive(Debug, Clone)]
pub struct DeviceContext {
    /// Optional identifier of the active CUDA context.
    pub cuda_context: Option<String>,
    /// Bytes still available under the memory limit.
    pub available_memory: usize,
    /// Device utilization estimate.
    pub device_utilization: f32,
    /// Number of active CUDA streams.
    pub active_streams: usize,
}
impl CudaMemoryManager {
/// Creates a manager for `device_id` using the default configuration.
pub fn new(device_id: usize) -> CudaResult<Self> {
    Self::new_with_config(device_id, DeviceMemoryConfig::default())
}
/// Creates a manager for `device_id` with an explicit configuration.
///
/// Queries the device properties once up front and derives the hard memory
/// limit and the pressure threshold from the configured fractions.
pub fn new_with_config(device_id: usize, config: DeviceMemoryConfig) -> CudaResult<Self> {
    let device_properties = Self::query_device_properties(device_id)?;
    let (limit, threshold) = Self::calculate_memory_limits(
        &device_properties,
        config.max_memory_fraction,
        config.pressure_threshold_fraction,
    );
    let created = Instant::now();
    Ok(Self {
        device_id,
        pools: Mutex::new(HashMap::new()),
        total_allocated: AtomicUsize::new(0),
        peak_allocated: AtomicUsize::new(0),
        memory_limit: AtomicUsize::new(limit),
        pressure_threshold: AtomicUsize::new(threshold),
        stats: Mutex::new(AllocationStats::default()),
        config,
        last_cleanup: Mutex::new(created),
        device_properties,
    })
}
pub fn allocate(&self, size: usize) -> CudaResult<CudaAllocation> {
let request = AllocationRequest {
size,
allocation_type: AllocationType::Device,
device_id: Some(self.device_id),
..Default::default()
};
self.allocate_with_request(request)
}
/// Allocates device memory described by `request`.
///
/// Order of operations: validate the request, relieve memory pressure if the
/// threshold is exceeded, enforce the hard memory limit, then try a pooled
/// block for the request's size class before falling back to a fresh CUDA
/// allocation. Success statistics are recorded either way.
pub fn allocate_with_request(&self, request: AllocationRequest) -> CudaResult<CudaAllocation> {
    let started = Instant::now();
    self.validate_allocation_request(&request)?;
    if self.is_under_memory_pressure() {
        self.handle_memory_pressure()?;
    }
    self.check_memory_limits(request.size)?;
    let class = size_class(request.size);
    // Fast path: reuse a cached block from the matching size-class pool.
    if self.config.enable_pooling {
        if let Some(block) = self.try_pool_allocation(class, &request)? {
            self.record_allocation_success(&request, started, true);
            return Ok(block);
        }
    }
    // Slow path: obtain a brand-new block from the CUDA driver.
    let block = self.allocate_new_block(class, &request)?;
    self.record_allocation_success(&request, started, false);
    Ok(block)
}
/// Releases `allocation`: marks it free, updates accounting, then either
/// caches it in its size-class pool (pooling enabled) or frees it back to the
/// CUDA driver immediately.
pub fn deallocate(&self, mut allocation: CudaAllocation) -> CudaResult<()> {
    allocation.mark_free();
    self.update_deallocation_stats(&allocation);
    match self.config.enable_pooling {
        true => self.return_to_pool(allocation),
        false => self.free_allocation(allocation),
    }
}
/// Returns a point-in-time snapshot of this device's memory accounting.
pub fn memory_info(&self) -> DeviceMemoryInfo {
    let current = self.total_allocated.load(Ordering::Relaxed);
    let peak = self.peak_allocated.load(Ordering::Relaxed);
    let limit = self.memory_limit.load(Ordering::Relaxed);
    // Integer percentage; a zero limit reports 0% instead of dividing by zero.
    let utilization_percent = match limit {
        0 => 0,
        _ => (current * 100) / limit,
    };
    DeviceMemoryInfo {
        device_id: self.device_id,
        total_memory: self.device_properties.total_memory,
        current_allocated: current,
        peak_allocated: peak,
        memory_limit: limit,
        available_memory: limit.saturating_sub(current),
        utilization_percent,
        fragmentation_level: self.calculate_fragmentation_level(),
        pool_count: self.get_pool_count(),
    }
}
/// Returns a clone of the accumulated allocation statistics.
///
/// # Errors
/// Fails with [`CudaError::Context`] if the statistics mutex is poisoned.
pub fn get_statistics(&self) -> CudaResult<AllocationStats> {
    match self.stats.lock() {
        Ok(guard) => Ok(guard.clone()),
        Err(_) => Err(CudaError::Context {
            message: "Failed to acquire statistics lock".to_string(),
        }),
    }
}
/// Sweeps every pool, dropping cached free blocks that have aged past the
/// configured cleanup interval, then removes pools left completely empty.
///
/// # Errors
/// Fails with [`CudaError::Context`] if the pools mutex is poisoned, or if a
/// pool's own cleanup fails.
pub fn cleanup_and_compact(&self) -> CudaResult<CleanupResult> {
    let cleanup_start = Instant::now();
    let mut total_freed = 0;
    let mut pools_cleaned = 0;
    let mut pools = self.pools.lock().map_err(|_| CudaError::Context {
        message: "Failed to acquire pools lock for cleanup".to_string(),
    })?;
    for pool in pools.values_mut() {
        let freed = pool.cleanup_old_allocations(self.config.cleanup_interval)?;
        if freed > 0 {
            total_freed += freed;
            pools_cleaned += 1;
        }
    }
    // Fix: count the pools removed below instead of hard-coding 0 in the
    // returned `empty_pools_removed` field.
    let pools_before = pools.len();
    pools.retain(|_, pool| !pool.is_empty());
    let empty_pools_removed = pools_before - pools.len();
    // Best-effort timestamp update; skipped if the lock is poisoned.
    if let Ok(mut last_cleanup) = self.last_cleanup.lock() {
        *last_cleanup = Instant::now();
    }
    Ok(CleanupResult {
        duration: cleanup_start.elapsed(),
        bytes_freed: total_freed,
        pools_cleaned,
        empty_pools_removed,
    })
}
/// True when current usage has crossed the configured pressure threshold.
pub fn is_under_memory_pressure(&self) -> bool {
    self.total_allocated.load(Ordering::Relaxed)
        > self.pressure_threshold.load(Ordering::Relaxed)
}
/// Asynchronously prefetches `size` bytes of managed memory at `ptr` to this
/// manager's device on the default stream.
///
/// # Errors
/// Fails with [`CudaError::Context`] when `cudaMemPrefetchAsync` reports an
/// error.
pub fn prefetch_to_device(&self, ptr: *mut u8, size: usize) -> CudaResult<()> {
    unsafe {
        // Consistency fix: use the `cuda_sys` bindings and a null default
        // stream exactly like `prefetch_to_host`/`prefetch_to_gpu`, instead of
        // the `crate::cuda` re-exports used nowhere else in this file.
        let result = cuda_sys::cudaMemPrefetchAsync(
            ptr as *const std::ffi::c_void,
            size,
            self.device_id as i32,
            std::ptr::null_mut(),
        );
        if result != cuda_sys::cudaSuccess {
            return Err(CudaError::Context {
                message: format!(
                    "Failed to prefetch memory to device {}: {:?}",
                    self.device_id, result
                ),
            });
        }
    }
    Ok(())
}
/// The CUDA device index this manager was created for.
pub fn device_id(&self) -> usize {
    self.device_id
}
/// Device properties captured (or defaulted) at construction time.
pub fn device_properties(&self) -> &DeviceProperties {
    &self.device_properties
}
fn query_device_properties(device_id: usize) -> CudaResult<DeviceProperties> {
if let Ok(device) = CustDevice::get_device(device_id as u32) {
let total_memory = device.total_memory().unwrap_or(8 * 1024 * 1024 * 1024);
Ok(DeviceProperties {
total_memory: total_memory as usize,
compute_capability: (7, 5), memory_bus_width: 384,
memory_clock_rate: 1000000, l2_cache_size: 6 * 1024 * 1024, max_threads_per_block: 1024,
unified_addressing: true,
concurrent_kernels: true,
})
} else {
Ok(DeviceProperties {
total_memory: 8 * 1024 * 1024 * 1024, compute_capability: (7, 5),
memory_bus_width: 384,
memory_clock_rate: 1000000,
l2_cache_size: 6 * 1024 * 1024,
max_threads_per_block: 1024,
unified_addressing: true,
concurrent_kernels: true,
})
}
}
/// Derives `(hard limit, pressure threshold)` in bytes from the device's
/// total memory and the configured fractions.
fn calculate_memory_limits(
    properties: &DeviceProperties,
    max_fraction: f32,
    pressure_fraction: f32,
) -> (usize, usize) {
    let total = properties.total_memory as f32;
    (
        (total * max_fraction) as usize,
        (total * pressure_fraction) as usize,
    )
}
/// Rejects zero-sized requests and requests larger than the whole device.
fn validate_allocation_request(&self, request: &AllocationRequest) -> CudaResult<()> {
    if request.size == 0 {
        return Err(CudaError::Context {
            message: "Cannot allocate zero bytes".to_string(),
        });
    }
    let total = self.device_properties.total_memory;
    if request.size > total {
        return Err(CudaError::Context {
            message: format!(
                "Requested size {} exceeds total device memory {}",
                request.size, total
            ),
        });
    }
    Ok(())
}
/// Fails when admitting `size` additional bytes would push usage past the
/// configured hard limit.
fn check_memory_limits(&self, size: usize) -> CudaResult<()> {
    let current = self.total_allocated.load(Ordering::Relaxed);
    let limit = self.memory_limit.load(Ordering::Relaxed);
    if current + size <= limit {
        return Ok(());
    }
    Err(CudaError::Context {
        message: format!(
            "Allocation would exceed memory limit. Requested: {}, Current: {}, Limit: {}",
            size, current, limit
        ),
    })
}
/// Responds to memory pressure by running a cleanup pass when compaction is
/// enabled; the cleanup summary itself is discarded, but errors propagate.
fn handle_memory_pressure(&self) -> CudaResult<()> {
    if !self.config.enable_compaction {
        return Ok(());
    }
    self.cleanup_and_compact().map(|_| ())
}
/// Attempts to satisfy `request` from the free list of the pool for
/// `size_class`. Returns `Ok(None)` when no cached block is available.
fn try_pool_allocation(
    &self,
    size_class: usize,
    request: &AllocationRequest,
) -> CudaResult<Option<CudaAllocation>> {
    let mut pools = self.pools.lock().map_err(|_| CudaError::Context {
        message: "Failed to acquire pools lock for allocation".to_string(),
    })?;
    let cached = pools.get_mut(&size_class).and_then(|pool| pool.allocate());
    match cached {
        Some(mut block) => {
            block.mark_in_use();
            // Request is only touched for its tag (keeps the parameter used).
            let _ = &request.tag;
            self.update_allocation_stats(size_class, true);
            Ok(Some(block))
        }
        None => Ok(None),
    }
}
/// Allocates a fresh device block of `size_class` bytes via `cuda_malloc`
/// and, when pooling is enabled, registers it with the matching size-class
/// pool (created on first use).
fn allocate_new_block(
    &self,
    size_class: usize,
    request: &AllocationRequest,
) -> CudaResult<CudaAllocation> {
    // SAFETY: the raw allocation's failure is converted into a CudaError by
    // `cuda_result`; assumes `size_class` is non-zero for validated requests
    // (callers reject zero-byte requests) — confirm `size_class(n) >= n`.
    let ptr = unsafe { cust::memory::cuda_malloc(size_class).cuda_result()? };
    let allocation = CudaAllocation::new_on_device(ptr, size_class, size_class, self.device_id);
    // `request` is only touched for its tag to keep the parameter used.
    let _ = &request.tag;
    if self.config.enable_pooling {
        let mut pools = self.pools.lock().map_err(|_| CudaError::Context {
            message: "Failed to acquire pools lock for new block".to_string(),
        })?;
        let pool = pools
            .entry(size_class)
            .or_insert_with(|| DeviceMemoryPool::new(size_class, PoolConfig::default()));
        // NOTE(review): `allocation` is handed to the pool here and used again
        // below (tracker insert and return value) — this only compiles if
        // `CudaAllocation` is `Copy`; confirm the handle does not uniquely own
        // the pointer, otherwise the pool and the caller hold duplicates.
        pool.add_allocation(allocation);
    }
    self.update_allocation_stats(size_class, false);
    #[cfg(debug_assertions)]
    {
        // Record the live pointer for leak detection in debug builds.
        if self.config.debug_tracking {
            if let Ok(mut tracker) = ALLOCATION_TRACKER.lock() {
                tracker.insert(allocation.as_ptr() as usize);
            }
        }
    }
    Ok(allocation)
}
/// Returns `allocation` to its size-class pool for reuse.
///
/// Fix: when the pool no longer exists (e.g. it was removed by
/// `cleanup_and_compact`'s retain pass), the block is now released via
/// `free_allocation` — mirroring the non-pooling path in `deallocate` —
/// instead of being silently dropped.
fn return_to_pool(&self, allocation: CudaAllocation) -> CudaResult<()> {
    {
        let mut pools = self.pools.lock().map_err(|_| CudaError::Context {
            message: "Failed to acquire pools lock for deallocation".to_string(),
        })?;
        if let Some(pool) = pools.get_mut(&allocation.size_class) {
            pool.deallocate(allocation);
            return Ok(());
        }
        // Drop the lock before freeing; `free_allocation` does not need it.
    }
    self.free_allocation(allocation)
}
/// Releases `allocation`'s device memory back to the CUDA driver and, in
/// debug builds with tracking enabled, removes its pointer from the global
/// allocation tracker.
fn free_allocation(&self, allocation: CudaAllocation) -> CudaResult<()> {
    #[cfg(debug_assertions)]
    {
        // Unregister the pointer before freeing so the tracker only ever
        // contains live allocations.
        if self.config.debug_tracking {
            if let Ok(mut tracker) = ALLOCATION_TRACKER.lock() {
                tracker.remove(&(allocation.as_ptr() as usize));
            }
        }
    }
    // SAFETY: assumes `allocation.ptr` originated from `cuda_malloc` in
    // `allocate_new_block` and has not been freed already — confirm callers
    // never route the same block through this function twice.
    unsafe {
        cust::memory::cuda_free(allocation.ptr).cuda_result()?;
    }
    Ok(())
}
/// Records a successful allocation of `size` bytes (size-class bytes): bumps
/// the running total, maintains the peak via a CAS loop, and folds the event
/// into the aggregate statistics.
fn update_allocation_stats(&self, size: usize, cache_hit: bool) {
    // `fetch_add` returns the previous value, so add `size` back to obtain
    // the new running total.
    let current = self.total_allocated.fetch_add(size, Ordering::Relaxed) + size;
    // Lock-free peak update: retry until `current <= peak` or our CAS wins.
    let mut peak = self.peak_allocated.load(Ordering::Relaxed);
    while current > peak {
        match self.peak_allocated.compare_exchange_weak(
            peak,
            current,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            Err(new_peak) => peak = new_peak,
        }
    }
    // Best-effort stats update; silently skipped if the lock is poisoned.
    if let Ok(mut stats) = self.stats.lock() {
        stats.total_allocations += 1;
        stats.active_allocations += 1;
        stats.total_bytes_allocated += size as u64;
        stats.current_bytes_allocated = current as u64;
        stats.peak_bytes_allocated = self.peak_allocated.load(Ordering::Relaxed) as u64;
        // Running mean of the hit indicator: rate' = (rate*(n-1) + hit) / n,
        // where n already includes this allocation.
        if cache_hit {
            let total = stats.total_allocations as f32;
            stats.cache_hit_rate = ((stats.cache_hit_rate * (total - 1.0)) + 1.0) / total;
        } else {
            let total = stats.total_allocations as f32;
            stats.cache_hit_rate = (stats.cache_hit_rate * (total - 1.0)) / total;
        }
        stats.average_allocation_size = if stats.total_allocations > 0 {
            (stats.total_bytes_allocated / stats.total_allocations) as usize
        } else {
            0
        };
    }
}
/// Removes `allocation`'s bytes from the running total and decrements the
/// active-allocation count; stats update is best-effort.
fn update_deallocation_stats(&self, allocation: &CudaAllocation) {
    self.total_allocated
        .fetch_sub(allocation.size, Ordering::Relaxed);
    let Ok(mut stats) = self.stats.lock() else {
        return;
    };
    stats.active_allocations = stats.active_allocations.saturating_sub(1);
    stats.current_bytes_allocated = self.total_allocated.load(Ordering::Relaxed) as u64;
}
/// Folds a completed allocation's latency into the running average
/// allocation time.
///
/// NOTE(review): `request` and `cache_hit` are currently unused here (cache
/// hits are accounted for in `update_allocation_stats`), and `success_rate`
/// is hard-coded to 1.0 — allocation failures are never recorded against it.
fn record_allocation_success(
    &self,
    request: &AllocationRequest,
    start_time: Instant,
    cache_hit: bool,
) {
    let allocation_time = start_time.elapsed();
    if let Ok(mut stats) = self.stats.lock() {
        // `total_allocations` was already incremented for this allocation in
        // `update_allocation_stats`, so the running average divides by a count
        // that includes this sample: avg' = (avg*(n-1) + t) / n.
        let total = stats.total_allocations as u32;
        if total > 0 {
            stats.average_allocation_time =
                (stats.average_allocation_time * (total - 1) + allocation_time) / total;
        } else {
            stats.average_allocation_time = allocation_time;
        }
        stats.success_rate = 1.0;
    }
}
/// Estimates pool fragmentation in `[0, 1]`.
///
/// NOTE(review): placeholder — always returns a fixed 0.1 rather than a
/// value derived from actual pool occupancy.
fn calculate_fragmentation_level(&self) -> f32 {
    0.1
}
/// Number of active size-class pools (0 if the pools lock is poisoned).
fn get_pool_count(&self) -> usize {
    self.pools.lock().map(|pools| pools.len()).unwrap_or(0)
}
/// Allocates `size` bytes of CUDA unified (managed) memory with global
/// attachment.
///
/// # Errors
/// Fails with [`CudaError::Context`] when `cudaMallocManaged` reports an
/// error.
pub fn allocate_unified(
    &self,
    size: usize,
) -> CudaResult<super::allocation::UnifiedAllocation> {
    let mut raw: *mut std::ffi::c_void = std::ptr::null_mut();
    let status =
        unsafe { cuda_sys::cudaMallocManaged(&mut raw, size, cuda_sys::cudaMemAttachGlobal) };
    if status != cuda_sys::cudaSuccess {
        return Err(CudaError::Context {
            message: format!("cudaMallocManaged failed with error code {:?}", status),
        });
    }
    Ok(super::allocation::UnifiedAllocation::new(
        raw as *mut u8,
        size,
    ))
}
/// Asynchronously migrates `size` bytes of managed memory at `ptr` to host
/// (CPU) memory on the default stream.
pub fn prefetch_to_host(&self, ptr: *mut u8, size: usize) -> CudaResult<()> {
    let status = unsafe {
        cuda_sys::cudaMemPrefetchAsync(
            ptr as *const std::ffi::c_void,
            size,
            cuda_sys::cudaCpuDeviceId,
            std::ptr::null_mut(),
        )
    };
    if status == cuda_sys::cudaSuccess {
        Ok(())
    } else {
        Err(CudaError::Context {
            message: format!(
                "cudaMemPrefetchAsync to host failed with error code {:?}",
                status
            ),
        })
    }
}
/// Asynchronously migrates `size` bytes of managed memory at `ptr` to the
/// given `device_id` on the default stream.
pub fn prefetch_to_gpu(&self, ptr: *mut u8, size: usize, device_id: i32) -> CudaResult<()> {
    let status = unsafe {
        cuda_sys::cudaMemPrefetchAsync(
            ptr as *const std::ffi::c_void,
            size,
            device_id,
            std::ptr::null_mut(),
        )
    };
    if status == cuda_sys::cudaSuccess {
        Ok(())
    } else {
        Err(CudaError::Context {
            message: format!(
                "cudaMemPrefetchAsync to device failed with error code {:?}",
                status
            ),
        })
    }
}
/// Applies a `cudaMemAdvise` hint to the managed range `[ptr, ptr + size)`
/// for `device_id`.
pub fn set_memory_advice(
    &self,
    ptr: *mut u8,
    size: usize,
    advice: super::MemoryAdvice,
    device_id: i32,
) -> CudaResult<()> {
    use super::MemoryAdvice as Advice;
    // Map the crate-level advice enum onto the raw CUDA constants.
    let cuda_advice = match advice {
        Advice::SetReadMostly => cuda_sys::cudaMemAdviseSetReadMostly,
        Advice::UnsetReadMostly => cuda_sys::cudaMemAdviseUnsetReadMostly,
        Advice::SetPreferredLocation => cuda_sys::cudaMemAdviseSetPreferredLocation,
        Advice::UnsetPreferredLocation => cuda_sys::cudaMemAdviseUnsetPreferredLocation,
        Advice::SetAccessedBy => cuda_sys::cudaMemAdviseSetAccessedBy,
        Advice::UnsetAccessedBy => cuda_sys::cudaMemAdviseUnsetAccessedBy,
    };
    let status = unsafe {
        cuda_sys::cudaMemAdvise(ptr as *const std::ffi::c_void, size, cuda_advice, device_id)
    };
    if status != cuda_sys::cudaSuccess {
        return Err(CudaError::Context {
            message: format!("cudaMemAdvise failed with error code {:?}", status),
        });
    }
    Ok(())
}
}
/// Snapshot of a device's memory accounting, produced by
/// [`CudaMemoryManager::memory_info`].
#[derive(Debug, Clone)]
pub struct DeviceMemoryInfo {
    /// CUDA device index.
    pub device_id: usize,
    /// Total device memory in bytes.
    pub total_memory: usize,
    /// Bytes currently handed out.
    pub current_allocated: usize,
    /// High-water mark of allocated bytes.
    pub peak_allocated: usize,
    /// Hard cap on outstanding bytes.
    pub memory_limit: usize,
    /// `memory_limit - current_allocated` (saturating).
    pub available_memory: usize,
    /// Integer percentage of the limit in use (0 when the limit is 0).
    pub utilization_percent: usize,
    /// Fragmentation estimate in `[0, 1]`.
    pub fragmentation_level: f32,
    /// Number of active size-class pools.
    pub pool_count: usize,
}
/// Summary of one [`CudaMemoryManager::cleanup_and_compact`] pass.
#[derive(Debug, Clone)]
pub struct CleanupResult {
    /// Wall-clock time the pass took.
    pub duration: Duration,
    /// Bytes released by dropping aged free blocks.
    pub bytes_freed: usize,
    /// Pools that had at least one block dropped.
    pub pools_cleaned: usize,
    /// Pools removed because they became completely empty.
    pub empty_pools_removed: usize,
}
impl Default for DeviceMemoryConfig {
    /// Conservative defaults: cap usage at 85% of device memory, start
    /// cleanup at 75%, pool freed blocks, and sweep every 60 seconds.
    fn default() -> Self {
        Self {
            max_memory_fraction: 0.85,
            pressure_threshold_fraction: 0.75,
            enable_pooling: true,
            max_pool_size: 16,
            enable_compaction: true,
            cleanup_interval: Duration::from_secs(60),
            // Pointer tracking only makes sense in debug builds.
            debug_tracking: cfg!(debug_assertions),
            allocation_alignment: 256,
            enable_async_alloc: false,
        }
    }
}
impl Default for PoolConfig {
    /// Defaults: cache up to 8 free blocks per pool, protect blocks younger
    /// than 30 seconds from cleanup, adaptive growth, statistics on.
    fn default() -> Self {
        Self {
            max_free_blocks: 8,
            min_age_for_cleanup: Duration::from_secs(30),
            growth_strategy: GrowthStrategy::Adaptive,
            track_statistics: true,
        }
    }
}
impl DeviceMemoryPool {
/// Creates an empty pool for blocks of exactly `size_class` bytes.
fn new(size_class: usize, config: PoolConfig) -> Self {
    Self {
        size_class,
        config,
        free_blocks: Vec::new(),
        allocated_blocks: Vec::new(),
        pool_stats: PoolStatistics::default(),
        last_access: Instant::now(),
    }
}
/// Pops a cached block off the free list, recording a cache hit or miss and
/// refreshing the pool's last-access time.
fn allocate(&mut self) -> Option<CudaAllocation> {
    self.last_access = Instant::now();
    match self.free_blocks.pop() {
        Some(block) => {
            self.pool_stats.cache_hits += 1;
            Some(block)
        }
        None => {
            self.pool_stats.cache_misses += 1;
            None
        }
    }
}
/// Caches `allocation` on the free list for reuse.
///
/// NOTE(review): when the free list is already at `max_free_blocks`, the
/// block is silently dropped here with no explicit `cuda_free` — confirm
/// `CudaAllocation`'s drop behavior releases the device memory, otherwise
/// this path leaks.
fn deallocate(&mut self, allocation: CudaAllocation) {
    if self.free_blocks.len() < self.config.max_free_blocks {
        self.free_blocks.push(allocation);
    }
    self.last_access = Instant::now();
}
/// Registers a freshly created block as live in this pool and bumps the
/// allocation counter.
fn add_allocation(&mut self, allocation: CudaAllocation) {
    self.pool_stats.total_allocations += 1;
    self.allocated_blocks.push(allocation);
}
/// Drops cached free blocks that are older than both `max_age` and the
/// pool's configured minimum cleanup age, returning the number of bytes
/// released.
///
/// Fix: the result is now expressed in bytes (`blocks * size_class`) to
/// match its use as `CleanupResult::bytes_freed` by `cleanup_and_compact`,
/// which previously recorded a block count under a bytes-named field.
fn cleanup_old_allocations(&mut self, max_age: Duration) -> CudaResult<usize> {
    let now = Instant::now();
    // Hoisted out of the closure so the retain body borrows nothing from
    // `self` (edition-independent; `retain` already borrows `free_blocks`).
    let min_age = self.config.min_age_for_cleanup;
    let before = self.free_blocks.len();
    // Keep a block unless it is older than BOTH thresholds.
    self.free_blocks.retain(|allocation| {
        let age = now.duration_since(allocation.allocation_time);
        age <= max_age || age <= min_age
    });
    let removed = before - self.free_blocks.len();
    Ok(removed * self.size_class)
}
/// True when the pool tracks no blocks at all, live or free.
fn is_empty(&self) -> bool {
    self.allocated_blocks.is_empty() && self.free_blocks.is_empty()
}
}
impl Default for PoolStatistics {
fn default() -> Self {
Self {
total_allocations: 0,
cache_hits: 0,
cache_misses: 0,
utilization: 0.0,
peak_utilization: 0.0,
average_lifetime: Duration::from_secs(0),
memory_efficiency: 1.0,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Sanity-checks the documented defaults of `DeviceMemoryConfig`.
    #[test]
    fn test_device_memory_config() {
        let config = DeviceMemoryConfig::default();
        assert_eq!(config.max_memory_fraction, 0.85);
        assert!(config.enable_pooling);
        assert!(config.enable_compaction);
    }
    /// Verifies limit/threshold derivation against the fraction math used in
    /// `calculate_memory_limits`.
    #[test]
    fn test_memory_limit_calculation() {
        let properties = DeviceProperties {
            total_memory: 8 * 1024 * 1024 * 1024,
            ..Default::default()
        };
        let (limit, threshold) = CudaMemoryManager::calculate_memory_limits(&properties, 0.8, 0.7);
        assert_eq!(
            limit,
            ((8 * 1024 * 1024 * 1024_usize) as f32 * 0.8) as usize
        );
        assert_eq!(
            threshold,
            ((8 * 1024 * 1024 * 1024_usize) as f32 * 0.7) as usize
        );
    }
    /// A fresh pool is empty and a first `allocate` records a cache miss.
    #[test]
    fn test_device_memory_pool() {
        let config = PoolConfig::default();
        let mut pool = DeviceMemoryPool::new(1024, config);
        assert!(pool.is_empty());
        assert_eq!(pool.pool_stats.total_allocations, 0);
        assert!(pool.allocate().is_none());
        assert_eq!(pool.pool_stats.cache_misses, 1);
    }
    /// Exercises equality and pattern matching on `GrowthStrategy`.
    #[test]
    fn test_growth_strategies() {
        assert_eq!(GrowthStrategy::Fixed, GrowthStrategy::Fixed);
        assert_ne!(GrowthStrategy::Linear, GrowthStrategy::Fixed);
        if let GrowthStrategy::Exponential { max_size } =
            (GrowthStrategy::Exponential { max_size: 1024 })
        {
            assert_eq!(max_size, 1024);
        }
    }
    /// TODO(review): empty placeholder — `validate_allocation_request` needs
    /// a constructed manager (and thus a CUDA device), so it is untested here.
    #[test]
    fn test_allocation_request_validation() {
    }
}
impl Default for DeviceProperties {
    /// Fixed fallback values (8 GiB, CC 7.5, etc.) matching the literals used
    /// by `query_device_properties` when the real device cannot be queried.
    fn default() -> Self {
        Self {
            total_memory: 8 * 1024 * 1024 * 1024,
            compute_capability: (7, 5),
            memory_bus_width: 384,
            memory_clock_rate: 1000000,
            l2_cache_size: 6 * 1024 * 1024,
            max_threads_per_block: 1024,
            unified_addressing: true,
            concurrent_kernels: true,
        }
    }
}
/// Alias exposing [`PoolStatistics`] under a metrics-oriented name.
pub type DeviceMemoryMetrics = PoolStatistics;
/// Alias exposing [`PoolConfig`] under a longer public name.
pub type PoolConfiguration = PoolConfig;