carbon_prometheus_metrics/
lib.rs

1use std::net::SocketAddr;
2use {
3    async_trait::async_trait,
4    carbon_core::{
5        error::{CarbonResult, Error},
6        metrics::Metrics,
7    },
8    metrics::{counter, gauge, histogram},
9    metrics_exporter_prometheus::PrometheusBuilder,
10    std::{collections::HashMap, sync::Once},
11    tokio::sync::RwLock,
12};
13
14pub struct PrometheusMetrics {
15    pub counters: RwLock<HashMap<String, metrics::Counter>>,
16    pub gauges: RwLock<HashMap<String, metrics::Gauge>>,
17    pub histograms: RwLock<HashMap<String, metrics::Histogram>>,
18    pub listen_port: u16,
19}
20
21impl Default for PrometheusMetrics {
22    fn default() -> Self {
23        Self {
24            counters: RwLock::new(HashMap::new()),
25            gauges: RwLock::new(HashMap::new()),
26            histograms: RwLock::new(HashMap::new()),
27            listen_port: 9100,
28        }
29    }
30}
31impl PrometheusMetrics {
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    pub fn new_with_port(listen_port: u16) -> Self {
37        Self {
38            gauges: RwLock::new(HashMap::new()),
39            counters: RwLock::new(HashMap::new()),
40            histograms: RwLock::new(HashMap::new()),
41            listen_port,
42        }
43    }
44}
45
46#[async_trait]
47impl Metrics for PrometheusMetrics {
48    async fn initialize(&self) -> CarbonResult<()> {
49        static INIT: Once = Once::new();
50
51        let mut result = Ok(());
52        INIT.call_once(|| {
53            let addr = format!("127.0.0.1:{}", self.listen_port)
54                .parse::<SocketAddr>()
55                .expect("Failed to parse address");
56
57            let builder = PrometheusBuilder::new().with_http_listener(addr);
58
59            match builder.install() {
60                Ok(_handle) => {
61                    log::info!("Prometheus exporter installed and listening on {}", addr);
62                }
63                Err(e) => {
64                    result = Err(Error::Custom(format!(
65                        "Failed to install Prometheus exporter: {}",
66                        e
67                    )));
68                }
69            }
70        });
71        result
72    }
73
74    async fn flush(&self) -> CarbonResult<()> {
75        Ok(())
76    }
77
78    async fn shutdown(&self) -> CarbonResult<()> {
79        Ok(())
80    }
81
82    async fn update_gauge(&self, name: &str, value: f64) -> CarbonResult<()> {
83        let mut gauge = self.gauges.write().await;
84
85        if let Some(gauge) = gauge.get(name) {
86            gauge.set(value);
87        } else {
88            let new_gauge = gauge!(name.to_string());
89            new_gauge.set(value);
90            gauge.insert(name.to_string(), new_gauge);
91        }
92
93        Ok(())
94    }
95
96    async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()> {
97        let mut counter = self.counters.write().await;
98
99        if let Some(counter) = counter.get(name) {
100            counter.increment(value);
101        } else {
102            let new_counter = counter!(name.to_string());
103            new_counter.increment(value);
104            counter.insert(name.to_string(), new_counter);
105        }
106
107        Ok(())
108    }
109
110    async fn record_histogram(&self, name: &str, value: f64) -> CarbonResult<()> {
111        let mut histogram = self.histograms.write().await;
112
113        if let Some(histogram) = histogram.get(name) {
114            histogram.record(value);
115        } else {
116            let new_histogram = histogram!(name.to_string());
117            new_histogram.record(value);
118            histogram.insert(name.to_string(), new_histogram);
119        }
120
121        Ok(())
122    }
123}