metrics_exporter_prometheus/
recorder.rs

1use std::collections::HashMap;
2use std::io;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::sync::{PoisonError, RwLock};
6
7use indexmap::IndexMap;
8use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
9use metrics_util::registry::{Recency, Registry};
10use quanta::Instant;
11
12use crate::common::{LabelSet, Snapshot};
13use crate::distribution::{Distribution, DistributionBuilder};
14use crate::formatting::{
15    sanitize_metric_name, write_help_line, write_metric_line, write_type_line,
16};
17use crate::registry::GenerationalAtomicStorage;
18
19#[derive(Debug)]
20pub(crate) struct Inner {
21    pub registry: Registry<Key, GenerationalAtomicStorage>,
22    pub recency: Recency<Key>,
23    pub distributions: RwLock<HashMap<String, IndexMap<LabelSet, Distribution>>>,
24    pub distribution_builder: DistributionBuilder,
25    pub descriptions: RwLock<HashMap<String, (SharedString, Option<Unit>)>>,
26    pub global_labels: IndexMap<String, String>,
27    pub enable_unit_suffix: bool,
28    pub counter_suffix: Option<&'static str>,
29}
30
31impl Inner {
32    fn get_recent_metrics(&self) -> Snapshot {
33        let mut counters = HashMap::new();
34        let counter_handles = self.registry.get_counter_handles();
35        for (key, counter) in counter_handles {
36            let gen = counter.get_generation();
37            if !self.recency.should_store_counter(&key, gen, &self.registry) {
38                continue;
39            }
40
41            let name = sanitize_metric_name(key.name());
42            let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
43            let value = counter.get_inner().load(Ordering::Acquire);
44            let entry =
45                counters.entry(name).or_insert_with(HashMap::new).entry(labels).or_insert(0);
46            *entry = value;
47        }
48
49        let mut gauges = HashMap::new();
50        let gauge_handles = self.registry.get_gauge_handles();
51        for (key, gauge) in gauge_handles {
52            let gen = gauge.get_generation();
53            if !self.recency.should_store_gauge(&key, gen, &self.registry) {
54                continue;
55            }
56
57            let name = sanitize_metric_name(key.name());
58            let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
59            let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
60            let entry =
61                gauges.entry(name).or_insert_with(HashMap::new).entry(labels).or_insert(0.0);
62            *entry = value;
63        }
64
65        // Update distributions
66        self.drain_histograms_to_distributions();
67        // Remove expired histograms
68        let histogram_handles = self.registry.get_histogram_handles();
69        for (key, histogram) in histogram_handles {
70            let gen = histogram.get_generation();
71            if !self.recency.should_store_histogram(&key, gen, &self.registry) {
72                // Since we store aggregated distributions directly, when we're told that a metric
73                // is not recent enough and should be/was deleted from the registry, we also need to
74                // delete it on our side as well.
75                let name = sanitize_metric_name(key.name());
76                let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
77                let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
78                let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
79                    by_name.swap_remove(&labels);
80                    by_name.is_empty()
81                } else {
82                    false
83                };
84
85                // If there's no more variants in the per-metric-name distribution map, then delete
86                // it entirely, otherwise we end up with weird empty output during render.
87                if delete_by_name {
88                    wg.remove(&name);
89                }
90            }
91        }
92
93        let distributions =
94            self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();
95
96        Snapshot { counters, gauges, distributions }
97    }
98
99    /// Drains histogram samples into distribution.
100    fn drain_histograms_to_distributions(&self) {
101        let histogram_handles = self.registry.get_histogram_handles();
102        for (key, histogram) in histogram_handles {
103            let name = sanitize_metric_name(key.name());
104            let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
105
106            let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
107            let entry = wg
108                .entry(name.clone())
109                .or_default()
110                .entry(labels)
111                .or_insert_with(|| self.distribution_builder.get_distribution(name.as_str()));
112
113            histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
114        }
115    }
116
117    fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
118        let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();
119
120        let mut intermediate = String::new();
121        let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner);
122
123        for (name, mut by_labels) in counters.drain() {
124            let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
125                let unit = unit.filter(|_| self.enable_unit_suffix);
126                write_help_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, desc);
127                unit
128            });
129
130            write_type_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, "counter");
131
132            // A chunk is emitted here, just in case there are a large number of sets below.
133            output.write_all(intermediate.as_bytes())?;
134            intermediate.clear();
135
136            for (labels, value) in by_labels.drain() {
137                write_metric_line::<&str, u64>(
138                    &mut intermediate,
139                    &name,
140                    self.counter_suffix,
141                    &labels,
142                    None,
143                    value,
144                    unit,
145                );
146                // Each set gets its own write invocation.
147                output.write_all(intermediate.as_bytes())?;
148                intermediate.clear();
149            }
150            output.write_all(b"\n")?;
151        }
152
153        for (name, mut by_labels) in gauges.drain() {
154            let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
155                let unit = unit.filter(|_| self.enable_unit_suffix);
156                write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
157                unit
158            });
159
160            write_type_line(&mut intermediate, name.as_str(), unit, None, "gauge");
161
162            // A chunk is emitted here, just in case there are a large number of sets below.
163            output.write_all(intermediate.as_bytes())?;
164            intermediate.clear();
165
166            for (labels, value) in by_labels.drain() {
167                write_metric_line::<&str, f64>(
168                    &mut intermediate,
169                    &name,
170                    None,
171                    &labels,
172                    None,
173                    value,
174                    unit,
175                );
176                // Each set gets its own write invocation.
177                output.write_all(intermediate.as_bytes())?;
178                intermediate.clear();
179            }
180            output.write_all(b"\n")?;
181        }
182
183        for (name, mut by_labels) in distributions.drain() {
184            let distribution_type = self.distribution_builder.get_distribution_type(name.as_str());
185
186            // Skip native histograms in text format - they're only supported in protobuf format
187            if distribution_type == "native_histogram" {
188                continue;
189            }
190
191            let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
192                let unit = unit.filter(|_| self.enable_unit_suffix);
193                write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
194                unit
195            });
196
197            write_type_line(&mut intermediate, name.as_str(), unit, None, distribution_type);
198
199            // A chunk is emitted here, just in case there are a large number of sets below.
200            output.write_all(intermediate.as_bytes())?;
201            intermediate.clear();
202
203            for (labels, distribution) in by_labels.drain(..) {
204                let (sum, count) = match distribution {
205                    Distribution::Summary(summary, quantiles, sum) => {
206                        let snapshot = summary.snapshot(Instant::now());
207                        for quantile in quantiles.iter() {
208                            let value = snapshot.quantile(quantile.value()).unwrap_or(0.0);
209                            write_metric_line(
210                                &mut intermediate,
211                                &name,
212                                None,
213                                &labels,
214                                Some(("quantile", quantile.value())),
215                                value,
216                                unit,
217                            );
218                        }
219
220                        (sum, summary.count() as u64)
221                    }
222                    Distribution::Histogram(histogram) => {
223                        for (le, count) in histogram.buckets() {
224                            write_metric_line(
225                                &mut intermediate,
226                                &name,
227                                Some("bucket"),
228                                &labels,
229                                Some(("le", le)),
230                                count,
231                                unit,
232                            );
233                        }
234                        write_metric_line(
235                            &mut intermediate,
236                            &name,
237                            Some("bucket"),
238                            &labels,
239                            Some(("le", "+Inf")),
240                            histogram.count(),
241                            unit,
242                        );
243
244                        (histogram.sum(), histogram.count())
245                    }
246                    Distribution::NativeHistogram(_) => {
247                        // Native histograms are not supported in text format
248                        // This branch should not be reached due to the continue above
249                        continue;
250                    }
251                };
252
253                write_metric_line::<&str, f64>(
254                    &mut intermediate,
255                    &name,
256                    Some("sum"),
257                    &labels,
258                    None,
259                    sum,
260                    unit,
261                );
262                write_metric_line::<&str, u64>(
263                    &mut intermediate,
264                    &name,
265                    Some("count"),
266                    &labels,
267                    None,
268                    count,
269                    unit,
270                );
271
272                // Each set gets its own write invocation.
273                output.write_all(intermediate.as_bytes())?;
274                intermediate.clear();
275            }
276
277            output.write_all(b"\n")?;
278        }
279
280        Ok(())
281    }
282
283    fn run_upkeep(&self) {
284        self.drain_histograms_to_distributions();
285    }
286}
287
288/// A Prometheus recorder.
289///
290/// Most users will not need to interact directly with the recorder, and can simply deal with the
291/// builder methods on [`PrometheusBuilder`](crate::PrometheusBuilder) for building and installing
292/// the recorder/exporter.
293#[derive(Debug)]
294pub struct PrometheusRecorder {
295    inner: Arc<Inner>,
296}
297
298impl PrometheusRecorder {
299    /// Gets a [`PrometheusHandle`] to this recorder.
300    pub fn handle(&self) -> PrometheusHandle {
301        PrometheusHandle { inner: self.inner.clone() }
302    }
303
304    fn add_description_if_missing(
305        &self,
306        key_name: &KeyName,
307        description: SharedString,
308        unit: Option<Unit>,
309    ) {
310        let sanitized = sanitize_metric_name(key_name.as_str());
311        let mut descriptions =
312            self.inner.descriptions.write().unwrap_or_else(PoisonError::into_inner);
313        descriptions.entry(sanitized).or_insert((description, unit));
314    }
315}
316
317impl From<Inner> for PrometheusRecorder {
318    fn from(inner: Inner) -> Self {
319        PrometheusRecorder { inner: Arc::new(inner) }
320    }
321}
322
323impl Recorder for PrometheusRecorder {
324    fn describe_counter(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
325        self.add_description_if_missing(&key_name, description, unit);
326    }
327
328    fn describe_gauge(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
329        self.add_description_if_missing(&key_name, description, unit);
330    }
331
332    fn describe_histogram(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
333        self.add_description_if_missing(&key_name, description, unit);
334    }
335
336    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
337        self.inner.registry.get_or_create_counter(key, |c| c.clone().into())
338    }
339
340    fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
341        self.inner.registry.get_or_create_gauge(key, |c| c.clone().into())
342    }
343
344    fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
345        self.inner.registry.get_or_create_histogram(key, |c| c.clone().into())
346    }
347}
348
349/// Handle for accessing metrics stored via [`PrometheusRecorder`].
350///
351/// In certain scenarios, it may be necessary to directly handle requests that would otherwise be
352/// handled directly by the HTTP listener, or push gateway background task.  [`PrometheusHandle`]
353/// allows rendering a snapshot of the current metrics stored by an installed [`PrometheusRecorder`]
354/// as a payload conforming to the Prometheus exposition format.
355#[derive(Clone, Debug)]
356pub struct PrometheusHandle {
357    inner: Arc<Inner>,
358}
359
360impl PrometheusHandle {
361    /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
362    /// the Prometheus exposition format.
363    #[allow(clippy::missing_panics_doc)]
364    pub fn render(&self) -> String {
365        let mut buf = Vec::new();
366        // UNWRAP: writing to a Vec<u8> does not fail.
367        self.inner.render_to_write(&mut buf).unwrap();
368        // UNWRAP: Prometheus exposition format is always UTF-8.
369        String::from_utf8(buf).unwrap()
370    }
371
372    /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
373    /// the Prometheus exposition format, incrementally. Use this function to emit metrics as a
374    /// stream without buffering the entire metrics export.
375    ///
376    /// # Errors
377    ///
378    /// Writing to the provided output fails.
379    pub fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
380        self.inner.render_to_write(output)
381    }
382
383    /// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
384    /// the Prometheus protobuf format.
385    #[cfg(feature = "protobuf")]
386    pub fn render_protobuf(&self) -> Vec<u8> {
387        let snapshot = self.inner.get_recent_metrics();
388        let descriptions = self.inner.descriptions.read().unwrap_or_else(PoisonError::into_inner);
389
390        crate::protobuf::render_protobuf(snapshot, &descriptions, self.inner.counter_suffix)
391    }
392
393    /// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not
394    /// grow unboundedly.
395    pub fn run_upkeep(&self) {
396        self.inner.run_upkeep();
397    }
398}