Skip to main content

elara_runtime/observability/
metrics.rs

1//! Metrics collection system for ELARA runtime.
2//!
3//! This module provides a thread-safe metrics registry with support for:
4//! - **Counters**: Monotonically increasing values
5//! - **Gauges**: Values that can increase or decrease
6//! - **Histograms**: Distribution of observations with percentile calculations
7//!
8//! # Example
9//!
10//! ```rust
11//! use elara_runtime::observability::metrics::{MetricsRegistry, Counter, Gauge, Histogram};
12//!
13//! let registry = MetricsRegistry::new();
14//!
15//! // Register and use a counter
16//! let counter = registry.register_counter("messages_sent", vec![]);
17//! counter.inc();
18//! counter.inc_by(5);
19//!
20//! // Register and use a gauge
21//! let gauge = registry.register_gauge("active_connections", vec![]);
22//! gauge.set(10);
23//! gauge.inc();
24//! gauge.dec();
25//!
26//! // Register and use a histogram
27//! let histogram = registry.register_histogram(
28//!     "message_latency_ms",
29//!     vec![1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0],
30//!     vec![],
31//! );
32//! histogram.observe(42.5);
33//! ```
34
35use parking_lot::RwLock;
36use std::collections::HashMap;
37use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
38use std::sync::Arc;
39
40/// Errors that can occur during metrics operations.
41#[derive(Debug, thiserror::Error)]
42pub enum MetricsError {
43    /// Metric with the given name already exists.
44    #[error("Metric '{0}' already exists")]
45    AlreadyExists(String),
46
47    /// Metric with the given name was not found.
48    #[error("Metric '{0}' not found")]
49    NotFound(String),
50
51    /// Invalid metric configuration.
52    #[error("Invalid metric configuration: {0}")]
53    InvalidConfig(String),
54}
55
56/// Thread-safe registry for all metrics.
57///
58/// The registry maintains separate storage for counters, gauges, and histograms,
59/// allowing concurrent access from multiple threads.
60#[derive(Clone)]
61pub struct MetricsRegistry {
62    counters: Arc<RwLock<HashMap<String, Counter>>>,
63    gauges: Arc<RwLock<HashMap<String, Gauge>>>,
64    histograms: Arc<RwLock<HashMap<String, Histogram>>>,
65}
66
67impl MetricsRegistry {
68    /// Creates a new empty metrics registry.
69    pub fn new() -> Self {
70        Self {
71            counters: Arc::new(RwLock::new(HashMap::new())),
72            gauges: Arc::new(RwLock::new(HashMap::new())),
73            histograms: Arc::new(RwLock::new(HashMap::new())),
74        }
75    }
76
77    /// Registers a new counter with the given name and labels.
78    ///
79    /// # Errors
80    ///
81    /// Returns `MetricsError::AlreadyExists` if a counter with this name already exists.
82    pub fn register_counter(
83        &self,
84        name: impl Into<String>,
85        labels: Vec<(String, String)>,
86    ) -> Counter {
87        let name = name.into();
88        let mut counters = self.counters.write();
89
90        if counters.contains_key(&name) {
91            // Return existing counter
92            counters.get(&name).unwrap().clone()
93        } else {
94            let counter = Counter::new(name.clone(), labels);
95            counters.insert(name, counter.clone());
96            counter
97        }
98    }
99
100    /// Registers a new gauge with the given name and labels.
101    ///
102    /// # Errors
103    ///
104    /// Returns `MetricsError::AlreadyExists` if a gauge with this name already exists.
105    pub fn register_gauge(
106        &self,
107        name: impl Into<String>,
108        labels: Vec<(String, String)>,
109    ) -> Gauge {
110        let name = name.into();
111        let mut gauges = self.gauges.write();
112
113        if gauges.contains_key(&name) {
114            // Return existing gauge
115            gauges.get(&name).unwrap().clone()
116        } else {
117            let gauge = Gauge::new(name.clone(), labels);
118            gauges.insert(name, gauge.clone());
119            gauge
120        }
121    }
122
123    /// Registers a new histogram with the given name, buckets, and labels.
124    ///
125    /// # Arguments
126    ///
127    /// * `name` - The metric name
128    /// * `buckets` - Bucket boundaries for the histogram (must be sorted in ascending order)
129    /// * `labels` - Key-value pairs for metric labels
130    ///
131    /// # Errors
132    ///
133    /// Returns `MetricsError::AlreadyExists` if a histogram with this name already exists.
134    /// Returns `MetricsError::InvalidConfig` if buckets are not sorted or empty.
135    pub fn register_histogram(
136        &self,
137        name: impl Into<String>,
138        buckets: Vec<f64>,
139        labels: Vec<(String, String)>,
140    ) -> Histogram {
141        let name = name.into();
142        let mut histograms = self.histograms.write();
143
144        if histograms.contains_key(&name) {
145            // Return existing histogram
146            histograms.get(&name).unwrap().clone()
147        } else {
148            let histogram = Histogram::new(name.clone(), buckets, labels);
149            histograms.insert(name, histogram.clone());
150            histogram
151        }
152    }
153
154    /// Gets a counter by name.
155    pub fn get_counter(&self, name: &str) -> Option<Counter> {
156        self.counters.read().get(name).cloned()
157    }
158
159    /// Gets a gauge by name.
160    pub fn get_gauge(&self, name: &str) -> Option<Gauge> {
161        self.gauges.read().get(name).cloned()
162    }
163
164    /// Gets a histogram by name.
165    pub fn get_histogram(&self, name: &str) -> Option<Histogram> {
166        self.histograms.read().get(name).cloned()
167    }
168
169    /// Returns all registered counter names.
170    pub fn counter_names(&self) -> Vec<String> {
171        self.counters.read().keys().cloned().collect()
172    }
173
174    /// Returns all registered gauge names.
175    pub fn gauge_names(&self) -> Vec<String> {
176        self.gauges.read().keys().cloned().collect()
177    }
178
179    /// Returns all registered histogram names.
180    pub fn histogram_names(&self) -> Vec<String> {
181        self.histograms.read().keys().cloned().collect()
182    }
183}
184
185impl Default for MetricsRegistry {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
/// A monotonically increasing counter metric.
///
/// Counters are used to track values that only increase, such as:
/// - Total messages sent
/// - Total connections established
/// - Total errors encountered
///
/// Cloning a counter yields another handle to the same underlying value, so
/// increments made through any clone are visible through all of them.
#[derive(Debug, Clone)]
pub struct Counter {
    name: String,
    value: Arc<AtomicU64>,
    labels: Arc<HashMap<String, String>>,
}

impl Counter {
    /// Creates a new counter with the given name and labels, starting at zero.
    ///
    /// `labels` is collected into a map, so if the same key appears more than
    /// once the last value wins.
    pub fn new(name: String, labels: Vec<(String, String)>) -> Self {
        Self {
            name,
            value: Arc::new(AtomicU64::new(0)),
            labels: Arc::new(labels.into_iter().collect()),
        }
    }

    /// Increments the counter by 1.
    pub fn inc(&self) {
        self.inc_by(1);
    }

    /// Increments the counter by the given amount.
    ///
    /// The addition is an atomic `fetch_add`, which wraps around on `u64`
    /// overflow rather than panicking.
    pub fn inc_by(&self, n: u64) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value of the counter.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Returns the name of the counter.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the labels associated with this counter.
    pub fn labels(&self) -> &HashMap<String, String> {
        &self.labels
    }
}
239
/// A gauge metric that can increase or decrease.
///
/// Gauges are used to track values that can go up or down, such as:
/// - Active connections
/// - Memory usage
/// - Queue depth
///
/// Cloning a gauge yields another handle to the same underlying value, so
/// updates made through any clone are visible through all of them.
#[derive(Debug, Clone)]
pub struct Gauge {
    name: String,
    value: Arc<AtomicI64>,
    labels: Arc<HashMap<String, String>>,
}

impl Gauge {
    /// Creates a new gauge with the given name and labels, starting at zero.
    ///
    /// `labels` is collected into a map, so if the same key appears more than
    /// once the last value wins.
    pub fn new(name: String, labels: Vec<(String, String)>) -> Self {
        Self {
            name,
            value: Arc::new(AtomicI64::new(0)),
            labels: Arc::new(labels.into_iter().collect()),
        }
    }

    /// Sets the gauge to the given value, replacing whatever was stored.
    pub fn set(&self, value: i64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Increments the gauge by 1.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Decrements the gauge by 1.
    pub fn dec(&self) {
        self.sub(1);
    }

    /// Increments the gauge by the given amount.
    ///
    /// The update is an atomic `fetch_add`, which wraps around on `i64`
    /// overflow rather than panicking.
    pub fn add(&self, n: i64) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Decrements the gauge by the given amount.
    ///
    /// The update is an atomic `fetch_sub`, which wraps around on `i64`
    /// overflow rather than panicking.
    pub fn sub(&self, n: i64) {
        self.value.fetch_sub(n, Ordering::Relaxed);
    }

    /// Returns the current value of the gauge.
    pub fn get(&self) -> i64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Returns the name of the gauge.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the labels associated with this gauge.
    pub fn labels(&self) -> &HashMap<String, String> {
        &self.labels
    }
}
303
/// A histogram metric for tracking distributions of observations.
///
/// Histograms are used to track the distribution of values, such as:
/// - Request latencies
/// - Message sizes
/// - Processing times
///
/// The histogram keeps one count per configured bucket boundary plus a trailing
/// overflow (`+Inf`) bucket, together with the total number of observations and
/// their running sum. Percentiles (p50, p95, p99, ...) are estimated from the
/// bucket counts.
///
/// Cloning a histogram yields another handle to the same underlying state.
#[derive(Debug, Clone)]
pub struct Histogram {
    name: String,
    buckets: Arc<Vec<f64>>,
    counts: Arc<Vec<AtomicU64>>,
    // Running sum of observations, stored as the bit pattern of an `f64` so it
    // can be updated atomically without losing sign, fraction or range.
    sum: Arc<AtomicU64>,
    count: Arc<AtomicU64>,
    labels: Arc<HashMap<String, String>>,
}

impl Histogram {
    /// Creates a new histogram with the given name, buckets, and labels.
    ///
    /// `buckets` holds the inclusive upper bound of each bucket. An extra
    /// overflow bucket is added for observations above the last bound.
    ///
    /// # Panics
    ///
    /// Panics if buckets are empty or not sorted in strictly ascending order.
    pub fn new(name: String, buckets: Vec<f64>, labels: Vec<(String, String)>) -> Self {
        assert!(!buckets.is_empty(), "Histogram buckets cannot be empty");
        assert!(
            buckets.windows(2).all(|pair| pair[1] > pair[0]),
            "Histogram buckets must be sorted in ascending order"
        );

        // One slot per finite boundary plus the trailing +Inf slot.
        let counts: Vec<AtomicU64> = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();

        Self {
            name,
            buckets: Arc::new(buckets),
            counts: Arc::new(counts),
            sum: Arc::new(AtomicU64::new(0.0_f64.to_bits())),
            count: Arc::new(AtomicU64::new(0)),
            labels: Arc::new(labels.into_iter().collect()),
        }
    }

    /// Records an observation in the histogram.
    ///
    /// The observation is counted in the first bucket whose upper bound is
    /// greater than or equal to `value`, or in the overflow bucket when it
    /// exceeds every bound. A NaN observation compares false against every
    /// bound, so it lands in the overflow bucket; it is counted but left out of
    /// the sum so that one NaN sample does not turn the running total into NaN.
    pub fn observe(&self, value: f64) {
        let bucket_index = self
            .buckets
            .iter()
            .position(|&upper| value <= upper)
            .unwrap_or(self.buckets.len());

        self.counts[bucket_index].fetch_add(1, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        if !value.is_nan() {
            // Atomic floating-point add: reinterpret the stored bits as `f64`,
            // add, and retry if another thread updated the sum in between.
            // The closure always returns `Some`, so the update cannot fail.
            let _ = self
                .sum
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                    Some((f64::from_bits(bits) + value).to_bits())
                });
        }
    }

    /// Returns the total count of observations.
    pub fn get_count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Returns the sum of all (non-NaN) observations.
    pub fn get_sum(&self) -> f64 {
        f64::from_bits(self.sum.load(Ordering::Relaxed))
    }

    /// Returns the bucket boundaries (the overflow bucket has no entry here).
    pub fn get_buckets(&self) -> &[f64] {
        &self.buckets
    }

    /// Returns the count for each bucket.
    ///
    /// The result has one more element than [`Histogram::get_buckets`]: the
    /// last element is the overflow bucket. Counts are per bucket, not
    /// cumulative.
    pub fn get_bucket_counts(&self) -> Vec<u64> {
        self.counts
            .iter()
            .map(|slot| slot.load(Ordering::Relaxed))
            .collect()
    }

    /// Calculates the specified percentile from the histogram data.
    ///
    /// # Arguments
    ///
    /// * `percentile` - A value between 0.0 and 1.0 (e.g., 0.95 for p95)
    ///
    /// # Returns
    ///
    /// The upper bound of the bucket that contains the requested rank, or 0.0
    /// if there are no observations. When the rank falls into the overflow
    /// bucket the last finite boundary is returned, which underestimates the
    /// true value.
    ///
    /// # Panics
    ///
    /// Panics if `percentile` is outside `0.0..=1.0` (including NaN).
    pub fn percentile(&self, percentile: f64) -> f64 {
        assert!(
            (0.0..=1.0).contains(&percentile),
            "Percentile must be between 0.0 and 1.0"
        );

        // Take one snapshot and derive the total from it, so the rank is
        // always consistent with the counts being walked.
        let snapshot = self.get_bucket_counts();
        let total: u64 = snapshot.iter().sum();
        if total == 0 {
            return 0.0;
        }

        // Rank of the requested observation, kept within 1..=total so that
        // p0 resolves to the first non-empty bucket instead of bucket 0.
        let target = ((total as f64 * percentile).ceil() as u64).clamp(1, total);
        let last_bound = *self
            .buckets
            .last()
            .expect("buckets are validated as non-empty in Histogram::new");

        let mut cumulative = 0u64;
        for (index, bucket_count) in snapshot.iter().enumerate() {
            cumulative += bucket_count;
            if cumulative >= target {
                // `get` misses only for the overflow bucket.
                return self.buckets.get(index).copied().unwrap_or(last_bound);
            }
        }

        last_bound
    }

    /// Returns the name of the histogram.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the labels associated with this histogram.
    pub fn labels(&self) -> &HashMap<String, String> {
        &self.labels
    }
}
449
// Unit tests for the metric primitives, the registry, and `NodeMetrics`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_basic() {
        let counter = Counter::new("test_counter".to_string(), vec![]);
        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.inc_by(5);
        assert_eq!(counter.get(), 6);
    }

    #[test]
    fn test_counter_thread_safety() {
        let counter = Counter::new("test_counter".to_string(), vec![]);
        let counter_clone = counter.clone();

        let handle = std::thread::spawn(move || {
            for _ in 0..1000 {
                counter_clone.inc();
            }
        });

        for _ in 0..1000 {
            counter.inc();
        }

        handle.join().unwrap();
        assert_eq!(counter.get(), 2000);
    }

    #[test]
    fn test_gauge_basic() {
        let gauge = Gauge::new("test_gauge".to_string(), vec![]);
        assert_eq!(gauge.get(), 0);

        gauge.set(10);
        assert_eq!(gauge.get(), 10);

        gauge.inc();
        assert_eq!(gauge.get(), 11);

        gauge.dec();
        assert_eq!(gauge.get(), 10);

        gauge.add(5);
        assert_eq!(gauge.get(), 15);

        gauge.sub(3);
        assert_eq!(gauge.get(), 12);
    }

    #[test]
    fn test_histogram_basic() {
        let histogram = Histogram::new(
            "test_histogram".to_string(),
            vec![1.0, 5.0, 10.0, 50.0, 100.0],
            vec![],
        );

        histogram.observe(0.5);
        histogram.observe(3.0);
        histogram.observe(7.0);
        histogram.observe(25.0);
        histogram.observe(75.0);
        histogram.observe(150.0);

        assert_eq!(histogram.get_count(), 6);

        let counts = histogram.get_bucket_counts();
        assert_eq!(counts[0], 1); // <= 1.0
        assert_eq!(counts[1], 1); // <= 5.0
        assert_eq!(counts[2], 1); // <= 10.0
        assert_eq!(counts[3], 1); // <= 50.0
        assert_eq!(counts[4], 1); // <= 100.0
        assert_eq!(counts[5], 1); // > 100.0 (+Inf)
    }

    #[test]
    fn test_histogram_sum() {
        let histogram = Histogram::new("test_histogram".to_string(), vec![1.0, 10.0], vec![]);
        assert_eq!(histogram.get_sum(), 0.0);

        // Values chosen to be exactly representable so the sum is exact.
        histogram.observe(0.5);
        histogram.observe(3.0);
        histogram.observe(7.0);

        assert_eq!(histogram.get_count(), 3);
        assert_eq!(histogram.get_sum(), 10.5);
    }

    #[test]
    fn test_histogram_percentile_empty() {
        let histogram = Histogram::new("test_histogram".to_string(), vec![1.0, 10.0], vec![]);

        // With no observations every percentile is reported as 0.0.
        assert_eq!(histogram.percentile(0.50), 0.0);
        assert_eq!(histogram.percentile(0.99), 0.0);
    }

    #[test]
    fn test_histogram_percentiles() {
        let histogram = Histogram::new(
            "test_histogram".to_string(),
            vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0],
            vec![],
        );

        // Add 100 observations: 1, 2, 3, ..., 100
        for i in 1..=100 {
            histogram.observe(i as f64);
        }

        assert_eq!(histogram.get_count(), 100);

        // Test percentiles
        let p50 = histogram.percentile(0.50);
        let p95 = histogram.percentile(0.95);
        let p99 = histogram.percentile(0.99);

        // Verify ordering
        assert!(p50 <= p95);
        assert!(p95 <= p99);

        // p50 should be around 50
        assert!((40.0..=60.0).contains(&p50));

        // p95 should be around 95
        assert!((90.0..=100.0).contains(&p95));

        // p99 should be around 99
        assert!((90.0..=100.0).contains(&p99));
    }

    #[test]
    fn test_registry_counter() {
        let registry = MetricsRegistry::new();

        let counter1 = registry.register_counter("test_counter", vec![]);
        counter1.inc();

        let counter2 = registry.get_counter("test_counter").unwrap();
        assert_eq!(counter2.get(), 1);

        counter2.inc();
        assert_eq!(counter1.get(), 2);
    }

    #[test]
    fn test_registry_reregister_returns_existing() {
        let registry = MetricsRegistry::new();

        let first = registry.register_counter("dup_counter", vec![]);
        first.inc_by(3);

        // Registering the same name again hands back the existing counter.
        let second = registry.register_counter("dup_counter", vec![]);
        assert_eq!(second.get(), 3);
        second.inc();
        assert_eq!(first.get(), 4);
        assert_eq!(registry.counter_names().len(), 1);

        // Lookups of unknown names yield `None`.
        assert!(registry.get_counter("missing").is_none());
        assert!(registry.get_gauge("missing").is_none());
        assert!(registry.get_histogram("missing").is_none());
    }

    #[test]
    fn test_registry_gauge() {
        let registry = MetricsRegistry::new();

        let gauge1 = registry.register_gauge("test_gauge", vec![]);
        gauge1.set(42);

        let gauge2 = registry.get_gauge("test_gauge").unwrap();
        assert_eq!(gauge2.get(), 42);
    }

    #[test]
    fn test_registry_histogram() {
        let registry = MetricsRegistry::new();

        let hist1 = registry.register_histogram("test_histogram", vec![1.0, 10.0, 100.0], vec![]);
        hist1.observe(5.0);

        let hist2 = registry.get_histogram("test_histogram").unwrap();
        assert_eq!(hist2.get_count(), 1);
    }

    #[test]
    fn test_registry_list_metrics() {
        let registry = MetricsRegistry::new();

        registry.register_counter("counter1", vec![]);
        registry.register_counter("counter2", vec![]);
        registry.register_gauge("gauge1", vec![]);
        registry.register_histogram("histogram1", vec![1.0, 10.0], vec![]);

        let counter_names = registry.counter_names();
        assert_eq!(counter_names.len(), 2);
        assert!(counter_names.contains(&"counter1".to_string()));
        assert!(counter_names.contains(&"counter2".to_string()));

        let gauge_names = registry.gauge_names();
        assert_eq!(gauge_names.len(), 1);
        assert!(gauge_names.contains(&"gauge1".to_string()));

        let histogram_names = registry.histogram_names();
        assert_eq!(histogram_names.len(), 1);
        assert!(histogram_names.contains(&"histogram1".to_string()));
    }

    #[test]
    fn test_counter_with_labels() {
        let counter = Counter::new(
            "test_counter".to_string(),
            vec![
                ("node_id".to_string(), "node-1".to_string()),
                ("region".to_string(), "us-west".to_string()),
            ],
        );

        assert_eq!(counter.labels().get("node_id").unwrap(), "node-1");
        assert_eq!(counter.labels().get("region").unwrap(), "us-west");
    }

    #[test]
    #[should_panic(expected = "Histogram buckets cannot be empty")]
    fn test_histogram_empty_buckets() {
        Histogram::new("test".to_string(), vec![], vec![]);
    }

    #[test]
    #[should_panic(expected = "Histogram buckets must be sorted in ascending order")]
    fn test_histogram_unsorted_buckets() {
        Histogram::new("test".to_string(), vec![10.0, 5.0, 20.0], vec![]);
    }

    #[test]
    fn test_node_metrics_creation() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Verify all metrics are initialized to zero
        assert_eq!(metrics.active_connections.get(), 0);
        assert_eq!(metrics.total_connections.get(), 0);
        assert_eq!(metrics.failed_connections.get(), 0);
        assert_eq!(metrics.messages_sent.get(), 0);
        assert_eq!(metrics.messages_received.get(), 0);
        assert_eq!(metrics.messages_dropped.get(), 0);
        assert_eq!(metrics.memory_usage_bytes.get(), 0);
        assert_eq!(metrics.cpu_usage_percent.get(), 0);
        assert_eq!(metrics.time_drift_ms.get(), 0);
        assert_eq!(metrics.state_divergence_count.get(), 0);
        assert_eq!(metrics.quarantine_buffer_size.get(), 0);

        // Histograms start without any observations
        assert_eq!(metrics.message_size_bytes.get_count(), 0);
        assert_eq!(metrics.message_latency_ms.get_count(), 0);
        assert_eq!(metrics.state_sync_latency_ms.get_count(), 0);
    }

    #[test]
    fn test_node_metrics_connection_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Simulate connection lifecycle
        metrics.active_connections.inc();
        metrics.total_connections.inc();
        assert_eq!(metrics.active_connections.get(), 1);
        assert_eq!(metrics.total_connections.get(), 1);

        // Add more connections
        metrics.active_connections.inc();
        metrics.total_connections.inc();
        assert_eq!(metrics.active_connections.get(), 2);
        assert_eq!(metrics.total_connections.get(), 2);

        // Close a connection
        metrics.active_connections.dec();
        assert_eq!(metrics.active_connections.get(), 1);
        assert_eq!(metrics.total_connections.get(), 2); // Total doesn't decrease

        // Track failed connection
        metrics.failed_connections.inc();
        assert_eq!(metrics.failed_connections.get(), 1);
    }

    #[test]
    fn test_node_metrics_message_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Track sent messages
        metrics.messages_sent.inc_by(10);
        assert_eq!(metrics.messages_sent.get(), 10);

        // Track received messages
        metrics.messages_received.inc_by(8);
        assert_eq!(metrics.messages_received.get(), 8);

        // Track dropped messages
        metrics.messages_dropped.inc();
        assert_eq!(metrics.messages_dropped.get(), 1);

        // Track message sizes
        metrics.message_size_bytes.observe(512.0);
        metrics.message_size_bytes.observe(2048.0);
        assert_eq!(metrics.message_size_bytes.get_count(), 2);
    }

    #[test]
    fn test_node_metrics_latency_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Track message latencies
        metrics.message_latency_ms.observe(5.0);
        metrics.message_latency_ms.observe(15.0);
        metrics.message_latency_ms.observe(50.0);
        assert_eq!(metrics.message_latency_ms.get_count(), 3);

        // Track state sync latencies
        metrics.state_sync_latency_ms.observe(100.0);
        metrics.state_sync_latency_ms.observe(500.0);
        assert_eq!(metrics.state_sync_latency_ms.get_count(), 2);

        // Verify percentiles work
        let p50 = metrics.message_latency_ms.percentile(0.50);
        let p95 = metrics.message_latency_ms.percentile(0.95);
        assert!(p50 <= p95);
    }

    #[test]
    fn test_node_metrics_resource_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Track memory usage (512 MB)
        metrics.memory_usage_bytes.set(512 * 1024 * 1024);
        assert_eq!(metrics.memory_usage_bytes.get(), 536870912);

        // Track CPU usage
        metrics.cpu_usage_percent.set(45);
        assert_eq!(metrics.cpu_usage_percent.get(), 45);

        // Update CPU usage
        metrics.cpu_usage_percent.set(60);
        assert_eq!(metrics.cpu_usage_percent.get(), 60);
    }

    #[test]
    fn test_node_metrics_protocol_tracking() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Track time drift (positive = ahead, negative = behind)
        metrics.time_drift_ms.set(50);
        assert_eq!(metrics.time_drift_ms.get(), 50);

        metrics.time_drift_ms.set(-25);
        assert_eq!(metrics.time_drift_ms.get(), -25);

        // Track state divergence
        metrics.state_divergence_count.set(0);
        assert_eq!(metrics.state_divergence_count.get(), 0);

        metrics.state_divergence_count.inc();
        assert_eq!(metrics.state_divergence_count.get(), 1);
    }

    #[test]
    fn test_node_metrics_accessor_methods() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Test that accessor methods return the correct references
        metrics.active_connections().inc();
        assert_eq!(metrics.active_connections.get(), 1);

        metrics.total_connections().inc();
        assert_eq!(metrics.total_connections.get(), 1);

        metrics.messages_sent().inc_by(5);
        assert_eq!(metrics.messages_sent.get(), 5);

        metrics.message_latency_ms().observe(42.0);
        assert_eq!(metrics.message_latency_ms.get_count(), 1);
    }

    #[test]
    fn test_node_metrics_registry_integration() {
        let registry = MetricsRegistry::new();
        let _metrics = NodeMetrics::new(&registry);

        // Verify all metrics are registered in the registry
        let counter_names = registry.counter_names();
        assert!(counter_names.contains(&"elara_total_connections".to_string()));
        assert!(counter_names.contains(&"elara_failed_connections".to_string()));
        assert!(counter_names.contains(&"elara_messages_sent".to_string()));
        assert!(counter_names.contains(&"elara_messages_received".to_string()));
        assert!(counter_names.contains(&"elara_messages_dropped".to_string()));

        let gauge_names = registry.gauge_names();
        assert!(gauge_names.contains(&"elara_active_connections".to_string()));
        assert!(gauge_names.contains(&"elara_memory_usage_bytes".to_string()));
        assert!(gauge_names.contains(&"elara_cpu_usage_percent".to_string()));
        assert!(gauge_names.contains(&"elara_time_drift_ms".to_string()));
        assert!(gauge_names.contains(&"elara_state_divergence_count".to_string()));

        let histogram_names = registry.histogram_names();
        assert!(histogram_names.contains(&"elara_message_size_bytes".to_string()));
        assert!(histogram_names.contains(&"elara_message_latency_ms".to_string()));
        assert!(histogram_names.contains(&"elara_state_sync_latency_ms".to_string()));
    }

    #[test]
    fn test_node_metrics_histogram_buckets() {
        let registry = MetricsRegistry::new();
        let metrics = NodeMetrics::new(&registry);

        // Verify message latency buckets are appropriate
        let latency_buckets = metrics.message_latency_ms.get_buckets();
        assert_eq!(latency_buckets[0], 1.0);
        assert_eq!(latency_buckets[latency_buckets.len() - 1], 5000.0);

        // Verify state sync latency buckets are appropriate
        let sync_buckets = metrics.state_sync_latency_ms.get_buckets();
        assert_eq!(sync_buckets[0], 10.0);
        assert_eq!(sync_buckets[sync_buckets.len() - 1], 30000.0);

        // Verify message size buckets are appropriate
        let size_buckets = metrics.message_size_bytes.get_buckets();
        assert_eq!(size_buckets[0], 64.0);
        assert_eq!(size_buckets[size_buckets.len() - 1], 1048576.0);
    }
}
845
846/// Core metrics for monitoring ELARA node health and performance.
847///
848/// This struct provides a centralized collection of all key metrics that should be
849/// monitored in a production ELARA deployment. Metrics are organized into categories:
850///
851/// - **Connection metrics**: Track connection lifecycle and health
852/// - **Message metrics**: Track message throughput and reliability
853/// - **Latency metrics**: Track performance characteristics
854/// - **Resource metrics**: Track system resource usage
855/// - **Protocol metrics**: Track protocol-specific health indicators
856///
857/// # Example
858///
859/// ```rust
860/// use elara_runtime::observability::metrics::{MetricsRegistry, NodeMetrics};
861///
862/// let registry = MetricsRegistry::new();
863/// let metrics = NodeMetrics::new(&registry);
864///
865/// // Track connections
866/// metrics.active_connections.inc();
867/// metrics.total_connections.inc();
868///
869/// // Track messages
870/// metrics.messages_sent.inc();
871/// metrics.message_latency_ms.observe(42.5);
872///
873/// // Track resources
874/// metrics.memory_usage_bytes.set(1024 * 1024 * 512); // 512 MB
875/// metrics.cpu_usage_percent.set(45);
876/// ```
877#[derive(Clone)]
878pub struct NodeMetrics {
879    // Connection metrics
880    /// Number of currently active connections.
881    pub active_connections: Gauge,
882    
883    /// Total number of connections established since node start.
884    pub total_connections: Counter,
885    
886    /// Total number of failed connection attempts.
887    pub failed_connections: Counter,
888
889    // Message metrics
890    /// Total number of messages sent.
891    pub messages_sent: Counter,
892    
893    /// Total number of messages received.
894    pub messages_received: Counter,
895    
896    /// Total number of messages dropped (e.g., due to queue overflow).
897    pub messages_dropped: Counter,
898    
899    /// Distribution of message sizes in bytes.
900    pub message_size_bytes: Histogram,
901
902    // Latency metrics
903    /// Distribution of message processing latency in milliseconds.
904    pub message_latency_ms: Histogram,
905    
906    /// Distribution of state synchronization latency in milliseconds.
907    pub state_sync_latency_ms: Histogram,
908
909    // Resource metrics
910    /// Current memory usage in bytes.
911    pub memory_usage_bytes: Gauge,
912    
913    /// Current CPU usage as a percentage (0-100).
914    pub cpu_usage_percent: Gauge,
915
916    // Protocol metrics
917    /// Current time drift from reference time in milliseconds.
918    /// Positive values indicate local clock is ahead, negative values indicate behind.
919    pub time_drift_ms: Gauge,
920    
921    /// Number of state divergences detected.
922    /// This tracks instances where state reconciliation found inconsistencies.
923    pub state_divergence_count: Gauge,
924    
925    /// Number of events in the quarantine buffer.
926    /// Events are quarantined when they have missing dependencies.
927    pub quarantine_buffer_size: Gauge,
928}
929
930impl NodeMetrics {
931    /// Creates a new `NodeMetrics` instance and registers all metrics with the provided registry.
932    ///
933    /// This constructor initializes all core metrics with appropriate types and bucket
934    /// configurations for histograms. All metrics are registered with the provided
935    /// `MetricsRegistry` and can be accessed through the returned struct.
936    ///
937    /// # Histogram Buckets
938    ///
939    /// - **message_latency_ms**: Buckets optimized for typical message processing times
940    ///   (1ms to 5 seconds)
941    /// - **state_sync_latency_ms**: Buckets optimized for state synchronization times
942    ///   (10ms to 30 seconds)
943    /// - **message_size_bytes**: Buckets optimized for typical message sizes
944    ///   (64 bytes to 1 MB)
945    ///
946    /// # Arguments
947    ///
948    /// * `registry` - The metrics registry to register all metrics with
949    ///
950    /// # Example
951    ///
952    /// ```rust
953    /// use elara_runtime::observability::metrics::{MetricsRegistry, NodeMetrics};
954    ///
955    /// let registry = MetricsRegistry::new();
956    /// let metrics = NodeMetrics::new(&registry);
957    ///
958    /// // Metrics are now registered and ready to use
959    /// metrics.active_connections.inc();
960    /// ```
961    pub fn new(registry: &MetricsRegistry) -> Self {
962        // Connection metrics
963        let active_connections = registry.register_gauge("elara_active_connections", vec![]);
964        let total_connections = registry.register_counter("elara_total_connections", vec![]);
965        let failed_connections = registry.register_counter("elara_failed_connections", vec![]);
966
967        // Message metrics
968        let messages_sent = registry.register_counter("elara_messages_sent", vec![]);
969        let messages_received = registry.register_counter("elara_messages_received", vec![]);
970        let messages_dropped = registry.register_counter("elara_messages_dropped", vec![]);
971
972        // Message size histogram with buckets from 64 bytes to 1 MB
973        let message_size_bytes = registry.register_histogram(
974            "elara_message_size_bytes",
975            vec![
976                64.0,      // 64 B
977                256.0,     // 256 B
978                1024.0,    // 1 KB
979                4096.0,    // 4 KB
980                16384.0,   // 16 KB
981                65536.0,   // 64 KB
982                262144.0,  // 256 KB
983                1048576.0, // 1 MB
984            ],
985            vec![],
986        );
987
988        // Latency metrics
989        // Message latency histogram with buckets from 1ms to 5 seconds
990        let message_latency_ms = registry.register_histogram(
991            "elara_message_latency_ms",
992            vec![
993                1.0,    // 1 ms
994                5.0,    // 5 ms
995                10.0,   // 10 ms
996                25.0,   // 25 ms
997                50.0,   // 50 ms
998                100.0,  // 100 ms
999                250.0,  // 250 ms
1000                500.0,  // 500 ms
1001                1000.0, // 1 second
1002                2500.0, // 2.5 seconds
1003                5000.0, // 5 seconds
1004            ],
1005            vec![],
1006        );
1007
1008        // State sync latency histogram with buckets from 10ms to 30 seconds
1009        let state_sync_latency_ms = registry.register_histogram(
1010            "elara_state_sync_latency_ms",
1011            vec![
1012                10.0,    // 10 ms
1013                50.0,    // 50 ms
1014                100.0,   // 100 ms
1015                250.0,   // 250 ms
1016                500.0,   // 500 ms
1017                1000.0,  // 1 second
1018                2500.0,  // 2.5 seconds
1019                5000.0,  // 5 seconds
1020                10000.0, // 10 seconds
1021                30000.0, // 30 seconds
1022            ],
1023            vec![],
1024        );
1025
1026        // Resource metrics
1027        let memory_usage_bytes = registry.register_gauge("elara_memory_usage_bytes", vec![]);
1028        let cpu_usage_percent = registry.register_gauge("elara_cpu_usage_percent", vec![]);
1029
1030        // Protocol metrics
1031        let time_drift_ms = registry.register_gauge("elara_time_drift_ms", vec![]);
1032        let state_divergence_count =
1033            registry.register_gauge("elara_state_divergence_count", vec![]);
1034        let quarantine_buffer_size =
1035            registry.register_gauge("elara_quarantine_buffer_size", vec![]);
1036
1037        Self {
1038            // Connection metrics
1039            active_connections,
1040            total_connections,
1041            failed_connections,
1042
1043            // Message metrics
1044            messages_sent,
1045            messages_received,
1046            messages_dropped,
1047            message_size_bytes,
1048
1049            // Latency metrics
1050            message_latency_ms,
1051            state_sync_latency_ms,
1052
1053            // Resource metrics
1054            memory_usage_bytes,
1055            cpu_usage_percent,
1056
1057            // Protocol metrics
1058            time_drift_ms,
1059            state_divergence_count,
1060            quarantine_buffer_size,
1061        }
1062    }
1063
1064    /// Returns a reference to the active connections gauge.
1065    pub fn active_connections(&self) -> &Gauge {
1066        &self.active_connections
1067    }
1068
1069    /// Returns a reference to the total connections counter.
1070    pub fn total_connections(&self) -> &Counter {
1071        &self.total_connections
1072    }
1073
1074    /// Returns a reference to the failed connections counter.
1075    pub fn failed_connections(&self) -> &Counter {
1076        &self.failed_connections
1077    }
1078
1079    /// Returns a reference to the messages sent counter.
1080    pub fn messages_sent(&self) -> &Counter {
1081        &self.messages_sent
1082    }
1083
1084    /// Returns a reference to the messages received counter.
1085    pub fn messages_received(&self) -> &Counter {
1086        &self.messages_received
1087    }
1088
1089    /// Returns a reference to the messages dropped counter.
1090    pub fn messages_dropped(&self) -> &Counter {
1091        &self.messages_dropped
1092    }
1093
1094    /// Returns a reference to the message size histogram.
1095    pub fn message_size_bytes(&self) -> &Histogram {
1096        &self.message_size_bytes
1097    }
1098
1099    /// Returns a reference to the message latency histogram.
1100    pub fn message_latency_ms(&self) -> &Histogram {
1101        &self.message_latency_ms
1102    }
1103
1104    /// Returns a reference to the state sync latency histogram.
1105    pub fn state_sync_latency_ms(&self) -> &Histogram {
1106        &self.state_sync_latency_ms
1107    }
1108
1109    /// Returns a reference to the memory usage gauge.
1110    pub fn memory_usage_bytes(&self) -> &Gauge {
1111        &self.memory_usage_bytes
1112    }
1113
1114    /// Returns a reference to the CPU usage gauge.
1115    pub fn cpu_usage_percent(&self) -> &Gauge {
1116        &self.cpu_usage_percent
1117    }
1118
1119    /// Returns a reference to the time drift gauge.
1120    pub fn time_drift_ms(&self) -> &Gauge {
1121        &self.time_drift_ms
1122    }
1123
1124    /// Returns a reference to the state divergence count gauge.
1125    pub fn state_divergence_count(&self) -> &Gauge {
1126        &self.state_divergence_count
1127    }
1128
1129    /// Returns a reference to the quarantine buffer size gauge.
1130    pub fn quarantine_buffer_size(&self) -> &Gauge {
1131        &self.quarantine_buffer_size
1132    }
1133}
1134
1135impl std::fmt::Debug for NodeMetrics {
1136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1137        f.debug_struct("NodeMetrics")
1138            .field("active_connections", &self.active_connections.get())
1139            .field("total_connections", &self.total_connections.get())
1140            .field("failed_connections", &self.failed_connections.get())
1141            .field("messages_sent", &self.messages_sent.get())
1142            .field("messages_received", &self.messages_received.get())
1143            .field("messages_dropped", &self.messages_dropped.get())
1144            .field("message_size_bytes_count", &self.message_size_bytes.get_count())
1145            .field("message_latency_ms_count", &self.message_latency_ms.get_count())
1146            .field("state_sync_latency_ms_count", &self.state_sync_latency_ms.get_count())
1147            .field("memory_usage_bytes", &self.memory_usage_bytes.get())
1148            .field("cpu_usage_percent", &self.cpu_usage_percent.get())
1149            .field("time_drift_ms", &self.time_drift_ms.get())
1150            .field("state_divergence_count", &self.state_divergence_count.get())
1151            .field("quarantine_buffer_size", &self.quarantine_buffer_size.get())
1152            .finish()
1153    }
1154}
1155
1156impl MetricsRegistry {
1157    /// Exports all metrics in Prometheus text exposition format.
1158    ///
1159    /// This method generates a string containing all registered metrics in the
1160    /// Prometheus text format, which can be scraped by Prometheus servers.
1161    ///
1162    /// The format follows the Prometheus specification:
1163    /// ```text
1164    /// # HELP metric_name Description
1165    /// # TYPE metric_name counter|gauge|histogram
1166    /// metric_name{label1="value1"} 42
1167    /// ```
1168    ///
1169    /// # Example
1170    ///
1171    /// ```rust
1172    /// use elara_runtime::observability::metrics::MetricsRegistry;
1173    ///
1174    /// let registry = MetricsRegistry::new();
1175    /// let counter = registry.register_counter("test_counter", vec![]);
1176    /// counter.inc();
1177    ///
1178    /// let prometheus_text = registry.export_prometheus();
1179    /// assert!(prometheus_text.contains("test_counter"));
1180    /// ```
1181    pub fn export_prometheus(&self) -> String {
1182        let mut output = String::new();
1183
1184        // Export counters
1185        let counters = self.counters.read();
1186        for (name, counter) in counters.iter() {
1187            output.push_str(&format!("# HELP {} Counter metric\n", name));
1188            output.push_str(&format!("# TYPE {} counter\n", name));
1189            
1190            if counter.labels().is_empty() {
1191                output.push_str(&format!("{} {}\n", name, counter.get()));
1192            } else {
1193                let labels = format_labels(counter.labels());
1194                output.push_str(&format!("{}{} {}\n", name, labels, counter.get()));
1195            }
1196        }
1197
1198        // Export gauges
1199        let gauges = self.gauges.read();
1200        for (name, gauge) in gauges.iter() {
1201            output.push_str(&format!("# HELP {} Gauge metric\n", name));
1202            output.push_str(&format!("# TYPE {} gauge\n", name));
1203            
1204            if gauge.labels().is_empty() {
1205                output.push_str(&format!("{} {}\n", name, gauge.get()));
1206            } else {
1207                let labels = format_labels(gauge.labels());
1208                output.push_str(&format!("{}{} {}\n", name, labels, gauge.get()));
1209            }
1210        }
1211
1212        // Export histograms
1213        let histograms = self.histograms.read();
1214        for (name, histogram) in histograms.iter() {
1215            output.push_str(&format!("# HELP {} Histogram metric\n", name));
1216            output.push_str(&format!("# TYPE {} histogram\n", name));
1217            
1218            let label_prefix = if histogram.labels().is_empty() {
1219                String::new()
1220            } else {
1221                format_labels(histogram.labels())
1222            };
1223
1224            // Export bucket counts
1225            let buckets = histogram.get_buckets();
1226            let counts = histogram.get_bucket_counts();
1227            let mut cumulative = 0u64;
1228
1229            for (i, &bucket) in buckets.iter().enumerate() {
1230                cumulative += counts[i];
1231                let bucket_label = if label_prefix.is_empty() {
1232                    format!("{{le=\"{:.1}\"}}", bucket)
1233                } else {
1234                    // Insert le label into existing labels
1235                    let trimmed = label_prefix.trim_end_matches('}');
1236                    format!("{},le=\"{:.1}\"}}", trimmed, bucket)
1237                };
1238                output.push_str(&format!("{}_bucket{} {}\n", name, bucket_label, cumulative));
1239            }
1240
1241            // Export +Inf bucket
1242            cumulative += counts[buckets.len()];
1243            let inf_label = if label_prefix.is_empty() {
1244                "{le=\"+Inf\"}".to_string()
1245            } else {
1246                let trimmed = label_prefix.trim_end_matches('}');
1247                format!("{},le=\"+Inf\"}}", trimmed)
1248            };
1249            output.push_str(&format!("{}_bucket{} {}\n", name, inf_label, cumulative));
1250
1251            // Export sum and count
1252            output.push_str(&format!("{}_sum{} {}\n", name, label_prefix, histogram.get_sum()));
1253            output.push_str(&format!("{}_count{} {}\n", name, label_prefix, histogram.get_count()));
1254        }
1255
1256        output
1257    }
1258}
1259
1260/// Formats labels as a Prometheus label string.
1261///
1262/// Converts a HashMap of labels into the Prometheus format: `{key1="value1",key2="value2"}`
1263fn format_labels(labels: &HashMap<String, String>) -> String {
1264    if labels.is_empty() {
1265        return String::new();
1266    }
1267
1268    let mut label_pairs: Vec<String> = labels
1269        .iter()
1270        .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
1271        .collect();
1272    
1273    // Sort for consistent output
1274    label_pairs.sort();
1275    
1276    format!("{{{}}}", label_pairs.join(","))
1277}
1278
/// Escapes a label value for the Prometheus text format.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`. Every other character is copied as is.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}