seerdb/
metrics.rs

1use hdrhistogram::Histogram;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::{Duration, Instant};
4
/// Latency percentiles in microseconds, grouped as `(put, get, delete)`.
///
/// The put and get tuples are `(p50, p95, p99, p999)`; the delete tuple is
/// only `(p50, p95, p99)` — it carries no p999 entry.
type LatencyPercentiles = ((u64, u64, u64, u64), (u64, u64, u64, u64), (u64, u64, u64));
7
/// Database statistics for monitoring and observability
///
/// A plain value snapshot: every field is public and the struct is `Clone`.
/// Nothing in this file constructs it, so the exact source of each value
/// should be confirmed against the code that populates it.
#[derive(Debug, Clone)]
pub struct DBStats {
    // Throughput metrics (operations per second)
    pub writes_per_sec: f64,
    pub reads_per_sec: f64,
    pub deletes_per_sec: f64,

    // Operation counts (lifetime)
    pub total_puts: u64,
    pub total_gets: u64,
    pub total_deletes: u64,
    pub total_flushes: u64,
    pub total_compactions: u64,

    // Latency percentiles (microseconds)
    pub put_latency_p50_us: u64,
    pub put_latency_p95_us: u64,
    pub put_latency_p99_us: u64,
    pub put_latency_p999_us: u64,

    pub get_latency_p50_us: u64,
    pub get_latency_p95_us: u64,
    pub get_latency_p99_us: u64,
    pub get_latency_p999_us: u64,

    // Note: deletes expose p50/p95/p99 only; there is no p999 field.
    pub delete_latency_p50_us: u64,
    pub delete_latency_p95_us: u64,
    pub delete_latency_p99_us: u64,

    // Resource usage
    pub memtable_size_bytes: usize,
    pub memtable_capacity_bytes: usize,
    // NOTE(review): presumably size / capacity; whether the scale is 0-100 or 0-1
    // is not visible here — confirm against the code that fills this in.
    pub memtable_utilization_pct: f64,
    pub wal_size_bytes: u64,
    pub total_disk_bytes: u64,

    // Block cache performance
    pub cache_hits: u64,
    pub cache_misses: u64,
    // NOTE(review): presumably hits / (hits + misses); scale not visible here — confirm.
    pub cache_hit_rate: f64,
    pub block_cache_size: usize,     // Current number of cached blocks
    pub block_cache_capacity: usize, // Maximum blocks in cache

    // LSM tree structure
    // NOTE(review): the two vectors are presumably indexed by level — confirm.
    pub sstables_per_level: Vec<usize>,
    pub level_sizes_bytes: Vec<u64>,
    pub total_sstables: usize,

    // Write amplification tracking
    pub logical_bytes_written: u64, // Bytes written by user (put values)
    pub physical_bytes_written: u64, // Bytes written to disk (WAL, SSTable, vLog, compaction)
    pub write_amplification: f64,   // physical / logical

    // Uptime
    pub uptime_seconds: u64,
}
65
/// Internal metrics collector with atomic counters
///
/// Counters are plain `AtomicU64`s so they can be bumped through a shared
/// reference. The latency histograms need mutable access to record a sample,
/// so each one sits behind its own `Mutex`.
pub(crate) struct MetricsCollector {
    // Operation counters (lifetime totals, incremented once per recorded operation)
    pub(crate) total_puts: AtomicU64,
    pub(crate) total_gets: AtomicU64,
    pub(crate) total_deletes: AtomicU64,
    pub(crate) total_flushes: AtomicU64,
    pub(crate) total_compactions: AtomicU64,

    // Write amplification tracking
    pub(crate) logical_bytes_written: AtomicU64, // User data bytes
    pub(crate) physical_bytes_written: AtomicU64, // Disk bytes

    // Latency histograms (require locking); samples are recorded in microseconds
    pub(crate) put_latencies: std::sync::Mutex<Histogram<u64>>,
    pub(crate) get_latencies: std::sync::Mutex<Histogram<u64>>,
    pub(crate) delete_latencies: std::sync::Mutex<Histogram<u64>>,

    // Start time for uptime calculation (captured when the collector is created)
    pub(crate) start_time: Instant,
}
87
88impl MetricsCollector {
89    /// Create a new metrics collector
90    pub fn new() -> Self {
91        Self {
92            total_puts: AtomicU64::new(0),
93            total_gets: AtomicU64::new(0),
94            total_deletes: AtomicU64::new(0),
95            total_flushes: AtomicU64::new(0),
96            total_compactions: AtomicU64::new(0),
97
98            logical_bytes_written: AtomicU64::new(0),
99            physical_bytes_written: AtomicU64::new(0),
100
101            // High dynamic range histograms: 1us to 1 minute, 3 sig figs
102            put_latencies: std::sync::Mutex::new(
103                Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
104            ),
105            get_latencies: std::sync::Mutex::new(
106                Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
107            ),
108            delete_latencies: std::sync::Mutex::new(
109                Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
110            ),
111
112            start_time: Instant::now(),
113        }
114    }
115
116    /// Record a put operation
117    #[inline]
118    pub fn record_put(&self, latency: Duration) {
119        self.total_puts.fetch_add(1, Ordering::Relaxed);
120
121        // Record latency in microseconds
122        let latency_us = latency.as_micros() as u64;
123        if let Ok(mut hist) = self.put_latencies.lock() {
124            let _ = hist.record(latency_us); // Ignore overflow (clamps to max)
125        }
126    }
127
128    /// Record a get operation
129    #[inline]
130    pub fn record_get(&self, latency: Duration) {
131        self.total_gets.fetch_add(1, Ordering::Relaxed);
132
133        let latency_us = latency.as_micros() as u64;
134        if let Ok(mut hist) = self.get_latencies.lock() {
135            let _ = hist.record(latency_us);
136        }
137    }
138
139    /// Record a delete operation
140    #[inline]
141    pub fn record_delete(&self, latency: Duration) {
142        self.total_deletes.fetch_add(1, Ordering::Relaxed);
143
144        let latency_us = latency.as_micros() as u64;
145        if let Ok(mut hist) = self.delete_latencies.lock() {
146            let _ = hist.record(latency_us);
147        }
148    }
149
150    /// Record a flush operation
151    #[inline]
152    pub fn record_flush(&self) {
153        self.total_flushes.fetch_add(1, Ordering::Relaxed);
154    }
155
156    /// Record a compaction operation
157    #[inline]
158    #[allow(dead_code)] // Will be used when compaction metrics are added
159    pub fn record_compaction(&self) {
160        self.total_compactions.fetch_add(1, Ordering::Relaxed);
161    }
162
163    /// Record logical bytes written (user data)
164    #[inline]
165    pub fn record_logical_bytes(&self, bytes: u64) {
166        self.logical_bytes_written
167            .fetch_add(bytes, Ordering::Relaxed);
168    }
169
170    /// Record physical bytes written to disk (WAL, `SSTable`, vLog, compaction)
171    #[inline]
172    pub fn record_physical_bytes(&self, bytes: u64) {
173        self.physical_bytes_written
174            .fetch_add(bytes, Ordering::Relaxed);
175    }
176
177    /// Get current operation counts
178    pub fn get_counts(&self) -> (u64, u64, u64, u64, u64) {
179        (
180            self.total_puts.load(Ordering::Relaxed),
181            self.total_gets.load(Ordering::Relaxed),
182            self.total_deletes.load(Ordering::Relaxed),
183            self.total_flushes.load(Ordering::Relaxed),
184            self.total_compactions.load(Ordering::Relaxed),
185        )
186    }
187
188    /// Get uptime in seconds
189    pub fn uptime_seconds(&self) -> u64 {
190        self.start_time.elapsed().as_secs()
191    }
192
193    /// Calculate throughput (ops/sec)
194    pub fn calculate_throughput(&self) -> (f64, f64, f64) {
195        let uptime_secs = self.uptime_seconds() as f64;
196        if uptime_secs < 0.001 {
197            return (0.0, 0.0, 0.0);
198        }
199
200        let puts = self.total_puts.load(Ordering::Relaxed) as f64;
201        let gets = self.total_gets.load(Ordering::Relaxed) as f64;
202        let deletes = self.total_deletes.load(Ordering::Relaxed) as f64;
203
204        (
205            puts / uptime_secs,
206            gets / uptime_secs,
207            deletes / uptime_secs,
208        )
209    }
210
211    /// Get latency percentiles (microseconds)
212    pub fn get_latency_percentiles(&self) -> LatencyPercentiles {
213        let put_stats = {
214            let hist = self.put_latencies.lock().expect("mutex poisoned");
215            (
216                hist.value_at_percentile(50.0),
217                hist.value_at_percentile(95.0),
218                hist.value_at_percentile(99.0),
219                hist.value_at_percentile(99.9),
220            )
221        };
222
223        let get_stats = {
224            let hist = self.get_latencies.lock().expect("mutex poisoned");
225            (
226                hist.value_at_percentile(50.0),
227                hist.value_at_percentile(95.0),
228                hist.value_at_percentile(99.0),
229                hist.value_at_percentile(99.9),
230            )
231        };
232
233        let delete_stats = {
234            let hist = self.delete_latencies.lock().expect("mutex poisoned");
235            (
236                hist.value_at_percentile(50.0),
237                hist.value_at_percentile(95.0),
238                hist.value_at_percentile(99.0),
239            )
240        };
241
242        (put_stats, get_stats, delete_stats)
243    }
244}
245
246impl Default for MetricsCollector {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Counters reflect exactly the operations that were recorded.
    #[test]
    fn test_metrics_collector_basic() {
        let metrics = MetricsCollector::new();

        // Two puts, one get, one delete; no flushes or compactions.
        metrics.record_put(Duration::from_micros(100));
        metrics.record_put(Duration::from_micros(200));
        metrics.record_get(Duration::from_micros(50));
        metrics.record_delete(Duration::from_micros(150));

        let counts = metrics.get_counts();
        assert_eq!(counts.0, 2);
        assert_eq!(counts.1, 1);
        assert_eq!(counts.2, 1);
        assert_eq!(counts.3, 0);
        assert_eq!(counts.4, 0);
    }

    /// Percentiles of an evenly spread sample land near the expected values.
    #[test]
    fn test_metrics_latency_percentiles() {
        let metrics = MetricsCollector::new();

        // 100 samples: 10us, 20us, ..., 1000us.
        (1..=100u64)
            .map(|step| Duration::from_micros(step * 10))
            .for_each(|latency| metrics.record_put(latency));

        let ((p50, p95, p99, _p999), _, _) = metrics.get_latency_percentiles();

        // Median sits around 500us.
        assert!((400..=600).contains(&p50), "p50: {}", p50);

        // 95th percentile sits around 950us.
        assert!((900..=1000).contains(&p95), "p95: {}", p95);

        // 99th percentile sits around 990us.
        assert!((980..=1010).contains(&p99), "p99: {}", p99);
    }

    /// Throughput is positive and bounded after a known amount of work.
    #[test]
    fn test_metrics_throughput() {
        let metrics = MetricsCollector::new();

        let mut recorded = 0;
        while recorded < 100 {
            metrics.record_put(Duration::from_micros(10));
            recorded += 1;
        }

        // Let at least one second of uptime accumulate before sampling.
        thread::sleep(Duration::from_secs(1));

        let throughput = metrics.calculate_throughput();
        let writes_per_sec = throughput.0;

        // 100 ops over more than a second: only sanity-check the range,
        // this is not a timing-precision test.
        assert!(
            writes_per_sec > 0.0 && writes_per_sec <= 200.0,
            "writes_per_sec: {}",
            writes_per_sec
        );
    }

    /// Concurrent recording from several threads loses no increments.
    #[test]
    fn test_metrics_concurrent() {
        let shared = std::sync::Arc::new(MetricsCollector::new());

        // Start all 10 workers first (collect), then join them.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let metrics = std::sync::Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..100 {
                        metrics.record_put(Duration::from_micros(50));
                        metrics.record_get(Duration::from_micros(25));
                    }
                })
            })
            .collect();

        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());

        let (puts, gets, ..) = shared.get_counts();
        assert_eq!(puts, 1000);
        assert_eq!(gets, 1000);
    }

    /// Uptime is reported in whole seconds.
    #[test]
    fn test_metrics_uptime() {
        let metrics = MetricsCollector::new();

        thread::sleep(Duration::from_millis(100));

        // Still under one second, so the truncated value is zero.
        assert_eq!(metrics.uptime_seconds(), 0);

        thread::sleep(Duration::from_secs(1));

        let elapsed = metrics.uptime_seconds();
        assert!((1..=2).contains(&elapsed));
    }
}