// fast_telemetry/metric/dynamic/counter.rs
1//! Runtime-labeled counter for dynamic dimensions.
2
3use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{COUNTER_IDS, DynamicLabelSet, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
/// Default cardinality cap applied by `DynamicCounter::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the synthetic series that absorbs inserts once the cap is hit.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY` for the overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";
20
/// Internal storage for one label set: sharded atomic cells plus eviction
/// bookkeeping. Shared via `Arc` between the index map, thread-local caches,
/// and `DynamicCounterSeries` handles.
struct CounterSeries {
    /// One cache-padded counter cell per shard; writers pick the cell
    /// selected by `thread_id() & shard_mask` to reduce cross-core contention.
    cells: Vec<CachePadded<AtomicIsize>>,
    /// Tombstone flag set by exporter before removing from map.
    /// Checked in cached_series() to invalidate stale cache entries.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    /// Used for staleness-based eviction.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
31
/// One shard of the key -> series index: a cache-padded, `RwLock`-guarded map.
type CounterIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<CounterSeries>>>>;
33
34impl CounterSeries {
35    #[cfg(feature = "eviction")]
36    fn new(shard_count: usize, current_cycle: u32) -> Self {
37        Self {
38            cells: (0..shard_count)
39                .map(|_| CachePadded::new(AtomicIsize::new(0)))
40                .collect(),
41            evicted: AtomicBool::new(false),
42            last_accessed_cycle: AtomicU32::new(current_cycle),
43        }
44    }
45
46    #[cfg(not(feature = "eviction"))]
47    fn new(shard_count: usize) -> Self {
48        Self {
49            cells: (0..shard_count)
50                .map(|_| CachePadded::new(AtomicIsize::new(0)))
51                .collect(),
52            evicted: AtomicBool::new(false),
53        }
54    }
55
56    #[inline]
57    fn add_at(&self, shard_idx: usize, value: isize) {
58        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
59        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
60        // global atomic read on every increment.
61    }
62
63    /// Touch the series timestamp. Called on slow path only.
64    #[cfg(feature = "eviction")]
65    #[inline]
66    fn touch(&self, cycle: u32) {
67        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
68    }
69
70    #[inline]
71    fn sum(&self) -> isize {
72        self.cells
73            .iter()
74            .map(|cell| cell.load(Ordering::Relaxed))
75            .sum()
76    }
77
78    #[inline]
79    fn is_evicted(&self) -> bool {
80        self.evicted.load(Ordering::Relaxed)
81    }
82
83    #[cfg(feature = "eviction")]
84    fn mark_evicted(&self) {
85        self.evicted.store(true, Ordering::Relaxed);
86    }
87}
88
89impl CacheableSeries for CounterSeries {
90    fn is_evicted(&self) -> bool {
91        self.is_evicted()
92    }
93}
94
/// A reusable handle to a dynamic-label counter series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicCounter::series(...)`, then call `inc()`
/// / `add()` on the handle.
#[derive(Clone)]
pub struct DynamicCounterSeries {
    // Strong reference: also protects the series from eviction while held
    // (see evict_stale's Arc::strong_count check).
    series: Arc<CounterSeries>,
    // Copied from the owning counter; masks thread_id() into a cell index.
    shard_mask: usize,
}
105
106impl DynamicCounterSeries {
107    /// Increment this series by 1.
108    #[inline]
109    pub fn inc(&self) {
110        self.add(1);
111    }
112
113    /// Add `value` to this series.
114    #[inline]
115    pub fn add(&self, value: isize) {
116        let shard_idx = thread_id() & self.shard_mask;
117        self.series.add_at(shard_idx, value);
118    }
119
120    /// Get this series total across shards.
121    #[inline]
122    pub fn get(&self) -> isize {
123        self.series.sum()
124    }
125
126    /// Check if this series handle has been evicted.
127    ///
128    /// If true, writes go to a detached series that is no longer exported.
129    /// Callers holding long-lived handles can check this and re-resolve
130    /// via `DynamicCounter::series()` if needed.
131    #[inline]
132    pub fn is_evicted(&self) -> bool {
133        self.series.is_evicted()
134    }
135}
136
thread_local! {
    // Per-thread label-set -> series cache. Holds `Weak` refs so a cached
    // entry never keeps a series alive on its own (eviction checks
    // Arc::strong_count), and evicted entries are dropped via the tombstone.
    static SERIES_CACHE: RefCell<LabelCache<Weak<CounterSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
141
/// Counter keyed by runtime label sets.
///
/// Uses a sharded index for key->series lookup and per-series sharded atomics
/// for fast updates.
pub struct DynamicCounter {
    // Unique id (allocated from COUNTER_IDS) namespacing this counter's
    // entries in the thread-local SERIES_CACHE.
    id: usize,
    // Number of per-series atomic cells; rounded up to a power of two.
    shard_count: usize,
    // Cardinality cap; 0 disables the overflow redirect (see lookup_or_create).
    max_series: usize,
    // shard_count - 1; masks thread ids / hashes into shard indices.
    shard_mask: usize,
    // Sharded key -> series index, one RwLock-guarded map per shard.
    index_shards: Vec<CounterIndexShard>,
    /// Approximate total series count across all shards for fast cap checks.
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
157
158impl DynamicCounter {
159    /// Creates a new runtime-labeled counter.
160    pub fn new(shard_count: usize) -> Self {
161        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
162    }
163
164    /// Creates a new runtime-labeled counter with a series cardinality cap.
165    ///
166    /// When the number of unique label sets approximately reaches `max_series`,
167    /// new label sets are redirected into a single overflow series
168    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
169    /// so concurrent inserts may briefly overshoot by the number of in-flight
170    /// writers before the overflow kicks in.
171    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
172        let shard_count = shard_count.next_power_of_two();
173        let id = COUNTER_IDS.fetch_add(1, Ordering::Relaxed);
174        Self {
175            id,
176            shard_count,
177            max_series,
178            shard_mask: shard_count - 1,
179            index_shards: (0..shard_count)
180                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
181                .collect(),
182            series_count: AtomicUsize::new(0),
183            overflow_count: AtomicU64::new(0),
184        }
185    }
186
187    /// Resolve a reusable series handle for `labels`.
188    ///
189    /// Preferred for hot paths when labels come from a finite active set.
190    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicCounterSeries {
191        if let Some(series) = self.cached_series(labels) {
192            return DynamicCounterSeries {
193                series,
194                shard_mask: self.shard_mask,
195            };
196        }
197        let series = self.lookup_or_create(labels);
198        self.update_cache(labels, &series);
199        DynamicCounterSeries {
200            series,
201            shard_mask: self.shard_mask,
202        }
203    }
204
205    /// Increments the series identified by `labels` by 1.
206    #[inline]
207    pub fn inc(&self, labels: &[(&str, &str)]) {
208        self.add(labels, 1);
209    }
210
211    /// Adds `value` to the series identified by `labels`.
212    #[inline]
213    pub fn add(&self, labels: &[(&str, &str)], value: isize) {
214        if let Some(series) = self.cached_series(labels) {
215            let shard_idx = thread_id() & self.shard_mask;
216            series.add_at(shard_idx, value);
217            return;
218        }
219
220        let series = self.lookup_or_create(labels);
221        self.update_cache(labels, &series);
222        let shard_idx = thread_id() & self.shard_mask;
223        series.add_at(shard_idx, value);
224    }
225
226    /// Gets the current value for the series identified by `labels`.
227    pub fn get(&self, labels: &[(&str, &str)]) -> isize {
228        let key = DynamicLabelSet::from_pairs(labels);
229        let index_shard = self.index_shard_for(&key);
230        self.index_shards[index_shard]
231            .read()
232            .get(&key)
233            .map(|series| series.sum())
234            .unwrap_or(0)
235    }
236
237    /// Sums all series.
238    pub fn sum_all(&self) -> isize {
239        self.snapshot().into_iter().map(|(_, value)| value).sum()
240    }
241
242    /// Returns a snapshot of all label-set/count pairs.
243    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, isize)> {
244        let mut out = Vec::new();
245        for shard in &self.index_shards {
246            let guard = shard.read();
247            for (labels, series) in guard.iter() {
248                out.push((labels.clone(), series.sum()));
249            }
250        }
251        out
252    }
253
254    /// Returns the current number of distinct label sets.
255    pub fn cardinality(&self) -> usize {
256        self.index_shards
257            .iter()
258            .map(|shard| shard.read().len())
259            .sum()
260    }
261
262    /// Returns the number of records routed to the overflow bucket.
263    ///
264    /// A non-zero value indicates the cardinality cap was hit and label
265    /// fidelity is being lost. Use this to alert on cardinality pressure.
266    pub fn overflow_count(&self) -> u64 {
267        self.overflow_count.load(Ordering::Relaxed)
268    }
269
270    /// Iterate all series without cloning label sets.
271    ///
272    /// Calls `f` with borrowed label pairs and the current sum for each series.
273    /// Used by exporters/macros to avoid the intermediate `snapshot()` allocation.
274    /// The callback runs while a shard read lock is held; it must be fast and
275    /// must not call back into this metric.
276    #[doc(hidden)]
277    pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], isize)) {
278        for shard in &self.index_shards {
279            let guard = shard.read();
280            for (labels, series) in guard.iter() {
281                f(labels.pairs(), series.sum());
282            }
283        }
284    }
285
    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicCounterSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicCounterSeries hold refs)
                // NOTE(review): the strong_count check looks racy against a
                // concurrent series() resolving this entry — presumably the
                // write lock on this shard blocks that path; confirm.
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone before removal so thread-local caches that
                    // still hold a Weak to this series invalidate themselves,
                    // then keep the approximate series_count in sync.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }
323
    /// Slow path: resolve (or insert) the series for `labels` in the sharded
    /// index, applying the cardinality cap.
    ///
    /// Lookup order: read-locked fast path on the requested key, lock-free cap
    /// check (possibly redirecting to the overflow key), read re-check on the
    /// redirected shard, then a write-locked double-checked insert.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<CounterSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        // series_count is approximate — concurrent inserts may briefly exceed the
        // cap by the number of in-flight writers, but it cannot deadlock and the
        // overshoot is bounded by thread count, not by workload cardinality.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            // Counted per redirected record, not per redirected label set.
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // The overflow key may hash to a different shard than the request.
        let shard = self.index_shard_for(&key);

        // Check read lock on the (possibly redirected) shard.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Double-checked insert: another writer may have created the series
        // between dropping the read lock and acquiring the write lock.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(CounterSeries::new(self.shard_count, cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(CounterSeries::new(self.shard_count));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }
375
376    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
377        let mut hasher = std::collections::hash_map::DefaultHasher::new();
378        key.hash(&mut hasher);
379        (hasher.finish() as usize) & self.shard_mask
380    }
381
382    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<CounterSeries>> {
383        SERIES_CACHE.with(|cache| {
384            let series = cache.borrow_mut().get(self.id, labels)?;
385            #[cfg(feature = "eviction")]
386            series.touch(current_cycle());
387            Some(series)
388        })
389    }
390
391    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<CounterSeries>) {
392        SERIES_CACHE.with(|cache| {
393            cache
394                .borrow_mut()
395                .insert(self.id, labels, Arc::downgrade(series));
396        });
397    }
398}
399
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    // inc/add/get/sum_all round-trip on a single label set.
    #[test]
    fn test_basic_operations() {
        let counter = DynamicCounter::new(4);
        counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        counter.add(&[("org_id", "42"), ("endpoint_uuid", "abc")], 2);

        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            3
        );
        assert_eq!(counter.sum_all(), 3);
    }

    // The same pairs in a different order must resolve to the same series.
    #[test]
    fn test_label_order_is_canonicalized() {
        let counter = DynamicCounter::new(4);
        counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);

        assert_eq!(
            counter.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]),
            1
        );
    }

    // A resolved handle and the owning counter observe the same totals.
    #[test]
    fn test_series_handle() {
        let counter = DynamicCounter::new(4);
        let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        series.inc();
        series.add(9);

        assert_eq!(series.get(), 10);
        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            10
        );
    }

    // No increments are lost under concurrent writers sharing one handle.
    #[test]
    fn test_concurrent_adds() {
        let counter = DynamicCounter::new(8);
        let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);

        std::thread::scope(|s| {
            for _ in 0..8 {
                let series = series.clone();
                s.spawn(move || {
                    for _ in 0..10_000 {
                        series.inc();
                    }
                });
            }
        });

        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            80_000
        );
    }

    // Stale series are removed; recently created ones survive.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "42")];

        // Create series and increment
        counter.inc(labels);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Advance cycle past staleness threshold
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        // Evict series not accessed in last 1 cycle
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1); // Only the original label set, not the flush one
        assert_eq!(counter.cardinality(), 1); // flush series remains

        // Series is gone - get returns 0
        assert_eq!(counter.get(labels), 0);

        // New inc creates fresh series
        counter.inc(labels);
        assert_eq!(counter.cardinality(), 2);
        assert_eq!(counter.get(labels), 1);
    }

    // A series touched within the staleness window is retained.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale_keeps_active() {
        let counter = DynamicCounter::new(4);
        let active = &[("status", "active")];
        let stale = &[("status", "stale")];

        // Create both series
        counter.inc(active);
        counter.inc(stale);
        assert_eq!(counter.cardinality(), 2);

        // Advance cycle
        advance_cycle();

        // Touch only the active series
        counter.inc(active);

        // Advance again
        advance_cycle();

        // Evict with staleness of 1 - should only evict 'stale'
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(active), 2);
        assert_eq!(counter.get(stale), 0);
    }

    // The eviction tombstone must prevent writes to a stale cached series.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_tombstone_invalidates_cache() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "evict_test")];

        // Populate the thread-local cache
        counter.inc(labels);
        counter.inc(labels); // Second call uses cached series
        assert_eq!(counter.get(labels), 2);

        // Force eviction by advancing cycles
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        counter.evict_stale(1);

        // Next inc should create fresh series (tombstone invalidates cache)
        counter.inc(labels);
        assert_eq!(counter.get(labels), 1); // Fresh series starts at 1, not 3
    }

    // A live handle bumps the Arc strong count and blocks eviction.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "handle_test")];

        // Get a long-lived handle
        let series = counter.series(labels);
        series.inc();
        assert!(!series.is_evicted());

        // Try to evict - but handle protects the series
        advance_cycle();
        advance_cycle();
        let removed = counter.evict_stale(1);

        // Handle protects series from eviction (Arc::strong_count > 1)
        assert_eq!(removed, 0);
        assert!(!series.is_evicted());
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Writes still work
        series.inc();
        assert_eq!(counter.get(labels), 2);
    }

    // Dropping the handle (and flushing the weak cache entry) re-enables
    // eviction for that series.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_evicted_after_handle_dropped() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "handle_drop_test")];

        // Create series via handle, then drop it
        {
            let series = counter.series(labels);
            series.inc();
        }
        // Handle dropped, but thread-local cache still holds reference

        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Advance cycles
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        // Now eviction should work
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.get(labels), 0);
    }

    // With cap 2, the third distinct label set lands in the overflow series.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let counter = DynamicCounter::with_max_series(4, 2);

        counter.inc(&[("org_id", "1")]);
        counter.inc(&[("org_id", "2")]);
        counter.inc(&[("org_id", "3")]);

        assert_eq!(counter.cardinality(), 3);
        assert_eq!(
            counter.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]),
            1
        );
    }

    // Concurrent inserts may overshoot the cap, but only by ~thread count.
    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let counter = Arc::new(DynamicCounter::with_max_series(4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|t| {
                let counter = Arc::clone(&counter);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    // Each thread creates a unique label set
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        counter.inc(&[("key", &label)]);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let card = counter.cardinality();
        // Cap is approximate: may overshoot by at most thread count, but must
        // not grow unboundedly (80 distinct labels were attempted).
        assert!(
            card <= cap + threads + 1, // +1 for the overflow bucket
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        // Must have hit overflow at least once
        assert!(
            counter.overflow_count() > 0,
            "overflow should have triggered"
        );
    }

    // series_count must stay in sync through overflow, eviction, and reinsertion.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_and_reinsertion_bookkeeping() {
        let counter = DynamicCounter::with_max_series(4, 3);

        counter.inc(&[("k", "a")]);
        counter.inc(&[("k", "b")]);
        counter.inc(&[("k", "c")]);
        assert_eq!(counter.cardinality(), 3);

        // Fourth label set is redirected to the overflow series.
        counter.inc(&[("k", "d")]);
        assert!(counter.overflow_count() > 0);
        let card_after_overflow = counter.cardinality();
        assert!(card_after_overflow <= 4);

        advance_cycle();
        advance_cycle();
        advance_cycle();
        counter.inc(&[("flush", "cache")]);
        let evicted = counter.evict_stale(1);
        assert!(evicted > 0);

        let card_after_evict = counter.cardinality();
        assert!(
            card_after_evict < card_after_overflow,
            "cardinality should decrease after eviction: before={card_after_overflow} after={card_after_evict}"
        );

        // Eviction decremented series_count, so new label sets fit again.
        let overflow_before = counter.overflow_count();
        counter.inc(&[("k", "new1")]);
        counter.inc(&[("k", "new2")]);

        assert!(counter.cardinality() <= 5);

        let overflow_after = counter.overflow_count();
        assert!(
            overflow_after - overflow_before <= 1,
            "unexpected overflow after eviction freed space"
        );
    }
}