carbon_prometheus_metrics/
lib.rs1use std::net::SocketAddr;
2use {
3 async_trait::async_trait,
4 carbon_core::{
5 error::{CarbonResult, Error},
6 metrics::Metrics,
7 },
8 metrics::{counter, gauge, histogram},
9 metrics_exporter_prometheus::PrometheusBuilder,
10 std::{collections::HashMap, sync::Once},
11 tokio::sync::RwLock,
12};
13
14pub struct PrometheusMetrics {
15 pub counters: RwLock<HashMap<String, metrics::Counter>>,
16 pub gauges: RwLock<HashMap<String, metrics::Gauge>>,
17 pub histograms: RwLock<HashMap<String, metrics::Histogram>>,
18 pub listen_port: u16,
19}
20
21impl Default for PrometheusMetrics {
22 fn default() -> Self {
23 Self {
24 counters: RwLock::new(HashMap::new()),
25 gauges: RwLock::new(HashMap::new()),
26 histograms: RwLock::new(HashMap::new()),
27 listen_port: 9100,
28 }
29 }
30}
31impl PrometheusMetrics {
32 pub fn new() -> Self {
33 Self::default()
34 }
35
36 pub fn new_with_port(listen_port: u16) -> Self {
37 Self {
38 gauges: RwLock::new(HashMap::new()),
39 counters: RwLock::new(HashMap::new()),
40 histograms: RwLock::new(HashMap::new()),
41 listen_port,
42 }
43 }
44}
45
46#[async_trait]
47impl Metrics for PrometheusMetrics {
48 async fn initialize(&self) -> CarbonResult<()> {
49 static INIT: Once = Once::new();
50
51 let mut result = Ok(());
52 INIT.call_once(|| {
53 let addr = format!("127.0.0.1:{}", self.listen_port)
54 .parse::<SocketAddr>()
55 .expect("Failed to parse address");
56
57 let builder = PrometheusBuilder::new().with_http_listener(addr);
58
59 match builder.install() {
60 Ok(_handle) => {
61 log::info!("Prometheus exporter installed and listening on {}", addr);
62 }
63 Err(e) => {
64 result = Err(Error::Custom(format!(
65 "Failed to install Prometheus exporter: {}",
66 e
67 )));
68 }
69 }
70 });
71 result
72 }
73
74 async fn flush(&self) -> CarbonResult<()> {
75 Ok(())
76 }
77
78 async fn shutdown(&self) -> CarbonResult<()> {
79 Ok(())
80 }
81
82 async fn update_gauge(&self, name: &str, value: f64) -> CarbonResult<()> {
83 let mut gauge = self.gauges.write().await;
84
85 if let Some(gauge) = gauge.get(name) {
86 gauge.set(value);
87 } else {
88 let new_gauge = gauge!(name.to_string());
89 new_gauge.set(value);
90 gauge.insert(name.to_string(), new_gauge);
91 }
92
93 Ok(())
94 }
95
96 async fn increment_counter(&self, name: &str, value: u64) -> CarbonResult<()> {
97 let mut counter = self.counters.write().await;
98
99 if let Some(counter) = counter.get(name) {
100 counter.increment(value);
101 } else {
102 let new_counter = counter!(name.to_string());
103 new_counter.increment(value);
104 counter.insert(name.to_string(), new_counter);
105 }
106
107 Ok(())
108 }
109
110 async fn record_histogram(&self, name: &str, value: f64) -> CarbonResult<()> {
111 let mut histogram = self.histograms.write().await;
112
113 if let Some(histogram) = histogram.get(name) {
114 histogram.record(value);
115 } else {
116 let new_histogram = histogram!(name.to_string());
117 new_histogram.record(value);
118 histogram.insert(name.to_string(), new_histogram);
119 }
120
121 Ok(())
122 }
123}