metrics_util/registry/
recency.rs

//! Metric recency.
//!
//! `Recency` deals with the concept of removing metrics that have not been updated for a certain
//! amount of time.  In some use cases, metrics are tied to specific labels which are short-lived,
//! such as labels referencing a date or a version of software.  When these labels change, exporters
//! may still be emitting those older metrics which are no longer relevant.  In many cases, a
//! long-lived application could continue tracking metrics such that the number of unique metrics
//! grows until a significant portion of memory is required to track them all, even if the majority
//! of them are no longer used.
//!
//! As metrics are typically backed by atomic storage, exporters don't see the individual changes to
//! a metric, and so need a way to measure if a metric has changed since the last time it was
//! observed.  This could potentially be achieved by observing the value directly, but metrics like
//! gauges can be updated in such a way that their value is the same between two observations even
//! though it was actually changed in between.
//!
//! We solve this by tracking the generation of a metric, which represents the number of times it
//! has been modified.  Because a metric's generation only ever increases, comparing its generation
//! between two observations reveals whether the metric was modified in between.  This provides a
//! universal mechanism that works for all metric types.
//!
//! `Recency` uses the generation of a metric, along with a measurement of time when a metric is
//! observed, to build a complete picture that allows deciding if a given metric has gone "idle" or
//! not, and thus whether it should actually be deleted.
25use std::sync::atomic::{AtomicUsize, Ordering};
26use std::sync::{Arc, Mutex, PoisonError};
27use std::time::Duration;
28use std::{collections::HashMap, ops::DerefMut};
29
30use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn};
31use quanta::{Clock, Instant};
32
33use crate::Hashable;
34use crate::{
35    kind::MetricKindMask,
36    registry::{AtomicStorage, Registry, Storage},
37    MetricKind,
38};
39
/// An opaque generation number attached to a metric.
///
/// The wrapped counter is deliberately not exposed: generations carry no meaning on their own and
/// are only useful when ordered or compared against one another.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Generation(usize);
46
47/// Generation tracking for a metric.
48///
49/// Holds a generic interior value, and provides way to access the value such that each access
50/// increments the "generation" of the value.  This provides a means to understand if the value has
51/// been updated since the last time it was observed.
52///
53/// For example, if a gauge was observed to be X at one point in time, and then observed to be X
54/// again at a later point in time, it could have changed in between the two observations.  It also
55/// may not have changed, and thus `Generational` provides a way to determine if either of these
56/// events occurred.
57#[derive(Clone, Debug)]
58pub struct Generational<T> {
59    inner: T,
60    gen: Arc<AtomicUsize>,
61}
62
63impl<T> Generational<T> {
64    /// Creates a new `Generational<T>`.
65    fn new(inner: T) -> Generational<T> {
66        Generational { inner, gen: Arc::new(AtomicUsize::new(0)) }
67    }
68
69    /// Gets a reference to the inner value.
70    pub fn get_inner(&self) -> &T {
71        &self.inner
72    }
73
74    /// Gets the current generation.
75    pub fn get_generation(&self) -> Generation {
76        Generation(self.gen.load(Ordering::Acquire))
77    }
78
79    /// Acquires a reference to the inner value, and increments the generation.
80    pub fn with_increment<F, V>(&self, f: F) -> V
81    where
82        F: Fn(&T) -> V,
83    {
84        let result = f(&self.inner);
85        let _ = self.gen.fetch_add(1, Ordering::AcqRel);
86        result
87    }
88}
89
90impl<T> CounterFn for Generational<T>
91where
92    T: CounterFn,
93{
94    fn increment(&self, value: u64) {
95        self.with_increment(|c| c.increment(value))
96    }
97
98    fn absolute(&self, value: u64) {
99        self.with_increment(|c| c.absolute(value))
100    }
101}
102
103impl<T> GaugeFn for Generational<T>
104where
105    T: GaugeFn,
106{
107    fn increment(&self, value: f64) {
108        self.with_increment(|g| g.increment(value))
109    }
110
111    fn decrement(&self, value: f64) {
112        self.with_increment(|g| g.decrement(value))
113    }
114
115    fn set(&self, value: f64) {
116        self.with_increment(|g| g.set(value))
117    }
118}
119
120impl<T> HistogramFn for Generational<T>
121where
122    T: HistogramFn,
123{
124    fn record(&self, value: f64) {
125        self.with_increment(|h| h.record(value))
126    }
127}
128
129impl<T> From<Generational<T>> for Counter
130where
131    T: CounterFn + Send + Sync + 'static,
132{
133    fn from(inner: Generational<T>) -> Self {
134        Counter::from_arc(Arc::new(inner))
135    }
136}
137
138impl<T> From<Generational<T>> for Gauge
139where
140    T: GaugeFn + Send + Sync + 'static,
141{
142    fn from(inner: Generational<T>) -> Self {
143        Gauge::from_arc(Arc::new(inner))
144    }
145}
146
147impl<T> From<Generational<T>> for Histogram
148where
149    T: HistogramFn + Send + Sync + 'static,
150{
151    fn from(inner: Generational<T>) -> Self {
152        Histogram::from_arc(Arc::new(inner))
153    }
154}
155
/// Metric storage that layers generation tracking on top of another storage.
///
/// The generation of each metric is what allows updates to be detected in cases where the metric's
/// value alone would not be a sufficient indicator.
#[derive(Debug)]
pub struct GenerationalStorage<S> {
    inner: S,
}

impl<S> GenerationalStorage<S> {
    /// Creates a new [`GenerationalStorage`] wrapping `storage`.
    ///
    /// `storage` is still responsible for creating the underlying metrics; this wrapper adds the
    /// generational semantics on top of them.
    pub fn new(storage: S) -> Self {
        GenerationalStorage { inner: storage }
    }
}
173
174impl<K, S: Storage<K>> Storage<K> for GenerationalStorage<S> {
175    type Counter = Generational<S::Counter>;
176    type Gauge = Generational<S::Gauge>;
177    type Histogram = Generational<S::Histogram>;
178
179    fn counter(&self, key: &K) -> Self::Counter {
180        Generational::new(self.inner.counter(key))
181    }
182
183    fn gauge(&self, key: &K) -> Self::Gauge {
184        Generational::new(self.inner.gauge(key))
185    }
186
187    fn histogram(&self, key: &K) -> Self::Histogram {
188        Generational::new(self.inner.histogram(key))
189    }
190}
191
192/// Generational atomic metric storage.
193///
194/// `GenerationalAtomicStorage` is based on [`AtomicStorage`], but additionally tracks the
195/// "generation" of a metric, which is used to detect updates to metrics where the value otherwise
196/// would not be sufficient to be used as an indicator.
197pub type GenerationalAtomicStorage = GenerationalStorage<AtomicStorage>;
198
199impl GenerationalAtomicStorage {
200    /// Creates a [`GenerationalStorage`] that uses [`AtomicStorage`] as its underlying storage.
201    pub fn atomic() -> Self {
202        Self { inner: AtomicStorage }
203    }
204}
205
206/// Tracks recency of metric updates by their registry generation and time.
207///
208/// In many cases, a user may have a long-running process where metrics are stored over time using
209/// labels that change for some particular reason, leaving behind versions of that metric with
210/// labels that are no longer relevant to the current process state.  This can lead to cases where
211/// metrics that no longer matter are still present in rendered output, adding bloat.
212///
213/// When coupled with [`Registry`], [`Recency`] can be used to track when the last update to a
214/// metric has occurred for the purposes of removing idle metrics from the registry.  In addition,
215/// it will remove the value from the registry itself to reduce the aforementioned bloat.
216///
217/// [`Recency`] is separate from [`Registry`] specifically to avoid imposing any slowdowns when
218/// tracking recency does not matter, despite their otherwise tight coupling.
219#[derive(Debug)]
220pub struct Recency<K> {
221    mask: MetricKindMask,
222    #[allow(clippy::type_complexity)]
223    inner: Mutex<(Clock, HashMap<K, (Generation, Instant)>)>,
224    idle_timeout: Option<Duration>,
225}
226
227impl<K> Recency<K>
228where
229    K: Clone + Eq + Hashable,
230{
231    /// Creates a new [`Recency`].
232    ///
233    /// If `idle_timeout` is `None`, no recency checking will occur.  Otherwise, any metric that has
234    /// not been updated for longer than `idle_timeout` will be subject for deletion the next time
235    /// the metric is checked.
236    ///
237    /// The provided `clock` is used for tracking time, while `mask` controls which metrics
238    /// are covered by the recency logic.  For example, if `mask` only contains counters and
239    /// histograms, then gauges will not be considered for recency, and thus will never be deleted.
240    ///
241    /// Refer to the documentation for [`MetricKindMask`](crate::MetricKindMask) for more
242    /// information on defining a metric kind mask.
243    pub fn new(clock: Clock, mask: MetricKindMask, idle_timeout: Option<Duration>) -> Self {
244        Recency { mask, inner: Mutex::new((clock, HashMap::new())), idle_timeout }
245    }
246
247    /// Checks if the given counter should be stored, based on its known recency.
248    ///
249    /// If the given key has been updated recently enough, and should continue to be stored, this
250    /// method will return `true` and will update the last update time internally.  If the given key
251    /// has not been updated recently enough, the key will be removed from the given registry if the
252    /// given generation also matches.
253    pub fn should_store_counter<S>(
254        &self,
255        key: &K,
256        gen: Generation,
257        registry: &Registry<K, S>,
258    ) -> bool
259    where
260        S: Storage<K>,
261    {
262        self.should_store(key, gen, registry, MetricKind::Counter, |registry, key| {
263            registry.delete_counter(key)
264        })
265    }
266
267    /// Checks if the given gauge should be stored, based on its known recency.
268    ///
269    /// If the given key has been updated recently enough, and should continue to be stored, this
270    /// method will return `true` and will update the last update time internally.  If the given key
271    /// has not been updated recently enough, the key will be removed from the given registry if the
272    /// given generation also matches.
273    pub fn should_store_gauge<S>(&self, key: &K, gen: Generation, registry: &Registry<K, S>) -> bool
274    where
275        S: Storage<K>,
276    {
277        self.should_store(key, gen, registry, MetricKind::Gauge, |registry, key| {
278            registry.delete_gauge(key)
279        })
280    }
281
282    /// Checks if the given histogram should be stored, based on its known recency.
283    ///
284    /// If the given key has been updated recently enough, and should continue to be stored, this
285    /// method will return `true` and will update the last update time internally.  If the given key
286    /// has not been updated recently enough, the key will be removed from the given registry if the
287    /// given generation also matches.
288    pub fn should_store_histogram<S>(
289        &self,
290        key: &K,
291        gen: Generation,
292        registry: &Registry<K, S>,
293    ) -> bool
294    where
295        S: Storage<K>,
296    {
297        self.should_store(key, gen, registry, MetricKind::Histogram, |registry, key| {
298            registry.delete_histogram(key)
299        })
300    }
301
302    fn should_store<F, S>(
303        &self,
304        key: &K,
305        gen: Generation,
306        registry: &Registry<K, S>,
307        kind: MetricKind,
308        delete_op: F,
309    ) -> bool
310    where
311        F: Fn(&Registry<K, S>, &K) -> bool,
312        S: Storage<K>,
313    {
314        if let Some(idle_timeout) = self.idle_timeout {
315            if self.mask.matches(kind) {
316                let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
317                let (clock, entries) = guard.deref_mut();
318
319                let now = clock.now();
320                let deleted = if let Some((last_gen, last_update)) = entries.get_mut(key) {
321                    // If the value is the same as the latest value we have internally, and
322                    // we're over the idle timeout period, then remove it and continue.
323                    if *last_gen == gen {
324                        // If the delete returns false, that means that our generation counter is
325                        // out-of-date, and that the metric has been updated since, so we don't
326                        // actually want to delete it yet.
327                        (now - *last_update) > idle_timeout && delete_op(registry, key)
328                    } else {
329                        // Value has changed, so mark it such.
330                        *last_update = now;
331                        *last_gen = gen;
332                        false
333                    }
334                } else {
335                    entries.insert(key.clone(), (gen, now));
336                    false
337                };
338
339                if deleted {
340                    entries.remove(key);
341                    return false;
342                }
343            }
344        }
345
346        true
347    }
348}