Skip to main content

amaters_server/
metrics.rs

//! Metrics collection module
//!
//! Provides metrics collection for monitoring server performance, including
//! latency histograms, per-operation-type metrics, and storage metrics.
5
6use std::fmt;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use parking_lot::Mutex;
12
13// ---------------------------------------------------------------------------
14// Histogram
15// ---------------------------------------------------------------------------
16
/// Default upper bounds, in seconds, for latency histograms.
///
/// Twelve strictly ascending bounds spanning 1 ms to 10 s.
pub const DEFAULT_BUCKETS: [f64; 12] = [
    0.001, 0.005, 0.01, 0.025, // 1 ms .. 25 ms
    0.05, 0.1, 0.25, 0.5, // 50 ms .. 500 ms
    1.0, 2.5, 5.0, 10.0, // 1 s .. 10 s
];
21
/// A snapshot of histogram state at a point in time
#[derive(Debug, Clone)]
pub struct HistogramSnapshot {
    /// Upper bounds for each bucket
    pub buckets: Vec<f64>,
    /// Cumulative count for each bucket (index `i` = count of observations <= `buckets[i]`)
    pub counts: Vec<u64>,
    /// Total number of observations
    pub total_count: u64,
    /// Sum of all observed values
    pub sum: f64,
}

impl HistogramSnapshot {
    /// Estimate the `p`-quantile (`p` in `0.0..=1.0`) from the bucket counts.
    ///
    /// The target rank is `p * total_count`. The first populated bucket whose
    /// cumulative count reaches that rank is located, and the result is
    /// linearly interpolated between that bucket's lower edge and its upper
    /// bound. The lower edge is the previous bucket's bound, or `0.0` for the
    /// first bucket.
    ///
    /// Empty buckets are skipped. As a result, `percentile(0.0)` reports the
    /// lower edge of the first bucket that actually holds observations, rather
    /// than the upper bound of an empty leading bucket.
    ///
    /// If the rank is never reached within the finite buckets (observations
    /// lie above the largest bound), the largest bound is returned as an
    /// approximation.
    ///
    /// Returns `None` in any of these cases:
    /// - no observations have been recorded;
    /// - `p` is outside `0.0..=1.0` (including NaN);
    /// - the snapshot has no buckets.
    pub fn percentile(&self, p: f64) -> Option<f64> {
        if self.total_count == 0 || !(0.0..=1.0).contains(&p) {
            return None;
        }

        let target = p * self.total_count as f64;

        let mut prev_count: u64 = 0;
        let mut prev_bound: f64 = 0.0;

        // `zip` (rather than indexing `counts[i]`) keeps a hand-built snapshot
        // with fewer counts than buckets from panicking.
        for (&upper, &cumulative) in self.buckets.iter().zip(&self.counts) {
            // `saturating_sub` tolerates malformed, non-monotone counts.
            let in_bucket = cumulative.saturating_sub(prev_count);
            if in_bucket > 0 && cumulative as f64 >= target {
                // Linear interpolation within this bucket.
                let fraction = (target - prev_count as f64) / in_bucket as f64;
                return Some(prev_bound + fraction * (upper - prev_bound));
            }
            prev_count = cumulative;
            prev_bound = upper;
        }

        // All remaining observations are above the largest bucket.
        // Return the largest bucket boundary as an approximation.
        self.buckets.last().copied()
    }

    /// Convenience: p50
    pub fn p50(&self) -> Option<f64> {
        self.percentile(0.50)
    }

    /// Convenience: p95
    pub fn p95(&self) -> Option<f64> {
        self.percentile(0.95)
    }

    /// Convenience: p99
    pub fn p99(&self) -> Option<f64> {
        self.percentile(0.99)
    }
}
86
87/// Thread-safe histogram for tracking value distributions.
88///
89/// Uses `parking_lot::Mutex` for interior mutability.
90#[derive(Clone)]
91pub struct Histogram {
92    inner: Arc<Mutex<HistogramInner>>,
93}
94
95struct HistogramInner {
96    buckets: Vec<f64>,
97    counts: Vec<u64>,
98    total_count: u64,
99    sum: f64,
100}
101
102impl Histogram {
103    /// Create a histogram with the default latency buckets.
104    pub fn new() -> Self {
105        Self::with_buckets(&DEFAULT_BUCKETS)
106    }
107
108    /// Create a histogram with custom bucket upper bounds.
109    ///
110    /// Buckets are sorted on creation.
111    pub fn with_buckets(bounds: &[f64]) -> Self {
112        let mut sorted = bounds.to_vec();
113        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
114        let len = sorted.len();
115        Self {
116            inner: Arc::new(Mutex::new(HistogramInner {
117                buckets: sorted,
118                counts: vec![0; len],
119                total_count: 0,
120                sum: 0.0,
121            })),
122        }
123    }
124
125    /// Record a single observation.
126    pub fn observe(&self, value: f64) {
127        let mut inner = self.inner.lock();
128        inner.total_count += 1;
129        inner.sum += value;
130        // Increment cumulative counts for all buckets whose bound >= value
131        let len = inner.buckets.len();
132        for i in 0..len {
133            if value <= inner.buckets[i] {
134                inner.counts[i] += 1;
135            }
136        }
137    }
138
139    /// Record a `Duration` as seconds.
140    pub fn observe_duration(&self, d: Duration) {
141        self.observe(d.as_secs_f64());
142    }
143
144    /// Take a snapshot of the current state.
145    pub fn snapshot(&self) -> HistogramSnapshot {
146        let inner = self.inner.lock();
147        HistogramSnapshot {
148            buckets: inner.buckets.clone(),
149            counts: inner.counts.clone(),
150            total_count: inner.total_count,
151            sum: inner.sum,
152        }
153    }
154
155    /// Reset all counts (useful for testing).
156    #[cfg(test)]
157    fn reset(&self) {
158        let mut inner = self.inner.lock();
159        for c in inner.counts.iter_mut() {
160            *c = 0;
161        }
162        inner.total_count = 0;
163        inner.sum = 0.0;
164    }
165}
166
167impl Default for Histogram {
168    fn default() -> Self {
169        Self::new()
170    }
171}
172
173// ---------------------------------------------------------------------------
174// OperationType
175// ---------------------------------------------------------------------------
176
/// The kinds of database operation that receive their own metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    Get,
    Put,
    Delete,
    Range,
    Batch,
    Stream,
}

impl OperationType {
    /// Every variant, in declaration order.
    pub const ALL: [OperationType; 6] = [
        Self::Get,
        Self::Put,
        Self::Delete,
        Self::Range,
        Self::Batch,
        Self::Stream,
    ];

    /// Lower-case name used as the `op` label value in Prometheus output.
    pub fn as_label(&self) -> &'static str {
        match *self {
            Self::Get => "get",
            Self::Put => "put",
            Self::Delete => "delete",
            Self::Range => "range",
            Self::Batch => "batch",
            Self::Stream => "stream",
        }
    }
}

impl fmt::Display for OperationType {
    /// Writes the same text as [`OperationType::as_label`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_label())
    }
}
217
218// ---------------------------------------------------------------------------
219// Per-operation metrics
220// ---------------------------------------------------------------------------
221
222/// Metrics for a single operation type.
223#[derive(Clone)]
224struct OperationMetrics {
225    count: Arc<AtomicU64>,
226    errors: Arc<AtomicU64>,
227    latency: Histogram,
228}
229
230impl OperationMetrics {
231    fn new() -> Self {
232        Self {
233            count: Arc::new(AtomicU64::new(0)),
234            errors: Arc::new(AtomicU64::new(0)),
235            latency: Histogram::new(),
236        }
237    }
238}
239
240/// Snapshot of per-operation metrics.
241#[derive(Debug, Clone)]
242pub struct OperationSnapshot {
243    pub op_type: OperationType,
244    pub count: u64,
245    pub errors: u64,
246    pub latency: HistogramSnapshot,
247}
248
249// ---------------------------------------------------------------------------
250// Storage metrics (gauges + counters)
251// ---------------------------------------------------------------------------
252
/// Shared unsigned gauge backed by an `Arc<AtomicU64>`.
///
/// Clones refer to the same value. Every operation uses `Ordering::Relaxed`.
#[derive(Clone)]
struct AtomicGauge(Arc<AtomicU64>);

impl AtomicGauge {
    /// Create a gauge initialised to zero.
    fn new() -> Self {
        Self(Arc::new(AtomicU64::new(0)))
    }

    /// Add `v` to the gauge (wraps on overflow, as `fetch_add` does).
    fn inc(&self, v: u64) {
        self.0.fetch_add(v, Ordering::Relaxed);
    }

    /// Subtract `v` from the gauge, saturating at zero.
    ///
    /// `fetch_sub` wraps on underflow. With it, a single unmatched or
    /// oversized decrement would make the gauge report a value close to
    /// `u64::MAX`.
    fn dec(&self, v: u64) {
        // The closure never returns `None`, so `fetch_update` always succeeds
        // and its `Result` carries nothing worth handling.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(v))
            });
    }

    /// Overwrite the gauge with `v`.
    fn set(&self, v: u64) {
        self.0.store(v, Ordering::Relaxed);
    }

    /// Read the current value.
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
278
/// Point-in-time values of the storage-engine metrics.
///
/// `Default` yields an all-zero snapshot.
#[derive(Clone, Debug, Default)]
pub struct StorageSnapshot {
    /// Current memtable size, in bytes.
    pub memtable_size_bytes: u64,
    /// Current number of SSTables.
    pub sstable_count: u64,
    /// Number of compactions recorded.
    pub compaction_count: u64,
    /// Bytes written by compactions.
    pub compaction_bytes_written: u64,
    /// Current write-ahead-log size, in bytes.
    pub wal_size_bytes: u64,
    /// Block cache hits recorded.
    pub block_cache_hits: u64,
    /// Block cache misses recorded.
    pub block_cache_misses: u64,
}
290
291// ---------------------------------------------------------------------------
292// MetricsCollector
293// ---------------------------------------------------------------------------
294
295/// Metrics collector
296///
297/// Tracks various server metrics using atomic counters, histograms, and gauges.
298#[derive(Clone)]
299pub struct MetricsCollector {
300    // --- existing counters ---
301    requests_total: Arc<AtomicU64>,
302    requests_success: Arc<AtomicU64>,
303    requests_failed: Arc<AtomicU64>,
304    bytes_read: Arc<AtomicU64>,
305    bytes_written: Arc<AtomicU64>,
306    active_connections: Arc<AtomicU64>,
307    queries_total: Arc<AtomicU64>,
308    query_time_us: Arc<AtomicU64>,
309
310    // --- request latency histogram ---
311    request_latency: Histogram,
312
313    // --- per-operation metrics ---
314    op_get: OperationMetrics,
315    op_put: OperationMetrics,
316    op_delete: OperationMetrics,
317    op_range: OperationMetrics,
318    op_batch: OperationMetrics,
319    op_stream: OperationMetrics,
320
321    // --- storage gauges / counters ---
322    memtable_size_bytes: AtomicGauge,
323    sstable_count: AtomicGauge,
324    compaction_count: AtomicGauge,
325    compaction_bytes_written: AtomicGauge,
326    wal_size_bytes: AtomicGauge,
327    block_cache_hits: AtomicGauge,
328    block_cache_misses: AtomicGauge,
329}
330
331impl MetricsCollector {
332    /// Create a new metrics collector
333    pub fn new() -> Self {
334        Self {
335            requests_total: Arc::new(AtomicU64::new(0)),
336            requests_success: Arc::new(AtomicU64::new(0)),
337            requests_failed: Arc::new(AtomicU64::new(0)),
338            bytes_read: Arc::new(AtomicU64::new(0)),
339            bytes_written: Arc::new(AtomicU64::new(0)),
340            active_connections: Arc::new(AtomicU64::new(0)),
341            queries_total: Arc::new(AtomicU64::new(0)),
342            query_time_us: Arc::new(AtomicU64::new(0)),
343            request_latency: Histogram::new(),
344            op_get: OperationMetrics::new(),
345            op_put: OperationMetrics::new(),
346            op_delete: OperationMetrics::new(),
347            op_range: OperationMetrics::new(),
348            op_batch: OperationMetrics::new(),
349            op_stream: OperationMetrics::new(),
350            memtable_size_bytes: AtomicGauge::new(),
351            sstable_count: AtomicGauge::new(),
352            compaction_count: AtomicGauge::new(),
353            compaction_bytes_written: AtomicGauge::new(),
354            wal_size_bytes: AtomicGauge::new(),
355            block_cache_hits: AtomicGauge::new(),
356            block_cache_misses: AtomicGauge::new(),
357        }
358    }
359
360    // --- existing counter methods ---
361
362    /// Increment total requests
363    pub fn inc_requests(&self) {
364        self.requests_total.fetch_add(1, Ordering::Relaxed);
365    }
366
367    /// Increment successful requests
368    pub fn inc_success(&self) {
369        self.requests_success.fetch_add(1, Ordering::Relaxed);
370    }
371
372    /// Increment failed requests
373    pub fn inc_failed(&self) {
374        self.requests_failed.fetch_add(1, Ordering::Relaxed);
375    }
376
377    /// Add bytes read
378    pub fn add_bytes_read(&self, bytes: u64) {
379        self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
380    }
381
382    /// Add bytes written
383    pub fn add_bytes_written(&self, bytes: u64) {
384        self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
385    }
386
387    /// Increment active connections
388    pub fn inc_connections(&self) {
389        self.active_connections.fetch_add(1, Ordering::Relaxed);
390    }
391
392    /// Decrement active connections
393    pub fn dec_connections(&self) {
394        self.active_connections.fetch_sub(1, Ordering::Relaxed);
395    }
396
397    /// Increment queries executed
398    pub fn inc_queries(&self) {
399        self.queries_total.fetch_add(1, Ordering::Relaxed);
400    }
401
402    /// Add query execution time in microseconds
403    pub fn add_query_time(&self, duration_us: u64) {
404        self.query_time_us.fetch_add(duration_us, Ordering::Relaxed);
405    }
406
407    // --- request latency histogram ---
408
409    /// Record a request latency duration.
410    pub fn observe_request_latency(&self, d: Duration) {
411        self.request_latency.observe_duration(d);
412    }
413
414    /// Get the request latency histogram reference.
415    pub fn request_latency(&self) -> &Histogram {
416        &self.request_latency
417    }
418
419    // --- per-operation metrics ---
420
421    fn op_metrics(&self, op: OperationType) -> &OperationMetrics {
422        match op {
423            OperationType::Get => &self.op_get,
424            OperationType::Put => &self.op_put,
425            OperationType::Delete => &self.op_delete,
426            OperationType::Range => &self.op_range,
427            OperationType::Batch => &self.op_batch,
428            OperationType::Stream => &self.op_stream,
429        }
430    }
431
432    /// Record a completed operation with its type, duration, and success status.
433    pub fn record_operation(&self, op_type: OperationType, duration: Duration, success: bool) {
434        let m = self.op_metrics(op_type);
435        m.count.fetch_add(1, Ordering::Relaxed);
436        if !success {
437            m.errors.fetch_add(1, Ordering::Relaxed);
438        }
439        m.latency.observe_duration(duration);
440    }
441
442    /// Take a snapshot of per-operation metrics for one type.
443    pub fn operation_snapshot(&self, op_type: OperationType) -> OperationSnapshot {
444        let m = self.op_metrics(op_type);
445        OperationSnapshot {
446            op_type,
447            count: m.count.load(Ordering::Relaxed),
448            errors: m.errors.load(Ordering::Relaxed),
449            latency: m.latency.snapshot(),
450        }
451    }
452
453    // --- storage metrics ---
454
455    /// Set the current memtable size in bytes.
456    pub fn set_memtable_size(&self, bytes: u64) {
457        self.memtable_size_bytes.set(bytes);
458    }
459
460    /// Set the current SSTable count.
461    pub fn set_sstable_count(&self, count: u64) {
462        self.sstable_count.set(count);
463    }
464
465    /// Increment the compaction counter.
466    pub fn inc_compaction_count(&self) {
467        self.compaction_count.inc(1);
468    }
469
470    /// Add bytes written during compaction.
471    pub fn add_compaction_bytes(&self, bytes: u64) {
472        self.compaction_bytes_written.inc(bytes);
473    }
474
475    /// Set the current WAL size in bytes.
476    pub fn set_wal_size(&self, bytes: u64) {
477        self.wal_size_bytes.set(bytes);
478    }
479
480    /// Record a block cache hit.
481    pub fn inc_block_cache_hit(&self) {
482        self.block_cache_hits.inc(1);
483    }
484
485    /// Record a block cache miss.
486    pub fn inc_block_cache_miss(&self) {
487        self.block_cache_misses.inc(1);
488    }
489
490    /// Increment memtable size gauge.
491    pub fn inc_memtable_size(&self, bytes: u64) {
492        self.memtable_size_bytes.inc(bytes);
493    }
494
495    /// Decrement memtable size gauge.
496    pub fn dec_memtable_size(&self, bytes: u64) {
497        self.memtable_size_bytes.dec(bytes);
498    }
499
500    /// Increment sstable count gauge.
501    pub fn inc_sstable_count(&self) {
502        self.sstable_count.inc(1);
503    }
504
505    /// Decrement sstable count gauge.
506    pub fn dec_sstable_count(&self) {
507        self.sstable_count.dec(1);
508    }
509
510    /// Take a storage metrics snapshot.
511    pub fn storage_snapshot(&self) -> StorageSnapshot {
512        StorageSnapshot {
513            memtable_size_bytes: self.memtable_size_bytes.get(),
514            sstable_count: self.sstable_count.get(),
515            compaction_count: self.compaction_count.get(),
516            compaction_bytes_written: self.compaction_bytes_written.get(),
517            wal_size_bytes: self.wal_size_bytes.get(),
518            block_cache_hits: self.block_cache_hits.get(),
519            block_cache_misses: self.block_cache_misses.get(),
520        }
521    }
522
523    // --- snapshot ---
524
525    /// Get snapshot of current metrics
526    pub fn snapshot(&self) -> MetricsSnapshot {
527        MetricsSnapshot {
528            requests_total: self.requests_total.load(Ordering::Relaxed),
529            requests_success: self.requests_success.load(Ordering::Relaxed),
530            requests_failed: self.requests_failed.load(Ordering::Relaxed),
531            bytes_read: self.bytes_read.load(Ordering::Relaxed),
532            bytes_written: self.bytes_written.load(Ordering::Relaxed),
533            active_connections: self.active_connections.load(Ordering::Relaxed),
534            queries_total: self.queries_total.load(Ordering::Relaxed),
535            query_time_us: self.query_time_us.load(Ordering::Relaxed),
536            timestamp: SystemTime::now()
537                .duration_since(UNIX_EPOCH)
538                .map(|d| d.as_secs())
539                .unwrap_or(0),
540            request_latency: self.request_latency.snapshot(),
541            operations: OperationType::ALL
542                .iter()
543                .map(|&op| self.operation_snapshot(op))
544                .collect(),
545            storage: self.storage_snapshot(),
546        }
547    }
548
549    // --- prometheus ---
550
551    /// Format metrics as Prometheus-style text
552    pub fn to_prometheus(&self) -> String {
553        let snapshot = self.snapshot();
554        let mut out = String::with_capacity(4096);
555
556        // --- existing counters ---
557        write_counter(
558            &mut out,
559            "amaters_requests_total",
560            "Total number of requests",
561            snapshot.requests_total,
562        );
563        write_counter(
564            &mut out,
565            "amaters_requests_success",
566            "Total number of successful requests",
567            snapshot.requests_success,
568        );
569        write_counter(
570            &mut out,
571            "amaters_requests_failed",
572            "Total number of failed requests",
573            snapshot.requests_failed,
574        );
575        write_counter(
576            &mut out,
577            "amaters_bytes_read",
578            "Total bytes read",
579            snapshot.bytes_read,
580        );
581        write_counter(
582            &mut out,
583            "amaters_bytes_written",
584            "Total bytes written",
585            snapshot.bytes_written,
586        );
587        write_gauge(
588            &mut out,
589            "amaters_active_connections",
590            "Current active connections",
591            snapshot.active_connections,
592        );
593        write_counter(
594            &mut out,
595            "amaters_queries_total",
596            "Total queries executed",
597            snapshot.queries_total,
598        );
599        write_counter(
600            &mut out,
601            "amaters_query_time_us_total",
602            "Total query execution time in microseconds",
603            snapshot.query_time_us,
604        );
605
606        // --- request latency histogram ---
607        write_histogram(
608            &mut out,
609            "amaters_request_latency_seconds",
610            "Request latency in seconds",
611            &snapshot.request_latency,
612        );
613
614        // --- per-operation metrics ---
615        for op_snap in &snapshot.operations {
616            let label = op_snap.op_type.as_label();
617            let prefix = format!("amaters_op_{label}");
618            write_counter_with_label(
619                &mut out,
620                "amaters_op_count",
621                "Operation count",
622                &format!("op=\"{label}\""),
623                op_snap.count,
624            );
625            write_counter_with_label(
626                &mut out,
627                "amaters_op_errors",
628                "Operation errors",
629                &format!("op=\"{label}\""),
630                op_snap.errors,
631            );
632            write_histogram(
633                &mut out,
634                &format!("{prefix}_latency_seconds"),
635                &format!("Latency for {label} operations in seconds"),
636                &op_snap.latency,
637            );
638        }
639
640        // --- storage metrics ---
641        let s = &snapshot.storage;
642        write_gauge(
643            &mut out,
644            "amaters_memtable_size_bytes",
645            "Current memtable size in bytes",
646            s.memtable_size_bytes,
647        );
648        write_gauge(
649            &mut out,
650            "amaters_sstable_count",
651            "Current SSTable count",
652            s.sstable_count,
653        );
654        write_counter(
655            &mut out,
656            "amaters_compaction_count",
657            "Total compaction operations",
658            s.compaction_count,
659        );
660        write_counter(
661            &mut out,
662            "amaters_compaction_bytes_written",
663            "Total bytes written during compaction",
664            s.compaction_bytes_written,
665        );
666        write_gauge(
667            &mut out,
668            "amaters_wal_size_bytes",
669            "Current WAL size in bytes",
670            s.wal_size_bytes,
671        );
672        write_counter(
673            &mut out,
674            "amaters_block_cache_hits",
675            "Block cache hits",
676            s.block_cache_hits,
677        );
678        write_counter(
679            &mut out,
680            "amaters_block_cache_misses",
681            "Block cache misses",
682            s.block_cache_misses,
683        );
684
685        out
686    }
687
688    /// Reset all metrics (useful for testing)
689    #[cfg(test)]
690    pub fn reset(&self) {
691        self.requests_total.store(0, Ordering::Relaxed);
692        self.requests_success.store(0, Ordering::Relaxed);
693        self.requests_failed.store(0, Ordering::Relaxed);
694        self.bytes_read.store(0, Ordering::Relaxed);
695        self.bytes_written.store(0, Ordering::Relaxed);
696        self.active_connections.store(0, Ordering::Relaxed);
697        self.queries_total.store(0, Ordering::Relaxed);
698        self.query_time_us.store(0, Ordering::Relaxed);
699        self.request_latency.reset();
700        for &op in &OperationType::ALL {
701            let m = self.op_metrics(op);
702            m.count.store(0, Ordering::Relaxed);
703            m.errors.store(0, Ordering::Relaxed);
704            m.latency.reset();
705        }
706        self.memtable_size_bytes.set(0);
707        self.sstable_count.set(0);
708        self.compaction_count.set(0);
709        self.compaction_bytes_written.set(0);
710        self.wal_size_bytes.set(0);
711        self.block_cache_hits.set(0);
712        self.block_cache_misses.set(0);
713    }
714}
715
716impl Default for MetricsCollector {
717    fn default() -> Self {
718        Self::new()
719    }
720}
721
722// ---------------------------------------------------------------------------
723// Prometheus formatting helpers
724// ---------------------------------------------------------------------------
725
/// Append an unlabeled Prometheus counter to `out`.
///
/// The output is the `# HELP` and `# TYPE` header lines, the `name value`
/// sample, and a blank separator line.
fn write_counter(out: &mut String, name: &str, help: &str, value: u64) {
    use std::fmt::Write;
    // Writing into a `String` cannot fail, so the `fmt::Result` is discarded.
    let _ = write!(
        out,
        "# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}\n\n"
    );
}
733
/// Append a Prometheus counter with a single labeled sample to `out`.
///
/// `label` is inserted verbatim between the braces, so it must already be a
/// well-formed label set such as `op="get"`; no escaping is performed.
fn write_counter_with_label(out: &mut String, name: &str, help: &str, label: &str, value: u64) {
    use std::fmt::Write;
    let header = format!("# HELP {name} {help}\n# TYPE {name} counter\n");
    out.push_str(&header);
    // `{{` / `}}` are literal braces around the label set.
    let _ = write!(out, "{name}{{{label}}} {value}\n\n");
}
741
/// Append an unlabeled Prometheus gauge to `out`.
///
/// The output is the `# HELP` and `# TYPE` header lines, the `name value`
/// sample, and a blank separator line.
fn write_gauge(out: &mut String, name: &str, help: &str, value: u64) {
    use std::fmt::Write;
    let lines = [
        format!("# HELP {name} {help}"),
        format!("# TYPE {name} gauge"),
        format!("{name} {value}"),
        String::new(),
    ];
    for line in &lines {
        let _ = writeln!(out, "{line}");
    }
}
749
750fn write_histogram(out: &mut String, name: &str, help: &str, snap: &HistogramSnapshot) {
751    use std::fmt::Write;
752    let _ = writeln!(out, "# HELP {name} {help}");
753    let _ = writeln!(out, "# TYPE {name} histogram");
754    for (i, &bound) in snap.buckets.iter().enumerate() {
755        let le = format_f64(bound);
756        let _ = writeln!(out, "{name}_bucket{{le=\"{le}\"}} {}", snap.counts[i]);
757    }
758    let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {}", snap.total_count);
759    let _ = writeln!(out, "{name}_sum {}", format_f64(snap.sum));
760    let _ = writeln!(out, "{name}_count {}", snap.total_count);
761    let _ = writeln!(out);
762}
763
/// Format an `f64` for the Prometheus text format.
///
/// Infinities and NaN use the spellings Prometheus expects: `+Inf`, `-Inf`
/// and `NaN`. Finite values use Rust's shortest round-trip decimal form (the
/// `Display` impl, which never switches to exponent notation). `.0` is
/// appended to integral values so there is always at least one decimal digit.
///
/// Rounding to a fixed number of decimals would lose information. With six
/// places, for example, a bound of `1e-7` prints as `0.0` and can collide
/// with another bucket's `le` label.
fn format_f64(v: f64) -> String {
    if v.is_nan() {
        "NaN".to_string()
    } else if v == f64::INFINITY {
        "+Inf".to_string()
    } else if v == f64::NEG_INFINITY {
        "-Inf".to_string()
    } else {
        let s = v.to_string();
        if s.contains('.') {
            s
        } else {
            s + ".0"
        }
    }
}
784
785// ---------------------------------------------------------------------------
786// MetricsSnapshot
787// ---------------------------------------------------------------------------
788
789/// Snapshot of metrics at a point in time
790#[derive(Debug, Clone)]
791pub struct MetricsSnapshot {
792    pub requests_total: u64,
793    pub requests_success: u64,
794    pub requests_failed: u64,
795    pub bytes_read: u64,
796    pub bytes_written: u64,
797    pub active_connections: u64,
798    pub queries_total: u64,
799    pub query_time_us: u64,
800    pub timestamp: u64,
801    /// Request latency histogram snapshot
802    pub request_latency: HistogramSnapshot,
803    /// Per-operation type snapshots
804    pub operations: Vec<OperationSnapshot>,
805    /// Storage metrics
806    pub storage: StorageSnapshot,
807}
808
809impl MetricsSnapshot {
810    /// Calculate average query time in microseconds
811    pub fn avg_query_time_us(&self) -> f64 {
812        if self.queries_total == 0 {
813            0.0
814        } else {
815            self.query_time_us as f64 / self.queries_total as f64
816        }
817    }
818
819    /// Calculate success rate (0.0 to 1.0)
820    pub fn success_rate(&self) -> f64 {
821        if self.requests_total == 0 {
822            0.0
823        } else {
824            self.requests_success as f64 / self.requests_total as f64
825        }
826    }
827
828    /// Format as human-readable string
829    pub fn format_human(&self) -> String {
830        format!(
831            "Metrics:\n\
832             Requests:    {} total, {} success, {} failed (success rate: {:.2}%)\n\
833             Data:        {} bytes read, {} bytes written\n\
834             Connections: {} active\n\
835             Queries:     {} total, avg time: {:.2} \u{03bc}s\n\
836             Timestamp:   {}",
837            self.requests_total,
838            self.requests_success,
839            self.requests_failed,
840            self.success_rate() * 100.0,
841            self.bytes_read,
842            self.bytes_written,
843            self.active_connections,
844            self.queries_total,
845            self.avg_query_time_us(),
846            self.timestamp,
847        )
848    }
849}
850
851// ---------------------------------------------------------------------------
852// Tests
853// ---------------------------------------------------------------------------
854
855#[cfg(test)]
856mod tests {
857    use super::*;
858    use std::thread;
859
860    // -- Histogram tests --
861
862    #[test]
863    fn test_histogram_bucket_counting() {
864        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
865
866        h.observe(0.5); // bucket 1.0
867        h.observe(3.0); // bucket 5.0
868        h.observe(7.0); // bucket 10.0
869        h.observe(15.0); // above all buckets
870
871        let snap = h.snapshot();
872        assert_eq!(snap.total_count, 4);
873        // cumulative: <=1.0 => 1, <=5.0 => 2, <=10.0 => 3
874        assert_eq!(snap.counts, vec![1, 2, 3]);
875        let expected_sum = 0.5 + 3.0 + 7.0 + 15.0;
876        assert!((snap.sum - expected_sum).abs() < 1e-9);
877    }
878
879    #[test]
880    fn test_histogram_exact_boundary() {
881        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
882        h.observe(1.0);
883        h.observe(5.0);
884        h.observe(10.0);
885
886        let snap = h.snapshot();
887        // 1.0 <= 1.0, 5.0 <= 5.0, 10.0 <= 10.0 => all counted cumulatively
888        assert_eq!(snap.counts, vec![1, 2, 3]);
889        assert_eq!(snap.total_count, 3);
890    }
891
892    #[test]
893    fn test_histogram_default_buckets() {
894        let h = Histogram::new();
895        let snap = h.snapshot();
896        assert_eq!(snap.buckets.len(), 12);
897        assert_eq!(snap.buckets[0], 0.001);
898        assert_eq!(snap.buckets[11], 10.0);
899    }
900
901    #[test]
902    fn test_histogram_observe_duration() {
903        let h = Histogram::with_buckets(&[0.01, 0.1, 1.0]);
904        h.observe_duration(Duration::from_millis(5)); // 0.005s -> bucket 0.01
905        let snap = h.snapshot();
906        assert_eq!(snap.counts[0], 1);
907        assert_eq!(snap.total_count, 1);
908        assert!((snap.sum - 0.005).abs() < 1e-6);
909    }
910
911    // -- Percentile tests --
912
913    #[test]
914    fn test_percentile_empty() {
915        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
916        let snap = h.snapshot();
917        assert!(snap.p50().is_none());
918        assert!(snap.p95().is_none());
919        assert!(snap.p99().is_none());
920    }
921
922    #[test]
923    fn test_percentile_single_value() {
924        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
925        h.observe(0.5);
926        let snap = h.snapshot();
927
928        let p50 = snap.p50().expect("should have p50");
929        // Single value at 0.5 falls in first bucket [0, 1.0]
930        // target = 0.5 * 1 = 0.5, bucket has count=1
931        // fraction = 0.5/1 = 0.5, value = 0.0 + 0.5 * 1.0 = 0.5
932        assert!((p50 - 0.5).abs() < 1e-9);
933    }
934
935    #[test]
936    fn test_percentile_many_values() {
937        let h = Histogram::with_buckets(&[1.0, 2.0, 5.0, 10.0]);
938        // Put 50 values in [0,1], 40 in (1,2], 9 in (2,5], 1 in (5,10]
939        for _ in 0..50 {
940            h.observe(0.5);
941        }
942        for _ in 0..40 {
943            h.observe(1.5);
944        }
945        for _ in 0..9 {
946            h.observe(3.0);
947        }
948        h.observe(7.0);
949
950        let snap = h.snapshot();
951        assert_eq!(snap.total_count, 100);
952
953        // p50: target = 50 => hits bucket[0] (count=50), at the boundary
954        let p50 = snap.p50().expect("should have p50");
955        assert!(p50 <= 1.0 + 1e-9, "p50={p50} should be <= 1.0");
956
957        // p95: target = 95, cumulative: 50, 90, 99 => bucket 2 (bound=5.0)
958        let p95 = snap.p95().expect("should have p95");
959        assert!(p95 > 2.0 - 1e-9 && p95 <= 5.0 + 1e-9, "p95={p95}");
960
961        // p99: target = 99, cumulative 99 at bucket 2 => at boundary
962        let p99 = snap.p99().expect("should have p99");
963        assert!(p99 <= 5.0 + 1e-9, "p99={p99}");
964    }
965
966    #[test]
967    fn test_percentile_boundary_values() {
968        let snap = HistogramSnapshot {
969            buckets: vec![1.0, 5.0, 10.0],
970            counts: vec![0, 0, 0],
971            total_count: 0,
972            sum: 0.0,
973        };
974        assert!(snap.percentile(-0.1).is_none());
975        assert!(snap.percentile(1.1).is_none());
976    }
977
978    // -- Concurrent histogram test --
979
980    #[test]
981    fn test_histogram_concurrent() {
982        let h = Histogram::with_buckets(&[1.0, 5.0, 10.0]);
983        let threads: Vec<_> = (0..8)
984            .map(|_| {
985                let h2 = h.clone();
986                thread::spawn(move || {
987                    for i in 0..1000 {
988                        h2.observe(i as f64 % 12.0);
989                    }
990                })
991            })
992            .collect();
993        for t in threads {
994            t.join().expect("thread should not panic");
995        }
996        let snap = h.snapshot();
997        assert_eq!(snap.total_count, 8000);
998    }
999
1000    // -- OperationType tests --
1001
1002    #[test]
1003    fn test_operation_type_labels() {
1004        assert_eq!(OperationType::Get.as_label(), "get");
1005        assert_eq!(OperationType::Put.as_label(), "put");
1006        assert_eq!(OperationType::Delete.as_label(), "delete");
1007        assert_eq!(OperationType::Range.as_label(), "range");
1008        assert_eq!(OperationType::Batch.as_label(), "batch");
1009        assert_eq!(OperationType::Stream.as_label(), "stream");
1010    }
1011
1012    #[test]
1013    fn test_operation_type_display() {
1014        assert_eq!(format!("{}", OperationType::Get), "get");
1015    }
1016
1017    // -- MetricsCollector existing tests --
1018
1019    #[test]
1020    fn test_metrics_collector_creation() {
1021        let collector = MetricsCollector::new();
1022        let snapshot = collector.snapshot();
1023
1024        assert_eq!(snapshot.requests_total, 0);
1025        assert_eq!(snapshot.requests_success, 0);
1026        assert_eq!(snapshot.requests_failed, 0);
1027    }
1028
1029    #[test]
1030    fn test_increment_requests() {
1031        let collector = MetricsCollector::new();
1032
1033        collector.inc_requests();
1034        collector.inc_requests();
1035        collector.inc_success();
1036        collector.inc_failed();
1037
1038        let snapshot = collector.snapshot();
1039        assert_eq!(snapshot.requests_total, 2);
1040        assert_eq!(snapshot.requests_success, 1);
1041        assert_eq!(snapshot.requests_failed, 1);
1042    }
1043
1044    #[test]
1045    fn test_bytes_tracking() {
1046        let collector = MetricsCollector::new();
1047
1048        collector.add_bytes_read(1024);
1049        collector.add_bytes_written(2048);
1050
1051        let snapshot = collector.snapshot();
1052        assert_eq!(snapshot.bytes_read, 1024);
1053        assert_eq!(snapshot.bytes_written, 2048);
1054    }
1055
1056    #[test]
1057    fn test_connections() {
1058        let collector = MetricsCollector::new();
1059
1060        collector.inc_connections();
1061        collector.inc_connections();
1062        assert_eq!(collector.snapshot().active_connections, 2);
1063
1064        collector.dec_connections();
1065        assert_eq!(collector.snapshot().active_connections, 1);
1066    }
1067
1068    #[test]
1069    fn test_queries() {
1070        let collector = MetricsCollector::new();
1071
1072        collector.inc_queries();
1073        collector.add_query_time(1000);
1074        collector.inc_queries();
1075        collector.add_query_time(2000);
1076
1077        let snapshot = collector.snapshot();
1078        assert_eq!(snapshot.queries_total, 2);
1079        assert_eq!(snapshot.query_time_us, 3000);
1080        assert_eq!(snapshot.avg_query_time_us(), 1500.0);
1081    }
1082
1083    #[test]
1084    fn test_success_rate() {
1085        let collector = MetricsCollector::new();
1086
1087        collector.inc_requests();
1088        collector.inc_success();
1089        collector.inc_requests();
1090        collector.inc_failed();
1091
1092        let snapshot = collector.snapshot();
1093        assert_eq!(snapshot.success_rate(), 0.5);
1094    }
1095
1096    #[test]
1097    fn test_reset() {
1098        let collector = MetricsCollector::new();
1099
1100        collector.inc_requests();
1101        collector.inc_success();
1102        collector.record_operation(OperationType::Get, Duration::from_millis(10), true);
1103        collector.set_memtable_size(1024);
1104        assert_eq!(collector.snapshot().requests_total, 1);
1105
1106        collector.reset();
1107        let snap = collector.snapshot();
1108        assert_eq!(snap.requests_total, 0);
1109        assert_eq!(snap.storage.memtable_size_bytes, 0);
1110        assert_eq!(snap.operations[0].count, 0);
1111    }
1112
1113    #[test]
1114    fn test_human_format() {
1115        let collector = MetricsCollector::new();
1116        collector.inc_requests();
1117        collector.inc_success();
1118
1119        let snapshot = collector.snapshot();
1120        let formatted = snapshot.format_human();
1121
1122        assert!(formatted.contains("Metrics:"));
1123        assert!(formatted.contains("Requests:"));
1124        assert!(formatted.contains("1 total"));
1125    }
1126
1127    // -- Record operation tests --
1128
1129    #[test]
1130    fn test_record_operation_success() {
1131        let collector = MetricsCollector::new();
1132        collector.record_operation(OperationType::Get, Duration::from_millis(5), true);
1133        collector.record_operation(OperationType::Get, Duration::from_millis(10), true);
1134
1135        let snap = collector.operation_snapshot(OperationType::Get);
1136        assert_eq!(snap.count, 2);
1137        assert_eq!(snap.errors, 0);
1138        assert_eq!(snap.latency.total_count, 2);
1139    }
1140
1141    #[test]
1142    fn test_record_operation_failure() {
1143        let collector = MetricsCollector::new();
1144        collector.record_operation(OperationType::Put, Duration::from_millis(100), false);
1145
1146        let snap = collector.operation_snapshot(OperationType::Put);
1147        assert_eq!(snap.count, 1);
1148        assert_eq!(snap.errors, 1);
1149    }
1150
1151    #[test]
1152    fn test_record_all_operation_types() {
1153        let collector = MetricsCollector::new();
1154        for &op in &OperationType::ALL {
1155            collector.record_operation(op, Duration::from_millis(1), true);
1156        }
1157        for &op in &OperationType::ALL {
1158            let snap = collector.operation_snapshot(op);
1159            assert_eq!(snap.count, 1, "op={op} should have count 1");
1160        }
1161    }
1162
1163    // -- Storage metrics tests --
1164
1165    #[test]
1166    fn test_storage_gauges_set() {
1167        let collector = MetricsCollector::new();
1168        collector.set_memtable_size(4096);
1169        collector.set_sstable_count(10);
1170        collector.set_wal_size(8192);
1171
1172        let s = collector.storage_snapshot();
1173        assert_eq!(s.memtable_size_bytes, 4096);
1174        assert_eq!(s.sstable_count, 10);
1175        assert_eq!(s.wal_size_bytes, 8192);
1176    }
1177
1178    #[test]
1179    fn test_storage_gauge_inc_dec() {
1180        let collector = MetricsCollector::new();
1181
1182        collector.inc_memtable_size(1000);
1183        collector.inc_memtable_size(500);
1184        assert_eq!(collector.storage_snapshot().memtable_size_bytes, 1500);
1185
1186        collector.dec_memtable_size(300);
1187        assert_eq!(collector.storage_snapshot().memtable_size_bytes, 1200);
1188
1189        collector.inc_sstable_count();
1190        collector.inc_sstable_count();
1191        assert_eq!(collector.storage_snapshot().sstable_count, 2);
1192
1193        collector.dec_sstable_count();
1194        assert_eq!(collector.storage_snapshot().sstable_count, 1);
1195    }
1196
1197    #[test]
1198    fn test_storage_counters() {
1199        let collector = MetricsCollector::new();
1200        collector.inc_compaction_count();
1201        collector.inc_compaction_count();
1202        collector.add_compaction_bytes(10_000);
1203        collector.inc_block_cache_hit();
1204        collector.inc_block_cache_hit();
1205        collector.inc_block_cache_miss();
1206
1207        let s = collector.storage_snapshot();
1208        assert_eq!(s.compaction_count, 2);
1209        assert_eq!(s.compaction_bytes_written, 10_000);
1210        assert_eq!(s.block_cache_hits, 2);
1211        assert_eq!(s.block_cache_misses, 1);
1212    }
1213
1214    // -- Prometheus output tests --
1215
1216    #[test]
1217    fn test_prometheus_format() {
1218        let collector = MetricsCollector::new();
1219
1220        collector.inc_requests();
1221        collector.inc_success();
1222
1223        let prometheus = collector.to_prometheus();
1224        assert!(prometheus.contains("amaters_requests_total 1"));
1225        assert!(prometheus.contains("amaters_requests_success 1"));
1226    }
1227
1228    #[test]
1229    fn test_prometheus_histogram_format() {
1230        let collector = MetricsCollector::new();
1231        collector.observe_request_latency(Duration::from_millis(5)); // 0.005s
1232        collector.observe_request_latency(Duration::from_millis(50)); // 0.050s
1233
1234        let prom = collector.to_prometheus();
1235
1236        // Should contain histogram type
1237        assert!(
1238            prom.contains("# TYPE amaters_request_latency_seconds histogram"),
1239            "missing histogram TYPE line"
1240        );
1241
1242        // Should have _bucket lines with le= labels
1243        assert!(
1244            prom.contains("amaters_request_latency_seconds_bucket{le=\"0.005\"} 1"),
1245            "bucket le=0.005 should have count 1"
1246        );
1247        assert!(
1248            prom.contains("amaters_request_latency_seconds_bucket{le=\"0.05\"} 2"),
1249            "bucket le=0.05 should have count 2"
1250        );
1251
1252        // +Inf bucket
1253        assert!(
1254            prom.contains("amaters_request_latency_seconds_bucket{le=\"+Inf\"} 2"),
1255            "missing +Inf bucket"
1256        );
1257
1258        // _sum and _count
1259        assert!(
1260            prom.contains("amaters_request_latency_seconds_count 2"),
1261            "missing _count"
1262        );
1263        assert!(
1264            prom.contains("amaters_request_latency_seconds_sum"),
1265            "missing _sum"
1266        );
1267    }
1268
1269    #[test]
1270    fn test_prometheus_operation_metrics() {
1271        let collector = MetricsCollector::new();
1272        collector.record_operation(OperationType::Get, Duration::from_millis(1), true);
1273        collector.record_operation(OperationType::Get, Duration::from_millis(2), false);
1274
1275        let prom = collector.to_prometheus();
1276        assert!(
1277            prom.contains("amaters_op_count{op=\"get\"} 2"),
1278            "missing op count"
1279        );
1280        assert!(
1281            prom.contains("amaters_op_errors{op=\"get\"} 1"),
1282            "missing op errors"
1283        );
1284        assert!(
1285            prom.contains("amaters_op_get_latency_seconds_count 2"),
1286            "missing op latency count"
1287        );
1288    }
1289
1290    #[test]
1291    fn test_prometheus_storage_metrics() {
1292        let collector = MetricsCollector::new();
1293        collector.set_memtable_size(4096);
1294        collector.inc_compaction_count();
1295
1296        let prom = collector.to_prometheus();
1297        assert!(
1298            prom.contains("amaters_memtable_size_bytes 4096"),
1299            "missing memtable gauge"
1300        );
1301        assert!(
1302            prom.contains("amaters_compaction_count 1"),
1303            "missing compaction counter"
1304        );
1305    }
1306
1307    #[test]
1308    fn test_prometheus_type_help_comments() {
1309        let collector = MetricsCollector::new();
1310        let prom = collector.to_prometheus();
1311
1312        // Every metric should have HELP and TYPE
1313        assert!(prom.contains("# HELP amaters_requests_total"));
1314        assert!(prom.contains("# TYPE amaters_requests_total counter"));
1315        assert!(prom.contains("# HELP amaters_active_connections"));
1316        assert!(prom.contains("# TYPE amaters_active_connections gauge"));
1317        assert!(prom.contains("# TYPE amaters_request_latency_seconds histogram"));
1318        assert!(prom.contains("# TYPE amaters_memtable_size_bytes gauge"));
1319        assert!(prom.contains("# TYPE amaters_compaction_count counter"));
1320        assert!(prom.contains("# TYPE amaters_block_cache_hits counter"));
1321    }
1322
1323    // -- Concurrent MetricsCollector test --
1324
1325    #[test]
1326    fn test_concurrent_metric_updates() {
1327        let collector = MetricsCollector::new();
1328        let threads: Vec<_> = (0..8)
1329            .map(|i| {
1330                let c = collector.clone();
1331                thread::spawn(move || {
1332                    for _ in 0..500 {
1333                        c.inc_requests();
1334                        if i % 2 == 0 {
1335                            c.inc_success();
1336                        } else {
1337                            c.inc_failed();
1338                        }
1339                        c.record_operation(OperationType::Get, Duration::from_micros(100), true);
1340                        c.inc_block_cache_hit();
1341                    }
1342                })
1343            })
1344            .collect();
1345
1346        for t in threads {
1347            t.join().expect("thread should not panic");
1348        }
1349
1350        let snap = collector.snapshot();
1351        assert_eq!(snap.requests_total, 4000);
1352        assert_eq!(snap.requests_success + snap.requests_failed, 4000);
1353        assert_eq!(snap.storage.block_cache_hits, 4000);
1354
1355        let get_snap = collector.operation_snapshot(OperationType::Get);
1356        assert_eq!(get_snap.count, 4000);
1357        assert_eq!(get_snap.latency.total_count, 4000);
1358    }
1359
1360    // -- format_f64 tests --
1361
1362    #[test]
1363    fn test_format_f64() {
1364        assert_eq!(format_f64(0.001), "0.001");
1365        assert_eq!(format_f64(1.0), "1.0");
1366        assert_eq!(format_f64(10.0), "10.0");
1367        assert_eq!(format_f64(0.025), "0.025");
1368        assert_eq!(format_f64(f64::INFINITY), "+Inf");
1369    }
1370
1371    // -- Snapshot in MetricsSnapshot --
1372
1373    #[test]
1374    fn test_snapshot_includes_all_fields() {
1375        let collector = MetricsCollector::new();
1376        collector.inc_requests();
1377        collector.observe_request_latency(Duration::from_millis(1));
1378        collector.record_operation(OperationType::Put, Duration::from_millis(2), true);
1379        collector.set_memtable_size(2048);
1380
1381        let snap = collector.snapshot();
1382        assert_eq!(snap.requests_total, 1);
1383        assert_eq!(snap.request_latency.total_count, 1);
1384        assert_eq!(snap.operations.len(), 6); // all 6 op types
1385        assert_eq!(snap.storage.memtable_size_bytes, 2048);
1386
1387        // Find the Put operation
1388        let put = snap
1389            .operations
1390            .iter()
1391            .find(|o| o.op_type == OperationType::Put)
1392            .expect("should have Put snapshot");
1393        assert_eq!(put.count, 1);
1394    }
1395}