metriki_prometheus_exporter/
lib.rs1use std::sync::Arc;
2use std::thread;
3
4use derive_builder::Builder;
5use log::warn;
6use metriki_core::key::Key;
7use metriki_core::metrics::*;
8use metriki_core::MetricsRegistry;
9use prometheus::proto::{
10 Counter as PromethuesCount, Gauge as PromethuesGauge, LabelPair, Metric as PrometheusMetric,
11 MetricFamily, MetricType, Quantile, Summary,
12};
13use prometheus::{Encoder, TextEncoder};
14use tiny_http::{Response, Server};
15
16#[derive(Builder)]
17pub struct PrometheusExporter {
18 registry: Arc<MetricsRegistry>,
19 #[builder(setter(into), default = "\"0.0.0.0\".to_string()")]
20 host: String,
21 #[builder(setter)]
22 port: u16,
23 #[builder(default, setter(into))]
24 prefix: String,
25}
26
27fn new_counter(v: f64) -> PrometheusMetric {
28 let mut counter = PromethuesCount::new();
29 counter.set_value(v);
30
31 let mut metric = PrometheusMetric::new();
32 metric.set_counter(counter);
33
34 metric
35}
36
37fn new_gauge(v: f64) -> PrometheusMetric {
38 let mut gauge = PromethuesGauge::new();
39 gauge.set_value(v);
40
41 let mut metric = PrometheusMetric::new();
42 metric.set_gauge(gauge);
43
44 metric
45}
46
47fn new_quantile(f: f64, s: &HistogramSnapshot) -> Quantile {
48 let mut q = Quantile::new();
49
50 q.set_quantile(f);
51 q.set_value(s.quantile(f) as f64);
52
53 q
54}
55
56impl PrometheusExporter {
57 pub fn start(self) {
58 let addr = format!("{}:{}", self.host, self.port);
59 let server = Server::http(addr).expect("Failed to start promethues exporter server.");
60 let encoder = TextEncoder::new();
61
62 let looper = move || loop {
63 if let Ok(req) = server.recv() {
64 let metrics = self.registry.snapshots();
65 let metric_families: Vec<MetricFamily> = metrics
66 .iter()
67 .map(|(key, metric)| match metric {
68 Metric::Counter(c) => self.report_counter(key, c.as_ref()),
69 Metric::Gauge(g) => self.report_gauge(key, g.as_ref()),
70 Metric::Timer(t) => self.report_timer(key, t.as_ref()),
71 Metric::Meter(m) => self.report_meter(key, m.as_ref()),
72 Metric::Histogram(h) => self.report_histogram(key, &h.snapshot()),
73 })
74 .collect();
75
76 let mut buffer = Vec::new();
77 encoder.encode(&metric_families, &mut buffer).unwrap();
78
79 if let Err(e) = req.respond(Response::from_data(buffer)) {
80 warn!("Error on response {}", e);
81 }
82 }
83 };
84
85 thread::spawn(looper);
86 }
87
88 fn new_metric_family(&self, name: &str, mtype: MetricType) -> MetricFamily {
89 let mut family = MetricFamily::new();
90 family.set_name(format!("{}{}", self.prefix, name));
91 family.set_field_type(mtype);
92
93 family
94 }
95
96 fn report_meter(&self, key: &Key, meter: &Meter) -> MetricFamily {
97 let mut family = self.new_metric_family(key.key(), MetricType::COUNTER);
98
99 let counter = setup_tags(key, new_counter(meter.count() as f64));
100
101 family.set_metric(vec![counter].into());
102 family
103 }
104
105 fn report_gauge(&self, key: &Key, gauge: &Gauge) -> MetricFamily {
106 let mut family = self.new_metric_family(key.key(), MetricType::GAUGE);
107
108 let metric = setup_tags(key, new_gauge(gauge.value()));
109 family.set_metric(vec![metric].into());
110 family
111 }
112
113 fn report_histogram(&self, key: &Key, snapshot: &HistogramSnapshot) -> MetricFamily {
114 let mut family = self.new_metric_family(key.key(), MetricType::SUMMARY);
115
116 let mut metric = setup_tags(key, PrometheusMetric::new());
117 let quantiles = vec![
118 new_quantile(0.5, snapshot),
119 new_quantile(0.75, snapshot),
120 new_quantile(0.9, snapshot),
121 new_quantile(0.99, snapshot),
122 new_quantile(0.999, snapshot),
123 ];
124 let mut summary = Summary::new();
125 summary.set_quantile(quantiles.into());
126 metric.set_summary(summary);
127 family.set_metric(vec![metric].into());
128 family
129 }
130
131 fn report_counter(&self, key: &Key, c: &Counter) -> MetricFamily {
132 let mut family = self.new_metric_family(key.key(), MetricType::COUNTER);
133
134 let counter = setup_tags(key, new_counter(c.value() as f64));
135
136 family.set_metric(vec![counter].into());
137 family
138 }
139
140 fn report_timer(&self, key: &Key, t: &Timer) -> MetricFamily {
141 let rate = t.rate();
142 let latency = t.latency();
143
144 let mut family = self.new_metric_family(key.key(), MetricType::SUMMARY);
145 let mut metric = setup_tags(key, PrometheusMetric::new());
146 let quantiles = vec![
147 new_quantile(0.5, &latency),
148 new_quantile(0.75, &latency),
149 new_quantile(0.9, &latency),
150 new_quantile(0.99, &latency),
151 new_quantile(0.999, &latency),
152 ];
153 let mut summary = Summary::new();
154 summary.set_quantile(quantiles.into());
155 summary.set_sample_count(rate.count());
156 metric.set_summary(summary);
157 family.set_metric(vec![metric].into());
158 family
159 }
160}
161
162fn setup_tags(key: &Key, mut metric: PrometheusMetric) -> PrometheusMetric {
163 let labels = metric.mut_label();
164
165 for tag in key.tags() {
166 let mut lp = LabelPair::new();
167 lp.set_name(tag.key().to_string());
168 lp.set_value(tag.value().to_string());
169
170 labels.push(lp);
171 }
172 metric
173}