Skip to main content

dbx_core/monitoring/
metrics.rs

1//! DbxMetrics — Global atomic metrics registry.
2//!
3//! All counters use `AtomicU64` for lock-free, zero-overhead tracking.
4//! Histograms use `Mutex<HistogramInner>` for bucket tracking.
5
6use crate::monitoring::histogram::Histogram;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9/// Snapshot of current metrics values (non-atomic copy for reporting).
10#[derive(Debug, Clone, Default)]
11pub struct MetricsSnapshot {
12    // Operation Counters
13    pub inserts_total: u64,
14    pub gets_total: u64,
15    pub deletes_total: u64,
16    pub sql_queries_total: u64,
17    pub flush_total: u64,
18
19    // Tier Hit Rates
20    pub delta_hits: u64,
21    pub delta_misses: u64,
22    pub cache_hits: u64,
23    pub cache_misses: u64,
24    pub wos_hits: u64,
25    pub wos_misses: u64,
26
27    // Sharding Stats
28    pub scatter_writes_total: u64,
29    pub scatter_reads_total: u64,
30
31    // Partition Stats
32    pub partition_prune_hits: u64,
33
34    // WAL Stats
35    pub wal_appends_total: u64,
36    pub wal_compactions_total: u64,
37
38    // Latency (avg µs)
39    pub avg_query_latency_us: u64,
40    pub avg_insert_latency_us: u64,
41
42    // Computed hit rates (0.0 - 1.0)
43    pub delta_hit_rate: f64,
44    pub cache_hit_rate: f64,
45    pub wos_hit_rate: f64,
46}
47
48/// Global metrics registry for DBX.
49///
50/// Embedded in `Database` as `Arc<DbxMetrics>` — cloning the Arc is cheap.
51pub struct DbxMetrics {
52    // ── Operation Counters ──────────────────────────
53    pub inserts_total: AtomicU64,
54    pub gets_total: AtomicU64,
55    pub deletes_total: AtomicU64,
56    pub sql_queries_total: AtomicU64,
57    pub flush_total: AtomicU64,
58
59    // ── Tier Hit/Miss Counters ───────────────────────
60    pub delta_hits: AtomicU64,
61    pub delta_misses: AtomicU64,
62    pub cache_hits: AtomicU64,
63    pub cache_misses: AtomicU64,
64    pub wos_hits: AtomicU64,
65    pub wos_misses: AtomicU64,
66
67    // ── Sharding Stats ──────────────────────────────
68    pub scatter_writes_total: AtomicU64,
69    pub scatter_reads_total: AtomicU64,
70
71    // ── Partition Stats ─────────────────────────────
72    pub partition_prune_hits: AtomicU64,
73
74    // ── WAL Stats ───────────────────────────────────
75    pub wal_appends_total: AtomicU64,
76    pub wal_compactions_total: AtomicU64,
77
78    // ── Latency Histograms ──────────────────────────
79    pub query_latency_us: Histogram,
80    pub insert_latency_us: Histogram,
81}
82
83impl DbxMetrics {
84    pub fn new() -> Self {
85        Self {
86            inserts_total: AtomicU64::new(0),
87            gets_total: AtomicU64::new(0),
88            deletes_total: AtomicU64::new(0),
89            sql_queries_total: AtomicU64::new(0),
90            flush_total: AtomicU64::new(0),
91
92            delta_hits: AtomicU64::new(0),
93            delta_misses: AtomicU64::new(0),
94            cache_hits: AtomicU64::new(0),
95            cache_misses: AtomicU64::new(0),
96            wos_hits: AtomicU64::new(0),
97            wos_misses: AtomicU64::new(0),
98
99            scatter_writes_total: AtomicU64::new(0),
100            scatter_reads_total: AtomicU64::new(0),
101
102            partition_prune_hits: AtomicU64::new(0),
103
104            wal_appends_total: AtomicU64::new(0),
105            wal_compactions_total: AtomicU64::new(0),
106
107            query_latency_us: Histogram::new(
108                "dbx_query_latency_us",
109                "SQL query execution latency in microseconds",
110            ),
111            insert_latency_us: Histogram::new(
112                "dbx_insert_latency_us",
113                "INSERT operation latency in microseconds",
114            ),
115        }
116    }
117
118    /// Atomically increment a counter.
119    #[inline]
120    pub fn inc_inserts(&self) {
121        self.inserts_total.fetch_add(1, Ordering::Relaxed);
122    }
123
124    #[inline]
125    pub fn inc_gets(&self) {
126        self.gets_total.fetch_add(1, Ordering::Relaxed);
127    }
128
129    #[inline]
130    pub fn inc_deletes(&self) {
131        self.deletes_total.fetch_add(1, Ordering::Relaxed);
132    }
133
134    #[inline]
135    pub fn inc_sql_queries(&self) {
136        self.sql_queries_total.fetch_add(1, Ordering::Relaxed);
137    }
138
139    #[inline]
140    pub fn inc_flush(&self) {
141        self.flush_total.fetch_add(1, Ordering::Relaxed);
142    }
143
144    #[inline]
145    pub fn inc_delta_hit(&self) {
146        self.delta_hits.fetch_add(1, Ordering::Relaxed);
147    }
148
149    #[inline]
150    pub fn inc_delta_miss(&self) {
151        self.delta_misses.fetch_add(1, Ordering::Relaxed);
152    }
153
154    #[inline]
155    pub fn inc_cache_hit(&self) {
156        self.cache_hits.fetch_add(1, Ordering::Relaxed);
157    }
158
159    #[inline]
160    pub fn inc_cache_miss(&self) {
161        self.cache_misses.fetch_add(1, Ordering::Relaxed);
162    }
163
164    #[inline]
165    pub fn inc_wos_hit(&self) {
166        self.wos_hits.fetch_add(1, Ordering::Relaxed);
167    }
168
169    #[inline]
170    pub fn inc_wos_miss(&self) {
171        self.wos_misses.fetch_add(1, Ordering::Relaxed);
172    }
173
174    #[inline]
175    pub fn inc_scatter_write(&self) {
176        self.scatter_writes_total.fetch_add(1, Ordering::Relaxed);
177    }
178
179    #[inline]
180    pub fn inc_scatter_read(&self) {
181        self.scatter_reads_total.fetch_add(1, Ordering::Relaxed);
182    }
183
184    #[inline]
185    pub fn inc_partition_prune_hit(&self) {
186        self.partition_prune_hits.fetch_add(1, Ordering::Relaxed);
187    }
188
189    #[inline]
190    pub fn inc_wal_append(&self) {
191        self.wal_appends_total.fetch_add(1, Ordering::Relaxed);
192    }
193
194    #[inline]
195    pub fn inc_wal_compaction(&self) {
196        self.wal_compactions_total.fetch_add(1, Ordering::Relaxed);
197    }
198
199    /// Create a non-atomic snapshot for reporting/display.
200    pub fn snapshot(&self) -> MetricsSnapshot {
201        let delta_hits = self.delta_hits.load(Ordering::Relaxed);
202        let delta_misses = self.delta_misses.load(Ordering::Relaxed);
203        let delta_total = delta_hits + delta_misses;
204
205        let cache_hits = self.cache_hits.load(Ordering::Relaxed);
206        let cache_misses = self.cache_misses.load(Ordering::Relaxed);
207        let cache_total = cache_hits + cache_misses;
208
209        let wos_hits = self.wos_hits.load(Ordering::Relaxed);
210        let wos_misses = self.wos_misses.load(Ordering::Relaxed);
211        let wos_total = wos_hits + wos_misses;
212
213        let (q_sum, q_count) = self.query_latency_us.snapshot();
214        let (i_sum, i_count) = self.insert_latency_us.snapshot();
215
216        MetricsSnapshot {
217            inserts_total: self.inserts_total.load(Ordering::Relaxed),
218            gets_total: self.gets_total.load(Ordering::Relaxed),
219            deletes_total: self.deletes_total.load(Ordering::Relaxed),
220            sql_queries_total: self.sql_queries_total.load(Ordering::Relaxed),
221            flush_total: self.flush_total.load(Ordering::Relaxed),
222
223            delta_hits,
224            delta_misses,
225            cache_hits,
226            cache_misses,
227            wos_hits,
228            wos_misses,
229
230            scatter_writes_total: self.scatter_writes_total.load(Ordering::Relaxed),
231            scatter_reads_total: self.scatter_reads_total.load(Ordering::Relaxed),
232
233            partition_prune_hits: self.partition_prune_hits.load(Ordering::Relaxed),
234
235            wal_appends_total: self.wal_appends_total.load(Ordering::Relaxed),
236            wal_compactions_total: self.wal_compactions_total.load(Ordering::Relaxed),
237
238            avg_query_latency_us: if q_count > 0 { q_sum / q_count } else { 0 },
239            avg_insert_latency_us: if i_count > 0 { i_sum / i_count } else { 0 },
240
241            delta_hit_rate: if delta_total > 0 {
242                delta_hits as f64 / delta_total as f64
243            } else {
244                0.0
245            },
246            cache_hit_rate: if cache_total > 0 {
247                cache_hits as f64 / cache_total as f64
248            } else {
249                0.0
250            },
251            wos_hit_rate: if wos_total > 0 {
252                wos_hits as f64 / wos_total as f64
253            } else {
254                0.0
255            },
256        }
257    }
258
259    /// Reset all counters and histograms.
260    pub fn reset(&self) {
261        self.inserts_total.store(0, Ordering::Relaxed);
262        self.gets_total.store(0, Ordering::Relaxed);
263        self.deletes_total.store(0, Ordering::Relaxed);
264        self.sql_queries_total.store(0, Ordering::Relaxed);
265        self.flush_total.store(0, Ordering::Relaxed);
266        self.delta_hits.store(0, Ordering::Relaxed);
267        self.delta_misses.store(0, Ordering::Relaxed);
268        self.cache_hits.store(0, Ordering::Relaxed);
269        self.cache_misses.store(0, Ordering::Relaxed);
270        self.wos_hits.store(0, Ordering::Relaxed);
271        self.wos_misses.store(0, Ordering::Relaxed);
272        self.scatter_writes_total.store(0, Ordering::Relaxed);
273        self.scatter_reads_total.store(0, Ordering::Relaxed);
274        self.partition_prune_hits.store(0, Ordering::Relaxed);
275        self.wal_appends_total.store(0, Ordering::Relaxed);
276        self.wal_compactions_total.store(0, Ordering::Relaxed);
277        self.query_latency_us.reset();
278        self.insert_latency_us.reset();
279    }
280}
281
282impl Default for DbxMetrics {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
288#[cfg(test)]
289mod tests {
290    use super::*;
291
292    #[test]
293    fn test_inc_counters() {
294        let m = DbxMetrics::new();
295        m.inc_inserts();
296        m.inc_inserts();
297        m.inc_gets();
298        let snap = m.snapshot();
299        assert_eq!(snap.inserts_total, 2);
300        assert_eq!(snap.gets_total, 1);
301    }
302
303    #[test]
304    fn test_hit_rate_calculation() {
305        let m = DbxMetrics::new();
306        m.inc_delta_hit();
307        m.inc_delta_hit();
308        m.inc_delta_miss();
309        let snap = m.snapshot();
310        assert!((snap.delta_hit_rate - 2.0 / 3.0).abs() < 0.001);
311    }
312
313    #[test]
314    fn test_reset() {
315        let m = DbxMetrics::new();
316        m.inc_inserts();
317        m.inc_inserts();
318        m.reset();
319        let snap = m.snapshot();
320        assert_eq!(snap.inserts_total, 0);
321    }
322
323    #[test]
324    fn test_latency_histogram() {
325        let m = DbxMetrics::new();
326        m.query_latency_us.observe(500);
327        m.query_latency_us.observe(1200);
328        let snap = m.snapshot();
329        assert_eq!(snap.avg_query_latency_us, 850); // (500 + 1200) / 2
330    }
331}