// fast_telemetry/metric/dynamic/counter.rs
//! Runtime-labeled counter for dynamic dimensions.

use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
#[cfg(feature = "eviction")]
use super::current_cycle;
use super::{COUNTER_IDS, DynamicLabelSet, thread_id};
use crossbeam_utils::CachePadded;
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
#[cfg(feature = "eviction")]
use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
/// Default cardinality cap used by `DynamicCounter::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key/value identifying the single overflow series that absorbs new
/// label sets once the cardinality cap has been reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";
/// Per-label-set storage: one cache-padded atomic cell per shard so threads
/// hitting different shards don't contend on the same cache line.
struct CounterSeries {
    /// Sharded counter cells; a writer picks one via `thread_id() & shard_mask`.
    cells: Vec<CachePadded<AtomicIsize>>,
    /// Tombstone flag set by exporter before removing from map.
    /// Checked in cached_series() to invalidate stale cache entries.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    /// Used for staleness-based eviction.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
/// One shard of the label-set -> series index. Cache-padded so adjacent shard
/// locks don't share a cache line.
type CounterIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<CounterSeries>>>>;
34impl CounterSeries {
35    #[cfg(feature = "eviction")]
36    fn new(shard_count: usize, current_cycle: u32) -> Self {
37        Self {
38            cells: (0..shard_count)
39                .map(|_| CachePadded::new(AtomicIsize::new(0)))
40                .collect(),
41            evicted: AtomicBool::new(false),
42            last_accessed_cycle: AtomicU32::new(current_cycle),
43        }
44    }
45
46    #[cfg(not(feature = "eviction"))]
47    fn new(shard_count: usize) -> Self {
48        Self {
49            cells: (0..shard_count)
50                .map(|_| CachePadded::new(AtomicIsize::new(0)))
51                .collect(),
52            evicted: AtomicBool::new(false),
53        }
54    }
55
56    #[inline]
57    fn add_at(&self, shard_idx: usize, value: isize) {
58        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
59        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
60        // global atomic read on every increment.
61    }
62
63    /// Touch the series timestamp. Called on slow path only.
64    #[cfg(feature = "eviction")]
65    #[inline]
66    fn touch(&self, cycle: u32) {
67        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
68    }
69
70    #[inline]
71    fn sum(&self) -> isize {
72        self.cells
73            .iter()
74            .map(|cell| cell.load(Ordering::Relaxed))
75            .sum()
76    }
77
78    #[inline]
79    fn is_evicted(&self) -> bool {
80        self.evicted.load(Ordering::Relaxed)
81    }
82
83    #[cfg(feature = "eviction")]
84    fn mark_evicted(&self) {
85        self.evicted.store(true, Ordering::Relaxed);
86    }
87}
88
89impl CacheableSeries for CounterSeries {
90    fn is_evicted(&self) -> bool {
91        self.is_evicted()
92    }
93}
94
/// A reusable handle to a dynamic-label counter series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicCounter::series(...)`, then call `inc()`
/// / `add()` on the handle.
#[derive(Clone)]
pub struct DynamicCounterSeries {
    // Shared storage; holding this Arc keeps the series alive (and, with the
    // eviction feature, protected from eviction via its strong count).
    series: Arc<CounterSeries>,
    // Copied from the owning counter so the handle can pick a cell without
    // touching the counter itself.
    shard_mask: usize,
}
106impl DynamicCounterSeries {
107    /// Increment this series by 1.
108    #[inline]
109    pub fn inc(&self) {
110        self.add(1);
111    }
112
113    /// Add `value` to this series.
114    #[inline]
115    pub fn add(&self, value: isize) {
116        let shard_idx = thread_id() & self.shard_mask;
117        self.series.add_at(shard_idx, value);
118    }
119
120    /// Get this series total across shards.
121    #[inline]
122    pub fn get(&self) -> isize {
123        self.series.sum()
124    }
125
126    /// Check if this series handle has been evicted.
127    ///
128    /// If true, writes go to a detached series that is no longer exported.
129    /// Callers holding long-lived handles can check this and re-resolve
130    /// via `DynamicCounter::series()` if needed.
131    #[inline]
132    pub fn is_evicted(&self) -> bool {
133        self.series.is_evicted()
134    }
135}
136
thread_local! {
    /// Per-thread cache mapping (counter id, label pairs) -> Weak series, so
    /// hot-path updates can skip label canonicalization and the sharded index.
    /// Weak references keep the cache from blocking eviction.
    static SERIES_CACHE: RefCell<LabelCache<Weak<CounterSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
/// Counter keyed by runtime label sets.
///
/// Uses a sharded index for key->series lookup and per-series sharded atomics
/// for fast updates.
pub struct DynamicCounter {
    // Unique id (from COUNTER_IDS) used to namespace thread-local cache entries.
    id: usize,
    // Number of per-series cells; always a power of two.
    shard_count: usize,
    // Cardinality cap; 0 appears to disable the cap (see lookup_or_create).
    max_series: usize,
    // shard_count - 1, used to turn a hash/thread id into a shard index.
    shard_mask: usize,
    // Sharded label-set -> series index.
    index_shards: Vec<CounterIndexShard>,
    /// Approximate total series count across all shards for fast cap checks.
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
158impl DynamicCounter {
159    /// Creates a new runtime-labeled counter.
160    pub fn new(shard_count: usize) -> Self {
161        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
162    }
163
164    /// Creates a new runtime-labeled counter with a series cardinality cap.
165    ///
166    /// When the number of unique label sets approximately reaches `max_series`,
167    /// new label sets are redirected into a single overflow series
168    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
169    /// so concurrent inserts may briefly overshoot by the number of in-flight
170    /// writers before the overflow kicks in.
171    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
172        let shard_count = shard_count.next_power_of_two();
173        let id = COUNTER_IDS.fetch_add(1, Ordering::Relaxed);
174        Self {
175            id,
176            shard_count,
177            max_series,
178            shard_mask: shard_count - 1,
179            index_shards: (0..shard_count)
180                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
181                .collect(),
182            series_count: AtomicUsize::new(0),
183            overflow_count: AtomicU64::new(0),
184        }
185    }
186
187    /// Resolve a reusable series handle for `labels`.
188    ///
189    /// Preferred for hot paths when labels come from a finite active set.
190    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicCounterSeries {
191        if let Some(series) = self.cached_series(labels) {
192            return DynamicCounterSeries {
193                series,
194                shard_mask: self.shard_mask,
195            };
196        }
197        let series = self.lookup_or_create(labels);
198        self.update_cache(labels, &series);
199        DynamicCounterSeries {
200            series,
201            shard_mask: self.shard_mask,
202        }
203    }
204
205    /// Increments the series identified by `labels` by 1.
206    #[inline]
207    pub fn inc(&self, labels: &[(&str, &str)]) {
208        self.add(labels, 1);
209    }
210
211    /// Adds `value` to the series identified by `labels`.
212    #[inline]
213    pub fn add(&self, labels: &[(&str, &str)], value: isize) {
214        if let Some(series) = self.cached_series(labels) {
215            let shard_idx = thread_id() & self.shard_mask;
216            series.add_at(shard_idx, value);
217            return;
218        }
219
220        let series = self.lookup_or_create(labels);
221        self.update_cache(labels, &series);
222        let shard_idx = thread_id() & self.shard_mask;
223        series.add_at(shard_idx, value);
224    }
225
226    /// Gets the current value for the series identified by `labels`.
227    pub fn get(&self, labels: &[(&str, &str)]) -> isize {
228        let key = DynamicLabelSet::from_pairs(labels);
229        let index_shard = self.index_shard_for(&key);
230        self.index_shards[index_shard]
231            .read()
232            .get(&key)
233            .map(|series| series.sum())
234            .unwrap_or(0)
235    }
236
237    /// Sums all series.
238    pub fn sum_all(&self) -> isize {
239        self.snapshot().into_iter().map(|(_, value)| value).sum()
240    }
241
242    /// Returns a snapshot of all label-set/count pairs.
243    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, isize)> {
244        let mut out = Vec::new();
245        for shard in &self.index_shards {
246            let guard = shard.read();
247            for (labels, series) in guard.iter() {
248                out.push((labels.clone(), series.sum()));
249            }
250        }
251        out
252    }
253
254    /// Returns the current number of distinct label sets.
255    pub fn cardinality(&self) -> usize {
256        self.index_shards
257            .iter()
258            .map(|shard| shard.read().len())
259            .sum()
260    }
261
262    /// Returns the number of records routed to the overflow bucket.
263    ///
264    /// A non-zero value indicates the cardinality cap was hit and label
265    /// fidelity is being lost. Use this to alert on cardinality pressure.
266    pub fn overflow_count(&self) -> u64 {
267        self.overflow_count.load(Ordering::Relaxed)
268    }
269
270    /// Iterate all series without cloning label sets.
271    ///
272    /// Calls `f` with borrowed label pairs and the current sum for each series.
273    /// Used by exporters/macros to avoid the intermediate `snapshot()` allocation.
274    #[doc(hidden)]
275    pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], isize)) {
276        for shard in &self.index_shards {
277            let guard = shard.read();
278            for (labels, series) in guard.iter() {
279                f(labels.pairs(), series.sum());
280            }
281        }
282    }
283
284    /// Evict series that haven't been accessed for `max_staleness` cycles.
285    ///
286    /// Call this after `advance_cycle()` in your exporter task.
287    /// Series are marked as evicted (so cached handles see the tombstone),
288    /// then removed from the index.
289    ///
290    /// Protected series (Arc::strong_count > 1) are never evicted - someone
291    /// holds a DynamicCounterSeries handle to them.
292    ///
293    /// Returns the number of series evicted.
294    #[cfg(feature = "eviction")]
295    pub fn evict_stale(&self, max_staleness: u32) -> usize {
296        let cycle = current_cycle();
297        let mut removed = 0;
298
299        for shard in &self.index_shards {
300            let mut guard = shard.write();
301            guard.retain(|_labels, series| {
302                // Protected if someone holds a handle (strong_count > 1 means
303                // both the map and at least one DynamicCounterSeries hold refs)
304                if Arc::strong_count(series) > 1 {
305                    return true;
306                }
307                // Otherwise check timestamp staleness
308                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
309                let stale = cycle.saturating_sub(last) > max_staleness;
310                if stale {
311                    series.mark_evicted();
312                    removed += 1;
313                    self.series_count.fetch_sub(1, Ordering::Relaxed);
314                }
315                !stale
316            });
317        }
318
319        removed
320    }
321
322    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<CounterSeries> {
323        let requested_key = DynamicLabelSet::from_pairs(labels);
324        let requested_shard = self.index_shard_for(&requested_key);
325        #[cfg(feature = "eviction")]
326        let cycle = current_cycle();
327
328        // Fast path: read lock only.
329        if let Some(series) = self.index_shards[requested_shard]
330            .read()
331            .get(&requested_key)
332        {
333            #[cfg(feature = "eviction")]
334            series.touch(cycle);
335            return Arc::clone(series);
336        }
337
338        // Check cardinality cap BEFORE taking any write lock (lock-free).
339        // series_count is approximate — concurrent inserts may briefly exceed the
340        // cap by the number of in-flight writers, but it cannot deadlock and the
341        // overshoot is bounded by thread count, not by workload cardinality.
342        let key = if self.max_series > 0
343            && self.series_count.load(Ordering::Relaxed) >= self.max_series
344        {
345            self.overflow_count.fetch_add(1, Ordering::Relaxed);
346            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
347        } else {
348            requested_key
349        };
350        let shard = self.index_shard_for(&key);
351
352        // Check read lock on the (possibly redirected) shard.
353        if let Some(series) = self.index_shards[shard].read().get(&key) {
354            #[cfg(feature = "eviction")]
355            series.touch(cycle);
356            return Arc::clone(series);
357        }
358
359        let mut guard = self.index_shards[shard].write();
360        if let Some(series) = guard.get(&key) {
361            #[cfg(feature = "eviction")]
362            series.touch(cycle);
363            return Arc::clone(series);
364        }
365        #[cfg(feature = "eviction")]
366        let series = Arc::new(CounterSeries::new(self.shard_count, cycle));
367        #[cfg(not(feature = "eviction"))]
368        let series = Arc::new(CounterSeries::new(self.shard_count));
369        guard.insert(key, Arc::clone(&series));
370        self.series_count.fetch_add(1, Ordering::Relaxed);
371        series
372    }
373
374    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
375        let mut hasher = std::collections::hash_map::DefaultHasher::new();
376        key.hash(&mut hasher);
377        (hasher.finish() as usize) & self.shard_mask
378    }
379
380    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<CounterSeries>> {
381        SERIES_CACHE.with(|cache| {
382            let series = cache.borrow_mut().get(self.id, labels)?;
383            #[cfg(feature = "eviction")]
384            series.touch(current_cycle());
385            Some(series)
386        })
387    }
388
389    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<CounterSeries>) {
390        SERIES_CACHE.with(|cache| {
391            cache
392                .borrow_mut()
393                .insert(self.id, labels, Arc::downgrade(series));
394        });
395    }
396}
397
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    /// inc/add/get/sum_all round-trip on a single label set.
    #[test]
    fn test_basic_operations() {
        let counter = DynamicCounter::new(4);
        counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        counter.add(&[("org_id", "42"), ("endpoint_uuid", "abc")], 2);

        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            3
        );
        assert_eq!(counter.sum_all(), 3);
    }

    /// The same pairs in a different order must resolve to the same series.
    #[test]
    fn test_label_order_is_canonicalized() {
        let counter = DynamicCounter::new(4);
        counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);

        assert_eq!(
            counter.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]),
            1
        );
    }

    /// A resolved handle and direct label lookup must see the same totals.
    #[test]
    fn test_series_handle() {
        let counter = DynamicCounter::new(4);
        let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
        series.inc();
        series.add(9);

        assert_eq!(series.get(), 10);
        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            10
        );
    }

    /// 8 threads x 10k increments through cloned handles must not lose updates.
    #[test]
    fn test_concurrent_adds() {
        let counter = DynamicCounter::new(8);
        let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);

        std::thread::scope(|s| {
            for _ in 0..8 {
                let series = series.clone();
                s.spawn(move || {
                    for _ in 0..10_000 {
                        series.inc();
                    }
                });
            }
        });

        assert_eq!(
            counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
            80_000
        );
    }

    /// Untouched series past the staleness window are removed; a later inc
    /// recreates the series from zero.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "42")];

        // Create series and increment
        counter.inc(labels);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Advance cycle past staleness threshold
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        // Evict series not accessed in last 1 cycle
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1); // Only the original label set, not the flush one
        assert_eq!(counter.cardinality(), 1); // flush series remains

        // Series is gone - get returns 0
        assert_eq!(counter.get(labels), 0);

        // New inc creates fresh series
        counter.inc(labels);
        assert_eq!(counter.cardinality(), 2);
        assert_eq!(counter.get(labels), 1);
    }

    /// Series touched within the staleness window survive eviction.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale_keeps_active() {
        let counter = DynamicCounter::new(4);
        let active = &[("status", "active")];
        let stale = &[("status", "stale")];

        // Create both series
        counter.inc(active);
        counter.inc(stale);
        assert_eq!(counter.cardinality(), 2);

        // Advance cycle
        advance_cycle();

        // Touch only the active series
        counter.inc(active);

        // Advance again
        advance_cycle();

        // Evict with staleness of 1 - should only evict 'stale'
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(active), 2);
        assert_eq!(counter.get(stale), 0);
    }

    /// After eviction, a write through the (now invalidated) thread-local
    /// cache must create a fresh series rather than resurrect the old count.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_tombstone_invalidates_cache() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "evict_test")];

        // Populate the thread-local cache
        counter.inc(labels);
        counter.inc(labels); // Second call uses cached series
        assert_eq!(counter.get(labels), 2);

        // Force eviction by advancing cycles
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        counter.evict_stale(1);

        // Next inc should create fresh series (tombstone invalidates cache)
        counter.inc(labels);
        assert_eq!(counter.get(labels), 1); // Fresh series starts at 1, not 3
    }

    /// A live DynamicCounterSeries handle (strong Arc) pins its series so
    /// evict_stale skips it.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "handle_test")];

        // Get a long-lived handle
        let series = counter.series(labels);
        series.inc();
        assert!(!series.is_evicted());

        // Try to evict - but handle protects the series
        advance_cycle();
        advance_cycle();
        let removed = counter.evict_stale(1);

        // Handle protects series from eviction (Arc::strong_count > 1)
        assert_eq!(removed, 0);
        assert!(!series.is_evicted());
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Writes still work
        series.inc();
        assert_eq!(counter.get(labels), 2);
    }

    /// Once the handle is dropped (cache holds only a Weak), eviction proceeds.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_evicted_after_handle_dropped() {
        let counter = DynamicCounter::new(4);
        let labels = &[("org_id", "handle_drop_test")];

        // Create series via handle, then drop it
        {
            let series = counter.series(labels);
            series.inc();
        }
        // Handle dropped, but thread-local cache still holds reference

        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(labels), 1);

        // Advance cycles
        advance_cycle();
        advance_cycle();

        // Flush thread-local cache by accessing a different label set
        counter.inc(&[("flush", "cache")]);

        // Now eviction should work
        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.get(labels), 0);
    }

    /// With cap=2, the third distinct label set lands in the overflow series.
    /// (Cardinality 3 = two real series + the overflow bucket.)
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let counter = DynamicCounter::with_max_series(4, 2);

        counter.inc(&[("org_id", "1")]);
        counter.inc(&[("org_id", "2")]);
        counter.inc(&[("org_id", "3")]);

        assert_eq!(counter.cardinality(), 3);
        assert_eq!(
            counter.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]),
            1
        );
    }

    /// Racing inserts may exceed the cap, but only by a thread-count-bounded
    /// margin, and overflow must trigger.
    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let counter = Arc::new(DynamicCounter::with_max_series(4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|t| {
                let counter = Arc::clone(&counter);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    // Each thread creates a unique label set
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        counter.inc(&[("key", &label)]);
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let card = counter.cardinality();
        // Cap is approximate: may overshoot by at most thread count, but must
        // not grow unboundedly (80 distinct labels were attempted).
        assert!(
            card <= cap + threads + 1, // +1 for the overflow bucket
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        // Must have hit overflow at least once
        assert!(
            counter.overflow_count() > 0,
            "overflow should have triggered"
        );
    }

    /// Eviction must release capacity: after stale series are removed,
    /// new label sets should insert without (much) new overflow.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_and_reinsertion_bookkeeping() {
        let counter = DynamicCounter::with_max_series(4, 3);

        counter.inc(&[("k", "a")]);
        counter.inc(&[("k", "b")]);
        counter.inc(&[("k", "c")]);
        assert_eq!(counter.cardinality(), 3);

        counter.inc(&[("k", "d")]);
        assert!(counter.overflow_count() > 0);
        let card_after_overflow = counter.cardinality();
        assert!(card_after_overflow <= 4);

        advance_cycle();
        advance_cycle();
        advance_cycle();
        counter.inc(&[("flush", "cache")]);
        let evicted = counter.evict_stale(1);
        assert!(evicted > 0);

        let card_after_evict = counter.cardinality();
        assert!(
            card_after_evict < card_after_overflow,
            "cardinality should decrease after eviction: before={card_after_overflow} after={card_after_evict}"
        );

        let overflow_before = counter.overflow_count();
        counter.inc(&[("k", "new1")]);
        counter.inc(&[("k", "new2")]);

        assert!(counter.cardinality() <= 5);

        let overflow_after = counter.overflow_count();
        assert!(
            overflow_after - overflow_before <= 1,
            "unexpected overflow after eviction freed space"
        );
    }
}