1use crate::{RragError, RragResult};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
11use std::sync::Arc;
12use tokio::sync::{mpsc, RwLock};
13
/// Configuration for [`MetricsCollector`].
///
/// Of these fields, only `buffer_size` is read by the code in this file.
/// NOTE(review): the remaining fields are presumably consumed elsewhere (or not
/// yet wired up) — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
    /// Enable flag. Not read by the code in this file.
    pub enabled: bool,
    /// Collection interval (seconds, per the field name). Not read in this file.
    pub collection_interval_seconds: u64,
    /// Initial capacity of the collector's raw-metric buffer and its eviction
    /// threshold: once the buffer reaches this length, roughly the oldest half
    /// is discarded.
    pub buffer_size: usize,
    /// Export interval (seconds, per the field name). Not read in this file.
    pub export_interval_seconds: u64,
    /// Retention period (days, per the field name). Not read in this file.
    pub retention_days: u32,
    /// Extra labels. Not read in this file.
    pub labels: HashMap<String, String>,
}
24
25impl Default for MetricsConfig {
26 fn default() -> Self {
27 Self {
28 enabled: true,
29 collection_interval_seconds: 10,
30 buffer_size: 10000,
31 export_interval_seconds: 60,
32 retention_days: 30,
33 labels: HashMap::new(),
34 }
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40pub enum MetricType {
41 Counter,
42 Gauge,
43 Histogram,
44 Timer,
45 Summary,
46}
47
/// The payload of a [`Metric`] snapshot; one variant per [`MetricType`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricValue {
    /// Current counter value.
    Counter(u64),
    /// Current gauge value.
    Gauge(f64),
    /// Histogram snapshot as produced by `HistogramMetric::to_metric`.
    Histogram {
        /// `(upper_bound, count)` pairs. Counts are cumulative: each one counts
        /// every observation `<= upper_bound`. `HistogramMetric` always ends the
        /// list with a `+Inf` bound.
        buckets: Vec<(f64, u64)>,
        /// Sum of all observed values.
        sum: f64,
        /// Number of observations.
        count: u64,
    },
    /// Timer data. `TimerMetric::to_metric` stores the *average* duration over
    /// `count` recordings here, while `Metric::timer` stores a single duration
    /// with `count == 1`.
    Timer {
        duration_ms: f64,
        count: u64,
    },
    /// Summary data. Nothing shown in this file constructs this variant.
    /// NOTE(review): `quantiles` presumably holds `(quantile, value)` pairs —
    /// confirm against the producers.
    Summary {
        sum: f64,
        count: u64,
        quantiles: Vec<(f64, f64)>,
    },
}
68
/// A point-in-time snapshot of a single metric.
///
/// The constructors and the `to_metric` methods in this file stamp
/// `timestamp` with `Utc::now()` when the snapshot is built.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metric {
    /// Metric name.
    pub name: String,
    /// Kind of metric; expected to agree with the variant stored in `value`.
    pub metric_type: MetricType,
    /// The snapshot payload.
    pub value: MetricValue,
    /// Key/value labels attached to this snapshot.
    pub labels: HashMap<String, String>,
    /// When the snapshot was taken.
    pub timestamp: DateTime<Utc>,
    /// Optional human-readable description.
    pub help: Option<String>,
}
79
80impl Metric {
81 pub fn counter(name: impl Into<String>, value: u64) -> Self {
82 Self {
83 name: name.into(),
84 metric_type: MetricType::Counter,
85 value: MetricValue::Counter(value),
86 labels: HashMap::new(),
87 timestamp: Utc::now(),
88 help: None,
89 }
90 }
91
92 pub fn gauge(name: impl Into<String>, value: f64) -> Self {
93 Self {
94 name: name.into(),
95 metric_type: MetricType::Gauge,
96 value: MetricValue::Gauge(value),
97 labels: HashMap::new(),
98 timestamp: Utc::now(),
99 help: None,
100 }
101 }
102
103 pub fn timer(name: impl Into<String>, duration_ms: f64) -> Self {
104 Self {
105 name: name.into(),
106 metric_type: MetricType::Timer,
107 value: MetricValue::Timer {
108 duration_ms,
109 count: 1,
110 },
111 labels: HashMap::new(),
112 timestamp: Utc::now(),
113 help: None,
114 }
115 }
116
117 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
118 self.labels.insert(key.into(), value.into());
119 self
120 }
121
122 pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
123 self.labels.extend(labels);
124 self
125 }
126
127 pub fn with_help(mut self, help: impl Into<String>) -> Self {
128 self.help = Some(help.into());
129 self
130 }
131}
132
/// A named `u64` counter backed by an `AtomicU64`.
///
/// It only grows through `inc`/`inc_by`, except when `reset` sets it back to zero.
pub struct CounterMetric {
    name: String,
    value: AtomicU64,
    /// Labels copied into snapshots. `new` leaves this empty and no method
    /// in this file sets it.
    labels: HashMap<String, String>,
    /// Help text copied into snapshots. `new` leaves this `None` and no
    /// method in this file sets it.
    help: Option<String>,
}
140
141impl CounterMetric {
142 pub fn new(name: impl Into<String>) -> Self {
143 Self {
144 name: name.into(),
145 value: AtomicU64::new(0),
146 labels: HashMap::new(),
147 help: None,
148 }
149 }
150
151 pub fn inc(&self) {
152 self.value.fetch_add(1, Ordering::Relaxed);
153 }
154
155 pub fn inc_by(&self, n: u64) {
156 self.value.fetch_add(n, Ordering::Relaxed);
157 }
158
159 pub fn get(&self) -> u64 {
160 self.value.load(Ordering::Relaxed)
161 }
162
163 pub fn reset(&self) {
164 self.value.store(0, Ordering::Relaxed);
165 }
166
167 pub fn to_metric(&self) -> Metric {
168 Metric {
169 name: self.name.clone(),
170 metric_type: MetricType::Counter,
171 value: MetricValue::Counter(self.get()),
172 labels: self.labels.clone(),
173 timestamp: Utc::now(),
174 help: self.help.clone(),
175 }
176 }
177}
178
/// A named `f64` gauge that can be set, incremented and decremented.
pub struct GaugeMetric {
    name: String,
    /// The gauge value stored as the raw bit pattern of an `f64`
    /// (`f64::to_bits` / `f64::from_bits`), since there is no atomic float type.
    value: AtomicI64,
    /// Labels copied into snapshots. `new` leaves this empty and no method
    /// in this file sets it.
    labels: HashMap<String, String>,
    /// Help text copied into snapshots. `new` leaves this `None`.
    help: Option<String>,
}
186
187impl GaugeMetric {
188 pub fn new(name: impl Into<String>) -> Self {
189 Self {
190 name: name.into(),
191 value: AtomicI64::new(0),
192 labels: HashMap::new(),
193 help: None,
194 }
195 }
196
197 pub fn set(&self, value: f64) {
198 self.value.store(value.to_bits() as i64, Ordering::Relaxed);
199 }
200
201 pub fn inc(&self) {
202 let current = f64::from_bits(self.value.load(Ordering::Relaxed) as u64);
203 self.set(current + 1.0);
204 }
205
206 pub fn dec(&self) {
207 let current = f64::from_bits(self.value.load(Ordering::Relaxed) as u64);
208 self.set(current - 1.0);
209 }
210
211 pub fn add(&self, value: f64) {
212 let current = f64::from_bits(self.value.load(Ordering::Relaxed) as u64);
213 self.set(current + value);
214 }
215
216 pub fn get(&self) -> f64 {
217 f64::from_bits(self.value.load(Ordering::Relaxed) as u64)
218 }
219
220 pub fn to_metric(&self) -> Metric {
221 Metric {
222 name: self.name.clone(),
223 metric_type: MetricType::Gauge,
224 value: MetricValue::Gauge(self.get()),
225 labels: self.labels.clone(),
226 timestamp: Utc::now(),
227 help: self.help.clone(),
228 }
229 }
230}
231
/// A named histogram with fixed upper bucket bounds.
pub struct HistogramMetric {
    name: String,
    /// `(upper_bound, counter)` pairs. Counters are cumulative: `observe`
    /// increments every bucket whose bound is `>=` the observed value. `new`
    /// always appends a final `+Inf` bucket.
    buckets: Vec<(f64, AtomicU64)>,
    /// Sum of observations, stored as the raw bit pattern of an `f64`.
    sum: AtomicI64,
    /// Number of observations.
    count: AtomicU64,
    /// Labels copied into snapshots. `new` leaves this empty.
    labels: HashMap<String, String>,
    /// Help text copied into snapshots. `new` leaves this `None`.
    help: Option<String>,
}
241
242impl HistogramMetric {
243 pub fn new(name: impl Into<String>, buckets: Vec<f64>) -> Self {
244 let mut histogram_buckets = Vec::new();
245 for bucket in buckets {
246 histogram_buckets.push((bucket, AtomicU64::new(0)));
247 }
248 histogram_buckets.push((f64::INFINITY, AtomicU64::new(0)));
250
251 Self {
252 name: name.into(),
253 buckets: histogram_buckets,
254 sum: AtomicI64::new(0),
255 count: AtomicU64::new(0),
256 labels: HashMap::new(),
257 help: None,
258 }
259 }
260
261 pub fn observe(&self, value: f64) {
262 for (le, counter) in &self.buckets {
264 if value <= *le {
265 counter.fetch_add(1, Ordering::Relaxed);
266 }
267 }
268
269 let current_sum = f64::from_bits(self.sum.load(Ordering::Relaxed) as u64);
271 self.sum
272 .store((current_sum + value).to_bits() as i64, Ordering::Relaxed);
273 self.count.fetch_add(1, Ordering::Relaxed);
274 }
275
276 pub fn to_metric(&self) -> Metric {
277 let buckets: Vec<(f64, u64)> = self
278 .buckets
279 .iter()
280 .map(|(le, counter)| (*le, counter.load(Ordering::Relaxed)))
281 .collect();
282
283 let sum = f64::from_bits(self.sum.load(Ordering::Relaxed) as u64);
284 let count = self.count.load(Ordering::Relaxed);
285
286 Metric {
287 name: self.name.clone(),
288 metric_type: MetricType::Histogram,
289 value: MetricValue::Histogram {
290 buckets,
291 sum,
292 count,
293 },
294 labels: self.labels.clone(),
295 timestamp: Utc::now(),
296 help: self.help.clone(),
297 }
298 }
299}
300
/// A named timer that accumulates recorded durations and their count.
pub struct TimerMetric {
    name: String,
    /// Running total of recorded durations, stored as the raw bit pattern of an `f64`.
    total_duration_ms: AtomicI64,
    /// Number of recordings.
    count: AtomicU64,
    /// Labels copied into snapshots. `new` leaves this empty.
    labels: HashMap<String, String>,
    /// Help text copied into snapshots. `new` leaves this `None`.
    help: Option<String>,
}
309
310impl TimerMetric {
311 pub fn new(name: impl Into<String>) -> Self {
312 Self {
313 name: name.into(),
314 total_duration_ms: AtomicI64::new(0),
315 count: AtomicU64::new(0),
316 labels: HashMap::new(),
317 help: None,
318 }
319 }
320
321 pub fn record(&self, duration_ms: f64) {
322 let current_total = f64::from_bits(self.total_duration_ms.load(Ordering::Relaxed) as u64);
323 self.total_duration_ms.store(
324 (current_total + duration_ms).to_bits() as i64,
325 Ordering::Relaxed,
326 );
327 self.count.fetch_add(1, Ordering::Relaxed);
328 }
329
330 pub fn average_duration(&self) -> f64 {
331 let total = f64::from_bits(self.total_duration_ms.load(Ordering::Relaxed) as u64);
332 let count = self.count.load(Ordering::Relaxed);
333 if count > 0 {
334 total / count as f64
335 } else {
336 0.0
337 }
338 }
339
340 pub fn to_metric(&self) -> Metric {
341 let duration_ms = self.average_duration();
342 let count = self.count.load(Ordering::Relaxed);
343
344 Metric {
345 name: self.name.clone(),
346 metric_type: MetricType::Timer,
347 value: MetricValue::Timer { duration_ms, count },
348 labels: self.labels.clone(),
349 timestamp: Utc::now(),
350 help: self.help.clone(),
351 }
352 }
353}
354
/// A registry of named metric instances, with one map per metric kind.
///
/// Each map sits behind its own async `RwLock`. Instances are handed out as
/// `Arc`s, so callers update the shared metric directly.
pub struct MetricsRegistry {
    counters: Arc<RwLock<HashMap<String, Arc<CounterMetric>>>>,
    gauges: Arc<RwLock<HashMap<String, Arc<GaugeMetric>>>>,
    histograms: Arc<RwLock<HashMap<String, Arc<HistogramMetric>>>>,
    timers: Arc<RwLock<HashMap<String, Arc<TimerMetric>>>>,
}
362
363impl MetricsRegistry {
364 pub fn new() -> Self {
365 Self {
366 counters: Arc::new(RwLock::new(HashMap::new())),
367 gauges: Arc::new(RwLock::new(HashMap::new())),
368 histograms: Arc::new(RwLock::new(HashMap::new())),
369 timers: Arc::new(RwLock::new(HashMap::new())),
370 }
371 }
372
373 pub async fn get_or_create_counter(&self, name: &str) -> Arc<CounterMetric> {
374 let counters = self.counters.read().await;
375 if let Some(counter) = counters.get(name) {
376 return counter.clone();
377 }
378 drop(counters);
379
380 let mut counters = self.counters.write().await;
381 counters
382 .entry(name.to_string())
383 .or_insert_with(|| Arc::new(CounterMetric::new(name)))
384 .clone()
385 }
386
387 pub async fn get_or_create_gauge(&self, name: &str) -> Arc<GaugeMetric> {
388 let gauges = self.gauges.read().await;
389 if let Some(gauge) = gauges.get(name) {
390 return gauge.clone();
391 }
392 drop(gauges);
393
394 let mut gauges = self.gauges.write().await;
395 gauges
396 .entry(name.to_string())
397 .or_insert_with(|| Arc::new(GaugeMetric::new(name)))
398 .clone()
399 }
400
401 pub async fn get_or_create_histogram(
402 &self,
403 name: &str,
404 buckets: Vec<f64>,
405 ) -> Arc<HistogramMetric> {
406 let histograms = self.histograms.read().await;
407 if let Some(histogram) = histograms.get(name) {
408 return histogram.clone();
409 }
410 drop(histograms);
411
412 let mut histograms = self.histograms.write().await;
413 histograms
414 .entry(name.to_string())
415 .or_insert_with(|| Arc::new(HistogramMetric::new(name, buckets)))
416 .clone()
417 }
418
419 pub async fn get_or_create_timer(&self, name: &str) -> Arc<TimerMetric> {
420 let timers = self.timers.read().await;
421 if let Some(timer) = timers.get(name) {
422 return timer.clone();
423 }
424 drop(timers);
425
426 let mut timers = self.timers.write().await;
427 timers
428 .entry(name.to_string())
429 .or_insert_with(|| Arc::new(TimerMetric::new(name)))
430 .clone()
431 }
432
433 pub async fn collect_all_metrics(&self) -> Vec<Metric> {
434 let mut metrics = Vec::new();
435
436 let counters = self.counters.read().await;
438 for counter in counters.values() {
439 metrics.push(counter.to_metric());
440 }
441 drop(counters);
442
443 let gauges = self.gauges.read().await;
445 for gauge in gauges.values() {
446 metrics.push(gauge.to_metric());
447 }
448 drop(gauges);
449
450 let histograms = self.histograms.read().await;
452 for histogram in histograms.values() {
453 metrics.push(histogram.to_metric());
454 }
455 drop(histograms);
456
457 let timers = self.timers.read().await;
459 for timer in timers.values() {
460 metrics.push(timer.to_metric());
461 }
462
463 metrics
464 }
465}
466
/// Front end for recording metrics.
///
/// Aggregated counters, gauges, histograms and timers live in a
/// [`MetricsRegistry`]. Raw [`Metric`] snapshots submitted via `record_metric`
/// travel over an unbounded channel to a background task, which appends them to
/// a size-limited buffer.
pub struct MetricsCollector {
    /// Configuration this collector was created with.
    config: MetricsConfig,
    /// Aggregated metrics, keyed by name.
    registry: Arc<MetricsRegistry>,
    /// Raw metrics appended by the background task.
    buffer: Arc<RwLock<Vec<Metric>>>,
    /// Sending half of the channel drained by the background task.
    sender: mpsc::UnboundedSender<Metric>,
    /// Handle of the spawned background task. Held but never awaited here.
    _receiver_handle: tokio::task::JoinHandle<()>,
    /// Whether `record_metric` currently accepts metrics.
    is_running: Arc<RwLock<bool>>,
}
476
477impl MetricsCollector {
478 pub async fn new(config: MetricsConfig) -> RragResult<Self> {
479 let registry = Arc::new(MetricsRegistry::new());
480 let buffer = Arc::new(RwLock::new(Vec::with_capacity(config.buffer_size)));
481 let (sender, mut receiver) = mpsc::unbounded_channel();
482 let is_running = Arc::new(RwLock::new(false));
483
484 let buffer_clone = buffer.clone();
485 let is_running_clone = is_running.clone();
486 let buffer_size = config.buffer_size;
487
488 let receiver_handle = tokio::spawn(async move {
489 while let Some(metric) = receiver.recv().await {
490 if !*is_running_clone.read().await {
491 break;
492 }
493
494 let mut buffer = buffer_clone.write().await;
495 buffer.push(metric);
496
497 if buffer.len() >= buffer_size {
499 buffer.drain(0..buffer_size / 2);
500 }
501 }
502 });
503
504 Ok(Self {
505 config,
506 registry,
507 buffer,
508 sender,
509 _receiver_handle: receiver_handle,
510 is_running,
511 })
512 }
513
514 pub async fn start(&self) -> RragResult<()> {
515 let mut running = self.is_running.write().await;
516 if *running {
517 return Err(RragError::config("metrics", "stopped", "already running"));
518 }
519 *running = true;
520 tracing::info!("Metrics collector started");
521 Ok(())
522 }
523
524 pub async fn stop(&self) -> RragResult<()> {
525 let mut running = self.is_running.write().await;
526 if !*running {
527 return Ok(());
528 }
529 *running = false;
530 tracing::info!("Metrics collector stopped");
531 Ok(())
532 }
533
534 pub async fn is_healthy(&self) -> bool {
535 *self.is_running.read().await
536 }
537
538 pub async fn record_metric(&self, metric: Metric) -> RragResult<()> {
539 if !*self.is_running.read().await {
540 return Err(RragError::config("metrics", "running", "stopped"));
541 }
542
543 self.sender
544 .send(metric)
545 .map_err(|e| RragError::agent("metrics", e.to_string()))?;
546
547 Ok(())
548 }
549
550 pub async fn inc_counter(&self, name: &str) -> RragResult<()> {
551 let counter = self.registry.get_or_create_counter(name).await;
552 counter.inc();
553 Ok(())
554 }
555
556 pub async fn inc_counter_by(&self, name: &str, value: u64) -> RragResult<()> {
557 let counter = self.registry.get_or_create_counter(name).await;
558 counter.inc_by(value);
559 Ok(())
560 }
561
562 pub async fn set_gauge(&self, name: &str, value: f64) -> RragResult<()> {
563 let gauge = self.registry.get_or_create_gauge(name).await;
564 gauge.set(value);
565 Ok(())
566 }
567
568 pub async fn observe_histogram(
569 &self,
570 name: &str,
571 value: f64,
572 buckets: Option<Vec<f64>>,
573 ) -> RragResult<()> {
574 let default_buckets = vec![0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0];
575 let histogram = self
576 .registry
577 .get_or_create_histogram(name, buckets.unwrap_or(default_buckets))
578 .await;
579 histogram.observe(value);
580 Ok(())
581 }
582
583 pub async fn record_timer(&self, name: &str, duration_ms: f64) -> RragResult<()> {
584 let timer = self.registry.get_or_create_timer(name).await;
585 timer.record(duration_ms);
586 Ok(())
587 }
588
589 pub async fn get_all_metrics(&self) -> Vec<Metric> {
590 let registry_metrics = self.registry.collect_all_metrics().await;
591 let buffer_metrics = self.buffer.read().await.clone();
592
593 let mut all_metrics = registry_metrics;
594 all_metrics.extend(buffer_metrics);
595 all_metrics
596 }
597
598 pub async fn get_metrics_count(&self) -> usize {
599 self.buffer.read().await.len()
600 }
601
602 pub async fn clear_buffer(&self) -> Vec<Metric> {
603 let mut buffer = self.buffer.write().await;
604 let metrics = buffer.clone();
605 buffer.clear();
606 metrics
607 }
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_metric() {
        let counter = CounterMetric::new("test_counter");

        assert_eq!(counter.get(), 0);

        counter.inc();
        assert_eq!(counter.get(), 1);

        counter.inc_by(5);
        assert_eq!(counter.get(), 6);

        counter.reset();
        assert_eq!(counter.get(), 0);
    }

    #[test]
    fn test_gauge_metric() {
        let gauge = GaugeMetric::new("test_gauge");

        assert_eq!(gauge.get(), 0.0);

        gauge.set(10.5);
        assert_eq!(gauge.get(), 10.5);

        gauge.inc();
        assert_eq!(gauge.get(), 11.5);

        gauge.dec();
        assert_eq!(gauge.get(), 10.5);

        gauge.add(-5.0);
        assert_eq!(gauge.get(), 5.5);
    }

    #[test]
    fn test_histogram_metric() {
        let histogram = HistogramMetric::new("test_histogram", vec![1.0, 5.0, 10.0]);

        histogram.observe(0.5);
        histogram.observe(3.0);
        histogram.observe(7.0);
        histogram.observe(15.0);

        // `match` rather than `if let`, so a wrong variant fails the test
        // instead of silently skipping the assertions.
        match histogram.to_metric().value {
            MetricValue::Histogram {
                buckets,
                sum,
                count,
            } => {
                assert_eq!(count, 4);
                assert_eq!(sum, 25.5);
                // Buckets are cumulative: each bound counts every observation <= it.
                let expected: Vec<(f64, u64)> =
                    vec![(1.0, 1), (5.0, 2), (10.0, 3), (f64::INFINITY, 4)];
                assert_eq!(buckets, expected);
            }
            other => panic!("expected a histogram value, got {:?}", other),
        }
    }

    #[test]
    fn test_timer_metric() {
        let timer = TimerMetric::new("test_timer");

        timer.record(100.0);
        timer.record(200.0);
        timer.record(300.0);

        assert_eq!(timer.average_duration(), 200.0);

        match timer.to_metric().value {
            MetricValue::Timer { duration_ms, count } => {
                assert_eq!(duration_ms, 200.0);
                assert_eq!(count, 3);
            }
            other => panic!("expected a timer value, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_metrics_registry() {
        let registry = MetricsRegistry::new();

        let counter = registry.get_or_create_counter("test_counter").await;
        counter.inc();

        let gauge = registry.get_or_create_gauge("test_gauge").await;
        gauge.set(42.0);

        let metrics = registry.collect_all_metrics().await;
        assert_eq!(metrics.len(), 2);

        let counter_metric = metrics
            .iter()
            .find(|m| m.name == "test_counter")
            .expect("counter snapshot missing");
        assert_eq!(counter_metric.metric_type, MetricType::Counter);
        assert!(matches!(counter_metric.value, MetricValue::Counter(1)));

        let gauge_metric = metrics
            .iter()
            .find(|m| m.name == "test_gauge")
            .expect("gauge snapshot missing");
        assert_eq!(gauge_metric.metric_type, MetricType::Gauge);
        match &gauge_metric.value {
            MetricValue::Gauge(value) => assert_eq!(*value, 42.0),
            other => panic!("expected a gauge value, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_registry_returns_same_instance() {
        let registry = MetricsRegistry::new();

        let first = registry.get_or_create_counter("shared").await;
        let second = registry.get_or_create_counter("shared").await;
        assert!(Arc::ptr_eq(&first, &second));

        first.inc();
        assert_eq!(second.get(), 1);
    }

    #[tokio::test]
    async fn test_metrics_collector() {
        let config = MetricsConfig::default();
        let collector = MetricsCollector::new(config).await.unwrap();

        collector.start().await.unwrap();
        assert!(collector.is_healthy().await);

        collector.inc_counter("requests_total").await.unwrap();
        collector.inc_counter_by("requests_total", 5).await.unwrap();

        collector
            .set_gauge("active_connections", 10.0)
            .await
            .unwrap();

        collector
            .observe_histogram("request_duration", 0.5, None)
            .await
            .unwrap();

        collector.record_timer("process_time", 150.0).await.unwrap();

        let metrics = collector.get_all_metrics().await;
        // One snapshot per registry metric; nothing went through `record_metric`.
        assert_eq!(metrics.len(), 4);

        let requests = metrics
            .iter()
            .find(|m| m.name == "requests_total")
            .expect("requests_total snapshot missing");
        assert!(matches!(requests.value, MetricValue::Counter(6)));

        collector.stop().await.unwrap();
        assert!(!collector.is_healthy().await);
    }

    #[tokio::test]
    async fn test_record_metric_requires_running() {
        let collector = MetricsCollector::new(MetricsConfig::default())
            .await
            .unwrap();

        assert!(collector
            .record_metric(Metric::counter("rejected", 1))
            .await
            .is_err());
        assert_eq!(collector.get_metrics_count().await, 0);
    }

    #[test]
    fn test_metric_creation() {
        let counter = Metric::counter("test_counter", 10);
        assert_eq!(counter.name, "test_counter");
        assert_eq!(counter.metric_type, MetricType::Counter);
        assert!(matches!(counter.value, MetricValue::Counter(10)));

        let gauge = Metric::gauge("test_gauge", 42.5)
            .with_label("host", "server1")
            .with_help("Test gauge metric");

        assert_eq!(gauge.name, "test_gauge");
        assert_eq!(gauge.metric_type, MetricType::Gauge);
        assert!(gauge.labels.contains_key("host"));
        assert_eq!(gauge.labels["host"], "server1");
        assert_eq!(gauge.help.as_deref(), Some("Test gauge metric"));
    }
}