use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
/// Tuning knobs for [`LeakDetector`].
#[derive(Debug, Clone)]
pub struct LeakDetectorConfig {
    /// Master switch: when `false`, recording and checking are no-ops.
    pub enabled: bool,
    /// Intended spacing between periodic `check()` calls.
    pub check_interval: Duration,
    /// Minimum growth (percent per minute) before a warning is raised.
    pub growth_threshold_percent: f64,
    /// Minimum number of samples a tracker needs before it is analyzed.
    pub min_samples_for_detection: usize,
}

impl Default for LeakDetectorConfig {
    /// Conservative defaults: enabled, check every minute, and warn once
    /// growth exceeds 10%/minute across at least six samples.
    fn default() -> Self {
        LeakDetectorConfig {
            enabled: true,
            check_interval: Duration::from_secs(60),
            growth_threshold_percent: 10.0,
            min_samples_for_detection: 6,
        }
    }
}
/// Heuristic memory-leak detector.
///
/// Callers report allocations/deallocations under a string label ("region");
/// `check()` fits a linear trend to the sampled byte totals and emits
/// [`LeakWarning`]s on sustained growth. All mutable state sits behind
/// `parking_lot::RwLock`s or an atomic, so the detector can be shared.
pub struct LeakDetector {
    /// Static configuration (thresholds, sample minimums).
    config: LeakDetectorConfig,
    /// Per-label trackers, created lazily on first allocation for a label.
    regions: RwLock<HashMap<String, RegionTracker>>,
    /// Aggregate tracker across all labels.
    global: RwLock<GlobalTracker>,
    /// Warnings produced by the most recent `check()` call.
    warnings: RwLock<Vec<LeakWarning>>,
    /// Timestamp recorded by `start()`; `None` until then.
    started_at: RwLock<Option<Instant>>,
    /// Timestamp bookkeeping for periodic checks.
    last_check_at: RwLock<Option<Instant>>,
    /// Toggled by `start()`/`stop()`.
    is_running: std::sync::atomic::AtomicBool,
}
impl LeakDetector {
    /// Creates a detector with the given configuration. Recording honours
    /// only `config.enabled`; `start`/`stop` merely toggle the running flag.
    pub fn new(config: LeakDetectorConfig) -> Self {
        Self {
            config,
            regions: RwLock::new(HashMap::new()),
            global: RwLock::new(GlobalTracker::new()),
            warnings: RwLock::new(Vec::new()),
            started_at: RwLock::new(None),
            last_check_at: RwLock::new(None),
            is_running: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Marks the detector as running and stamps both timestamps.
    ///
    /// Fix: takes `&self` instead of `&mut self`. Every field is behind a
    /// lock or atomic, so exclusive access was never required; requiring it
    /// prevented calling `start()` on an `Arc`-shared detector. `&mut self`
    /// callers still compile unchanged.
    pub fn start(&self) {
        let now = Instant::now();
        *self.started_at.write() = Some(now);
        *self.last_check_at.write() = Some(now);
        self.is_running
            .store(true, std::sync::atomic::Ordering::SeqCst);
    }

    /// Clears the running flag. Fix: `&self` (see [`LeakDetector::start`]).
    pub fn stop(&self) {
        self.is_running
            .store(false, std::sync::atomic::Ordering::SeqCst);
    }

    /// Whether `start` has been called more recently than `stop`.
    pub fn is_running(&self) -> bool {
        self.is_running.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Records `size` bytes allocated under `label`, updating both the
    /// global tracker and the per-region tracker (created on first use).
    pub fn record_allocation(&self, label: &str, size: usize) {
        if !self.config.enabled {
            return;
        }
        let size = size as u64;
        let now = Instant::now();
        self.global.write().record_allocation(size, now);
        self.regions
            .write()
            .entry(label.to_string())
            .or_insert_with(|| RegionTracker::new(label))
            .record_allocation(size, now);
    }

    /// Records `size` bytes freed under `label`. Deallocations for labels
    /// that were never allocated are ignored (no region is created).
    pub fn record_deallocation(&self, label: &str, size: usize) {
        if !self.config.enabled {
            return;
        }
        let size = size as u64;
        let now = Instant::now();
        self.global.write().record_deallocation(size, now);
        if let Some(region) = self.regions.write().get_mut(label) {
            region.record_deallocation(size, now);
        }
    }

    /// Analyzes the global tracker and every region, stores the resulting
    /// warnings (replacing the previous set), and returns them.
    pub fn check(&self) -> Vec<LeakWarning> {
        if !self.config.enabled {
            return Vec::new();
        }
        let mut warnings = Vec::new();
        // Scope the read guards tightly so writers are not starved.
        {
            let global = self.global.read();
            if let Some(warning) = self.analyze_tracker(&*global, "global") {
                warnings.push(warning);
            }
        }
        {
            let regions = self.regions.read();
            for (name, tracker) in regions.iter() {
                if let Some(warning) = self.analyze_tracker(tracker, name) {
                    warnings.push(warning);
                }
            }
        }
        // Fix: `last_check_at` was previously set only in `start()` and never
        // refreshed, making the field useless for interval bookkeeping.
        *self.last_check_at.write() = Some(Instant::now());
        *self.warnings.write() = warnings.clone();
        warnings
    }

    /// Inspects one tracker's history and returns a warning if it looks leaky.
    ///
    /// Two heuristics, checked in order:
    /// 1. A sustained upward linear trend above `growth_threshold_percent`.
    /// 2. A weaker fallback: allocations recorded with zero deallocations.
    fn analyze_tracker<T: MemoryTracker>(&self, tracker: &T, name: &str) -> Option<LeakWarning> {
        let samples = tracker.samples();
        if samples.len() < self.config.min_samples_for_detection {
            return None;
        }
        let growth = analyze_growth(&samples);
        if growth.is_growing
            && growth.growth_rate_per_minute > self.config.growth_threshold_percent
        {
            // Severity scales with the growth rate (percent per minute).
            let severity = if growth.growth_rate_per_minute > 50.0 {
                LeakSeverity::Critical
            } else if growth.growth_rate_per_minute > 20.0 {
                LeakSeverity::High
            } else {
                LeakSeverity::Medium
            };
            return Some(LeakWarning {
                region: name.to_string(),
                severity,
                message: format!(
                    "Sustained memory growth detected: {:.1}% per minute",
                    growth.growth_rate_per_minute
                ),
                growth_rate_per_minute: growth.growth_rate_per_minute,
                current_bytes: tracker.current_bytes(),
                samples_analyzed: samples.len(),
                confidence: growth.confidence,
            });
        }
        let stats = tracker.stats();
        if stats.allocation_count > 0 && stats.deallocation_count == 0 {
            return Some(LeakWarning {
                region: name.to_string(),
                severity: LeakSeverity::Low,
                message: "Allocations without deallocations detected".to_string(),
                growth_rate_per_minute: 0.0,
                current_bytes: tracker.current_bytes(),
                samples_analyzed: samples.len(),
                confidence: 0.5,
            });
        }
        None
    }

    /// Returns a copy of the warnings produced by the last `check()`.
    pub fn warnings(&self) -> Vec<LeakWarning> {
        self.warnings.read().clone()
    }

    /// Clears all trackers and stored warnings.
    /// Fix: `&self` (see [`LeakDetector::start`]).
    pub fn reset(&self) {
        self.regions.write().clear();
        *self.global.write() = GlobalTracker::new();
        self.warnings.write().clear();
    }

    /// Detailed snapshot of a single region, or `None` if the region has
    /// never recorded an allocation.
    pub fn region_analysis(&self, name: &str) -> Option<RegionAnalysis> {
        let regions = self.regions.read();
        regions.get(name).map(|tracker| {
            let samples = tracker.samples();
            let growth = analyze_growth(&samples);
            // Fix: `stats()` was previously computed twice.
            let stats = tracker.stats();
            RegionAnalysis {
                name: name.to_string(),
                current_bytes: tracker.current_bytes(),
                allocation_count: stats.allocation_count,
                deallocation_count: stats.deallocation_count,
                growth_rate_per_minute: growth.growth_rate_per_minute,
                is_growing: growth.is_growing,
                confidence: growth.confidence,
                sample_count: samples.len(),
            }
        })
    }
}
/// Common read-only view over the global and per-region trackers so
/// `LeakDetector::analyze_tracker` can treat both uniformly.
trait MemoryTracker {
    /// Snapshot of the sample history, oldest first.
    fn samples(&self) -> Vec<MemorySample>;
    /// Current net bytes (allocations minus deallocations, floored at zero).
    fn current_bytes(&self) -> u64;
    /// Lifetime allocation/deallocation counters.
    fn stats(&self) -> TrackerStats;
}
/// Lifetime operation counters reported by a [`MemoryTracker`].
#[derive(Debug, Clone, Default)]
struct TrackerStats {
    // Number of allocation events recorded.
    allocation_count: u64,
    // Number of deallocation events recorded.
    deallocation_count: u64,
}
/// Process-wide tracker: a running byte total, lifetime operation counters,
/// and a bounded history of samples used for trend analysis.
struct GlobalTracker {
    current_bytes: u64,
    allocation_count: u64,
    deallocation_count: u64,
    samples: VecDeque<MemorySample>,
    last_sample_at: Option<Instant>,
}

impl GlobalTracker {
    /// Empty tracker with capacity reserved for the full sample window.
    fn new() -> Self {
        Self {
            current_bytes: 0,
            allocation_count: 0,
            deallocation_count: 0,
            samples: VecDeque::with_capacity(100),
            last_sample_at: None,
        }
    }

    /// Adds `bytes` to the running total and possibly snapshots a sample.
    fn record_allocation(&mut self, bytes: u64, at: Instant) {
        self.current_bytes += bytes;
        self.allocation_count += 1;
        self.maybe_record_sample(at);
    }

    /// Subtracts `bytes` (floored at zero) and possibly snapshots a sample.
    fn record_deallocation(&mut self, bytes: u64, at: Instant) {
        self.current_bytes = self.current_bytes.saturating_sub(bytes);
        self.deallocation_count += 1;
        self.maybe_record_sample(at);
    }

    /// Pushes a sample at most once every 10 seconds, keeping only the
    /// most recent 100 samples.
    fn maybe_record_sample(&mut self, at: Instant) {
        let due = self
            .last_sample_at
            .map_or(true, |last| at.duration_since(last) >= Duration::from_secs(10));
        if !due {
            return;
        }
        // Evict the oldest entry first so the window never exceeds 100.
        if self.samples.len() >= 100 {
            self.samples.pop_front();
        }
        self.samples.push_back(MemorySample {
            bytes: self.current_bytes,
            timestamp: at,
        });
        self.last_sample_at = Some(at);
    }
}

impl MemoryTracker for GlobalTracker {
    fn samples(&self) -> Vec<MemorySample> {
        // Order-preserving copy of the window.
        Vec::from(self.samples.clone())
    }

    fn current_bytes(&self) -> u64 {
        self.current_bytes
    }

    fn stats(&self) -> TrackerStats {
        TrackerStats {
            allocation_count: self.allocation_count,
            deallocation_count: self.deallocation_count,
        }
    }
}
/// Per-label tracker; identical bookkeeping to `GlobalTracker` but scoped
/// to one named region.
struct RegionTracker {
    #[allow(dead_code)]
    name: String,
    current_bytes: u64,
    allocation_count: u64,
    deallocation_count: u64,
    samples: VecDeque<MemorySample>,
    last_sample_at: Option<Instant>,
}

impl RegionTracker {
    /// Empty tracker for `name` with the sample window preallocated.
    fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            current_bytes: 0,
            allocation_count: 0,
            deallocation_count: 0,
            samples: VecDeque::with_capacity(100),
            last_sample_at: None,
        }
    }

    /// Adds `bytes` to the running total and possibly snapshots a sample.
    fn record_allocation(&mut self, bytes: u64, at: Instant) {
        self.current_bytes += bytes;
        self.allocation_count += 1;
        self.maybe_record_sample(at);
    }

    /// Subtracts `bytes` (floored at zero) and possibly snapshots a sample.
    fn record_deallocation(&mut self, bytes: u64, at: Instant) {
        self.current_bytes = self.current_bytes.saturating_sub(bytes);
        self.deallocation_count += 1;
        self.maybe_record_sample(at);
    }

    /// Pushes a sample at most once every 10 seconds, keeping only the
    /// most recent 100 samples.
    fn maybe_record_sample(&mut self, at: Instant) {
        let due = self
            .last_sample_at
            .map_or(true, |last| at.duration_since(last) >= Duration::from_secs(10));
        if !due {
            return;
        }
        // Evict the oldest entry first so the window never exceeds 100.
        if self.samples.len() >= 100 {
            self.samples.pop_front();
        }
        self.samples.push_back(MemorySample {
            bytes: self.current_bytes,
            timestamp: at,
        });
        self.last_sample_at = Some(at);
    }
}

impl MemoryTracker for RegionTracker {
    fn samples(&self) -> Vec<MemorySample> {
        // Order-preserving copy of the window.
        Vec::from(self.samples.clone())
    }

    fn current_bytes(&self) -> u64 {
        self.current_bytes
    }

    fn stats(&self) -> TrackerStats {
        TrackerStats {
            allocation_count: self.allocation_count,
            deallocation_count: self.deallocation_count,
        }
    }
}
/// A point-in-time memory reading used for trend analysis.
#[derive(Debug, Clone)]
struct MemorySample {
    bytes: u64,
    timestamp: Instant,
}

/// Result of fitting a linear trend to a series of samples.
#[derive(Debug)]
struct GrowthAnalysis {
    is_growing: bool,
    // NOTE: despite the name this is *percent of the first sample's bytes*
    // per minute, not raw bytes per minute.
    growth_rate_per_minute: f64,
    confidence: f64,
}

/// Ordinary least-squares fit over (elapsed seconds, bytes).
///
/// Reports growth when the fitted slope is positive and the fit explains
/// more than half the variance (R² > 0.5). The rate is expressed as a
/// percentage of the first sample's footprint per minute; with fewer than
/// two samples, or a degenerate x-spread, a zeroed result is returned.
fn analyze_growth(samples: &[MemorySample]) -> GrowthAnalysis {
    let flat = || GrowthAnalysis {
        is_growing: false,
        growth_rate_per_minute: 0.0,
        confidence: 0.0,
    };
    if samples.len() < 2 {
        return flat();
    }
    let origin = samples[0].timestamp;
    let points: Vec<(f64, f64)> = samples
        .iter()
        .map(|s| {
            (
                s.timestamp.duration_since(origin).as_secs_f64(),
                s.bytes as f64,
            )
        })
        .collect();
    let n = points.len() as f64;
    let mean_x = points.iter().map(|&(x, _)| x).sum::<f64>() / n;
    let mean_y = points.iter().map(|&(_, y)| y).sum::<f64>() / n;
    // Accumulate the regression sums in a single pass.
    let mut numerator = 0.0;
    let mut denominator = 0.0;
    for &(x, y) in &points {
        numerator += (x - mean_x) * (y - mean_y);
        denominator += (x - mean_x) * (x - mean_x);
    }
    if denominator == 0.0 {
        // All samples share a timestamp: slope is undefined.
        return flat();
    }
    let slope = numerator / denominator; // bytes per second
    let bytes_per_minute = slope * 60.0;
    // Coefficient of determination (R²) as a confidence measure.
    let mut ss_tot = 0.0;
    let mut ss_res = 0.0;
    for &(x, y) in &points {
        ss_tot += (y - mean_y) * (y - mean_y);
        let predicted = mean_y + slope * (x - mean_x);
        ss_res += (y - predicted) * (y - predicted);
    }
    let r_squared = if ss_tot > 0.0 {
        1.0 - (ss_res / ss_tot)
    } else {
        0.0
    };
    let first_bytes = samples[0].bytes as f64;
    let percent_per_minute = if first_bytes > 0.0 {
        (bytes_per_minute / first_bytes) * 100.0
    } else {
        0.0
    };
    GrowthAnalysis {
        is_growing: slope > 0.0 && r_squared > 0.5,
        growth_rate_per_minute: percent_per_minute,
        confidence: r_squared.max(0.0).min(1.0),
    }
}
/// A single leak finding produced by `LeakDetector::check`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeakWarning {
    /// Region label, or "global" for the aggregate tracker.
    pub region: String,
    /// How alarming the detected growth is.
    pub severity: LeakSeverity,
    /// Human-readable description of the finding.
    pub message: String,
    /// Growth as a percentage of the first sampled footprint per minute
    /// (0.0 for the allocations-without-deallocations heuristic).
    pub growth_rate_per_minute: f64,
    /// Net bytes currently attributed to the region.
    pub current_bytes: u64,
    /// Number of samples the analysis was based on.
    pub samples_analyzed: usize,
    /// R² of the linear fit clamped to [0, 1]; fixed 0.5 for the fallback
    /// heuristic.
    pub confidence: f64,
}
/// Severity buckets for a [`LeakWarning`], from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LeakSeverity {
    /// Allocations observed with no deallocations at all (weak signal).
    Low,
    /// Sustained growth above the configured threshold.
    Medium,
    /// Growth above 20% per minute.
    High,
    /// Growth above 50% per minute.
    Critical,
}
impl std::fmt::Display for LeakSeverity {
    /// Renders the severity as an upper-case label, e.g. for log lines.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Low => "LOW",
            Self::Medium => "MEDIUM",
            Self::High => "HIGH",
            Self::Critical => "CRITICAL",
        };
        f.write_str(label)
    }
}
/// Detailed snapshot of one region, returned by `LeakDetector::region_analysis`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionAnalysis {
    /// Region label the snapshot describes.
    pub name: String,
    /// Net bytes currently attributed to the region.
    pub current_bytes: u64,
    /// Lifetime allocation events recorded for the region.
    pub allocation_count: u64,
    /// Lifetime deallocation events recorded for the region.
    pub deallocation_count: u64,
    /// Growth as a percentage of the first sampled footprint per minute.
    pub growth_rate_per_minute: f64,
    /// Whether the fitted trend is positive with R² > 0.5.
    pub is_growing: bool,
    /// R² of the linear fit, clamped to [0, 1].
    pub confidence: f64,
    /// Number of samples available for the analysis.
    pub sample_count: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test: a single alloc/dealloc pair yields too few samples for
    // trend detection, so at worst the weak low-severity heuristic fires.
    #[test]
    fn test_leak_detector_basic() {
        let mut detector = LeakDetector::new(LeakDetectorConfig::default());
        detector.start();
        detector.record_allocation("test", 1024);
        detector.record_deallocation("test", 512);
        let warnings = detector.check();
        assert!(warnings.is_empty() || warnings.iter().all(|w| w.severity == LeakSeverity::Low));
    }

    // Custom config values are stored verbatim on the detector.
    #[test]
    fn test_leak_detector_config() {
        let config = LeakDetectorConfig {
            enabled: true,
            check_interval: Duration::from_secs(30),
            growth_threshold_percent: 5.0,
            min_samples_for_detection: 3,
        };
        let detector = LeakDetector::new(config);
        assert!(detector.config.enabled);
        assert_eq!(detector.config.growth_threshold_percent, 5.0);
    }

    // With `enabled: false`, recording is a no-op and check() returns nothing.
    #[test]
    fn test_leak_detector_disabled() {
        let config = LeakDetectorConfig {
            enabled: false,
            ..Default::default()
        };
        let mut detector = LeakDetector::new(config);
        detector.start();
        detector.record_allocation("test", 1024);
        let warnings = detector.check();
        assert!(warnings.is_empty());
    }

    // Display renders upper-case severity labels.
    #[test]
    fn test_leak_severity_display() {
        assert_eq!(LeakSeverity::Low.to_string(), "LOW");
        assert_eq!(LeakSeverity::Critical.to_string(), "CRITICAL");
    }

    // No samples: analysis must report no growth with zero confidence.
    #[test]
    fn test_growth_analysis_empty() {
        let analysis = analyze_growth(&[]);
        assert!(!analysis.is_growing);
        assert_eq!(analysis.confidence, 0.0);
    }

    // One sample is below the two-sample minimum for a trend fit.
    #[test]
    fn test_growth_analysis_single() {
        let samples = vec![MemorySample {
            bytes: 1000,
            timestamp: Instant::now(),
        }];
        let analysis = analyze_growth(&samples);
        assert!(!analysis.is_growing);
    }

    // reset() clears stored warnings (and all trackers).
    #[test]
    fn test_leak_detector_reset() {
        let mut detector = LeakDetector::new(LeakDetectorConfig::default());
        detector.start();
        detector.record_allocation("test", 1024);
        detector.reset();
        assert!(detector.warnings().is_empty());
    }
}