metrics_observer_prometheus/
lib.rs1#![deny(missing_docs)]
3use hdrhistogram::Histogram;
4use metrics_core::{Builder, Drain, Key, Label, Observer};
5use metrics_util::{parse_quantiles, Quantile};
6use std::iter::FromIterator;
7use std::{collections::HashMap, time::SystemTime};
8
9pub struct PrometheusBuilder {
11 quantiles: Vec<Quantile>,
12 buckets: Vec<u64>,
13 buckets_by_name: Option<HashMap<String, Vec<u64>>>,
14}
15
16impl PrometheusBuilder {
17 pub fn new() -> Self {
19 let quantiles = parse_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0]);
20
21 Self {
22 quantiles,
23 buckets: vec![],
24 buckets_by_name: None,
25 }
26 }
27
28 pub fn set_quantiles(mut self, quantiles: &[f64]) -> Self {
35 self.quantiles = parse_quantiles(quantiles);
36 self
37 }
38
39 pub fn set_buckets(mut self, values: &[u64]) -> Self {
45 self.buckets = values.to_vec();
46 self
47 }
48
49 pub fn set_buckets_for_metric(mut self, name: &str, values: &[u64]) -> Self {
56 let buckets = self.buckets_by_name.get_or_insert_with(|| HashMap::new());
57 buckets.insert(name.to_owned(), values.to_vec());
58 self
59 }
60}
61
62impl Builder for PrometheusBuilder {
63 type Output = PrometheusObserver;
64
65 fn build(&self) -> Self::Output {
66 PrometheusObserver {
67 quantiles: self.quantiles.clone(),
68 buckets: self.buckets.clone(),
69 histos: HashMap::new(),
70 output: get_prom_expo_header(),
71 counters: HashMap::new(),
72 gauges: HashMap::new(),
73 buckets_by_name: self.buckets_by_name.clone(),
74 }
75 }
76}
77
78impl Default for PrometheusBuilder {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84pub struct PrometheusObserver {
86 pub(crate) quantiles: Vec<Quantile>,
87 pub(crate) buckets: Vec<u64>,
88 pub(crate) histos: HashMap<String, HashMap<Vec<String>, (u64, Histogram<u64>)>>,
89 pub(crate) output: String,
90 pub(crate) counters: HashMap<String, HashMap<Vec<String>, u64>>,
91 pub(crate) gauges: HashMap<String, HashMap<Vec<String>, i64>>,
92 pub(crate) buckets_by_name: Option<HashMap<String, Vec<u64>>>,
93}
94
95impl Observer for PrometheusObserver {
96 fn observe_counter(&mut self, key: Key, value: u64) {
97 let (name, labels) = key_to_parts(key);
98
99 let entry = self
100 .counters
101 .entry(name)
102 .or_insert_with(|| HashMap::new())
103 .entry(labels)
104 .or_insert_with(|| 0);
105
106 *entry += value;
107 }
108
109 fn observe_gauge(&mut self, key: Key, value: i64) {
110 let (name, labels) = key_to_parts(key);
111
112 let entry = self
113 .gauges
114 .entry(name)
115 .or_insert_with(|| HashMap::new())
116 .entry(labels)
117 .or_insert_with(|| 0);
118
119 *entry = value;
120 }
121
122 fn observe_histogram(&mut self, key: Key, values: &[u64]) {
123 let (name, labels) = key_to_parts(key);
124
125 let entry = self
126 .histos
127 .entry(name)
128 .or_insert_with(|| HashMap::new())
129 .entry(labels)
130 .or_insert_with(|| {
131 let h = Histogram::<u64>::new(3).expect("failed to create histogram");
132 (0, h)
133 });
134
135 let (sum, h) = entry;
136 for value in values {
137 h.record(*value).expect("failed to observe histogram value");
138 *sum += *value;
139 }
140 }
141}
142
143impl Drain<String> for PrometheusObserver {
144 fn drain(&mut self) -> String {
145 let mut output: String = self.output.drain(..).collect();
146
147 for (name, mut by_labels) in self.counters.drain() {
148 output.push_str("\n# TYPE ");
149 output.push_str(name.as_str());
150 output.push_str(" counter\n");
151 for (labels, value) in by_labels.drain() {
152 let full_name = render_labeled_name(&name, &labels);
153 output.push_str(full_name.as_str());
154 output.push_str(" ");
155 output.push_str(value.to_string().as_str());
156 output.push_str("\n");
157 }
158 }
159
160 for (name, mut by_labels) in self.gauges.drain() {
161 output.push_str("\n# TYPE ");
162 output.push_str(name.as_str());
163 output.push_str(" gauge\n");
164 for (labels, value) in by_labels.drain() {
165 let full_name = render_labeled_name(&name, &labels);
166 output.push_str(full_name.as_str());
167 output.push_str(" ");
168 output.push_str(value.to_string().as_str());
169 output.push_str("\n");
170 }
171 }
172 let mut sorted_overrides = self
173 .buckets_by_name
174 .as_ref()
175 .map(|h| Vec::from_iter(h.iter()))
176 .unwrap_or_else(|| vec![]);
177 sorted_overrides.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len()));
178
179 for (name, mut by_labels) in self.histos.drain() {
180 let buckets = sorted_overrides
181 .iter()
182 .find_map(|(k, buckets)| {
183 if name.ends_with(*k) {
184 Some(*buckets)
185 } else {
186 None
187 }
188 })
189 .unwrap_or(&self.buckets);
190 let use_quantiles = buckets.is_empty();
191
192 output.push_str("\n# TYPE ");
193 output.push_str(name.as_str());
194 output.push_str(" ");
195 output.push_str(if use_quantiles {
196 "summary"
197 } else {
198 "histogram"
199 });
200 output.push_str("\n");
201
202 for (labels, sh) in by_labels.drain() {
203 let (sum, hist) = sh;
204
205 if use_quantiles {
206 for quantile in &self.quantiles {
207 let value = hist.value_at_quantile(quantile.value());
208 let mut labels = labels.clone();
209 labels.push(format!("quantile=\"{}\"", quantile.value()));
210 let full_name = render_labeled_name(&name, &labels);
211 output.push_str(full_name.as_str());
212 output.push_str(" ");
213 output.push_str(value.to_string().as_str());
214 output.push_str("\n");
215 }
216 } else {
217 for bucket in buckets {
218 let value = hist.count_between(0, *bucket);
219 let mut labels = labels.clone();
220 labels.push(format!("le=\"{}\"", bucket));
221 let bucket_name = format!("{}_bucket", name);
222 let full_name = render_labeled_name(&bucket_name, &labels);
223 output.push_str(full_name.as_str());
224 output.push_str(" ");
225 output.push_str(value.to_string().as_str());
226 output.push_str("\n");
227 }
228 let mut labels = labels.clone();
229 labels.push("le=\"+Inf\"".to_owned());
230 let bucket_name = format!("{}_bucket", name);
231 let full_name = render_labeled_name(&bucket_name, &labels);
232 output.push_str(full_name.as_str());
233 output.push_str(" ");
234 output.push_str(hist.len().to_string().as_str());
235 output.push_str("\n");
236 }
237 let sum_name = format!("{}_sum", name);
238 let full_sum_name = render_labeled_name(&sum_name, &labels);
239 output.push_str(full_sum_name.as_str());
240 output.push_str(" ");
241 output.push_str(sum.to_string().as_str());
242 output.push_str("\n");
243 let count_name = format!("{}_count", name);
244 let full_count_name = render_labeled_name(&count_name, &labels);
245 output.push_str(full_count_name.as_str());
246 output.push_str(" ");
247 output.push_str(hist.len().to_string().as_str());
248 output.push_str("\n");
249 }
250 }
251
252 output
253 }
254}
255
256fn key_to_parts(key: Key) -> (String, Vec<String>) {
257 let (name, labels) = key.into_parts();
258 let sanitize = |c| c == '.' || c == '=' || c == '{' || c == '}' || c == '+' || c == '-';
259 let name = name.replace(sanitize, "_");
260 let labels = labels
261 .into_iter()
262 .map(Label::into_parts)
263 .map(|(k, v)| {
264 format!(
265 "{}=\"{}\"",
266 k,
267 v.replace("\\", "\\\\")
268 .replace("\"", "\\\"")
269 .replace("\n", "\\n")
270 )
271 })
272 .collect();
273
274 (name, labels)
275}
276
277fn render_labeled_name(name: &str, labels: &[String]) -> String {
278 let mut output = name.to_string();
279 if !labels.is_empty() {
280 let joined = labels.join(",");
281 output.push_str("{");
282 output.push_str(&joined);
283 output.push_str("}");
284 }
285 output
286}
287
288fn get_prom_expo_header() -> String {
289 let ts = SystemTime::now()
290 .duration_since(SystemTime::UNIX_EPOCH)
291 .map(|d| d.as_secs())
292 .unwrap_or(0);
293
294 format!(
295 "# metrics snapshot (ts={}) (prometheus exposition format)",
296 ts
297 )
298}