1use parking_lot::RwLock;
36use std::collections::HashMap;
37use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
38use std::sync::Arc;
39
40#[derive(Debug, thiserror::Error)]
42pub enum MetricsError {
43 #[error("Metric '{0}' already exists")]
45 AlreadyExists(String),
46
47 #[error("Metric '{0}' not found")]
49 NotFound(String),
50
51 #[error("Invalid metric configuration: {0}")]
53 InvalidConfig(String),
54}
55
56#[derive(Clone)]
61pub struct MetricsRegistry {
62 counters: Arc<RwLock<HashMap<String, Counter>>>,
63 gauges: Arc<RwLock<HashMap<String, Gauge>>>,
64 histograms: Arc<RwLock<HashMap<String, Histogram>>>,
65}
66
67impl MetricsRegistry {
68 pub fn new() -> Self {
70 Self {
71 counters: Arc::new(RwLock::new(HashMap::new())),
72 gauges: Arc::new(RwLock::new(HashMap::new())),
73 histograms: Arc::new(RwLock::new(HashMap::new())),
74 }
75 }
76
77 pub fn register_counter(
83 &self,
84 name: impl Into<String>,
85 labels: Vec<(String, String)>,
86 ) -> Counter {
87 let name = name.into();
88 let mut counters = self.counters.write();
89
90 if counters.contains_key(&name) {
91 counters.get(&name).unwrap().clone()
93 } else {
94 let counter = Counter::new(name.clone(), labels);
95 counters.insert(name, counter.clone());
96 counter
97 }
98 }
99
100 pub fn register_gauge(
106 &self,
107 name: impl Into<String>,
108 labels: Vec<(String, String)>,
109 ) -> Gauge {
110 let name = name.into();
111 let mut gauges = self.gauges.write();
112
113 if gauges.contains_key(&name) {
114 gauges.get(&name).unwrap().clone()
116 } else {
117 let gauge = Gauge::new(name.clone(), labels);
118 gauges.insert(name, gauge.clone());
119 gauge
120 }
121 }
122
123 pub fn register_histogram(
136 &self,
137 name: impl Into<String>,
138 buckets: Vec<f64>,
139 labels: Vec<(String, String)>,
140 ) -> Histogram {
141 let name = name.into();
142 let mut histograms = self.histograms.write();
143
144 if histograms.contains_key(&name) {
145 histograms.get(&name).unwrap().clone()
147 } else {
148 let histogram = Histogram::new(name.clone(), buckets, labels);
149 histograms.insert(name, histogram.clone());
150 histogram
151 }
152 }
153
154 pub fn get_counter(&self, name: &str) -> Option<Counter> {
156 self.counters.read().get(name).cloned()
157 }
158
159 pub fn get_gauge(&self, name: &str) -> Option<Gauge> {
161 self.gauges.read().get(name).cloned()
162 }
163
164 pub fn get_histogram(&self, name: &str) -> Option<Histogram> {
166 self.histograms.read().get(name).cloned()
167 }
168
169 pub fn counter_names(&self) -> Vec<String> {
171 self.counters.read().keys().cloned().collect()
172 }
173
174 pub fn gauge_names(&self) -> Vec<String> {
176 self.gauges.read().keys().cloned().collect()
177 }
178
179 pub fn histogram_names(&self) -> Vec<String> {
181 self.histograms.read().keys().cloned().collect()
182 }
183}
184
185impl Default for MetricsRegistry {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
/// A `u64` counter that only moves upward (wrapping on overflow, as `fetch_add` does).
///
/// The value and the label map both sit behind `Arc`, so every clone of a `Counter`
/// observes and updates the same number.
#[derive(Clone)]
pub struct Counter {
    name: String,
    value: Arc<AtomicU64>,
    labels: Arc<HashMap<String, String>>,
}

impl Counter {
    /// Builds a zeroed counter. If a label key appears more than once, the last pair wins.
    pub fn new(name: String, labels: Vec<(String, String)>) -> Self {
        let label_map: HashMap<String, String> = labels.into_iter().collect();
        Counter {
            name,
            value: Arc::new(AtomicU64::new(0)),
            labels: Arc::new(label_map),
        }
    }

    /// Adds one.
    pub fn inc(&self) {
        self.inc_by(1);
    }

    /// Adds `n`.
    pub fn inc_by(&self, n: u64) {
        // Relaxed is enough: the counter is an independent statistic, not a sync point.
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Metric name given at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Label key/value pairs given at construction.
    pub fn labels(&self) -> &HashMap<String, String> {
        self.labels.as_ref()
    }
}
239
/// A signed 64-bit reading that can be set outright or nudged up and down.
///
/// Clones share the same atomic value and label map.
#[derive(Clone)]
pub struct Gauge {
    name: String,
    value: Arc<AtomicI64>,
    labels: Arc<HashMap<String, String>>,
}

impl Gauge {
    /// Builds a gauge reading zero. If a label key repeats, the last pair wins.
    pub fn new(name: String, labels: Vec<(String, String)>) -> Self {
        let label_map: HashMap<String, String> = labels.into_iter().collect();
        Gauge {
            name,
            value: Arc::new(AtomicI64::new(0)),
            labels: Arc::new(label_map),
        }
    }

    /// Overwrites the current reading.
    pub fn set(&self, value: i64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Adds one.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Subtracts one.
    pub fn dec(&self) {
        self.sub(1);
    }

    /// Adds `n` (wrapping on overflow).
    pub fn add(&self, n: i64) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Subtracts `n` (wrapping on overflow).
    pub fn sub(&self, n: i64) {
        self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Current reading.
    pub fn get(&self) -> i64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Metric name given at construction.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Label key/value pairs given at construction.
    pub fn labels(&self) -> &HashMap<String, String> {
        self.labels.as_ref()
    }
}
303
/// A fixed-bucket histogram with atomically updated bucket counts, count and sum.
///
/// Bucket bounds are inclusive upper limits in strictly ascending order. One extra
/// overflow slot collects observations above the largest bound. Clones share all state.
#[derive(Clone)]
pub struct Histogram {
    name: String,
    /// Inclusive upper bounds of the finite buckets, strictly ascending.
    buckets: Arc<Vec<f64>>,
    /// Per-bucket (non-cumulative) counts with `buckets.len() + 1` slots; the last slot
    /// is the overflow bucket.
    counts: Arc<Vec<AtomicU64>>,
    /// Running sum of observations, stored as the raw bits of an `f64` so that negative,
    /// fractional and very large values are all represented faithfully.
    sum: Arc<AtomicU64>,
    /// Total number of accepted observations.
    count: Arc<AtomicU64>,
    labels: Arc<HashMap<String, String>>,
}

impl Histogram {
    /// Creates an empty histogram with the given bucket upper bounds.
    ///
    /// # Panics
    ///
    /// Panics if `buckets` is empty or not strictly ascending.
    pub fn new(name: String, buckets: Vec<f64>, labels: Vec<(String, String)>) -> Self {
        assert!(!buckets.is_empty(), "Histogram buckets cannot be empty");
        assert!(
            buckets.windows(2).all(|pair| pair[1] > pair[0]),
            "Histogram buckets must be sorted in ascending order"
        );

        // One slot per finite bound plus the trailing overflow slot.
        let counts: Vec<AtomicU64> = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();

        Self {
            name,
            buckets: Arc::new(buckets),
            counts: Arc::new(counts),
            sum: Arc::new(AtomicU64::new(0.0f64.to_bits())),
            count: Arc::new(AtomicU64::new(0)),
            labels: Arc::new(labels.into_iter().collect()),
        }
    }

    /// Records one observation.
    ///
    /// The value lands in the first bucket whose bound is `>= value`, or in the overflow
    /// bucket. NaN is ignored: it belongs to no bucket and would make the sum NaN forever.
    pub fn observe(&self, value: f64) {
        if value.is_nan() {
            return;
        }

        let bucket_index = self
            .buckets
            .iter()
            .position(|&bound| value <= bound)
            .unwrap_or(self.buckets.len());

        self.counts[bucket_index].fetch_add(1, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        // There is no atomic f64 add, so CAS-loop on the bit pattern. The closure always
        // returns `Some`, so `fetch_update` cannot fail and its result can be ignored.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + value).to_bits())
            });
    }

    /// Number of observations recorded so far.
    pub fn get_count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Sum of all observations recorded so far.
    pub fn get_sum(&self) -> f64 {
        f64::from_bits(self.sum.load(Ordering::Relaxed))
    }

    /// The finite bucket upper bounds, ascending.
    pub fn get_buckets(&self) -> &[f64] {
        &self.buckets
    }

    /// Snapshot of the per-bucket (non-cumulative) counts, overflow bucket last.
    pub fn get_bucket_counts(&self) -> Vec<u64> {
        self.counts
            .iter()
            .map(|c| c.load(Ordering::Relaxed))
            .collect()
    }

    /// Estimates the `percentile` quantile (`0.0..=1.0`) as the upper bound of the bucket
    /// containing it.
    ///
    /// Returns `0.0` when nothing has been observed. Quantiles falling in the overflow
    /// bucket are clamped to the largest finite bound.
    ///
    /// # Panics
    ///
    /// Panics if `percentile` is outside `0.0..=1.0`.
    pub fn percentile(&self, percentile: f64) -> f64 {
        assert!(
            (0.0..=1.0).contains(&percentile),
            "Percentile must be between 0.0 and 1.0"
        );

        // Derive the total from the same snapshot we scan, so concurrent observations
        // cannot make the target and the cumulative sum disagree.
        let counts = self.get_bucket_counts();
        let total_count: u64 = counts.iter().sum();
        if total_count == 0 {
            return 0.0;
        }

        // At least one observation must be covered; otherwise p0 would stop at the first
        // bucket even when that bucket is empty.
        let target_count = ((total_count as f64 * percentile).ceil() as u64).max(1);
        let largest_bound = *self
            .buckets
            .last()
            .expect("constructor guarantees at least one bucket");

        let mut cumulative = 0u64;
        for (index, count) in counts.iter().enumerate() {
            cumulative += count;
            if cumulative >= target_count {
                return self.buckets.get(index).copied().unwrap_or(largest_bound);
            }
        }

        largest_bound
    }

    /// Metric name given at construction.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Label key/value pairs given at construction.
    pub fn labels(&self) -> &HashMap<String, String> {
        &self.labels
    }
}
449
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_basic() {
        let counter = Counter::new("test_counter".to_string(), vec![]);
        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.inc_by(5);
        assert_eq!(counter.get(), 6);
    }

    #[test]
    fn test_counter_thread_safety() {
        let counter = Counter::new("test_counter".to_string(), vec![]);
        let counter_clone = counter.clone();

        let handle = std::thread::spawn(move || {
            for _ in 0..1000 {
                counter_clone.inc();
            }
        });

        for _ in 0..1000 {
            counter.inc();
        }

        handle.join().unwrap();
        assert_eq!(counter.get(), 2000);
    }

    #[test]
    fn test_gauge_basic() {
        let gauge = Gauge::new("test_gauge".to_string(), vec![]);
        assert_eq!(gauge.get(), 0);

        gauge.set(10);
        assert_eq!(gauge.get(), 10);

        gauge.inc();
        assert_eq!(gauge.get(), 11);

        gauge.dec();
        assert_eq!(gauge.get(), 10);

        gauge.add(5);
        assert_eq!(gauge.get(), 15);

        gauge.sub(3);
        assert_eq!(gauge.get(), 12);
    }

    #[test]
    fn test_histogram_basic() {
        let histogram = Histogram::new(
            "test_histogram".to_string(),
            vec![1.0, 5.0, 10.0, 50.0, 100.0],
            vec![],
        );

        histogram.observe(0.5);
        histogram.observe(3.0);
        histogram.observe(7.0);
        histogram.observe(25.0);
        histogram.observe(75.0);
        histogram.observe(150.0);

        assert_eq!(histogram.get_count(), 6);
        assert_eq!(histogram.get_sum(), 260.5);

        // One observation per finite bucket, plus one in the overflow bucket.
        assert_eq!(histogram.get_bucket_counts(), vec![1, 1, 1, 1, 1, 1]);
    }

    #[test]
    fn test_histogram_percentiles() {
        let histogram = Histogram::new(
            "test_histogram".to_string(),
            vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0],
            vec![],
        );

        // Ten observations fall into each bucket.
        for i in 1..=100 {
            histogram.observe(i as f64);
        }

        assert_eq!(histogram.get_count(), 100);

        let p50 = histogram.percentile(0.50);
        let p95 = histogram.percentile(0.95);
        let p99 = histogram.percentile(0.99);

        assert!(p50 <= p95);
        assert!(p95 <= p99);

        // Bucket-based estimates report the upper bound of the bucket holding the quantile.
        assert_eq!(p50, 50.0);
        assert_eq!(p95, 100.0);
        assert_eq!(p99, 100.0);
    }

    #[test]
    fn test_registry_counter() {
        let registry = MetricsRegistry::new();

        let counter1 = registry.register_counter("test_counter", vec![]);
        counter1.inc();

        let counter2 = registry.get_counter("test_counter").unwrap();
        assert_eq!(counter2.get(), 1);

        counter2.inc();
        assert_eq!(counter1.get(), 2);

        // Re-registering the same name hands back the existing counter.
        let counter3 = registry.register_counter("test_counter", vec![]);
        assert_eq!(counter3.get(), 2);
    }

    #[test]
    fn test_registry_gauge() {
        let registry = MetricsRegistry::new();

        let gauge1 = registry.register_gauge("test_gauge", vec![]);
        gauge1.set(42);

        let gauge2 = registry.get_gauge("test_gauge").unwrap();
        assert_eq!(gauge2.get(), 42);
    }

    #[test]
    fn test_registry_histogram() {
        let registry = MetricsRegistry::new();

        let hist1 = registry.register_histogram("test_histogram", vec![1.0, 10.0, 100.0], vec![]);
        hist1.observe(5.0);

        let hist2 = registry.get_histogram("test_histogram").unwrap();
        assert_eq!(hist2.get_count(), 1);
    }

    #[test]
    fn test_registry_list_metrics() {
        let registry = MetricsRegistry::new();

        registry.register_counter("counter1", vec![]);
        registry.register_counter("counter2", vec![]);
        registry.register_gauge("gauge1", vec![]);
        registry.register_histogram("histogram1", vec![1.0, 10.0], vec![]);

        let counter_names = registry.counter_names();
        assert_eq!(counter_names.len(), 2);
        assert!(counter_names.contains(&"counter1".to_string()));
        assert!(counter_names.contains(&"counter2".to_string()));

        let gauge_names = registry.gauge_names();
        assert_eq!(gauge_names.len(), 1);
        assert!(gauge_names.contains(&"gauge1".to_string()));

        let histogram_names = registry.histogram_names();
        assert_eq!(histogram_names.len(), 1);
        assert!(histogram_names.contains(&"histogram1".to_string()));
    }

    #[test]
    fn test_counter_with_labels() {
        let counter = Counter::new(
            "test_counter".to_string(),
            vec![
                ("node_id".to_string(), "node-1".to_string()),
                ("region".to_string(), "us-west".to_string()),
            ],
        );

        assert_eq!(counter.labels().get("node_id").unwrap(), "node-1");
        assert_eq!(counter.labels().get("region").unwrap(), "us-west");
    }

    #[test]
    #[should_panic(expected = "Histogram buckets cannot be empty")]
    fn test_histogram_empty_buckets() {
        Histogram::new("test".to_string(), vec![], vec![]);
    }

    #[test]
    #[should_panic(expected = "Histogram buckets must be sorted in ascending order")]
    fn test_histogram_unsorted_buckets() {
        Histogram::new("test".to_string(), vec![10.0, 5.0, 20.0], vec![]);
    }

    #[test]
    fn test_export_prometheus_format() {
        let registry = MetricsRegistry::new();

        let counter = registry.register_counter(
            "requests",
            vec![("path".to_string(), "/a\"b".to_string())],
        );
        counter.inc_by(3);
        registry.register_gauge("temperature", vec![]).set(-4);

        let histogram = registry.register_histogram("latency", vec![1.0, 10.0], vec![]);
        histogram.observe(5.0);

        let labelled = registry.register_histogram(
            "labelled",
            vec![1.0],
            vec![("region".to_string(), "eu".to_string())],
        );
        labelled.observe(0.5);

        let output = registry.export_prometheus();

        assert!(output.contains("# TYPE requests counter\n"));
        assert!(output.contains("requests{path=\"/a\\\"b\"} 3\n"));
        assert!(output.contains("# TYPE temperature gauge\n"));
        assert!(output.contains("temperature -4\n"));

        assert!(output.contains("# TYPE latency histogram\n"));
        assert!(output.contains("latency_bucket{le=\"1.0\"} 0\n"));
        assert!(output.contains("latency_bucket{le=\"10.0\"} 1\n"));
        assert!(output.contains("latency_bucket{le=\"+Inf\"} 1\n"));
        assert!(output.contains("latency_sum 5\n"));
        assert!(output.contains("latency_count 1\n"));

        assert!(output.contains("labelled_bucket{region=\"eu\",le=\"1.0\"} 1\n"));
        assert!(output.contains("labelled_bucket{region=\"eu\",le=\"+Inf\"} 1\n"));
        assert!(output.contains("labelled_count{region=\"eu\"} 1\n"));
    }

    #[test]
    fn test_node_metrics_creation() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        assert_eq!(metrics.active_connections.get(), 0);
        assert_eq!(metrics.total_connections.get(), 0);
        assert_eq!(metrics.failed_connections.get(), 0);
        assert_eq!(metrics.messages_sent.get(), 0);
        assert_eq!(metrics.messages_received.get(), 0);
        assert_eq!(metrics.messages_dropped.get(), 0);
        assert_eq!(metrics.memory_usage_bytes.get(), 0);
        assert_eq!(metrics.cpu_usage_percent.get(), 0);
        assert_eq!(metrics.time_drift_ms.get(), 0);
        assert_eq!(metrics.state_divergence_count.get(), 0);
        assert_eq!(metrics.quarantine_buffer_size.get(), 0);
    }

    #[test]
    fn test_node_metrics_connection_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        metrics.active_connections.inc();
        metrics.total_connections.inc();
        assert_eq!(metrics.active_connections.get(), 1);
        assert_eq!(metrics.total_connections.get(), 1);

        metrics.active_connections.inc();
        metrics.total_connections.inc();
        assert_eq!(metrics.active_connections.get(), 2);
        assert_eq!(metrics.total_connections.get(), 2);

        // Closing a connection lowers the active gauge but never the lifetime total.
        metrics.active_connections.dec();
        assert_eq!(metrics.active_connections.get(), 1);
        assert_eq!(metrics.total_connections.get(), 2);

        metrics.failed_connections.inc();
        assert_eq!(metrics.failed_connections.get(), 1);
    }

    #[test]
    fn test_node_metrics_message_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        metrics.messages_sent.inc_by(10);
        assert_eq!(metrics.messages_sent.get(), 10);

        metrics.messages_received.inc_by(8);
        assert_eq!(metrics.messages_received.get(), 8);

        metrics.messages_dropped.inc();
        assert_eq!(metrics.messages_dropped.get(), 1);

        metrics.message_size_bytes.observe(512.0);
        metrics.message_size_bytes.observe(2048.0);
        assert_eq!(metrics.message_size_bytes.get_count(), 2);
    }

    #[test]
    fn test_node_metrics_latency_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        metrics.message_latency_ms.observe(5.0);
        metrics.message_latency_ms.observe(15.0);
        metrics.message_latency_ms.observe(50.0);
        assert_eq!(metrics.message_latency_ms.get_count(), 3);

        metrics.state_sync_latency_ms.observe(100.0);
        metrics.state_sync_latency_ms.observe(500.0);
        assert_eq!(metrics.state_sync_latency_ms.get_count(), 2);

        let p50 = metrics.message_latency_ms.percentile(0.50);
        let p95 = metrics.message_latency_ms.percentile(0.95);
        assert!(p50 <= p95);
        assert_eq!(p50, 25.0);
        assert_eq!(p95, 50.0);
    }

    #[test]
    fn test_node_metrics_resource_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        metrics.memory_usage_bytes.set(512 * 1024 * 1024);
        assert_eq!(metrics.memory_usage_bytes.get(), 536870912);

        metrics.cpu_usage_percent.set(45);
        assert_eq!(metrics.cpu_usage_percent.get(), 45);

        metrics.cpu_usage_percent.set(60);
        assert_eq!(metrics.cpu_usage_percent.get(), 60);
    }

    #[test]
    fn test_node_metrics_protocol_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        metrics.time_drift_ms.set(50);
        assert_eq!(metrics.time_drift_ms.get(), 50);

        // Drift is signed, so negative readings must round-trip.
        metrics.time_drift_ms.set(-25);
        assert_eq!(metrics.time_drift_ms.get(), -25);

        metrics.state_divergence_count.set(0);
        assert_eq!(metrics.state_divergence_count.get(), 0);

        metrics.state_divergence_count.inc();
        assert_eq!(metrics.state_divergence_count.get(), 1);

        metrics.quarantine_buffer_size.set(3);
        assert_eq!(metrics.quarantine_buffer_size.get(), 3);
    }

    #[test]
    fn test_node_metrics_accessor_methods() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        metrics.active_connections().inc();
        assert_eq!(metrics.active_connections.get(), 1);

        metrics.total_connections().inc();
        assert_eq!(metrics.total_connections.get(), 1);

        metrics.messages_sent().inc_by(5);
        assert_eq!(metrics.messages_sent.get(), 5);

        metrics.message_latency_ms().observe(42.0);
        assert_eq!(metrics.message_latency_ms.get_count(), 1);

        metrics.quarantine_buffer_size().inc();
        assert_eq!(metrics.quarantine_buffer_size.get(), 1);
    }

    #[test]
    fn test_node_metrics_registry_integration() {
        let registry = MetricsRegistry::new();
        let _metrics = NodeMetrics::new(&registry);

        let counter_names = registry.counter_names();
        assert!(counter_names.contains(&"elara_total_connections".to_string()));
        assert!(counter_names.contains(&"elara_failed_connections".to_string()));
        assert!(counter_names.contains(&"elara_messages_sent".to_string()));
        assert!(counter_names.contains(&"elara_messages_received".to_string()));
        assert!(counter_names.contains(&"elara_messages_dropped".to_string()));

        let gauge_names = registry.gauge_names();
        assert!(gauge_names.contains(&"elara_active_connections".to_string()));
        assert!(gauge_names.contains(&"elara_memory_usage_bytes".to_string()));
        assert!(gauge_names.contains(&"elara_cpu_usage_percent".to_string()));
        assert!(gauge_names.contains(&"elara_time_drift_ms".to_string()));
        assert!(gauge_names.contains(&"elara_state_divergence_count".to_string()));
        assert!(gauge_names.contains(&"elara_quarantine_buffer_size".to_string()));

        let histogram_names = registry.histogram_names();
        assert!(histogram_names.contains(&"elara_message_size_bytes".to_string()));
        assert!(histogram_names.contains(&"elara_message_latency_ms".to_string()));
        assert!(histogram_names.contains(&"elara_state_sync_latency_ms".to_string()));
    }

    #[test]
    fn test_node_metrics_histogram_buckets() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        let latency_buckets = metrics.message_latency_ms.get_buckets();
        assert_eq!(latency_buckets[0], 1.0);
        assert_eq!(latency_buckets[latency_buckets.len() - 1], 5000.0);

        let sync_buckets = metrics.state_sync_latency_ms.get_buckets();
        assert_eq!(sync_buckets[0], 10.0);
        assert_eq!(sync_buckets[sync_buckets.len() - 1], 30000.0);

        let size_buckets = metrics.message_size_bytes.get_buckets();
        assert_eq!(size_buckets[0], 64.0);
        assert_eq!(size_buckets[size_buckets.len() - 1], 1048576.0);
    }
}
845
846#[derive(Clone)]
878pub struct NodeMetrics {
879 pub active_connections: Gauge,
882
883 pub total_connections: Counter,
885
886 pub failed_connections: Counter,
888
889 pub messages_sent: Counter,
892
893 pub messages_received: Counter,
895
896 pub messages_dropped: Counter,
898
899 pub message_size_bytes: Histogram,
901
902 pub message_latency_ms: Histogram,
905
906 pub state_sync_latency_ms: Histogram,
908
909 pub memory_usage_bytes: Gauge,
912
913 pub cpu_usage_percent: Gauge,
915
916 pub time_drift_ms: Gauge,
920
921 pub state_divergence_count: Gauge,
924
925 pub quarantine_buffer_size: Gauge,
928}
929
930impl NodeMetrics {
931 pub fn new(registry: &MetricsRegistry) -> Self {
962 let active_connections = registry.register_gauge("elara_active_connections", vec![]);
964 let total_connections = registry.register_counter("elara_total_connections", vec![]);
965 let failed_connections = registry.register_counter("elara_failed_connections", vec![]);
966
967 let messages_sent = registry.register_counter("elara_messages_sent", vec![]);
969 let messages_received = registry.register_counter("elara_messages_received", vec![]);
970 let messages_dropped = registry.register_counter("elara_messages_dropped", vec![]);
971
972 let message_size_bytes = registry.register_histogram(
974 "elara_message_size_bytes",
975 vec![
976 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, ],
985 vec![],
986 );
987
988 let message_latency_ms = registry.register_histogram(
991 "elara_message_latency_ms",
992 vec![
993 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, ],
1005 vec![],
1006 );
1007
1008 let state_sync_latency_ms = registry.register_histogram(
1010 "elara_state_sync_latency_ms",
1011 vec![
1012 10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 30000.0, ],
1023 vec![],
1024 );
1025
1026 let memory_usage_bytes = registry.register_gauge("elara_memory_usage_bytes", vec![]);
1028 let cpu_usage_percent = registry.register_gauge("elara_cpu_usage_percent", vec![]);
1029
1030 let time_drift_ms = registry.register_gauge("elara_time_drift_ms", vec![]);
1032 let state_divergence_count =
1033 registry.register_gauge("elara_state_divergence_count", vec![]);
1034 let quarantine_buffer_size =
1035 registry.register_gauge("elara_quarantine_buffer_size", vec![]);
1036
1037 Self {
1038 active_connections,
1040 total_connections,
1041 failed_connections,
1042
1043 messages_sent,
1045 messages_received,
1046 messages_dropped,
1047 message_size_bytes,
1048
1049 message_latency_ms,
1051 state_sync_latency_ms,
1052
1053 memory_usage_bytes,
1055 cpu_usage_percent,
1056
1057 time_drift_ms,
1059 state_divergence_count,
1060 quarantine_buffer_size,
1061 }
1062 }
1063
1064 pub fn active_connections(&self) -> &Gauge {
1066 &self.active_connections
1067 }
1068
1069 pub fn total_connections(&self) -> &Counter {
1071 &self.total_connections
1072 }
1073
1074 pub fn failed_connections(&self) -> &Counter {
1076 &self.failed_connections
1077 }
1078
1079 pub fn messages_sent(&self) -> &Counter {
1081 &self.messages_sent
1082 }
1083
1084 pub fn messages_received(&self) -> &Counter {
1086 &self.messages_received
1087 }
1088
1089 pub fn messages_dropped(&self) -> &Counter {
1091 &self.messages_dropped
1092 }
1093
1094 pub fn message_size_bytes(&self) -> &Histogram {
1096 &self.message_size_bytes
1097 }
1098
1099 pub fn message_latency_ms(&self) -> &Histogram {
1101 &self.message_latency_ms
1102 }
1103
1104 pub fn state_sync_latency_ms(&self) -> &Histogram {
1106 &self.state_sync_latency_ms
1107 }
1108
1109 pub fn memory_usage_bytes(&self) -> &Gauge {
1111 &self.memory_usage_bytes
1112 }
1113
1114 pub fn cpu_usage_percent(&self) -> &Gauge {
1116 &self.cpu_usage_percent
1117 }
1118
1119 pub fn time_drift_ms(&self) -> &Gauge {
1121 &self.time_drift_ms
1122 }
1123
1124 pub fn state_divergence_count(&self) -> &Gauge {
1126 &self.state_divergence_count
1127 }
1128
1129 pub fn quarantine_buffer_size(&self) -> &Gauge {
1131 &self.quarantine_buffer_size
1132 }
1133}
1134
1135impl std::fmt::Debug for NodeMetrics {
1136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1137 f.debug_struct("NodeMetrics")
1138 .field("active_connections", &self.active_connections.get())
1139 .field("total_connections", &self.total_connections.get())
1140 .field("failed_connections", &self.failed_connections.get())
1141 .field("messages_sent", &self.messages_sent.get())
1142 .field("messages_received", &self.messages_received.get())
1143 .field("messages_dropped", &self.messages_dropped.get())
1144 .field("message_size_bytes_count", &self.message_size_bytes.get_count())
1145 .field("message_latency_ms_count", &self.message_latency_ms.get_count())
1146 .field("state_sync_latency_ms_count", &self.state_sync_latency_ms.get_count())
1147 .field("memory_usage_bytes", &self.memory_usage_bytes.get())
1148 .field("cpu_usage_percent", &self.cpu_usage_percent.get())
1149 .field("time_drift_ms", &self.time_drift_ms.get())
1150 .field("state_divergence_count", &self.state_divergence_count.get())
1151 .field("quarantine_buffer_size", &self.quarantine_buffer_size.get())
1152 .finish()
1153 }
1154}
1155
1156impl MetricsRegistry {
1157 pub fn export_prometheus(&self) -> String {
1182 let mut output = String::new();
1183
1184 let counters = self.counters.read();
1186 for (name, counter) in counters.iter() {
1187 output.push_str(&format!("# HELP {} Counter metric\n", name));
1188 output.push_str(&format!("# TYPE {} counter\n", name));
1189
1190 if counter.labels().is_empty() {
1191 output.push_str(&format!("{} {}\n", name, counter.get()));
1192 } else {
1193 let labels = format_labels(counter.labels());
1194 output.push_str(&format!("{}{} {}\n", name, labels, counter.get()));
1195 }
1196 }
1197
1198 let gauges = self.gauges.read();
1200 for (name, gauge) in gauges.iter() {
1201 output.push_str(&format!("# HELP {} Gauge metric\n", name));
1202 output.push_str(&format!("# TYPE {} gauge\n", name));
1203
1204 if gauge.labels().is_empty() {
1205 output.push_str(&format!("{} {}\n", name, gauge.get()));
1206 } else {
1207 let labels = format_labels(gauge.labels());
1208 output.push_str(&format!("{}{} {}\n", name, labels, gauge.get()));
1209 }
1210 }
1211
1212 let histograms = self.histograms.read();
1214 for (name, histogram) in histograms.iter() {
1215 output.push_str(&format!("# HELP {} Histogram metric\n", name));
1216 output.push_str(&format!("# TYPE {} histogram\n", name));
1217
1218 let label_prefix = if histogram.labels().is_empty() {
1219 String::new()
1220 } else {
1221 format_labels(histogram.labels())
1222 };
1223
1224 let buckets = histogram.get_buckets();
1226 let counts = histogram.get_bucket_counts();
1227 let mut cumulative = 0u64;
1228
1229 for (i, &bucket) in buckets.iter().enumerate() {
1230 cumulative += counts[i];
1231 let bucket_label = if label_prefix.is_empty() {
1232 format!("{{le=\"{:.1}\"}}", bucket)
1233 } else {
1234 let trimmed = label_prefix.trim_end_matches('}');
1236 format!("{},le=\"{:.1}\"}}", trimmed, bucket)
1237 };
1238 output.push_str(&format!("{}_bucket{} {}\n", name, bucket_label, cumulative));
1239 }
1240
1241 cumulative += counts[buckets.len()];
1243 let inf_label = if label_prefix.is_empty() {
1244 "{le=\"+Inf\"}".to_string()
1245 } else {
1246 let trimmed = label_prefix.trim_end_matches('}');
1247 format!("{},le=\"+Inf\"}}", trimmed)
1248 };
1249 output.push_str(&format!("{}_bucket{} {}\n", name, inf_label, cumulative));
1250
1251 output.push_str(&format!("{}_sum{} {}\n", name, label_prefix, histogram.get_sum()));
1253 output.push_str(&format!("{}_count{} {}\n", name, label_prefix, histogram.get_count()));
1254 }
1255
1256 output
1257 }
1258}
1259
1260fn format_labels(labels: &HashMap<String, String>) -> String {
1264 if labels.is_empty() {
1265 return String::new();
1266 }
1267
1268 let mut label_pairs: Vec<String> = labels
1269 .iter()
1270 .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
1271 .collect();
1272
1273 label_pairs.sort();
1275
1276 format!("{{{}}}", label_pairs.join(","))
1277}
1278
/// Escapes a label value for the Prometheus text format.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`; every other
/// character is copied unchanged.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}