metrics_util/registry/recency.rs
1//! Metric recency.
2//!
3//! `Recency` deals with the concept of removing metrics that have not been updated for a certain
4//! amount of time. In some use cases, metrics are tied to specific labels which are short-lived,
5//! such as labels referencing a date or a version of software. When these labels change, exporters
6//! may still be emitting those older metrics which are no longer relevant. In many cases, a
7//! long-lived application could continue tracking metrics such that the unique number of metrics
8//! grows until a significant portion of memory is required to track them all, even if the majority
9//! of them are no longer used.
10//!
11//! As metrics are typically backed by atomic storage, exporters don't see the individual changes to
12//! a metric, and so need a way to measure if a metric has changed since the last time it was
13//! observed. This could potentially be achieved by observing the value directly, but metrics like
14//! gauges can be updated in such a way that their value is the same between two observations even
15//! though it had actually been changed in between.
16//!
//! We solve for this by tracking the generation of a metric, which represents the number of times
//! it has been modified. Because the generation only ever increases, comparing it between two
//! observations reveals whether the metric was modified in between. This provides a universal
//! mechanism that works for all metric types.
21//!
22//! `Recency` uses the generation of a metric, along with a measurement of time when a metric is
23//! observed, to build a complete picture that allows deciding if a given metric has gone "idle" or
24//! not, and thus whether it should actually be deleted.
25use std::sync::atomic::{AtomicUsize, Ordering};
26use std::sync::{Arc, Mutex, PoisonError};
27use std::time::Duration;
28use std::{collections::HashMap, ops::DerefMut};
29
30use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn};
31use quanta::{Clock, Instant};
32
33use crate::Hashable;
34use crate::{
35 kind::MetricKindMask,
36 registry::{AtomicStorage, Registry, Storage},
37 MetricKind,
38};
39
/// An opaque marker identifying how many times a metric has been modified.
///
/// A `Generation` is not meant to be inspected directly. Its purpose is to be compared against
/// another `Generation` of the same metric, either for equality or for ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Generation(usize);
46
/// A value paired with a counter that records how many times it has been modified.
///
/// `Generational` wraps an arbitrary interior value and provides a way to access it such that
/// each access increments the "generation" of the value. Comparing the generation between two
/// observations tells the observer whether the value was touched in between.
///
/// This matters for metrics like gauges: a gauge observed to be X at one point in time, and X
/// again at a later point, may or may not have changed in between. The value alone cannot
/// distinguish the two cases, but the generation can.
///
/// The generation counter lives behind an [`Arc`], so clones of a `Generational` share the same
/// counter.
#[derive(Debug, Clone)]
pub struct Generational<T> {
    /// The wrapped value.
    inner: T,
    /// Shared counter holding the current generation.
    gen: Arc<AtomicUsize>,
}
62
63impl<T> Generational<T> {
64 /// Creates a new `Generational<T>`.
65 fn new(inner: T) -> Generational<T> {
66 Generational { inner, gen: Arc::new(AtomicUsize::new(0)) }
67 }
68
69 /// Gets a reference to the inner value.
70 pub fn get_inner(&self) -> &T {
71 &self.inner
72 }
73
74 /// Gets the current generation.
75 pub fn get_generation(&self) -> Generation {
76 Generation(self.gen.load(Ordering::Acquire))
77 }
78
79 /// Acquires a reference to the inner value, and increments the generation.
80 pub fn with_increment<F, V>(&self, f: F) -> V
81 where
82 F: Fn(&T) -> V,
83 {
84 let result = f(&self.inner);
85 let _ = self.gen.fetch_add(1, Ordering::AcqRel);
86 result
87 }
88}
89
90impl<T> CounterFn for Generational<T>
91where
92 T: CounterFn,
93{
94 fn increment(&self, value: u64) {
95 self.with_increment(|c| c.increment(value))
96 }
97
98 fn absolute(&self, value: u64) {
99 self.with_increment(|c| c.absolute(value))
100 }
101}
102
103impl<T> GaugeFn for Generational<T>
104where
105 T: GaugeFn,
106{
107 fn increment(&self, value: f64) {
108 self.with_increment(|g| g.increment(value))
109 }
110
111 fn decrement(&self, value: f64) {
112 self.with_increment(|g| g.decrement(value))
113 }
114
115 fn set(&self, value: f64) {
116 self.with_increment(|g| g.set(value))
117 }
118}
119
120impl<T> HistogramFn for Generational<T>
121where
122 T: HistogramFn,
123{
124 fn record(&self, value: f64) {
125 self.with_increment(|h| h.record(value))
126 }
127}
128
129impl<T> From<Generational<T>> for Counter
130where
131 T: CounterFn + Send + Sync + 'static,
132{
133 fn from(inner: Generational<T>) -> Self {
134 Counter::from_arc(Arc::new(inner))
135 }
136}
137
138impl<T> From<Generational<T>> for Gauge
139where
140 T: GaugeFn + Send + Sync + 'static,
141{
142 fn from(inner: Generational<T>) -> Self {
143 Gauge::from_arc(Arc::new(inner))
144 }
145}
146
147impl<T> From<Generational<T>> for Histogram
148where
149 T: HistogramFn + Send + Sync + 'static,
150{
151 fn from(inner: Generational<T>) -> Self {
152 Histogram::from_arc(Arc::new(inner))
153 }
154}
155
/// Metric storage that adds generation tracking on top of another storage.
///
/// Every metric produced by the wrapped storage is handed out wrapped in a generation tracker.
/// The generation is used to detect updates to metrics in cases where the value alone would not
/// be a sufficient indicator.
#[derive(Debug)]
pub struct GenerationalStorage<S> {
    /// The underlying storage that actually creates the metrics.
    inner: S,
}
164
165impl<S> GenerationalStorage<S> {
166 /// Creates a new [`GenerationalStorage`].
167 ///
168 /// This wraps the given `storage` and provides generational semantics on top of it.
169 pub fn new(storage: S) -> Self {
170 Self { inner: storage }
171 }
172}
173
174impl<K, S: Storage<K>> Storage<K> for GenerationalStorage<S> {
175 type Counter = Generational<S::Counter>;
176 type Gauge = Generational<S::Gauge>;
177 type Histogram = Generational<S::Histogram>;
178
179 fn counter(&self, key: &K) -> Self::Counter {
180 Generational::new(self.inner.counter(key))
181 }
182
183 fn gauge(&self, key: &K) -> Self::Gauge {
184 Generational::new(self.inner.gauge(key))
185 }
186
187 fn histogram(&self, key: &K) -> Self::Histogram {
188 Generational::new(self.inner.histogram(key))
189 }
190}
191
192/// Generational atomic metric storage.
193///
194/// `GenerationalAtomicStorage` is based on [`AtomicStorage`], but additionally tracks the
195/// "generation" of a metric, which is used to detect updates to metrics where the value otherwise
196/// would not be sufficient to be used as an indicator.
197pub type GenerationalAtomicStorage = GenerationalStorage<AtomicStorage>;
198
199impl GenerationalAtomicStorage {
200 /// Creates a [`GenerationalStorage`] that uses [`AtomicStorage`] as its underlying storage.
201 pub fn atomic() -> Self {
202 Self { inner: AtomicStorage }
203 }
204}
205
206/// Tracks recency of metric updates by their registry generation and time.
207///
208/// In many cases, a user may have a long-running process where metrics are stored over time using
209/// labels that change for some particular reason, leaving behind versions of that metric with
210/// labels that are no longer relevant to the current process state. This can lead to cases where
211/// metrics that no longer matter are still present in rendered output, adding bloat.
212///
213/// When coupled with [`Registry`], [`Recency`] can be used to track when the last update to a
214/// metric has occurred for the purposes of removing idle metrics from the registry. In addition,
215/// it will remove the value from the registry itself to reduce the aforementioned bloat.
216///
217/// [`Recency`] is separate from [`Registry`] specifically to avoid imposing any slowdowns when
218/// tracking recency does not matter, despite their otherwise tight coupling.
219#[derive(Debug)]
220pub struct Recency<K> {
221 mask: MetricKindMask,
222 #[allow(clippy::type_complexity)]
223 inner: Mutex<(Clock, HashMap<K, (Generation, Instant)>)>,
224 idle_timeout: Option<Duration>,
225}
226
227impl<K> Recency<K>
228where
229 K: Clone + Eq + Hashable,
230{
231 /// Creates a new [`Recency`].
232 ///
233 /// If `idle_timeout` is `None`, no recency checking will occur. Otherwise, any metric that has
234 /// not been updated for longer than `idle_timeout` will be subject for deletion the next time
235 /// the metric is checked.
236 ///
237 /// The provided `clock` is used for tracking time, while `mask` controls which metrics
238 /// are covered by the recency logic. For example, if `mask` only contains counters and
239 /// histograms, then gauges will not be considered for recency, and thus will never be deleted.
240 ///
241 /// Refer to the documentation for [`MetricKindMask`](crate::MetricKindMask) for more
242 /// information on defining a metric kind mask.
243 pub fn new(clock: Clock, mask: MetricKindMask, idle_timeout: Option<Duration>) -> Self {
244 Recency { mask, inner: Mutex::new((clock, HashMap::new())), idle_timeout }
245 }
246
247 /// Checks if the given counter should be stored, based on its known recency.
248 ///
249 /// If the given key has been updated recently enough, and should continue to be stored, this
250 /// method will return `true` and will update the last update time internally. If the given key
251 /// has not been updated recently enough, the key will be removed from the given registry if the
252 /// given generation also matches.
253 pub fn should_store_counter<S>(
254 &self,
255 key: &K,
256 gen: Generation,
257 registry: &Registry<K, S>,
258 ) -> bool
259 where
260 S: Storage<K>,
261 {
262 self.should_store(key, gen, registry, MetricKind::Counter, |registry, key| {
263 registry.delete_counter(key)
264 })
265 }
266
267 /// Checks if the given gauge should be stored, based on its known recency.
268 ///
269 /// If the given key has been updated recently enough, and should continue to be stored, this
270 /// method will return `true` and will update the last update time internally. If the given key
271 /// has not been updated recently enough, the key will be removed from the given registry if the
272 /// given generation also matches.
273 pub fn should_store_gauge<S>(&self, key: &K, gen: Generation, registry: &Registry<K, S>) -> bool
274 where
275 S: Storage<K>,
276 {
277 self.should_store(key, gen, registry, MetricKind::Gauge, |registry, key| {
278 registry.delete_gauge(key)
279 })
280 }
281
282 /// Checks if the given histogram should be stored, based on its known recency.
283 ///
284 /// If the given key has been updated recently enough, and should continue to be stored, this
285 /// method will return `true` and will update the last update time internally. If the given key
286 /// has not been updated recently enough, the key will be removed from the given registry if the
287 /// given generation also matches.
288 pub fn should_store_histogram<S>(
289 &self,
290 key: &K,
291 gen: Generation,
292 registry: &Registry<K, S>,
293 ) -> bool
294 where
295 S: Storage<K>,
296 {
297 self.should_store(key, gen, registry, MetricKind::Histogram, |registry, key| {
298 registry.delete_histogram(key)
299 })
300 }
301
302 fn should_store<F, S>(
303 &self,
304 key: &K,
305 gen: Generation,
306 registry: &Registry<K, S>,
307 kind: MetricKind,
308 delete_op: F,
309 ) -> bool
310 where
311 F: Fn(&Registry<K, S>, &K) -> bool,
312 S: Storage<K>,
313 {
314 if let Some(idle_timeout) = self.idle_timeout {
315 if self.mask.matches(kind) {
316 let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
317 let (clock, entries) = guard.deref_mut();
318
319 let now = clock.now();
320 let deleted = if let Some((last_gen, last_update)) = entries.get_mut(key) {
321 // If the value is the same as the latest value we have internally, and
322 // we're over the idle timeout period, then remove it and continue.
323 if *last_gen == gen {
324 // If the delete returns false, that means that our generation counter is
325 // out-of-date, and that the metric has been updated since, so we don't
326 // actually want to delete it yet.
327 (now - *last_update) > idle_timeout && delete_op(registry, key)
328 } else {
329 // Value has changed, so mark it such.
330 *last_update = now;
331 *last_gen = gen;
332 false
333 }
334 } else {
335 entries.insert(key.clone(), (gen, now));
336 false
337 };
338
339 if deleted {
340 entries.remove(key);
341 return false;
342 }
343 }
344 }
345
346 true
347 }
348}