Skip to main content

a3s_lane/
metrics.rs

//! Metrics collection and reporting for queue observability.
//!
//! This module provides a pluggable metrics system. A local in-memory
//! implementation is used by default, and users can integrate external
//! metrics systems such as Prometheus or OpenTelemetry instead.
6
7use async_trait::async_trait;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12/// A pluggable metrics backend trait.
13///
14/// Implement this trait to integrate with external metrics systems like
15/// Prometheus, OpenTelemetry, or custom monitoring solutions.
16#[async_trait]
17pub trait MetricsBackend: Send + Sync {
18    /// Increment a counter metric by the given value
19    async fn increment_counter(&self, name: &str, value: u64);
20
21    /// Set a gauge metric to the given value
22    async fn set_gauge(&self, name: &str, value: f64);
23
24    /// Record a histogram observation (typically for latency measurements in milliseconds)
25    async fn record_histogram(&self, name: &str, value: f64);
26
27    /// Get current counter value (for testing/debugging)
28    async fn get_counter(&self, name: &str) -> Option<u64>;
29
30    /// Get current gauge value (for testing/debugging)
31    async fn get_gauge(&self, name: &str) -> Option<f64>;
32
33    /// Get histogram statistics (for testing/debugging)
34    async fn get_histogram_stats(&self, name: &str) -> Option<HistogramStats>;
35
36    /// Reset all metrics (useful for testing)
37    async fn reset(&self);
38
39    /// Export all metrics as a snapshot
40    async fn snapshot(&self) -> MetricsSnapshot;
41}
42
/// Aggregate statistics for a single histogram metric.
#[derive(Debug, Clone, PartialEq)]
pub struct HistogramStats {
    /// Number of recorded observations.
    pub count: u64,
    /// Sum of all recorded observations.
    pub sum: f64,
    /// Smallest observation; `f64::MAX` in the empty value built by [`HistogramStats::new`].
    pub min: f64,
    /// Largest observation; `f64::MIN` in the empty value built by [`HistogramStats::new`].
    pub max: f64,
    /// Mean of the recorded observations; `0.0` in the empty value.
    pub mean: f64,
    /// Percentile values (p50, p90, p95, p99).
    pub percentiles: HistogramPercentiles,
}

/// Percentile values of a histogram; every field defaults to `0.0`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct HistogramPercentiles {
    /// 50th percentile.
    pub p50: f64,
    /// 90th percentile.
    pub p90: f64,
    /// 95th percentile.
    pub p95: f64,
    /// 99th percentile.
    pub p99: f64,
}

impl HistogramStats {
    /// Creates the statistics of an empty histogram; identical to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for HistogramStats {
    /// Empty statistics: zero count/sum/mean, zeroed percentiles, and
    /// `min`/`max` set to the `f64::MAX`/`f64::MIN` sentinels.
    fn default() -> Self {
        let (min, max) = (f64::MAX, f64::MIN);
        Self {
            count: 0,
            sum: 0.0,
            min,
            max,
            mean: 0.0,
            percentiles: HistogramPercentiles::default(),
        }
    }
}
82
83/// Internal histogram data structure that tracks values for percentile calculation
84#[derive(Debug, Clone)]
85struct HistogramData {
86    values: Vec<f64>,
87    stats: HistogramStats,
88}
89
90impl HistogramData {
91    fn new() -> Self {
92        Self {
93            values: Vec::new(),
94            stats: HistogramStats::new(),
95        }
96    }
97
98    fn record(&mut self, value: f64) {
99        self.values.push(value);
100        self.stats.count += 1;
101        self.stats.sum += value;
102        self.stats.min = self.stats.min.min(value);
103        self.stats.max = self.stats.max.max(value);
104        self.stats.mean = self.stats.sum / self.stats.count as f64;
105        self.update_percentiles();
106    }
107
108    fn update_percentiles(&mut self) {
109        if self.values.is_empty() {
110            return;
111        }
112
113        let mut sorted = self.values.clone();
114        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
115
116        let len = sorted.len();
117        self.stats.percentiles.p50 = Self::percentile(&sorted, len, 0.50);
118        self.stats.percentiles.p90 = Self::percentile(&sorted, len, 0.90);
119        self.stats.percentiles.p95 = Self::percentile(&sorted, len, 0.95);
120        self.stats.percentiles.p99 = Self::percentile(&sorted, len, 0.99);
121    }
122
123    fn percentile(sorted: &[f64], len: usize, p: f64) -> f64 {
124        if len == 0 {
125            return 0.0;
126        }
127        let idx = ((len as f64 * p) as usize).min(len - 1);
128        sorted[idx]
129    }
130
131    fn stats(&self) -> HistogramStats {
132        self.stats.clone()
133    }
134}
135
136/// Snapshot of all metrics at a point in time
137#[derive(Debug, Clone, Default)]
138pub struct MetricsSnapshot {
139    pub counters: HashMap<String, u64>,
140    pub gauges: HashMap<String, f64>,
141    pub histograms: HashMap<String, HistogramStats>,
142}
143
144/// Local in-memory metrics implementation.
145///
146/// This is the default metrics backend that stores all metrics in memory.
147/// Suitable for development, testing, and single-instance deployments.
148pub struct LocalMetrics {
149    counters: RwLock<HashMap<String, u64>>,
150    gauges: RwLock<HashMap<String, f64>>,
151    histograms: RwLock<HashMap<String, HistogramData>>,
152}
153
154impl LocalMetrics {
155    /// Create a new local metrics instance
156    pub fn new() -> Self {
157        Self {
158            counters: RwLock::new(HashMap::new()),
159            gauges: RwLock::new(HashMap::new()),
160            histograms: RwLock::new(HashMap::new()),
161        }
162    }
163}
164
165impl Default for LocalMetrics {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171#[async_trait]
172impl MetricsBackend for LocalMetrics {
173    async fn increment_counter(&self, name: &str, value: u64) {
174        let mut counters = self.counters.write().await;
175        *counters.entry(name.to_string()).or_insert(0) += value;
176    }
177
178    async fn set_gauge(&self, name: &str, value: f64) {
179        let mut gauges = self.gauges.write().await;
180        gauges.insert(name.to_string(), value);
181    }
182
183    async fn record_histogram(&self, name: &str, value: f64) {
184        let mut histograms = self.histograms.write().await;
185        histograms
186            .entry(name.to_string())
187            .or_insert_with(HistogramData::new)
188            .record(value);
189    }
190
191    async fn get_counter(&self, name: &str) -> Option<u64> {
192        let counters = self.counters.read().await;
193        counters.get(name).copied()
194    }
195
196    async fn get_gauge(&self, name: &str) -> Option<f64> {
197        let gauges = self.gauges.read().await;
198        gauges.get(name).copied()
199    }
200
201    async fn get_histogram_stats(&self, name: &str) -> Option<HistogramStats> {
202        let histograms = self.histograms.read().await;
203        histograms.get(name).map(|h| h.stats())
204    }
205
206    async fn reset(&self) {
207        let mut counters = self.counters.write().await;
208        let mut gauges = self.gauges.write().await;
209        let mut histograms = self.histograms.write().await;
210        counters.clear();
211        gauges.clear();
212        histograms.clear();
213    }
214
215    async fn snapshot(&self) -> MetricsSnapshot {
216        let counters = self.counters.read().await;
217        let gauges = self.gauges.read().await;
218        let histograms = self.histograms.read().await;
219
220        MetricsSnapshot {
221            counters: counters.clone(),
222            gauges: gauges.clone(),
223            histograms: histograms
224                .iter()
225                .map(|(k, v)| (k.clone(), v.stats()))
226                .collect(),
227        }
228    }
229}
230
/// Well-known metric names used for queue observability.
pub mod metric_names {
    // ---- Counters ----

    /// Counter: total number of commands submitted.
    pub const COMMANDS_SUBMITTED: &str = "lane.commands.submitted";
    /// Counter: total number of commands that completed successfully.
    pub const COMMANDS_COMPLETED: &str = "lane.commands.completed";
    /// Counter: total number of commands that failed.
    pub const COMMANDS_FAILED: &str = "lane.commands.failed";
    /// Counter: total number of commands that timed out.
    pub const COMMANDS_TIMEOUT: &str = "lane.commands.timeout";
    /// Counter: total number of command retries.
    pub const COMMANDS_RETRIED: &str = "lane.commands.retried";
    /// Counter: total number of commands sent to the dead letter queue (DLQ).
    pub const COMMANDS_DEAD_LETTERED: &str = "lane.commands.dead_lettered";

    // ---- Gauges ----

    /// Gauge: current queue depth (pending commands).
    pub const QUEUE_DEPTH: &str = "lane.queue.depth";
    /// Gauge: number of currently active commands.
    pub const QUEUE_ACTIVE: &str = "lane.queue.active";

    // ---- Histograms ----

    /// Histogram: command execution latency in milliseconds.
    pub const COMMAND_LATENCY: &str = "lane.command.latency_ms";
    /// Histogram: time a command waited in the queue, in milliseconds.
    pub const COMMAND_WAIT_TIME: &str = "lane.command.wait_time_ms";
}
256
257/// Queue metrics collector that wraps a metrics backend
258/// and provides convenient methods for queue-specific metrics.
259pub struct QueueMetrics {
260    backend: Arc<dyn MetricsBackend>,
261}
262
263impl QueueMetrics {
264    /// Create a new queue metrics collector with the given backend
265    pub fn new(backend: Arc<dyn MetricsBackend>) -> Self {
266        Self { backend }
267    }
268
269    /// Create a new queue metrics collector with local in-memory backend
270    pub fn local() -> Self {
271        Self {
272            backend: Arc::new(LocalMetrics::new()),
273        }
274    }
275
276    /// Get the underlying metrics backend
277    pub fn backend(&self) -> &Arc<dyn MetricsBackend> {
278        &self.backend
279    }
280
281    /// Record a command submission
282    pub async fn record_submit(&self, lane_id: &str) {
283        self.backend
284            .increment_counter(metric_names::COMMANDS_SUBMITTED, 1)
285            .await;
286        self.backend
287            .increment_counter(
288                &format!("{}.{}", metric_names::COMMANDS_SUBMITTED, lane_id),
289                1,
290            )
291            .await;
292    }
293
294    /// Record a command completion
295    pub async fn record_complete(&self, lane_id: &str, latency_ms: f64) {
296        self.backend
297            .increment_counter(metric_names::COMMANDS_COMPLETED, 1)
298            .await;
299        self.backend
300            .increment_counter(
301                &format!("{}.{}", metric_names::COMMANDS_COMPLETED, lane_id),
302                1,
303            )
304            .await;
305        self.backend
306            .record_histogram(metric_names::COMMAND_LATENCY, latency_ms)
307            .await;
308        self.backend
309            .record_histogram(
310                &format!("{}.{}", metric_names::COMMAND_LATENCY, lane_id),
311                latency_ms,
312            )
313            .await;
314    }
315
316    /// Record a command failure
317    pub async fn record_failure(&self, lane_id: &str) {
318        self.backend
319            .increment_counter(metric_names::COMMANDS_FAILED, 1)
320            .await;
321        self.backend
322            .increment_counter(&format!("{}.{}", metric_names::COMMANDS_FAILED, lane_id), 1)
323            .await;
324    }
325
326    /// Record a command timeout
327    pub async fn record_timeout(&self, lane_id: &str) {
328        self.backend
329            .increment_counter(metric_names::COMMANDS_TIMEOUT, 1)
330            .await;
331        self.backend
332            .increment_counter(
333                &format!("{}.{}", metric_names::COMMANDS_TIMEOUT, lane_id),
334                1,
335            )
336            .await;
337    }
338
339    /// Record a command retry
340    pub async fn record_retry(&self, lane_id: &str) {
341        self.backend
342            .increment_counter(metric_names::COMMANDS_RETRIED, 1)
343            .await;
344        self.backend
345            .increment_counter(
346                &format!("{}.{}", metric_names::COMMANDS_RETRIED, lane_id),
347                1,
348            )
349            .await;
350    }
351
352    /// Record a command sent to dead letter queue
353    pub async fn record_dead_letter(&self, lane_id: &str) {
354        self.backend
355            .increment_counter(metric_names::COMMANDS_DEAD_LETTERED, 1)
356            .await;
357        self.backend
358            .increment_counter(
359                &format!("{}.{}", metric_names::COMMANDS_DEAD_LETTERED, lane_id),
360                1,
361            )
362            .await;
363    }
364
365    /// Update queue depth gauge
366    pub async fn set_queue_depth(&self, lane_id: &str, depth: usize) {
367        self.backend
368            .set_gauge(
369                &format!("{}.{}", metric_names::QUEUE_DEPTH, lane_id),
370                depth as f64,
371            )
372            .await;
373    }
374
375    /// Update active commands gauge
376    pub async fn set_active_commands(&self, lane_id: &str, active: usize) {
377        self.backend
378            .set_gauge(
379                &format!("{}.{}", metric_names::QUEUE_ACTIVE, lane_id),
380                active as f64,
381            )
382            .await;
383    }
384
385    /// Record command wait time in queue
386    pub async fn record_wait_time(&self, lane_id: &str, wait_time_ms: f64) {
387        self.backend
388            .record_histogram(metric_names::COMMAND_WAIT_TIME, wait_time_ms)
389            .await;
390        self.backend
391            .record_histogram(
392                &format!("{}.{}", metric_names::COMMAND_WAIT_TIME, lane_id),
393                wait_time_ms,
394            )
395            .await;
396    }
397
398    /// Get a snapshot of all metrics
399    pub async fn snapshot(&self) -> MetricsSnapshot {
400        self.backend.snapshot().await
401    }
402
403    /// Reset all metrics
404    pub async fn reset(&self) {
405        self.backend.reset().await;
406    }
407}
408
409impl Clone for QueueMetrics {
410    fn clone(&self) -> Self {
411        Self {
412            backend: Arc::clone(&self.backend),
413        }
414    }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the per-lane metric name `"<base>.<lane>"` that `QueueMetrics` writes.
    fn lane_key(base: &str, lane: &str) -> String {
        format!("{}.{}", base, lane)
    }

    // ---- LocalMetrics ----

    #[tokio::test]
    async fn test_local_metrics_counter() {
        let metrics = LocalMetrics::new();
        let name = "test.counter";

        assert_eq!(metrics.get_counter(name).await, None);

        metrics.increment_counter(name, 1).await;
        assert_eq!(metrics.get_counter(name).await, Some(1));

        metrics.increment_counter(name, 5).await;
        assert_eq!(metrics.get_counter(name).await, Some(6));
    }

    #[tokio::test]
    async fn test_local_metrics_gauge() {
        let metrics = LocalMetrics::new();
        let name = "test.gauge";

        assert_eq!(metrics.get_gauge(name).await, None);

        metrics.set_gauge(name, 42.5).await;
        assert_eq!(metrics.get_gauge(name).await, Some(42.5));

        // A later write replaces the earlier value.
        metrics.set_gauge(name, 100.0).await;
        assert_eq!(metrics.get_gauge(name).await, Some(100.0));
    }

    #[tokio::test]
    async fn test_local_metrics_histogram() {
        let metrics = LocalMetrics::new();
        let name = "test.histogram";

        assert!(metrics.get_histogram_stats(name).await.is_none());

        for value in [10.0_f64, 20.0, 30.0].iter().copied() {
            metrics.record_histogram(name, value).await;
        }

        let stats = metrics.get_histogram_stats(name).await.unwrap();
        assert_eq!(stats.count, 3);
        assert_eq!(stats.sum, 60.0);
        assert_eq!(stats.min, 10.0);
        assert_eq!(stats.max, 30.0);
        assert_eq!(stats.mean, 20.0);
    }

    #[tokio::test]
    async fn test_local_metrics_histogram_percentiles() {
        let metrics = LocalMetrics::new();
        let name = "test.histogram";

        // Record 100 values from 1 to 100.
        for i in 1..=100u32 {
            metrics.record_histogram(name, f64::from(i)).await;
        }

        let stats = metrics.get_histogram_stats(name).await.unwrap();
        assert_eq!(stats.count, 100);
        assert_eq!(stats.min, 1.0);
        assert_eq!(stats.max, 100.0);

        // Percentiles are approximate because the values are discrete.
        let p = &stats.percentiles;
        assert!(p.p50 >= 49.0 && p.p50 <= 51.0);
        assert!(p.p90 >= 89.0 && p.p90 <= 91.0);
        assert!(p.p95 >= 94.0 && p.p95 <= 96.0);
        assert!(p.p99 >= 98.0 && p.p99 <= 100.0);
    }

    #[tokio::test]
    async fn test_local_metrics_reset() {
        let metrics = LocalMetrics::new();

        metrics.increment_counter("test.counter", 10).await;
        metrics.set_gauge("test.gauge", 50.0).await;
        metrics.record_histogram("test.histogram", 100.0).await;

        metrics.reset().await;

        assert_eq!(metrics.get_counter("test.counter").await, None);
        assert_eq!(metrics.get_gauge("test.gauge").await, None);
        assert!(metrics
            .get_histogram_stats("test.histogram")
            .await
            .is_none());
    }

    #[tokio::test]
    async fn test_local_metrics_snapshot() {
        let metrics = LocalMetrics::new();

        metrics.increment_counter("counter1", 5).await;
        metrics.increment_counter("counter2", 10).await;
        metrics.set_gauge("gauge1", 42.0).await;
        metrics.record_histogram("histogram1", 100.0).await;

        let snapshot = metrics.snapshot().await;

        assert_eq!(snapshot.counters.get("counter1"), Some(&5));
        assert_eq!(snapshot.counters.get("counter2"), Some(&10));
        assert_eq!(snapshot.gauges.get("gauge1"), Some(&42.0));
        assert!(snapshot.histograms.contains_key("histogram1"));
    }

    // ---- QueueMetrics ----

    #[tokio::test]
    async fn test_queue_metrics_record_submit() {
        let metrics = QueueMetrics::local();

        metrics.record_submit("query").await;
        metrics.record_submit("query").await;
        metrics.record_submit("system").await;

        let counters = metrics.snapshot().await.counters;
        let base = metric_names::COMMANDS_SUBMITTED;
        assert_eq!(counters.get(base), Some(&3));
        assert_eq!(counters.get(&lane_key(base, "query")), Some(&2));
        assert_eq!(counters.get(&lane_key(base, "system")), Some(&1));
    }

    #[tokio::test]
    async fn test_queue_metrics_record_complete() {
        let metrics = QueueMetrics::local();

        metrics.record_complete("query", 50.0).await;
        metrics.record_complete("query", 100.0).await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_COMPLETED),
            Some(&2)
        );

        let latency_stats = snapshot
            .histograms
            .get(metric_names::COMMAND_LATENCY)
            .unwrap();
        assert_eq!(latency_stats.count, 2);
        assert_eq!(latency_stats.mean, 75.0);
    }

    #[tokio::test]
    async fn test_queue_metrics_record_failure() {
        let metrics = QueueMetrics::local();

        metrics.record_failure("query").await;

        let counters = metrics.snapshot().await.counters;
        assert_eq!(counters.get(metric_names::COMMANDS_FAILED), Some(&1));
    }

    #[tokio::test]
    async fn test_queue_metrics_record_timeout() {
        let metrics = QueueMetrics::local();

        metrics.record_timeout("query").await;

        let counters = metrics.snapshot().await.counters;
        assert_eq!(counters.get(metric_names::COMMANDS_TIMEOUT), Some(&1));
    }

    #[tokio::test]
    async fn test_queue_metrics_record_retry() {
        let metrics = QueueMetrics::local();

        metrics.record_retry("query").await;
        metrics.record_retry("query").await;

        let counters = metrics.snapshot().await.counters;
        assert_eq!(counters.get(metric_names::COMMANDS_RETRIED), Some(&2));
    }

    #[tokio::test]
    async fn test_queue_metrics_record_dead_letter() {
        let metrics = QueueMetrics::local();

        metrics.record_dead_letter("query").await;

        let counters = metrics.snapshot().await.counters;
        assert_eq!(
            counters.get(metric_names::COMMANDS_DEAD_LETTERED),
            Some(&1)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_set_queue_depth() {
        let metrics = QueueMetrics::local();

        metrics.set_queue_depth("query", 10).await;
        metrics.set_queue_depth("system", 5).await;

        let gauges = metrics.snapshot().await.gauges;
        let base = metric_names::QUEUE_DEPTH;
        assert_eq!(gauges.get(&lane_key(base, "query")), Some(&10.0));
        assert_eq!(gauges.get(&lane_key(base, "system")), Some(&5.0));
    }

    #[tokio::test]
    async fn test_queue_metrics_set_active_commands() {
        let metrics = QueueMetrics::local();

        metrics.set_active_commands("query", 3).await;

        let gauges = metrics.snapshot().await.gauges;
        assert_eq!(
            gauges.get(&lane_key(metric_names::QUEUE_ACTIVE, "query")),
            Some(&3.0)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_record_wait_time() {
        let metrics = QueueMetrics::local();

        metrics.record_wait_time("query", 25.0).await;
        metrics.record_wait_time("query", 75.0).await;

        let snapshot = metrics.snapshot().await;
        let wait_stats = snapshot
            .histograms
            .get(metric_names::COMMAND_WAIT_TIME)
            .unwrap();
        assert_eq!(wait_stats.count, 2);
        assert_eq!(wait_stats.mean, 50.0);
    }

    #[tokio::test]
    async fn test_queue_metrics_clone() {
        let metrics = QueueMetrics::local();
        metrics.record_submit("query").await;

        let cloned = metrics.clone();
        cloned.record_submit("query").await;

        // Both handles share the same backend, so the original sees both submits.
        let counters = metrics.snapshot().await.counters;
        assert_eq!(counters.get(metric_names::COMMANDS_SUBMITTED), Some(&2));
    }

    // ---- Plain data types (no async runtime needed) ----

    #[test]
    fn test_histogram_stats_default() {
        let stats = HistogramStats::default();
        assert_eq!(stats.count, 0);
        assert_eq!(stats.sum, 0.0);
        assert_eq!(stats.mean, 0.0);
    }

    #[test]
    fn test_histogram_percentiles_default() {
        let percentiles = HistogramPercentiles::default();
        assert_eq!(percentiles.p50, 0.0);
        assert_eq!(percentiles.p90, 0.0);
        assert_eq!(percentiles.p95, 0.0);
        assert_eq!(percentiles.p99, 0.0);
    }

    #[test]
    fn test_metrics_snapshot_default() {
        let snapshot = MetricsSnapshot::default();
        assert!(snapshot.counters.is_empty());
        assert!(snapshot.gauges.is_empty());
        assert!(snapshot.histograms.is_empty());
    }
}