1use parking_lot::RwLock;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11const HISTOGRAM_BUCKETS_US: &[u64] = &[100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000];
13
14pub struct CoreMetrics {
20 pub lsm_get_total: AtomicU64,
22 pub lsm_put_total: AtomicU64,
23 pub lsm_delete_total: AtomicU64,
24 pub lsm_compaction_total: AtomicU64,
25 pub lsm_wal_writes_total: AtomicU64,
26
27 pub cache_hits_total: AtomicU64,
29 pub cache_misses_total: AtomicU64,
30 pub cache_evictions_total: AtomicU64,
31
32 pub buffer_allocations_total: AtomicU64,
34 pub buffer_recycles_total: AtomicU64,
35 pub buffer_pool_misses_total: AtomicU64,
36
37 pub fhe_encrypt_us_total: AtomicU64,
39 pub fhe_decrypt_us_total: AtomicU64,
40 pub fhe_operation_count: AtomicU64,
41
42 pub memtable_size_bytes: AtomicU64,
44 pub sstable_count: AtomicU64,
45 pub compaction_level: AtomicU64,
46
47 get_latencies_us: RwLock<Vec<u64>>,
49 put_latencies_us: RwLock<Vec<u64>>,
50}
51
52impl Default for CoreMetrics {
53 fn default() -> Self {
54 Self {
55 lsm_get_total: AtomicU64::new(0),
56 lsm_put_total: AtomicU64::new(0),
57 lsm_delete_total: AtomicU64::new(0),
58 lsm_compaction_total: AtomicU64::new(0),
59 lsm_wal_writes_total: AtomicU64::new(0),
60 cache_hits_total: AtomicU64::new(0),
61 cache_misses_total: AtomicU64::new(0),
62 cache_evictions_total: AtomicU64::new(0),
63 buffer_allocations_total: AtomicU64::new(0),
64 buffer_recycles_total: AtomicU64::new(0),
65 buffer_pool_misses_total: AtomicU64::new(0),
66 fhe_encrypt_us_total: AtomicU64::new(0),
67 fhe_decrypt_us_total: AtomicU64::new(0),
68 fhe_operation_count: AtomicU64::new(0),
69 memtable_size_bytes: AtomicU64::new(0),
70 sstable_count: AtomicU64::new(0),
71 compaction_level: AtomicU64::new(0),
72 get_latencies_us: RwLock::new(Vec::new()),
73 put_latencies_us: RwLock::new(Vec::new()),
74 }
75 }
76}
77
78impl CoreMetrics {
79 pub fn new() -> Arc<Self> {
81 Arc::new(Self::default())
82 }
83
84 pub fn record_get_latency_us(&self, us: u64) {
86 self.get_latencies_us.write().push(us);
87 }
88
89 pub fn record_put_latency_us(&self, us: u64) {
91 self.put_latencies_us.write().push(us);
92 }
93
94 pub fn cache_hit_rate(&self) -> f64 {
98 let hits = self.cache_hits_total.load(Ordering::Relaxed);
99 let misses = self.cache_misses_total.load(Ordering::Relaxed);
100 let total = hits + misses;
101 if total == 0 {
102 0.0
103 } else {
104 hits as f64 / total as f64
105 }
106 }
107
108 pub fn to_prometheus(&self) -> String {
110 let mut out = String::with_capacity(4096);
111
112 let counters: &[(&str, &str, &AtomicU64)] = &[
116 (
117 "amaters_core_lsm_get_total",
118 "Total LSM-tree GET operations",
119 &self.lsm_get_total,
120 ),
121 (
122 "amaters_core_lsm_put_total",
123 "Total LSM-tree PUT operations",
124 &self.lsm_put_total,
125 ),
126 (
127 "amaters_core_lsm_delete_total",
128 "Total LSM-tree DELETE operations",
129 &self.lsm_delete_total,
130 ),
131 (
132 "amaters_core_lsm_compaction_total",
133 "Total LSM-tree compaction events",
134 &self.lsm_compaction_total,
135 ),
136 (
137 "amaters_core_lsm_wal_writes_total",
138 "Total WAL write operations",
139 &self.lsm_wal_writes_total,
140 ),
141 (
142 "amaters_core_cache_hits_total",
143 "Total block cache hits",
144 &self.cache_hits_total,
145 ),
146 (
147 "amaters_core_cache_misses_total",
148 "Total block cache misses",
149 &self.cache_misses_total,
150 ),
151 (
152 "amaters_core_cache_evictions_total",
153 "Total block cache evictions",
154 &self.cache_evictions_total,
155 ),
156 (
157 "amaters_core_buffer_allocations_total",
158 "Total buffer pool allocations",
159 &self.buffer_allocations_total,
160 ),
161 (
162 "amaters_core_buffer_recycles_total",
163 "Total buffer pool recycles",
164 &self.buffer_recycles_total,
165 ),
166 (
167 "amaters_core_buffer_pool_misses_total",
168 "Total buffer pool misses",
169 &self.buffer_pool_misses_total,
170 ),
171 (
172 "amaters_core_fhe_encrypt_us_total",
173 "Accumulated FHE encryption time in microseconds",
174 &self.fhe_encrypt_us_total,
175 ),
176 (
177 "amaters_core_fhe_decrypt_us_total",
178 "Accumulated FHE decryption time in microseconds",
179 &self.fhe_decrypt_us_total,
180 ),
181 (
182 "amaters_core_fhe_operation_count",
183 "Total FHE operations performed",
184 &self.fhe_operation_count,
185 ),
186 ];
187
188 for (name, help, atomic) in counters {
189 out.push_str(&format!("# HELP {name} {help}\n"));
190 out.push_str(&format!("# TYPE {name} counter\n"));
191 out.push_str(&format!("{name} {}\n", atomic.load(Ordering::Relaxed)));
192 }
193
194 let gauges: &[(&str, &str, &AtomicU64)] = &[
198 (
199 "amaters_core_memtable_size_bytes",
200 "Current memtable size in bytes",
201 &self.memtable_size_bytes,
202 ),
203 (
204 "amaters_core_sstable_count",
205 "Current number of SSTables",
206 &self.sstable_count,
207 ),
208 (
209 "amaters_core_compaction_level",
210 "Current LSM compaction level",
211 &self.compaction_level,
212 ),
213 ];
214
215 for (name, help, atomic) in gauges {
216 out.push_str(&format!("# HELP {name} {help}\n"));
217 out.push_str(&format!("# TYPE {name} gauge\n"));
218 out.push_str(&format!("{name} {}\n", atomic.load(Ordering::Relaxed)));
219 }
220
221 append_histogram(
225 &mut out,
226 "amaters_core_get_latency_us",
227 "GET operation latency histogram in microseconds",
228 &self.get_latencies_us.read(),
229 );
230 append_histogram(
231 &mut out,
232 "amaters_core_put_latency_us",
233 "PUT operation latency histogram in microseconds",
234 &self.put_latencies_us.read(),
235 );
236
237 out
238 }
239}
240
241fn append_histogram(out: &mut String, name: &str, help: &str, samples: &[u64]) {
243 out.push_str(&format!("# HELP {name} {help}\n"));
244 out.push_str(&format!("# TYPE {name} histogram\n"));
245
246 for &bound in HISTOGRAM_BUCKETS_US {
247 let cumulative = samples.iter().filter(|&&v| v <= bound).count() as u64;
248 out.push_str(&format!("{name}_bucket{{le=\"{bound}\"}} {cumulative}\n"));
249 }
250
251 let total_count = samples.len() as u64;
253 out.push_str(&format!("{name}_bucket{{le=\"+Inf\"}} {total_count}\n"));
254
255 let sum: u64 = samples.iter().sum();
256 out.push_str(&format!("{name}_sum {sum}\n"));
257 out.push_str(&format!("{name}_count {total_count}\n"));
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263
264 #[test]
265 fn test_counter_increments() {
266 let m = CoreMetrics::default();
267 m.lsm_get_total.fetch_add(5, Ordering::Relaxed);
268 m.lsm_put_total.fetch_add(3, Ordering::Relaxed);
269 m.lsm_delete_total.fetch_add(1, Ordering::Relaxed);
270 m.lsm_compaction_total.fetch_add(2, Ordering::Relaxed);
271 m.lsm_wal_writes_total.fetch_add(10, Ordering::Relaxed);
272
273 assert_eq!(m.lsm_get_total.load(Ordering::Relaxed), 5);
274 assert_eq!(m.lsm_put_total.load(Ordering::Relaxed), 3);
275 assert_eq!(m.lsm_delete_total.load(Ordering::Relaxed), 1);
276 assert_eq!(m.lsm_compaction_total.load(Ordering::Relaxed), 2);
277 assert_eq!(m.lsm_wal_writes_total.load(Ordering::Relaxed), 10);
278 }
279
280 #[test]
281 fn test_cache_hit_rate() {
282 let m = CoreMetrics::default();
283
284 assert_eq!(m.cache_hit_rate(), 0.0);
286
287 m.cache_hits_total.store(75, Ordering::Relaxed);
288 m.cache_misses_total.store(25, Ordering::Relaxed);
289
290 let rate = m.cache_hit_rate();
291 assert!(
292 (rate - 0.75).abs() < f64::EPSILON,
293 "expected 0.75, got {rate}"
294 );
295 }
296
297 #[test]
298 fn test_latency_histograms() {
299 let m = CoreMetrics::default();
300
301 for us in [
303 50u64, 200, 800, 2_000, 7_000, 20_000, 80_000, 300_000, 600_000,
304 ] {
305 m.record_get_latency_us(us);
306 }
307
308 let prom = m.to_prometheus();
309
310 assert!(prom.contains("amaters_core_get_latency_us_bucket{le=\"100\"}"));
312 assert!(prom.contains("amaters_core_get_latency_us_bucket{le=\"500\"}"));
313 assert!(prom.contains("amaters_core_get_latency_us_bucket{le=\"+Inf\"}"));
314 assert!(prom.contains("amaters_core_get_latency_us_sum"));
315 assert!(prom.contains("amaters_core_get_latency_us_count 9"));
316
317 let line = prom
319 .lines()
320 .find(|l| l.starts_with("amaters_core_get_latency_us_bucket{le=\"100\"}"))
321 .expect("bucket line not found");
322 assert!(
323 line.ends_with(" 1"),
324 "le=100 bucket should be 1, got: {line}"
325 );
326
327 let inf_line = prom
329 .lines()
330 .find(|l| l.starts_with("amaters_core_get_latency_us_bucket{le=\"+Inf\"}"))
331 .expect("+Inf line not found");
332 assert!(
333 inf_line.ends_with(" 9"),
334 "+Inf bucket should be 9, got: {inf_line}"
335 );
336 }
337
338 #[test]
339 fn test_to_prometheus_all_metrics() {
340 let m = CoreMetrics::default();
341
342 m.lsm_get_total.fetch_add(1, Ordering::Relaxed);
344 m.lsm_put_total.fetch_add(1, Ordering::Relaxed);
345 m.lsm_delete_total.fetch_add(1, Ordering::Relaxed);
346 m.lsm_compaction_total.fetch_add(1, Ordering::Relaxed);
347 m.lsm_wal_writes_total.fetch_add(1, Ordering::Relaxed);
348 m.cache_hits_total.fetch_add(1, Ordering::Relaxed);
349 m.cache_misses_total.fetch_add(1, Ordering::Relaxed);
350 m.cache_evictions_total.fetch_add(1, Ordering::Relaxed);
351 m.buffer_allocations_total.fetch_add(1, Ordering::Relaxed);
352 m.buffer_recycles_total.fetch_add(1, Ordering::Relaxed);
353 m.buffer_pool_misses_total.fetch_add(1, Ordering::Relaxed);
354 m.fhe_encrypt_us_total.fetch_add(1_000, Ordering::Relaxed);
355 m.fhe_decrypt_us_total.fetch_add(500, Ordering::Relaxed);
356 m.fhe_operation_count.fetch_add(2, Ordering::Relaxed);
357 m.memtable_size_bytes.store(1024, Ordering::Relaxed);
358 m.sstable_count.store(4, Ordering::Relaxed);
359 m.compaction_level.store(2, Ordering::Relaxed);
360 m.record_get_latency_us(100);
361 m.record_put_latency_us(200);
362
363 let prom = m.to_prometheus();
364
365 let expected_names = [
366 "amaters_core_lsm_get_total",
367 "amaters_core_lsm_put_total",
368 "amaters_core_lsm_delete_total",
369 "amaters_core_lsm_compaction_total",
370 "amaters_core_lsm_wal_writes_total",
371 "amaters_core_cache_hits_total",
372 "amaters_core_cache_misses_total",
373 "amaters_core_cache_evictions_total",
374 "amaters_core_buffer_allocations_total",
375 "amaters_core_buffer_recycles_total",
376 "amaters_core_buffer_pool_misses_total",
377 "amaters_core_fhe_encrypt_us_total",
378 "amaters_core_fhe_decrypt_us_total",
379 "amaters_core_fhe_operation_count",
380 "amaters_core_memtable_size_bytes",
381 "amaters_core_sstable_count",
382 "amaters_core_compaction_level",
383 "amaters_core_get_latency_us_bucket",
384 "amaters_core_put_latency_us_bucket",
385 ];
386
387 for name in &expected_names {
388 assert!(prom.contains(name), "missing metric: {name}");
389 }
390 }
391}