1use std::collections::HashMap;
2use std::io;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::sync::{PoisonError, RwLock};
6
7use indexmap::IndexMap;
8use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
9use metrics_util::registry::{Recency, Registry};
10use quanta::Instant;
11
12use crate::common::{LabelSet, Snapshot};
13use crate::distribution::{Distribution, DistributionBuilder};
14use crate::formatting::{
15 sanitize_metric_name, write_help_line, write_metric_line, write_type_line,
16};
17use crate::registry::GenerationalAtomicStorage;
18
/// Shared state behind [`PrometheusRecorder`] and [`PrometheusHandle`].
#[derive(Debug)]
pub(crate) struct Inner {
    /// Registry holding the counter, gauge and histogram handles, keyed by [`Key`].
    pub registry: Registry<Key, GenerationalAtomicStorage>,
    /// Consulted (via `should_store_*`) to decide whether a metric is still included in a
    /// snapshot.
    pub recency: Recency<Key>,
    /// Aggregated distributions, keyed by sanitized metric name and then by label set.
    /// Histogram samples are drained from the registry into these entries.
    pub distributions: RwLock<HashMap<String, IndexMap<LabelSet, Distribution>>>,
    /// Creates the [`Distribution`] for a metric name and reports its type string.
    pub distribution_builder: DistributionBuilder,
    /// Description and optional unit registered per sanitized metric name.
    pub descriptions: RwLock<HashMap<String, (SharedString, Option<Unit>)>>,
    /// Labels passed to `LabelSet::from_key_and_global` together with every metric key.
    pub global_labels: IndexMap<String, String>,
    /// When `false`, a described unit is dropped before it reaches the formatting helpers.
    pub enable_unit_suffix: bool,
    /// Suffix handed to the formatting helpers when rendering counters.
    pub counter_suffix: Option<&'static str>,
}
30
31impl Inner {
32 fn get_recent_metrics(&self) -> Snapshot {
33 let mut counters = HashMap::new();
34 let counter_handles = self.registry.get_counter_handles();
35 for (key, counter) in counter_handles {
36 let gen = counter.get_generation();
37 if !self.recency.should_store_counter(&key, gen, &self.registry) {
38 continue;
39 }
40
41 let name = sanitize_metric_name(key.name());
42 let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
43 let value = counter.get_inner().load(Ordering::Acquire);
44 let entry =
45 counters.entry(name).or_insert_with(HashMap::new).entry(labels).or_insert(0);
46 *entry = value;
47 }
48
49 let mut gauges = HashMap::new();
50 let gauge_handles = self.registry.get_gauge_handles();
51 for (key, gauge) in gauge_handles {
52 let gen = gauge.get_generation();
53 if !self.recency.should_store_gauge(&key, gen, &self.registry) {
54 continue;
55 }
56
57 let name = sanitize_metric_name(key.name());
58 let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
59 let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
60 let entry =
61 gauges.entry(name).or_insert_with(HashMap::new).entry(labels).or_insert(0.0);
62 *entry = value;
63 }
64
65 self.drain_histograms_to_distributions();
67 let histogram_handles = self.registry.get_histogram_handles();
69 for (key, histogram) in histogram_handles {
70 let gen = histogram.get_generation();
71 if !self.recency.should_store_histogram(&key, gen, &self.registry) {
72 let name = sanitize_metric_name(key.name());
76 let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
77 let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
78 let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
79 by_name.swap_remove(&labels);
80 by_name.is_empty()
81 } else {
82 false
83 };
84
85 if delete_by_name {
88 wg.remove(&name);
89 }
90 }
91 }
92
93 let distributions =
94 self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();
95
96 Snapshot { counters, gauges, distributions }
97 }
98
99 fn drain_histograms_to_distributions(&self) {
101 let histogram_handles = self.registry.get_histogram_handles();
102 for (key, histogram) in histogram_handles {
103 let name = sanitize_metric_name(key.name());
104 let labels = LabelSet::from_key_and_global(&key, &self.global_labels);
105
106 let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
107 let entry = wg
108 .entry(name.clone())
109 .or_default()
110 .entry(labels)
111 .or_insert_with(|| self.distribution_builder.get_distribution(name.as_str()));
112
113 histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
114 }
115 }
116
117 fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
118 let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();
119
120 let mut intermediate = String::new();
121 let descriptions = self.descriptions.read().unwrap_or_else(PoisonError::into_inner);
122
123 for (name, mut by_labels) in counters.drain() {
124 let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
125 let unit = unit.filter(|_| self.enable_unit_suffix);
126 write_help_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, desc);
127 unit
128 });
129
130 write_type_line(&mut intermediate, name.as_str(), unit, self.counter_suffix, "counter");
131
132 output.write_all(intermediate.as_bytes())?;
134 intermediate.clear();
135
136 for (labels, value) in by_labels.drain() {
137 write_metric_line::<&str, u64>(
138 &mut intermediate,
139 &name,
140 self.counter_suffix,
141 &labels,
142 None,
143 value,
144 unit,
145 );
146 output.write_all(intermediate.as_bytes())?;
148 intermediate.clear();
149 }
150 output.write_all(b"\n")?;
151 }
152
153 for (name, mut by_labels) in gauges.drain() {
154 let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
155 let unit = unit.filter(|_| self.enable_unit_suffix);
156 write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
157 unit
158 });
159
160 write_type_line(&mut intermediate, name.as_str(), unit, None, "gauge");
161
162 output.write_all(intermediate.as_bytes())?;
164 intermediate.clear();
165
166 for (labels, value) in by_labels.drain() {
167 write_metric_line::<&str, f64>(
168 &mut intermediate,
169 &name,
170 None,
171 &labels,
172 None,
173 value,
174 unit,
175 );
176 output.write_all(intermediate.as_bytes())?;
178 intermediate.clear();
179 }
180 output.write_all(b"\n")?;
181 }
182
183 for (name, mut by_labels) in distributions.drain() {
184 let distribution_type = self.distribution_builder.get_distribution_type(name.as_str());
185
186 if distribution_type == "native_histogram" {
188 continue;
189 }
190
191 let unit = descriptions.get(name.as_str()).and_then(|(desc, unit)| {
192 let unit = unit.filter(|_| self.enable_unit_suffix);
193 write_help_line(&mut intermediate, name.as_str(), unit, None, desc);
194 unit
195 });
196
197 write_type_line(&mut intermediate, name.as_str(), unit, None, distribution_type);
198
199 output.write_all(intermediate.as_bytes())?;
201 intermediate.clear();
202
203 for (labels, distribution) in by_labels.drain(..) {
204 let (sum, count) = match distribution {
205 Distribution::Summary(summary, quantiles, sum) => {
206 let snapshot = summary.snapshot(Instant::now());
207 for quantile in quantiles.iter() {
208 let value = snapshot.quantile(quantile.value()).unwrap_or(0.0);
209 write_metric_line(
210 &mut intermediate,
211 &name,
212 None,
213 &labels,
214 Some(("quantile", quantile.value())),
215 value,
216 unit,
217 );
218 }
219
220 (sum, summary.count() as u64)
221 }
222 Distribution::Histogram(histogram) => {
223 for (le, count) in histogram.buckets() {
224 write_metric_line(
225 &mut intermediate,
226 &name,
227 Some("bucket"),
228 &labels,
229 Some(("le", le)),
230 count,
231 unit,
232 );
233 }
234 write_metric_line(
235 &mut intermediate,
236 &name,
237 Some("bucket"),
238 &labels,
239 Some(("le", "+Inf")),
240 histogram.count(),
241 unit,
242 );
243
244 (histogram.sum(), histogram.count())
245 }
246 Distribution::NativeHistogram(_) => {
247 continue;
250 }
251 };
252
253 write_metric_line::<&str, f64>(
254 &mut intermediate,
255 &name,
256 Some("sum"),
257 &labels,
258 None,
259 sum,
260 unit,
261 );
262 write_metric_line::<&str, u64>(
263 &mut intermediate,
264 &name,
265 Some("count"),
266 &labels,
267 None,
268 count,
269 unit,
270 );
271
272 output.write_all(intermediate.as_bytes())?;
274 intermediate.clear();
275 }
276
277 output.write_all(b"\n")?;
278 }
279
280 Ok(())
281 }
282
283 fn run_upkeep(&self) {
284 self.drain_histograms_to_distributions();
285 }
286}
287
/// A [`Recorder`] implementation whose state lives in a reference-counted [`Inner`].
///
/// Use [`PrometheusRecorder::handle`] to obtain a [`PrometheusHandle`] that shares the same
/// state.
#[derive(Debug)]
pub struct PrometheusRecorder {
    /// State shared with every handle created from this recorder.
    inner: Arc<Inner>,
}
297
298impl PrometheusRecorder {
299 pub fn handle(&self) -> PrometheusHandle {
301 PrometheusHandle { inner: self.inner.clone() }
302 }
303
304 fn add_description_if_missing(
305 &self,
306 key_name: &KeyName,
307 description: SharedString,
308 unit: Option<Unit>,
309 ) {
310 let sanitized = sanitize_metric_name(key_name.as_str());
311 let mut descriptions =
312 self.inner.descriptions.write().unwrap_or_else(PoisonError::into_inner);
313 descriptions.entry(sanitized).or_insert((description, unit));
314 }
315}
316
317impl From<Inner> for PrometheusRecorder {
318 fn from(inner: Inner) -> Self {
319 PrometheusRecorder { inner: Arc::new(inner) }
320 }
321}
322
323impl Recorder for PrometheusRecorder {
324 fn describe_counter(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
325 self.add_description_if_missing(&key_name, description, unit);
326 }
327
328 fn describe_gauge(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
329 self.add_description_if_missing(&key_name, description, unit);
330 }
331
332 fn describe_histogram(&self, key_name: KeyName, unit: Option<Unit>, description: SharedString) {
333 self.add_description_if_missing(&key_name, description, unit);
334 }
335
336 fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
337 self.inner.registry.get_or_create_counter(key, |c| c.clone().into())
338 }
339
340 fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
341 self.inner.registry.get_or_create_gauge(key, |c| c.clone().into())
342 }
343
344 fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
345 self.inner.registry.get_or_create_histogram(key, |c| c.clone().into())
346 }
347}
348
/// Cloneable handle to the state of a [`PrometheusRecorder`].
///
/// Provides rendering of the current metrics and a hook for running upkeep.
#[derive(Clone, Debug)]
pub struct PrometheusHandle {
    /// State shared with the recorder this handle was created from.
    inner: Arc<Inner>,
}
359
360impl PrometheusHandle {
361 #[allow(clippy::missing_panics_doc)]
364 pub fn render(&self) -> String {
365 let mut buf = Vec::new();
366 self.inner.render_to_write(&mut buf).unwrap();
368 String::from_utf8(buf).unwrap()
370 }
371
372 pub fn render_to_write(&self, output: &mut impl io::Write) -> io::Result<()> {
380 self.inner.render_to_write(output)
381 }
382
383 #[cfg(feature = "protobuf")]
386 pub fn render_protobuf(&self) -> Vec<u8> {
387 let snapshot = self.inner.get_recent_metrics();
388 let descriptions = self.inner.descriptions.read().unwrap_or_else(PoisonError::into_inner);
389
390 crate::protobuf::render_protobuf(snapshot, &descriptions, self.inner.counter_suffix)
391 }
392
393 pub fn run_upkeep(&self) {
396 self.inner.run_upkeep();
397 }
398}