Skip to main content

amaters_core/
metrics.rs

1//! Metrics facade for the amaters-core storage engine.
2//!
3//! All fields are updated via atomic operations for lock-free access from
4//! multiple threads. Call [`CoreMetrics::to_prometheus`] to get an
5//! OpenMetrics/Prometheus text snapshot.
6
7use parking_lot::RwLock;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10
/// Histogram bucket upper bounds in microseconds.
///
/// Bounds are inclusive (Prometheus `le` semantics) and must stay sorted in
/// ascending order, because bucket selection binary-searches this slice.
const HISTOGRAM_BUCKETS_US: &[u64] = &[100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000];

/// Fixed-bucket latency histogram backed by atomic counters.
///
/// Memory use is constant regardless of how many samples are recorded: each
/// observation bumps exactly one bucket counter and the running sum instead
/// of being stored individually.
struct LatencyHistogram {
    /// Per-bucket (non-cumulative) sample counts, one slot per entry of
    /// `HISTOGRAM_BUCKETS_US`.
    buckets: Box<[AtomicU64]>,
    /// Samples larger than the last bound; they only show up under `+Inf`.
    overflow: AtomicU64,
    /// Sum of all recorded values in microseconds (wraps on `u64` overflow).
    sum_us: AtomicU64,
}

impl LatencyHistogram {
    /// Create an empty histogram with one zeroed slot per configured bound.
    fn new() -> Self {
        Self {
            buckets: HISTOGRAM_BUCKETS_US
                .iter()
                .map(|_| AtomicU64::new(0))
                .collect(),
            overflow: AtomicU64::new(0),
            sum_us: AtomicU64::new(0),
        }
    }

    /// Record one observation of `us` microseconds without taking a lock.
    fn record(&self, us: u64) {
        // Index of the first bound with `us <= bound`; equals the slice length
        // when the sample exceeds every bound, which selects the overflow slot.
        let idx = HISTOGRAM_BUCKETS_US.partition_point(|&bound| bound < us);
        let slot = self.buckets.get(idx).unwrap_or(&self.overflow);
        slot.fetch_add(1, Ordering::Relaxed);
        self.sum_us.fetch_add(us, Ordering::Relaxed);
    }
}

/// Metrics facade for the amaters-core storage engine.
///
/// All fields are updated via atomic operations for lock-free access from
/// multiple threads. Call `to_prometheus()` to get an OpenMetrics/Prometheus
/// text snapshot.
pub struct CoreMetrics {
    // --- LSM-Tree counters ---
    /// Total LSM-tree GET operations.
    pub lsm_get_total: AtomicU64,
    /// Total LSM-tree PUT operations.
    pub lsm_put_total: AtomicU64,
    /// Total LSM-tree DELETE operations.
    pub lsm_delete_total: AtomicU64,
    /// Total LSM-tree compaction events.
    pub lsm_compaction_total: AtomicU64,
    /// Total WAL write operations.
    pub lsm_wal_writes_total: AtomicU64,

    // --- Block cache counters ---
    /// Total block cache hits.
    pub cache_hits_total: AtomicU64,
    /// Total block cache misses.
    pub cache_misses_total: AtomicU64,
    /// Total block cache evictions.
    pub cache_evictions_total: AtomicU64,

    // --- Buffer pool counters ---
    /// Total buffer pool allocations.
    pub buffer_allocations_total: AtomicU64,
    /// Total buffer pool recycles.
    pub buffer_recycles_total: AtomicU64,
    /// Total buffer pool misses.
    pub buffer_pool_misses_total: AtomicU64,

    // --- FHE operation timing (accumulated microseconds) ---
    /// Accumulated FHE encryption time in microseconds.
    pub fhe_encrypt_us_total: AtomicU64,
    /// Accumulated FHE decryption time in microseconds.
    pub fhe_decrypt_us_total: AtomicU64,
    /// Total FHE operations performed.
    pub fhe_operation_count: AtomicU64,

    // --- Gauges (current values) ---
    /// Current memtable size in bytes.
    pub memtable_size_bytes: AtomicU64,
    /// Current number of SSTables.
    pub sstable_count: AtomicU64,
    /// Current LSM compaction level.
    pub compaction_level: AtomicU64,

    // --- Latency histograms ---
    /// GET latency distribution in microseconds.
    get_latencies_us: LatencyHistogram,
    /// PUT latency distribution in microseconds.
    put_latencies_us: LatencyHistogram,
}

impl Default for CoreMetrics {
    /// Build a metrics set with every counter, gauge and histogram at zero.
    fn default() -> Self {
        Self {
            lsm_get_total: AtomicU64::new(0),
            lsm_put_total: AtomicU64::new(0),
            lsm_delete_total: AtomicU64::new(0),
            lsm_compaction_total: AtomicU64::new(0),
            lsm_wal_writes_total: AtomicU64::new(0),
            cache_hits_total: AtomicU64::new(0),
            cache_misses_total: AtomicU64::new(0),
            cache_evictions_total: AtomicU64::new(0),
            buffer_allocations_total: AtomicU64::new(0),
            buffer_recycles_total: AtomicU64::new(0),
            buffer_pool_misses_total: AtomicU64::new(0),
            fhe_encrypt_us_total: AtomicU64::new(0),
            fhe_decrypt_us_total: AtomicU64::new(0),
            fhe_operation_count: AtomicU64::new(0),
            memtable_size_bytes: AtomicU64::new(0),
            sstable_count: AtomicU64::new(0),
            compaction_level: AtomicU64::new(0),
            get_latencies_us: LatencyHistogram::new(),
            put_latencies_us: LatencyHistogram::new(),
        }
    }
}

impl CoreMetrics {
    /// Create a new `CoreMetrics` wrapped in an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Record a GET operation latency in microseconds.
    ///
    /// Only bucket counters and the running sum are updated, so memory use
    /// stays constant however many samples are recorded.
    pub fn record_get_latency_us(&self, us: u64) {
        self.get_latencies_us.record(us);
    }

    /// Record a PUT operation latency in microseconds.
    ///
    /// Only bucket counters and the running sum are updated, so memory use
    /// stays constant however many samples are recorded.
    pub fn record_put_latency_us(&self, us: u64) {
        self.put_latencies_us.record(us);
    }

    /// Compute the block cache hit rate as a value in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when no cache events have been recorded.
    pub fn cache_hit_rate(&self) -> f64 {
        let hits = self.cache_hits_total.load(Ordering::Relaxed);
        let misses = self.cache_misses_total.load(Ordering::Relaxed);
        if hits == 0 && misses == 0 {
            return 0.0;
        }
        // Add in floating point: `hits + misses` as `u64` could overflow
        // (a panic in debug builds) once the counters grow very large.
        let hits = hits as f64;
        hits / (hits + misses as f64)
    }

    /// Render all metrics as an OpenMetrics/Prometheus text exposition.
    ///
    /// Every value is read individually with relaxed ordering, so the result
    /// is not an atomic snapshot across metrics when writers are active.
    pub fn to_prometheus(&self) -> String {
        let mut out = String::with_capacity(4096);

        // ------------------------------------------------------------------
        // Counters
        // ------------------------------------------------------------------
        let counters: &[(&str, &str, &AtomicU64)] = &[
            (
                "amaters_core_lsm_get_total",
                "Total LSM-tree GET operations",
                &self.lsm_get_total,
            ),
            (
                "amaters_core_lsm_put_total",
                "Total LSM-tree PUT operations",
                &self.lsm_put_total,
            ),
            (
                "amaters_core_lsm_delete_total",
                "Total LSM-tree DELETE operations",
                &self.lsm_delete_total,
            ),
            (
                "amaters_core_lsm_compaction_total",
                "Total LSM-tree compaction events",
                &self.lsm_compaction_total,
            ),
            (
                "amaters_core_lsm_wal_writes_total",
                "Total WAL write operations",
                &self.lsm_wal_writes_total,
            ),
            (
                "amaters_core_cache_hits_total",
                "Total block cache hits",
                &self.cache_hits_total,
            ),
            (
                "amaters_core_cache_misses_total",
                "Total block cache misses",
                &self.cache_misses_total,
            ),
            (
                "amaters_core_cache_evictions_total",
                "Total block cache evictions",
                &self.cache_evictions_total,
            ),
            (
                "amaters_core_buffer_allocations_total",
                "Total buffer pool allocations",
                &self.buffer_allocations_total,
            ),
            (
                "amaters_core_buffer_recycles_total",
                "Total buffer pool recycles",
                &self.buffer_recycles_total,
            ),
            (
                "amaters_core_buffer_pool_misses_total",
                "Total buffer pool misses",
                &self.buffer_pool_misses_total,
            ),
            (
                "amaters_core_fhe_encrypt_us_total",
                "Accumulated FHE encryption time in microseconds",
                &self.fhe_encrypt_us_total,
            ),
            (
                "amaters_core_fhe_decrypt_us_total",
                "Accumulated FHE decryption time in microseconds",
                &self.fhe_decrypt_us_total,
            ),
            (
                "amaters_core_fhe_operation_count",
                "Total FHE operations performed",
                &self.fhe_operation_count,
            ),
        ];

        for (name, help, atomic) in counters {
            append_scalar(
                &mut out,
                name,
                help,
                "counter",
                atomic.load(Ordering::Relaxed),
            );
        }

        // ------------------------------------------------------------------
        // Gauges
        // ------------------------------------------------------------------
        let gauges: &[(&str, &str, &AtomicU64)] = &[
            (
                "amaters_core_memtable_size_bytes",
                "Current memtable size in bytes",
                &self.memtable_size_bytes,
            ),
            (
                "amaters_core_sstable_count",
                "Current number of SSTables",
                &self.sstable_count,
            ),
            (
                "amaters_core_compaction_level",
                "Current LSM compaction level",
                &self.compaction_level,
            ),
        ];

        for (name, help, atomic) in gauges {
            append_scalar(
                &mut out,
                name,
                help,
                "gauge",
                atomic.load(Ordering::Relaxed),
            );
        }

        // ------------------------------------------------------------------
        // Histograms
        // ------------------------------------------------------------------
        append_histogram(
            &mut out,
            "amaters_core_get_latency_us",
            "GET operation latency histogram in microseconds",
            &self.get_latencies_us,
        );
        append_histogram(
            &mut out,
            "amaters_core_put_latency_us",
            "PUT operation latency histogram in microseconds",
            &self.put_latencies_us,
        );

        out
    }
}

/// Append one single-value metric (`kind` is `counter` or `gauge`) to `out`
/// as its `# HELP` line, `# TYPE` line and sample line.
fn append_scalar(out: &mut String, name: &str, help: &str, kind: &str, value: u64) {
    out.push_str(&format!(
        "# HELP {name} {help}\n# TYPE {name} {kind}\n{name} {value}\n"
    ));
}

/// Append a single histogram in Prometheus text format to `out`.
///
/// Emits one cumulative `_bucket` line per bound in `HISTOGRAM_BUCKETS_US`,
/// the `+Inf` bucket, then `_sum` and `_count`.
fn append_histogram(out: &mut String, name: &str, help: &str, histogram: &LatencyHistogram) {
    out.push_str(&format!("# HELP {name} {help}\n"));
    out.push_str(&format!("# TYPE {name} histogram\n"));

    // Slots hold per-bucket counts; the exposition format wants running totals.
    let mut cumulative = 0u64;
    for (bound, slot) in HISTOGRAM_BUCKETS_US.iter().zip(histogram.buckets.iter()) {
        cumulative += slot.load(Ordering::Relaxed);
        out.push_str(&format!("{name}_bucket{{le=\"{bound}\"}} {cumulative}\n"));
    }

    // +Inf bucket — all observations. `_count` reuses this value so the two
    // lines always agree, even if samples arrive while rendering.
    let total_count = cumulative + histogram.overflow.load(Ordering::Relaxed);
    out.push_str(&format!("{name}_bucket{{le=\"+Inf\"}} {total_count}\n"));

    let sum = histogram.sum_us.load(Ordering::Relaxed);
    out.push_str(&format!("{name}_sum {sum}\n"));
    out.push_str(&format!("{name}_count {total_count}\n"));
}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Return the first line of `text` that begins with `prefix`, if any.
    fn find_line<'a>(text: &'a str, prefix: &str) -> Option<&'a str> {
        text.lines().find(|candidate| candidate.starts_with(prefix))
    }

    /// LSM counters hold exactly the amounts added through `fetch_add`.
    #[test]
    fn test_counter_increments() {
        let m = CoreMetrics::default();
        let cases = [
            (&m.lsm_get_total, 5u64),
            (&m.lsm_put_total, 3),
            (&m.lsm_delete_total, 1),
            (&m.lsm_compaction_total, 2),
            (&m.lsm_wal_writes_total, 10),
        ];

        // Apply every increment first, then verify, as two separate passes.
        for (counter, delta) in &cases {
            counter.fetch_add(*delta, Ordering::Relaxed);
        }
        for (counter, expected) in &cases {
            assert_eq!(counter.load(Ordering::Relaxed), *expected);
        }
    }

    /// The hit rate is 0.0 without traffic and hits / (hits + misses) otherwise.
    #[test]
    fn test_cache_hit_rate() {
        let m = CoreMetrics::default();

        // Nothing recorded yet, so the rate must be reported as 0.0.
        assert_eq!(m.cache_hit_rate(), 0.0);

        m.cache_hits_total.store(75, Ordering::Relaxed);
        m.cache_misses_total.store(25, Ordering::Relaxed);

        let rate = m.cache_hit_rate();
        let deviation = (rate - 0.75).abs();
        assert!(deviation < f64::EPSILON, "expected 0.75, got {rate}");
    }

    /// GET latencies show up as cumulative buckets plus `_sum` and `_count`.
    #[test]
    fn test_latency_histograms() {
        let m = CoreMetrics::default();

        // One sample per bucket range, plus one beyond the largest bound.
        let samples = [
            50u64, 200, 800, 2_000, 7_000, 20_000, 80_000, 300_000, 600_000,
        ];
        samples.iter().for_each(|&us| m.record_get_latency_us(us));

        let prom = m.to_prometheus();

        // All of these fragments must appear somewhere in the exposition.
        let required = [
            "amaters_core_get_latency_us_bucket{le=\"100\"}",
            "amaters_core_get_latency_us_bucket{le=\"500\"}",
            "amaters_core_get_latency_us_bucket{le=\"+Inf\"}",
            "amaters_core_get_latency_us_sum",
            "amaters_core_get_latency_us_count 9",
        ];
        for fragment in &required {
            assert!(prom.contains(*fragment));
        }

        // Only the 50 us sample is <= 100, so that bucket must read 1.
        let line = find_line(&prom, "amaters_core_get_latency_us_bucket{le=\"100\"}")
            .expect("bucket line not found");
        assert!(
            line.ends_with(" 1"),
            "le=100 bucket should be 1, got: {line}"
        );

        // The +Inf bucket counts every one of the 9 samples.
        let inf_line = find_line(&prom, "amaters_core_get_latency_us_bucket{le=\"+Inf\"}")
            .expect("+Inf line not found");
        assert!(
            inf_line.ends_with(" 9"),
            "+Inf bucket should be 9, got: {inf_line}"
        );
    }

    /// Every metric family name appears in the rendered exposition.
    #[test]
    fn test_to_prometheus_all_metrics() {
        let m = CoreMetrics::default();

        // Touch something in every category: counters bumped by one...
        let bumped_by_one = [
            &m.lsm_get_total,
            &m.lsm_put_total,
            &m.lsm_delete_total,
            &m.lsm_compaction_total,
            &m.lsm_wal_writes_total,
            &m.cache_hits_total,
            &m.cache_misses_total,
            &m.cache_evictions_total,
            &m.buffer_allocations_total,
            &m.buffer_recycles_total,
            &m.buffer_pool_misses_total,
        ];
        for counter in &bumped_by_one {
            counter.fetch_add(1, Ordering::Relaxed);
        }

        // ...FHE timing totals...
        m.fhe_encrypt_us_total.fetch_add(1_000, Ordering::Relaxed);
        m.fhe_decrypt_us_total.fetch_add(500, Ordering::Relaxed);
        m.fhe_operation_count.fetch_add(2, Ordering::Relaxed);

        // ...gauges...
        m.memtable_size_bytes.store(1024, Ordering::Relaxed);
        m.sstable_count.store(4, Ordering::Relaxed);
        m.compaction_level.store(2, Ordering::Relaxed);

        // ...and one sample in each latency histogram.
        m.record_get_latency_us(100);
        m.record_put_latency_us(200);

        let prom = m.to_prometheus();

        let expected_names = [
            "amaters_core_lsm_get_total",
            "amaters_core_lsm_put_total",
            "amaters_core_lsm_delete_total",
            "amaters_core_lsm_compaction_total",
            "amaters_core_lsm_wal_writes_total",
            "amaters_core_cache_hits_total",
            "amaters_core_cache_misses_total",
            "amaters_core_cache_evictions_total",
            "amaters_core_buffer_allocations_total",
            "amaters_core_buffer_recycles_total",
            "amaters_core_buffer_pool_misses_total",
            "amaters_core_fhe_encrypt_us_total",
            "amaters_core_fhe_decrypt_us_total",
            "amaters_core_fhe_operation_count",
            "amaters_core_memtable_size_bytes",
            "amaters_core_sstable_count",
            "amaters_core_compaction_level",
            "amaters_core_get_latency_us_bucket",
            "amaters_core_put_latency_us_bucket",
        ];

        expected_names
            .iter()
            .for_each(|name| assert!(prom.contains(*name), "missing metric: {name}"));
    }
}