// rexis_rag/observability/metrics.rs

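//! # Metrics Collection System
//!
//! Metrics collection and aggregation for RRAG system performance, usage
//! patterns, and operational insights: atomic counter, gauge, histogram and
//! timer primitives, a name-keyed [`MetricsRegistry`] that owns them, and a
//! [`MetricsCollector`] that fronts the registry and buffers ad-hoc
//! [`Metric`] samples.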
5
6use crate::{RragError, RragResult};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
11use std::sync::Arc;
12use tokio::sync::{mpsc, RwLock};
13
/// Configuration for [`MetricsCollector`] and the metrics subsystem.
///
/// NOTE(review): within this module only `buffer_size` is consumed (by
/// `MetricsCollector::new`); the remaining fields are presumably read by
/// schedulers/exporters elsewhere in the crate — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
    /// Whether metrics collection is enabled (not checked in this module).
    pub enabled: bool,
    /// Interval between collection passes, in seconds.
    pub collection_interval_seconds: u64,
    /// Maximum number of ad-hoc samples the collector buffers; when the
    /// buffer fills up, the oldest half is discarded.
    pub buffer_size: usize,
    /// Interval between exports, in seconds.
    pub export_interval_seconds: u64,
    /// Retention period, in days.
    pub retention_days: u32,
    /// Extra labels; presumably meant to be attached to every metric — not
    /// applied anywhere in this module, verify against consumers.
    pub labels: HashMap<String, String>,
}
24
25impl Default for MetricsConfig {
26    fn default() -> Self {
27        Self {
28            enabled: true,
29            collection_interval_seconds: 10,
30            buffer_size: 10000,
31            export_interval_seconds: 60,
32            retention_days: 30,
33            labels: HashMap::new(),
34        }
35    }
36}
37
38/// Metric types supported by the system
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40pub enum MetricType {
41    Counter,
42    Gauge,
43    Histogram,
44    Timer,
45    Summary,
46}
47
48/// Metric value variants
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub enum MetricValue {
51    Counter(u64),
52    Gauge(f64),
53    Histogram {
54        buckets: Vec<(f64, u64)>,
55        sum: f64,
56        count: u64,
57    },
58    Timer {
59        duration_ms: f64,
60        count: u64,
61    },
62    Summary {
63        sum: f64,
64        count: u64,
65        quantiles: Vec<(f64, f64)>,
66    },
67}
68
/// A single metric sample: what was measured, its value, its labels and when
/// it was captured.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metric {
    /// Metric name (e.g. `requests_total`).
    pub name: String,
    /// Kind of metric; the constructors in this module set it to match `value`.
    pub metric_type: MetricType,
    /// The sampled value.
    pub value: MetricValue,
    /// Label key/value pairs attached to this sample.
    pub labels: HashMap<String, String>,
    /// Capture time in UTC; the constructors in this module use `Utc::now()`.
    pub timestamp: DateTime<Utc>,
    /// Optional human-readable description of the metric.
    pub help: Option<String>,
}
79
80impl Metric {
81    pub fn counter(name: impl Into<String>, value: u64) -> Self {
82        Self {
83            name: name.into(),
84            metric_type: MetricType::Counter,
85            value: MetricValue::Counter(value),
86            labels: HashMap::new(),
87            timestamp: Utc::now(),
88            help: None,
89        }
90    }
91
92    pub fn gauge(name: impl Into<String>, value: f64) -> Self {
93        Self {
94            name: name.into(),
95            metric_type: MetricType::Gauge,
96            value: MetricValue::Gauge(value),
97            labels: HashMap::new(),
98            timestamp: Utc::now(),
99            help: None,
100        }
101    }
102
103    pub fn timer(name: impl Into<String>, duration_ms: f64) -> Self {
104        Self {
105            name: name.into(),
106            metric_type: MetricType::Timer,
107            value: MetricValue::Timer {
108                duration_ms,
109                count: 1,
110            },
111            labels: HashMap::new(),
112            timestamp: Utc::now(),
113            help: None,
114        }
115    }
116
117    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
118        self.labels.insert(key.into(), value.into());
119        self
120    }
121
122    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
123        self.labels.extend(labels);
124        self
125    }
126
127    pub fn with_help(mut self, help: impl Into<String>) -> Self {
128        self.help = Some(help.into());
129        self
130    }
131}
132
133/// Counter metric implementation
134pub struct CounterMetric {
135    name: String,
136    value: AtomicU64,
137    labels: HashMap<String, String>,
138    help: Option<String>,
139}
140
141impl CounterMetric {
142    pub fn new(name: impl Into<String>) -> Self {
143        Self {
144            name: name.into(),
145            value: AtomicU64::new(0),
146            labels: HashMap::new(),
147            help: None,
148        }
149    }
150
151    pub fn inc(&self) {
152        self.value.fetch_add(1, Ordering::Relaxed);
153    }
154
155    pub fn inc_by(&self, n: u64) {
156        self.value.fetch_add(n, Ordering::Relaxed);
157    }
158
159    pub fn get(&self) -> u64 {
160        self.value.load(Ordering::Relaxed)
161    }
162
163    pub fn reset(&self) {
164        self.value.store(0, Ordering::Relaxed);
165    }
166
167    pub fn to_metric(&self) -> Metric {
168        Metric {
169            name: self.name.clone(),
170            metric_type: MetricType::Counter,
171            value: MetricValue::Counter(self.get()),
172            labels: self.labels.clone(),
173            timestamp: Utc::now(),
174            help: self.help.clone(),
175        }
176    }
177}
178
179/// Gauge metric implementation
180pub struct GaugeMetric {
181    name: String,
182    value: AtomicI64, // Using i64 to support negative values with bit manipulation
183    labels: HashMap<String, String>,
184    help: Option<String>,
185}
186
187impl GaugeMetric {
188    pub fn new(name: impl Into<String>) -> Self {
189        Self {
190            name: name.into(),
191            value: AtomicI64::new(0),
192            labels: HashMap::new(),
193            help: None,
194        }
195    }
196
197    pub fn set(&self, value: f64) {
198        self.value.store(value.to_bits() as i64, Ordering::Relaxed);
199    }
200
201    pub fn inc(&self) {
202        let current = f64::from_bits(self.value.load(Ordering::Relaxed) as u64);
203        self.set(current + 1.0);
204    }
205
206    pub fn dec(&self) {
207        let current = f64::from_bits(self.value.load(Ordering::Relaxed) as u64);
208        self.set(current - 1.0);
209    }
210
211    pub fn add(&self, value: f64) {
212        let current = f64::from_bits(self.value.load(Ordering::Relaxed) as u64);
213        self.set(current + value);
214    }
215
216    pub fn get(&self) -> f64 {
217        f64::from_bits(self.value.load(Ordering::Relaxed) as u64)
218    }
219
220    pub fn to_metric(&self) -> Metric {
221        Metric {
222            name: self.name.clone(),
223            metric_type: MetricType::Gauge,
224            value: MetricValue::Gauge(self.get()),
225            labels: self.labels.clone(),
226            timestamp: Utc::now(),
227            help: self.help.clone(),
228        }
229    }
230}
231
232/// Histogram metric for tracking distributions
233pub struct HistogramMetric {
234    name: String,
235    buckets: Vec<(f64, AtomicU64)>,
236    sum: AtomicI64, // Using i64 for atomic f64
237    count: AtomicU64,
238    labels: HashMap<String, String>,
239    help: Option<String>,
240}
241
242impl HistogramMetric {
243    pub fn new(name: impl Into<String>, buckets: Vec<f64>) -> Self {
244        let mut histogram_buckets = Vec::new();
245        for bucket in buckets {
246            histogram_buckets.push((bucket, AtomicU64::new(0)));
247        }
248        // Add +Inf bucket
249        histogram_buckets.push((f64::INFINITY, AtomicU64::new(0)));
250
251        Self {
252            name: name.into(),
253            buckets: histogram_buckets,
254            sum: AtomicI64::new(0),
255            count: AtomicU64::new(0),
256            labels: HashMap::new(),
257            help: None,
258        }
259    }
260
261    pub fn observe(&self, value: f64) {
262        // Update buckets
263        for (le, counter) in &self.buckets {
264            if value <= *le {
265                counter.fetch_add(1, Ordering::Relaxed);
266            }
267        }
268
269        // Update sum and count
270        let current_sum = f64::from_bits(self.sum.load(Ordering::Relaxed) as u64);
271        self.sum
272            .store((current_sum + value).to_bits() as i64, Ordering::Relaxed);
273        self.count.fetch_add(1, Ordering::Relaxed);
274    }
275
276    pub fn to_metric(&self) -> Metric {
277        let buckets: Vec<(f64, u64)> = self
278            .buckets
279            .iter()
280            .map(|(le, counter)| (*le, counter.load(Ordering::Relaxed)))
281            .collect();
282
283        let sum = f64::from_bits(self.sum.load(Ordering::Relaxed) as u64);
284        let count = self.count.load(Ordering::Relaxed);
285
286        Metric {
287            name: self.name.clone(),
288            metric_type: MetricType::Histogram,
289            value: MetricValue::Histogram {
290                buckets,
291                sum,
292                count,
293            },
294            labels: self.labels.clone(),
295            timestamp: Utc::now(),
296            help: self.help.clone(),
297        }
298    }
299}
300
301/// Timer metric for measuring durations
302pub struct TimerMetric {
303    name: String,
304    total_duration_ms: AtomicI64,
305    count: AtomicU64,
306    labels: HashMap<String, String>,
307    help: Option<String>,
308}
309
310impl TimerMetric {
311    pub fn new(name: impl Into<String>) -> Self {
312        Self {
313            name: name.into(),
314            total_duration_ms: AtomicI64::new(0),
315            count: AtomicU64::new(0),
316            labels: HashMap::new(),
317            help: None,
318        }
319    }
320
321    pub fn record(&self, duration_ms: f64) {
322        let current_total = f64::from_bits(self.total_duration_ms.load(Ordering::Relaxed) as u64);
323        self.total_duration_ms.store(
324            (current_total + duration_ms).to_bits() as i64,
325            Ordering::Relaxed,
326        );
327        self.count.fetch_add(1, Ordering::Relaxed);
328    }
329
330    pub fn average_duration(&self) -> f64 {
331        let total = f64::from_bits(self.total_duration_ms.load(Ordering::Relaxed) as u64);
332        let count = self.count.load(Ordering::Relaxed);
333        if count > 0 {
334            total / count as f64
335        } else {
336            0.0
337        }
338    }
339
340    pub fn to_metric(&self) -> Metric {
341        let duration_ms = self.average_duration();
342        let count = self.count.load(Ordering::Relaxed);
343
344        Metric {
345            name: self.name.clone(),
346            metric_type: MetricType::Timer,
347            value: MetricValue::Timer { duration_ms, count },
348            labels: self.labels.clone(),
349            timestamp: Utc::now(),
350            help: self.help.clone(),
351        }
352    }
353}
354
355/// Metrics registry for managing all metrics
356pub struct MetricsRegistry {
357    counters: Arc<RwLock<HashMap<String, Arc<CounterMetric>>>>,
358    gauges: Arc<RwLock<HashMap<String, Arc<GaugeMetric>>>>,
359    histograms: Arc<RwLock<HashMap<String, Arc<HistogramMetric>>>>,
360    timers: Arc<RwLock<HashMap<String, Arc<TimerMetric>>>>,
361}
362
363impl MetricsRegistry {
364    pub fn new() -> Self {
365        Self {
366            counters: Arc::new(RwLock::new(HashMap::new())),
367            gauges: Arc::new(RwLock::new(HashMap::new())),
368            histograms: Arc::new(RwLock::new(HashMap::new())),
369            timers: Arc::new(RwLock::new(HashMap::new())),
370        }
371    }
372
373    pub async fn get_or_create_counter(&self, name: &str) -> Arc<CounterMetric> {
374        let counters = self.counters.read().await;
375        if let Some(counter) = counters.get(name) {
376            return counter.clone();
377        }
378        drop(counters);
379
380        let mut counters = self.counters.write().await;
381        counters
382            .entry(name.to_string())
383            .or_insert_with(|| Arc::new(CounterMetric::new(name)))
384            .clone()
385    }
386
387    pub async fn get_or_create_gauge(&self, name: &str) -> Arc<GaugeMetric> {
388        let gauges = self.gauges.read().await;
389        if let Some(gauge) = gauges.get(name) {
390            return gauge.clone();
391        }
392        drop(gauges);
393
394        let mut gauges = self.gauges.write().await;
395        gauges
396            .entry(name.to_string())
397            .or_insert_with(|| Arc::new(GaugeMetric::new(name)))
398            .clone()
399    }
400
401    pub async fn get_or_create_histogram(
402        &self,
403        name: &str,
404        buckets: Vec<f64>,
405    ) -> Arc<HistogramMetric> {
406        let histograms = self.histograms.read().await;
407        if let Some(histogram) = histograms.get(name) {
408            return histogram.clone();
409        }
410        drop(histograms);
411
412        let mut histograms = self.histograms.write().await;
413        histograms
414            .entry(name.to_string())
415            .or_insert_with(|| Arc::new(HistogramMetric::new(name, buckets)))
416            .clone()
417    }
418
419    pub async fn get_or_create_timer(&self, name: &str) -> Arc<TimerMetric> {
420        let timers = self.timers.read().await;
421        if let Some(timer) = timers.get(name) {
422            return timer.clone();
423        }
424        drop(timers);
425
426        let mut timers = self.timers.write().await;
427        timers
428            .entry(name.to_string())
429            .or_insert_with(|| Arc::new(TimerMetric::new(name)))
430            .clone()
431    }
432
433    pub async fn collect_all_metrics(&self) -> Vec<Metric> {
434        let mut metrics = Vec::new();
435
436        // Collect counters
437        let counters = self.counters.read().await;
438        for counter in counters.values() {
439            metrics.push(counter.to_metric());
440        }
441        drop(counters);
442
443        // Collect gauges
444        let gauges = self.gauges.read().await;
445        for gauge in gauges.values() {
446            metrics.push(gauge.to_metric());
447        }
448        drop(gauges);
449
450        // Collect histograms
451        let histograms = self.histograms.read().await;
452        for histogram in histograms.values() {
453            metrics.push(histogram.to_metric());
454        }
455        drop(histograms);
456
457        // Collect timers
458        let timers = self.timers.read().await;
459        for timer in timers.values() {
460            metrics.push(timer.to_metric());
461        }
462
463        metrics
464    }
465}
466
467/// Main metrics collector
468pub struct MetricsCollector {
469    config: MetricsConfig,
470    registry: Arc<MetricsRegistry>,
471    buffer: Arc<RwLock<Vec<Metric>>>,
472    sender: mpsc::UnboundedSender<Metric>,
473    _receiver_handle: tokio::task::JoinHandle<()>,
474    is_running: Arc<RwLock<bool>>,
475}
476
477impl MetricsCollector {
478    pub async fn new(config: MetricsConfig) -> RragResult<Self> {
479        let registry = Arc::new(MetricsRegistry::new());
480        let buffer = Arc::new(RwLock::new(Vec::with_capacity(config.buffer_size)));
481        let (sender, mut receiver) = mpsc::unbounded_channel();
482        let is_running = Arc::new(RwLock::new(false));
483
484        let buffer_clone = buffer.clone();
485        let is_running_clone = is_running.clone();
486        let buffer_size = config.buffer_size;
487
488        let receiver_handle = tokio::spawn(async move {
489            while let Some(metric) = receiver.recv().await {
490                if !*is_running_clone.read().await {
491                    break;
492                }
493
494                let mut buffer = buffer_clone.write().await;
495                buffer.push(metric);
496
497                // Prevent buffer overflow
498                if buffer.len() >= buffer_size {
499                    buffer.drain(0..buffer_size / 2);
500                }
501            }
502        });
503
504        Ok(Self {
505            config,
506            registry,
507            buffer,
508            sender,
509            _receiver_handle: receiver_handle,
510            is_running,
511        })
512    }
513
514    pub async fn start(&self) -> RragResult<()> {
515        let mut running = self.is_running.write().await;
516        if *running {
517            return Err(RragError::config("metrics", "stopped", "already running"));
518        }
519        *running = true;
520        tracing::info!("Metrics collector started");
521        Ok(())
522    }
523
524    pub async fn stop(&self) -> RragResult<()> {
525        let mut running = self.is_running.write().await;
526        if !*running {
527            return Ok(());
528        }
529        *running = false;
530        tracing::info!("Metrics collector stopped");
531        Ok(())
532    }
533
534    pub async fn is_healthy(&self) -> bool {
535        *self.is_running.read().await
536    }
537
538    pub async fn record_metric(&self, metric: Metric) -> RragResult<()> {
539        if !*self.is_running.read().await {
540            return Err(RragError::config("metrics", "running", "stopped"));
541        }
542
543        self.sender
544            .send(metric)
545            .map_err(|e| RragError::agent("metrics", e.to_string()))?;
546
547        Ok(())
548    }
549
550    pub async fn inc_counter(&self, name: &str) -> RragResult<()> {
551        let counter = self.registry.get_or_create_counter(name).await;
552        counter.inc();
553        Ok(())
554    }
555
556    pub async fn inc_counter_by(&self, name: &str, value: u64) -> RragResult<()> {
557        let counter = self.registry.get_or_create_counter(name).await;
558        counter.inc_by(value);
559        Ok(())
560    }
561
562    pub async fn set_gauge(&self, name: &str, value: f64) -> RragResult<()> {
563        let gauge = self.registry.get_or_create_gauge(name).await;
564        gauge.set(value);
565        Ok(())
566    }
567
568    pub async fn observe_histogram(
569        &self,
570        name: &str,
571        value: f64,
572        buckets: Option<Vec<f64>>,
573    ) -> RragResult<()> {
574        let default_buckets = vec![0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0];
575        let histogram = self
576            .registry
577            .get_or_create_histogram(name, buckets.unwrap_or(default_buckets))
578            .await;
579        histogram.observe(value);
580        Ok(())
581    }
582
583    pub async fn record_timer(&self, name: &str, duration_ms: f64) -> RragResult<()> {
584        let timer = self.registry.get_or_create_timer(name).await;
585        timer.record(duration_ms);
586        Ok(())
587    }
588
589    pub async fn get_all_metrics(&self) -> Vec<Metric> {
590        let registry_metrics = self.registry.collect_all_metrics().await;
591        let buffer_metrics = self.buffer.read().await.clone();
592
593        let mut all_metrics = registry_metrics;
594        all_metrics.extend(buffer_metrics);
595        all_metrics
596    }
597
598    pub async fn get_metrics_count(&self) -> usize {
599        self.buffer.read().await.len()
600    }
601
602    pub async fn clear_buffer(&self) -> Vec<Metric> {
603        let mut buffer = self.buffer.write().await;
604        let metrics = buffer.clone();
605        buffer.clear();
606        metrics
607    }
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_metric() {
        let counter = CounterMetric::new("test_counter");

        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.inc_by(5);
        assert_eq!(counter.get(), 6);

        counter.reset();
        assert_eq!(counter.get(), 0);
    }

    #[test]
    fn test_gauge_metric() {
        let gauge = GaugeMetric::new("test_gauge");

        assert_eq!(gauge.get(), 0.0);

        gauge.set(10.5);
        assert_eq!(gauge.get(), 10.5);

        gauge.inc();
        assert_eq!(gauge.get(), 11.5);

        gauge.dec();
        assert_eq!(gauge.get(), 10.5);

        gauge.add(-5.0);
        assert_eq!(gauge.get(), 5.5);
    }

    #[test]
    fn test_histogram_metric() {
        let histogram = HistogramMetric::new("test_histogram", vec![1.0, 5.0, 10.0]);

        histogram.observe(0.5);
        histogram.observe(3.0);
        histogram.observe(7.0);
        histogram.observe(15.0);

        // Fail loudly on the wrong variant instead of silently skipping checks.
        match histogram.to_metric().value {
            MetricValue::Histogram {
                buckets,
                sum,
                count,
            } => {
                assert_eq!(count, 4);
                assert_eq!(sum, 25.5);

                // Buckets are cumulative: each counts every value <= its bound.
                assert_eq!(buckets.len(), 4);
                assert_eq!(buckets[0], (1.0, 1)); // 0.5
                assert_eq!(buckets[1], (5.0, 2)); // 0.5, 3.0
                assert_eq!(buckets[2], (10.0, 3)); // 0.5, 3.0, 7.0
                assert_eq!(buckets[3], (f64::INFINITY, 4)); // everything
            }
            other => panic!("expected a histogram value, got {:?}", other),
        }
    }

    #[test]
    fn test_timer_metric() {
        let timer = TimerMetric::new("test_timer");

        timer.record(100.0);
        timer.record(200.0);
        timer.record(300.0);

        assert_eq!(timer.average_duration(), 200.0);

        match timer.to_metric().value {
            MetricValue::Timer { duration_ms, count } => {
                assert_eq!(duration_ms, 200.0);
                assert_eq!(count, 3);
            }
            other => panic!("expected a timer value, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_metrics_registry() {
        let registry = MetricsRegistry::new();

        registry.get_or_create_counter("test_counter").await.inc();
        registry.get_or_create_gauge("test_gauge").await.set(42.0);

        // Asking again must hand back the same underlying metric.
        assert_eq!(
            registry.get_or_create_counter("test_counter").await.get(),
            1
        );

        let metrics = registry.collect_all_metrics().await;
        assert_eq!(metrics.len(), 2);

        let counter_metric = metrics
            .iter()
            .find(|m| m.name == "test_counter")
            .expect("counter should be collected");
        assert_eq!(counter_metric.metric_type, MetricType::Counter);
        assert!(matches!(counter_metric.value, MetricValue::Counter(1)));

        let gauge_metric = metrics
            .iter()
            .find(|m| m.name == "test_gauge")
            .expect("gauge should be collected");
        assert_eq!(gauge_metric.metric_type, MetricType::Gauge);
        assert!(matches!(gauge_metric.value, MetricValue::Gauge(v) if v == 42.0));
    }

    #[tokio::test]
    async fn test_metrics_collector() {
        let config = MetricsConfig::default();
        let collector = MetricsCollector::new(config).await.unwrap();

        collector.start().await.unwrap();
        assert!(collector.is_healthy().await);

        // Counter operations
        collector.inc_counter("requests_total").await.unwrap();
        collector.inc_counter_by("requests_total", 5).await.unwrap();

        // Gauge operations
        collector
            .set_gauge("active_connections", 10.0)
            .await
            .unwrap();

        // Histogram operations
        collector
            .observe_histogram("request_duration", 0.5, None)
            .await
            .unwrap();

        // Timer operations
        collector.record_timer("process_time", 150.0).await.unwrap();

        // One entry per registry metric; nothing went through `record_metric`,
        // so the sample buffer contributes nothing.
        let metrics = collector.get_all_metrics().await;
        assert_eq!(metrics.len(), 4);

        let requests = metrics
            .iter()
            .find(|m| m.name == "requests_total")
            .expect("counter should be collected");
        assert!(matches!(requests.value, MetricValue::Counter(6)));

        collector.stop().await.unwrap();
        assert!(!collector.is_healthy().await);
    }

    #[test]
    fn test_metric_creation() {
        let counter = Metric::counter("test_counter", 10);
        assert_eq!(counter.name, "test_counter");
        assert_eq!(counter.metric_type, MetricType::Counter);
        assert!(matches!(counter.value, MetricValue::Counter(10)));

        let gauge = Metric::gauge("test_gauge", 42.5)
            .with_label("host", "server1")
            .with_help("Test gauge metric");

        assert_eq!(gauge.name, "test_gauge");
        assert_eq!(gauge.metric_type, MetricType::Gauge);
        assert!(matches!(gauge.value, MetricValue::Gauge(v) if v == 42.5));
        assert_eq!(gauge.labels.get("host").map(String::as_str), Some("server1"));
        assert_eq!(gauge.help.as_deref(), Some("Test gauge metric"));
    }
}