pub mod reduction;
pub use reduction::{
CooperativeBarrier, GlobalReduction, InterPhaseReduction, PhaseState, ReductionBuilder,
ReductionConfig, ReductionError, ReductionOp, SyncMode,
};
pub use ringkernel_core::analytics_context as ring_analytics_context;
pub use ringkernel_core::memory as ring_memory;
pub use ringkernel_core::reduction as ring_reduction;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use tokio::sync::RwLock;
/// Tunables for the kernel memory subsystem: GPU and staging byte budgets,
/// pool bucket size classes, and pressure/defragmentation thresholds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryConfig {
/// Hard cap on tracked GPU memory, in bytes.
pub max_gpu_memory: u64,
/// Hard cap on host staging memory, in bytes.
pub max_staging_memory: u64,
/// When true, allocations are matched against the `bucket_sizes` pool classes.
pub pooling_enabled: bool,
/// Pool size classes in bytes; `find_bucket` assumes ascending order — TODO confirm for custom configs.
pub bucket_sizes: Vec<u64>,
/// Usage ratio (0..1) above which the system is considered under pressure.
pub pressure_threshold: f64,
/// Enables automatic defragmentation.
pub auto_defrag: bool,
/// Fragmentation ratio (0..1) that triggers defragmentation when `auto_defrag` is set.
pub defrag_threshold: f64,
}
impl Default for MemoryConfig {
    /// Production-oriented defaults: 4 GiB GPU budget, 1 GiB staging budget,
    /// pooling enabled across six size classes from 64 KiB to 64 MiB.
    fn default() -> Self {
        const KIB: u64 = 1024;
        const MIB: u64 = 1024 * KIB;
        const GIB: u64 = 1024 * MIB;
        Self {
            max_gpu_memory: 4 * GIB,
            max_staging_memory: GIB,
            pooling_enabled: true,
            bucket_sizes: vec![64 * KIB, 256 * KIB, MIB, 4 * MIB, 16 * MIB, 64 * MIB],
            pressure_threshold: 0.85,
            auto_defrag: true,
            defrag_threshold: 0.3,
        }
    }
}
impl MemoryConfig {
    /// Small-footprint preset for local development: 512 MiB GPU,
    /// 256 MiB staging, pooling disabled.
    pub fn development() -> Self {
        Self {
            max_gpu_memory: 512 * 1024 * 1024,
            max_staging_memory: 256 * 1024 * 1024,
            pooling_enabled: false,
            ..Self::default()
        }
    }

    /// Production preset — identical to [`Default`].
    pub fn production() -> Self {
        Self::default()
    }

    /// Large-machine preset: 16 GiB GPU, 4 GiB staging, and a more
    /// aggressive defragmentation trigger. Pooling and auto-defrag already
    /// default to `true`, so only the overrides are spelled out.
    pub fn high_performance() -> Self {
        Self {
            max_gpu_memory: 16 * 1024 * 1024 * 1024,
            max_staging_memory: 4 * 1024 * 1024 * 1024,
            defrag_threshold: 0.2,
            ..Self::default()
        }
    }
}
/// Per-size-class pool bookkeeping. All counters are atomic so buckets can
/// be updated from concurrent allocation paths without locking.
#[derive(Debug)]
pub struct SizeBucket {
    /// Size class in bytes this bucket serves.
    pub size: u64,
    /// Buffers sitting free in the pool. Not modified by this type's own
    /// methods — presumably maintained by the pool owner (TODO confirm).
    pub available: AtomicUsize,
    /// Buffers currently handed out.
    pub allocated: AtomicUsize,
    /// High-water mark of `allocated`.
    pub peak: AtomicUsize,
}

impl SizeBucket {
    /// Creates an empty bucket for the given size class.
    pub fn new(size: u64) -> Self {
        Self {
            size,
            available: AtomicUsize::new(0),
            allocated: AtomicUsize::new(0),
            peak: AtomicUsize::new(0),
        }
    }

    /// Records one allocation and raises the peak high-water mark if needed.
    pub fn record_alloc(&self) {
        let count = self.allocated.fetch_add(1, Ordering::Relaxed) + 1;
        // fetch_max replaces the previous hand-rolled compare_exchange loop;
        // it is the idiomatic way to maintain an atomic high-water mark.
        self.peak.fetch_max(count, Ordering::Relaxed);
    }

    /// Records one deallocation. Must pair with a prior `record_alloc`;
    /// an unpaired call would wrap the counter.
    pub fn record_dealloc(&self) {
        self.allocated.fetch_sub(1, Ordering::Relaxed);
    }

    /// Snapshots the counters. Each field is read individually with relaxed
    /// ordering, so the snapshot is not atomic as a whole.
    pub fn stats(&self) -> BucketStats {
        BucketStats {
            size: self.size,
            available: self.available.load(Ordering::Relaxed),
            allocated: self.allocated.load(Ordering::Relaxed),
            peak: self.peak.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time snapshot of a [`SizeBucket`].
#[derive(Debug, Clone)]
pub struct BucketStats {
    pub size: u64,
    pub available: usize,
    pub allocated: usize,
    pub peak: usize,
}
/// Handle to a tracked allocation. Derives `Clone` (it is plain data) so the
/// manager and callers can keep a registry copy without rebuilding the
/// struct field-by-field.
#[derive(Debug, Clone)]
pub struct MemoryBuffer {
    /// Unique id assigned by the manager that created this buffer.
    pub id: u64,
    /// Allocation size in bytes.
    pub size: u64,
    /// Index of the pool bucket that served this allocation, if pooled.
    pub bucket_index: Option<usize>,
    /// True for GPU allocations, false for host staging memory.
    pub is_gpu: bool,
}
/// Result alias used by all allocation/deallocation entry points.
pub type AllocResult<T> = std::result::Result<T, MemoryError>;
/// Errors surfaced by the memory subsystem.
#[derive(Debug, thiserror::Error)]
pub enum MemoryError {
/// The request does not fit in the remaining budget.
#[error("Out of memory: requested {requested} bytes, available {available} bytes")]
OutOfMemory {
requested: u64,
available: u64,
},
/// Usage crossed the critical pressure level; new allocations are refused.
#[error("Memory pressure exceeded: {usage_percent:.1}% usage")]
PressureExceeded {
usage_percent: f64,
},
/// The buffer id is not registered (double free or foreign handle).
#[error("Invalid buffer: {id}")]
InvalidBuffer {
id: u64,
},
/// Catch-all for backend-specific allocation failures.
#[error("Allocation failed: {reason}")]
AllocationFailed {
reason: String,
},
}
/// Coarse memory-pressure classification derived from the GPU usage ratio.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PressureLevel {
    /// Usage below 70%.
    #[default]
    Normal,
    /// Usage in the [70%, 85%) band.
    Warning,
    /// Usage in the [85%, 95%) band.
    High,
    /// Usage at or above 95%.
    Critical,
}

impl PressureLevel {
    /// Maps a usage ratio (0.0 = empty, 1.0 = full budget) onto a level.
    pub fn from_ratio(ratio: f64) -> Self {
        match ratio {
            r if r < 0.70 => Self::Normal,
            r if r < 0.85 => Self::Warning,
            r if r < 0.95 => Self::High,
            _ => Self::Critical,
        }
    }
}
/// Aggregated point-in-time snapshot returned by `KernelMemoryManager::stats`.
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
/// Configured GPU budget in bytes.
pub gpu_total: u64,
/// Bytes currently allocated against the GPU budget.
pub gpu_used: u64,
/// High-water mark of `gpu_used`.
pub gpu_peak: u64,
/// Configured staging budget in bytes.
pub staging_total: u64,
/// Bytes currently allocated against the staging budget.
pub staging_used: u64,
/// Total successful allocations (GPU and staging combined).
pub allocations: u64,
/// Total successful deallocations (GPU and staging combined).
pub deallocations: u64,
/// Fraction of pooled allocation attempts served from a bucket;
/// 0.0 when pooling has never been exercised.
pub pool_hit_rate: f64,
/// Pressure classification at snapshot time.
pub pressure_level: PressureLevel,
}
/// Central tracker for GPU and staging memory: enforces byte budgets,
/// maintains per-bucket pool statistics, and registers live buffers by id.
pub struct KernelMemoryManager {
// Immutable configuration captured at construction.
config: MemoryConfig,
// One bucket per configured size class, in configuration order.
buckets: Vec<SizeBucket>,
// Shared atomic counters backing `stats()`.
stats: Arc<MemoryStatsInner>,
// Registry of live buffers, keyed by buffer id.
buffers: Arc<RwLock<HashMap<u64, MemoryBuffer>>>,
// Monotonic id source for new buffers; starts at 1.
next_id: AtomicU64,
}
/// Internal atomic counters behind `KernelMemoryManager::stats()`.
/// All accesses use relaxed ordering; each counter is independent.
#[derive(Debug, Default)]
struct MemoryStatsInner {
// Bytes currently allocated against the GPU budget.
gpu_used: AtomicU64,
// High-water mark of `gpu_used`.
gpu_peak: AtomicU64,
// Bytes currently allocated against the staging budget.
staging_used: AtomicU64,
// Count of successful allocations (GPU + staging).
allocations: AtomicU64,
// Count of successful deallocations (GPU + staging).
deallocations: AtomicU64,
// Pooled allocation attempts that matched a bucket.
pool_hits: AtomicU64,
// Pooled allocation attempts that matched no bucket.
pool_misses: AtomicU64,
}
impl KernelMemoryManager {
    /// Creates a manager with one pool bucket per configured size class.
    pub fn new(config: MemoryConfig) -> Self {
        let buckets = config
            .bucket_sizes
            .iter()
            .map(|&size| SizeBucket::new(size))
            .collect();
        Self {
            config,
            buckets,
            stats: Arc::new(MemoryStatsInner::default()),
            buffers: Arc::new(RwLock::new(HashMap::new())),
            next_id: AtomicU64::new(1),
        }
    }

    /// Returns the active configuration.
    pub fn config(&self) -> &MemoryConfig {
        &self.config
    }

    /// Allocates `size` bytes of tracked GPU memory.
    ///
    /// # Errors
    /// - [`MemoryError::PressureExceeded`] when usage is already critical.
    /// - [`MemoryError::OutOfMemory`] when the request would exceed
    ///   `max_gpu_memory`.
    pub async fn allocate(&self, size: u64) -> AllocResult<MemoryBuffer> {
        if self.pressure_level() == PressureLevel::Critical {
            return Err(MemoryError::PressureExceeded {
                usage_percent: self.gpu_usage_percent(),
            });
        }
        // Reserve first, validate second, roll back on failure. The previous
        // load → compare → fetch_add sequence was a check-then-act race: two
        // concurrent callers could both pass the limit check and jointly
        // overshoot the budget. checked_add also guards against u64 wrap.
        let prev = self.stats.gpu_used.fetch_add(size, Ordering::Relaxed);
        let new_used = match prev.checked_add(size) {
            Some(total) if total <= self.config.max_gpu_memory => total,
            _ => {
                self.stats.gpu_used.fetch_sub(size, Ordering::Relaxed);
                return Err(MemoryError::OutOfMemory {
                    requested: size,
                    available: self.config.max_gpu_memory.saturating_sub(prev),
                });
            }
        };
        // Atomic high-water mark; replaces the hand-rolled CAS loop.
        self.stats.gpu_peak.fetch_max(new_used, Ordering::Relaxed);

        // Pool accounting only happens once the reservation has succeeded.
        let bucket_index = if self.config.pooling_enabled {
            self.find_bucket(size)
        } else {
            None
        };
        if let Some(idx) = bucket_index {
            self.stats.pool_hits.fetch_add(1, Ordering::Relaxed);
            self.buckets[idx].record_alloc();
        } else if self.config.pooling_enabled {
            self.stats.pool_misses.fetch_add(1, Ordering::Relaxed);
        }
        self.stats.allocations.fetch_add(1, Ordering::Relaxed);

        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        // Single construction site for the handle; one copy goes into the
        // registry, the other is returned to the caller.
        let make = || MemoryBuffer {
            id,
            size,
            bucket_index,
            is_gpu: true,
        };
        self.buffers.write().await.insert(id, make());
        Ok(make())
    }

    /// Releases a GPU buffer previously returned by [`Self::allocate`].
    ///
    /// # Errors
    /// [`MemoryError::InvalidBuffer`] when the id is unknown (double free
    /// or a handle from another manager).
    pub async fn deallocate(&self, buffer: MemoryBuffer) -> AllocResult<()> {
        if self.buffers.write().await.remove(&buffer.id).is_none() {
            return Err(MemoryError::InvalidBuffer { id: buffer.id });
        }
        if let Some(idx) = buffer.bucket_index {
            self.buckets[idx].record_dealloc();
        }
        self.stats
            .gpu_used
            .fetch_sub(buffer.size, Ordering::Relaxed);
        self.stats.deallocations.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Allocates `size` bytes of host staging memory (never pooled).
    ///
    /// # Errors
    /// [`MemoryError::OutOfMemory`] when the request would exceed
    /// `max_staging_memory`.
    pub async fn allocate_staging(&self, size: u64) -> AllocResult<MemoryBuffer> {
        // Same reserve-then-rollback pattern as `allocate`, for the same
        // race- and overflow-safety reasons.
        let prev = self.stats.staging_used.fetch_add(size, Ordering::Relaxed);
        match prev.checked_add(size) {
            Some(total) if total <= self.config.max_staging_memory => {}
            _ => {
                self.stats.staging_used.fetch_sub(size, Ordering::Relaxed);
                return Err(MemoryError::OutOfMemory {
                    requested: size,
                    available: self.config.max_staging_memory.saturating_sub(prev),
                });
            }
        }
        self.stats.allocations.fetch_add(1, Ordering::Relaxed);

        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let make = || MemoryBuffer {
            id,
            size,
            bucket_index: None,
            is_gpu: false,
        };
        self.buffers.write().await.insert(id, make());
        Ok(make())
    }

    /// Releases a staging buffer previously returned by
    /// [`Self::allocate_staging`].
    ///
    /// # Errors
    /// [`MemoryError::InvalidBuffer`] when the id is unknown.
    pub async fn deallocate_staging(&self, buffer: MemoryBuffer) -> AllocResult<()> {
        if self.buffers.write().await.remove(&buffer.id).is_none() {
            return Err(MemoryError::InvalidBuffer { id: buffer.id });
        }
        self.stats
            .staging_used
            .fetch_sub(buffer.size, Ordering::Relaxed);
        self.stats.deallocations.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Snapshots the aggregated counters. Each counter is read individually
    /// with relaxed ordering, so the snapshot is not atomic as a whole.
    pub fn stats(&self) -> MemoryStats {
        let gpu_used = self.stats.gpu_used.load(Ordering::Relaxed);
        let pool_hits = self.stats.pool_hits.load(Ordering::Relaxed);
        let pool_misses = self.stats.pool_misses.load(Ordering::Relaxed);
        let total_pool = pool_hits + pool_misses;
        MemoryStats {
            gpu_total: self.config.max_gpu_memory,
            gpu_used,
            gpu_peak: self.stats.gpu_peak.load(Ordering::Relaxed),
            staging_total: self.config.max_staging_memory,
            staging_used: self.stats.staging_used.load(Ordering::Relaxed),
            allocations: self.stats.allocations.load(Ordering::Relaxed),
            deallocations: self.stats.deallocations.load(Ordering::Relaxed),
            pool_hit_rate: if total_pool > 0 {
                pool_hits as f64 / total_pool as f64
            } else {
                0.0
            },
            pressure_level: self.pressure_level(),
        }
    }

    /// Snapshots every pool bucket, in configuration order.
    pub fn bucket_stats(&self) -> Vec<BucketStats> {
        self.buckets.iter().map(|b| b.stats()).collect()
    }

    /// Classifies current GPU usage into a pressure level.
    pub fn pressure_level(&self) -> PressureLevel {
        PressureLevel::from_ratio(self.gpu_usage_percent() / 100.0)
    }

    /// Current GPU usage as a percentage of the configured budget.
    /// NOTE(review): a zero `max_gpu_memory` yields NaN here, which
    /// `from_ratio` classifies as Critical — arguably the safe outcome.
    pub fn gpu_usage_percent(&self) -> f64 {
        let used = self.stats.gpu_used.load(Ordering::Relaxed) as f64;
        let total = self.config.max_gpu_memory as f64;
        (used / total) * 100.0
    }

    /// Signals that garbage collection is wanted. Currently only logs;
    /// no reclamation is performed here.
    pub async fn request_gc(&self) {
        tracing::info!(
            "Memory GC requested, pressure level: {:?}",
            self.pressure_level()
        );
    }

    /// Returns the first bucket whose size class can hold `size`, or `None`
    /// when the request is larger than every class.
    /// NOTE(review): "first fit = best fit" assumes `bucket_sizes` is sorted
    /// ascending — true for the default config; confirm for custom configs.
    fn find_bucket(&self, size: u64) -> Option<usize> {
        self.buckets.iter().position(|b| b.size >= size)
    }
}
impl Default for KernelMemoryManager {
fn default() -> Self {
Self::new(MemoryConfig::default())
}
}
/// Fixed-capacity scratch buffer of default-initialized elements, used to
/// stage reduction results.
#[derive(Debug)]
pub struct ReductionBuffer<T> {
    data: Vec<T>,
    capacity: usize,
}

impl<T: Default + Clone> ReductionBuffer<T> {
    /// Allocates a buffer holding `capacity` default-valued elements.
    pub fn new(capacity: usize) -> Self {
        let data = vec![T::default(); capacity];
        Self { data, capacity }
    }

    /// Number of elements the buffer holds.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Read-only view of the elements.
    pub fn as_slice(&self) -> &[T] {
        self.data.as_slice()
    }

    /// Mutable view of the elements.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        self.data.as_mut_slice()
    }

    /// Overwrites every element with a freshly constructed `T::default()`.
    pub fn reset(&mut self) {
        self.data
            .iter_mut()
            .for_each(|slot| *slot = T::default());
    }
}
/// Recycles byte buffers used for staging reduction results, bounding the
/// number of retained allocations.
pub struct ReductionBufferCache {
// Maximum number of buffers kept after return; extras are dropped.
max_buffers: usize,
// Pool of returned buffers, searched linearly by capacity.
buffers: Arc<RwLock<Vec<Vec<u8>>>>,
}
impl ReductionBufferCache {
    /// Creates a cache that retains at most `max_buffers` returned buffers.
    pub fn new(max_buffers: usize) -> Self {
        Self {
            max_buffers,
            buffers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Hands out a zeroed buffer of exactly `size` bytes, reusing a cached
    /// allocation when one with sufficient capacity is available.
    pub async fn get(&self, size: usize) -> Vec<u8> {
        let mut buffers = self.buffers.write().await;
        if let Some(pos) = buffers.iter().position(|b| b.capacity() >= size) {
            let mut buf = buffers.remove(pos);
            // Bug fix: `resize` alone preserved stale bytes from the
            // buffer's previous use, while the miss path below returns all
            // zeros. Clear first so both paths hand out zeroed memory.
            buf.clear();
            buf.resize(size, 0);
            return buf;
        }
        vec![0u8; size]
    }

    /// Returns a buffer to the cache for reuse; silently dropped once the
    /// cache already holds `max_buffers` entries.
    pub async fn return_buffer(&self, buffer: Vec<u8>) {
        let mut buffers = self.buffers.write().await;
        if buffers.len() < self.max_buffers {
            buffers.push(buffer);
        }
    }

    /// Drops every cached buffer.
    pub async fn clear(&self) {
        self.buffers.write().await.clear();
    }
}
impl Default for ReductionBufferCache {
    /// Default cache retains up to 16 buffers.
    fn default() -> Self {
        const DEFAULT_MAX_BUFFERS: usize = 16;
        Self::new(DEFAULT_MAX_BUFFERS)
    }
}
/// Per-context working-set budget tracker.
#[derive(Debug)]
pub struct AnalyticsContext {
    /// Manager-assigned context id.
    pub id: u64,
    /// Working-set budget in bytes.
    pub max_working_set: u64,
    // Bytes currently recorded against the budget.
    allocations: AtomicU64,
}

impl AnalyticsContext {
    /// Creates a context with the given id and byte budget.
    pub fn new(id: u64, max_working_set: u64) -> Self {
        Self {
            id,
            max_working_set,
            allocations: AtomicU64::new(0),
        }
    }

    /// Tries to reserve `size` bytes against the budget; returns `false`
    /// when the reservation would exceed `max_working_set`.
    ///
    /// The check and the update form a single atomic step via a
    /// compare-exchange loop; the previous load-then-`fetch_add` sequence
    /// allowed two racing callers to jointly exceed the budget, and the
    /// unchecked `current + size` could wrap on u64 overflow.
    pub fn record_allocation(&self, size: u64) -> bool {
        let mut current = self.allocations.load(Ordering::Relaxed);
        loop {
            let next = match current.checked_add(size) {
                Some(total) if total <= self.max_working_set => total,
                _ => return false,
            };
            match self.allocations.compare_exchange_weak(
                current,
                next,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(observed) => current = observed,
            }
        }
    }

    /// Releases `size` previously recorded bytes. Must pair with a
    /// successful `record_allocation`; an unpaired call would wrap the
    /// counter.
    pub fn record_deallocation(&self, size: u64) {
        self.allocations.fetch_sub(size, Ordering::Relaxed);
    }

    /// Bytes currently recorded against the budget.
    pub fn current_usage(&self) -> u64 {
        self.allocations.load(Ordering::Relaxed)
    }

    /// Current usage as a percentage of the budget.
    pub fn usage_percent(&self) -> f64 {
        (self.current_usage() as f64 / self.max_working_set as f64) * 100.0
    }
}
/// Creates and tracks [`AnalyticsContext`] instances, assigning
/// monotonically increasing ids.
pub struct AnalyticsContextManager {
// Registered contexts keyed by id.
contexts: Arc<RwLock<HashMap<u64, Arc<AnalyticsContext>>>>,
// Budget in bytes applied by `create_context`.
default_working_set: u64,
// Monotonic id source; starts at 1.
next_id: AtomicU64,
}
impl AnalyticsContextManager {
    /// Creates a manager whose contexts default to `default_working_set`
    /// bytes of budget.
    pub fn new(default_working_set: u64) -> Self {
        Self {
            contexts: Arc::new(RwLock::new(HashMap::new())),
            default_working_set,
            next_id: AtomicU64::new(1),
        }
    }

    /// Registers a new context with the default working-set budget.
    pub async fn create_context(&self) -> Arc<AnalyticsContext> {
        let budget = self.default_working_set;
        self.create_context_with_size(budget).await
    }

    /// Registers a new context with an explicit working-set budget.
    pub async fn create_context_with_size(&self, max_working_set: u64) -> Arc<AnalyticsContext> {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let context = Arc::new(AnalyticsContext::new(id, max_working_set));
        let mut registry = self.contexts.write().await;
        registry.insert(id, Arc::clone(&context));
        context
    }

    /// Looks up a registered context by id.
    pub async fn get_context(&self, id: u64) -> Option<Arc<AnalyticsContext>> {
        self.contexts.read().await.get(&id).cloned()
    }

    /// Unregisters a context; outstanding `Arc` handles remain valid.
    pub async fn remove_context(&self, id: u64) {
        self.contexts.write().await.remove(&id);
    }

    /// Number of currently registered contexts.
    pub async fn active_contexts(&self) -> usize {
        self.contexts.read().await.len()
    }
}
impl Default for AnalyticsContextManager {
    /// Default per-context working-set budget: 256 MiB.
    fn default() -> Self {
        const DEFAULT_WORKING_SET: u64 = 256 * 1024 * 1024;
        Self::new(DEFAULT_WORKING_SET)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocate, check the stats snapshot, deallocate, check again.
    #[tokio::test]
    async fn test_memory_allocation() {
        let manager = KernelMemoryManager::new(MemoryConfig::development());

        let buffer = manager.allocate(1024).await.unwrap();
        assert!(buffer.is_gpu);
        assert_eq!(buffer.size, 1024);
        let snapshot = manager.stats();
        assert_eq!(snapshot.gpu_used, 1024);
        assert_eq!(snapshot.allocations, 1);

        manager.deallocate(buffer).await.unwrap();
        let snapshot = manager.stats();
        assert_eq!(snapshot.gpu_used, 0);
        assert_eq!(snapshot.deallocations, 1);
    }

    /// A request larger than the whole budget must be rejected.
    #[tokio::test]
    async fn test_out_of_memory() {
        let config = MemoryConfig {
            max_gpu_memory: 1024,
            ..MemoryConfig::development()
        };
        let manager = KernelMemoryManager::new(config);
        assert!(matches!(
            manager.allocate(2048).await,
            Err(MemoryError::OutOfMemory { .. })
        ));
    }

    /// An empty manager sits at Normal; 70% usage crosses into Warning.
    #[tokio::test]
    async fn test_pressure_levels() {
        let config = MemoryConfig {
            max_gpu_memory: 1000,
            ..MemoryConfig::development()
        };
        let manager = KernelMemoryManager::new(config);
        assert_eq!(manager.pressure_level(), PressureLevel::Normal);
        let _buf = manager.allocate(700).await.unwrap();
        assert_eq!(manager.pressure_level(), PressureLevel::Warning);
    }

    /// A returned buffer can be recycled for a smaller request.
    #[tokio::test]
    async fn test_reduction_buffer_cache() {
        let cache = ReductionBufferCache::new(4);
        let first = cache.get(1024).await;
        assert_eq!(first.len(), 1024);
        cache.return_buffer(first).await;
        let second = cache.get(512).await;
        assert_eq!(second.len(), 512);
    }

    /// Reservations and releases are reflected in current usage.
    #[tokio::test]
    async fn test_analytics_context() {
        let manager = AnalyticsContextManager::new(1024);
        let ctx = manager.create_context().await;
        assert!(ctx.record_allocation(512));
        assert_eq!(ctx.current_usage(), 512);
        ctx.record_deallocation(256);
        assert_eq!(ctx.current_usage(), 256);
    }
}