use hdrhistogram::Histogram;
use parking_lot::Mutex;
use serde::Serialize;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;
#[derive(Debug, Serialize, Clone)]
pub struct EngineStats {
pub total_records_written: u64,
pub total_bytes_written: u64,
pub total_records_read: u64,
pub total_records_expired: u64,
pub total_flushes: u64,
pub total_gc_runs: u64,
pub total_compaction_runs: u64,
pub records_purged_by_gc: u64,
pub udp_packets_received: u64,
pub udp_packets_dropped: u64,
pub http_requests_total: u64,
pub memtable_records: usize,
pub memtable_bytes: usize,
pub frozen_memtable_count: usize,
pub sstable_count: usize,
pub sstable_bytes: u64,
pub wal_bytes: u64,
pub block_meta_index_entries: usize,
pub time_index_buckets: usize,
pub block_cache_hit_rate: f64,
pub compression_ratio: f64,
pub write_latency_p50_us: u64,
pub write_latency_p90_us: u64,
pub write_latency_p99_us: u64,
pub query_latency_p50_us: u64,
pub query_latency_p90_us: u64,
pub query_latency_p99_us: u64,
pub flush_latency_p50_us: u64,
pub flush_latency_p90_us: u64,
pub flush_latency_p99_us: u64,
pub uptime_secs: u64,
pub last_gc_at: i64,
pub last_flush_at: i64,
pub last_compaction_at: i64,
}
pub struct StatsCounters {
total_records_written: AtomicU64,
total_bytes_written: AtomicU64,
total_records_read: AtomicU64,
total_records_expired: AtomicU64,
total_flushes: AtomicU64,
total_gc_runs: AtomicU64,
total_compaction_runs: AtomicU64,
records_purged_by_gc: AtomicU64,
pub udp_packets_received: AtomicU64,
pub udp_packets_dropped: AtomicU64,
pub http_requests_total: AtomicU64,
memtable_records: AtomicUsize,
memtable_bytes: AtomicUsize,
frozen_memtable_count: AtomicUsize,
sstable_count: AtomicUsize,
sstable_bytes: AtomicU64,
wal_bytes: AtomicU64,
block_meta_index_entries: AtomicUsize,
time_index_buckets: AtomicUsize,
compression_ratio_permille: AtomicU64,
started_at: Instant,
write_latency: Mutex<Histogram<u64>>,
query_latency: Mutex<Histogram<u64>>,
flush_latency: Mutex<Histogram<u64>>,
last_gc_at: Mutex<i64>,
last_flush_at: Mutex<i64>,
last_compaction_at: Mutex<i64>,
}
impl Default for StatsCounters {
fn default() -> Self {
Self::new()
}
}
impl StatsCounters {
pub fn new() -> Self {
Self {
total_records_written: AtomicU64::new(0),
total_bytes_written: AtomicU64::new(0),
total_records_read: AtomicU64::new(0),
total_records_expired: AtomicU64::new(0),
total_flushes: AtomicU64::new(0),
total_gc_runs: AtomicU64::new(0),
total_compaction_runs: AtomicU64::new(0),
records_purged_by_gc: AtomicU64::new(0),
udp_packets_received: AtomicU64::new(0),
udp_packets_dropped: AtomicU64::new(0),
http_requests_total: AtomicU64::new(0),
memtable_records: AtomicUsize::new(0),
memtable_bytes: AtomicUsize::new(0),
frozen_memtable_count: AtomicUsize::new(0),
sstable_count: AtomicUsize::new(0),
sstable_bytes: AtomicU64::new(0),
wal_bytes: AtomicU64::new(0),
block_meta_index_entries: AtomicUsize::new(0),
time_index_buckets: AtomicUsize::new(0),
compression_ratio_permille: AtomicU64::new(1000),
started_at: Instant::now(),
write_latency: Mutex::new(Histogram::new(3).unwrap()),
query_latency: Mutex::new(Histogram::new(3).unwrap()),
flush_latency: Mutex::new(Histogram::new(3).unwrap()),
last_gc_at: Mutex::new(0),
last_flush_at: Mutex::new(0),
last_compaction_at: Mutex::new(0),
}
}
pub fn records_written(&self, count: u64, bytes: u64) {
self.total_records_written
.fetch_add(count, Ordering::Relaxed);
self.total_bytes_written.fetch_add(bytes, Ordering::Relaxed);
}
pub fn record_written(&self, bytes: u64) {
self.records_written(1, bytes);
}
pub fn records_read(&self, count: u64) {
self.total_records_read.fetch_add(count, Ordering::Relaxed);
}
pub fn records_expired(&self, count: u64) {
self.total_records_expired
.fetch_add(count, Ordering::Relaxed);
}
pub fn flush_done(&self) {
self.total_flushes.fetch_add(1, Ordering::Relaxed);
*self.last_flush_at.lock() = chrono_now_us();
}
pub fn gc_done(&self, purged: u64) {
self.total_gc_runs.fetch_add(1, Ordering::Relaxed);
self.records_purged_by_gc
.fetch_add(purged, Ordering::Relaxed);
*self.last_gc_at.lock() = chrono_now_us();
}
pub fn compaction_done(&self) {
self.total_compaction_runs.fetch_add(1, Ordering::Relaxed);
*self.last_compaction_at.lock() = chrono_now_us();
}
pub fn set_memtable(&self, records: usize, bytes: usize) {
self.memtable_records.store(records, Ordering::Relaxed);
self.memtable_bytes.store(bytes, Ordering::Relaxed);
}
pub fn set_frozen_count(&self, count: usize) {
self.frozen_memtable_count.store(count, Ordering::Relaxed);
}
pub fn set_sstable(&self, count: usize, bytes: u64) {
self.sstable_count.store(count, Ordering::Relaxed);
self.sstable_bytes.store(bytes, Ordering::Relaxed);
}
pub fn add_wal_bytes(&self, bytes: u64) {
self.wal_bytes.fetch_add(bytes, Ordering::Relaxed);
}
pub fn set_index_stats(&self, entries: usize, buckets: usize) {
self.block_meta_index_entries
.store(entries, Ordering::Relaxed);
self.time_index_buckets.store(buckets, Ordering::Relaxed);
}
pub fn set_compression_ratio(&self, ratio: f64) {
self.compression_ratio_permille
.store((ratio * 1000.0) as u64, Ordering::Relaxed);
}
pub fn record_write_latency(&self, us: u64) {
if let Some(mut h) = self.write_latency.try_lock() {
let _ = h.record(us);
}
}
pub fn record_query_latency(&self, us: u64) {
if let Some(mut h) = self.query_latency.try_lock() {
let _ = h.record(us);
}
}
pub fn record_flush_latency(&self, us: u64) {
if let Some(mut h) = self.flush_latency.try_lock() {
let _ = h.record(us);
}
}
pub fn snapshot(&self, cache_hit_rate: f64) -> EngineStats {
let cr = self.compression_ratio_permille.load(Ordering::Relaxed) as f64 / 1000.0;
let (wp50, wp90, wp99) = histogram_percentiles(&self.write_latency);
let (qp50, qp90, qp99) = histogram_percentiles(&self.query_latency);
let (fp50, fp90, fp99) = histogram_percentiles(&self.flush_latency);
EngineStats {
total_records_written: self.total_records_written.load(Ordering::Relaxed),
total_bytes_written: self.total_bytes_written.load(Ordering::Relaxed),
total_records_read: self.total_records_read.load(Ordering::Relaxed),
total_records_expired: self.total_records_expired.load(Ordering::Relaxed),
total_flushes: self.total_flushes.load(Ordering::Relaxed),
total_gc_runs: self.total_gc_runs.load(Ordering::Relaxed),
total_compaction_runs: self.total_compaction_runs.load(Ordering::Relaxed),
records_purged_by_gc: self.records_purged_by_gc.load(Ordering::Relaxed),
udp_packets_received: self.udp_packets_received.load(Ordering::Relaxed),
udp_packets_dropped: self.udp_packets_dropped.load(Ordering::Relaxed),
http_requests_total: self.http_requests_total.load(Ordering::Relaxed),
memtable_records: self.memtable_records.load(Ordering::Relaxed),
memtable_bytes: self.memtable_bytes.load(Ordering::Relaxed),
frozen_memtable_count: self.frozen_memtable_count.load(Ordering::Relaxed),
sstable_count: self.sstable_count.load(Ordering::Relaxed),
sstable_bytes: self.sstable_bytes.load(Ordering::Relaxed),
wal_bytes: self.wal_bytes.load(Ordering::Relaxed),
block_meta_index_entries: self.block_meta_index_entries.load(Ordering::Relaxed),
time_index_buckets: self.time_index_buckets.load(Ordering::Relaxed),
block_cache_hit_rate: cache_hit_rate,
compression_ratio: cr,
write_latency_p50_us: wp50,
write_latency_p90_us: wp90,
write_latency_p99_us: wp99,
query_latency_p50_us: qp50,
query_latency_p90_us: qp90,
query_latency_p99_us: qp99,
flush_latency_p50_us: fp50,
flush_latency_p90_us: fp90,
flush_latency_p99_us: fp99,
uptime_secs: self.started_at.elapsed().as_secs(),
last_gc_at: *self.last_gc_at.lock(),
last_flush_at: *self.last_flush_at.lock(),
last_compaction_at: *self.last_compaction_at.lock(),
}
}
pub fn to_prometheus(&self, cache_hit_rate: f64) -> String {
let s = self.snapshot(cache_hit_rate);
format!(
include_str!("prometheus_template.txt"),
s.total_records_written,
s.total_bytes_written,
s.total_records_read,
s.total_records_expired,
s.total_flushes,
s.total_gc_runs,
s.total_compaction_runs,
s.records_purged_by_gc,
s.udp_packets_received,
s.udp_packets_dropped,
s.http_requests_total,
s.memtable_records,
s.memtable_bytes,
s.frozen_memtable_count,
s.sstable_count,
s.sstable_bytes,
s.wal_bytes,
s.block_meta_index_entries,
s.time_index_buckets,
s.block_cache_hit_rate,
s.compression_ratio,
s.write_latency_p50_us,
s.write_latency_p90_us,
s.write_latency_p99_us,
s.query_latency_p50_us,
s.query_latency_p90_us,
s.query_latency_p99_us,
s.flush_latency_p50_us,
s.flush_latency_p90_us,
s.flush_latency_p99_us,
s.uptime_secs,
)
}
}
fn histogram_percentiles(h: &Mutex<Histogram<u64>>) -> (u64, u64, u64) {
let h = h.lock();
(
h.value_at_percentile(50.0),
h.value_at_percentile(90.0),
h.value_at_percentile(99.0),
)
}
fn chrono_now_us() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as i64
}