use hdrhistogram::Histogram;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Latency percentiles as returned by `MetricsCollector::get_latency_percentiles`.
///
/// Layout, in order:
/// 1. put:    `(p50, p95, p99, p99.9)`
/// 2. get:    `(p50, p95, p99, p99.9)`
/// 3. delete: `(p50, p95, p99)` — note there is no p99.9 entry for deletes
///
/// Values are in microseconds, the unit in which latencies are recorded.
type LatencyPercentiles = (
    (u64, u64, u64, u64),
    (u64, u64, u64, u64),
    (u64, u64, u64),
);
/// Plain-data record of database statistics.
///
/// Every field is public and the struct has no behaviour of its own beyond
/// the derived `Debug` and `Clone`.
///
/// NOTE(review): the code that fills this struct is not in this part of the
/// file. The field descriptions below follow the field names and types;
/// anything marked "presumably" should be confirmed where the struct is built.
#[derive(Clone, Debug)]
pub struct DBStats {
    // ---- Throughput (operations per second) ----
    /// Write (put) rate.
    pub writes_per_sec: f64,
    /// Read (get) rate.
    pub reads_per_sec: f64,
    /// Delete rate.
    pub deletes_per_sec: f64,

    // ---- Operation counters ----
    /// Number of put operations.
    pub total_puts: u64,
    /// Number of get operations.
    pub total_gets: u64,
    /// Number of delete operations.
    pub total_deletes: u64,
    /// Number of flushes.
    pub total_flushes: u64,
    /// Number of compactions.
    pub total_compactions: u64,

    // ---- Latency percentiles (`_us` suffix: microseconds) ----
    /// Put latency, 50th percentile.
    pub put_latency_p50_us: u64,
    /// Put latency, 95th percentile.
    pub put_latency_p95_us: u64,
    /// Put latency, 99th percentile.
    pub put_latency_p99_us: u64,
    /// Put latency, 99.9th percentile.
    pub put_latency_p999_us: u64,
    /// Get latency, 50th percentile.
    pub get_latency_p50_us: u64,
    /// Get latency, 95th percentile.
    pub get_latency_p95_us: u64,
    /// Get latency, 99th percentile.
    pub get_latency_p99_us: u64,
    /// Get latency, 99.9th percentile.
    pub get_latency_p999_us: u64,
    /// Delete latency, 50th percentile.
    pub delete_latency_p50_us: u64,
    /// Delete latency, 95th percentile.
    pub delete_latency_p95_us: u64,
    /// Delete latency, 99th percentile (deletes have no p99.9 field).
    pub delete_latency_p99_us: u64,

    // ---- Memtable / storage ----
    /// Memtable size in bytes.
    pub memtable_size_bytes: usize,
    /// Memtable capacity in bytes.
    pub memtable_capacity_bytes: usize,
    /// Memtable utilization; presumably size / capacity as a percentage
    /// (0-100) — TODO confirm the scale.
    pub memtable_utilization_pct: f64,
    /// Write-ahead-log size in bytes.
    pub wal_size_bytes: u64,
    /// Total on-disk size in bytes.
    pub total_disk_bytes: u64,

    // ---- Cache ----
    /// Number of cache hits.
    pub cache_hits: u64,
    /// Number of cache misses.
    pub cache_misses: u64,
    /// Cache hit rate; presumably hits / (hits + misses) — TODO confirm
    /// whether this is a 0-1 fraction or a percentage.
    pub cache_hit_rate: f64,
    /// Block cache size (unit not visible here — bytes or entries, confirm).
    pub block_cache_size: usize,
    /// Block cache capacity (same unit as `block_cache_size`, presumably).
    pub block_cache_capacity: usize,

    // ---- SSTable layout ----
    /// SSTable count per level; presumably indexed by level number.
    pub sstables_per_level: Vec<usize>,
    /// Size in bytes per level; presumably indexed like `sstables_per_level`.
    pub level_sizes_bytes: Vec<u64>,
    /// Total number of SSTables.
    pub total_sstables: usize,

    // ---- Write amplification ----
    /// Logical bytes written.
    pub logical_bytes_written: u64,
    /// Physical bytes written.
    pub physical_bytes_written: u64,
    /// Write amplification; presumably physical / logical bytes — TODO confirm.
    pub write_amplification: f64,

    // ---- Process ----
    /// Uptime in seconds.
    pub uptime_seconds: u64,
}
/// Crate-internal collector for operation counters and latency histograms.
///
/// Counters are `AtomicU64`s, so they can be bumped through a shared `&self`.
/// Each latency histogram sits behind its own `std::sync::Mutex`, and
/// latencies are stored in microseconds.
pub(crate) struct MetricsCollector {
    /// Incremented once per `record_put` call.
    pub(crate) total_puts: AtomicU64,
    /// Incremented once per `record_get` call.
    pub(crate) total_gets: AtomicU64,
    /// Incremented once per `record_delete` call.
    pub(crate) total_deletes: AtomicU64,
    /// Incremented once per `record_flush` call.
    pub(crate) total_flushes: AtomicU64,
    /// Incremented once per `record_compaction` call.
    pub(crate) total_compactions: AtomicU64,

    /// Running sum of the byte counts passed to `record_logical_bytes`.
    pub(crate) logical_bytes_written: AtomicU64,
    /// Running sum of the byte counts passed to `record_physical_bytes`.
    pub(crate) physical_bytes_written: AtomicU64,

    /// Put latencies (microseconds).
    pub(crate) put_latencies: std::sync::Mutex<Histogram<u64>>,
    /// Get latencies (microseconds).
    pub(crate) get_latencies: std::sync::Mutex<Histogram<u64>>,
    /// Delete latencies (microseconds).
    pub(crate) delete_latencies: std::sync::Mutex<Histogram<u64>>,

    /// Instant captured when the collector was constructed; uptime is
    /// measured from here.
    pub(crate) start_time: Instant,
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
total_puts: AtomicU64::new(0),
total_gets: AtomicU64::new(0),
total_deletes: AtomicU64::new(0),
total_flushes: AtomicU64::new(0),
total_compactions: AtomicU64::new(0),
logical_bytes_written: AtomicU64::new(0),
physical_bytes_written: AtomicU64::new(0),
put_latencies: std::sync::Mutex::new(
Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
),
get_latencies: std::sync::Mutex::new(
Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
),
delete_latencies: std::sync::Mutex::new(
Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
),
start_time: Instant::now(),
}
}
#[inline]
pub fn record_put(&self, latency: Duration) {
self.total_puts.fetch_add(1, Ordering::Relaxed);
let latency_us = latency.as_micros() as u64;
if let Ok(mut hist) = self.put_latencies.lock() {
let _ = hist.record(latency_us); }
}
#[inline]
pub fn record_get(&self, latency: Duration) {
self.total_gets.fetch_add(1, Ordering::Relaxed);
let latency_us = latency.as_micros() as u64;
if let Ok(mut hist) = self.get_latencies.lock() {
let _ = hist.record(latency_us);
}
}
#[inline]
pub fn record_delete(&self, latency: Duration) {
self.total_deletes.fetch_add(1, Ordering::Relaxed);
let latency_us = latency.as_micros() as u64;
if let Ok(mut hist) = self.delete_latencies.lock() {
let _ = hist.record(latency_us);
}
}
#[inline]
pub fn record_flush(&self) {
self.total_flushes.fetch_add(1, Ordering::Relaxed);
}
#[inline]
#[allow(dead_code)] pub fn record_compaction(&self) {
self.total_compactions.fetch_add(1, Ordering::Relaxed);
}
#[inline]
pub fn record_logical_bytes(&self, bytes: u64) {
self.logical_bytes_written
.fetch_add(bytes, Ordering::Relaxed);
}
#[inline]
pub fn record_physical_bytes(&self, bytes: u64) {
self.physical_bytes_written
.fetch_add(bytes, Ordering::Relaxed);
}
pub fn get_counts(&self) -> (u64, u64, u64, u64, u64) {
(
self.total_puts.load(Ordering::Relaxed),
self.total_gets.load(Ordering::Relaxed),
self.total_deletes.load(Ordering::Relaxed),
self.total_flushes.load(Ordering::Relaxed),
self.total_compactions.load(Ordering::Relaxed),
)
}
pub fn uptime_seconds(&self) -> u64 {
self.start_time.elapsed().as_secs()
}
pub fn calculate_throughput(&self) -> (f64, f64, f64) {
let uptime_secs = self.uptime_seconds() as f64;
if uptime_secs < 0.001 {
return (0.0, 0.0, 0.0);
}
let puts = self.total_puts.load(Ordering::Relaxed) as f64;
let gets = self.total_gets.load(Ordering::Relaxed) as f64;
let deletes = self.total_deletes.load(Ordering::Relaxed) as f64;
(
puts / uptime_secs,
gets / uptime_secs,
deletes / uptime_secs,
)
}
pub fn get_latency_percentiles(&self) -> LatencyPercentiles {
let put_stats = {
let hist = self.put_latencies.lock().expect("mutex poisoned");
(
hist.value_at_percentile(50.0),
hist.value_at_percentile(95.0),
hist.value_at_percentile(99.0),
hist.value_at_percentile(99.9),
)
};
let get_stats = {
let hist = self.get_latencies.lock().expect("mutex poisoned");
(
hist.value_at_percentile(50.0),
hist.value_at_percentile(95.0),
hist.value_at_percentile(99.0),
hist.value_at_percentile(99.9),
)
};
let delete_stats = {
let hist = self.delete_latencies.lock().expect("mutex poisoned");
(
hist.value_at_percentile(50.0),
hist.value_at_percentile(95.0),
hist.value_at_percentile(99.0),
)
};
(put_stats, get_stats, delete_stats)
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Each `record_*` call bumps only its own counter.
    #[test]
    fn test_metrics_collector_basic() {
        let metrics = MetricsCollector::new();

        metrics.record_put(Duration::from_micros(100));
        metrics.record_put(Duration::from_micros(200));
        metrics.record_get(Duration::from_micros(50));
        metrics.record_delete(Duration::from_micros(150));

        let (puts, gets, deletes, flushes, compactions) = metrics.get_counts();
        assert_eq!(
            (puts, gets, deletes, flushes, compactions),
            (2, 1, 1, 0, 0)
        );
    }

    /// Percentiles of 100 evenly spaced put latencies (10us..=1000us) land
    /// in the expected bands.
    #[test]
    fn test_metrics_latency_percentiles() {
        let metrics = MetricsCollector::new();
        (1..=100u64)
            .map(|step| Duration::from_micros(step * 10))
            .for_each(|latency| metrics.record_put(latency));

        let ((p50, p95, p99, _p999), _, _) = metrics.get_latency_percentiles();

        assert!((400..=600).contains(&p50), "p50: {}", p50);
        assert!((900..=1000).contains(&p95), "p95: {}", p95);
        assert!((980..=1010).contains(&p99), "p99: {}", p99);
    }

    /// 100 puts followed by a one-second wait give a positive, bounded
    /// write rate.
    #[test]
    fn test_metrics_throughput() {
        let metrics = MetricsCollector::new();
        (0..100).for_each(|_| metrics.record_put(Duration::from_micros(10)));

        thread::sleep(Duration::from_secs(1));

        let (writes_per_sec, _, _) = metrics.calculate_throughput();
        let in_range = writes_per_sec > 0.0 && writes_per_sec <= 200.0;
        assert!(in_range, "writes_per_sec: {}", writes_per_sec);
    }

    /// Ten threads recording 100 puts and 100 gets each lose no counts.
    #[test]
    fn test_metrics_concurrent() {
        let shared = std::sync::Arc::new(MetricsCollector::new());

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let local = std::sync::Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..100 {
                        local.record_put(Duration::from_micros(50));
                        local.record_get(Duration::from_micros(25));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let (puts, gets, _, _, _) = shared.get_counts();
        assert_eq!(puts, 1000);
        assert_eq!(gets, 1000);
    }

    /// Uptime is reported in whole seconds: zero after 100ms, then one or
    /// two after a further second.
    #[test]
    fn test_metrics_uptime() {
        let metrics = MetricsCollector::new();

        thread::sleep(Duration::from_millis(100));
        let early = metrics.uptime_seconds();
        assert!(early == 0);

        thread::sleep(Duration::from_secs(1));
        let later = metrics.uptime_seconds();
        assert!((1..=2).contains(&later));
    }
}