use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, info, warn};
// Upper limit on the number of latency-histogram buckets; `LatencyHistogram::new`
// panics if asked for more (guards against accidental huge allocations).
const MAX_HISTOGRAM_BUCKETS: usize = 100;
/// One cumulative-histogram bucket: how many samples fell at or below
/// `upper_bound_us`, plus the running percentile once it has been computed.
#[derive(Debug, Clone)]
pub struct HistogramBucket {
    pub upper_bound_us: u64,
    pub count: u64,
    pub cumulative_percentile: f64,
}

impl HistogramBucket {
    /// Builds an empty bucket for the given upper bound (microseconds).
    fn new(upper_bound_us: u64) -> Self {
        HistogramBucket {
            upper_bound_us,
            count: 0,
            cumulative_percentile: 0.0,
        }
    }
}
/// Categories of cache operations tracked by the metrics layer.
///
/// Fieldless `Copy` enum, so equality is total: derive `Eq` (fixes clippy's
/// `derive_partial_eq_without_eq`) and `Hash` so the type can be used as a
/// map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    Get,
    Set,
    Delete,
    Exists,
    Batch,
    Other,
}

impl OperationType {
    /// Stable lowercase label used in exported stats (`OperationStats::op_type`).
    pub fn name(&self) -> &'static str {
        match self {
            OperationType::Get => "get",
            OperationType::Set => "set",
            OperationType::Delete => "delete",
            OperationType::Exists => "exists",
            OperationType::Batch => "batch",
            OperationType::Other => "other",
        }
    }
}
/// Lock-free cumulative latency histogram.
///
/// All state lives behind `Arc`-wrapped atomics, so clones share one set of
/// counters and may be updated concurrently from many tasks.
#[derive(Clone)]
pub struct LatencyHistogram {
// Bucket upper bounds in microseconds; parallel to `bucket_counts`.
// Expected ascending — assumed, not validated here; TODO confirm callers sort.
buckets: Vec<u64>,
bucket_counts: Vec<Arc<AtomicU64>>,
total_count: Arc<AtomicU64>,
total_latency_us: Arc<AtomicU64>,
// u64::MAX is the "no samples recorded yet" sentinel (see `summary`).
min_latency_us: Arc<AtomicU64>,
max_latency_us: Arc<AtomicU64>,
}
impl LatencyHistogram {
    /// Creates a histogram with the given cumulative bucket upper bounds
    /// (microseconds, expected ascending).
    ///
    /// # Panics
    /// Panics if more than `MAX_HISTOGRAM_BUCKETS` bounds are supplied.
    pub fn new(bucket_bounds_us: Vec<u64>) -> Self {
        if bucket_bounds_us.len() > MAX_HISTOGRAM_BUCKETS {
            panic!(
                "Histogram bucket bounds exceed maximum of {} (got {})",
                MAX_HISTOGRAM_BUCKETS,
                bucket_bounds_us.len()
            );
        }
        let bucket_counts: Vec<_> = bucket_bounds_us
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();
        Self {
            buckets: bucket_bounds_us,
            bucket_counts,
            total_count: Arc::new(AtomicU64::new(0)),
            total_latency_us: Arc::new(AtomicU64::new(0)),
            // u64::MAX sentinel == "no samples yet"; mapped to 0 in `summary`.
            min_latency_us: Arc::new(AtomicU64::new(u64::MAX)),
            max_latency_us: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Records one latency sample: bumps totals, min/max, and the first bucket
    /// whose bound is >= the sample.
    ///
    /// Samples larger than every bound are folded into the *last* bucket, so
    /// that bucket over-reports its nominal range; pass `u64::MAX` as the last
    /// bound if a true "+Inf" bucket is wanted.
    pub fn record(&self, latency: Duration) {
        let latency_us = latency.as_micros() as u64;
        self.total_count.fetch_add(1, Ordering::Relaxed);
        self.total_latency_us.fetch_add(latency_us, Ordering::Relaxed);
        // BUG FIX: the previous CAS loop broke out whenever the stored min was
        // still the u64::MAX sentinel, so the minimum was NEVER updated and
        // `summary` always reported min == 0. `fetch_min`/`fetch_max` handle
        // both the sentinel and concurrent-update retries correctly.
        self.min_latency_us.fetch_min(latency_us, Ordering::Relaxed);
        self.max_latency_us.fetch_max(latency_us, Ordering::Relaxed);
        for (i, bound) in self.buckets.iter().enumerate() {
            if latency_us <= *bound {
                self.bucket_counts[i].fetch_add(1, Ordering::Relaxed);
                return;
            }
        }
        // Overflow past the largest bound: charge the last bucket, if any.
        if let Some(last) = self.bucket_counts.last() {
            last.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Snapshot of every bucket with its count and cumulative percentile.
    /// Percentiles are 0 while nothing has been recorded.
    ///
    /// Counters are read with `Relaxed` loads while writers may be active, so
    /// the snapshot is approximate, not a consistent cut.
    pub fn buckets(&self) -> Vec<HistogramBucket> {
        let total = self.total_count.load(Ordering::Relaxed);
        if total == 0 {
            return self
                .buckets
                .iter()
                .map(|b| HistogramBucket::new(*b))
                .collect();
        }
        let mut result = Vec::with_capacity(self.buckets.len());
        let mut cumulative = 0u64;
        for (i, bound) in self.buckets.iter().enumerate() {
            let count = self.bucket_counts[i].load(Ordering::Relaxed);
            cumulative += count;
            result.push(HistogramBucket {
                upper_bound_us: *bound,
                count,
                cumulative_percentile: cumulative as f64 / total as f64 * 100.0,
            });
        }
        result
    }

    /// Aggregate summary (count / average / min / max, microseconds).
    /// `min_latency_us` reads as 0 while the histogram is empty.
    pub fn summary(&self) -> HistogramSummary {
        let total = self.total_count.load(Ordering::Relaxed);
        let total_latency = self.total_latency_us.load(Ordering::Relaxed);
        let min = self.min_latency_us.load(Ordering::Relaxed);
        let max = self.max_latency_us.load(Ordering::Relaxed);
        let avg_latency_us = if total > 0 {
            total_latency as f64 / total as f64
        } else {
            0.0
        };
        HistogramSummary {
            total_count: total,
            avg_latency_us,
            // Translate the "empty" sentinel into a caller-friendly 0.
            min_latency_us: if min == u64::MAX { 0 } else { min },
            max_latency_us: max,
        }
    }

    /// Zeroes every counter; min returns to its "empty" sentinel.
    /// NOTE(review): a reset racing with `record` can leave totals and bucket
    /// counts momentarily inconsistent — acceptable for stats, but worth knowing.
    pub fn reset(&self) {
        for counter in &self.bucket_counts {
            counter.store(0, Ordering::Relaxed);
        }
        self.total_count.store(0, Ordering::Relaxed);
        self.total_latency_us.store(0, Ordering::Relaxed);
        self.min_latency_us.store(u64::MAX, Ordering::Relaxed);
        self.max_latency_us.store(0, Ordering::Relaxed);
    }
}
/// Aggregate view of a `LatencyHistogram` (all times in microseconds).
#[derive(Debug, Clone)]
pub struct HistogramSummary {
pub total_count: u64,
pub avg_latency_us: f64,
// 0 when no samples have been recorded.
pub min_latency_us: u64,
pub max_latency_us: u64,
}
/// Success/failure counters plus a latency histogram for one operation type.
/// Cheap to clone: clones share the same underlying atomics.
#[derive(Clone)]
pub struct OperationCounter {
op_type: OperationType,
success_count: Arc<AtomicU64>,
failure_count: Arc<AtomicU64>,
// Records latencies of both successful and failed operations.
latency_histogram: LatencyHistogram,
}
impl OperationCounter {
pub fn new(op_type: OperationType, bucket_bounds_us: Vec<u64>) -> Self {
Self {
op_type,
success_count: Arc::new(AtomicU64::new(0)),
failure_count: Arc::new(AtomicU64::new(0)),
latency_histogram: LatencyHistogram::new(bucket_bounds_us),
}
}
pub fn record_success(&self, latency: Duration) {
self.success_count.fetch_add(1, Ordering::Relaxed);
self.latency_histogram.record(latency);
}
pub fn record_failure(&self, latency: Duration) {
self.failure_count.fetch_add(1, Ordering::Relaxed);
self.latency_histogram.record(latency);
}
pub fn stats(&self) -> OperationStats {
let success = self.success_count.load(Ordering::Relaxed);
let failure = self.failure_count.load(Ordering::Relaxed);
let total = success + failure;
let summary = self.latency_histogram.summary();
OperationStats {
op_type: self.op_type.name().to_string(),
total_count: total,
success_count: success,
failure_count: failure,
success_rate: if total > 0 { success as f64 / total as f64 * 100.0 } else { 0.0 },
avg_latency_us: summary.avg_latency_us,
min_latency_us: summary.min_latency_us,
max_latency_us: summary.max_latency_us,
}
}
}
/// Point-in-time statistics for a single operation type, as produced by
/// `OperationCounter::stats`. Rates are percentages (0-100); latencies are in
/// microseconds.
#[derive(Debug, Clone)]
pub struct OperationStats {
// Lowercase label from `OperationType::name` ("get", "set", ...).
pub op_type: String,
pub total_count: u64,
pub success_count: u64,
pub failure_count: u64,
pub success_rate: f64,
pub avg_latency_us: f64,
pub min_latency_us: u64,
pub max_latency_us: u64,
}
/// Central registry of cache metrics: per-operation counters, L1/L2 hit and
/// miss counts, and coarse gauges (connections, active tasks, queue depth).
/// Clones share the same underlying counters.
#[derive(Clone)]
pub struct MetricsCollector {
operation_counters: Arc<Vec<OperationCounter>>,
l1_hits: Arc<AtomicU64>,
l1_misses: Arc<AtomicU64>,
l2_hits: Arc<AtomicU64>,
l2_misses: Arc<AtomicU64>,
connections: Arc<AtomicUsize>,
active_tasks: Arc<AtomicUsize>,
queue_depth: Arc<AtomicUsize>,
// Update-notification channel; no sends are visible in this file (hence the
// leading underscore) — presumably reserved for future subscribers.
_update_tx: broadcast::Sender<()>,
}
impl MetricsCollector {
    /// Builds a collector with one counter per tracked operation type
    /// (`Other` is intentionally not registered) and a shared set of latency
    /// bucket bounds: 100us, 500us, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s.
    pub fn new() -> Self {
        // Capacity 1: subscribers would only need the latest "changed" tick.
        let (tx, _) = broadcast::channel(1);
        let op_types = vec![
            OperationType::Get,
            OperationType::Set,
            OperationType::Delete,
            OperationType::Exists,
            OperationType::Batch,
        ];
        let bucket_bounds = vec![100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000];
        let operation_counters: Vec<_> = op_types
            .into_iter()
            .map(|op| OperationCounter::new(op, bucket_bounds.clone()))
            .collect();
        Self {
            operation_counters: Arc::new(operation_counters),
            l1_hits: Arc::new(AtomicU64::new(0)),
            l1_misses: Arc::new(AtomicU64::new(0)),
            l2_hits: Arc::new(AtomicU64::new(0)),
            l2_misses: Arc::new(AtomicU64::new(0)),
            connections: Arc::new(AtomicUsize::new(0)),
            active_tasks: Arc::new(AtomicUsize::new(0)),
            queue_depth: Arc::new(AtomicUsize::new(0)),
            _update_tx: tx,
        }
    }

    /// `part` as a percentage of `whole`; 0 when `whole` is 0.
    /// (Extracted: this expression was duplicated four times below.)
    fn percentage(part: u64, whole: u64) -> f64 {
        if whole > 0 {
            part as f64 / whole as f64 * 100.0
        } else {
            0.0
        }
    }

    /// Looks up the counter for `op_type`; `None` for untracked types
    /// (e.g. `OperationType::Other`).
    pub fn operation_counter(&self, op_type: OperationType) -> Option<&OperationCounter> {
        self.operation_counters
            .iter()
            .find(|c| c.op_type == op_type)
    }

    /// Increments the L1 hit counter.
    pub fn record_l1_hit(&self) {
        self.l1_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the L1 miss counter.
    pub fn record_l1_miss(&self) {
        self.l1_misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the L2 hit counter.
    pub fn record_l2_hit(&self) {
        self.l2_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the L2 miss counter.
    pub fn record_l2_miss(&self) {
        self.l2_misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Sets the current connection-count gauge.
    pub fn set_connections(&self, count: usize) {
        self.connections.store(count, Ordering::Relaxed);
    }

    /// Sets the current active-task gauge.
    pub fn set_active_tasks(&self, count: usize) {
        self.active_tasks.store(count, Ordering::Relaxed);
    }

    /// Sets the current queue-depth gauge.
    pub fn set_queue_depth(&self, depth: usize) {
        self.queue_depth.store(depth, Ordering::Relaxed);
    }

    /// Snapshot of every metric, including per-operation stats. Reads are
    /// `Relaxed`, so the snapshot is approximate under concurrent writes.
    pub fn full_stats(&self) -> FullMetrics {
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l1_misses = self.l1_misses.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let l2_misses = self.l2_misses.load(Ordering::Relaxed);
        let op_stats: Vec<_> = self
            .operation_counters
            .iter()
            .map(|c| c.stats())
            .collect();
        FullMetrics {
            l1_hits,
            l1_misses,
            l1_hit_rate: Self::percentage(l1_hits, l1_hits + l1_misses),
            l2_hits,
            l2_misses,
            l2_hit_rate: Self::percentage(l2_hits, l2_hits + l2_misses),
            connections: self.connections.load(Ordering::Relaxed),
            active_tasks: self.active_tasks.load(Ordering::Relaxed),
            queue_depth: self.queue_depth.load(Ordering::Relaxed),
            operation_stats: op_stats,
        }
    }

    /// Hit-rate-only snapshot.
    ///
    /// NOTE(review): here `l2_hit_rate` uses L1 *misses* as the denominator
    /// (presumably every L1 miss falls through to L2), whereas `full_stats`
    /// uses L2 lookups (`l2_hits + l2_misses`). Both behaviors are preserved
    /// as-is — confirm the difference is intentional.
    pub fn cache_hit_rates(&self) -> CacheHitRates {
        let l1_hits = self.l1_hits.load(Ordering::Relaxed);
        let l1_misses = self.l1_misses.load(Ordering::Relaxed);
        let l2_hits = self.l2_hits.load(Ordering::Relaxed);
        let l2_misses = self.l2_misses.load(Ordering::Relaxed);
        // Fix: removed the dead `l2_total` local the old code computed but
        // never used (unused-variable warning).
        CacheHitRates {
            l1_hit_rate: Self::percentage(l1_hits, l1_hits + l1_misses),
            l2_hit_rate: Self::percentage(l2_hits, l1_misses),
            l1_hits,
            l1_misses,
            l2_hits,
            l2_misses,
        }
    }
}
impl Default for MetricsCollector {
/// Equivalent to `MetricsCollector::new`.
fn default() -> Self {
Self::new()
}
}
/// Hit-rate snapshot from `MetricsCollector::cache_hit_rates`.
/// Rates are percentages (0-100).
#[derive(Debug, Clone)]
pub struct CacheHitRates {
// Hits as a percentage of all L1 lookups.
pub l1_hit_rate: f64,
// L2 hits as a percentage of L1 *misses* (not of L2 lookups) — see the
// producer in `cache_hit_rates`.
pub l2_hit_rate: f64,
pub l1_hits: u64,
pub l1_misses: u64,
pub l2_hits: u64,
pub l2_misses: u64,
}
/// Complete metrics snapshot from `MetricsCollector::full_stats`.
/// Rates are percentages (0-100); `l2_hit_rate` here is over L2 lookups,
/// unlike `CacheHitRates::l2_hit_rate`.
#[derive(Debug, Clone)]
pub struct FullMetrics {
pub l1_hits: u64,
pub l1_misses: u64,
pub l1_hit_rate: f64,
pub l2_hits: u64,
pub l2_misses: u64,
pub l2_hit_rate: f64,
pub connections: usize,
pub active_tasks: usize,
pub queue_depth: usize,
pub operation_stats: Vec<OperationStats>,
}
/// A timestamped copy of `FullMetrics`, with the interval (seconds) since the
/// previous capture. Stored by `SlidingWindowMetrics`.
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
pub timestamp: Instant,
pub metrics: FullMetrics,
pub interval_secs: f64,
}
impl PerformanceSnapshot {
pub fn new(metrics: FullMetrics, interval_secs: f64) -> Self {
Self {
timestamp: Instant::now(),
metrics,
interval_secs,
}
}
}
/// Rolling window of periodic `PerformanceSnapshot`s taken from a shared
/// `MetricsCollector`, bounded both by age (`window_secs`) and by count
/// (`max_snapshots`). Uses tokio (async) mutexes, since the locks are held
/// across `.await` points in `capture`/`window_summary`.
#[derive(Clone)]
pub struct SlidingWindowMetrics {
collector: Arc<MetricsCollector>,
snapshots: Arc<Mutex<VecDeque<PerformanceSnapshot>>>,
max_snapshots: usize,
window_secs: u64,
// Time of the previous capture; used to compute each snapshot's interval.
last_capture: Arc<Mutex<Instant>>,
}
impl SlidingWindowMetrics {
    /// Creates a window over `collector` keeping at most `max_snapshots`
    /// snapshots no older than `window_secs` seconds. The first capture's
    /// interval is measured from construction time.
    pub fn new(collector: Arc<MetricsCollector>, window_secs: u64, max_snapshots: usize) -> Self {
        Self {
            collector,
            snapshots: Arc::new(Mutex::new(VecDeque::new())),
            max_snapshots,
            window_secs,
            last_capture: Arc::new(Mutex::new(Instant::now())),
        }
    }

    /// Captures the collector's current stats into the window, then evicts
    /// snapshots that are too old or beyond `max_snapshots`.
    ///
    /// BUG FIX: this was declared as a plain `fn` while calling `.lock().await`
    /// on the tokio mutexes, which does not compile; it is now `async fn`
    /// (matching `window_summary`). Also removed a shadowing second
    /// `Instant::now()` so the interval and `last_capture` use one timestamp.
    pub async fn capture(&self) {
        let mut last = self.last_capture.lock().await;
        let now = Instant::now();
        let interval = now.duration_since(*last).as_secs_f64();
        let metrics = self.collector.full_stats();
        let snapshot = PerformanceSnapshot::new(metrics, interval);
        let mut snapshots = self.snapshots.lock().await;
        snapshots.push_back(snapshot);
        // Evict by age first, then enforce the hard size cap (oldest first).
        let window = Duration::from_secs(self.window_secs);
        snapshots.retain(|s| now.duration_since(s.timestamp) < window);
        while snapshots.len() > self.max_snapshots {
            snapshots.pop_front();
        }
        *last = now;
    }

    /// Aggregates every snapshot currently in the window.
    ///
    /// NOTE(review): each snapshot stores *cumulative* collector totals, so
    /// summing them across snapshots counts the same events once per snapshot
    /// they appear in. Behavior preserved as-is — confirm this weighting is
    /// intended (deltas per interval may have been the goal).
    pub async fn window_summary(&self) -> WindowMetricsSummary {
        let snapshots = self.snapshots.lock().await;
        let count = snapshots.len();
        if count == 0 {
            return WindowMetricsSummary::default();
        }
        let mut total_l1_hits = 0;
        let mut total_l1_misses = 0;
        let mut total_l2_hits = 0;
        let mut total_l2_misses = 0;
        let mut total_ops = 0;
        let mut total_success = 0;
        for snapshot in snapshots.iter() {
            total_l1_hits += snapshot.metrics.l1_hits;
            total_l1_misses += snapshot.metrics.l1_misses;
            total_l2_hits += snapshot.metrics.l2_hits;
            total_l2_misses += snapshot.metrics.l2_misses;
            for op in &snapshot.metrics.operation_stats {
                total_ops += op.total_count;
                total_success += op.success_count;
            }
        }
        let l1_total = total_l1_hits + total_l1_misses;
        let l2_total = total_l2_hits + total_l2_misses;
        WindowMetricsSummary {
            snapshot_count: count,
            window_secs: self.window_secs,
            avg_l1_hit_rate: if l1_total > 0 { total_l1_hits as f64 / l1_total as f64 * 100.0 } else { 0.0 },
            avg_l2_hit_rate: if l2_total > 0 { total_l2_hits as f64 / l2_total as f64 * 100.0 } else { 0.0 },
            total_l1_hits,
            total_l1_misses,
            total_l2_hits,
            total_l2_misses,
            total_operations: total_ops,
            success_rate: if total_ops > 0 { total_success as f64 / total_ops as f64 * 100.0 } else { 0.0 },
        }
    }
}
/// Aggregate of all snapshots in a `SlidingWindowMetrics` window, as returned
/// by `window_summary`. Rates are percentages (0-100). `Default` (all zeros)
/// is returned when the window is empty.
#[derive(Debug, Clone, Default)]
pub struct WindowMetricsSummary {
pub snapshot_count: usize,
pub window_secs: u64,
pub avg_l1_hit_rate: f64,
pub avg_l2_hit_rate: f64,
pub total_l1_hits: u64,
pub total_l1_misses: u64,
pub total_l2_hits: u64,
pub total_l2_misses: u64,
pub total_operations: u64,
pub success_rate: f64,
}