1use std::fmt;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use parking_lot::Mutex;
12
/// Default histogram bucket upper bounds, in ascending order.
///
/// `Histogram::new` uses these bounds. When values are fed through
/// `Histogram::observe_duration` (which records `Duration::as_secs_f64`), the
/// bounds are seconds: 1 ms up to 10 s.
pub const DEFAULT_BUCKETS: [f64; 12] = [
    // sub-10ms
    0.001, 0.005,
    // 10ms .. 100ms
    0.01, 0.025, 0.05, 0.1,
    // 250ms .. 1s
    0.25, 0.5, 1.0,
    // multi-second
    2.5, 5.0, 10.0,
];
21
/// Point-in-time copy of a histogram's state.
#[derive(Debug, Clone)]
pub struct HistogramSnapshot {
    /// Bucket upper bounds. Snapshots taken from `Histogram` have them sorted ascending.
    pub buckets: Vec<f64>,
    /// Cumulative count per bucket: `counts[i]` is the number of observations
    /// `<= buckets[i]`. Expected to be the same length as `buckets`.
    pub counts: Vec<u64>,
    /// Total number of observations, including those above the last bound
    /// (the implicit `+Inf` bucket).
    pub total_count: u64,
    /// Sum of all observed values.
    pub sum: f64,
}

impl HistogramSnapshot {
    /// Estimates the `p`-quantile (`p` in `[0.0, 1.0]`) from the bucket counts.
    ///
    /// The target rank `p * total_count` is located in the first non-empty
    /// bucket whose cumulative count reaches it. The value is then linearly
    /// interpolated between that bucket's lower bound and its upper bound. The
    /// lower bound is the previous bound, or `0.0` for the first bucket.
    ///
    /// Empty buckets are skipped. As a result, `p == 0.0` yields the lower
    /// bound of the first bucket that actually holds observations, rather than
    /// an upper bound that no observation reached.
    ///
    /// If the rank falls above every finite bound, the last bound is returned.
    ///
    /// Returns `None` when there are no observations or `p` is outside
    /// `[0.0, 1.0]` (including NaN).
    pub fn percentile(&self, p: f64) -> Option<f64> {
        if self.total_count == 0 || !(0.0..=1.0).contains(&p) {
            return None;
        }

        let target = p * self.total_count as f64;

        let mut prev_count: u64 = 0;
        let mut prev_bound: f64 = 0.0;

        // `zip` stops at the shorter vector, so a hand-built snapshot with
        // mismatched lengths cannot cause an out-of-bounds panic.
        for (&upper, &cumulative) in self.buckets.iter().zip(&self.counts) {
            // `saturating_sub` guards against non-monotonic hand-built counts.
            let bucket_count = cumulative.saturating_sub(prev_count);
            if bucket_count > 0 && cumulative as f64 >= target {
                let fraction = (target - prev_count as f64) / bucket_count as f64;
                return Some(prev_bound + fraction * (upper - prev_bound));
            }
            prev_count = cumulative;
            prev_bound = upper;
        }

        // The rank lies in the implicit `+Inf` bucket; the best finite estimate
        // is the highest bound.
        self.buckets.last().copied()
    }

    /// Estimated median (`percentile(0.50)`).
    pub fn p50(&self) -> Option<f64> {
        self.percentile(0.50)
    }

    /// Estimated 95th percentile (`percentile(0.95)`).
    pub fn p95(&self) -> Option<f64> {
        self.percentile(0.95)
    }

    /// Estimated 99th percentile (`percentile(0.99)`).
    pub fn p99(&self) -> Option<f64> {
        self.percentile(0.99)
    }
}
86
87#[derive(Clone)]
91pub struct Histogram {
92 inner: Arc<Mutex<HistogramInner>>,
93}
94
/// Mutable histogram state guarded by `Histogram`'s mutex.
struct HistogramInner {
    // Bucket upper bounds, sorted ascending by `Histogram::with_buckets`.
    buckets: Vec<f64>,
    // Cumulative per-bucket counts; index-aligned with `buckets`.
    counts: Vec<u64>,
    // Number of observations, including ones above every bound.
    total_count: u64,
    // Running sum of all observed values.
    sum: f64,
}
101
102impl Histogram {
103 pub fn new() -> Self {
105 Self::with_buckets(&DEFAULT_BUCKETS)
106 }
107
108 pub fn with_buckets(bounds: &[f64]) -> Self {
112 let mut sorted = bounds.to_vec();
113 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
114 let len = sorted.len();
115 Self {
116 inner: Arc::new(Mutex::new(HistogramInner {
117 buckets: sorted,
118 counts: vec![0; len],
119 total_count: 0,
120 sum: 0.0,
121 })),
122 }
123 }
124
125 pub fn observe(&self, value: f64) {
127 let mut inner = self.inner.lock();
128 inner.total_count += 1;
129 inner.sum += value;
130 let len = inner.buckets.len();
132 for i in 0..len {
133 if value <= inner.buckets[i] {
134 inner.counts[i] += 1;
135 }
136 }
137 }
138
139 pub fn observe_duration(&self, d: Duration) {
141 self.observe(d.as_secs_f64());
142 }
143
144 pub fn snapshot(&self) -> HistogramSnapshot {
146 let inner = self.inner.lock();
147 HistogramSnapshot {
148 buckets: inner.buckets.clone(),
149 counts: inner.counts.clone(),
150 total_count: inner.total_count,
151 sum: inner.sum,
152 }
153 }
154
155 #[cfg(test)]
157 fn reset(&self) {
158 let mut inner = self.inner.lock();
159 for c in inner.counts.iter_mut() {
160 *c = 0;
161 }
162 inner.total_count = 0;
163 inner.sum = 0.0;
164 }
165}
166
167impl Default for Histogram {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
/// Kind of storage operation tracked by per-operation metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    Get,
    Put,
    Delete,
    Range,
    Batch,
    Stream,
}

impl OperationType {
    /// Every variant, in declaration order. This must be kept in sync with the
    /// enum by hand whenever a variant is added.
    pub const ALL: [OperationType; 6] = [
        Self::Get,
        Self::Put,
        Self::Delete,
        Self::Range,
        Self::Batch,
        Self::Stream,
    ];

    /// Lower-case name used as the `op` label value and in metric names.
    pub fn as_label(&self) -> &'static str {
        match *self {
            Self::Get => "get",
            Self::Put => "put",
            Self::Delete => "delete",
            Self::Range => "range",
            Self::Batch => "batch",
            Self::Stream => "stream",
        }
    }
}

/// Displays the same text as [`OperationType::as_label`].
impl fmt::Display for OperationType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = self.as_label();
        f.write_str(label)
    }
}
217
218#[derive(Clone)]
224struct OperationMetrics {
225 count: Arc<AtomicU64>,
226 errors: Arc<AtomicU64>,
227 latency: Histogram,
228}
229
230impl OperationMetrics {
231 fn new() -> Self {
232 Self {
233 count: Arc::new(AtomicU64::new(0)),
234 errors: Arc::new(AtomicU64::new(0)),
235 latency: Histogram::new(),
236 }
237 }
238}
239
240#[derive(Debug, Clone)]
242pub struct OperationSnapshot {
243 pub op_type: OperationType,
244 pub count: u64,
245 pub errors: u64,
246 pub latency: HistogramSnapshot,
247}
248
/// Lock-free `u64` metric shared between clones through an `Arc`.
///
/// Every operation uses `Ordering::Relaxed`. Each update is atomic, but no
/// ordering is established with other memory operations, which is sufficient
/// for independent metrics.
#[derive(Clone)]
struct AtomicGauge(Arc<AtomicU64>);

impl AtomicGauge {
    /// A gauge starting at zero.
    fn new() -> Self {
        Self(Arc::new(AtomicU64::new(0)))
    }

    /// Adds `v` (wrapping on `u64` overflow, as `fetch_add` does).
    fn inc(&self, v: u64) {
        self.0.fetch_add(v, Ordering::Relaxed);
    }

    /// Subtracts `v`, saturating at zero.
    ///
    /// A plain `fetch_sub` would wrap an over-decrement (e.g. an unmatched
    /// `dec`) around to a value near `u64::MAX`, reporting a huge gauge.
    fn dec(&self, v: u64) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| Some(cur.saturating_sub(v)));
    }

    /// Overwrites the current value.
    fn set(&self, v: u64) {
        self.0.store(v, Ordering::Relaxed);
    }

    /// Reads the current value.
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
278
/// Point-in-time copy of the storage-engine metrics.
///
/// `Default` yields all zeroes.
#[derive(Debug, Clone, Default)]
pub struct StorageSnapshot {
    /// Memtable size gauge, in bytes.
    pub memtable_size_bytes: u64,
    /// SSTable count gauge.
    pub sstable_count: u64,
    /// Number of compactions counted so far.
    pub compaction_count: u64,
    /// Bytes written by compactions so far.
    pub compaction_bytes_written: u64,
    /// WAL size gauge, in bytes.
    pub wal_size_bytes: u64,
    /// Block cache hits counted so far.
    pub block_cache_hits: u64,
    /// Block cache misses counted so far.
    pub block_cache_misses: u64,
}
290
291#[derive(Clone)]
299pub struct MetricsCollector {
300 requests_total: Arc<AtomicU64>,
302 requests_success: Arc<AtomicU64>,
303 requests_failed: Arc<AtomicU64>,
304 bytes_read: Arc<AtomicU64>,
305 bytes_written: Arc<AtomicU64>,
306 active_connections: Arc<AtomicU64>,
307 queries_total: Arc<AtomicU64>,
308 query_time_us: Arc<AtomicU64>,
309
310 request_latency: Histogram,
312
313 op_get: OperationMetrics,
315 op_put: OperationMetrics,
316 op_delete: OperationMetrics,
317 op_range: OperationMetrics,
318 op_batch: OperationMetrics,
319 op_stream: OperationMetrics,
320
321 memtable_size_bytes: AtomicGauge,
323 sstable_count: AtomicGauge,
324 compaction_count: AtomicGauge,
325 compaction_bytes_written: AtomicGauge,
326 wal_size_bytes: AtomicGauge,
327 block_cache_hits: AtomicGauge,
328 block_cache_misses: AtomicGauge,
329}
330
331impl MetricsCollector {
332 pub fn new() -> Self {
334 Self {
335 requests_total: Arc::new(AtomicU64::new(0)),
336 requests_success: Arc::new(AtomicU64::new(0)),
337 requests_failed: Arc::new(AtomicU64::new(0)),
338 bytes_read: Arc::new(AtomicU64::new(0)),
339 bytes_written: Arc::new(AtomicU64::new(0)),
340 active_connections: Arc::new(AtomicU64::new(0)),
341 queries_total: Arc::new(AtomicU64::new(0)),
342 query_time_us: Arc::new(AtomicU64::new(0)),
343 request_latency: Histogram::new(),
344 op_get: OperationMetrics::new(),
345 op_put: OperationMetrics::new(),
346 op_delete: OperationMetrics::new(),
347 op_range: OperationMetrics::new(),
348 op_batch: OperationMetrics::new(),
349 op_stream: OperationMetrics::new(),
350 memtable_size_bytes: AtomicGauge::new(),
351 sstable_count: AtomicGauge::new(),
352 compaction_count: AtomicGauge::new(),
353 compaction_bytes_written: AtomicGauge::new(),
354 wal_size_bytes: AtomicGauge::new(),
355 block_cache_hits: AtomicGauge::new(),
356 block_cache_misses: AtomicGauge::new(),
357 }
358 }
359
360 pub fn inc_requests(&self) {
364 self.requests_total.fetch_add(1, Ordering::Relaxed);
365 }
366
367 pub fn inc_success(&self) {
369 self.requests_success.fetch_add(1, Ordering::Relaxed);
370 }
371
372 pub fn inc_failed(&self) {
374 self.requests_failed.fetch_add(1, Ordering::Relaxed);
375 }
376
377 pub fn add_bytes_read(&self, bytes: u64) {
379 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
380 }
381
382 pub fn add_bytes_written(&self, bytes: u64) {
384 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
385 }
386
387 pub fn inc_connections(&self) {
389 self.active_connections.fetch_add(1, Ordering::Relaxed);
390 }
391
392 pub fn dec_connections(&self) {
394 self.active_connections.fetch_sub(1, Ordering::Relaxed);
395 }
396
397 pub fn inc_queries(&self) {
399 self.queries_total.fetch_add(1, Ordering::Relaxed);
400 }
401
402 pub fn add_query_time(&self, duration_us: u64) {
404 self.query_time_us.fetch_add(duration_us, Ordering::Relaxed);
405 }
406
407 pub fn observe_request_latency(&self, d: Duration) {
411 self.request_latency.observe_duration(d);
412 }
413
414 pub fn request_latency(&self) -> &Histogram {
416 &self.request_latency
417 }
418
419 fn op_metrics(&self, op: OperationType) -> &OperationMetrics {
422 match op {
423 OperationType::Get => &self.op_get,
424 OperationType::Put => &self.op_put,
425 OperationType::Delete => &self.op_delete,
426 OperationType::Range => &self.op_range,
427 OperationType::Batch => &self.op_batch,
428 OperationType::Stream => &self.op_stream,
429 }
430 }
431
432 pub fn record_operation(&self, op_type: OperationType, duration: Duration, success: bool) {
434 let m = self.op_metrics(op_type);
435 m.count.fetch_add(1, Ordering::Relaxed);
436 if !success {
437 m.errors.fetch_add(1, Ordering::Relaxed);
438 }
439 m.latency.observe_duration(duration);
440 }
441
442 pub fn operation_snapshot(&self, op_type: OperationType) -> OperationSnapshot {
444 let m = self.op_metrics(op_type);
445 OperationSnapshot {
446 op_type,
447 count: m.count.load(Ordering::Relaxed),
448 errors: m.errors.load(Ordering::Relaxed),
449 latency: m.latency.snapshot(),
450 }
451 }
452
453 pub fn set_memtable_size(&self, bytes: u64) {
457 self.memtable_size_bytes.set(bytes);
458 }
459
460 pub fn set_sstable_count(&self, count: u64) {
462 self.sstable_count.set(count);
463 }
464
465 pub fn inc_compaction_count(&self) {
467 self.compaction_count.inc(1);
468 }
469
470 pub fn add_compaction_bytes(&self, bytes: u64) {
472 self.compaction_bytes_written.inc(bytes);
473 }
474
475 pub fn set_wal_size(&self, bytes: u64) {
477 self.wal_size_bytes.set(bytes);
478 }
479
480 pub fn inc_block_cache_hit(&self) {
482 self.block_cache_hits.inc(1);
483 }
484
485 pub fn inc_block_cache_miss(&self) {
487 self.block_cache_misses.inc(1);
488 }
489
490 pub fn inc_memtable_size(&self, bytes: u64) {
492 self.memtable_size_bytes.inc(bytes);
493 }
494
495 pub fn dec_memtable_size(&self, bytes: u64) {
497 self.memtable_size_bytes.dec(bytes);
498 }
499
500 pub fn inc_sstable_count(&self) {
502 self.sstable_count.inc(1);
503 }
504
505 pub fn dec_sstable_count(&self) {
507 self.sstable_count.dec(1);
508 }
509
510 pub fn storage_snapshot(&self) -> StorageSnapshot {
512 StorageSnapshot {
513 memtable_size_bytes: self.memtable_size_bytes.get(),
514 sstable_count: self.sstable_count.get(),
515 compaction_count: self.compaction_count.get(),
516 compaction_bytes_written: self.compaction_bytes_written.get(),
517 wal_size_bytes: self.wal_size_bytes.get(),
518 block_cache_hits: self.block_cache_hits.get(),
519 block_cache_misses: self.block_cache_misses.get(),
520 }
521 }
522
523 pub fn snapshot(&self) -> MetricsSnapshot {
527 MetricsSnapshot {
528 requests_total: self.requests_total.load(Ordering::Relaxed),
529 requests_success: self.requests_success.load(Ordering::Relaxed),
530 requests_failed: self.requests_failed.load(Ordering::Relaxed),
531 bytes_read: self.bytes_read.load(Ordering::Relaxed),
532 bytes_written: self.bytes_written.load(Ordering::Relaxed),
533 active_connections: self.active_connections.load(Ordering::Relaxed),
534 queries_total: self.queries_total.load(Ordering::Relaxed),
535 query_time_us: self.query_time_us.load(Ordering::Relaxed),
536 timestamp: SystemTime::now()
537 .duration_since(UNIX_EPOCH)
538 .map(|d| d.as_secs())
539 .unwrap_or(0),
540 request_latency: self.request_latency.snapshot(),
541 operations: OperationType::ALL
542 .iter()
543 .map(|&op| self.operation_snapshot(op))
544 .collect(),
545 storage: self.storage_snapshot(),
546 }
547 }
548
549 pub fn to_prometheus(&self) -> String {
553 let snapshot = self.snapshot();
554 let mut out = String::with_capacity(4096);
555
556 write_counter(
558 &mut out,
559 "amaters_requests_total",
560 "Total number of requests",
561 snapshot.requests_total,
562 );
563 write_counter(
564 &mut out,
565 "amaters_requests_success",
566 "Total number of successful requests",
567 snapshot.requests_success,
568 );
569 write_counter(
570 &mut out,
571 "amaters_requests_failed",
572 "Total number of failed requests",
573 snapshot.requests_failed,
574 );
575 write_counter(
576 &mut out,
577 "amaters_bytes_read",
578 "Total bytes read",
579 snapshot.bytes_read,
580 );
581 write_counter(
582 &mut out,
583 "amaters_bytes_written",
584 "Total bytes written",
585 snapshot.bytes_written,
586 );
587 write_gauge(
588 &mut out,
589 "amaters_active_connections",
590 "Current active connections",
591 snapshot.active_connections,
592 );
593 write_counter(
594 &mut out,
595 "amaters_queries_total",
596 "Total queries executed",
597 snapshot.queries_total,
598 );
599 write_counter(
600 &mut out,
601 "amaters_query_time_us_total",
602 "Total query execution time in microseconds",
603 snapshot.query_time_us,
604 );
605
606 write_histogram(
608 &mut out,
609 "amaters_request_latency_seconds",
610 "Request latency in seconds",
611 &snapshot.request_latency,
612 );
613
614 for op_snap in &snapshot.operations {
616 let label = op_snap.op_type.as_label();
617 let prefix = format!("amaters_op_{label}");
618 write_counter_with_label(
619 &mut out,
620 "amaters_op_count",
621 "Operation count",
622 &format!("op=\"{label}\""),
623 op_snap.count,
624 );
625 write_counter_with_label(
626 &mut out,
627 "amaters_op_errors",
628 "Operation errors",
629 &format!("op=\"{label}\""),
630 op_snap.errors,
631 );
632 write_histogram(
633 &mut out,
634 &format!("{prefix}_latency_seconds"),
635 &format!("Latency for {label} operations in seconds"),
636 &op_snap.latency,
637 );
638 }
639
640 let s = &snapshot.storage;
642 write_gauge(
643 &mut out,
644 "amaters_memtable_size_bytes",
645 "Current memtable size in bytes",
646 s.memtable_size_bytes,
647 );
648 write_gauge(
649 &mut out,
650 "amaters_sstable_count",
651 "Current SSTable count",
652 s.sstable_count,
653 );
654 write_counter(
655 &mut out,
656 "amaters_compaction_count",
657 "Total compaction operations",
658 s.compaction_count,
659 );
660 write_counter(
661 &mut out,
662 "amaters_compaction_bytes_written",
663 "Total bytes written during compaction",
664 s.compaction_bytes_written,
665 );
666 write_gauge(
667 &mut out,
668 "amaters_wal_size_bytes",
669 "Current WAL size in bytes",
670 s.wal_size_bytes,
671 );
672 write_counter(
673 &mut out,
674 "amaters_block_cache_hits",
675 "Block cache hits",
676 s.block_cache_hits,
677 );
678 write_counter(
679 &mut out,
680 "amaters_block_cache_misses",
681 "Block cache misses",
682 s.block_cache_misses,
683 );
684
685 out
686 }
687
688 #[cfg(test)]
690 pub fn reset(&self) {
691 self.requests_total.store(0, Ordering::Relaxed);
692 self.requests_success.store(0, Ordering::Relaxed);
693 self.requests_failed.store(0, Ordering::Relaxed);
694 self.bytes_read.store(0, Ordering::Relaxed);
695 self.bytes_written.store(0, Ordering::Relaxed);
696 self.active_connections.store(0, Ordering::Relaxed);
697 self.queries_total.store(0, Ordering::Relaxed);
698 self.query_time_us.store(0, Ordering::Relaxed);
699 self.request_latency.reset();
700 for &op in &OperationType::ALL {
701 let m = self.op_metrics(op);
702 m.count.store(0, Ordering::Relaxed);
703 m.errors.store(0, Ordering::Relaxed);
704 m.latency.reset();
705 }
706 self.memtable_size_bytes.set(0);
707 self.sstable_count.set(0);
708 self.compaction_count.set(0);
709 self.compaction_bytes_written.set(0);
710 self.wal_size_bytes.set(0);
711 self.block_cache_hits.set(0);
712 self.block_cache_misses.set(0);
713 }
714}
715
716impl Default for MetricsCollector {
717 fn default() -> Self {
718 Self::new()
719 }
720}
721
/// Appends an unlabeled counter in Prometheus text format: HELP line, TYPE
/// line, one sample, then a blank separator line.
fn write_counter(out: &mut String, name: &str, help: &str, value: u64) {
    use std::fmt::Write;
    // Formatting into a `String` cannot fail, so the `fmt::Result` is ignored.
    let _ = write!(
        out,
        "# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}\n\n"
    );
}
733
/// Appends a counter with a pre-rendered label set in Prometheus text format.
///
/// `label` is inserted verbatim between the braces (for example `op="get"`),
/// so the caller is responsible for quoting and escaping it. The output is the
/// HELP and TYPE lines, one sample, and a blank separator line.
fn write_counter_with_label(out: &mut String, name: &str, help: &str, label: &str, value: u64) {
    use std::fmt::Write;
    let sample = format!("{name}{{{label}}} {value}");
    let _ = write!(out, "# HELP {name} {help}\n# TYPE {name} counter\n{sample}\n\n");
}
741
/// Appends an unlabeled gauge in Prometheus text format: HELP line, TYPE
/// line, one sample, then a blank separator line.
fn write_gauge(out: &mut String, name: &str, help: &str, value: u64) {
    use std::fmt::Write;
    let lines = [
        format!("# HELP {name} {help}"),
        format!("# TYPE {name} gauge"),
        format!("{name} {value}"),
    ];
    for line in &lines {
        let _ = writeln!(out, "{line}");
    }
    let _ = writeln!(out);
}
749
750fn write_histogram(out: &mut String, name: &str, help: &str, snap: &HistogramSnapshot) {
751 use std::fmt::Write;
752 let _ = writeln!(out, "# HELP {name} {help}");
753 let _ = writeln!(out, "# TYPE {name} histogram");
754 for (i, &bound) in snap.buckets.iter().enumerate() {
755 let le = format_f64(bound);
756 let _ = writeln!(out, "{name}_bucket{{le=\"{le}\"}} {}", snap.counts[i]);
757 }
758 let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {}", snap.total_count);
759 let _ = writeln!(out, "{name}_sum {}", format_f64(snap.sum));
760 let _ = writeln!(out, "{name}_count {}", snap.total_count);
761 let _ = writeln!(out);
762}
763
/// Formats an `f64` for the Prometheus text format.
///
/// Special values map to `+Inf`, `-Inf` and `NaN`. Finite values use Rust's
/// `Display` for `f64`, which prints the shortest decimal string that round-trips
/// and never uses exponent notation. No precision is lost, so distinct bucket
/// bounds such as `1e-7` and `2e-7` get distinct `le` labels. Integral values
/// get a trailing `.0` (e.g. `1.0`, `10.0`) so every output reads as a float.
fn format_f64(v: f64) -> String {
    if v.is_nan() {
        return "NaN".to_string();
    }
    if v.is_infinite() {
        return if v > 0.0 { "+Inf" } else { "-Inf" }.to_string();
    }
    let s = v.to_string();
    if s.contains('.') {
        s
    } else {
        s + ".0"
    }
}
784
785#[derive(Debug, Clone)]
791pub struct MetricsSnapshot {
792 pub requests_total: u64,
793 pub requests_success: u64,
794 pub requests_failed: u64,
795 pub bytes_read: u64,
796 pub bytes_written: u64,
797 pub active_connections: u64,
798 pub queries_total: u64,
799 pub query_time_us: u64,
800 pub timestamp: u64,
801 pub request_latency: HistogramSnapshot,
803 pub operations: Vec<OperationSnapshot>,
805 pub storage: StorageSnapshot,
807}
808
809impl MetricsSnapshot {
810 pub fn avg_query_time_us(&self) -> f64 {
812 if self.queries_total == 0 {
813 0.0
814 } else {
815 self.query_time_us as f64 / self.queries_total as f64
816 }
817 }
818
819 pub fn success_rate(&self) -> f64 {
821 if self.requests_total == 0 {
822 0.0
823 } else {
824 self.requests_success as f64 / self.requests_total as f64
825 }
826 }
827
828 pub fn format_human(&self) -> String {
830 format!(
831 "Metrics:\n\
832 Requests: {} total, {} success, {} failed (success rate: {:.2}%)\n\
833 Data: {} bytes read, {} bytes written\n\
834 Connections: {} active\n\
835 Queries: {} total, avg time: {:.2} \u{03bc}s\n\
836 Timestamp: {}",
837 self.requests_total,
838 self.requests_success,
839 self.requests_failed,
840 self.success_rate() * 100.0,
841 self.bytes_read,
842 self.bytes_written,
843 self.active_connections,
844 self.queries_total,
845 self.avg_query_time_us(),
846 self.timestamp,
847 )
848 }
849}
850
// Unit tests for the histogram, per-operation, storage and Prometheus export paths.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    // ---- Histogram ----

    // Buckets are cumulative: each observation increments every bucket whose
    // bound is >= the value. 15.0 exceeds all bounds and only affects total and sum.
    #[test]
    fn test_histogram_bucket_counting() {
        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);

        h.observe(0.5);
        h.observe(3.0);
        h.observe(7.0);
        h.observe(15.0);
        let snap = h.snapshot();
        assert_eq!(snap.total_count, 4);
        assert_eq!(snap.counts, vec![1, 2, 3]);
        let expected_sum = 0.5 + 3.0 + 7.0 + 15.0;
        assert!((snap.sum - expected_sum).abs() < 1e-9);
    }

    // A value exactly equal to a bound is counted in that bucket (`<=` comparison).
    #[test]
    fn test_histogram_exact_boundary() {
        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
        h.observe(1.0);
        h.observe(5.0);
        h.observe(10.0);

        let snap = h.snapshot();
        assert_eq!(snap.counts, vec![1, 2, 3]);
        assert_eq!(snap.total_count, 3);
    }

    // `Histogram::new` uses the 12 default bounds, 0.001 through 10.0.
    #[test]
    fn test_histogram_default_buckets() {
        let h = Histogram::new();
        let snap = h.snapshot();
        assert_eq!(snap.buckets.len(), 12);
        assert_eq!(snap.buckets[0], 0.001);
        assert_eq!(snap.buckets[11], 10.0);
    }

    // Durations are recorded in seconds: 5 ms -> 0.005, inside the first bucket.
    #[test]
    fn test_histogram_observe_duration() {
        let h = Histogram::with_buckets(&[0.01, 0.1, 1.0]);
        h.observe_duration(Duration::from_millis(5));
        let snap = h.snapshot();
        assert_eq!(snap.counts[0], 1);
        assert_eq!(snap.total_count, 1);
        assert!((snap.sum - 0.005).abs() < 1e-6);
    }

    // ---- Percentiles ----

    // Percentiles of an empty histogram are undefined.
    #[test]
    fn test_percentile_empty() {
        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
        let snap = h.snapshot();
        assert!(snap.p50().is_none());
        assert!(snap.p95().is_none());
        assert!(snap.p99().is_none());
    }

    // One observation in the [0, 1] bucket: interpolating rank 0.5 gives 0.5.
    #[test]
    fn test_percentile_single_value() {
        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
        h.observe(0.5);
        let snap = h.snapshot();

        let p50 = snap.p50().expect("should have p50");
        assert!((p50 - 0.5).abs() < 1e-9);
    }

    // 100 observations spread so that p50 falls in the first bucket, p95 in
    // (2, 5], and p99 at or below 5.
    #[test]
    fn test_percentile_many_values() {
        let h = Histogram::with_buckets(&[1.0, 2.0, 5.0, 10.0]);
        for _ in 0..50 {
            h.observe(0.5);
        }
        for _ in 0..40 {
            h.observe(1.5);
        }
        for _ in 0..9 {
            h.observe(3.0);
        }
        h.observe(7.0);

        let snap = h.snapshot();
        assert_eq!(snap.total_count, 100);

        let p50 = snap.p50().expect("should have p50");
        assert!(p50 <= 1.0 + 1e-9, "p50={p50} should be <= 1.0");

        let p95 = snap.p95().expect("should have p95");
        assert!(p95 > 2.0 - 1e-9 && p95 <= 5.0 + 1e-9, "p95={p95}");

        let p99 = snap.p99().expect("should have p99");
        assert!(p99 <= 5.0 + 1e-9, "p99={p99}");
    }

    // `p` outside [0, 1] is rejected.
    #[test]
    fn test_percentile_boundary_values() {
        let snap = HistogramSnapshot {
            buckets: vec![1.0, 5.0, 10.0],
            counts: vec![0, 0, 0],
            total_count: 0,
            sum: 0.0,
        };
        assert!(snap.percentile(-0.1).is_none());
        assert!(snap.percentile(1.1).is_none());
    }

    // Clones share state: 8 threads x 1000 observations all land in one histogram.
    #[test]
    fn test_histogram_concurrent() {
        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
        let threads: Vec<_> = (0..8)
            .map(|_| {
                let h2 = h.clone();
                thread::spawn(move || {
                    for i in 0..1000 {
                        h2.observe(i as f64 % 12.0);
                    }
                })
            })
            .collect();
        for t in threads {
            t.join().expect("thread should not panic");
        }
        let snap = h.snapshot();
        assert_eq!(snap.total_count, 8000);
    }

    // ---- OperationType ----

    #[test]
    fn test_operation_type_labels() {
        assert_eq!(OperationType::Get.as_label(), "get");
        assert_eq!(OperationType::Put.as_label(), "put");
        assert_eq!(OperationType::Delete.as_label(), "delete");
        assert_eq!(OperationType::Range.as_label(), "range");
        assert_eq!(OperationType::Batch.as_label(), "batch");
        assert_eq!(OperationType::Stream.as_label(), "stream");
    }

    // `Display` prints the label.
    #[test]
    fn test_operation_type_display() {
        assert_eq!(format!("{}", OperationType::Get), "get");
    }

    // ---- MetricsCollector basics ----

    #[test]
    fn test_metrics_collector_creation() {
        let collector = MetricsCollector::new();
        let snapshot = collector.snapshot();

        assert_eq!(snapshot.requests_total, 0);
        assert_eq!(snapshot.requests_success, 0);
        assert_eq!(snapshot.requests_failed, 0);
    }

    #[test]
    fn test_increment_requests() {
        let collector = MetricsCollector::new();

        collector.inc_requests();
        collector.inc_requests();
        collector.inc_success();
        collector.inc_failed();

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.requests_total, 2);
        assert_eq!(snapshot.requests_success, 1);
        assert_eq!(snapshot.requests_failed, 1);
    }

    #[test]
    fn test_bytes_tracking() {
        let collector = MetricsCollector::new();

        collector.add_bytes_read(1024);
        collector.add_bytes_written(2048);

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.bytes_read, 1024);
        assert_eq!(snapshot.bytes_written, 2048);
    }

    // The connection gauge goes up and down.
    #[test]
    fn test_connections() {
        let collector = MetricsCollector::new();

        collector.inc_connections();
        collector.inc_connections();
        assert_eq!(collector.snapshot().active_connections, 2);

        collector.dec_connections();
        assert_eq!(collector.snapshot().active_connections, 1);
    }

    // Average query time = accumulated microseconds / query count.
    #[test]
    fn test_queries() {
        let collector = MetricsCollector::new();

        collector.inc_queries();
        collector.add_query_time(1000);
        collector.inc_queries();
        collector.add_query_time(2000);

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.queries_total, 2);
        assert_eq!(snapshot.query_time_us, 3000);
        assert_eq!(snapshot.avg_query_time_us(), 1500.0);
    }

    #[test]
    fn test_success_rate() {
        let collector = MetricsCollector::new();

        collector.inc_requests();
        collector.inc_success();
        collector.inc_requests();
        collector.inc_failed();

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.success_rate(), 0.5);
    }

    // `reset` clears request counters, storage gauges and per-op metrics.
    #[test]
    fn test_reset() {
        let collector = MetricsCollector::new();

        collector.inc_requests();
        collector.inc_success();
        collector.record_operation(OperationType::Get, Duration::from_millis(10), true);
        collector.set_memtable_size(1024);
        assert_eq!(collector.snapshot().requests_total, 1);

        collector.reset();
        let snap = collector.snapshot();
        assert_eq!(snap.requests_total, 0);
        assert_eq!(snap.storage.memtable_size_bytes, 0);
        assert_eq!(snap.operations[0].count, 0);
    }

    #[test]
    fn test_human_format() {
        let collector = MetricsCollector::new();
        collector.inc_requests();
        collector.inc_success();

        let snapshot = collector.snapshot();
        let formatted = snapshot.format_human();

        assert!(formatted.contains("Metrics:"));
        assert!(formatted.contains("Requests:"));
        assert!(formatted.contains("1 total"));
    }

    // ---- Per-operation metrics ----

    #[test]
    fn test_record_operation_success() {
        let collector = MetricsCollector::new();
        collector.record_operation(OperationType::Get, Duration::from_millis(5), true);
        collector.record_operation(OperationType::Get, Duration::from_millis(10), true);

        let snap = collector.operation_snapshot(OperationType::Get);
        assert_eq!(snap.count, 2);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.latency.total_count, 2);
    }

    // A failed operation counts toward both `count` and `errors`.
    #[test]
    fn test_record_operation_failure() {
        let collector = MetricsCollector::new();
        collector.record_operation(OperationType::Put, Duration::from_millis(100), false);

        let snap = collector.operation_snapshot(OperationType::Put);
        assert_eq!(snap.count, 1);
        assert_eq!(snap.errors, 1);
    }

    // Every variant maps to its own metrics set.
    #[test]
    fn test_record_all_operation_types() {
        let collector = MetricsCollector::new();
        for &op in &OperationType::ALL {
            collector.record_operation(op, Duration::from_millis(1), true);
        }
        for &op in &OperationType::ALL {
            let snap = collector.operation_snapshot(op);
            assert_eq!(snap.count, 1, "op={op} should have count 1");
        }
    }

    // ---- Storage metrics ----

    #[test]
    fn test_storage_gauges_set() {
        let collector = MetricsCollector::new();
        collector.set_memtable_size(4096);
        collector.set_sstable_count(10);
        collector.set_wal_size(8192);

        let s = collector.storage_snapshot();
        assert_eq!(s.memtable_size_bytes, 4096);
        assert_eq!(s.sstable_count, 10);
        assert_eq!(s.wal_size_bytes, 8192);
    }

    #[test]
    fn test_storage_gauge_inc_dec() {
        let collector = MetricsCollector::new();

        collector.inc_memtable_size(1000);
        collector.inc_memtable_size(500);
        assert_eq!(collector.storage_snapshot().memtable_size_bytes, 1500);

        collector.dec_memtable_size(300);
        assert_eq!(collector.storage_snapshot().memtable_size_bytes, 1200);

        collector.inc_sstable_count();
        collector.inc_sstable_count();
        assert_eq!(collector.storage_snapshot().sstable_count, 2);

        collector.dec_sstable_count();
        assert_eq!(collector.storage_snapshot().sstable_count, 1);
    }

    #[test]
    fn test_storage_counters() {
        let collector = MetricsCollector::new();
        collector.inc_compaction_count();
        collector.inc_compaction_count();
        collector.add_compaction_bytes(10_000);
        collector.inc_block_cache_hit();
        collector.inc_block_cache_hit();
        collector.inc_block_cache_miss();

        let s = collector.storage_snapshot();
        assert_eq!(s.compaction_count, 2);
        assert_eq!(s.compaction_bytes_written, 10_000);
        assert_eq!(s.block_cache_hits, 2);
        assert_eq!(s.block_cache_misses, 1);
    }

    // ---- Prometheus export ----

    #[test]
    fn test_prometheus_format() {
        let collector = MetricsCollector::new();

        collector.inc_requests();
        collector.inc_success();

        let prometheus = collector.to_prometheus();
        assert!(prometheus.contains("amaters_requests_total 1"));
        assert!(prometheus.contains("amaters_requests_success 1"));
    }

    // 5 ms and 50 ms observations: cumulative buckets le=0.005 -> 1, le=0.05 -> 2.
    #[test]
    fn test_prometheus_histogram_format() {
        let collector = MetricsCollector::new();
        collector.observe_request_latency(Duration::from_millis(5));
        collector.observe_request_latency(Duration::from_millis(50));
        let prom = collector.to_prometheus();

        assert!(
            prom.contains("# TYPE amaters_request_latency_seconds histogram"),
            "missing histogram TYPE line"
        );

        assert!(
            prom.contains("amaters_request_latency_seconds_bucket{le=\"0.005\"} 1"),
            "bucket le=0.005 should have count 1"
        );
        assert!(
            prom.contains("amaters_request_latency_seconds_bucket{le=\"0.05\"} 2"),
            "bucket le=0.05 should have count 2"
        );

        assert!(
            prom.contains("amaters_request_latency_seconds_bucket{le=\"+Inf\"} 2"),
            "missing +Inf bucket"
        );

        assert!(
            prom.contains("amaters_request_latency_seconds_count 2"),
            "missing _count"
        );
        assert!(
            prom.contains("amaters_request_latency_seconds_sum"),
            "missing _sum"
        );
    }

    // Per-op counters carry an `op` label; per-op histograms embed the label in the name.
    #[test]
    fn test_prometheus_operation_metrics() {
        let collector = MetricsCollector::new();
        collector.record_operation(OperationType::Get, Duration::from_millis(1), true);
        collector.record_operation(OperationType::Get, Duration::from_millis(2), false);

        let prom = collector.to_prometheus();
        assert!(
            prom.contains("amaters_op_count{op=\"get\"} 2"),
            "missing op count"
        );
        assert!(
            prom.contains("amaters_op_errors{op=\"get\"} 1"),
            "missing op errors"
        );
        assert!(
            prom.contains("amaters_op_get_latency_seconds_count 2"),
            "missing op latency count"
        );
    }

    #[test]
    fn test_prometheus_storage_metrics() {
        let collector = MetricsCollector::new();
        collector.set_memtable_size(4096);
        collector.inc_compaction_count();

        let prom = collector.to_prometheus();
        assert!(
            prom.contains("amaters_memtable_size_bytes 4096"),
            "missing memtable gauge"
        );
        assert!(
            prom.contains("amaters_compaction_count 1"),
            "missing compaction counter"
        );
    }

    // HELP/TYPE metadata is emitted with the right metric type for each family.
    #[test]
    fn test_prometheus_type_help_comments() {
        let collector = MetricsCollector::new();
        let prom = collector.to_prometheus();

        assert!(prom.contains("# HELP amaters_requests_total"));
        assert!(prom.contains("# TYPE amaters_requests_total counter"));
        assert!(prom.contains("# HELP amaters_active_connections"));
        assert!(prom.contains("# TYPE amaters_active_connections gauge"));
        assert!(prom.contains("# TYPE amaters_request_latency_seconds histogram"));
        assert!(prom.contains("# TYPE amaters_memtable_size_bytes gauge"));
        assert!(prom.contains("# TYPE amaters_compaction_count counter"));
        assert!(prom.contains("# TYPE amaters_block_cache_hits counter"));
    }

    // ---- Concurrency ----

    // 8 threads x 500 iterations through shared clones: no updates are lost.
    #[test]
    fn test_concurrent_metric_updates() {
        let collector = MetricsCollector::new();
        let threads: Vec<_> = (0..8)
            .map(|i| {
                let c = collector.clone();
                thread::spawn(move || {
                    for _ in 0..500 {
                        c.inc_requests();
                        if i % 2 == 0 {
                            c.inc_success();
                        } else {
                            c.inc_failed();
                        }
                        c.record_operation(OperationType::Get, Duration::from_micros(100), true);
                        c.inc_block_cache_hit();
                    }
                })
            })
            .collect();

        for t in threads {
            t.join().expect("thread should not panic");
        }

        let snap = collector.snapshot();
        assert_eq!(snap.requests_total, 4000);
        assert_eq!(snap.requests_success + snap.requests_failed, 4000);
        assert_eq!(snap.storage.block_cache_hits, 4000);

        let get_snap = collector.operation_snapshot(OperationType::Get);
        assert_eq!(get_snap.count, 4000);
        assert_eq!(get_snap.latency.total_count, 4000);
    }

    // ---- Helpers ----

    #[test]
    fn test_format_f64() {
        assert_eq!(format_f64(0.001), "0.001");
        assert_eq!(format_f64(1.0), "1.0");
        assert_eq!(format_f64(10.0), "10.0");
        assert_eq!(format_f64(0.025), "0.025");
        assert_eq!(format_f64(f64::INFINITY), "+Inf");
    }

    // A full snapshot carries request, latency, per-op (one per variant) and storage data.
    #[test]
    fn test_snapshot_includes_all_fields() {
        let collector = MetricsCollector::new();
        collector.inc_requests();
        collector.observe_request_latency(Duration::from_millis(1));
        collector.record_operation(OperationType::Put, Duration::from_millis(2), true);
        collector.set_memtable_size(2048);

        let snap = collector.snapshot();
        assert_eq!(snap.requests_total, 1);
        assert_eq!(snap.request_latency.total_count, 1);
        assert_eq!(snap.operations.len(), 6);
        assert_eq!(snap.storage.memtable_size_bytes, 2048);

        let put = snap
            .operations
            .iter()
            .find(|o| o.op_type == OperationType::Put)
            .expect("should have Put snapshot");
        assert_eq!(put.count, 1);
    }
}