Skip to main content

fast_telemetry/metric/dynamic/
gauge_i64.rs

//! Runtime-labeled signed integer gauge for dynamic dimensions.
//!
//! Use this for metrics like "active connections" or "in-flight requests"
//! that need atomic add/sub semantics but should export as absolute gauge values
//! (not counter deltas).
6
7use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DynamicLabelSet, thread_id};
11use crossbeam_utils::CachePadded;
12use parking_lot::RwLock;
13use std::cell::RefCell;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16#[cfg(feature = "eviction")]
17use std::sync::atomic::AtomicU32;
18use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
19use std::sync::{Arc, Weak};
20
// Process-wide source of gauge ids (starts at 1). Each `DynamicGaugeI64` takes
// one at construction and passes it to the thread-local series cache on every
// cache lookup and insert.
static GAUGE_I64_IDS: AtomicUsize = AtomicUsize::new(1);
// Series cap applied by `DynamicGaugeI64::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
// Label key of the single series that absorbs new label sets once the cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
// Label value paired with `OVERFLOW_LABEL_KEY`.
const OVERFLOW_LABEL_VALUE: &str = "true";
// One shard of the label-set index: a cache-padded, read/write-locked map from
// label set to its shared series.
type GaugeI64IndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeI64Series>>>>;
26
27struct GaugeI64Series {
28    cells: Vec<CachePadded<AtomicI64>>,
29    /// Tombstone flag set by exporter before removing from map.
30    evicted: AtomicBool,
31    /// Last export cycle when this series was accessed.
32    #[cfg(feature = "eviction")]
33    last_accessed_cycle: AtomicU32,
34}
35
36impl GaugeI64Series {
37    #[cfg(feature = "eviction")]
38    fn new(shard_count: usize, cycle: u32) -> Self {
39        Self {
40            cells: (0..shard_count)
41                .map(|_| CachePadded::new(AtomicI64::new(0)))
42                .collect(),
43            evicted: AtomicBool::new(false),
44            last_accessed_cycle: AtomicU32::new(cycle),
45        }
46    }
47
48    #[cfg(not(feature = "eviction"))]
49    fn new(shard_count: usize) -> Self {
50        Self {
51            cells: (0..shard_count)
52                .map(|_| CachePadded::new(AtomicI64::new(0)))
53                .collect(),
54            evicted: AtomicBool::new(false),
55        }
56    }
57
58    #[inline]
59    fn add_at(&self, shard_idx: usize, value: i64) {
60        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
61        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
62        // global atomic read on every add.
63    }
64
65    #[inline]
66    fn set_at(&self, shard_idx: usize, value: i64) {
67        // For set, we need to clear other shards and set the target shard
68        // This is inherently racy but acceptable for gauge semantics
69        for (i, cell) in self.cells.iter().enumerate() {
70            if i == shard_idx {
71                cell.store(value, Ordering::Relaxed);
72            } else {
73                cell.store(0, Ordering::Relaxed);
74            }
75        }
76        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
77        // global atomic read on every set.
78    }
79
80    /// Touch the series timestamp. Called on slow path only.
81    #[cfg(feature = "eviction")]
82    #[inline]
83    fn touch(&self, cycle: u32) {
84        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
85    }
86
87    #[inline]
88    fn sum(&self) -> i64 {
89        self.cells
90            .iter()
91            .map(|cell| cell.load(Ordering::Relaxed))
92            .sum()
93    }
94
95    #[inline]
96    fn is_evicted(&self) -> bool {
97        self.evicted.load(Ordering::Relaxed)
98    }
99
100    #[cfg(feature = "eviction")]
101    fn mark_evicted(&self) {
102        self.evicted.store(true, Ordering::Relaxed);
103    }
104}
105
106impl CacheableSeries for GaugeI64Series {
107    fn is_evicted(&self) -> bool {
108        self.is_evicted()
109    }
110}
111
112/// A reusable handle to a dynamic-label i64 gauge series.
113///
114/// Use this for hot paths to avoid per-update label canonicalization and map
115/// lookups. Resolve once with `DynamicGaugeI64::series(...)`, then call `add()`
116/// / `set()` on the handle.
117#[derive(Clone)]
118pub struct DynamicGaugeI64Series {
119    series: Arc<GaugeI64Series>,
120    shard_mask: usize,
121}
122
123impl DynamicGaugeI64Series {
124    /// Increment this gauge by 1.
125    #[inline]
126    pub fn inc(&self) {
127        self.add(1);
128    }
129
130    /// Decrement this gauge by 1.
131    #[inline]
132    pub fn dec(&self) {
133        self.add(-1);
134    }
135
136    /// Add `value` to this gauge (can be negative).
137    #[inline]
138    pub fn add(&self, value: i64) {
139        let shard_idx = thread_id() & self.shard_mask;
140        self.series.add_at(shard_idx, value);
141    }
142
143    /// Set this gauge to an absolute value.
144    #[inline]
145    pub fn set(&self, value: i64) {
146        let shard_idx = thread_id() & self.shard_mask;
147        self.series.set_at(shard_idx, value);
148    }
149
150    /// Get this gauge's total across shards.
151    #[inline]
152    pub fn get(&self) -> i64 {
153        self.series.sum()
154    }
155
156    /// Check if this series handle has been evicted.
157    #[inline]
158    pub fn is_evicted(&self) -> bool {
159        self.series.is_evicted()
160    }
161}
162
thread_local! {
    // Per-thread cache consulted before the shared index. Lookups and inserts
    // pass the owning gauge's id together with the labels. Entries are `Weak`
    // references, so the cache by itself does not raise a series' strong count.
    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeI64Series>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
167
/// Signed integer gauge keyed by runtime label sets.
///
/// Unlike `DynamicCounter`, this exports as a gauge (absolute value) rather than
/// a counter (delta). Use for metrics like "active connections" that go up and down.
///
/// Uses sharded atomics internally for fast concurrent updates.
pub struct DynamicGaugeI64 {
    /// Unique id taken from `GAUGE_I64_IDS`; passed to the thread-local series
    /// cache alongside the labels.
    id: usize,
    /// Number of value cells per series and number of index shards
    /// (the constructor rounds it up to a power of two).
    shard_count: usize,
    /// Cardinality cap; `0` disables the cap check entirely.
    max_series: usize,
    /// `shard_count - 1`, used to mask thread ids and key hashes into a shard index.
    shard_mask: usize,
    /// Label-set index, split into independently locked shards.
    index_shards: Vec<GaugeI64IndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of lookups routed to the overflow bucket due to the cardinality cap.
    /// Incremented once per shared-index lookup that gets redirected; updates
    /// served from the thread-local cache do not pass through that path.
    overflow_count: AtomicU64,
}
185
186impl DynamicGaugeI64 {
187    /// Creates a new runtime-labeled i64 gauge.
188    pub fn new(shard_count: usize) -> Self {
189        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
190    }
191
192    /// Creates a new runtime-labeled i64 gauge with a series cardinality cap.
193    ///
194    /// When the number of unique label sets approximately reaches `max_series`,
195    /// new label sets are redirected into a single overflow series
196    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
197    /// so concurrent inserts may briefly overshoot by the number of in-flight
198    /// writers before the overflow kicks in.
199    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
200        let shard_count = shard_count.next_power_of_two();
201        let id = GAUGE_I64_IDS.fetch_add(1, Ordering::Relaxed);
202        Self {
203            id,
204            shard_count,
205            max_series,
206            shard_mask: shard_count - 1,
207            index_shards: (0..shard_count)
208                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
209                .collect(),
210            series_count: AtomicUsize::new(0),
211            overflow_count: AtomicU64::new(0),
212        }
213    }
214
215    /// Resolve a reusable series handle for `labels`.
216    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeI64Series {
217        if let Some(series) = self.cached_series(labels) {
218            return DynamicGaugeI64Series {
219                series,
220                shard_mask: self.shard_mask,
221            };
222        }
223        let series = self.lookup_or_create(labels);
224        self.update_cache(labels, &series);
225        DynamicGaugeI64Series {
226            series,
227            shard_mask: self.shard_mask,
228        }
229    }
230
231    /// Increments the gauge identified by `labels` by 1.
232    #[inline]
233    pub fn inc(&self, labels: &[(&str, &str)]) {
234        self.add(labels, 1);
235    }
236
237    /// Decrements the gauge identified by `labels` by 1.
238    #[inline]
239    pub fn dec(&self, labels: &[(&str, &str)]) {
240        self.add(labels, -1);
241    }
242
243    /// Adds `value` to the gauge identified by `labels` (can be negative).
244    #[inline]
245    pub fn add(&self, labels: &[(&str, &str)], value: i64) {
246        if let Some(series) = self.cached_series(labels) {
247            let shard_idx = thread_id() & self.shard_mask;
248            series.add_at(shard_idx, value);
249            return;
250        }
251
252        let series = self.lookup_or_create(labels);
253        self.update_cache(labels, &series);
254        let shard_idx = thread_id() & self.shard_mask;
255        series.add_at(shard_idx, value);
256    }
257
258    /// Sets the gauge identified by `labels` to an absolute value.
259    #[inline]
260    pub fn set(&self, labels: &[(&str, &str)], value: i64) {
261        if let Some(series) = self.cached_series(labels) {
262            let shard_idx = thread_id() & self.shard_mask;
263            series.set_at(shard_idx, value);
264            return;
265        }
266
267        let series = self.lookup_or_create(labels);
268        self.update_cache(labels, &series);
269        let shard_idx = thread_id() & self.shard_mask;
270        series.set_at(shard_idx, value);
271    }
272
273    /// Gets the current value for the gauge identified by `labels`.
274    pub fn get(&self, labels: &[(&str, &str)]) -> i64 {
275        let key = DynamicLabelSet::from_pairs(labels);
276        let index_shard = self.index_shard_for(&key);
277        self.index_shards[index_shard]
278            .read()
279            .get(&key)
280            .map(|series| series.sum())
281            .unwrap_or(0)
282    }
283
284    /// Sums all series.
285    pub fn sum_all(&self) -> i64 {
286        self.snapshot().into_iter().map(|(_, value)| value).sum()
287    }
288
289    /// Returns a snapshot of all label-set/value pairs.
290    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, i64)> {
291        let mut out = Vec::new();
292        for shard in &self.index_shards {
293            let guard = shard.read();
294            for (labels, series) in guard.iter() {
295                out.push((labels.clone(), series.sum()));
296            }
297        }
298        out
299    }
300
301    /// Returns the current number of distinct label sets.
302    pub fn cardinality(&self) -> usize {
303        self.index_shards
304            .iter()
305            .map(|shard| shard.read().len())
306            .sum()
307    }
308
309    /// Returns the number of records routed to the overflow bucket.
310    ///
311    /// A non-zero value indicates the cardinality cap was hit and label
312    /// fidelity is being lost. Use this to alert on cardinality pressure.
313    pub fn overflow_count(&self) -> u64 {
314        self.overflow_count.load(Ordering::Relaxed)
315    }
316
317    /// Iterate all series without cloning label sets.
318    ///
319    /// Calls `f` with borrowed label pairs and the current value for each series.
320    /// Used by exporters to avoid the intermediate `snapshot()` allocation.
321    pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], i64)) {
322        for shard in &self.index_shards {
323            let guard = shard.read();
324            for (labels, series) in guard.iter() {
325                f(labels.pairs(), series.sum());
326            }
327        }
328    }
329
330    /// Evict series that haven't been accessed for `max_staleness` cycles.
331    ///
332    /// Call this after `advance_cycle()` in your exporter task.
333    /// Series are marked as evicted (so cached handles see the tombstone),
334    /// then removed from the index.
335    ///
336    /// Protected series (Arc::strong_count > 1) are never evicted - someone
337    /// holds a DynamicGaugeI64Series handle to them.
338    ///
339    /// Returns the number of series evicted.
340    #[cfg(feature = "eviction")]
341    pub fn evict_stale(&self, max_staleness: u32) -> usize {
342        let cycle = current_cycle();
343        let mut removed = 0;
344
345        for shard in &self.index_shards {
346            let mut guard = shard.write();
347            guard.retain(|_labels, series| {
348                // Protected if someone holds a handle (strong_count > 1 means
349                // both the map and at least one DynamicGaugeI64Series hold refs)
350                if Arc::strong_count(series) > 1 {
351                    return true;
352                }
353                // Otherwise check timestamp staleness
354                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
355                let stale = cycle.saturating_sub(last) > max_staleness;
356                if stale {
357                    series.mark_evicted();
358                    removed += 1;
359                    self.series_count.fetch_sub(1, Ordering::Relaxed);
360                }
361                !stale
362            });
363        }
364
365        removed
366    }
367
368    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeI64Series> {
369        let requested_key = DynamicLabelSet::from_pairs(labels);
370        let requested_shard = self.index_shard_for(&requested_key);
371        #[cfg(feature = "eviction")]
372        let cycle = current_cycle();
373
374        // Fast path: read lock only.
375        if let Some(series) = self.index_shards[requested_shard]
376            .read()
377            .get(&requested_key)
378        {
379            #[cfg(feature = "eviction")]
380            series.touch(cycle);
381            return Arc::clone(series);
382        }
383
384        // Check cardinality cap BEFORE taking any write lock (lock-free).
385        let key = if self.max_series > 0
386            && self.series_count.load(Ordering::Relaxed) >= self.max_series
387        {
388            self.overflow_count.fetch_add(1, Ordering::Relaxed);
389            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
390        } else {
391            requested_key
392        };
393        let shard = self.index_shard_for(&key);
394
395        if let Some(series) = self.index_shards[shard].read().get(&key) {
396            #[cfg(feature = "eviction")]
397            series.touch(cycle);
398            return Arc::clone(series);
399        }
400
401        let mut guard = self.index_shards[shard].write();
402        if let Some(series) = guard.get(&key) {
403            #[cfg(feature = "eviction")]
404            series.touch(cycle);
405            return Arc::clone(series);
406        }
407        #[cfg(feature = "eviction")]
408        let series = Arc::new(GaugeI64Series::new(self.shard_count, cycle));
409        #[cfg(not(feature = "eviction"))]
410        let series = Arc::new(GaugeI64Series::new(self.shard_count));
411        guard.insert(key, Arc::clone(&series));
412        self.series_count.fetch_add(1, Ordering::Relaxed);
413        series
414    }
415
416    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
417        let mut hasher = std::collections::hash_map::DefaultHasher::new();
418        key.hash(&mut hasher);
419        (hasher.finish() as usize) & self.shard_mask
420    }
421
422    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeI64Series>> {
423        SERIES_CACHE.with(|cache| {
424            let series = cache.borrow_mut().get(self.id, labels)?;
425            #[cfg(feature = "eviction")]
426            series.touch(current_cycle());
427            Some(series)
428        })
429    }
430
431    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeI64Series>) {
432        SERIES_CACHE.with(|cache| {
433            cache
434                .borrow_mut()
435                .insert(self.id, labels, Arc::downgrade(series));
436        });
437    }
438}
439
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    /// inc/add/dec on one label set accumulate into a single value.
    #[test]
    fn test_basic_operations() {
        let ep1: &[(&str, &str)] = &[("endpoint_id", "ep1")];
        let gauge = DynamicGaugeI64::new(4);

        gauge.inc(ep1);
        gauge.add(ep1, 2);
        assert_eq!(gauge.get(ep1), 3);

        gauge.dec(ep1);
        assert_eq!(gauge.get(ep1), 2);

        gauge.add(ep1, -2);
        assert_eq!(gauge.get(ep1), 0);
    }

    /// A resolved handle and a label-based read observe the same series.
    #[test]
    fn test_series_handle() {
        let ep1: &[(&str, &str)] = &[("endpoint_id", "ep1")];
        let gauge = DynamicGaugeI64::new(4);

        let handle = gauge.series(ep1);
        handle.inc();
        handle.inc();
        handle.dec();

        assert_eq!(handle.get(), 1);
        assert_eq!(gauge.get(ep1), 1);
    }

    /// The snapshot contains one entry per label set with its current value.
    #[test]
    fn test_snapshot() {
        let gauge = DynamicGaugeI64::new(4);
        gauge.add(&[("endpoint_id", "ep1")], 10);
        gauge.add(&[("endpoint_id", "ep2")], 20);

        let entries = gauge.snapshot();
        assert_eq!(entries.len(), 2);

        let total: i64 = entries.iter().map(|(_, value)| *value).sum();
        assert_eq!(total, 30);
    }

    /// A series untouched for more than `max_staleness` cycles is removed,
    /// while a freshly created one survives.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "evict_i64")];

        gauge.add(labels, 5);
        assert_eq!(gauge.cardinality(), 1);

        // Move two cycles ahead so the first series exceeds a staleness of 1.
        advance_cycle();
        advance_cycle();

        // Record against a different label set (intended to flush the
        // thread-local cache); this second series is fresh.
        gauge.add(&[("flush", "cache")], 1);

        let evicted = gauge.evict_stale(1);
        assert_eq!(evicted, 1);
        assert_eq!(gauge.cardinality(), 1); // only the "flush" series is left
        assert_eq!(gauge.get(labels), 0);
    }

    /// A live handle keeps its series in the index regardless of staleness.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "tombstone_i64")];

        let handle = gauge.series(labels);
        handle.add(5);
        assert!(!handle.is_evicted());

        // Age the series past the threshold, then attempt eviction.
        advance_cycle();
        advance_cycle();
        let evicted = gauge.evict_stale(1);

        // The handle's extra strong reference (Arc::strong_count > 1) shields it.
        assert_eq!(evicted, 0);
        assert!(!handle.is_evicted());
        assert_eq!(gauge.get(labels), 5);
    }

    /// Once the cap is reached, a new label set lands in the overflow series.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGaugeI64::with_max_series(4, 1);
        gauge.add(&[("endpoint_id", "1")], 1);
        gauge.add(&[("endpoint_id", "2")], 2);

        // One real series plus the overflow series.
        assert_eq!(gauge.cardinality(), 2);
        assert_eq!(gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 2);
    }
}