// fast_telemetry/metric/dynamic/gauge.rs
1//! Runtime-labeled gauge for dynamic dimensions.
2
3#[cfg(feature = "eviction")]
4use super::current_cycle;
5use super::{DynamicLabelSet, GAUGE_IDS};
6use crossbeam_utils::CachePadded;
7use parking_lot::RwLock;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11#[cfg(feature = "eviction")]
12use std::sync::atomic::AtomicU32;
13use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Weak};
15
/// Default cap on unique label sets per gauge before overflow routing kicks in.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Reserved label key identifying the single overflow-bucket series.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Reserved label value for the overflow-bucket series.
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One index shard: cache-line padded to avoid false sharing between shards.
type GaugeIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeSeries>>>>;
21
/// Internal storage for a single gauge series (one unique label set).
///
/// The f64 value is kept as raw bits in an atomic u64 so reads and writes
/// are lock-free from any thread.
struct GaugeSeries {
    /// Current value as `f64::to_bits`; padded to avoid false sharing with
    /// neighboring series.
    bits: CachePadded<AtomicU64>,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
30
31impl GaugeSeries {
32    #[cfg(feature = "eviction")]
33    fn new(cycle: u32) -> Self {
34        Self {
35            bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
36            evicted: AtomicBool::new(false),
37            last_accessed_cycle: AtomicU32::new(cycle),
38        }
39    }
40
41    #[cfg(not(feature = "eviction"))]
42    fn new() -> Self {
43        Self {
44            bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
45            evicted: AtomicBool::new(false),
46        }
47    }
48
49    #[inline]
50    fn set(&self, value: f64) {
51        self.bits.store(value.to_bits(), Ordering::Relaxed);
52        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
53        // global atomic read on every set.
54    }
55
56    /// Touch the series timestamp. Called on slow path only.
57    #[cfg(feature = "eviction")]
58    #[inline]
59    fn touch(&self, cycle: u32) {
60        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
61    }
62
63    #[inline]
64    fn get(&self) -> f64 {
65        f64::from_bits(self.bits.load(Ordering::Relaxed))
66    }
67
68    #[inline]
69    fn is_evicted(&self) -> bool {
70        self.evicted.load(Ordering::Relaxed)
71    }
72
73    #[cfg(feature = "eviction")]
74    fn mark_evicted(&self) {
75        self.evicted.store(true, Ordering::Relaxed);
76    }
77}
78
/// A reusable handle to a dynamic-label gauge series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicGauge::series(...)`, then call `set()`
/// / `get()` on the handle.
#[derive(Clone)]
pub struct DynamicGaugeSeries {
    /// Strong ref: holding a handle protects the series from eviction
    /// (the exporter skips series with strong_count > 1).
    series: Arc<GaugeSeries>,
}
88
89impl DynamicGaugeSeries {
90    /// Set the gauge value.
91    #[inline]
92    pub fn set(&self, value: f64) {
93        self.series.set(value);
94    }
95
96    /// Get the current value.
97    #[inline]
98    pub fn get(&self) -> f64 {
99        self.series.get()
100    }
101
102    /// Check if this series handle has been evicted.
103    #[inline]
104    pub fn is_evicted(&self) -> bool {
105        self.series.is_evicted()
106    }
107}
108
/// Per-thread memo of the most recently resolved series.
struct SeriesCacheEntry {
    /// Id of the owning `DynamicGauge`; the cache slot is shared by all
    /// gauges on the thread, so hits must check this first.
    gauge_id: usize,
    /// Labels exactly as passed by the caller (caller order, not canonical).
    ordered_labels: Vec<(String, String)>,
    /// Weak ref so the cache never keeps an evicted series alive.
    series: Weak<GaugeSeries>,
}
114
thread_local! {
    /// Single-entry per-thread cache: skips label canonicalization and the
    /// sharded map lookup when the same gauge/labels repeat on one thread.
    static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
}
118
/// Gauge keyed by runtime label sets.
///
/// Uses sharded index for key->series lookup for concurrent access.
pub struct DynamicGauge {
    /// Process-unique id (from `GAUGE_IDS`) used to validate cache hits.
    id: usize,
    /// Cardinality cap; 0 disables overflow routing.
    max_series: usize,
    /// `shard_count - 1`; valid as a mask because shard count is a power of two.
    shard_mask: usize,
    index_shards: Vec<GaugeIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
132
impl DynamicGauge {
    /// Creates a new runtime-labeled gauge.
    ///
    /// Uses the default cardinality cap (`DEFAULT_MAX_SERIES`).
    pub fn new(shard_count: usize) -> Self {
        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a new runtime-labeled gauge with a series cardinality cap.
    ///
    /// When the number of unique label sets approximately reaches `max_series`,
    /// new label sets are redirected into a single overflow series
    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
    /// so concurrent inserts may briefly overshoot by the number of in-flight
    /// writers before the overflow kicks in.
    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
        // Round up so `hash & shard_mask` is a valid modulo (0 rounds to 1).
        let shard_count = shard_count.next_power_of_two();
        // Process-unique id; lets the shared thread-local cache tell gauges apart.
        let id = GAUGE_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Resolve a reusable series handle for `labels`.
    ///
    /// Preferred for hot paths when labels come from a finite active set.
    /// Tries the thread-local cache first; on a miss, resolves through the
    /// sharded index and refreshes the cache.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeSeries {
        if let Some(series) = self.cached_series(labels) {
            return DynamicGaugeSeries { series };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, Arc::clone(&series));
        DynamicGaugeSeries { series }
    }

    /// Set the gauge value for the series identified by `labels`.
    #[inline]
    pub fn set(&self, labels: &[(&str, &str)], value: f64) {
        // Fast path: this thread's last-used series matches these labels.
        if let Some(series) = self.cached_series(labels) {
            series.set(value);
            return;
        }

        // Slow path: sharded index lookup, creating the series if needed.
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, Arc::clone(&series));
        series.set(value);
    }

    /// Get the current value for the series identified by `labels`.
    ///
    /// Returns `0.0` when no series exists for this label set.
    pub fn get(&self, labels: &[(&str, &str)]) -> f64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.get())
            .unwrap_or(0.0)
    }

    /// Returns a snapshot of all label-set/value pairs.
    ///
    /// Shards are read-locked one at a time, so the result is not a single
    /// atomic view across shards under concurrent writes.
    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, f64)> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((labels.clone(), series.get()));
            }
        }
        out
    }

    /// Returns the current number of distinct label sets.
    ///
    /// Per-shard lengths are summed with the locks taken sequentially, so
    /// the count is approximate under concurrent inserts/evictions.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Iterate all series without cloning label sets.
    ///
    /// Calls `f` with borrowed label pairs and the current value for each series.
    /// Used by exporters to avoid the intermediate `snapshot()` allocation.
    pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], f64)) {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), series.get());
            }
        }
    }

    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicGaugeSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicGaugeSeries hold refs)
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone first so thread-local caches reject the entry
                    // even if their Weak upgrade races with the removal below.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Find the series for `labels`, creating it (or routing it to the
    /// overflow bucket) when absent.
    ///
    /// Lock order per call: shard read -> lock-free cap check -> shard read
    /// on the possibly-redirected key -> shard write with a final
    /// double-check before inserting.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        // Note: the overflow series itself also counts toward series_count.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // Redirection may have changed the key, so recompute the shard.
        let shard = self.index_shard_for(&key);

        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Write lock + re-check: another writer may have inserted between
        // dropping the read lock and acquiring the write lock.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(GaugeSeries::new(cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(GaugeSeries::new());
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Map a canonical label set to a shard index by hashing and masking.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Return this thread's cached series if it matches this gauge and these
    /// labels (compared in caller order) and is still live.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeSeries>> {
        SERIES_CACHE.with(|cache| {
            let cache_ref = cache.borrow();
            let entry = cache_ref.as_ref()?;
            // The cache slot is shared by all gauges on this thread.
            if entry.gauge_id != self.id {
                return None;
            }
            if entry.ordered_labels.len() != labels.len() {
                return None;
            }
            // Order-sensitive compare: the same labels in a different order
            // miss here and fall through to the canonicalizing slow path.
            for (idx, (k, v)) in labels.iter().enumerate() {
                let (ek, ev) = &entry.ordered_labels[idx];
                if ek != k || ev != v {
                    return None;
                }
            }
            // Upgrade fails once the exporter has dropped the map's Arc.
            let series = entry.series.upgrade()?;
            if series.is_evicted() {
                return None;
            }
            // Keep actively-written series from looking stale to evict_stale.
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Replace this thread's cache slot with owned copies of `labels` and a
    /// weak ref to `series` (weak so the cache never blocks eviction).
    fn update_cache(&self, labels: &[(&str, &str)], series: Arc<GaugeSeries>) {
        SERIES_CACHE.with(|cache| {
            let ordered_labels = labels
                .iter()
                .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
                .collect();
            *cache.borrow_mut() = Some(SeriesCacheEntry {
                gauge_id: self.id,
                ordered_labels,
                series: Arc::downgrade(&series),
            });
        });
    }
}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute-difference float check shared by the tests below.
    fn approx(a: f64, b: f64) -> bool {
        (a - b).abs() < f64::EPSILON
    }

    #[test]
    fn test_basic_operations() {
        let gauge = DynamicGauge::new(4);
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        gauge.set(&labels, 100.5);

        assert!(approx(gauge.get(&labels), 100.5));
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 50.0);

        // Reversed label order must resolve to the same series.
        let reversed = [("endpoint_uuid", "abc"), ("org_id", "42")];
        assert!(approx(gauge.get(&reversed), 50.0));
    }

    #[test]
    fn test_series_handle() {
        let gauge = DynamicGauge::new(4);
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        let handle = gauge.series(&labels);
        handle.set(123.456);

        // Both the handle and a fresh lookup observe the write.
        assert!(approx(handle.get(), 123.456));
        assert!(approx(gauge.get(&labels), 123.456));
    }

    #[test]
    fn test_snapshot() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "1")], 10.0);
        gauge.set(&[("org_id", "2")], 20.0);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);

        let total = snap.iter().fold(0.0, |acc, (_, v)| acc + v);
        assert!(approx(total, 30.0));
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGauge::with_max_series(4, 1);
        gauge.set(&[("org_id", "1")], 1.0);
        // Second unique label set exceeds the cap of 1 -> overflow bucket.
        gauge.set(&[("org_id", "2")], 2.0);

        // Cardinality is 2: the original series plus the overflow series.
        assert_eq!(gauge.cardinality(), 2);
        let overflow = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert!(approx(gauge.get(&overflow), 2.0));
    }
}