use dashmap::DashMap;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{span, Level};
/// Lock-free counters for cache activity, shared across threads and updated
/// with `Ordering::Relaxed` atomics.
///
/// Most fields are monotonically increasing totals (only ever `fetch_add`ed);
/// `l1_items` and `l1_capacity_used` are gauges that are overwritten via
/// `store` (see `set_l1_item_count` / `set_l1_capacity_used` below).
#[derive(Debug)]
pub struct AtomicCounters {
    // L1 (in-process) and L2 (backend) get outcomes.
    pub l1_get_hits: AtomicU64,
    pub l1_get_misses: AtomicU64,
    pub l2_get_hits: AtomicU64,
    pub l2_get_misses: AtomicU64,
    // Set/delete attempt totals per layer.
    pub l1_set_total: AtomicU64,
    pub l2_set_total: AtomicU64,
    pub l1_delete_total: AtomicU64,
    pub l2_delete_total: AtomicU64,
    // Incremented once per fast-path operation in `record_request`.
    pub total_operations: AtomicU64,
    // Gauges: current L1 item count and capacity usage (set, not added).
    pub l1_items: AtomicU64,
    pub l1_capacity_used: AtomicU64,
    // Prefetch and compression activity totals.
    pub prefetch_total: AtomicU64,
    pub compression_total: AtomicU64,
    pub compression_bytes_saved: AtomicU64,
}
impl Default for AtomicCounters {
    /// Creates a counter set with every metric initialized to zero.
    /// (`AtomicU64::default()` is defined as `AtomicU64::new(0)`.)
    fn default() -> Self {
        Self {
            l1_get_hits: AtomicU64::default(),
            l1_get_misses: AtomicU64::default(),
            l2_get_hits: AtomicU64::default(),
            l2_get_misses: AtomicU64::default(),
            l1_set_total: AtomicU64::default(),
            l2_set_total: AtomicU64::default(),
            l1_delete_total: AtomicU64::default(),
            l2_delete_total: AtomicU64::default(),
            total_operations: AtomicU64::default(),
            l1_items: AtomicU64::default(),
            l1_capacity_used: AtomicU64::default(),
            prefetch_total: AtomicU64::default(),
            compression_total: AtomicU64::default(),
            compression_bytes_saved: AtomicU64::default(),
        }
    }
}
/// Aggregated cache-metrics handle; cheap to clone because every field is an
/// `Arc` pointing at shared state.
///
/// Gated on the `metrics`/`l1-moka` features: without the gate this
/// definition collides with the `#[cfg(not(...))]` stub `Metrics` declared
/// further down in this file whenever both features are disabled.
#[cfg(any(feature = "metrics", feature = "l1-moka"))]
#[derive(Clone, Debug, Default)]
pub struct Metrics {
    /// Hot-path hit/miss/set/delete counters, updated without locks.
    pub counters: Arc<AtomicCounters>,
    /// Generic request totals keyed by `"service:layer:op:result"`.
    pub requests_total: Arc<DashMap<String, u64>>,
    /// L2 health gauge per service (raw `u8`; semantics are caller-defined).
    pub l2_health_status: Arc<DashMap<String, u8>>,
    /// Write-ahead-log entry count gauge per service.
    pub wal_entries: Arc<DashMap<String, usize>>,
    /// Per-`"service:layer:op"` accumulator: (total seconds, sample count).
    pub operation_duration: Arc<DashMap<String, (f64, u64)>>,
    /// Batch-pipeline gauges per service.
    pub batch_buffer_size: Arc<DashMap<String, usize>>,
    pub batch_success_rate: Arc<DashMap<String, f64>>,
    pub batch_throughput: Arc<DashMap<String, f64>>,
}
/// Process-wide metrics instance, lazily initialized on first access.
/// Gated to match the `#[cfg(not(...))]` stub `GLOBAL_METRICS` below, which
/// would otherwise be a duplicate definition when both features are off.
#[cfg(any(feature = "metrics", feature = "l1-moka"))]
pub static GLOBAL_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
#[cfg(any(feature = "metrics", feature = "l1-moka"))]
impl Metrics {
    /// Records one cache request outcome.
    ///
    /// The eight known hot-path combinations of `(layer, op, result)` bump a
    /// dedicated atomic counter plus `total_operations`; any other
    /// combination falls through to the generic `requests_total` map keyed
    /// by `"service:layer:op:result"` (which does NOT bump
    /// `total_operations`, matching the original behavior).
    pub fn record_request(&self, service: &str, layer: &str, op: &str, result: &str) {
        let span = span!(Level::INFO, "cache_request", service, layer, op, result);
        let _enter = span.enter();
        // Map the fast-path combinations onto their dedicated counters; this
        // replaces eight copy-pasted match arms that all did the same
        // fetch_add/fetch_add/return dance.
        let counter = match (layer, op, result) {
            ("L1", "get", "hit") => Some(&self.counters.l1_get_hits),
            ("L1", "get", "miss") => Some(&self.counters.l1_get_misses),
            ("L2", "get", "hit") => Some(&self.counters.l2_get_hits),
            ("L2", "get", "miss") => Some(&self.counters.l2_get_misses),
            ("L1", "set", "attempt") => Some(&self.counters.l1_set_total),
            ("L2", "set", "attempt") => Some(&self.counters.l2_set_total),
            ("L1", "delete", "attempt") => Some(&self.counters.l1_delete_total),
            ("L2", "delete", "attempt") => Some(&self.counters.l2_delete_total),
            _ => None,
        };
        if let Some(counter) = counter {
            counter.fetch_add(1, Ordering::Relaxed);
            self.counters
                .total_operations
                .fetch_add(1, Ordering::Relaxed);
            return;
        }
        // Slow path: arbitrary label combination recorded in the shared map.
        let key = format!("{}:{}:{}:{}", service, layer, op, result);
        self.requests_total
            .entry(key)
            .and_modify(|v| *v += 1)
            .or_insert(1);
    }

    /// Accumulates `(total_seconds, sample_count)` under the key
    /// `"service:layer:op"`; the exporter later divides to get an average.
    pub fn record_duration(&self, service: &str, layer: &str, op: &str, duration_secs: f64) {
        let key = format!("{}:{}:{}", service, layer, op);
        self.operation_duration
            .entry(key)
            .and_modify(|entry| {
                entry.0 += duration_secs;
                entry.1 += 1;
            })
            .or_insert((duration_secs, 1));
    }

    /// Sets the L2 health gauge for `service`. The meaning of `status` is
    /// caller-defined; it is stored and exported verbatim.
    pub fn set_health(&self, service: &str, status: u8) {
        self.l2_health_status.insert(service.to_string(), status);
    }

    /// Sets the write-ahead-log entry count gauge for `service`.
    pub fn set_wal_size(&self, service: &str, size: usize) {
        self.wal_entries.insert(service.to_string(), size);
    }

    /// Sets the batch buffer size gauge for `service`.
    pub fn set_batch_buffer_size(&self, service: &str, size: usize) {
        self.batch_buffer_size.insert(service.to_string(), size);
    }

    /// Sets the batch success-rate gauge for `service`.
    pub fn set_batch_success_rate(&self, service: &str, rate: f64) {
        self.batch_success_rate.insert(service.to_string(), rate);
    }

    /// Sets the batch throughput gauge for `service`.
    pub fn set_batch_throughput(&self, service: &str, throughput: f64) {
        self.batch_throughput
            .insert(service.to_string(), throughput);
    }

    /// Snapshots the nine fixed counters as
    /// `(l1_hits, l1_misses, l2_hits, l2_misses, l1_sets, l2_sets,
    ///   l1_deletes, l2_deletes, total_operations)`.
    pub fn get_counters(&self) -> (u64, u64, u64, u64, u64, u64, u64, u64, u64) {
        (
            self.counters.l1_get_hits.load(Ordering::Relaxed),
            self.counters.l1_get_misses.load(Ordering::Relaxed),
            self.counters.l2_get_hits.load(Ordering::Relaxed),
            self.counters.l2_get_misses.load(Ordering::Relaxed),
            self.counters.l1_set_total.load(Ordering::Relaxed),
            self.counters.l2_set_total.load(Ordering::Relaxed),
            self.counters.l1_delete_total.load(Ordering::Relaxed),
            self.counters.l2_delete_total.load(Ordering::Relaxed),
            self.counters.total_operations.load(Ordering::Relaxed),
        )
    }
}
#[cfg(any(feature = "metrics", feature = "l1-moka"))]
/// Renders the global metrics as Prometheus-style `name{labels} value`
/// text, one metric per line.
pub fn get_metrics_string() -> String {
    use std::fmt::Write as _; // writeln! into a String (infallible)

    let metrics = &GLOBAL_METRICS;
    let mut out = String::new();

    // Fixed counters, in the tuple order documented on `get_counters`.
    let (l1_hits, l1_misses, l2_hits, l2_misses, l1_sets, l2_sets, l1_dels, l2_dels, total) =
        metrics.get_counters();
    let _ = writeln!(out, "cache_l1_get_hits_total {}", l1_hits);
    let _ = writeln!(out, "cache_l1_get_misses_total {}", l1_misses);
    let _ = writeln!(out, "cache_l2_get_hits_total {}", l2_hits);
    let _ = writeln!(out, "cache_l2_get_misses_total {}", l2_misses);
    let _ = writeln!(out, "cache_l1_set_total {}", l1_sets);
    let _ = writeln!(out, "cache_l2_set_total {}", l2_sets);
    let _ = writeln!(out, "cache_l1_delete_total {}", l1_dels);
    let _ = writeln!(out, "cache_l2_delete_total {}", l2_dels);
    let _ = writeln!(out, "cache_operations_total {}", total);

    // Dynamic label maps.
    for entry in metrics.requests_total.iter() {
        let _ = writeln!(
            out,
            "cache_requests_total{{labels=\"{}\"}} {}",
            entry.key(),
            entry.value()
        );
    }
    for entry in metrics.l2_health_status.iter() {
        let _ = writeln!(
            out,
            "cache_l2_health_status{{service=\"{}\"}} {}",
            entry.key(),
            entry.value()
        );
    }
    for entry in metrics.wal_entries.iter() {
        let _ = writeln!(
            out,
            "cache_wal_entries{{service=\"{}\"}} {}",
            entry.key(),
            entry.value()
        );
    }

    // Average duration per `service:layer:op`. Entries with a zero count or
    // a key with fewer than three `:`-separated parts are skipped, exactly
    // as in the original implementation.
    for entry in metrics.operation_duration.iter() {
        let (total_secs, count) = *entry.value();
        if count == 0 {
            continue;
        }
        let mut parts = entry.key().split(':');
        if let (Some(service), Some(layer), Some(op)) = (parts.next(), parts.next(), parts.next())
        {
            let avg_duration = total_secs / count as f64;
            let _ = writeln!(
                out,
                "cache_operation_duration_seconds{{service=\"{}\",layer=\"{}\",op=\"{}\"}} {}",
                service, layer, op, avg_duration
            );
        }
    }

    for entry in metrics.batch_buffer_size.iter() {
        let _ = writeln!(
            out,
            "cache_batch_buffer_size{{service=\"{}\"}} {}",
            entry.key(),
            entry.value()
        );
    }
    for entry in metrics.batch_success_rate.iter() {
        let _ = writeln!(
            out,
            "cache_batch_success_rate{{service=\"{}\"}} {}",
            entry.key(),
            entry.value()
        );
    }
    for entry in metrics.batch_throughput.iter() {
        let _ = writeln!(
            out,
            "cache_batch_throughput{{service=\"{}\"}} {}",
            entry.key(),
            entry.value()
        );
    }

    out
}
/// Zero-cost stand-in for `Metrics` when neither the `metrics` nor the
/// `l1-moka` feature is enabled; all methods are no-ops.
#[cfg(not(any(feature = "metrics", feature = "l1-moka")))]
#[derive(Debug, Clone, Default)]
pub struct Metrics;
#[cfg(not(any(feature = "metrics", feature = "l1-moka")))]
impl Metrics {
    // No-op recorders mirroring the feature-enabled signatures so call
    // sites compile unchanged with metrics disabled.
    pub fn record_request(&self, _service: &str, _layer: &str, _op: &str, _result: &str) {}
    pub fn record_duration(&self, _service: &str, _layer: &str, _op: &str, _duration_secs: f64) {}
    pub fn set_health(&self, _service: &str, _status: u8) {}
    pub fn set_wal_size(&self, _service: &str, _size: usize) {}
    pub fn set_batch_buffer_size(&self, _service: &str, _size: usize) {}
    pub fn set_batch_success_rate(&self, _service: &str, _rate: f64) {}
    pub fn set_batch_throughput(&self, _service: &str, _throughput: f64) {}
    /// Always returns nine zeros, matching the feature-enabled tuple shape.
    pub fn get_counters(&self) -> (u64, u64, u64, u64, u64, u64, u64, u64, u64) {
        (0, 0, 0, 0, 0, 0, 0, 0, 0)
    }
}
#[cfg(not(any(feature = "metrics", feature = "l1-moka")))]
lazy_static! {
pub static ref GLOBAL_METRICS: Metrics = Metrics;
}
/// Stub exporter: returns an empty string when metrics are compiled out.
#[cfg(not(any(feature = "metrics", feature = "l1-moka")))]
pub fn get_metrics_string() -> String {
    String::new()
}
/// Point-in-time, owned snapshot of the atomic counters (see
/// `Metrics::snapshot`), serializable via serde.
#[derive(Debug, Clone, Default, Serialize)]
#[cfg(any(feature = "enhanced-stats", feature = "metrics"))]
pub struct CacheStats {
    // Get outcomes per layer.
    pub l1_hits: u64,
    pub l1_misses: u64,
    pub l2_hits: u64,
    pub l2_misses: u64,
    // Set/delete totals per layer.
    pub l1_sets: u64,
    pub l2_sets: u64,
    pub l1_deletes: u64,
    pub l2_deletes: u64,
    pub total_operations: u64,
    // L1 gauges at snapshot time.
    pub l1_item_count: u64,
    pub l1_capacity_used: u64,
    // Prefetch / compression activity.
    pub prefetch_count: u64,
    pub compression_count: u64,
    pub compression_bytes_saved: u64,
    // UTC time the snapshot was taken.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[cfg(any(feature = "enhanced-stats", feature = "metrics"))]
impl CacheStats {
    /// Shared helper: `hits / (hits + misses)`, or 0.0 when there are no
    /// samples at all.
    fn ratio(hits: u64, misses: u64) -> f64 {
        let total = hits + misses;
        if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        }
    }

    /// L1 hit rate in `[0.0, 1.0]`; 0.0 when L1 saw no gets.
    pub fn l1_hit_rate(&self) -> f64 {
        Self::ratio(self.l1_hits, self.l1_misses)
    }

    /// L2 hit rate in `[0.0, 1.0]`; 0.0 when L2 saw no gets.
    pub fn l2_hit_rate(&self) -> f64 {
        Self::ratio(self.l2_hits, self.l2_misses)
    }

    /// Combined L1+L2 hit rate; 0.0 when neither layer saw any gets.
    pub fn overall_hit_rate(&self) -> f64 {
        Self::ratio(
            self.l1_hits + self.l2_hits,
            self.l1_misses + self.l2_misses,
        )
    }

    /// L1 hit rate formatted as e.g. `"98.76%"`.
    pub fn l1_hit_rate_percent(&self) -> String {
        format!("{:.2}%", self.l1_hit_rate() * 100.0)
    }

    /// L2 hit rate formatted as e.g. `"98.76%"`.
    pub fn l2_hit_rate_percent(&self) -> String {
        format!("{:.2}%", self.l2_hit_rate() * 100.0)
    }

    /// Overall hit rate formatted as e.g. `"98.76%"`.
    pub fn overall_hit_rate_percent(&self) -> String {
        format!("{:.2}%", self.overall_hit_rate() * 100.0)
    }

    /// Renders this snapshot as Prometheus-style text, one metric per line,
    /// preceded by two `#` comment lines.
    pub fn export_prometheus(&self) -> String {
        use std::fmt::Write as _; // writeln! into a String (infallible)

        let mut out = String::new();
        let _ = writeln!(out, "# Cache Statistics");
        let _ = writeln!(out, "# Generated at: {}", self.timestamp);
        let _ = writeln!(out, "cache_l1_hits_total {}", self.l1_hits);
        let _ = writeln!(out, "cache_l1_misses_total {}", self.l1_misses);
        let _ = writeln!(out, "cache_l2_hits_total {}", self.l2_hits);
        let _ = writeln!(out, "cache_l2_misses_total {}", self.l2_misses);
        let _ = writeln!(out, "cache_l1_sets_total {}", self.l1_sets);
        let _ = writeln!(out, "cache_l2_sets_total {}", self.l2_sets);
        let _ = writeln!(out, "cache_l1_deletes_total {}", self.l1_deletes);
        let _ = writeln!(out, "cache_l2_deletes_total {}", self.l2_deletes);
        let _ = writeln!(out, "cache_operations_total {}", self.total_operations);
        let _ = writeln!(out, "cache_l1_hit_rate {}", self.l1_hit_rate());
        let _ = writeln!(out, "cache_l2_hit_rate {}", self.l2_hit_rate());
        let _ = writeln!(out, "cache_overall_hit_rate {}", self.overall_hit_rate());
        let _ = writeln!(out, "cache_l1_item_count {}", self.l1_item_count);
        let _ = writeln!(out, "cache_l1_capacity_used_bytes {}", self.l1_capacity_used);
        let _ = writeln!(out, "cache_prefetch_total {}", self.prefetch_count);
        let _ = writeln!(out, "cache_compression_total {}", self.compression_count);
        let _ = writeln!(
            out,
            "cache_compression_bytes_saved {}",
            self.compression_bytes_saved
        );
        out
    }

    /// Serializes this snapshot as pretty-printed JSON.
    pub fn export_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string_pretty(self)
    }
}
#[cfg(any(feature = "enhanced-stats", feature = "metrics"))]
impl Metrics {
    /// Captures a point-in-time copy of every atomic counter, stamped with
    /// the current UTC time.
    pub fn snapshot(&self) -> CacheStats {
        let c = &self.counters;
        let load = |a: &AtomicU64| a.load(Ordering::Relaxed);
        CacheStats {
            l1_hits: load(&c.l1_get_hits),
            l1_misses: load(&c.l1_get_misses),
            l2_hits: load(&c.l2_get_hits),
            l2_misses: load(&c.l2_get_misses),
            l1_sets: load(&c.l1_set_total),
            l2_sets: load(&c.l2_set_total),
            l1_deletes: load(&c.l1_delete_total),
            l2_deletes: load(&c.l2_delete_total),
            total_operations: load(&c.total_operations),
            l1_item_count: load(&c.l1_items),
            l1_capacity_used: load(&c.l1_capacity_used),
            prefetch_count: load(&c.prefetch_total),
            compression_count: load(&c.compression_total),
            compression_bytes_saved: load(&c.compression_bytes_saved),
            timestamp: chrono::Utc::now(),
        }
    }

    /// Zeroes every atomic counter and clears the accumulated maps.
    ///
    /// `l2_health_status` and `wal_entries` are left untouched — presumably
    /// because they describe current state rather than accumulated history;
    /// confirm before changing that.
    pub fn reset(&self) {
        let c = &self.counters;
        for counter in [
            &c.l1_get_hits,
            &c.l1_get_misses,
            &c.l2_get_hits,
            &c.l2_get_misses,
            &c.l1_set_total,
            &c.l2_set_total,
            &c.l1_delete_total,
            &c.l2_delete_total,
            &c.total_operations,
            &c.l1_items,
            &c.l1_capacity_used,
            &c.prefetch_total,
            &c.compression_total,
            &c.compression_bytes_saved,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
        self.requests_total.clear();
        self.operation_duration.clear();
        self.batch_buffer_size.clear();
        self.batch_success_rate.clear();
        self.batch_throughput.clear();
    }

    /// Combined L1+L2 get hit rate.
    ///
    /// NOTE(review): returns 1.0 when no gets have happened yet (an idle
    /// cache counts as "perfect"), whereas `CacheStats::overall_hit_rate`
    /// returns 0.0 in the same situation.
    pub fn hit_rate(&self) -> f64 {
        let c = &self.counters;
        let hits =
            c.l1_get_hits.load(Ordering::Relaxed) + c.l2_get_hits.load(Ordering::Relaxed);
        let misses =
            c.l1_get_misses.load(Ordering::Relaxed) + c.l2_get_misses.load(Ordering::Relaxed);
        match hits + misses {
            0 => 1.0,
            total => hits as f64 / total as f64,
        }
    }

    /// Hit rate formatted as e.g. `"98.76%"`.
    pub fn hit_rate_percent(&self) -> String {
        format!("{:.2}%", self.hit_rate() * 100.0)
    }

    /// Counts one prefetch operation.
    pub fn record_prefetch(&self) {
        self.counters.prefetch_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one compression and accumulates the bytes it saved.
    pub fn record_compression(&self, bytes_saved: u64) {
        let c = &self.counters;
        c.compression_total.fetch_add(1, Ordering::Relaxed);
        c.compression_bytes_saved
            .fetch_add(bytes_saved, Ordering::Relaxed);
    }

    /// Gauge setter: current number of items resident in L1.
    pub fn set_l1_item_count(&self, count: u64) {
        self.counters.l1_items.store(count, Ordering::Relaxed);
    }

    /// Gauge setter: current L1 capacity usage (exported as
    /// `cache_l1_capacity_used_bytes`).
    pub fn set_l1_capacity_used(&self, bytes: u64) {
        self.counters
            .l1_capacity_used
            .store(bytes, Ordering::Relaxed);
    }

    /// Convenience: snapshot, then render in Prometheus text format.
    pub fn export_prometheus(&self) -> String {
        self.snapshot().export_prometheus()
    }

    /// Convenience: snapshot, then serialize as pretty-printed JSON.
    pub fn export_json(&self) -> Result<String, serde_json::Error> {
        self.snapshot().export_json()
    }
}
/// Snapshots the process-wide `GLOBAL_METRICS` into an owned `CacheStats`.
#[cfg(any(feature = "enhanced-stats", feature = "metrics"))]
pub fn get_enhanced_stats() -> CacheStats {
    GLOBAL_METRICS.snapshot()
}
/// Renders the process-wide metrics in Prometheus text format.
#[cfg(any(feature = "enhanced-stats", feature = "metrics"))]
pub fn export_prometheus_format() -> String {
    GLOBAL_METRICS.export_prometheus()
}
/// Serializes the process-wide metrics as pretty-printed JSON.
#[cfg(any(feature = "enhanced-stats", feature = "metrics"))]
pub fn export_json_format() -> Result<String, serde_json::Error> {
    GLOBAL_METRICS.export_json()
}