use crate::{MonitoringConfig, PerformanceMetrics};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::{Duration, Instant, SystemTime},
};
use tokio::time::Interval;
#[derive(Debug)]
pub struct PerformanceCounters {
pub get_operations: AtomicU64,
pub put_operations: AtomicU64,
pub remove_operations: AtomicU64,
pub eviction_operations: AtomicU64,
pub total_get_latency_us: AtomicU64,
pub total_put_latency_us: AtomicU64,
pub total_remove_latency_us: AtomicU64,
pub total_eviction_latency_us: AtomicU64,
pub hit_count: AtomicU64,
pub miss_count: AtomicU64,
pub total_bytes_allocated: AtomicU64,
pub peak_memory_usage: AtomicU64,
pub current_memory_usage: AtomicU64,
pub disk_bytes_read: AtomicU64,
pub disk_bytes_written: AtomicU64,
}
impl Default for PerformanceCounters {
fn default() -> Self {
Self {
get_operations: AtomicU64::new(0),
put_operations: AtomicU64::new(0),
remove_operations: AtomicU64::new(0),
eviction_operations: AtomicU64::new(0),
total_get_latency_us: AtomicU64::new(0),
total_put_latency_us: AtomicU64::new(0),
total_remove_latency_us: AtomicU64::new(0),
total_eviction_latency_us: AtomicU64::new(0),
hit_count: AtomicU64::new(0),
miss_count: AtomicU64::new(0),
total_bytes_allocated: AtomicU64::new(0),
peak_memory_usage: AtomicU64::new(0),
current_memory_usage: AtomicU64::new(0),
disk_bytes_read: AtomicU64::new(0),
disk_bytes_written: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceEvent {
pub timestamp: SystemTime,
pub event_type: PerformanceEventType,
pub latency_us: u64,
pub memory_usage: u64,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceEventType {
CacheGet,
CachePut,
CacheRemove,
CacheEviction,
CachePromotion,
CacheDemotion,
PredictivePreheating,
AdaptiveTuning,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchmarkResult {
pub name: String,
pub ops_per_second: f64,
pub avg_latency_us: f64,
pub p95_latency_us: f64,
pub p99_latency_us: f64,
pub memory_usage: u64,
pub hit_rate: f64,
pub duration: Duration,
pub timestamp: SystemTime,
}
#[derive(Debug)]
pub struct PerformanceMonitor {
config: MonitoringConfig,
counters: Arc<PerformanceCounters>,
event_log: Arc<RwLock<VecDeque<PerformanceEvent>>>,
latency_histogram: Arc<RwLock<HashMap<u64, u64>>>,
benchmark_history: Arc<RwLock<Vec<BenchmarkResult>>>,
start_time: Instant,
_monitoring_interval: Option<Interval>,
}
impl PerformanceMonitor {
pub fn new(config: MonitoringConfig) -> Self {
Self {
config,
counters: Arc::new(PerformanceCounters::default()),
event_log: Arc::new(RwLock::new(VecDeque::new())),
latency_histogram: Arc::new(RwLock::new(HashMap::new())),
benchmark_history: Arc::new(RwLock::new(Vec::new())),
start_time: Instant::now(),
_monitoring_interval: None,
}
}
pub async fn record_get_latency(&self, latency: Duration) {
let latency_us = latency.as_micros() as u64;
self.counters.get_operations.fetch_add(1, Ordering::Relaxed);
self.counters
.total_get_latency_us
.fetch_add(latency_us, Ordering::Relaxed);
self.update_latency_histogram(latency_us).await;
if self.config.enable_event_logging {
self.log_event(PerformanceEvent {
timestamp: SystemTime::now(),
event_type: PerformanceEventType::CacheGet,
latency_us,
memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
metadata: HashMap::new(),
})
.await;
}
}
pub async fn record_put_latency(&self, latency: Duration) {
let latency_us = latency.as_micros() as u64;
self.counters.put_operations.fetch_add(1, Ordering::Relaxed);
self.counters
.total_put_latency_us
.fetch_add(latency_us, Ordering::Relaxed);
self.update_latency_histogram(latency_us).await;
if self.config.enable_event_logging {
self.log_event(PerformanceEvent {
timestamp: SystemTime::now(),
event_type: PerformanceEventType::CachePut,
latency_us,
memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
metadata: HashMap::new(),
})
.await;
}
}
pub async fn record_remove_latency(&self, latency: Duration) {
let latency_us = latency.as_micros() as u64;
self.counters
.remove_operations
.fetch_add(1, Ordering::Relaxed);
self.counters
.total_remove_latency_us
.fetch_add(latency_us, Ordering::Relaxed);
self.update_latency_histogram(latency_us).await;
if self.config.enable_event_logging {
self.log_event(PerformanceEvent {
timestamp: SystemTime::now(),
event_type: PerformanceEventType::CacheRemove,
latency_us,
memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
metadata: HashMap::new(),
})
.await;
}
}
pub async fn update_memory_usage(&self, current_usage: u64) {
self.counters
.current_memory_usage
.store(current_usage, Ordering::Relaxed);
let current_peak = self.counters.peak_memory_usage.load(Ordering::Relaxed);
if current_usage > current_peak {
self.counters
.peak_memory_usage
.store(current_usage, Ordering::Relaxed);
}
}
pub async fn record_disk_io(&self, bytes_read: u64, bytes_written: u64) {
self.counters
.disk_bytes_read
.fetch_add(bytes_read, Ordering::Relaxed);
self.counters
.disk_bytes_written
.fetch_add(bytes_written, Ordering::Relaxed);
}
pub async fn record_eviction_latency(&self, latency: Duration) {
let latency_us = latency.as_micros() as u64;
self.counters
.eviction_operations
.fetch_add(1, Ordering::Relaxed);
self.counters
.total_eviction_latency_us
.fetch_add(latency_us, Ordering::Relaxed);
self.update_latency_histogram(latency_us).await;
if self.config.enable_event_logging {
self.log_event(PerformanceEvent {
timestamp: SystemTime::now(),
event_type: PerformanceEventType::CacheEviction,
latency_us,
memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
metadata: HashMap::new(),
})
.await;
}
}
pub fn record_cache_hit(&self) {
self.counters.hit_count.fetch_add(1, Ordering::Relaxed);
}
pub fn record_cache_miss(&self) {
self.counters.miss_count.fetch_add(1, Ordering::Relaxed);
}
pub fn record_allocation(&self, bytes: u64) {
self.counters
.total_bytes_allocated
.fetch_add(bytes, Ordering::Relaxed);
}
pub fn hit_rate(&self) -> f64 {
let hits = self.counters.hit_count.load(Ordering::Relaxed);
let misses = self.counters.miss_count.load(Ordering::Relaxed);
let total = hits + misses;
if total > 0 {
hits as f64 / total as f64
} else {
0.0
}
}
async fn log_event(&self, event: PerformanceEvent) {
if !self.config.enable_event_logging {
return;
}
let mut log = self.event_log.write().unwrap();
log.push_back(event);
while log.len() > self.config.max_events_in_memory {
log.pop_front();
}
}
async fn update_latency_histogram(&self, latency_us: u64) {
let bucket = if latency_us == 0 {
0
} else {
1u64 << (64 - latency_us.leading_zeros())
};
let mut histogram = self.latency_histogram.write().unwrap();
*histogram.entry(bucket).or_insert(0) += 1;
}
pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
let get_ops = self.counters.get_operations.load(Ordering::Relaxed);
let put_ops = self.counters.put_operations.load(Ordering::Relaxed);
let remove_ops = self.counters.remove_operations.load(Ordering::Relaxed);
let eviction_ops = self.counters.eviction_operations.load(Ordering::Relaxed);
let total_get_latency = self.counters.total_get_latency_us.load(Ordering::Relaxed);
let total_put_latency = self.counters.total_put_latency_us.load(Ordering::Relaxed);
let total_eviction_latency = self
.counters
.total_eviction_latency_us
.load(Ordering::Relaxed);
let elapsed_secs = self.start_time.elapsed().as_secs_f64();
let total_ops = get_ops + put_ops + remove_ops;
let total_allocated = self.counters.total_bytes_allocated.load(Ordering::Relaxed);
let memory_allocation_rate = if elapsed_secs > 0.0 {
total_allocated as f64 / elapsed_secs
} else {
0.0
};
PerformanceMetrics {
avg_get_latency_us: if get_ops > 0 {
total_get_latency as f64 / get_ops as f64
} else {
0.0
},
avg_put_latency_us: if put_ops > 0 {
total_put_latency as f64 / put_ops as f64
} else {
0.0
},
avg_eviction_latency_us: if eviction_ops > 0 {
total_eviction_latency as f64 / eviction_ops as f64
} else {
0.0
},
ops_per_second: if elapsed_secs > 0.0 {
total_ops as f64 / elapsed_secs
} else {
0.0
},
memory_allocation_rate,
disk_io_rate: if elapsed_secs > 0.0 {
(self.counters.disk_bytes_read.load(Ordering::Relaxed)
+ self.counters.disk_bytes_written.load(Ordering::Relaxed))
as f64
/ elapsed_secs
} else {
0.0
},
cpu_usage_percent: {
let total_op_latency_us = total_get_latency
+ total_put_latency
+ total_eviction_latency
+ self
.counters
.total_remove_latency_us
.load(Ordering::Relaxed);
let elapsed_us = self.start_time.elapsed().as_micros() as f64;
if elapsed_us > 0.0 {
(total_op_latency_us as f64 / elapsed_us * 100.0).min(100.0)
} else {
0.0
}
},
peak_memory_usage: self.counters.peak_memory_usage.load(Ordering::Relaxed),
}
}
pub async fn run_benchmark<F, Fut>(&self, name: &str, benchmark_fn: F) -> BenchmarkResult
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = ()>,
{
let start_time = Instant::now();
let start_ops = self.counters.get_operations.load(Ordering::Relaxed)
+ self.counters.put_operations.load(Ordering::Relaxed);
{
let mut histogram = self.latency_histogram.write().unwrap();
histogram.clear();
}
benchmark_fn().await;
let duration = start_time.elapsed();
let end_ops = self.counters.get_operations.load(Ordering::Relaxed)
+ self.counters.put_operations.load(Ordering::Relaxed);
let ops_performed = end_ops - start_ops;
let ops_per_second = if duration.as_secs_f64() > 0.0 {
ops_performed as f64 / duration.as_secs_f64()
} else {
0.0
};
let (avg_latency, p95_latency, p99_latency) = self.calculate_latency_percentiles().await;
let result = BenchmarkResult {
name: name.to_string(),
ops_per_second,
avg_latency_us: avg_latency,
p95_latency_us: p95_latency,
p99_latency_us: p99_latency,
memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
hit_rate: self.hit_rate(),
duration,
timestamp: SystemTime::now(),
};
{
let mut history = self.benchmark_history.write().unwrap();
history.push(result.clone());
}
result
}
async fn calculate_latency_percentiles(&self) -> (f64, f64, f64) {
let histogram = self.latency_histogram.read().unwrap();
if histogram.is_empty() {
return (0.0, 0.0, 0.0);
}
let mut latencies: Vec<(u64, u64)> = histogram
.iter()
.map(|(&latency, &count)| (latency, count))
.collect();
latencies.sort_by_key(|&(latency, _)| latency);
let total_samples: u64 = latencies.iter().map(|(_, count)| count).sum();
if total_samples == 0 {
return (0.0, 0.0, 0.0);
}
let weighted_sum: u64 = latencies
.iter()
.map(|(latency, count)| latency * count)
.sum();
let avg_latency = weighted_sum as f64 / total_samples as f64;
let p95_target = (total_samples as f64 * 0.95) as u64;
let p99_target = (total_samples as f64 * 0.99) as u64;
let mut cumulative = 0u64;
let mut p95_latency = 0.0;
let mut p99_latency = 0.0;
for &(latency, count) in &latencies {
cumulative += count;
if p95_latency == 0.0 && cumulative >= p95_target {
p95_latency = latency as f64;
}
if p99_latency == 0.0 && cumulative >= p99_target {
p99_latency = latency as f64;
break;
}
}
(avg_latency, p95_latency, p99_latency)
}
pub async fn get_recent_events(&self, limit: usize) -> Vec<PerformanceEvent> {
let log = self.event_log.read().unwrap();
log.iter().rev().take(limit).cloned().collect()
}
pub async fn get_benchmark_history(&self) -> Vec<BenchmarkResult> {
self.benchmark_history.read().unwrap().clone()
}
pub async fn reset(&self) {
self.counters.get_operations.store(0, Ordering::Relaxed);
self.counters.put_operations.store(0, Ordering::Relaxed);
self.counters.remove_operations.store(0, Ordering::Relaxed);
self.counters
.eviction_operations
.store(0, Ordering::Relaxed);
self.counters
.total_get_latency_us
.store(0, Ordering::Relaxed);
self.counters
.total_put_latency_us
.store(0, Ordering::Relaxed);
self.counters
.total_remove_latency_us
.store(0, Ordering::Relaxed);
self.counters
.total_eviction_latency_us
.store(0, Ordering::Relaxed);
self.counters.hit_count.store(0, Ordering::Relaxed);
self.counters.miss_count.store(0, Ordering::Relaxed);
self.counters
.total_bytes_allocated
.store(0, Ordering::Relaxed);
self.counters.peak_memory_usage.store(0, Ordering::Relaxed);
self.counters
.current_memory_usage
.store(0, Ordering::Relaxed);
self.counters.disk_bytes_read.store(0, Ordering::Relaxed);
self.counters.disk_bytes_written.store(0, Ordering::Relaxed);
{
let mut log = self.event_log.write().unwrap();
log.clear();
}
{
let mut histogram = self.latency_histogram.write().unwrap();
histogram.clear();
}
}
}