use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use super::types::{
calculate_percentiles, CacheEvent, CacheEventType, CacheTelemetryMetrics, MetricsSnapshot,
TelemetryConfig,
};
/// Thread-safe collector of cache telemetry: live counters, a bounded
/// recent-event log, periodic metric snapshots, and an optional latency
/// histogram. All state is behind `Arc<Mutex<…>>` so the collector can be
/// shared and recorded into from multiple owners.
pub struct CacheTelemetryCollector {
    /// Live counters and running averages for the current measurement window.
    pub(super) current_metrics: Arc<Mutex<CacheTelemetryMetrics>>,
    /// Ring buffer of the most recent cache events; trimmed to
    /// `config.max_events` on each insert.
    pub(super) recent_events: Arc<Mutex<VecDeque<CacheEvent>>>,
    /// Ring buffer of periodic metric snapshots; trimmed to
    /// `config.max_snapshots`.
    pub(super) snapshots: Arc<Mutex<VecDeque<MetricsSnapshot>>>,
    /// Latency histogram: bucket start (µs, 100 µs resolution) -> sample count.
    /// Only populated when `config.track_latency_histogram` is set.
    pub(super) latency_histogram: Arc<Mutex<HashMap<u64, u64>>>,
    /// Behavior knobs: buffer capacities and the histogram toggle.
    pub(super) config: TelemetryConfig,
}
impl CacheTelemetryCollector {
    /// Creates a collector whose event and snapshot buffers are pre-sized
    /// from `config` so steady-state recording does not reallocate.
    pub fn new(config: TelemetryConfig) -> Self {
        Self {
            current_metrics: Arc::new(Mutex::new(CacheTelemetryMetrics::new())),
            recent_events: Arc::new(Mutex::new(VecDeque::with_capacity(config.max_events))),
            snapshots: Arc::new(Mutex::new(VecDeque::with_capacity(config.max_snapshots))),
            latency_histogram: Arc::new(Mutex::new(HashMap::new())),
            config,
        }
    }

    /// Folds one sample into an incremental arithmetic mean.
    ///
    /// `count` must already include the new sample (i.e. be >= 1.0).
    fn fold_into_mean(mean: &mut f64, count: f64, sample: f64) {
        *mean = (*mean * (count - 1.0) + sample) / count;
    }

    /// Ring-buffer push: appends `item` to `buf`, then evicts from the front
    /// until the length is at most `cap`.
    fn push_bounded<T>(buf: &mut VecDeque<T>, item: T, cap: usize) {
        buf.push_back(item);
        while buf.len() > cap {
            buf.pop_front();
        }
    }

    /// Records a cache hit: bumps the hit counter, folds `latency` into the
    /// running hit-latency average, grows the tracked size by `size_bytes`,
    /// and appends a `Hit` event (plus a histogram sample when enabled).
    pub fn record_hit(&self, latency: Duration, size_bytes: Option<usize>, key_hash: u64) {
        {
            // Scope the metrics lock tightly; the histogram and event locks
            // are taken separately below so no two locks are held at once.
            let mut metrics = self
                .current_metrics
                .lock()
                .expect("lock should not be poisoned");
            metrics.hits += 1;
            let total_hits = metrics.hits as f64;
            Self::fold_into_mean(
                &mut metrics.avg_hit_latency_us,
                total_hits,
                latency.as_micros() as f64,
            );
            if let Some(size) = size_bytes {
                // NOTE(review): growing current_size_bytes on a *hit* may
                // double-count with record_insertion — confirm intended
                // semantics with callers.
                metrics.current_size_bytes = metrics.current_size_bytes.saturating_add(size);
                // Keep peak tracking consistent with record_insertion: any
                // growth of the current size may set a new high-water mark.
                metrics.peak_size_bytes = metrics.peak_size_bytes.max(metrics.current_size_bytes);
            }
        }
        if self.config.track_latency_histogram {
            self.record_latency(latency);
        }
        self.record_event(CacheEvent {
            event_type: CacheEventType::Hit,
            timestamp: Instant::now(),
            latency,
            size_bytes,
            key_hash,
        });
    }

    /// Records a cache miss: bumps the miss counter and folds `latency` into
    /// the running miss-latency average. `size_bytes` is carried on the event
    /// only; the tracked cache size is not changed.
    pub fn record_miss(&self, latency: Duration, size_bytes: Option<usize>, key_hash: u64) {
        {
            let mut metrics = self
                .current_metrics
                .lock()
                .expect("lock should not be poisoned");
            metrics.misses += 1;
            let total_misses = metrics.misses as f64;
            Self::fold_into_mean(
                &mut metrics.avg_miss_latency_us,
                total_misses,
                latency.as_micros() as f64,
            );
        }
        if self.config.track_latency_histogram {
            self.record_latency(latency);
        }
        self.record_event(CacheEvent {
            event_type: CacheEventType::Miss,
            timestamp: Instant::now(),
            latency,
            size_bytes,
            key_hash,
        });
    }

    /// Records an eviction: bumps the counter and, when `size_bytes` is
    /// known, shrinks the tracked size and credits `total_freed_bytes`.
    pub fn record_eviction(&self, size_bytes: Option<usize>, key_hash: u64) {
        {
            let mut metrics = self
                .current_metrics
                .lock()
                .expect("lock should not be poisoned");
            metrics.evictions += 1;
            if let Some(size) = size_bytes {
                // saturating_sub guards against size under-reporting driving
                // the counter below zero.
                metrics.current_size_bytes = metrics.current_size_bytes.saturating_sub(size);
                metrics.total_freed_bytes += size as u64;
            }
        }
        self.record_event(CacheEvent {
            event_type: CacheEventType::Eviction,
            timestamp: Instant::now(),
            // Evictions carry no latency measurement.
            latency: Duration::ZERO,
            size_bytes,
            key_hash,
        });
    }

    /// Records an insertion: bumps the counter, grows the tracked size,
    /// updates the peak high-water mark, and credits `total_allocated_bytes`.
    pub fn record_insertion(&self, size_bytes: Option<usize>, key_hash: u64) {
        {
            let mut metrics = self
                .current_metrics
                .lock()
                .expect("lock should not be poisoned");
            metrics.insertions += 1;
            if let Some(size) = size_bytes {
                metrics.current_size_bytes = metrics.current_size_bytes.saturating_add(size);
                metrics.peak_size_bytes = metrics.peak_size_bytes.max(metrics.current_size_bytes);
                metrics.total_allocated_bytes += size as u64;
            }
        }
        self.record_event(CacheEvent {
            event_type: CacheEventType::Insertion,
            timestamp: Instant::now(),
            // Insertions carry no latency measurement.
            latency: Duration::ZERO,
            size_bytes,
            key_hash,
        });
    }

    /// Returns a point-in-time copy of the metrics with derived values
    /// (window duration, calculated rates, latency percentiles) filled in.
    pub fn get_metrics(&self) -> CacheTelemetryMetrics {
        let mut metrics = self
            .current_metrics
            .lock()
            .expect("lock should not be poisoned")
            .clone();
        metrics.window_duration = metrics.window_start.elapsed();
        metrics.calculate_derived();
        if self.config.track_latency_histogram {
            let histogram = self
                .latency_histogram
                .lock()
                .expect("lock should not be poisoned");
            let percentiles = calculate_percentiles(&histogram);
            metrics.p50_latency_us = percentiles.0;
            metrics.p95_latency_us = percentiles.1;
            metrics.p99_latency_us = percentiles.2;
        }
        metrics
    }

    /// Captures the current metrics into the bounded snapshot history.
    pub fn snapshot(&self) {
        let snapshot = MetricsSnapshot {
            timestamp: Instant::now(),
            // get_metrics() takes its own locks; called before we lock
            // `snapshots` so no two locks are held at once.
            metrics: self.get_metrics(),
        };
        let mut snapshots = self.snapshots.lock().expect("lock should not be poisoned");
        Self::push_bounded(&mut snapshots, snapshot, self.config.max_snapshots);
    }

    /// Returns up to `count` of the most recent events, newest first.
    pub fn get_recent_events(&self, count: usize) -> Vec<CacheEvent> {
        self.recent_events
            .lock()
            .expect("lock should not be poisoned")
            .iter()
            .rev()
            .take(count)
            .cloned()
            .collect()
    }

    /// Returns all retained snapshots, oldest first.
    pub fn get_snapshots(&self) -> Vec<MetricsSnapshot> {
        self.snapshots
            .lock()
            .expect("lock should not be poisoned")
            .iter()
            .cloned()
            .collect()
    }

    /// Clears all counters, events, snapshots, and histogram buckets,
    /// starting a fresh measurement window.
    pub fn reset(&self) {
        *self
            .current_metrics
            .lock()
            .expect("lock should not be poisoned") = CacheTelemetryMetrics::new();
        self.recent_events
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.snapshots
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.latency_histogram
            .lock()
            .expect("lock should not be poisoned")
            .clear();
    }

    /// Appends `event` to the bounded recent-events buffer, discarding the
    /// oldest entries beyond `config.max_events`.
    pub(super) fn record_event(&self, event: CacheEvent) {
        let mut events = self
            .recent_events
            .lock()
            .expect("lock should not be poisoned");
        Self::push_bounded(&mut events, event, self.config.max_events);
    }

    /// Adds one latency sample to the histogram, bucketed down to 100 µs
    /// resolution to bound histogram cardinality.
    pub(super) fn record_latency(&self, latency: Duration) {
        let bucket = (latency.as_micros() as u64 / 100) * 100;
        let mut histogram = self
            .latency_histogram
            .lock()
            .expect("lock should not be poisoned");
        *histogram.entry(bucket).or_insert(0) += 1;
    }
}