// fast_telemetry/metric/dynamic/gauge.rs
//! Runtime-labeled gauge for dynamic dimensions.

use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
#[cfg(feature = "eviction")]
use super::current_cycle;
use super::{DynamicLabelSet, GAUGE_IDS};
use crossbeam_utils::CachePadded;
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
#[cfg(feature = "eviction")]
use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};

/// Default cap on distinct label sets before new sets are routed to overflow.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label pair identifying the single shared overflow series.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One shard of the label-set -> series index; cache-padded to reduce false
/// sharing between shards under concurrent access.
type GaugeIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeSeries>>>>;

/// A single gauge time series: the current value plus eviction bookkeeping.
struct GaugeSeries {
    /// Current value, stored via `f64::to_bits` so it fits in an atomic.
    bits: CachePadded<AtomicU64>,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}

32impl GaugeSeries {
33    #[cfg(feature = "eviction")]
34    fn new(cycle: u32) -> Self {
35        Self {
36            bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
37            evicted: AtomicBool::new(false),
38            last_accessed_cycle: AtomicU32::new(cycle),
39        }
40    }
41
42    #[cfg(not(feature = "eviction"))]
43    fn new() -> Self {
44        Self {
45            bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
46            evicted: AtomicBool::new(false),
47        }
48    }
49
50    #[inline]
51    fn set(&self, value: f64) {
52        self.bits.store(value.to_bits(), Ordering::Relaxed);
53        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
54        // global atomic read on every set.
55    }
56
57    /// Touch the series timestamp. Called on slow path only.
58    #[cfg(feature = "eviction")]
59    #[inline]
60    fn touch(&self, cycle: u32) {
61        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
62    }
63
64    #[inline]
65    fn get(&self) -> f64 {
66        f64::from_bits(self.bits.load(Ordering::Relaxed))
67    }
68
69    #[inline]
70    fn is_evicted(&self) -> bool {
71        self.evicted.load(Ordering::Relaxed)
72    }
73
74    #[cfg(feature = "eviction")]
75    fn mark_evicted(&self) {
76        self.evicted.store(true, Ordering::Relaxed);
77    }
78}
79
80impl CacheableSeries for GaugeSeries {
81    fn is_evicted(&self) -> bool {
82        self.is_evicted()
83    }
84}
85
/// A reusable handle to a dynamic-label gauge series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicGauge::series(...)`, then call `set()`
/// / `get()` on the handle.
///
/// Cloning is cheap: handles share the underlying series via `Arc`.
#[derive(Clone)]
pub struct DynamicGaugeSeries {
    // Strong ref keeps the series alive; eviction skips series whose
    // Arc::strong_count > 1, so a live handle also protects it from eviction.
    series: Arc<GaugeSeries>,
}

96impl DynamicGaugeSeries {
97    /// Set the gauge value.
98    #[inline]
99    pub fn set(&self, value: f64) {
100        self.series.set(value);
101    }
102
103    /// Get the current value.
104    #[inline]
105    pub fn get(&self) -> f64 {
106        self.series.get()
107    }
108
109    /// Check if this series handle has been evicted.
110    #[inline]
111    pub fn is_evicted(&self) -> bool {
112        self.series.is_evicted()
113    }
114}
115
// Per-thread cache mapping (gauge id, labels) -> series, letting hot paths
// skip label canonicalization and the sharded map lookup. Entries hold Weak
// pointers so the cache never extends a series' lifetime (eviction checks
// Arc::strong_count, which Weak refs do not contribute to).
thread_local! {
    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}

/// Gauge keyed by runtime label sets.
///
/// Uses sharded index for key->series lookup for concurrent access.
pub struct DynamicGauge {
    /// Globally unique gauge id; part of the thread-local cache key.
    id: usize,
    /// Cardinality cap; 0 disables overflow routing.
    max_series: usize,
    /// `shard_count - 1`, valid because shard_count is a power of two.
    shard_mask: usize,
    index_shards: Vec<GaugeIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}

impl DynamicGauge {
    /// Creates a new runtime-labeled gauge.
    ///
    /// Applies the default cardinality cap (`DEFAULT_MAX_SERIES`).
    pub fn new(shard_count: usize) -> Self {
        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a new runtime-labeled gauge with a series cardinality cap.
    ///
    /// When the number of unique label sets approximately reaches `max_series`,
    /// new label sets are redirected into a single overflow series
    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
    /// so concurrent inserts may briefly overshoot by the number of in-flight
    /// writers before the overflow kicks in.
    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
        // Power-of-two shard count lets index_shard_for mask instead of modulo.
        let shard_count = shard_count.next_power_of_two();
        // Unique id keys this gauge's entries in the thread-local cache.
        let id = GAUGE_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Resolve a reusable series handle for `labels`.
    ///
    /// Preferred for hot paths when labels come from a finite active set.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeSeries {
        if let Some(series) = self.cached_series(labels) {
            return DynamicGaugeSeries { series };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        DynamicGaugeSeries { series }
    }

    /// Set the gauge value for the series identified by `labels`.
    #[inline]
    pub fn set(&self, labels: &[(&str, &str)], value: f64) {
        // Fast path: thread-local cache hit skips canonicalization + lookup.
        if let Some(series) = self.cached_series(labels) {
            series.set(value);
            return;
        }

        // Slow path: resolve (or create) the series, then prime the cache.
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        series.set(value);
    }

    /// Get the current value for the series identified by `labels`.
    ///
    /// Returns 0.0 when no series exists for this label set.
    pub fn get(&self, labels: &[(&str, &str)]) -> f64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.get())
            .unwrap_or(0.0)
    }

    /// Returns a snapshot of all label-set/value pairs.
    ///
    /// Shards are locked one at a time, so the result is not a single
    /// consistent point-in-time view across shards.
    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, f64)> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((labels.clone(), series.get()));
            }
        }
        out
    }

    /// Returns the current number of distinct label sets.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Iterate all series without cloning label sets.
    ///
    /// Calls `f` with borrowed label pairs and the current value for each series.
    /// Used by exporters to avoid the intermediate `snapshot()` allocation.
    /// The callback runs while a shard read lock is held; it must be fast and
    /// must not call back into this metric.
    #[doc(hidden)]
    pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], f64)) {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), series.get());
            }
        }
    }

    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicGaugeSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicGaugeSeries hold refs).
                // Thread-local caches hold Weak refs and do not protect.
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone BEFORE removal so a cached Weak that upgrades
                    // concurrently can still observe the eviction.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Find the series for `labels`, creating it (or routing to the shared
    /// overflow series when the cap is reached) if absent.
    ///
    /// Uses double-checked locking: read lock first, write lock only when an
    /// insert is actually needed.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // Re-shard: the overflow key may hash to a different shard than the
        // requested key did.
        let shard = self.index_shard_for(&key);

        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Double-check under the write lock: another writer may have inserted
        // the same key between our read and write acquisitions.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(GaugeSeries::new(cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(GaugeSeries::new());
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Map a canonical label set to its shard index via hash + mask.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Look up `labels` in this thread's series cache.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeSeries>> {
        SERIES_CACHE.with(|cache| {
            let series = cache.borrow_mut().get(self.id, labels)?;
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Store a Weak pointer to `series` in this thread's cache.
    ///
    /// Weak (not Arc) so cache residency never blocks eviction.
    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeSeries>) {
        SERIES_CACHE.with(|cache| {
            cache
                .borrow_mut()
                .insert(self.id, labels, Arc::downgrade(series));
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Approximate float equality for assertions.
    fn approx(a: f64, b: f64) -> bool {
        (a - b).abs() < f64::EPSILON
    }

    #[test]
    fn test_basic_operations() {
        let gauge = DynamicGauge::new(4);
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        gauge.set(&labels, 100.5);

        assert!(approx(gauge.get(&labels), 100.5));
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 50.0);

        // Reversed label order must resolve to the same series.
        let reversed = [("endpoint_uuid", "abc"), ("org_id", "42")];
        assert!(approx(gauge.get(&reversed), 50.0));
    }

    #[test]
    fn test_series_handle() {
        let gauge = DynamicGauge::new(4);
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        let series = gauge.series(&labels);
        series.set(123.456);

        // Both the handle and the map-based lookup see the same value.
        assert!(approx(series.get(), 123.456));
        assert!(approx(gauge.get(&labels), 123.456));
    }

    #[test]
    fn test_snapshot() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "1")], 10.0);
        gauge.set(&[("org_id", "2")], 20.0);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);

        let total: f64 = snap.iter().map(|(_, v)| v).sum();
        assert!(approx(total, 30.0));
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGauge::with_max_series(4, 1);
        gauge.set(&[("org_id", "1")], 1.0);
        gauge.set(&[("org_id", "2")], 2.0);

        // One real series plus the overflow series.
        assert_eq!(gauge.cardinality(), 2);
        let overflow = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert!(approx(gauge.get(&overflow), 2.0));
    }
}