metriki_prometheus_exporter/
lib.rs

1use std::sync::Arc;
2use std::thread;
3
4use derive_builder::Builder;
5use log::warn;
6use metriki_core::key::Key;
7use metriki_core::metrics::*;
8use metriki_core::MetricsRegistry;
9use prometheus::proto::{
10    Counter as PromethuesCount, Gauge as PromethuesGauge, LabelPair, Metric as PrometheusMetric,
11    MetricFamily, MetricType, Quantile, Summary,
12};
13use prometheus::{Encoder, TextEncoder};
14use tiny_http::{Response, Server};
15
16#[derive(Builder)]
17pub struct PrometheusExporter {
18    registry: Arc<MetricsRegistry>,
19    #[builder(setter(into), default = "\"0.0.0.0\".to_string()")]
20    host: String,
21    #[builder(setter)]
22    port: u16,
23    #[builder(default, setter(into))]
24    prefix: String,
25}
26
27fn new_counter(v: f64) -> PrometheusMetric {
28    let mut counter = PromethuesCount::new();
29    counter.set_value(v);
30
31    let mut metric = PrometheusMetric::new();
32    metric.set_counter(counter);
33
34    metric
35}
36
37fn new_gauge(v: f64) -> PrometheusMetric {
38    let mut gauge = PromethuesGauge::new();
39    gauge.set_value(v);
40
41    let mut metric = PrometheusMetric::new();
42    metric.set_gauge(gauge);
43
44    metric
45}
46
47fn new_quantile(f: f64, s: &HistogramSnapshot) -> Quantile {
48    let mut q = Quantile::new();
49
50    q.set_quantile(f);
51    q.set_value(s.quantile(f) as f64);
52
53    q
54}
55
56impl PrometheusExporter {
57    pub fn start(self) {
58        let addr = format!("{}:{}", self.host, self.port);
59        let server = Server::http(addr).expect("Failed to start promethues exporter server.");
60        let encoder = TextEncoder::new();
61
62        let looper = move || loop {
63            if let Ok(req) = server.recv() {
64                let metrics = self.registry.snapshots();
65                let metric_families: Vec<MetricFamily> = metrics
66                    .iter()
67                    .map(|(key, metric)| match metric {
68                        Metric::Counter(c) => self.report_counter(key, c.as_ref()),
69                        Metric::Gauge(g) => self.report_gauge(key, g.as_ref()),
70                        Metric::Timer(t) => self.report_timer(key, t.as_ref()),
71                        Metric::Meter(m) => self.report_meter(key, m.as_ref()),
72                        Metric::Histogram(h) => self.report_histogram(key, &h.snapshot()),
73                    })
74                    .collect();
75
76                let mut buffer = Vec::new();
77                encoder.encode(&metric_families, &mut buffer).unwrap();
78
79                if let Err(e) = req.respond(Response::from_data(buffer)) {
80                    warn!("Error on response {}", e);
81                }
82            }
83        };
84
85        thread::spawn(looper);
86    }
87
88    fn new_metric_family(&self, name: &str, mtype: MetricType) -> MetricFamily {
89        let mut family = MetricFamily::new();
90        family.set_name(format!("{}{}", self.prefix, name));
91        family.set_field_type(mtype);
92
93        family
94    }
95
96    fn report_meter(&self, key: &Key, meter: &Meter) -> MetricFamily {
97        let mut family = self.new_metric_family(key.key(), MetricType::COUNTER);
98
99        let counter = setup_tags(key, new_counter(meter.count() as f64));
100
101        family.set_metric(vec![counter].into());
102        family
103    }
104
105    fn report_gauge(&self, key: &Key, gauge: &Gauge) -> MetricFamily {
106        let mut family = self.new_metric_family(key.key(), MetricType::GAUGE);
107
108        let metric = setup_tags(key, new_gauge(gauge.value()));
109        family.set_metric(vec![metric].into());
110        family
111    }
112
113    fn report_histogram(&self, key: &Key, snapshot: &HistogramSnapshot) -> MetricFamily {
114        let mut family = self.new_metric_family(key.key(), MetricType::SUMMARY);
115
116        let mut metric = setup_tags(key, PrometheusMetric::new());
117        let quantiles = vec![
118            new_quantile(0.5, snapshot),
119            new_quantile(0.75, snapshot),
120            new_quantile(0.9, snapshot),
121            new_quantile(0.99, snapshot),
122            new_quantile(0.999, snapshot),
123        ];
124        let mut summary = Summary::new();
125        summary.set_quantile(quantiles.into());
126        metric.set_summary(summary);
127        family.set_metric(vec![metric].into());
128        family
129    }
130
131    fn report_counter(&self, key: &Key, c: &Counter) -> MetricFamily {
132        let mut family = self.new_metric_family(key.key(), MetricType::COUNTER);
133
134        let counter = setup_tags(key, new_counter(c.value() as f64));
135
136        family.set_metric(vec![counter].into());
137        family
138    }
139
140    fn report_timer(&self, key: &Key, t: &Timer) -> MetricFamily {
141        let rate = t.rate();
142        let latency = t.latency();
143
144        let mut family = self.new_metric_family(key.key(), MetricType::SUMMARY);
145        let mut metric = setup_tags(key, PrometheusMetric::new());
146        let quantiles = vec![
147            new_quantile(0.5, &latency),
148            new_quantile(0.75, &latency),
149            new_quantile(0.9, &latency),
150            new_quantile(0.99, &latency),
151            new_quantile(0.999, &latency),
152        ];
153        let mut summary = Summary::new();
154        summary.set_quantile(quantiles.into());
155        summary.set_sample_count(rate.count());
156        metric.set_summary(summary);
157        family.set_metric(vec![metric].into());
158        family
159    }
160}
161
162fn setup_tags(key: &Key, mut metric: PrometheusMetric) -> PrometheusMetric {
163    let labels = metric.mut_label();
164
165    for tag in key.tags() {
166        let mut lp = LabelPair::new();
167        lp.set_name(tag.key().to_string());
168        lp.set_value(tag.value().to_string());
169
170        labels.push(lp);
171    }
172    metric
173}