use crate::brp_messages::AllocationInfo;
use crate::error::Result;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryProfile {
pub total_allocated: usize,
pub allocations_per_system: HashMap<String, usize>,
pub top_allocations: Vec<AllocationInfo>,
}
const MAX_ALLOCATION_BACKTRACES: usize = 10_000;
const MAX_MEMORY_SNAPSHOTS: usize = 1000;
const LEAK_DETECTION_THRESHOLD: Duration = Duration::from_secs(300);
#[derive(Debug, Clone)]
pub struct MemoryProfilerConfig {
pub max_overhead_percent: f32,
pub capture_backtraces: bool,
pub enable_leak_detection: bool,
pub snapshot_interval: Duration,
pub monitor_entity_count: bool,
pub track_resource_footprint: bool,
}
impl Default for MemoryProfilerConfig {
fn default() -> Self {
Self {
max_overhead_percent: 5.0,
capture_backtraces: true,
enable_leak_detection: true,
snapshot_interval: Duration::from_secs(30),
monitor_entity_count: true,
track_resource_footprint: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocationRecord {
pub size: usize,
pub system_name: String,
pub timestamp: DateTime<Utc>,
pub backtrace: Option<Vec<String>>,
pub allocation_id: u64,
pub is_active: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemorySnapshot {
pub timestamp: DateTime<Utc>,
pub total_allocated: usize,
pub system_allocations: HashMap<String, usize>,
pub entity_count: usize,
pub resource_footprint: HashMap<String, usize>,
pub active_allocations: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryLeak {
pub system_name: String,
pub leak_count: usize,
pub leaked_memory: usize,
pub oldest_allocation: DateTime<Utc>,
pub sample_backtraces: Vec<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryTrend {
pub system_name: String,
pub growth_rate_bytes_per_min: f64,
pub predicted_usage_1h: usize,
pub prediction_confidence: f64,
pub trend_direction: TrendDirection,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
Growing,
Stable,
Declining,
}
pub struct MemoryProfiler {
config: MemoryProfilerConfig,
allocations: Arc<DashMap<u64, AllocationRecord>>,
snapshots: Arc<RwLock<VecDeque<MemorySnapshot>>>,
system_memory: Arc<DashMap<String, AtomicUsize>>,
resource_memory: Arc<DashMap<String, AtomicUsize>>,
entity_count: Arc<AtomicUsize>,
next_allocation_id: Arc<AtomicUsize>,
start_time: Instant,
total_overhead_time: Arc<AtomicUsize>,
}
impl MemoryProfiler {
pub fn new(config: MemoryProfilerConfig) -> Self {
Self {
config,
allocations: Arc::new(DashMap::new()),
snapshots: Arc::new(RwLock::new(VecDeque::new())),
system_memory: Arc::new(DashMap::new()),
resource_memory: Arc::new(DashMap::new()),
entity_count: Arc::new(AtomicUsize::new(0)),
next_allocation_id: Arc::new(AtomicUsize::new(1)),
start_time: Instant::now(),
total_overhead_time: Arc::new(AtomicUsize::new(0)),
}
}
pub fn record_allocation(
&self,
system_name: &str,
size: usize,
backtrace: Option<Vec<String>>,
) -> Result<u64> {
let overhead_start = Instant::now();
let allocation_id = self.next_allocation_id.fetch_add(1, Ordering::Relaxed) as u64;
let record = AllocationRecord {
size,
system_name: system_name.to_string(),
timestamp: Utc::now(),
backtrace: if self.config.capture_backtraces { backtrace } else { None },
allocation_id,
is_active: true,
};
self.allocations.insert(allocation_id, record);
let system_entry = self.system_memory.entry(system_name.to_string())
.or_insert_with(|| AtomicUsize::new(0));
system_entry.fetch_add(size, Ordering::Relaxed);
if self.allocations.len() > MAX_ALLOCATION_BACKTRACES {
self.cleanup_old_allocations();
}
let overhead_time = overhead_start.elapsed();
self.total_overhead_time.fetch_add(overhead_time.as_nanos() as usize, Ordering::Relaxed);
debug!("Recorded allocation: {} bytes for system: {}", size, system_name);
Ok(allocation_id)
}
pub fn record_deallocation(&self, allocation_id: u64) -> Result<()> {
let overhead_start = Instant::now();
if let Some(mut record) = self.allocations.get_mut(&allocation_id) {
record.is_active = false;
let system_name = record.system_name.clone();
if let Some(system_entry) = self.system_memory.get(&system_name) {
system_entry.fetch_sub(record.size, Ordering::Relaxed);
}
}
let overhead_time = overhead_start.elapsed();
self.total_overhead_time.fetch_add(overhead_time.as_nanos() as usize, Ordering::Relaxed);
Ok(())
}
pub fn update_entity_count(&self, count: usize) {
if self.config.monitor_entity_count {
self.entity_count.store(count, Ordering::Relaxed);
}
}
pub fn update_resource_memory(&self, resource_name: &str, memory_size: usize) {
if self.config.track_resource_footprint {
let resource_entry = self.resource_memory.entry(resource_name.to_string())
.or_insert_with(|| AtomicUsize::new(0));
resource_entry.store(memory_size, Ordering::Relaxed);
}
}
pub async fn take_snapshot(&self) -> Result<MemorySnapshot> {
let overhead_start = Instant::now();
let system_allocations: HashMap<String, usize> = self.system_memory
.iter()
.map(|entry| (entry.key().clone(), entry.value().load(Ordering::Relaxed)))
.collect();
let resource_footprint: HashMap<String, usize> = self.resource_memory
.iter()
.map(|entry| (entry.key().clone(), entry.value().load(Ordering::Relaxed)))
.collect();
let total_allocated = system_allocations.values().sum();
let active_allocations = self.allocations.iter()
.filter(|entry| entry.value().is_active)
.count();
let snapshot = MemorySnapshot {
timestamp: Utc::now(),
total_allocated,
system_allocations,
entity_count: self.entity_count.load(Ordering::Relaxed),
resource_footprint,
active_allocations,
};
{
let mut snapshots = self.snapshots.write().await;
snapshots.push_back(snapshot.clone());
while snapshots.len() > MAX_MEMORY_SNAPSHOTS {
snapshots.pop_front();
}
}
let overhead_time = overhead_start.elapsed();
self.total_overhead_time.fetch_add(overhead_time.as_nanos() as usize, Ordering::Relaxed);
info!("Memory snapshot taken: {} total allocated, {} active allocations",
total_allocated, active_allocations);
Ok(snapshot)
}
pub async fn detect_leaks(&self) -> Result<Vec<MemoryLeak>> {
if !self.config.enable_leak_detection {
return Ok(Vec::new());
}
let overhead_start = Instant::now();
let now = Utc::now();
let mut leaks = Vec::new();
let mut system_leaks: HashMap<String, Vec<&AllocationRecord>> = HashMap::new();
let leak_candidates: Vec<AllocationRecord> = self.allocations.iter()
.filter_map(|entry| {
let record = entry.value();
if record.is_active {
let age = now.signed_duration_since(record.timestamp)
.to_std()
.unwrap_or(Duration::ZERO);
if age > LEAK_DETECTION_THRESHOLD {
Some(record.clone())
} else {
None
}
} else {
None
}
})
.collect();
for record in &leak_candidates {
system_leaks.entry(record.system_name.clone())
.or_insert_with(Vec::new)
.push(record);
}
for (system_name, leaked_records) in system_leaks {
if leaked_records.len() < 10 {
continue; }
let leak_count = leaked_records.len();
let leaked_memory: usize = leaked_records.iter().map(|r| r.size).sum();
let oldest_allocation = leaked_records.iter()
.min_by_key(|r| r.timestamp)
.map(|r| r.timestamp)
.unwrap_or_else(Utc::now);
let sample_backtraces: Vec<Vec<String>> = leaked_records.iter()
.take(5)
.filter_map(|r| r.backtrace.as_ref())
.cloned()
.collect();
let leak = MemoryLeak {
system_name,
leak_count,
leaked_memory,
oldest_allocation,
sample_backtraces,
};
leaks.push(leak);
}
let overhead_time = overhead_start.elapsed();
self.total_overhead_time.fetch_add(overhead_time.as_nanos() as usize, Ordering::Relaxed);
if !leaks.is_empty() {
warn!("Detected {} potential memory leaks", leaks.len());
}
Ok(leaks)
}
pub async fn analyze_trends(&self) -> Result<Vec<MemoryTrend>> {
let overhead_start = Instant::now();
let snapshots = self.snapshots.read().await;
if snapshots.len() < 3 {
return Ok(Vec::new());
}
let mut trends = Vec::new();
let mut system_histories: HashMap<String, Vec<(DateTime<Utc>, usize)>> = HashMap::new();
for snapshot in snapshots.iter() {
for (system_name, memory_usage) in &snapshot.system_allocations {
system_histories.entry(system_name.clone())
.or_insert_with(Vec::new)
.push((snapshot.timestamp, *memory_usage));
}
}
for (system_name, history) in system_histories {
if history.len() < 3 {
continue;
}
let trend = self.calculate_memory_trend(&system_name, &history);
trends.push(trend);
}
let overhead_time = overhead_start.elapsed();
self.total_overhead_time.fetch_add(overhead_time.as_nanos() as usize, Ordering::Relaxed);
debug!("Analyzed memory trends for {} systems", trends.len());
Ok(trends)
}
fn calculate_memory_trend(
&self,
system_name: &str,
history: &[(DateTime<Utc>, usize)]
) -> MemoryTrend {
let n = history.len() as f64;
let time_values: Vec<f64> = history.iter()
.enumerate()
.map(|(i, _)| i as f64)
.collect();
let memory_values: Vec<f64> = history.iter()
.map(|(_, mem)| *mem as f64)
.collect();
let sum_x: f64 = time_values.iter().sum();
let sum_y: f64 = memory_values.iter().sum();
let sum_xy: f64 = time_values.iter().zip(&memory_values)
.map(|(x, y)| x * y)
.sum();
let sum_x2: f64 = time_values.iter().map(|x| x * x).sum();
let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
let time_span_minutes = if history.len() > 1 {
let duration = history.last().unwrap().0.signed_duration_since(history[0].0);
duration.num_minutes() as f64 / (history.len() - 1) as f64
} else {
1.0
};
let growth_rate_bytes_per_min = slope / time_span_minutes;
let current_usage = memory_values.last().unwrap_or(&0.0);
let predicted_usage_1h = (*current_usage + growth_rate_bytes_per_min * 60.0)
.max(0.0) as usize;
let mean_y = sum_y / n;
let ss_tot: f64 = memory_values.iter()
.map(|y| (y - mean_y).powi(2))
.sum();
let ss_res: f64 = time_values.iter().zip(&memory_values)
.map(|(x, y)| {
let predicted = mean_y + slope * (x - sum_x / n);
(y - predicted).powi(2)
})
.sum();
let prediction_confidence = if ss_tot > 0.0 {
(1.0 - ss_res / ss_tot).max(0.0).min(1.0)
} else {
0.0
};
let trend_direction = if growth_rate_bytes_per_min > 1000.0 {
TrendDirection::Growing
} else if growth_rate_bytes_per_min < -1000.0 {
TrendDirection::Declining
} else {
TrendDirection::Stable
};
MemoryTrend {
system_name: system_name.to_string(),
growth_rate_bytes_per_min,
predicted_usage_1h,
prediction_confidence,
trend_direction,
}
}
pub async fn get_memory_profile(&self) -> Result<MemoryProfile> {
let overhead_start = Instant::now();
let total_allocated = self.system_memory
.iter()
.map(|entry| entry.value().load(Ordering::Relaxed))
.sum();
let allocations_per_system: HashMap<String, usize> = self.system_memory
.iter()
.map(|entry| (entry.key().clone(), entry.value().load(Ordering::Relaxed)))
.collect();
let mut top_allocations: Vec<AllocationInfo> = self.allocations
.iter()
.filter(|entry| entry.value().is_active)
.map(|entry| {
let record = entry.value();
AllocationInfo {
size: record.size,
location: record.system_name.clone(),
backtrace: record.backtrace.clone(),
count: 1, }
})
.collect();
top_allocations.sort_by(|a, b| b.size.cmp(&a.size));
top_allocations.truncate(20);
let overhead_time = overhead_start.elapsed();
self.total_overhead_time.fetch_add(overhead_time.as_nanos() as usize, Ordering::Relaxed);
Ok(MemoryProfile {
total_allocated,
allocations_per_system,
top_allocations,
})
}
pub fn get_overhead_percentage(&self) -> f32 {
let total_runtime = self.start_time.elapsed();
let total_overhead = Duration::from_nanos(
self.total_overhead_time.load(Ordering::Relaxed) as u64
);
if total_runtime.as_nanos() == 0 {
0.0
} else {
(total_overhead.as_nanos() as f32 / total_runtime.as_nanos() as f32) * 100.0
}
}
pub fn is_overhead_acceptable(&self) -> bool {
self.get_overhead_percentage() <= self.config.max_overhead_percent
}
fn cleanup_old_allocations(&self) {
let threshold = Utc::now() - chrono::Duration::hours(1);
let mut to_remove = Vec::new();
for entry in self.allocations.iter() {
if !entry.value().is_active && entry.value().timestamp < threshold {
to_remove.push(*entry.key());
}
}
for allocation_id in to_remove {
self.allocations.remove(&allocation_id);
}
debug!("Cleaned up old allocation records");
}
pub async fn get_statistics(&self) -> HashMap<String, serde_json::Value> {
let mut stats = HashMap::new();
stats.insert("total_allocations_tracked".to_string(),
serde_json::Value::Number(self.allocations.len().into()));
stats.insert("active_allocations".to_string(),
serde_json::Value::Number(
self.allocations.iter().filter(|e| e.value().is_active).count().into()
));
stats.insert("snapshots_stored".to_string(),
serde_json::Value::Number(self.snapshots.read().await.len().into()));
stats.insert("overhead_percentage".to_string(),
serde_json::Value::Number(
serde_json::Number::from_f64(self.get_overhead_percentage() as f64).unwrap()
));
stats.insert("entity_count".to_string(),
serde_json::Value::Number(self.entity_count.load(Ordering::Relaxed).into()));
stats
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::sleep;
#[tokio::test]
async fn test_memory_profiler_creation() {
let config = MemoryProfilerConfig::default();
let profiler = MemoryProfiler::new(config);
assert!(profiler.is_overhead_acceptable());
assert_eq!(profiler.entity_count.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn test_allocation_tracking() {
let config = MemoryProfilerConfig::default();
let profiler = MemoryProfiler::new(config);
let allocation_id = profiler.record_allocation(
"TestSystem",
1024,
Some(vec!["test_function".to_string()])
).unwrap();
assert!(allocation_id > 0);
assert!(profiler.allocations.contains_key(&allocation_id));
profiler.record_deallocation(allocation_id).unwrap();
let record = profiler.allocations.get(&allocation_id).unwrap();
assert!(!record.is_active);
}
#[tokio::test]
async fn test_memory_snapshots() {
let config = MemoryProfilerConfig::default();
let profiler = MemoryProfiler::new(config);
profiler.record_allocation("TestSystem", 2048, None).unwrap();
profiler.update_entity_count(100);
let snapshot = profiler.take_snapshot().await.unwrap();
assert_eq!(snapshot.total_allocated, 2048);
assert_eq!(snapshot.entity_count, 100);
assert!(snapshot.system_allocations.contains_key("TestSystem"));
}
#[tokio::test]
async fn test_leak_detection() {
let mut config = MemoryProfilerConfig::default();
config.enable_leak_detection = true;
let profiler = MemoryProfiler::new(config);
for _i in 0..20 {
let allocation_id = profiler.record_allocation(
"LeakySystem",
1024,
None
).unwrap();
if let Some(mut record) = profiler.allocations.get_mut(&allocation_id) {
record.timestamp = Utc::now() - chrono::Duration::minutes(10);
}
}
let leaks = profiler.detect_leaks().await.unwrap();
assert!(!leaks.is_empty());
let leak = &leaks[0];
assert_eq!(leak.system_name, "LeakySystem");
assert!(leak.leak_count >= 10);
}
#[tokio::test]
async fn test_trend_analysis() {
let config = MemoryProfilerConfig::default();
let profiler = MemoryProfiler::new(config);
for i in 1..=5 {
profiler.record_allocation("GrowingSystem", i * 1000, None).unwrap();
profiler.take_snapshot().await.unwrap();
sleep(Duration::from_millis(10)).await; }
let trends = profiler.analyze_trends().await.unwrap();
assert!(!trends.is_empty());
let trend = trends.iter().find(|t| t.system_name == "GrowingSystem");
assert!(trend.is_some());
let trend = trend.unwrap();
matches!(trend.trend_direction, TrendDirection::Growing);
}
#[tokio::test]
async fn test_overhead_monitoring() {
let config = MemoryProfilerConfig::default();
let profiler = MemoryProfiler::new(config);
for i in 0..100 {
let allocation_id = profiler.record_allocation("TestSystem", i * 10, None).unwrap();
profiler.record_deallocation(allocation_id).unwrap();
}
profiler.take_snapshot().await.unwrap();
let overhead = profiler.get_overhead_percentage();
assert!(overhead >= 0.0);
if overhead > 15.0 {
println!("Warning: Memory profiler overhead is {:.3}% (higher than expected)", overhead);
}
}
#[tokio::test]
async fn test_memory_profile_generation() {
let config = MemoryProfilerConfig::default();
let profiler = MemoryProfiler::new(config);
profiler.record_allocation("System1", 2048, None).unwrap();
profiler.record_allocation("System2", 4096, None).unwrap();
let profile = profiler.get_memory_profile().await.unwrap();
assert_eq!(profile.total_allocated, 6144);
assert_eq!(profile.allocations_per_system.len(), 2);
assert!(profile.allocations_per_system.contains_key("System1"));
assert!(profile.allocations_per_system.contains_key("System2"));
}
}