1use async_trait::async_trait;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
/// Storage abstraction for the three metric kinds used in this module:
/// counters (`u64`), gauges (`f64`) and histograms (`f64` samples).
///
/// Implementors must be `Send + Sync`; `QueueMetrics` holds its backend as an
/// `Arc<dyn MetricsBackend>`.
#[async_trait]
pub trait MetricsBackend: Send + Sync {
    /// Adds `value` to the counter called `name`.
    async fn increment_counter(&self, name: &str, value: u64);

    /// Sets the gauge called `name` to `value`.
    async fn set_gauge(&self, name: &str, value: f64);

    /// Records one sample `value` in the histogram called `name`.
    async fn record_histogram(&self, name: &str, value: f64);

    /// Reads the counter called `name`.
    ///
    /// `LocalMetrics` returns `None` for a name that has never been incremented.
    async fn get_counter(&self, name: &str) -> Option<u64>;

    /// Reads the gauge called `name`.
    ///
    /// `LocalMetrics` returns `None` for a name that has never been set.
    async fn get_gauge(&self, name: &str) -> Option<f64>;

    /// Reads the summary statistics of the histogram called `name`.
    ///
    /// `LocalMetrics` returns `None` for a name that has never received a sample.
    async fn get_histogram_stats(&self, name: &str) -> Option<HistogramStats>;

    /// Discards stored metrics (`LocalMetrics` empties all three of its maps).
    async fn reset(&self);

    /// Returns a copy of the currently stored metrics as a [`MetricsSnapshot`].
    async fn snapshot(&self) -> MetricsSnapshot;
}
42
/// Summary statistics describing the samples recorded in one histogram.
#[derive(Debug, Clone, PartialEq)]
pub struct HistogramStats {
    /// Number of samples recorded.
    pub count: u64,
    /// Sum of all recorded samples.
    pub sum: f64,
    /// Smallest recorded sample; `f64::MAX` in the value built by [`HistogramStats::new`].
    pub min: f64,
    /// Largest recorded sample; `f64::MIN` in the value built by [`HistogramStats::new`].
    pub max: f64,
    /// Arithmetic mean of the samples (`sum / count`); `0.0` in a fresh value.
    pub mean: f64,
    /// Selected percentiles of the recorded samples.
    pub percentiles: HistogramPercentiles,
}
54
/// The 50th, 90th, 95th and 99th percentile values of a histogram.
///
/// The derived `Default` sets every field to `0.0`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct HistogramPercentiles {
    /// 50th percentile.
    pub p50: f64,
    /// 90th percentile.
    pub p90: f64,
    /// 95th percentile.
    pub p95: f64,
    /// 99th percentile.
    pub p99: f64,
}
63
64impl Default for HistogramStats {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl HistogramStats {
71 pub fn new() -> Self {
72 Self {
73 count: 0,
74 sum: 0.0,
75 min: f64::MAX,
76 max: f64::MIN,
77 mean: 0.0,
78 percentiles: HistogramPercentiles::default(),
79 }
80 }
81}
82
/// Per-histogram state: the recorded samples plus the statistics derived from them.
#[derive(Debug, Clone)]
struct HistogramData {
    /// Every sample passed to `record`; nothing in this type removes entries.
    values: Vec<f64>,
    /// Summary statistics, refreshed by `record` and handed out by `stats`.
    stats: HistogramStats,
}
89
90impl HistogramData {
91 fn new() -> Self {
92 Self {
93 values: Vec::new(),
94 stats: HistogramStats::new(),
95 }
96 }
97
98 fn record(&mut self, value: f64) {
99 self.values.push(value);
100 self.stats.count += 1;
101 self.stats.sum += value;
102 self.stats.min = self.stats.min.min(value);
103 self.stats.max = self.stats.max.max(value);
104 self.stats.mean = self.stats.sum / self.stats.count as f64;
105 self.update_percentiles();
106 }
107
108 fn update_percentiles(&mut self) {
109 if self.values.is_empty() {
110 return;
111 }
112
113 let mut sorted = self.values.clone();
114 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
115
116 let len = sorted.len();
117 self.stats.percentiles.p50 = Self::percentile(&sorted, len, 0.50);
118 self.stats.percentiles.p90 = Self::percentile(&sorted, len, 0.90);
119 self.stats.percentiles.p95 = Self::percentile(&sorted, len, 0.95);
120 self.stats.percentiles.p99 = Self::percentile(&sorted, len, 0.99);
121 }
122
123 fn percentile(sorted: &[f64], len: usize, p: f64) -> f64 {
124 if len == 0 {
125 return 0.0;
126 }
127 let idx = ((len as f64 * p) as usize).min(len - 1);
128 sorted[idx]
129 }
130
131 fn stats(&self) -> HistogramStats {
132 self.stats.clone()
133 }
134}
135
/// A copy of all metrics held by a backend, keyed by metric name.
///
/// The derived `Default` is a snapshot with three empty maps.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    /// Counter values by name.
    pub counters: HashMap<String, u64>,
    /// Gauge values by name.
    pub gauges: HashMap<String, f64>,
    /// Histogram summary statistics by name.
    pub histograms: HashMap<String, HistogramStats>,
}
143
/// In-memory metrics store.
///
/// Each metric kind lives in its own map behind its own `tokio::sync::RwLock`.
pub struct LocalMetrics {
    /// Counter totals by name.
    counters: RwLock<HashMap<String, u64>>,
    /// Latest gauge values by name.
    gauges: RwLock<HashMap<String, f64>>,
    /// Histogram samples and statistics by name.
    histograms: RwLock<HashMap<String, HistogramData>>,
}
153
154impl LocalMetrics {
155 pub fn new() -> Self {
157 Self {
158 counters: RwLock::new(HashMap::new()),
159 gauges: RwLock::new(HashMap::new()),
160 histograms: RwLock::new(HashMap::new()),
161 }
162 }
163}
164
165impl Default for LocalMetrics {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171#[async_trait]
172impl MetricsBackend for LocalMetrics {
173 async fn increment_counter(&self, name: &str, value: u64) {
174 let mut counters = self.counters.write().await;
175 *counters.entry(name.to_string()).or_insert(0) += value;
176 }
177
178 async fn set_gauge(&self, name: &str, value: f64) {
179 let mut gauges = self.gauges.write().await;
180 gauges.insert(name.to_string(), value);
181 }
182
183 async fn record_histogram(&self, name: &str, value: f64) {
184 let mut histograms = self.histograms.write().await;
185 histograms
186 .entry(name.to_string())
187 .or_insert_with(HistogramData::new)
188 .record(value);
189 }
190
191 async fn get_counter(&self, name: &str) -> Option<u64> {
192 let counters = self.counters.read().await;
193 counters.get(name).copied()
194 }
195
196 async fn get_gauge(&self, name: &str) -> Option<f64> {
197 let gauges = self.gauges.read().await;
198 gauges.get(name).copied()
199 }
200
201 async fn get_histogram_stats(&self, name: &str) -> Option<HistogramStats> {
202 let histograms = self.histograms.read().await;
203 histograms.get(name).map(|h| h.stats())
204 }
205
206 async fn reset(&self) {
207 let mut counters = self.counters.write().await;
208 let mut gauges = self.gauges.write().await;
209 let mut histograms = self.histograms.write().await;
210 counters.clear();
211 gauges.clear();
212 histograms.clear();
213 }
214
215 async fn snapshot(&self) -> MetricsSnapshot {
216 let counters = self.counters.read().await;
217 let gauges = self.gauges.read().await;
218 let histograms = self.histograms.read().await;
219
220 MetricsSnapshot {
221 counters: counters.clone(),
222 gauges: gauges.clone(),
223 histograms: histograms
224 .iter()
225 .map(|(k, v)| (k.clone(), v.stats()))
226 .collect(),
227 }
228 }
229}
230
/// Base metric names used by `QueueMetrics`.
///
/// `QueueMetrics` also builds per-lane names by appending `.{lane_id}` to these.
pub mod metric_names {
    /// Counter incremented by `QueueMetrics::record_submit`.
    pub const COMMANDS_SUBMITTED: &str = "lane.commands.submitted";
    /// Counter incremented by `QueueMetrics::record_complete`.
    pub const COMMANDS_COMPLETED: &str = "lane.commands.completed";
    /// Counter incremented by `QueueMetrics::record_failure`.
    pub const COMMANDS_FAILED: &str = "lane.commands.failed";
    /// Counter incremented by `QueueMetrics::record_timeout`.
    pub const COMMANDS_TIMEOUT: &str = "lane.commands.timeout";
    /// Counter incremented by `QueueMetrics::record_retry`.
    pub const COMMANDS_RETRIED: &str = "lane.commands.retried";
    /// Counter incremented by `QueueMetrics::record_dead_letter`.
    pub const COMMANDS_DEAD_LETTERED: &str = "lane.commands.dead_lettered";

    /// Gauge prefix written by `QueueMetrics::set_queue_depth` (per-lane form only).
    pub const QUEUE_DEPTH: &str = "lane.queue.depth";
    /// Gauge prefix written by `QueueMetrics::set_active_commands` (per-lane form only).
    pub const QUEUE_ACTIVE: &str = "lane.queue.active";

    /// Histogram fed by `QueueMetrics::record_complete` with its `latency_ms` argument.
    pub const COMMAND_LATENCY: &str = "lane.command.latency_ms";
    /// Histogram fed by `QueueMetrics::record_wait_time` with its `wait_time_ms` argument.
    pub const COMMAND_WAIT_TIME: &str = "lane.command.wait_time_ms";
}
256
/// Queue-oriented facade over a [`MetricsBackend`].
///
/// Translates queue events (submit, complete, failure, ...) into counter, gauge and histogram
/// updates under the names in [`metric_names`].
pub struct QueueMetrics {
    /// Shared backend that stores the metrics; clones of this value share it.
    backend: Arc<dyn MetricsBackend>,
}
262
263impl QueueMetrics {
264 pub fn new(backend: Arc<dyn MetricsBackend>) -> Self {
266 Self { backend }
267 }
268
269 pub fn local() -> Self {
271 Self {
272 backend: Arc::new(LocalMetrics::new()),
273 }
274 }
275
276 pub fn backend(&self) -> &Arc<dyn MetricsBackend> {
278 &self.backend
279 }
280
281 pub async fn record_submit(&self, lane_id: &str) {
283 self.backend
284 .increment_counter(metric_names::COMMANDS_SUBMITTED, 1)
285 .await;
286 self.backend
287 .increment_counter(
288 &format!("{}.{}", metric_names::COMMANDS_SUBMITTED, lane_id),
289 1,
290 )
291 .await;
292 }
293
294 pub async fn record_complete(&self, lane_id: &str, latency_ms: f64) {
296 self.backend
297 .increment_counter(metric_names::COMMANDS_COMPLETED, 1)
298 .await;
299 self.backend
300 .increment_counter(
301 &format!("{}.{}", metric_names::COMMANDS_COMPLETED, lane_id),
302 1,
303 )
304 .await;
305 self.backend
306 .record_histogram(metric_names::COMMAND_LATENCY, latency_ms)
307 .await;
308 self.backend
309 .record_histogram(
310 &format!("{}.{}", metric_names::COMMAND_LATENCY, lane_id),
311 latency_ms,
312 )
313 .await;
314 }
315
316 pub async fn record_failure(&self, lane_id: &str) {
318 self.backend
319 .increment_counter(metric_names::COMMANDS_FAILED, 1)
320 .await;
321 self.backend
322 .increment_counter(&format!("{}.{}", metric_names::COMMANDS_FAILED, lane_id), 1)
323 .await;
324 }
325
326 pub async fn record_timeout(&self, lane_id: &str) {
328 self.backend
329 .increment_counter(metric_names::COMMANDS_TIMEOUT, 1)
330 .await;
331 self.backend
332 .increment_counter(
333 &format!("{}.{}", metric_names::COMMANDS_TIMEOUT, lane_id),
334 1,
335 )
336 .await;
337 }
338
339 pub async fn record_retry(&self, lane_id: &str) {
341 self.backend
342 .increment_counter(metric_names::COMMANDS_RETRIED, 1)
343 .await;
344 self.backend
345 .increment_counter(
346 &format!("{}.{}", metric_names::COMMANDS_RETRIED, lane_id),
347 1,
348 )
349 .await;
350 }
351
352 pub async fn record_dead_letter(&self, lane_id: &str) {
354 self.backend
355 .increment_counter(metric_names::COMMANDS_DEAD_LETTERED, 1)
356 .await;
357 self.backend
358 .increment_counter(
359 &format!("{}.{}", metric_names::COMMANDS_DEAD_LETTERED, lane_id),
360 1,
361 )
362 .await;
363 }
364
365 pub async fn set_queue_depth(&self, lane_id: &str, depth: usize) {
367 self.backend
368 .set_gauge(
369 &format!("{}.{}", metric_names::QUEUE_DEPTH, lane_id),
370 depth as f64,
371 )
372 .await;
373 }
374
375 pub async fn set_active_commands(&self, lane_id: &str, active: usize) {
377 self.backend
378 .set_gauge(
379 &format!("{}.{}", metric_names::QUEUE_ACTIVE, lane_id),
380 active as f64,
381 )
382 .await;
383 }
384
385 pub async fn record_wait_time(&self, lane_id: &str, wait_time_ms: f64) {
387 self.backend
388 .record_histogram(metric_names::COMMAND_WAIT_TIME, wait_time_ms)
389 .await;
390 self.backend
391 .record_histogram(
392 &format!("{}.{}", metric_names::COMMAND_WAIT_TIME, lane_id),
393 wait_time_ms,
394 )
395 .await;
396 }
397
398 pub async fn snapshot(&self) -> MetricsSnapshot {
400 self.backend.snapshot().await
401 }
402
403 pub async fn reset(&self) {
405 self.backend.reset().await;
406 }
407}
408
409impl Clone for QueueMetrics {
410 fn clone(&self) -> Self {
411 Self {
412 backend: Arc::clone(&self.backend),
413 }
414 }
415}
416
// Unit tests for `LocalMetrics`, `QueueMetrics` and the plain data types.
// Tests that await use `#[tokio::test]`; purely synchronous ones use `#[test]`.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_local_metrics_counter() {
        let metrics = LocalMetrics::new();

        assert_eq!(metrics.get_counter("test.counter").await, None);

        metrics.increment_counter("test.counter", 1).await;
        assert_eq!(metrics.get_counter("test.counter").await, Some(1));

        metrics.increment_counter("test.counter", 5).await;
        assert_eq!(metrics.get_counter("test.counter").await, Some(6));
    }

    #[tokio::test]
    async fn test_local_metrics_gauge() {
        let metrics = LocalMetrics::new();

        assert_eq!(metrics.get_gauge("test.gauge").await, None);

        metrics.set_gauge("test.gauge", 42.5).await;
        assert_eq!(metrics.get_gauge("test.gauge").await, Some(42.5));

        metrics.set_gauge("test.gauge", 100.0).await;
        assert_eq!(metrics.get_gauge("test.gauge").await, Some(100.0));
    }

    #[tokio::test]
    async fn test_local_metrics_histogram() {
        let metrics = LocalMetrics::new();

        assert!(metrics
            .get_histogram_stats("test.histogram")
            .await
            .is_none());

        metrics.record_histogram("test.histogram", 10.0).await;
        metrics.record_histogram("test.histogram", 20.0).await;
        metrics.record_histogram("test.histogram", 30.0).await;

        let stats = metrics.get_histogram_stats("test.histogram").await.unwrap();
        assert_eq!(stats.count, 3);
        assert_eq!(stats.sum, 60.0);
        assert_eq!(stats.min, 10.0);
        assert_eq!(stats.max, 30.0);
        assert_eq!(stats.mean, 20.0);
    }

    #[tokio::test]
    async fn test_local_metrics_histogram_percentiles() {
        let metrics = LocalMetrics::new();

        // Samples 1.0 through 100.0.
        for i in 1..=100 {
            metrics.record_histogram("test.histogram", i as f64).await;
        }

        let stats = metrics.get_histogram_stats("test.histogram").await.unwrap();
        assert_eq!(stats.count, 100);
        assert_eq!(stats.min, 1.0);
        assert_eq!(stats.max, 100.0);

        // Each percentile is checked against a small tolerance band rather than one exact rank.
        assert!(stats.percentiles.p50 >= 49.0 && stats.percentiles.p50 <= 51.0);
        assert!(stats.percentiles.p90 >= 89.0 && stats.percentiles.p90 <= 91.0);
        assert!(stats.percentiles.p95 >= 94.0 && stats.percentiles.p95 <= 96.0);
        assert!(stats.percentiles.p99 >= 98.0 && stats.percentiles.p99 <= 100.0);
    }

    #[tokio::test]
    async fn test_local_metrics_reset() {
        let metrics = LocalMetrics::new();

        metrics.increment_counter("test.counter", 10).await;
        metrics.set_gauge("test.gauge", 50.0).await;
        metrics.record_histogram("test.histogram", 100.0).await;

        metrics.reset().await;

        assert_eq!(metrics.get_counter("test.counter").await, None);
        assert_eq!(metrics.get_gauge("test.gauge").await, None);
        assert!(metrics
            .get_histogram_stats("test.histogram")
            .await
            .is_none());
    }

    #[tokio::test]
    async fn test_local_metrics_snapshot() {
        let metrics = LocalMetrics::new();

        metrics.increment_counter("counter1", 5).await;
        metrics.increment_counter("counter2", 10).await;
        metrics.set_gauge("gauge1", 42.0).await;
        metrics.record_histogram("histogram1", 100.0).await;

        let snapshot = metrics.snapshot().await;

        assert_eq!(snapshot.counters.get("counter1"), Some(&5));
        assert_eq!(snapshot.counters.get("counter2"), Some(&10));
        assert_eq!(snapshot.gauges.get("gauge1"), Some(&42.0));
        assert!(snapshot.histograms.contains_key("histogram1"));
    }

    #[tokio::test]
    async fn test_queue_metrics_record_submit() {
        let metrics = QueueMetrics::local();

        metrics.record_submit("query").await;
        metrics.record_submit("query").await;
        metrics.record_submit("system").await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_SUBMITTED),
            Some(&3)
        );
        assert_eq!(
            snapshot
                .counters
                .get(&format!("{}.query", metric_names::COMMANDS_SUBMITTED)),
            Some(&2)
        );
        assert_eq!(
            snapshot
                .counters
                .get(&format!("{}.system", metric_names::COMMANDS_SUBMITTED)),
            Some(&1)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_record_complete() {
        let metrics = QueueMetrics::local();

        metrics.record_complete("query", 50.0).await;
        metrics.record_complete("query", 100.0).await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_COMPLETED),
            Some(&2)
        );

        let latency_stats = snapshot
            .histograms
            .get(metric_names::COMMAND_LATENCY)
            .unwrap();
        assert_eq!(latency_stats.count, 2);
        assert_eq!(latency_stats.mean, 75.0);
    }

    #[tokio::test]
    async fn test_queue_metrics_record_failure() {
        let metrics = QueueMetrics::local();

        metrics.record_failure("query").await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_FAILED),
            Some(&1)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_record_timeout() {
        let metrics = QueueMetrics::local();

        metrics.record_timeout("query").await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_TIMEOUT),
            Some(&1)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_record_retry() {
        let metrics = QueueMetrics::local();

        metrics.record_retry("query").await;
        metrics.record_retry("query").await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_RETRIED),
            Some(&2)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_record_dead_letter() {
        let metrics = QueueMetrics::local();

        metrics.record_dead_letter("query").await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_DEAD_LETTERED),
            Some(&1)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_set_queue_depth() {
        let metrics = QueueMetrics::local();

        metrics.set_queue_depth("query", 10).await;
        metrics.set_queue_depth("system", 5).await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot
                .gauges
                .get(&format!("{}.query", metric_names::QUEUE_DEPTH)),
            Some(&10.0)
        );
        assert_eq!(
            snapshot
                .gauges
                .get(&format!("{}.system", metric_names::QUEUE_DEPTH)),
            Some(&5.0)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_set_active_commands() {
        let metrics = QueueMetrics::local();

        metrics.set_active_commands("query", 3).await;

        let snapshot = metrics.snapshot().await;
        assert_eq!(
            snapshot
                .gauges
                .get(&format!("{}.query", metric_names::QUEUE_ACTIVE)),
            Some(&3.0)
        );
    }

    #[tokio::test]
    async fn test_queue_metrics_record_wait_time() {
        let metrics = QueueMetrics::local();

        metrics.record_wait_time("query", 25.0).await;
        metrics.record_wait_time("query", 75.0).await;

        let snapshot = metrics.snapshot().await;
        let wait_stats = snapshot
            .histograms
            .get(metric_names::COMMAND_WAIT_TIME)
            .unwrap();
        assert_eq!(wait_stats.count, 2);
        assert_eq!(wait_stats.mean, 50.0);
    }

    #[tokio::test]
    async fn test_queue_metrics_clone() {
        let metrics = QueueMetrics::local();
        metrics.record_submit("query").await;

        let cloned = metrics.clone();
        cloned.record_submit("query").await;

        let snapshot = metrics.snapshot().await;
        // The clone shares the backend, so both submissions are visible through the original.
        assert_eq!(
            snapshot.counters.get(metric_names::COMMANDS_SUBMITTED),
            Some(&2)
        );
    }

    // Synchronous: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_histogram_stats_default() {
        let stats = HistogramStats::default();
        assert_eq!(stats.count, 0);
        assert_eq!(stats.sum, 0.0);
        assert_eq!(stats.mean, 0.0);
    }

    #[test]
    fn test_histogram_percentiles_default() {
        let percentiles = HistogramPercentiles::default();
        assert_eq!(percentiles.p50, 0.0);
        assert_eq!(percentiles.p90, 0.0);
        assert_eq!(percentiles.p95, 0.0);
        assert_eq!(percentiles.p99, 0.0);
    }

    #[test]
    fn test_metrics_snapshot_default() {
        let snapshot = MetricsSnapshot::default();
        assert!(snapshot.counters.is_empty());
        assert!(snapshot.gauges.is_empty());
        assert!(snapshot.histograms.is_empty());
    }
}