// fast_telemetry/metric/dynamic/gauge.rs

//! Runtime-labeled gauge for dynamic dimensions.

use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
#[cfg(feature = "eviction")]
use super::current_cycle;
use super::{DynamicLabelSet, GAUGE_IDS};
use crossbeam_utils::CachePadded;
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
#[cfg(feature = "eviction")]
use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};

/// Default cap on distinct label sets before new sets are routed to overflow.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key for the single catch-all series used once the cap is hit.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value for the catch-all overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One shard of the label-set -> series index. Cache-padded so adjacent
/// shards don't false-share a cache line under concurrent lock traffic.
type GaugeIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeSeries>>>>;

/// A single gauge series. The f64 value is stored as raw bits in an atomic
/// u64 so reads and writes are lock-free.
struct GaugeSeries {
    /// Current value as `f64::to_bits`; padded onto its own cache line.
    bits: CachePadded<AtomicU64>,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}

32impl GaugeSeries {
33    #[cfg(feature = "eviction")]
34    fn new(cycle: u32) -> Self {
35        Self {
36            bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
37            evicted: AtomicBool::new(false),
38            last_accessed_cycle: AtomicU32::new(cycle),
39        }
40    }
41
42    #[cfg(not(feature = "eviction"))]
43    fn new() -> Self {
44        Self {
45            bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
46            evicted: AtomicBool::new(false),
47        }
48    }
49
50    #[inline]
51    fn set(&self, value: f64) {
52        self.bits.store(value.to_bits(), Ordering::Relaxed);
53        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
54        // global atomic read on every set.
55    }
56
57    /// Touch the series timestamp. Called on slow path only.
58    #[cfg(feature = "eviction")]
59    #[inline]
60    fn touch(&self, cycle: u32) {
61        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
62    }
63
64    #[inline]
65    fn get(&self) -> f64 {
66        f64::from_bits(self.bits.load(Ordering::Relaxed))
67    }
68
69    #[inline]
70    fn is_evicted(&self) -> bool {
71        self.evicted.load(Ordering::Relaxed)
72    }
73
74    #[cfg(feature = "eviction")]
75    fn mark_evicted(&self) {
76        self.evicted.store(true, Ordering::Relaxed);
77    }
78}
79
80impl CacheableSeries for GaugeSeries {
81    fn is_evicted(&self) -> bool {
82        self.is_evicted()
83    }
84}
85
/// A reusable handle to a dynamic-label gauge series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicGauge::series(...)`, then call `set()`
/// / `get()` on the handle.
///
/// Cloning is cheap (an `Arc` refcount bump). Holding a handle keeps the
/// series alive: eviction skips series whose `Arc::strong_count > 1`.
#[derive(Clone)]
pub struct DynamicGaugeSeries {
    // Strong ref to the shared series; the index map holds the other ref.
    series: Arc<GaugeSeries>,
}

96impl DynamicGaugeSeries {
97    /// Set the gauge value.
98    #[inline]
99    pub fn set(&self, value: f64) {
100        self.series.set(value);
101    }
102
103    /// Get the current value.
104    #[inline]
105    pub fn get(&self) -> f64 {
106        self.series.get()
107    }
108
109    /// Check if this series handle has been evicted.
110    #[inline]
111    pub fn is_evicted(&self) -> bool {
112        self.series.is_evicted()
113    }
114}
115
thread_local! {
    /// Per-thread label -> series cache. Holds `Weak` refs so a cache entry
    /// never keeps an evicted series alive on its own.
    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}

/// Gauge keyed by runtime label sets.
///
/// Uses sharded index for key->series lookup for concurrent access. Hot-path
/// writes go through the thread-local `SERIES_CACHE`; the sharded map is only
/// consulted on cache misses.
pub struct DynamicGauge {
    /// Process-unique id (allocated from `GAUGE_IDS`); namespaces this
    /// gauge's entries in the shared thread-local cache.
    id: usize,
    /// Cardinality cap; 0 disables the cap.
    max_series: usize,
    /// `shard_count - 1`; valid as a mask because shard_count is a power of two.
    shard_mask: usize,
    index_shards: Vec<GaugeIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}

impl DynamicGauge {
    /// Creates a new runtime-labeled gauge.
    ///
    /// Shorthand for [`Self::with_max_series`] with the default cap of
    /// `DEFAULT_MAX_SERIES` distinct label sets.
    pub fn new(shard_count: usize) -> Self {
        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a new runtime-labeled gauge with a series cardinality cap.
    ///
    /// When the number of unique label sets approximately reaches `max_series`,
    /// new label sets are redirected into a single overflow series
    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
    /// so concurrent inserts may briefly overshoot by the number of in-flight
    /// writers before the overflow kicks in.
    ///
    /// `shard_count` is rounded up to a power of two so shard selection is a
    /// single mask; `max_series == 0` disables the cap entirely.
    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
        let shard_count = shard_count.next_power_of_two();
        // Process-unique id; keys this gauge's entries in the thread-local cache.
        let id = GAUGE_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Resolve a reusable series handle for `labels`.
    ///
    /// Preferred for hot paths when labels come from a finite active set.
    /// The returned handle holds a strong `Arc`, which also protects the
    /// series from `evict_stale` (strong_count > 1).
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeSeries {
        if let Some(series) = self.cached_series(labels) {
            return DynamicGaugeSeries { series };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        DynamicGaugeSeries { series }
    }

    /// Set the gauge value for the series identified by `labels`.
    ///
    /// Fast path: thread-local cache hit, one atomic store. Slow path:
    /// canonicalize, look up or create in the sharded index, then prime the
    /// cache for next time.
    #[inline]
    pub fn set(&self, labels: &[(&str, &str)], value: f64) {
        if let Some(series) = self.cached_series(labels) {
            series.set(value);
            return;
        }

        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        series.set(value);
    }

    /// Get the current value for the series identified by `labels`.
    ///
    /// Returns 0.0 when no such series exists. Reads the index directly —
    /// it never inserts and does not use the thread-local cache.
    pub fn get(&self, labels: &[(&str, &str)]) -> f64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.get())
            .unwrap_or(0.0)
    }

    /// Returns a snapshot of all label-set/value pairs.
    ///
    /// Shards are read-locked one at a time, so the snapshot is not a single
    /// consistent cut across all shards.
    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, f64)> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((labels.clone(), series.get()));
            }
        }
        out
    }

    /// Returns the current number of distinct label sets.
    ///
    /// Sums per-shard map sizes under read locks — exact, unlike the
    /// approximate `series_count` used for the cap check.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    /// Note: incremented on the slow path only, so cache-hit writes to an
    /// already-redirected label set are not re-counted.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Iterate all series without cloning label sets.
    ///
    /// Calls `f` with borrowed label pairs and the current value for each series.
    /// Used by exporters to avoid the intermediate `snapshot()` allocation.
    pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], f64)) {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), series.get());
            }
        }
    }

    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicGaugeSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicGaugeSeries hold refs)
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone BEFORE removal so concurrent cached-handle
                    // users can observe the eviction flag.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Look up the series for `labels`, creating it — or redirecting to the
    /// shared overflow series — on first sight.
    ///
    /// Double-checked pattern: read-lock probe, cap check, second read-lock
    /// probe on the (possibly redirected) key, then write lock + re-check
    /// before insert.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // The overflow key may hash to a different shard than the request.
        let shard = self.index_shard_for(&key);

        // Second probe: the redirected key (e.g. the shared overflow series)
        // may already exist.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        let mut guard = self.index_shards[shard].write();
        // Re-check under the write lock: another writer may have inserted
        // between our read probe and this lock acquisition.
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(GaugeSeries::new(cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(GaugeSeries::new());
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Map a canonical label set to a shard index: hash, then mask with the
    /// power-of-two shard mask.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Probe this thread's cache for `labels` under this gauge's id.
    ///
    /// NOTE(review): `LabelCache::get` returns a strong `Arc`, so it is
    /// presumed to upgrade the stored `Weak` and to skip evicted entries via
    /// `CacheableSeries` — confirm against cache.rs. On a hit (eviction
    /// builds) the timestamp is refreshed here, keeping `set()` itself
    /// timestamp-free.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeSeries>> {
        SERIES_CACHE.with(|cache| {
            let series = cache.borrow_mut().get(self.id, labels)?;
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Record `labels -> series` in this thread's cache. Stored as a `Weak`
    /// so the cache never extends the lifetime of an evicted series.
    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeSeries>) {
        SERIES_CACHE.with(|cache| {
            cache
                .borrow_mut()
                .insert(self.id, labels, Arc::downgrade(series));
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_operations() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 100.5);

        assert!(
            (gauge.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]) - 100.5).abs() < f64::EPSILON
        );
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 50.0);

        // Same labels in a different order must resolve to the same series.
        assert!(
            (gauge.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]) - 50.0).abs() < f64::EPSILON
        );
    }

    #[test]
    fn test_get_unknown_labels_returns_zero() {
        // A label set that was never written reads as the documented 0.0.
        let gauge = DynamicGauge::new(4);
        assert!(gauge.get(&[("missing", "x")]).abs() < f64::EPSILON);
    }

    #[test]
    fn test_series_handle() {
        let gauge = DynamicGauge::new(4);
        let series = gauge.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        series.set(123.456);

        // Handle and map-path reads observe the same value.
        assert!((series.get() - 123.456).abs() < f64::EPSILON);
        assert!(
            (gauge.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]) - 123.456).abs()
                < f64::EPSILON
        );
    }

    #[test]
    fn test_snapshot() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "1")], 10.0);
        gauge.set(&[("org_id", "2")], 20.0);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);

        let total: f64 = snap.iter().map(|(_, v)| v).sum();
        assert!((total - 30.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGauge::with_max_series(4, 1);
        gauge.set(&[("org_id", "1")], 1.0);
        gauge.set(&[("org_id", "2")], 2.0);

        // Cap of 1 was hit: the second label set shares the overflow series,
        // so the index holds exactly {org_id=1, __ft_overflow=true}.
        assert_eq!(gauge.cardinality(), 2);
        // Exactly one record was redirected, and the counter reports it.
        assert_eq!(gauge.overflow_count(), 1);
        assert!(
            (gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]) - 2.0).abs() < f64::EPSILON
        );
        // The redirected label set was never stored under its own key.
        assert!(gauge.get(&[("org_id", "2")]).abs() < f64::EPSILON);
    }
}