// fast_telemetry/metric/dynamic/counter.rs

1//! Runtime-labeled counter for dynamic dimensions.
2
3#[cfg(feature = "eviction")]
4use super::current_cycle;
5use super::{COUNTER_IDS, DynamicLabelSet, thread_id};
6use crossbeam_utils::CachePadded;
7use parking_lot::RwLock;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11#[cfg(feature = "eviction")]
12use std::sync::atomic::AtomicU32;
13use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Weak};
15
/// Default per-counter cardinality cap used by `DynamicCounter::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Reserved label key marking the single overflow series that absorbs new
/// label sets once the cardinality cap is reached (see `lookup_or_create`).
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Value paired with `OVERFLOW_LABEL_KEY` on the overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";
19
/// Storage for one label set: per-thread-shard atomic cells plus eviction
/// bookkeeping. Shared via `Arc` between the index map, thread-local caches
/// (held as `Weak`), and `DynamicCounterSeries` handles.
struct CounterSeries {
    // One padded cell per shard; writers pick a cell via `thread_id() & mask`
    // so concurrent increments don't contend on a single cache line.
    cells: Vec<CachePadded<AtomicIsize>>,
    /// Tombstone flag set by exporter before removing from map.
    /// Checked in cached_series() to invalidate stale cache entries.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    /// Used for staleness-based eviction.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
30
/// One shard of the label-set -> series index; cache-padded so adjacent shard
/// locks don't share a cache line.
type CounterIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<CounterSeries>>>>;
32
33impl CounterSeries {
34    #[cfg(feature = "eviction")]
35    fn new(shard_count: usize, current_cycle: u32) -> Self {
36        Self {
37            cells: (0..shard_count)
38                .map(|_| CachePadded::new(AtomicIsize::new(0)))
39                .collect(),
40            evicted: AtomicBool::new(false),
41            last_accessed_cycle: AtomicU32::new(current_cycle),
42        }
43    }
44
45    #[cfg(not(feature = "eviction"))]
46    fn new(shard_count: usize) -> Self {
47        Self {
48            cells: (0..shard_count)
49                .map(|_| CachePadded::new(AtomicIsize::new(0)))
50                .collect(),
51            evicted: AtomicBool::new(false),
52        }
53    }
54
55    #[inline]
56    fn add_at(&self, shard_idx: usize, value: isize) {
57        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
58        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
59        // global atomic read on every increment.
60    }
61
62    /// Touch the series timestamp. Called on slow path only.
63    #[cfg(feature = "eviction")]
64    #[inline]
65    fn touch(&self, cycle: u32) {
66        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
67    }
68
69    #[inline]
70    fn sum(&self) -> isize {
71        self.cells
72            .iter()
73            .map(|cell| cell.load(Ordering::Relaxed))
74            .sum()
75    }
76
77    #[inline]
78    fn is_evicted(&self) -> bool {
79        self.evicted.load(Ordering::Relaxed)
80    }
81
82    #[cfg(feature = "eviction")]
83    fn mark_evicted(&self) {
84        self.evicted.store(true, Ordering::Relaxed);
85    }
86}
87
/// A reusable handle to a dynamic-label counter series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicCounter::series(...)`, then call `inc()`
/// / `add()` on the handle.
#[derive(Clone)]
pub struct DynamicCounterSeries {
    // Strong ref: while any handle is alive, evict_stale() skips this series
    // (it checks Arc::strong_count > 1).
    series: Arc<CounterSeries>,
    // Copied from the owning DynamicCounter so add() can pick a shard cell
    // without touching the counter.
    shard_mask: usize,
}
98
99impl DynamicCounterSeries {
100    /// Increment this series by 1.
101    #[inline]
102    pub fn inc(&self) {
103        self.add(1);
104    }
105
106    /// Add `value` to this series.
107    #[inline]
108    pub fn add(&self, value: isize) {
109        let shard_idx = thread_id() & self.shard_mask;
110        self.series.add_at(shard_idx, value);
111    }
112
113    /// Get this series total across shards.
114    #[inline]
115    pub fn get(&self) -> isize {
116        self.series.sum()
117    }
118
119    /// Check if this series handle has been evicted.
120    ///
121    /// If true, writes go to a detached series that is no longer exported.
122    /// Callers holding long-lived handles can check this and re-resolve
123    /// via `DynamicCounter::series()` if needed.
124    #[inline]
125    pub fn is_evicted(&self) -> bool {
126        self.series.is_evicted()
127    }
128}
129
/// Thread-local memo of the most recently resolved (counter, labels) -> series.
struct SeriesCacheEntry {
    // Which DynamicCounter this entry belongs to (compared against `self.id`).
    counter_id: usize,
    // Labels exactly as the caller passed them (NOT canonicalized); compared
    // positionally, so a reordered label set is a cache miss, not a wrong hit.
    ordered_labels: Vec<(String, String)>,
    // Weak ref so the cache never keeps an evicted series alive.
    series: Weak<CounterSeries>,
}
135
thread_local! {
    // Single-entry per-thread cache: the last series resolved on this thread.
    // Holds at most one entry across ALL DynamicCounters, so alternating
    // between counters or label sets on one thread falls back to the map.
    static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
}
139
/// Counter keyed by runtime label sets.
///
/// Uses a sharded index for key->series lookup and per-series sharded atomics
/// for fast updates.
pub struct DynamicCounter {
    // Unique id from COUNTER_IDS; distinguishes counters in SERIES_CACHE.
    id: usize,
    // Number of per-series cells AND index shards (power of two).
    shard_count: usize,
    // Soft cardinality cap; 0 disables the cap (see with_max_series).
    max_series: usize,
    // shard_count - 1; valid mask because shard_count is a power of two.
    shard_mask: usize,
    index_shards: Vec<CounterIndexShard>,
    /// Approximate total series count across all shards for fast cap checks.
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
155
156impl DynamicCounter {
157    /// Creates a new runtime-labeled counter.
158    pub fn new(shard_count: usize) -> Self {
159        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
160    }
161
162    /// Creates a new runtime-labeled counter with a series cardinality cap.
163    ///
164    /// When the number of unique label sets approximately reaches `max_series`,
165    /// new label sets are redirected into a single overflow series
166    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
167    /// so concurrent inserts may briefly overshoot by the number of in-flight
168    /// writers before the overflow kicks in.
169    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
170        let shard_count = shard_count.next_power_of_two();
171        let id = COUNTER_IDS.fetch_add(1, Ordering::Relaxed);
172        Self {
173            id,
174            shard_count,
175            max_series,
176            shard_mask: shard_count - 1,
177            index_shards: (0..shard_count)
178                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
179                .collect(),
180            series_count: AtomicUsize::new(0),
181            overflow_count: AtomicU64::new(0),
182        }
183    }
184
185    /// Resolve a reusable series handle for `labels`.
186    ///
187    /// Preferred for hot paths when labels come from a finite active set.
188    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicCounterSeries {
189        if let Some(series) = self.cached_series(labels) {
190            return DynamicCounterSeries {
191                series,
192                shard_mask: self.shard_mask,
193            };
194        }
195        let series = self.lookup_or_create(labels);
196        self.update_cache(labels, Arc::clone(&series));
197        DynamicCounterSeries {
198            series,
199            shard_mask: self.shard_mask,
200        }
201    }
202
203    /// Increments the series identified by `labels` by 1.
204    #[inline]
205    pub fn inc(&self, labels: &[(&str, &str)]) {
206        self.add(labels, 1);
207    }
208
209    /// Adds `value` to the series identified by `labels`.
210    #[inline]
211    pub fn add(&self, labels: &[(&str, &str)], value: isize) {
212        if let Some(series) = self.cached_series(labels) {
213            let shard_idx = thread_id() & self.shard_mask;
214            series.add_at(shard_idx, value);
215            return;
216        }
217
218        let series = self.lookup_or_create(labels);
219        self.update_cache(labels, Arc::clone(&series));
220        let shard_idx = thread_id() & self.shard_mask;
221        series.add_at(shard_idx, value);
222    }
223
224    /// Gets the current value for the series identified by `labels`.
225    pub fn get(&self, labels: &[(&str, &str)]) -> isize {
226        let key = DynamicLabelSet::from_pairs(labels);
227        let index_shard = self.index_shard_for(&key);
228        self.index_shards[index_shard]
229            .read()
230            .get(&key)
231            .map(|series| series.sum())
232            .unwrap_or(0)
233    }
234
235    /// Sums all series.
236    pub fn sum_all(&self) -> isize {
237        self.snapshot().into_iter().map(|(_, value)| value).sum()
238    }
239
240    /// Returns a snapshot of all label-set/count pairs.
241    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, isize)> {
242        let mut out = Vec::new();
243        for shard in &self.index_shards {
244            let guard = shard.read();
245            for (labels, series) in guard.iter() {
246                out.push((labels.clone(), series.sum()));
247            }
248        }
249        out
250    }
251
252    /// Returns the current number of distinct label sets.
253    pub fn cardinality(&self) -> usize {
254        self.index_shards
255            .iter()
256            .map(|shard| shard.read().len())
257            .sum()
258    }
259
    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    ///
    /// Monotonically increasing for the counter's lifetime: nothing in this
    /// file ever resets it, only `lookup_or_create` increments it.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }
267
268    /// Iterate all series without cloning label sets.
269    ///
270    /// Calls `f` with borrowed label pairs and the current sum for each series.
271    /// Used by exporters/macros to avoid the intermediate `snapshot()` allocation.
272    #[doc(hidden)]
273    pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], isize)) {
274        for shard in &self.index_shards {
275            let guard = shard.read();
276            for (labels, series) in guard.iter() {
277                f(labels.pairs(), series.sum());
278            }
279        }
280    }
281
    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicCounterSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        // Shards are drained one at a time; lookups on other shards proceed
        // concurrently, and lookups on this shard block on the write lock.
        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicCounterSeries hold refs)
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // NOTE(review): a concurrent Weak::upgrade in cached_series()
                // (which takes no lock) can land after this count is read; the
                // is_evicted() re-check on the cache path narrows but does not
                // fully close that window — confirm the lost-update is
                // acceptable for these metrics.
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone first so stale TLS cache entries self-invalidate,
                    // then keep the approximate cap counter in sync.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }
319
    /// Resolve (or insert) the series for `labels`, enforcing the cap.
    ///
    /// Lock protocol per call: optimistic read on the requested shard ->
    /// lock-free cap check (may redirect the key to the overflow series) ->
    /// optimistic read on the redirected shard -> write lock with a
    /// double-check before inserting.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<CounterSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        // series_count is approximate — concurrent inserts may briefly exceed the
        // cap by the number of in-flight writers, but it cannot deadlock and the
        // overshoot is bounded by thread count, not by workload cardinality.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            // Counted per record routed here, not per dropped label set.
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // Re-hash: the overflow key may live on a different shard.
        let shard = self.index_shard_for(&key);

        // Check read lock on the (possibly redirected) shard.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        let mut guard = self.index_shards[shard].write();
        // Double-check: another writer may have inserted between our read
        // and acquiring the write lock.
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(CounterSeries::new(self.shard_count, cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(CounterSeries::new(self.shard_count));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }
371
372    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
373        let mut hasher = std::collections::hash_map::DefaultHasher::new();
374        key.hash(&mut hasher);
375        (hasher.finish() as usize) & self.shard_mask
376    }
377
378    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<CounterSeries>> {
379        SERIES_CACHE.with(|cache| {
380            let cache_ref = cache.borrow();
381            let entry = cache_ref.as_ref()?;
382            if entry.counter_id != self.id {
383                return None;
384            }
385            if entry.ordered_labels.len() != labels.len() {
386                return None;
387            }
388            for (idx, (k, v)) in labels.iter().enumerate() {
389                let (ek, ev) = &entry.ordered_labels[idx];
390                if ek != k || ev != v {
391                    return None;
392                }
393            }
394            let series = entry.series.upgrade()?;
395            if series.is_evicted() {
396                return None;
397            }
398            #[cfg(feature = "eviction")]
399            series.touch(current_cycle());
400            Some(series)
401        })
402    }
403
404    fn update_cache(&self, labels: &[(&str, &str)], series: Arc<CounterSeries>) {
405        SERIES_CACHE.with(|cache| {
406            let ordered_labels = labels
407                .iter()
408                .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
409                .collect();
410            *cache.borrow_mut() = Some(SeriesCacheEntry {
411                counter_id: self.id,
412                ordered_labels,
413                series: Arc::downgrade(&series),
414            });
415        });
416    }
417}
418
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    #[test]
    fn test_basic_operations() {
        let counter = DynamicCounter::new(4);
        counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        counter.add(&[("org_id", "42"), ("endpoint_uuid", "abc")], 2);

        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            3
        );
        assert_eq!(counter.sum_all(), 3);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let counter = DynamicCounter::new(4);
        counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);

        // Same label set, different order: must resolve to the same series.
        assert_eq!(
            counter.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]),
            1
        );
    }

    #[test]
    fn test_series_handle() {
        let counter = DynamicCounter::new(4);
        let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        series.inc();
        series.add(9);

        // Handle and by-label lookup must agree on the total.
        assert_eq!(series.get(), 10);
        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            10
        );
    }

    #[test]
    fn test_concurrent_adds() {
        let counter = DynamicCounter::new(8);
        let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);

        // 8 threads x 10k increments: no lost updates allowed.
        std::thread::scope(|s| {
            for _ in 0..8 {
                let series = series.clone();
                s.spawn(move || {
                    for _ in 0..10_000 {
                        series.inc();
                    }
                });
            }
        });

        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            80_000
        );
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "42")];

        // Create series and increment
        counter.inc(labels);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Advance cycle past staleness threshold
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        // Evict series not accessed in last 1 cycle
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1); // Only the original label set, not the flush one
        assert_eq!(counter.cardinality(), 1); // flush series remains

        // Series is gone - get returns 0
        assert_eq!(counter.get(labels), 0);

        // New inc creates fresh series
        counter.inc(labels);
        assert_eq!(counter.cardinality(), 2);
        assert_eq!(counter.get(labels), 1);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale_keeps_active() {
        let counter = DynamicCounter::new(4);
        let active = &[("status", "active")];
        let stale = &[("status", "stale")];

        // Create both series
        counter.inc(active);
        counter.inc(stale);
        assert_eq!(counter.cardinality(), 2);

        // Advance cycle
        advance_cycle();

        // Touch only the active series
        counter.inc(active);

        // Advance again
        advance_cycle();

        // Evict with staleness of 1 - should only evict 'stale'
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(active), 2);
        assert_eq!(counter.get(stale), 0);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_tombstone_invalidates_cache() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "evict_test")];

        // Populate the thread-local cache
        counter.inc(labels);
        counter.inc(labels); // Second call uses cached series
        assert_eq!(counter.get(labels), 2);

        // Force eviction by advancing cycles
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        counter.evict_stale(1);

        // Next inc should create fresh series (tombstone invalidates cache)
        counter.inc(labels);
        assert_eq!(counter.get(labels), 1); // Fresh series starts at 1, not 3
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "handle_test")];

        // Get a long-lived handle
        let series = counter.series(labels);
        series.inc();
        assert!(!series.is_evicted());

        // Try to evict - but handle protects the series
        advance_cycle();
        advance_cycle();
        let removed = counter.evict_stale(1);

        // Handle protects series from eviction (Arc::strong_count > 1)
        assert_eq!(removed, 0);
        assert!(!series.is_evicted());
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Writes still work
        series.inc();
        assert_eq!(counter.get(labels), 2);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_evicted_after_handle_dropped() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "handle_drop_test")];

        // Create series via handle, then drop it
        {
            let series = counter.series(labels);
            series.inc();
        }
        // Handle dropped, but thread-local cache still holds reference

        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Advance cycles
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        // Now eviction should work
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.get(labels), 0);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        // Cap of 2: the third distinct label set must land in the overflow
        // series rather than creating a fourth real one.
        let counter = DynamicCounter::with_max_series(4, 2);

        counter.inc(&[("org_id", "1")]);
        counter.inc(&[("org_id", "2")]);
        counter.inc(&[("org_id", "3")]);

        assert_eq!(counter.cardinality(), 3);
        assert_eq!(
            counter.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]),
            1
        );
    }

    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let counter = Arc::new(DynamicCounter::with_max_series(4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|t| {
                let counter = Arc::clone(&counter);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    // Each thread creates a unique label set
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        counter.inc(&[("key", &label)]);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let card = counter.cardinality();
        // Cap is approximate: may overshoot by at most thread count, but must
        // not grow unboundedly (80 distinct labels were attempted).
        assert!(
            card <= cap + threads + 1, // +1 for the overflow bucket
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        // Must have hit overflow at least once
        assert!(
            counter.overflow_count() > 0,
            "overflow should have triggered"
        );
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_and_reinsertion_bookkeeping() {
        // Verifies series_count stays in sync across overflow and eviction,
        // so freed capacity is actually reusable.
        let counter = DynamicCounter::with_max_series(4, 3);

        counter.inc(&[("k", "a")]);
        counter.inc(&[("k", "b")]);
        counter.inc(&[("k", "c")]);
        assert_eq!(counter.cardinality(), 3);

        counter.inc(&[("k", "d")]);
        assert!(counter.overflow_count() > 0);
        let card_after_overflow = counter.cardinality();
        assert!(card_after_overflow <= 4);

        advance_cycle();
        advance_cycle();
        advance_cycle();
        counter.inc(&[("flush", "cache")]);
        let evicted = counter.evict_stale(1);
        assert!(evicted > 0);

        let card_after_evict = counter.cardinality();
        assert!(
            card_after_evict < card_after_overflow,
            "cardinality should decrease after eviction: before={card_after_overflow} after={card_after_evict}"
        );

        let overflow_before = counter.overflow_count();
        counter.inc(&[("k", "new1")]);
        counter.inc(&[("k", "new2")]);

        assert!(counter.cardinality() <= 5);

        let overflow_after = counter.overflow_count();
        assert!(
            overflow_after - overflow_before <= 1,
            "unexpected overflow after eviction freed space"
        );
    }
}
725}