use anyhow::{Context, Result};
use std::collections::{HashMap, VecDeque};
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant};
use uuid::Uuid;
use super::analytics::*;
use super::types::*;
/// Runtime memory profiler.
///
/// Owns the profiler configuration plus all state shared with the background
/// monitoring task spawned by `start_monitoring`. Shared state is wrapped in
/// `Arc` so the spawned task can hold its own handle to each piece.
pub struct MemoryProfiler {
    /// Collection interval, history bound, feature toggles and output directory.
    config: ProfilerConfig,
    /// Rolling window of collected samples, bounded to `config.max_data_points`.
    metrics_history: Arc<Mutex<VecDeque<MemoryMetrics>>>,
    /// Tracked allocations keyed by id.
    /// NOTE(review): never read or written by the monitoring loop in this file —
    /// presumably populated elsewhere via `get_allocations`; confirm.
    allocations: Arc<Mutex<HashMap<Uuid, AllocationInfo>>>,
    /// Alerts raised by leak detection and adaptive-threshold analysis.
    alerts: Arc<Mutex<Vec<MemoryAlert>>>,
    /// Detected usage patterns (only updated when pattern analysis is enabled).
    patterns: Arc<Mutex<Vec<MemoryPattern>>>,
    /// True while the background monitoring task should keep running.
    is_monitoring: Arc<AtomicBool>,
    /// Set when monitoring starts; basis for uptime reporting. `None` until then.
    start_time: Option<Instant>,
    /// Recommendations attached to alerts; cloned into the monitoring task.
    cached_recommendations: AlertRecommendations,
    /// Thresholds tuned from observed metrics by the monitoring task.
    adaptive_thresholds: Arc<Mutex<AdaptiveThresholds>>,
    /// Tunable heuristics consumed by leak detection.
    leak_detection: Arc<Mutex<LeakDetectionHeuristics>>,
    /// Model used by `predict_memory_usage` to forecast future usage.
    memory_predictor: Arc<Mutex<MemoryPredictor>>,
    /// Monitoring-loop overhead in microseconds, used by `get_monitoring_stats`
    /// to derive the per-iteration average.
    monitoring_overhead_us: Arc<AtomicU64>,
    /// Number of collection iterations performed by the monitoring task.
    total_collections: Arc<AtomicU64>,
}
impl MemoryProfiler {
    /// Creates a profiler with the given configuration.
    ///
    /// Ensures the configured output directory exists before returning.
    ///
    /// # Errors
    /// Fails if the output directory cannot be created.
    pub fn new(config: ProfilerConfig) -> Result<Self> {
        std::fs::create_dir_all(&config.output_dir).context("Failed to create output directory")?;
        Ok(Self {
            config,
            metrics_history: Arc::new(Mutex::new(VecDeque::new())),
            allocations: Arc::new(Mutex::new(HashMap::new())),
            alerts: Arc::new(Mutex::new(Vec::new())),
            patterns: Arc::new(Mutex::new(Vec::new())),
            is_monitoring: Arc::new(AtomicBool::new(false)),
            start_time: None,
            cached_recommendations: AlertRecommendations::new(),
            adaptive_thresholds: Arc::new(Mutex::new(AdaptiveThresholds::default())),
            leak_detection: Arc::new(Mutex::new(LeakDetectionHeuristics::default())),
            memory_predictor: Arc::new(Mutex::new(MemoryPredictor::default())),
            monitoring_overhead_us: Arc::new(AtomicU64::new(0)),
            total_collections: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Starts the background monitoring task.
    ///
    /// Idempotent: if monitoring is already active this returns `Ok(())`
    /// without spawning a second task. The spawned task periodically collects
    /// memory metrics, derives the growth rate from the previous sample,
    /// updates adaptive thresholds, runs leak detection and pattern analysis
    /// when enabled, and accumulates its own overhead for
    /// [`get_monitoring_stats`](Self::get_monitoring_stats).
    pub async fn start_monitoring(&mut self) -> Result<()> {
        // compare_exchange guarantees at most one task is spawned even if this
        // method races with itself across clones of `is_monitoring`.
        if self
            .is_monitoring
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return Ok(());
        }
        self.start_time = Some(Instant::now());
        let metrics_history = self.metrics_history.clone();
        let _allocations = self.allocations.clone();
        let alerts = self.alerts.clone();
        let patterns = self.patterns.clone();
        let config = self.config.clone();
        let is_monitoring = self.is_monitoring.clone();
        let cached_recommendations = self.cached_recommendations.clone();
        let adaptive_thresholds = self.adaptive_thresholds.clone();
        let leak_detection = self.leak_detection.clone();
        let memory_predictor = self.memory_predictor.clone();
        let monitoring_overhead_us = self.monitoring_overhead_us.clone();
        let total_collections = self.total_collections.clone();
        tokio::spawn(async move {
            let mut interval =
                tokio::time::interval(Duration::from_millis(config.collection_interval_ms));
            let mut previous_metrics: Option<MemoryMetrics> = None;
            while is_monitoring.load(Ordering::Relaxed) {
                interval.tick().await;
                let monitoring_start = Instant::now();
                total_collections.fetch_add(1, Ordering::Relaxed);
                let current_metrics = match Self::collect_memory_metrics().await {
                    Ok(metrics) => metrics,
                    Err(e) => {
                        // Best-effort: log and skip this iteration rather than
                        // killing the monitoring task.
                        eprintln!("Failed to collect memory metrics: {}", e);
                        continue;
                    },
                };
                // Growth rate in MB/s relative to the previous sample; 0.0 for
                // the first sample or when timestamps are not strictly increasing.
                let growth_rate = if let Some(ref prev) = previous_metrics {
                    let time_diff = current_metrics
                        .timestamp
                        .duration_since(prev.timestamp)
                        .unwrap_or_default()
                        .as_secs_f64();
                    if time_diff > 0.0 {
                        (current_metrics.total_memory_mb - prev.total_memory_mb) / time_diff
                    } else {
                        0.0
                    }
                } else {
                    0.0
                };
                let mut final_metrics = current_metrics;
                final_metrics.memory_growth_rate_mb_per_sec = growth_rate;
                {
                    // Scoped block: the std::sync::Mutex guard must be dropped
                    // before the `.await` calls below.
                    let mut history = match metrics_history.lock() {
                        Ok(guard) => guard,
                        // Poisoned lock: drop this sample rather than panic.
                        Err(_) => continue,
                    };
                    history.push_back(final_metrics.clone());
                    // Keep the history window bounded.
                    while history.len() > config.max_data_points {
                        history.pop_front();
                    }
                }
                Self::update_adaptive_thresholds(&final_metrics, &adaptive_thresholds).await;
                if config.enable_leak_detection {
                    Self::detect_memory_leaks(
                        &final_metrics,
                        &previous_metrics,
                        &leak_detection,
                        &alerts,
                        &cached_recommendations,
                    )
                    .await;
                }
                Self::update_memory_prediction(&final_metrics, &memory_predictor, &metrics_history)
                    .await;
                Self::analyze_for_alerts_adaptive(
                    &final_metrics,
                    &previous_metrics,
                    &alerts,
                    &config,
                    &cached_recommendations,
                    &adaptive_thresholds,
                )
                .await;
                if config.enable_pattern_analysis {
                    Self::update_patterns(&final_metrics, &patterns).await;
                }
                let monitoring_duration = monitoring_start.elapsed();
                // BUG FIX: accumulate total overhead instead of `store`-ing only
                // the last sample. get_monitoring_stats divides this counter by
                // total_collections, which is only a meaningful average when the
                // counter is a running sum.
                monitoring_overhead_us
                    .fetch_add(monitoring_duration.as_micros() as u64, Ordering::Relaxed);
                previous_metrics = Some(final_metrics);
            }
        });
        Ok(())
    }

    /// Signals the background task to stop after its current iteration.
    pub async fn stop_monitoring(&self) -> Result<()> {
        self.is_monitoring.store(false, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the most recently collected metrics sample, or `None` if no
    /// sample has been collected yet.
    pub async fn get_current_metrics(&self) -> Result<Option<MemoryMetrics>> {
        let history = self
            .metrics_history
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock metrics history: {}", e))?;
        Ok(history.back().cloned())
    }

    /// Returns a snapshot of all alerts raised so far.
    pub async fn get_alerts(&self) -> Result<Vec<MemoryAlert>> {
        let alerts = self
            .alerts
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock alerts: {}", e))?;
        Ok(alerts.clone())
    }

    /// Returns a snapshot of all detected memory patterns.
    pub async fn get_patterns(&self) -> Result<Vec<MemoryPattern>> {
        let patterns = self
            .patterns
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock patterns: {}", e))?;
        Ok(patterns.clone())
    }

    /// Returns a snapshot of the current adaptive thresholds.
    pub async fn get_adaptive_thresholds(&self) -> Result<AdaptiveThresholds> {
        let thresholds = self
            .adaptive_thresholds
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock adaptive thresholds: {}", e))?;
        Ok(thresholds.clone())
    }

    /// Returns aggregate statistics about the monitoring task itself.
    ///
    /// `average_overhead_us` is the accumulated per-iteration overhead divided
    /// by the number of collection iterations (0 before the first collection).
    /// Note that iterations whose metric collection failed are counted in
    /// `total_collections` but contribute little overhead, so the average is a
    /// slight underestimate in the presence of collection errors.
    pub async fn get_monitoring_stats(&self) -> Result<MonitoringStats> {
        let total_collections = self.total_collections.load(Ordering::Relaxed);
        let total_overhead_us = self.monitoring_overhead_us.load(Ordering::Relaxed);
        // checked_div avoids a divide-by-zero before the first collection.
        let average_overhead_us = total_overhead_us.checked_div(total_collections).unwrap_or(0);
        let uptime_secs = self.start_time.map(|start| start.elapsed().as_secs()).unwrap_or(0);
        Ok(MonitoringStats {
            total_collections,
            average_overhead_us,
            uptime_secs,
        })
    }

    /// Replaces the leak-detection heuristics with values from `config`.
    pub async fn configure_leak_detection(&self, config: LeakDetectionConfig) -> Result<()> {
        let mut detection = self
            .leak_detection
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock leak detection: {}", e))?;
        *detection = LeakDetectionHeuristics {
            sustained_growth_threshold: config.growth_threshold,
            growth_duration_threshold: Duration::from_secs(config.duration_secs),
            allocation_pattern_threshold: config.allocation_threshold,
            false_positive_filter: config.confidence_threshold,
        };
        Ok(())
    }

    /// Returns the current leak-detection heuristics as a config value
    /// (inverse of [`configure_leak_detection`](Self::configure_leak_detection)).
    pub async fn get_leak_detection_config(&self) -> Result<LeakDetectionConfig> {
        let detection = self
            .leak_detection
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock leak detection: {}", e))?;
        Ok(LeakDetectionConfig {
            growth_threshold: detection.sustained_growth_threshold,
            duration_secs: detection.growth_duration_threshold.as_secs(),
            allocation_threshold: detection.allocation_pattern_threshold,
            confidence_threshold: detection.false_positive_filter,
        })
    }

    /// Forecasts memory usage `horizon_secs` seconds ahead using the collected
    /// history. Returns `None` when the predictor has insufficient data.
    pub async fn predict_memory_usage(
        &self,
        horizon_secs: u64,
    ) -> Result<Option<MemoryPrediction>> {
        // Copy the history out and release the lock before invoking the
        // predictor, so the two locks are never held simultaneously.
        let metrics_history = self
            .metrics_history
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock metrics history: {}", e))?;
        let metrics: Vec<MemoryMetrics> = metrics_history.iter().cloned().collect();
        drop(metrics_history);
        let mut predictor = self
            .memory_predictor
            .lock()
            .map_err(|e| anyhow::anyhow!("Failed to lock memory predictor: {}", e))?;
        Ok(predictor.predict_memory_usage(&metrics, Some(horizon_secs)))
    }

    /// Convenience aggregate of thresholds, a 5-minute prediction, monitoring
    /// stats and the current leak-detection configuration.
    pub async fn get_analytics_summary(&self) -> Result<AnalyticsSummary> {
        let thresholds = self.get_adaptive_thresholds().await?;
        let prediction = self.predict_memory_usage(300).await?;
        let monitoring_stats = self.get_monitoring_stats().await?;
        let leak_config = self.get_leak_detection_config().await?;
        Ok(AnalyticsSummary {
            adaptive_thresholds: thresholds,
            memory_prediction: prediction,
            monitoring_stats,
            leak_detection_config: leak_config,
        })
    }

    /// Crate-internal handle to the shared metrics history.
    pub(crate) fn get_metrics_history(
        &self,
    ) -> &Arc<Mutex<std::collections::VecDeque<super::types::MemoryMetrics>>> {
        &self.metrics_history
    }

    /// Crate-internal handle to the shared allocation table.
    pub(crate) fn get_allocations(
        &self,
    ) -> &Arc<Mutex<HashMap<uuid::Uuid, super::types::AllocationInfo>>> {
        &self.allocations
    }

    /// Crate-internal handle to the shared alert list.
    pub(crate) fn get_alerts_internal(&self) -> &Arc<Mutex<Vec<super::types::MemoryAlert>>> {
        &self.alerts
    }

    /// Crate-internal handle to the shared pattern list.
    pub(crate) fn get_patterns_internal(&self) -> &Arc<Mutex<Vec<super::types::MemoryPattern>>> {
        &self.patterns
    }

    /// When monitoring has been started, the instant it began.
    pub(crate) fn get_start_time(&self) -> Option<std::time::Instant> {
        self.start_time
    }

    /// Crate-internal view of the profiler configuration.
    pub(crate) fn get_config(&self) -> &super::types::ProfilerConfig {
        &self.config
    }

    /// Test-only: whether the background monitoring flag is currently set.
    #[cfg(test)]
    pub fn is_monitoring(&self) -> bool {
        self.is_monitoring.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Test-only handle to the cached alert recommendations.
    #[cfg(test)]
    pub fn get_cached_recommendations(&self) -> &super::analytics::AlertRecommendations {
        &self.cached_recommendations
    }

    /// Test-only handle to the shared adaptive thresholds.
    #[cfg(test)]
    pub fn get_adaptive_thresholds_internal(
        &self,
    ) -> &Arc<Mutex<super::analytics::AdaptiveThresholds>> {
        &self.adaptive_thresholds
    }

    /// Test-only handle to the shared memory predictor.
    #[cfg(test)]
    pub fn get_memory_predictor_internal(&self) -> &Arc<Mutex<super::analytics::MemoryPredictor>> {
        &self.memory_predictor
    }
}
/// Statistics describing the monitoring task itself (not the profiled process).
///
/// All fields are plain counters, so the type is `Copy` and fully comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct MonitoringStats {
    /// Number of collection iterations the monitoring task has performed.
    pub total_collections: u64,
    /// Mean per-iteration overhead of the monitoring loop, in microseconds.
    pub average_overhead_us: u64,
    /// Seconds elapsed since monitoring was started (0 if never started).
    pub uptime_secs: u64,
}
/// User-facing leak-detection tuning knobs, convertible to and from the
/// internal `LeakDetectionHeuristics`.
///
/// `Eq`/`Hash` are intentionally omitted: the `f64` fields only support
/// partial equality.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct LeakDetectionConfig {
    /// Sustained growth rate (MB/s) above which a leak is suspected.
    pub growth_threshold: f64,
    /// How long growth must be sustained before flagging, in seconds.
    pub duration_secs: u64,
    /// Allocation-pattern count threshold used by the heuristics.
    pub allocation_threshold: usize,
    /// Minimum confidence required to report a leak (false-positive filter).
    pub confidence_threshold: f64,
}
/// One-shot aggregate of the profiler's analytics state, produced by
/// `MemoryProfiler::get_analytics_summary`.
#[derive(Debug, Clone)]
pub struct AnalyticsSummary {
    /// Snapshot of the adaptive thresholds at the time of the call.
    pub adaptive_thresholds: AdaptiveThresholds,
    /// Forecast of future memory usage; `None` when the predictor had
    /// insufficient history.
    pub memory_prediction: Option<MemoryPrediction>,
    /// Statistics about the monitoring task itself.
    pub monitoring_stats: MonitoringStats,
    /// Leak-detection tuning values in effect at the time of the call.
    pub leak_detection_config: LeakDetectionConfig,
}