Skip to main content

fast_telemetry/metric/dynamic/
gauge_i64.rs

1//! Runtime-labeled signed integer gauge for dynamic dimensions.
2//!
3//! Use this for metrics like "active connections" or "in-flight requests"
4//! that need atomic add/sub semantics but should export as absolute gauge values
5//! (not counter deltas).
6
7use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DynamicLabelSet, thread_id};
11use crossbeam_utils::CachePadded;
12use parking_lot::RwLock;
13use std::cell::RefCell;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16#[cfg(feature = "eviction")]
17use std::sync::atomic::AtomicU32;
18use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
19use std::sync::{Arc, Weak};
20
/// Cardinality cap applied by `DynamicGaugeI64::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the catch-all series that absorbs label sets beyond the cap.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY` on the catch-all series.
const OVERFLOW_LABEL_VALUE: &str = "true";
/// Source of per-gauge ids; a gauge's id keys its entries in the thread-local
/// series cache, so two gauges never share cached series.
static GAUGE_I64_IDS: AtomicUsize = AtomicUsize::new(1);
25type GaugeI64IndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeI64Series>>>>;
26
27struct GaugeI64Series {
28    cells: Vec<CachePadded<AtomicI64>>,
29    /// Tombstone flag set by exporter before removing from map.
30    evicted: AtomicBool,
31    /// Last export cycle when this series was accessed.
32    #[cfg(feature = "eviction")]
33    last_accessed_cycle: AtomicU32,
34}
35
36impl GaugeI64Series {
37    #[cfg(feature = "eviction")]
38    fn new(shard_count: usize, cycle: u32) -> Self {
39        Self {
40            cells: (0..shard_count)
41                .map(|_| CachePadded::new(AtomicI64::new(0)))
42                .collect(),
43            evicted: AtomicBool::new(false),
44            last_accessed_cycle: AtomicU32::new(cycle),
45        }
46    }
47
48    #[cfg(not(feature = "eviction"))]
49    fn new(shard_count: usize) -> Self {
50        Self {
51            cells: (0..shard_count)
52                .map(|_| CachePadded::new(AtomicI64::new(0)))
53                .collect(),
54            evicted: AtomicBool::new(false),
55        }
56    }
57
58    #[inline]
59    fn add_at(&self, shard_idx: usize, value: i64) {
60        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
61        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
62        // global atomic read on every add.
63    }
64
65    #[inline]
66    fn set_at(&self, shard_idx: usize, value: i64) {
67        // For set, we need to clear other shards and set the target shard
68        // This is inherently racy but acceptable for gauge semantics
69        for (i, cell) in self.cells.iter().enumerate() {
70            if i == shard_idx {
71                cell.store(value, Ordering::Relaxed);
72            } else {
73                cell.store(0, Ordering::Relaxed);
74            }
75        }
76        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
77        // global atomic read on every set.
78    }
79
80    /// Touch the series timestamp. Called on slow path only.
81    #[cfg(feature = "eviction")]
82    #[inline]
83    fn touch(&self, cycle: u32) {
84        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
85    }
86
87    #[inline]
88    fn sum(&self) -> i64 {
89        self.cells
90            .iter()
91            .map(|cell| cell.load(Ordering::Relaxed))
92            .sum()
93    }
94
95    #[inline]
96    fn is_evicted(&self) -> bool {
97        self.evicted.load(Ordering::Relaxed)
98    }
99
100    #[cfg(feature = "eviction")]
101    fn mark_evicted(&self) {
102        self.evicted.store(true, Ordering::Relaxed);
103    }
104}
105
106impl CacheableSeries for GaugeI64Series {
107    fn is_evicted(&self) -> bool {
108        self.is_evicted()
109    }
110}
111
112/// A reusable handle to a dynamic-label i64 gauge series.
113///
114/// Use this for hot paths to avoid per-update label canonicalization and map
115/// lookups. Resolve once with `DynamicGaugeI64::series(...)`, then call `add()`
116/// / `set()` on the handle.
117#[derive(Clone)]
118pub struct DynamicGaugeI64Series {
119    series: Arc<GaugeI64Series>,
120    shard_mask: usize,
121}
122
123impl DynamicGaugeI64Series {
124    /// Increment this gauge by 1.
125    #[inline]
126    pub fn inc(&self) {
127        self.add(1);
128    }
129
130    /// Decrement this gauge by 1.
131    #[inline]
132    pub fn dec(&self) {
133        self.add(-1);
134    }
135
136    /// Add `value` to this gauge (can be negative).
137    #[inline]
138    pub fn add(&self, value: i64) {
139        let shard_idx = thread_id() & self.shard_mask;
140        self.series.add_at(shard_idx, value);
141    }
142
143    /// Set this gauge to an absolute value.
144    #[inline]
145    pub fn set(&self, value: i64) {
146        let shard_idx = thread_id() & self.shard_mask;
147        self.series.set_at(shard_idx, value);
148    }
149
150    /// Get this gauge's total across shards.
151    #[inline]
152    pub fn get(&self) -> i64 {
153        self.series.sum()
154    }
155
156    /// Check if this series handle has been evicted.
157    #[inline]
158    pub fn is_evicted(&self) -> bool {
159        self.series.is_evicted()
160    }
161}
162
163thread_local! {
164    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeI64Series>, SERIES_CACHE_SIZE>> =
165        RefCell::new(LabelCache::new());
166}
167
168/// Signed integer gauge keyed by runtime label sets.
169///
170/// Unlike `DynamicCounter`, this exports as a gauge (absolute value) rather than
171/// a counter (delta). Use for metrics like "active connections" that go up and down.
172///
173/// Uses sharded atomics internally for fast concurrent updates.
174pub struct DynamicGaugeI64 {
175    id: usize,
176    shard_count: usize,
177    max_series: usize,
178    shard_mask: usize,
179    index_shards: Vec<GaugeI64IndexShard>,
180    /// Approximate number of live series (incremented on insert, decremented on evict).
181    series_count: AtomicUsize,
182    /// Count of records routed to overflow bucket due to cardinality cap.
183    overflow_count: AtomicU64,
184}
185
186impl DynamicGaugeI64 {
187    /// Creates a new runtime-labeled i64 gauge.
188    pub fn new(shard_count: usize) -> Self {
189        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
190    }
191
192    /// Creates a new runtime-labeled i64 gauge with a series cardinality cap.
193    ///
194    /// When the number of unique label sets approximately reaches `max_series`,
195    /// new label sets are redirected into a single overflow series
196    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
197    /// so concurrent inserts may briefly overshoot by the number of in-flight
198    /// writers before the overflow kicks in.
199    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
200        let shard_count = shard_count.next_power_of_two();
201        let id = GAUGE_I64_IDS.fetch_add(1, Ordering::Relaxed);
202        Self {
203            id,
204            shard_count,
205            max_series,
206            shard_mask: shard_count - 1,
207            index_shards: (0..shard_count)
208                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
209                .collect(),
210            series_count: AtomicUsize::new(0),
211            overflow_count: AtomicU64::new(0),
212        }
213    }
214
215    /// Resolve a reusable series handle for `labels`.
216    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeI64Series {
217        if let Some(series) = self.cached_series(labels) {
218            return DynamicGaugeI64Series {
219                series,
220                shard_mask: self.shard_mask,
221            };
222        }
223        let series = self.lookup_or_create(labels);
224        self.update_cache(labels, &series);
225        DynamicGaugeI64Series {
226            series,
227            shard_mask: self.shard_mask,
228        }
229    }
230
231    /// Increments the gauge identified by `labels` by 1.
232    #[inline]
233    pub fn inc(&self, labels: &[(&str, &str)]) {
234        self.add(labels, 1);
235    }
236
237    /// Decrements the gauge identified by `labels` by 1.
238    #[inline]
239    pub fn dec(&self, labels: &[(&str, &str)]) {
240        self.add(labels, -1);
241    }
242
243    /// Adds `value` to the gauge identified by `labels` (can be negative).
244    #[inline]
245    pub fn add(&self, labels: &[(&str, &str)], value: i64) {
246        if let Some(series) = self.cached_series(labels) {
247            let shard_idx = thread_id() & self.shard_mask;
248            series.add_at(shard_idx, value);
249            return;
250        }
251
252        let series = self.lookup_or_create(labels);
253        self.update_cache(labels, &series);
254        let shard_idx = thread_id() & self.shard_mask;
255        series.add_at(shard_idx, value);
256    }
257
258    /// Sets the gauge identified by `labels` to an absolute value.
259    #[inline]
260    pub fn set(&self, labels: &[(&str, &str)], value: i64) {
261        if let Some(series) = self.cached_series(labels) {
262            let shard_idx = thread_id() & self.shard_mask;
263            series.set_at(shard_idx, value);
264            return;
265        }
266
267        let series = self.lookup_or_create(labels);
268        self.update_cache(labels, &series);
269        let shard_idx = thread_id() & self.shard_mask;
270        series.set_at(shard_idx, value);
271    }
272
273    /// Gets the current value for the gauge identified by `labels`.
274    pub fn get(&self, labels: &[(&str, &str)]) -> i64 {
275        let key = DynamicLabelSet::from_pairs(labels);
276        let index_shard = self.index_shard_for(&key);
277        self.index_shards[index_shard]
278            .read()
279            .get(&key)
280            .map(|series| series.sum())
281            .unwrap_or(0)
282    }
283
284    /// Sums all series.
285    pub fn sum_all(&self) -> i64 {
286        self.snapshot().into_iter().map(|(_, value)| value).sum()
287    }
288
289    /// Returns a snapshot of all label-set/value pairs.
290    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, i64)> {
291        let mut out = Vec::new();
292        for shard in &self.index_shards {
293            let guard = shard.read();
294            for (labels, series) in guard.iter() {
295                out.push((labels.clone(), series.sum()));
296            }
297        }
298        out
299    }
300
301    /// Returns the current number of distinct label sets.
302    pub fn cardinality(&self) -> usize {
303        self.index_shards
304            .iter()
305            .map(|shard| shard.read().len())
306            .sum()
307    }
308
309    /// Returns the number of records routed to the overflow bucket.
310    ///
311    /// A non-zero value indicates the cardinality cap was hit and label
312    /// fidelity is being lost. Use this to alert on cardinality pressure.
313    pub fn overflow_count(&self) -> u64 {
314        self.overflow_count.load(Ordering::Relaxed)
315    }
316
317    /// Iterate all series without cloning label sets.
318    ///
319    /// Calls `f` with borrowed label pairs and the current value for each series.
320    /// Used by exporters to avoid the intermediate `snapshot()` allocation.
321    /// The callback runs while a shard read lock is held; it must be fast and
322    /// must not call back into this metric.
323    #[doc(hidden)]
324    pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], i64)) {
325        for shard in &self.index_shards {
326            let guard = shard.read();
327            for (labels, series) in guard.iter() {
328                f(labels.pairs(), series.sum());
329            }
330        }
331    }
332
333    /// Evict series that haven't been accessed for `max_staleness` cycles.
334    ///
335    /// Call this after `advance_cycle()` in your exporter task.
336    /// Series are marked as evicted (so cached handles see the tombstone),
337    /// then removed from the index.
338    ///
339    /// Protected series (Arc::strong_count > 1) are never evicted - someone
340    /// holds a DynamicGaugeI64Series handle to them.
341    ///
342    /// Returns the number of series evicted.
343    #[cfg(feature = "eviction")]
344    pub fn evict_stale(&self, max_staleness: u32) -> usize {
345        let cycle = current_cycle();
346        let mut removed = 0;
347
348        for shard in &self.index_shards {
349            let mut guard = shard.write();
350            guard.retain(|_labels, series| {
351                // Protected if someone holds a handle (strong_count > 1 means
352                // both the map and at least one DynamicGaugeI64Series hold refs)
353                if Arc::strong_count(series) > 1 {
354                    return true;
355                }
356                // Otherwise check timestamp staleness
357                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
358                let stale = cycle.saturating_sub(last) > max_staleness;
359                if stale {
360                    series.mark_evicted();
361                    removed += 1;
362                    self.series_count.fetch_sub(1, Ordering::Relaxed);
363                }
364                !stale
365            });
366        }
367
368        removed
369    }
370
371    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeI64Series> {
372        let requested_key = DynamicLabelSet::from_pairs(labels);
373        let requested_shard = self.index_shard_for(&requested_key);
374        #[cfg(feature = "eviction")]
375        let cycle = current_cycle();
376
377        // Fast path: read lock only.
378        if let Some(series) = self.index_shards[requested_shard]
379            .read()
380            .get(&requested_key)
381        {
382            #[cfg(feature = "eviction")]
383            series.touch(cycle);
384            return Arc::clone(series);
385        }
386
387        // Check cardinality cap BEFORE taking any write lock (lock-free).
388        let key = if self.max_series > 0
389            && self.series_count.load(Ordering::Relaxed) >= self.max_series
390        {
391            self.overflow_count.fetch_add(1, Ordering::Relaxed);
392            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
393        } else {
394            requested_key
395        };
396        let shard = self.index_shard_for(&key);
397
398        if let Some(series) = self.index_shards[shard].read().get(&key) {
399            #[cfg(feature = "eviction")]
400            series.touch(cycle);
401            return Arc::clone(series);
402        }
403
404        let mut guard = self.index_shards[shard].write();
405        if let Some(series) = guard.get(&key) {
406            #[cfg(feature = "eviction")]
407            series.touch(cycle);
408            return Arc::clone(series);
409        }
410        #[cfg(feature = "eviction")]
411        let series = Arc::new(GaugeI64Series::new(self.shard_count, cycle));
412        #[cfg(not(feature = "eviction"))]
413        let series = Arc::new(GaugeI64Series::new(self.shard_count));
414        guard.insert(key, Arc::clone(&series));
415        self.series_count.fetch_add(1, Ordering::Relaxed);
416        series
417    }
418
419    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
420        let mut hasher = std::collections::hash_map::DefaultHasher::new();
421        key.hash(&mut hasher);
422        (hasher.finish() as usize) & self.shard_mask
423    }
424
425    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeI64Series>> {
426        SERIES_CACHE.with(|cache| {
427            let series = cache.borrow_mut().get(self.id, labels)?;
428            #[cfg(feature = "eviction")]
429            series.touch(current_cycle());
430            Some(series)
431        })
432    }
433
434    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeI64Series>) {
435        SERIES_CACHE.with(|cache| {
436            cache
437                .borrow_mut()
438                .insert(self.id, labels, Arc::downgrade(series));
439        });
440    }
441}
442
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    #[test]
    fn test_basic_operations() {
        let gauge = DynamicGaugeI64::new(4);
        gauge.inc(&[("endpoint_id", "ep1")]);
        gauge.add(&[("endpoint_id", "ep1")], 2);

        assert_eq!(gauge.get(&[("endpoint_id", "ep1")]), 3);

        gauge.dec(&[("endpoint_id", "ep1")]);
        assert_eq!(gauge.get(&[("endpoint_id", "ep1")]), 2);

        gauge.add(&[("endpoint_id", "ep1")], -2);
        assert_eq!(gauge.get(&[("endpoint_id", "ep1")]), 0);
    }

    #[test]
    fn test_set_replaces_accumulated_value() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "set")];

        gauge.add(labels, 7);
        gauge.set(labels, 3);
        assert_eq!(gauge.get(labels), 3);

        gauge.dec(labels);
        assert_eq!(gauge.get(labels), 2);
    }

    #[test]
    fn test_zero_shard_count_is_rounded_up() {
        let gauge = DynamicGaugeI64::new(0);
        gauge.inc(&[("endpoint_id", "ep1")]);
        gauge.inc(&[("endpoint_id", "ep1")]);
        assert_eq!(gauge.get(&[("endpoint_id", "ep1")]), 2);
    }

    #[test]
    fn test_series_handle() {
        let gauge = DynamicGaugeI64::new(4);
        let series = gauge.series(&[("endpoint_id", "ep1")]);
        series.inc();
        series.inc();
        series.dec();

        assert_eq!(series.get(), 1);
        assert_eq!(gauge.get(&[("endpoint_id", "ep1")]), 1);
    }

    #[test]
    fn test_series_handle_set() {
        let gauge = DynamicGaugeI64::new(4);
        let series = gauge.series(&[("endpoint_id", "handle_set")]);
        series.add(10);
        series.set(-3);

        assert_eq!(series.get(), -3);
        assert_eq!(gauge.get(&[("endpoint_id", "handle_set")]), -3);
    }

    #[test]
    fn test_snapshot() {
        let gauge = DynamicGaugeI64::new(4);
        gauge.add(&[("endpoint_id", "ep1")], 10);
        gauge.add(&[("endpoint_id", "ep2")], 20);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);

        let total: i64 = snap.iter().map(|(_, v)| v).sum();
        assert_eq!(total, 30);
    }

    #[test]
    fn test_sum_all_and_visit_series() {
        let gauge = DynamicGaugeI64::new(4);
        gauge.add(&[("endpoint_id", "ep1")], 10);
        gauge.add(&[("endpoint_id", "ep2")], -4);

        assert_eq!(gauge.sum_all(), 6);

        let mut visited = 0;
        let mut visited_total = 0;
        gauge.visit_series(|_, value| {
            visited += 1;
            visited_total += value;
        });
        assert_eq!(visited, 2);
        assert_eq!(visited_total, 6);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "evict_i64")];

        gauge.add(labels, 5);
        assert_eq!(gauge.cardinality(), 1);

        // Advance cycles past staleness threshold
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        gauge.add(&[("flush", "cache")], 1);

        let removed = gauge.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(gauge.cardinality(), 1); // flush series remains
        assert_eq!(gauge.get(labels), 0);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "tombstone_i64")];

        let series = gauge.series(labels);
        series.add(5);
        assert!(!series.is_evicted());

        // Try to evict - but handle protects the series
        advance_cycle();
        advance_cycle();
        let removed = gauge.evict_stale(1);

        // Handle protects series from eviction (Arc::strong_count > 1)
        assert_eq!(removed, 0);
        assert!(!series.is_evicted());
        assert_eq!(gauge.get(labels), 5);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGaugeI64::with_max_series(4, 1);
        gauge.add(&[("endpoint_id", "1")], 1);
        assert_eq!(gauge.overflow_count(), 0);

        gauge.add(&[("endpoint_id", "2")], 2);
        assert_eq!(gauge.overflow_count(), 1);

        assert_eq!(gauge.cardinality(), 2);
        assert_eq!(gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 2);
        // The redirected label set has no series of its own.
        assert_eq!(gauge.get(&[("endpoint_id", "2")]), 0);
    }

    #[test]
    fn test_zero_max_series_disables_cap() {
        let gauge = DynamicGaugeI64::with_max_series(4, 0);
        for i in 0..32 {
            let id = i.to_string();
            gauge.inc(&[("endpoint_id", id.as_str())]);
        }

        assert_eq!(gauge.cardinality(), 32);
        assert_eq!(gauge.overflow_count(), 0);
        assert_eq!(gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 0);
    }
}