1use hdrhistogram::Histogram;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::{Duration, Instant};
4
/// Return shape of `MetricsCollector::get_latency_percentiles`.
///
/// The three tuples are, in order, the put, get and delete latencies in
/// microseconds. Puts and gets carry `(p50, p95, p99, p99.9)`; deletes carry
/// only `(p50, p95, p99)`.
type LatencyPercentiles = (
    (u64, u64, u64, u64),
    (u64, u64, u64, u64),
    (u64, u64, u64),
);
7
/// Flat, cloneable record of database statistics.
///
/// Every field is public plain data. Nothing in this file fills the struct in,
/// so the group notes below only restate what the field names and types convey
/// (the `_per_sec`, `_us`, `_bytes` and `_pct` suffixes).
#[derive(Debug, Clone)]
pub struct DBStats {
    // Operation rates (per second, going by the field names).
    pub writes_per_sec: f64,
    pub reads_per_sec: f64,
    pub deletes_per_sec: f64,

    // Cumulative operation counters.
    pub total_puts: u64,
    pub total_gets: u64,
    pub total_deletes: u64,
    pub total_flushes: u64,
    pub total_compactions: u64,

    // Put latency percentiles (`_us` suffix: microseconds).
    pub put_latency_p50_us: u64,
    pub put_latency_p95_us: u64,
    pub put_latency_p99_us: u64,
    pub put_latency_p999_us: u64,

    // Get latency percentiles (`_us` suffix: microseconds).
    pub get_latency_p50_us: u64,
    pub get_latency_p95_us: u64,
    pub get_latency_p99_us: u64,
    pub get_latency_p999_us: u64,

    // Delete latency percentiles; note there is no p99.9 field for deletes.
    pub delete_latency_p50_us: u64,
    pub delete_latency_p95_us: u64,
    pub delete_latency_p99_us: u64,

    // Memtable, WAL and on-disk footprint.
    pub memtable_size_bytes: usize,
    pub memtable_capacity_bytes: usize,
    // NOTE(review): scale (0-1 vs 0-100) is not visible in this file — confirm with the producer.
    pub memtable_utilization_pct: f64,
    pub wal_size_bytes: u64,
    pub total_disk_bytes: u64,

    // Cache statistics.
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub cache_hit_rate: f64,
    pub block_cache_size: usize,
    pub block_cache_capacity: usize,

    // SSTable layout; the two vectors are presumably indexed by level — confirm with the producer.
    pub sstables_per_level: Vec<usize>,
    pub level_sizes_bytes: Vec<u64>,
    pub total_sstables: usize,

    // Write volume and derived amplification.
    pub logical_bytes_written: u64,
    pub physical_bytes_written: u64,
    pub write_amplification: f64,

    // Whole seconds of uptime.
    pub uptime_seconds: u64,
}
65
66pub(crate) struct MetricsCollector {
68 pub(crate) total_puts: AtomicU64,
70 pub(crate) total_gets: AtomicU64,
71 pub(crate) total_deletes: AtomicU64,
72 pub(crate) total_flushes: AtomicU64,
73 pub(crate) total_compactions: AtomicU64,
74
75 pub(crate) logical_bytes_written: AtomicU64, pub(crate) physical_bytes_written: AtomicU64, pub(crate) put_latencies: std::sync::Mutex<Histogram<u64>>,
81 pub(crate) get_latencies: std::sync::Mutex<Histogram<u64>>,
82 pub(crate) delete_latencies: std::sync::Mutex<Histogram<u64>>,
83
84 pub(crate) start_time: Instant,
86}
87
88impl MetricsCollector {
89 pub fn new() -> Self {
91 Self {
92 total_puts: AtomicU64::new(0),
93 total_gets: AtomicU64::new(0),
94 total_deletes: AtomicU64::new(0),
95 total_flushes: AtomicU64::new(0),
96 total_compactions: AtomicU64::new(0),
97
98 logical_bytes_written: AtomicU64::new(0),
99 physical_bytes_written: AtomicU64::new(0),
100
101 put_latencies: std::sync::Mutex::new(
103 Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
104 ),
105 get_latencies: std::sync::Mutex::new(
106 Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
107 ),
108 delete_latencies: std::sync::Mutex::new(
109 Histogram::new_with_bounds(1, 60_000_000, 3).expect("Invalid histogram config"),
110 ),
111
112 start_time: Instant::now(),
113 }
114 }
115
116 #[inline]
118 pub fn record_put(&self, latency: Duration) {
119 self.total_puts.fetch_add(1, Ordering::Relaxed);
120
121 let latency_us = latency.as_micros() as u64;
123 if let Ok(mut hist) = self.put_latencies.lock() {
124 let _ = hist.record(latency_us); }
126 }
127
128 #[inline]
130 pub fn record_get(&self, latency: Duration) {
131 self.total_gets.fetch_add(1, Ordering::Relaxed);
132
133 let latency_us = latency.as_micros() as u64;
134 if let Ok(mut hist) = self.get_latencies.lock() {
135 let _ = hist.record(latency_us);
136 }
137 }
138
139 #[inline]
141 pub fn record_delete(&self, latency: Duration) {
142 self.total_deletes.fetch_add(1, Ordering::Relaxed);
143
144 let latency_us = latency.as_micros() as u64;
145 if let Ok(mut hist) = self.delete_latencies.lock() {
146 let _ = hist.record(latency_us);
147 }
148 }
149
150 #[inline]
152 pub fn record_flush(&self) {
153 self.total_flushes.fetch_add(1, Ordering::Relaxed);
154 }
155
156 #[inline]
158 #[allow(dead_code)] pub fn record_compaction(&self) {
160 self.total_compactions.fetch_add(1, Ordering::Relaxed);
161 }
162
163 #[inline]
165 pub fn record_logical_bytes(&self, bytes: u64) {
166 self.logical_bytes_written
167 .fetch_add(bytes, Ordering::Relaxed);
168 }
169
170 #[inline]
172 pub fn record_physical_bytes(&self, bytes: u64) {
173 self.physical_bytes_written
174 .fetch_add(bytes, Ordering::Relaxed);
175 }
176
177 pub fn get_counts(&self) -> (u64, u64, u64, u64, u64) {
179 (
180 self.total_puts.load(Ordering::Relaxed),
181 self.total_gets.load(Ordering::Relaxed),
182 self.total_deletes.load(Ordering::Relaxed),
183 self.total_flushes.load(Ordering::Relaxed),
184 self.total_compactions.load(Ordering::Relaxed),
185 )
186 }
187
188 pub fn uptime_seconds(&self) -> u64 {
190 self.start_time.elapsed().as_secs()
191 }
192
193 pub fn calculate_throughput(&self) -> (f64, f64, f64) {
195 let uptime_secs = self.uptime_seconds() as f64;
196 if uptime_secs < 0.001 {
197 return (0.0, 0.0, 0.0);
198 }
199
200 let puts = self.total_puts.load(Ordering::Relaxed) as f64;
201 let gets = self.total_gets.load(Ordering::Relaxed) as f64;
202 let deletes = self.total_deletes.load(Ordering::Relaxed) as f64;
203
204 (
205 puts / uptime_secs,
206 gets / uptime_secs,
207 deletes / uptime_secs,
208 )
209 }
210
211 pub fn get_latency_percentiles(&self) -> LatencyPercentiles {
213 let put_stats = {
214 let hist = self.put_latencies.lock().expect("mutex poisoned");
215 (
216 hist.value_at_percentile(50.0),
217 hist.value_at_percentile(95.0),
218 hist.value_at_percentile(99.0),
219 hist.value_at_percentile(99.9),
220 )
221 };
222
223 let get_stats = {
224 let hist = self.get_latencies.lock().expect("mutex poisoned");
225 (
226 hist.value_at_percentile(50.0),
227 hist.value_at_percentile(95.0),
228 hist.value_at_percentile(99.0),
229 hist.value_at_percentile(99.9),
230 )
231 };
232
233 let delete_stats = {
234 let hist = self.delete_latencies.lock().expect("mutex poisoned");
235 (
236 hist.value_at_percentile(50.0),
237 hist.value_at_percentile(95.0),
238 hist.value_at_percentile(99.0),
239 )
240 };
241
242 (put_stats, get_stats, delete_stats)
243 }
244}
245
246impl Default for MetricsCollector {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Each `record_*` call bumps only its own counter.
    #[test]
    fn test_metrics_collector_basic() {
        let metrics = MetricsCollector::new();

        metrics.record_put(Duration::from_micros(100));
        metrics.record_put(Duration::from_micros(200));
        metrics.record_get(Duration::from_micros(50));
        metrics.record_delete(Duration::from_micros(150));

        // Order: (puts, gets, deletes, flushes, compactions).
        assert_eq!(metrics.get_counts(), (2, 1, 1, 0, 0));
    }

    /// Put percentiles over 10, 20, ..., 1000 µs land in the expected bands.
    #[test]
    fn test_metrics_latency_percentiles() {
        let metrics = MetricsCollector::new();

        (1..=100)
            .map(|step| step * 10)
            .for_each(|micros| metrics.record_put(Duration::from_micros(micros)));

        let ((p50, p95, p99, _), _, _) = metrics.get_latency_percentiles();

        assert!((400..=600).contains(&p50), "p50: {}", p50);
        assert!((900..=1000).contains(&p95), "p95: {}", p95);
        assert!((980..=1010).contains(&p99), "p99: {}", p99);
    }

    /// After roughly one second, 100 puts give a positive, bounded write rate.
    #[test]
    fn test_metrics_throughput() {
        let metrics = MetricsCollector::new();

        let mut recorded = 0;
        while recorded < 100 {
            metrics.record_put(Duration::from_micros(10));
            recorded += 1;
        }

        thread::sleep(Duration::from_secs(1));

        let (writes_per_sec, _, _) = metrics.calculate_throughput();

        assert!(
            writes_per_sec > 0.0 && writes_per_sec <= 200.0,
            "writes_per_sec: {}",
            writes_per_sec
        );
    }

    /// Ten threads recording concurrently lose no counter increments.
    #[test]
    fn test_metrics_concurrent() {
        let shared = std::sync::Arc::new(MetricsCollector::new());

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let metrics = shared.clone();
                thread::spawn(move || {
                    for _ in 0..100 {
                        metrics.record_put(Duration::from_micros(50));
                        metrics.record_get(Duration::from_micros(25));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let (puts, gets, ..) = shared.get_counts();
        assert_eq!(puts, 1000);
        assert_eq!(gets, 1000);
    }

    /// Uptime is reported in whole seconds, truncated.
    #[test]
    fn test_metrics_uptime() {
        let metrics = MetricsCollector::new();

        thread::sleep(Duration::from_millis(100));
        let before = metrics.uptime_seconds();
        assert!(before == 0);

        thread::sleep(Duration::from_secs(1));
        let after = metrics.uptime_seconds();
        assert!((1..=2).contains(&after));
    }
}