use super::MemoryMetrics;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
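/// Configuration for the memory optimizer: activation flag, pressure and
/// fragmentation thresholds (in percent), cache limits, GC and pool settings,
/// the monitoring interval, and the strategies allowed to run.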
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizerConfig {
pub enabled: bool,
pub pressure_threshold: f64,
pub fragmentation_threshold: f64,
pub cache_limits: CacheLimits,
pub gc_settings: GcSettings,
pub pool_settings: PoolSettings,
pub monitoring_interval: Duration,
pub enabled_strategies: Vec<OptimizationStrategy>,
}
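/// Per-cache byte budgets, a shared entry TTL, and an LRU-eviction toggle.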
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheLimits {
pub max_model_cache_bytes: u64,
pub max_audio_cache_bytes: u64,
pub max_embedding_cache_bytes: u64,
pub cache_ttl_seconds: u64,
pub enable_lru_eviction: bool,
}
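/// Controls automatic garbage collection: when it triggers, how often it may
/// run, and the heap-usage percentage it targets after a collection.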
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GcSettings {
pub auto_gc_enabled: bool,
pub gc_pressure_threshold: f64,
pub min_gc_interval: Duration,
pub force_gc_after_allocations: usize,
pub gc_target_heap_percent: f64,
}
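/// Memory-pool configuration: a map from buffer size (bytes) to pool
/// capacity, plus preallocation, cleanup, and total-memory limits.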
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolSettings {
pub enabled: bool,
pub pool_sizes: HashMap<usize, usize>,
pub preallocation_size: usize,
pub cleanup_interval: Duration,
pub max_pool_memory: u64,
}
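/// The optimization strategies the optimizer knows how to apply.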
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum OptimizationStrategy {
AggressiveCacheCleanup,
MemoryPooling,
LazyModelLoading,
AudioBufferOptimization,
EmbeddingCompression,
HeapCompaction,
PreallocationOptimization,
MemoryMappedFiles,
ZeroCopyOptimization,
}
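/// Outcome of applying a single strategy: bytes freed, elapsed time, and the
/// estimated relative performance impact.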
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationResult {
pub strategy: OptimizationStrategy,
pub memory_freed_bytes: u64,
pub optimization_time_ms: u64,
pub success: bool,
pub error_message: Option<String>,
pub performance_impact: f64,
}
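/// Running allocation statistics: counters, a size histogram, per-source
/// tracking, and a sliding window of recently observed patterns.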
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocationTracker {
pub total_allocations: u64,
pub total_deallocations: u64,
pub active_allocations: u64,
pub peak_allocations: u64,
pub size_histogram: HashMap<usize, u64>,
pub source_tracking: HashMap<String, AllocationSource>,
pub recent_patterns: VecDeque<AllocationPattern>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocationSource {
pub name: String,
pub total_bytes: u64,
pub allocation_count: u64,
pub average_size: f64,
pub peak_allocation: u64,
pub last_allocation: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocationPattern {
pub timestamp: u64,
pub size: usize,
pub source: String,
pub pattern_type: PatternType,
pub duration_ms: u64,
}
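/// Classification of an observed allocation pattern.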
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PatternType {
Burst,
Steady,
Large,
Fragmented,
Cyclic,
}
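/// Applies memory-optimization strategies driven by allocation tracking,
/// cache state, and memory pressure. Clones share the same tracker, pools,
/// caches, and history via `Arc`s.
///
/// A minimal usage sketch (assumes a tokio runtime; error handling elided):
///
/// ```ignore
/// let optimizer = MemoryOptimizer::new(MemoryOptimizerConfig::default());
/// optimizer.start().await?;
/// let stats = optimizer.get_optimization_stats().await;
/// optimizer.stop().await?;
/// ```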
pub struct MemoryOptimizer {
config: MemoryOptimizerConfig,
allocation_tracker: Arc<RwLock<AllocationTracker>>,
memory_pools: Arc<RwLock<HashMap<usize, Vec<Vec<u8>>>>>,
cache_managers: Arc<RwLock<HashMap<String, CacheManager>>>,
optimization_history: Arc<RwLock<VecDeque<OptimizationResult>>>,
last_optimization: Arc<RwLock<Instant>>,
pressure_semaphore: Arc<Semaphore>,
is_running: Arc<RwLock<bool>>,
}
#[derive(Debug)]
struct CacheManager {
name: String,
current_size: u64,
max_size: u64,
entry_count: usize,
last_cleanup: Instant,
hit_rate: f64,
lru_keys: VecDeque<String>,
}
impl MemoryOptimizer {
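/// Creates an optimizer from the given configuration.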
pub fn new(config: MemoryOptimizerConfig) -> Self {
// Size the pressure semaphore from the threshold: higher thresholds leave
// fewer permits. Clamp so at least one permit always exists; a zero or
// negative threshold falls back to a generous default.
let pressure_permits = if config.pressure_threshold > 0.0 {
(((100.0 - config.pressure_threshold) * 10.0).max(1.0)) as usize
} else {
1000
};
Self {
config,
allocation_tracker: Arc::new(RwLock::new(AllocationTracker::new())),
memory_pools: Arc::new(RwLock::new(HashMap::new())),
cache_managers: Arc::new(RwLock::new(HashMap::new())),
optimization_history: Arc::new(RwLock::new(VecDeque::new())),
last_optimization: Arc::new(RwLock::new(Instant::now())),
pressure_semaphore: Arc::new(Semaphore::new(pressure_permits)),
is_running: Arc::new(RwLock::new(false)),
}
}
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut is_running = self.is_running.write().await;
if *is_running {
return Ok(());
}
*is_running = true;
drop(is_running);
tracing::info!("Starting memory optimizer");
self.initialize_memory_pools().await;
self.initialize_cache_managers().await;
self.start_monitoring_task().await;
self.start_optimization_task().await;
Ok(())
}
pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut is_running = self.is_running.write().await;
if !*is_running {
return Ok(());
}
*is_running = false;
tracing::info!("Stopped memory optimizer");
Ok(())
}
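/// Inspects the given metrics and allocation history and returns
/// recommendations sorted from highest to lowest priority.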
pub async fn analyze_memory_usage(
&self,
metrics: &MemoryMetrics,
) -> Vec<OptimizationRecommendation> {
let mut recommendations = Vec::new();
let memory_pressure = self.calculate_memory_pressure(metrics).await;
if memory_pressure > self.config.pressure_threshold {
recommendations.push(OptimizationRecommendation {
strategy: OptimizationStrategy::AggressiveCacheCleanup,
priority: 9,
description: format!("High memory pressure detected: {:.1}%", memory_pressure),
expected_savings_mb: self.estimate_cache_cleanup_savings().await,
implementation_effort: ImplementationEffort::Low,
performance_impact: -0.1,
});
}
if metrics.fragmentation_percent > self.config.fragmentation_threshold {
recommendations.push(OptimizationRecommendation {
strategy: OptimizationStrategy::HeapCompaction,
priority: 7,
description: format!(
"Memory fragmentation detected: {:.1}%",
metrics.fragmentation_percent
),
expected_savings_mb: (metrics.heap_used as f64 * metrics.fragmentation_percent
/ 100.0
/ 1_000_000.0) as u32,
implementation_effort: ImplementationEffort::Medium,
performance_impact: -0.2,
});
}
if metrics.cache_hit_rate < 70.0 {
recommendations.push(OptimizationRecommendation {
strategy: OptimizationStrategy::EmbeddingCompression,
priority: 6,
description: format!("Low cache hit rate: {:.1}%", metrics.cache_hit_rate),
expected_savings_mb: self.estimate_compression_savings().await,
implementation_effort: ImplementationEffort::Medium,
performance_impact: 0.15,
});
}
let patterns = self.analyze_allocation_patterns().await;
for pattern in patterns {
if pattern.pattern_type == PatternType::Burst {
recommendations.push(OptimizationRecommendation {
strategy: OptimizationStrategy::MemoryPooling,
priority: 8,
description: "Burst allocation pattern detected - memory pooling recommended"
.to_string(),
expected_savings_mb: self.estimate_pooling_savings().await,
implementation_effort: ImplementationEffort::High,
performance_impact: 0.25,
});
break;
}
}
let tracker = self.allocation_tracker.read().await;
// Copy the matching size out of the histogram before releasing the read
// guard; holding a borrow into the guard across `drop(tracker)` would not
// compile. The key is the allocation size in bytes, not a count.
let large_alloc_size = tracker
.size_histogram
.keys()
.copied()
.find(|&size| size > 100_000_000);
drop(tracker);
if let Some(large_alloc_size) = large_alloc_size {
recommendations.push(OptimizationRecommendation {
strategy: OptimizationStrategy::MemoryMappedFiles,
priority: 5,
description: "Large allocations detected - memory mapping recommended".to_string(),
expected_savings_mb: (large_alloc_size / 1_000_000) as u32,
implementation_effort: ImplementationEffort::High,
performance_impact: 0.1,
});
}
// Highest-priority recommendations first.
recommendations.sort_by_key(|r| std::cmp::Reverse(r.priority));
recommendations
}
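/// Applies a single strategy, records the outcome in the bounded history
/// (the most recent 100 results), and returns it.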
pub async fn apply_optimization(&self, strategy: OptimizationStrategy) -> OptimizationResult {
let start_time = Instant::now();
let result = match strategy {
OptimizationStrategy::AggressiveCacheCleanup => self.perform_cache_cleanup().await,
OptimizationStrategy::MemoryPooling => self.optimize_memory_pools().await,
OptimizationStrategy::LazyModelLoading => self.implement_lazy_loading().await,
OptimizationStrategy::AudioBufferOptimization => self.optimize_audio_buffers().await,
OptimizationStrategy::EmbeddingCompression => self.compress_embeddings().await,
OptimizationStrategy::HeapCompaction => self.perform_heap_compaction().await,
OptimizationStrategy::PreallocationOptimization => self.optimize_preallocation().await,
OptimizationStrategy::MemoryMappedFiles => self.implement_memory_mapping().await,
OptimizationStrategy::ZeroCopyOptimization => self.implement_zero_copy().await,
};
let optimization_time_ms = start_time.elapsed().as_millis() as u64;
let (memory_freed_bytes, success, error_message, performance_impact) = result;
let final_result = OptimizationResult {
strategy,
memory_freed_bytes,
optimization_time_ms,
success,
error_message,
performance_impact,
};
let mut history = self.optimization_history.write().await;
history.push_back(final_result.clone());
if history.len() > 100 {
history.pop_front();
}
final_result
}
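/// Aggregates the optimization history and allocation tracker into summary
/// statistics.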
pub async fn get_optimization_stats(&self) -> MemoryOptimizationStats {
let tracker = self.allocation_tracker.read().await;
let history = self.optimization_history.read().await;
let total_optimizations = history.len();
let successful_optimizations = history.iter().filter(|r| r.success).count();
let total_memory_freed: u64 = history.iter().map(|r| r.memory_freed_bytes).sum();
let average_optimization_time = if !history.is_empty() {
history.iter().map(|r| r.optimization_time_ms).sum::<u64>() / history.len() as u64
} else {
0
};
MemoryOptimizationStats {
total_optimizations,
successful_optimizations,
success_rate: if total_optimizations > 0 {
(successful_optimizations as f64 / total_optimizations as f64) * 100.0
} else {
0.0
},
total_memory_freed_gb: total_memory_freed as f64 / 1_000_000_000.0,
average_optimization_time_ms: average_optimization_time,
current_allocation_count: tracker.active_allocations,
peak_allocation_count: tracker.peak_allocations,
fragmentation_events: self.count_fragmentation_events().await,
cache_efficiency: self.calculate_overall_cache_efficiency().await,
}
}
async fn initialize_memory_pools(&self) {
if !self.config.pool_settings.enabled {
return;
}
let mut pools = self.memory_pools.write().await;
for (&size, &count) in &self.config.pool_settings.pool_sizes {
let mut pool = Vec::with_capacity(count);
for _ in 0..self.config.pool_settings.preallocation_size.min(count) {
pool.push(vec![0u8; size]);
}
pools.insert(size, pool);
}
tracing::info!("Initialized {} memory pools", pools.len());
}
async fn initialize_cache_managers(&self) {
let mut managers = self.cache_managers.write().await;
// Each subsystem cache gets its own manager with its configured byte limit.
let limits = [
("models", self.config.cache_limits.max_model_cache_bytes),
("audio", self.config.cache_limits.max_audio_cache_bytes),
("embeddings", self.config.cache_limits.max_embedding_cache_bytes),
];
for (name, max_size) in limits {
managers.insert(
name.to_string(),
CacheManager {
name: name.to_string(),
current_size: 0,
max_size,
entry_count: 0,
last_cleanup: Instant::now(),
hit_rate: 0.0,
lru_keys: VecDeque::new(),
},
);
}
tracing::info!("Initialized {} cache managers", managers.len());
}
async fn start_monitoring_task(&self) {
let is_running = self.is_running.clone();
let interval = self.config.monitoring_interval;
let allocation_tracker = self.allocation_tracker.clone();
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
loop {
interval_timer.tick().await;
if !*is_running.read().await {
break;
}
let tracker = allocation_tracker.read().await;
tracing::debug!(
"Memory monitoring tick: {} active allocations",
tracker.active_allocations
);
}
});
}
async fn start_optimization_task(&self) {
let is_running = self.is_running.clone();
let optimizer = self.clone();
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(Duration::from_secs(60));
loop {
interval_timer.tick().await;
if !*is_running.read().await {
break;
}
if let Some(strategy) = optimizer.determine_needed_optimization().await {
let result = optimizer.apply_optimization(strategy).await;
if result.success {
tracing::info!(
"Applied optimization {:?}, freed {} MB",
result.strategy,
result.memory_freed_bytes / 1_000_000
);
} else {
tracing::warn!(
"Failed to apply optimization {:?}: {:?}",
result.strategy,
result.error_message
);
}
}
}
});
}
async fn calculate_memory_pressure(&self, metrics: &MemoryMetrics) -> f64 {
// Heuristic score, not a calibrated percentage: averages heap usage
// relative to an assumed 1 GB of headroom, the fragmentation percent, and
// the allocation rate (scaled so ~1000 allocs/sec contributes one point).
let heap_pressure =
(metrics.heap_used as f64 / (metrics.heap_used + 1_000_000_000) as f64) * 100.0;
let fragmentation_pressure = metrics.fragmentation_percent;
let allocation_pressure = metrics.allocations_per_sec / 1000.0;
(heap_pressure + fragmentation_pressure + allocation_pressure) / 3.0
}
async fn determine_needed_optimization(&self) -> Option<OptimizationStrategy> {
let allocation_tracker = self.allocation_tracker.read().await;
// Trigger cache cleanup when active allocations exceed 80% of the peak.
if allocation_tracker.active_allocations > allocation_tracker.peak_allocations * 80 / 100 {
return Some(OptimizationStrategy::AggressiveCacheCleanup);
}
if allocation_tracker
.recent_patterns
.iter()
.any(|p| p.pattern_type == PatternType::Burst)
{
return Some(OptimizationStrategy::MemoryPooling);
}
None
}
// The strategy implementations below are placeholders that log the action
// and return simulated results as (bytes_freed, success, error, perf_impact).
async fn perform_cache_cleanup(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Performing aggressive cache cleanup");
(50_000_000, true, None, -0.05)
}
async fn optimize_memory_pools(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Optimizing memory pools");
(30_000_000, true, None, 0.15)
}
async fn implement_lazy_loading(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Implementing lazy model loading");
(100_000_000, true, None, 0.1)
}
async fn optimize_audio_buffers(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Optimizing audio buffers");
(20_000_000, true, None, 0.05)
}
async fn compress_embeddings(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Compressing embeddings");
(75_000_000, true, None, 0.08)
}
async fn perform_heap_compaction(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Performing heap compaction");
(40_000_000, true, None, -0.1)
}
async fn optimize_preallocation(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Optimizing preallocation");
(25_000_000, true, None, 0.12)
}
async fn implement_memory_mapping(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Implementing memory mapping");
(150_000_000, true, None, 0.2)
}
async fn implement_zero_copy(&self) -> (u64, bool, Option<String>, f64) {
tracing::info!("Implementing zero-copy optimizations");
(60_000_000, true, None, 0.18)
}
// Placeholder savings estimates, in MB.
async fn estimate_cache_cleanup_savings(&self) -> u32 {
50
}
async fn estimate_compression_savings(&self) -> u32 {
30
}
async fn estimate_pooling_savings(&self) -> u32 {
25
}
async fn analyze_allocation_patterns(&self) -> Vec<AllocationPattern> {
let tracker = self.allocation_tracker.read().await;
tracker.recent_patterns.iter().cloned().collect()
}
async fn count_fragmentation_events(&self) -> u64 {
// Placeholder: fragmentation event tracking is not yet implemented.
0
}
async fn calculate_overall_cache_efficiency(&self) -> f64 {
let managers = self.cache_managers.read().await;
if managers.is_empty() {
return 0.0;
}
let total_hit_rate: f64 = managers.values().map(|m| m.hit_rate).sum();
total_hit_rate / managers.len() as f64
}
}
impl Clone for MemoryOptimizer {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
allocation_tracker: self.allocation_tracker.clone(),
memory_pools: self.memory_pools.clone(),
cache_managers: self.cache_managers.clone(),
optimization_history: self.optimization_history.clone(),
last_optimization: self.last_optimization.clone(),
pressure_semaphore: self.pressure_semaphore.clone(),
is_running: self.is_running.clone(),
}
}
}
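/// A recommendation produced by `analyze_memory_usage`. Higher `priority`
/// values are more urgent; `performance_impact` is the estimated relative
/// effect on throughput, where positive values are improvements.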
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationRecommendation {
pub strategy: OptimizationStrategy,
pub priority: u8,
pub description: String,
pub expected_savings_mb: u32,
pub implementation_effort: ImplementationEffort,
pub performance_impact: f64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImplementationEffort {
Low,
Medium,
High,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryOptimizationStats {
pub total_optimizations: usize,
pub successful_optimizations: usize,
pub success_rate: f64,
pub total_memory_freed_gb: f64,
pub average_optimization_time_ms: u64,
pub current_allocation_count: u64,
pub peak_allocation_count: u64,
pub fragmentation_events: u64,
pub cache_efficiency: f64,
}
impl AllocationTracker {
fn new() -> Self {
Self {
total_allocations: 0,
total_deallocations: 0,
active_allocations: 0,
peak_allocations: 0,
size_histogram: HashMap::new(),
source_tracking: HashMap::new(),
recent_patterns: VecDeque::new(),
}
}
}
impl Default for MemoryOptimizerConfig {
fn default() -> Self {
Self {
enabled: true,
pressure_threshold: 80.0,
fragmentation_threshold: 15.0,
cache_limits: CacheLimits::default(),
gc_settings: GcSettings::default(),
pool_settings: PoolSettings::default(),
monitoring_interval: Duration::from_secs(30),
enabled_strategies: vec![
OptimizationStrategy::AggressiveCacheCleanup,
OptimizationStrategy::MemoryPooling,
OptimizationStrategy::LazyModelLoading,
OptimizationStrategy::AudioBufferOptimization,
],
}
}
}
impl Default for CacheLimits {
fn default() -> Self {
Self {
max_model_cache_bytes: 2_000_000_000,
max_audio_cache_bytes: 1_000_000_000,
max_embedding_cache_bytes: 500_000_000,
cache_ttl_seconds: 3600,
enable_lru_eviction: true,
}
}
}
impl Default for GcSettings {
fn default() -> Self {
Self {
auto_gc_enabled: true,
gc_pressure_threshold: 85.0,
min_gc_interval: Duration::from_secs(300),
force_gc_after_allocations: 10_000,
gc_target_heap_percent: 70.0,
}
}
}
impl Default for PoolSettings {
fn default() -> Self {
let mut pool_sizes = HashMap::new();
// Size class (bytes) -> number of pooled buffers.
pool_sizes.insert(1024, 100);
pool_sizes.insert(4096, 50);
pool_sizes.insert(16_384, 25);
pool_sizes.insert(65_536, 10);
pool_sizes.insert(262_144, 5);
Self {
enabled: true,
pool_sizes,
preallocation_size: 10,
cleanup_interval: Duration::from_secs(600),
max_pool_memory: 100_000_000,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_memory_optimizer_creation() {
let config = MemoryOptimizerConfig::default();
let optimizer = MemoryOptimizer::new(config);
assert!(!*optimizer.is_running.read().await);
}
#[tokio::test]
async fn test_memory_pressure_calculation() {
let config = MemoryOptimizerConfig::default();
let optimizer = MemoryOptimizer::new(config);
let metrics = MemoryMetrics {
heap_used: 800_000_000,
fragmentation_percent: 20.0,
allocations_per_sec: 500.0,
..Default::default()
};
let pressure = optimizer.calculate_memory_pressure(&metrics).await;
assert!(pressure > 0.0);
}
#[tokio::test]
async fn test_optimization_recommendations() {
let config = MemoryOptimizerConfig::default();
let optimizer = MemoryOptimizer::new(config);
let metrics = MemoryMetrics {
heap_used: 900_000_000,
fragmentation_percent: 85.0,
cache_hit_rate: 50.0,
allocations_per_sec: 150_000.0,
..Default::default()
};
let recommendations = optimizer.analyze_memory_usage(&metrics).await;
assert!(!recommendations.is_empty());
assert!(recommendations
.iter()
.any(|r| r.strategy == OptimizationStrategy::AggressiveCacheCleanup));
}
#[tokio::test]
async fn test_cache_cleanup_optimization() {
let config = MemoryOptimizerConfig::default();
let optimizer = MemoryOptimizer::new(config);
let result = optimizer
.apply_optimization(OptimizationStrategy::AggressiveCacheCleanup)
.await;
assert!(result.success);
assert!(result.memory_freed_bytes > 0);
}
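// A sketch of how apply_optimization composes with get_optimization_stats;
// the expected values follow from the placeholder strategy results above.
#[tokio::test]
async fn test_stats_after_optimization() {
let config = MemoryOptimizerConfig::default();
let optimizer = MemoryOptimizer::new(config);
let _ = optimizer
.apply_optimization(OptimizationStrategy::AggressiveCacheCleanup)
.await;
let stats = optimizer.get_optimization_stats().await;
assert_eq!(stats.total_optimizations, 1);
assert_eq!(stats.successful_optimizations, 1);
assert!(stats.success_rate > 99.0);
assert!(stats.total_memory_freed_gb > 0.0);
}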
#[test]
fn test_config_defaults() {
let config = MemoryOptimizerConfig::default();
assert!(config.enabled);
assert_eq!(config.pressure_threshold, 80.0);
assert_eq!(config.fragmentation_threshold, 15.0);
assert!(!config.enabled_strategies.is_empty());
}
#[test]
fn test_allocation_tracker() {
let tracker = AllocationTracker::new();
assert_eq!(tracker.total_allocations, 0);
assert_eq!(tracker.active_allocations, 0);
assert!(tracker.size_histogram.is_empty());
}
}