Skip to main content

fast_telemetry/metric/dynamic/
distribution.rs

1//! Runtime-labeled distribution for dynamic dimensions.
2//!
3//! Uses base-2 exponential histogram buckets per label set, matching the
4//! `Distribution` implementation.  Each label set × thread gets its own
5//! fixed-size bucket array.
6
7use super::cache::{CacheValue, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DISTRIBUTION_IDS, DynamicLabelSet};
11use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
12use crossbeam_utils::CachePadded;
13use parking_lot::{Mutex, RwLock};
14use std::cell::RefCell;
15use std::collections::HashMap;
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18use std::sync::Weak;
19#[cfg(feature = "eviction")]
20use std::sync::atomic::AtomicU32;
21use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
22
23const DEFAULT_MAX_SERIES: usize = 2000;
24const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
25const OVERFLOW_LABEL_VALUE: &str = "true";
26type DistributionIndexShard =
27    CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<DistributionSeries>>>>;
28type DistributionSnapshotEntry = (DynamicLabelSet, u64, u64, ExpBucketsSnapshot);
29static SERIES_IDS: AtomicUsize = AtomicUsize::new(1);
30
31struct DistributionSeries {
32    id: usize,
33    registry: Mutex<Vec<Arc<ExpBuckets>>>,
34    /// Tombstone flag set by exporter before removing from map.
35    evicted: AtomicBool,
36    /// Last export cycle when this series was accessed.
37    #[cfg(feature = "eviction")]
38    last_accessed_cycle: AtomicU32,
39}
40
41impl DistributionSeries {
42    #[cfg(feature = "eviction")]
43    fn new(cycle: u32) -> Self {
44        Self {
45            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
46            registry: Mutex::new(Vec::new()),
47            evicted: AtomicBool::new(false),
48            last_accessed_cycle: AtomicU32::new(cycle),
49        }
50    }
51
52    #[cfg(not(feature = "eviction"))]
53    fn new() -> Self {
54        Self {
55            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
56            registry: Mutex::new(Vec::new()),
57            evicted: AtomicBool::new(false),
58        }
59    }
60
61    /// Touch the series timestamp. Called on slow path only.
62    #[cfg(feature = "eviction")]
63    #[inline]
64    fn touch(&self, cycle: u32) {
65        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
66    }
67
68    #[inline]
69    fn is_evicted(&self) -> bool {
70        self.evicted.load(Ordering::Relaxed)
71    }
72
73    #[cfg(feature = "eviction")]
74    fn mark_evicted(&self) {
75        self.evicted.store(true, Ordering::Relaxed);
76    }
77
78    fn get_or_create_buf(&self) -> Arc<ExpBuckets> {
79        let buf = Arc::new(ExpBuckets::new());
80        self.registry.lock().push(Arc::clone(&buf));
81        buf
82    }
83
84    fn count(&self) -> u64 {
85        self.registry.lock().iter().map(|buf| buf.get_count()).sum()
86    }
87
88    fn sum(&self) -> u64 {
89        self.registry.lock().iter().map(|buf| buf.get_sum()).sum()
90    }
91
92    fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
93        let mut positive = [0u64; 64];
94        let mut zero_count = 0u64;
95        let mut sum = 0u64;
96        let mut count = 0u64;
97
98        let registry = self.registry.lock();
99        for buf in registry.iter() {
100            let thread_buckets = buf.get_positive_buckets();
101            for (i, &c) in thread_buckets.iter().enumerate() {
102                positive[i] += c;
103            }
104            zero_count += buf.get_zero_count();
105            sum += buf.get_sum();
106            count += buf.get_count();
107        }
108
109        ExpBucketsSnapshot {
110            positive,
111            zero_count,
112            sum,
113            count,
114        }
115    }
116}
117
118struct DistributionCacheValue {
119    series: Weak<DistributionSeries>,
120    buf: Arc<ExpBuckets>,
121}
122
123impl CacheValue for DistributionCacheValue {
124    type Strong = (Arc<DistributionSeries>, Arc<ExpBuckets>);
125
126    fn upgrade(&self) -> Option<Self::Strong> {
127        Some((self.series.upgrade()?, Arc::clone(&self.buf)))
128    }
129
130    fn is_valid(strong: &Self::Strong) -> bool {
131        !strong.0.is_evicted()
132    }
133}
134
135/// A reusable handle to a dynamic-label distribution series.
136///
137/// Use this for hot paths to avoid per-update label canonicalization and map
138/// lookups. Resolve once with `DynamicDistribution::series(...)`, then call
139/// `record()` on the handle.
140#[derive(Clone)]
141pub struct DynamicDistributionSeries {
142    series: Arc<DistributionSeries>,
143    buf: Arc<ExpBuckets>,
144}
145
146impl DynamicDistributionSeries {
147    /// Record a value.
148    #[inline]
149    pub fn record(&self, value: u64) {
150        self.buf.record(value);
151    }
152
153    /// Get the count across all threads for this series.
154    pub fn count(&self) -> u64 {
155        self.series.count()
156    }
157
158    /// Get the sum across all threads for this series.
159    pub fn sum(&self) -> u64 {
160        self.series.sum()
161    }
162
163    /// Check if this series handle has been evicted.
164    #[inline]
165    pub fn is_evicted(&self) -> bool {
166        self.series.is_evicted()
167    }
168}
169
170thread_local! {
171    static SERIES_CACHE: RefCell<LabelCache<DistributionCacheValue, SERIES_CACHE_SIZE>> =
172        RefCell::new(LabelCache::new());
173    static SERIES_BUF_CACHE: RefCell<Vec<(usize, usize, Weak<ExpBuckets>)>> = const { RefCell::new(Vec::new()) };
174}
175
176/// Distribution keyed by runtime label sets.
177///
178/// Each label set gets its own set of thread-local exponential histogram buckets.
179pub struct DynamicDistribution {
180    id: usize,
181    max_series: usize,
182    shard_mask: usize,
183    index_shards: Vec<DistributionIndexShard>,
184    /// Approximate number of live series (incremented on insert, decremented on evict).
185    series_count: AtomicUsize,
186    /// Count of records routed to overflow bucket due to cardinality cap.
187    overflow_count: AtomicU64,
188}
189
190impl DynamicDistribution {
191    /// Creates a new runtime-labeled distribution with default cardinality cap.
192    pub fn new(shard_count: usize) -> Self {
193        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
194    }
195
196    /// Creates a new runtime-labeled distribution with a custom cardinality cap.
197    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
198        let shard_count = shard_count.next_power_of_two();
199        let id = DISTRIBUTION_IDS.fetch_add(1, Ordering::Relaxed);
200        Self {
201            id,
202            max_series,
203            shard_mask: shard_count - 1,
204            index_shards: (0..shard_count)
205                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
206                .collect(),
207            series_count: AtomicUsize::new(0),
208            overflow_count: AtomicU64::new(0),
209        }
210    }
211
212    /// Resolve a reusable series handle for `labels`.
213    ///
214    /// Preferred for hot paths when labels come from a finite active set.
215    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicDistributionSeries {
216        if let Some((series, buf)) = self.cached_series(labels) {
217            return DynamicDistributionSeries { series, buf };
218        }
219        let series = self.lookup_or_create(labels);
220        let buf = self.get_or_create_thread_buf(&series);
221        self.update_cache(labels, &series, Arc::clone(&buf));
222        DynamicDistributionSeries { series, buf }
223    }
224
225    /// Record a value for the series identified by `labels`.
226    #[inline]
227    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
228        if let Some((_series, buf)) = self.cached_series(labels) {
229            buf.record(value);
230            return;
231        }
232
233        let series = self.lookup_or_create(labels);
234        let buf = self.get_or_create_thread_buf(&series);
235        self.update_cache(labels, &series, Arc::clone(&buf));
236        buf.record(value);
237    }
238
239    /// Get count for the series identified by `labels`.
240    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
241        let key = DynamicLabelSet::from_pairs(labels);
242        let index_shard = self.index_shard_for(&key);
243        self.index_shards[index_shard]
244            .read()
245            .get(&key)
246            .map(|series| series.count())
247            .unwrap_or(0)
248    }
249
250    /// Get sum for the series identified by `labels`.
251    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
252        let key = DynamicLabelSet::from_pairs(labels);
253        let index_shard = self.index_shard_for(&key);
254        self.index_shards[index_shard]
255            .read()
256            .get(&key)
257            .map(|series| series.sum())
258            .unwrap_or(0)
259    }
260
261    /// Returns a snapshot of all label-sets with their stats.
262    pub fn snapshot(&self) -> Vec<DistributionSnapshotEntry> {
263        let mut out = Vec::new();
264        for shard in &self.index_shards {
265            let guard = shard.read();
266            for (labels, series) in guard.iter() {
267                let snap = series.buckets_snapshot();
268                out.push((labels.clone(), snap.count, snap.sum, snap));
269            }
270        }
271        out
272    }
273
274    /// Returns the current number of distinct label sets.
275    pub fn cardinality(&self) -> usize {
276        self.index_shards
277            .iter()
278            .map(|shard| shard.read().len())
279            .sum()
280    }
281
282    /// Returns the number of records routed to the overflow bucket.
283    ///
284    /// A non-zero value indicates the cardinality cap was hit and label
285    /// fidelity is being lost. Use this to alert on cardinality pressure.
286    pub fn overflow_count(&self) -> u64 {
287        self.overflow_count.load(Ordering::Relaxed)
288    }
289
290    /// Iterate all series without cloning label sets.
291    ///
292    /// Calls `f` with borrowed label pairs, count, sum, and bucket snapshot
293    /// for each series. Used by exporters/macros to avoid `snapshot()` cloning.
294    /// The callback runs while a shard read lock is held; it must be fast and
295    /// must not call back into this metric.
296    #[doc(hidden)]
297    pub fn visit_series(
298        &self,
299        mut f: impl FnMut(&[(String, String)], u64, u64, ExpBucketsSnapshot),
300    ) {
301        for shard in &self.index_shards {
302            let guard = shard.read();
303            for (labels, series) in guard.iter() {
304                let snap = series.buckets_snapshot();
305                f(labels.pairs(), snap.count, snap.sum, snap);
306            }
307        }
308    }
309
310    /// Evict series that haven't been accessed for `max_staleness` cycles.
311    ///
312    /// Call this after `advance_cycle()` in your sweeper task.
313    /// Series are marked as evicted (so cached handles see the tombstone),
314    /// then removed from the index.
315    ///
316    /// Protected series (Arc::strong_count > 1) are never evicted — someone
317    /// holds a DynamicDistributionSeries handle to them.
318    ///
319    /// Returns the number of series evicted.
320    #[cfg(feature = "eviction")]
321    pub fn evict_stale(&self, max_staleness: u32) -> usize {
322        let cycle = current_cycle();
323        let mut removed = 0;
324
325        for shard in &self.index_shards {
326            let mut guard = shard.write();
327            guard.retain(|_labels, series| {
328                // Protected if someone holds a handle (strong_count > 1 means
329                // both the map and at least one DynamicDistributionSeries hold refs)
330                if Arc::strong_count(series) > 1 {
331                    return true;
332                }
333                // Otherwise check timestamp staleness
334                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
335                let stale = cycle.saturating_sub(last) > max_staleness;
336                if stale {
337                    series.mark_evicted();
338                    removed += 1;
339                    self.series_count.fetch_sub(1, Ordering::Relaxed);
340                }
341                !stale
342            });
343        }
344
345        removed
346    }
347
348    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<DistributionSeries> {
349        let requested_key = DynamicLabelSet::from_pairs(labels);
350        let requested_shard = self.index_shard_for(&requested_key);
351        #[cfg(feature = "eviction")]
352        let cycle = current_cycle();
353
354        // Fast path: read lock only.
355        if let Some(series) = self.index_shards[requested_shard]
356            .read()
357            .get(&requested_key)
358        {
359            #[cfg(feature = "eviction")]
360            series.touch(cycle);
361            return Arc::clone(series);
362        }
363
364        // Check cardinality cap BEFORE taking any write lock (lock-free).
365        let key = if self.max_series > 0
366            && self.series_count.load(Ordering::Relaxed) >= self.max_series
367        {
368            self.overflow_count.fetch_add(1, Ordering::Relaxed);
369            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
370        } else {
371            requested_key
372        };
373        let shard = self.index_shard_for(&key);
374
375        if let Some(series) = self.index_shards[shard].read().get(&key) {
376            #[cfg(feature = "eviction")]
377            series.touch(cycle);
378            return Arc::clone(series);
379        }
380
381        let mut guard = self.index_shards[shard].write();
382        if let Some(series) = guard.get(&key) {
383            #[cfg(feature = "eviction")]
384            series.touch(cycle);
385            return Arc::clone(series);
386        }
387        #[cfg(feature = "eviction")]
388        let series = Arc::new(DistributionSeries::new(cycle));
389        #[cfg(not(feature = "eviction"))]
390        let series = Arc::new(DistributionSeries::new());
391        guard.insert(key, Arc::clone(&series));
392        self.series_count.fetch_add(1, Ordering::Relaxed);
393        series
394    }
395
396    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
397        let mut hasher = std::collections::hash_map::DefaultHasher::new();
398        key.hash(&mut hasher);
399        (hasher.finish() as usize) & self.shard_mask
400    }
401
402    fn cached_series(
403        &self,
404        labels: &[(&str, &str)],
405    ) -> Option<(Arc<DistributionSeries>, Arc<ExpBuckets>)> {
406        SERIES_CACHE.with(|cache| {
407            let (series, buf) = cache.borrow_mut().get(self.id, labels)?;
408            #[cfg(feature = "eviction")]
409            series.touch(current_cycle());
410            Some((series, buf))
411        })
412    }
413
414    fn update_cache(
415        &self,
416        labels: &[(&str, &str)],
417        series: &Arc<DistributionSeries>,
418        buf: Arc<ExpBuckets>,
419    ) {
420        SERIES_CACHE.with(|cache| {
421            cache.borrow_mut().insert(
422                self.id,
423                labels,
424                DistributionCacheValue {
425                    series: Arc::downgrade(series),
426                    buf,
427                },
428            );
429        });
430    }
431
432    fn get_or_create_thread_buf(&self, series: &Arc<DistributionSeries>) -> Arc<ExpBuckets> {
433        let dist_id = self.id;
434        let series_id = series.id;
435
436        SERIES_BUF_CACHE.with(|cache| {
437            let mut entries = cache.borrow_mut();
438            entries.retain(|(_id, _ptr, weak)| weak.strong_count() > 0);
439
440            for (id, ptr, weak) in entries.iter() {
441                if *id == dist_id
442                    && *ptr == series_id
443                    && let Some(buf) = weak.upgrade()
444                {
445                    return buf;
446                }
447            }
448
449            let buf = series.get_or_create_buf();
450            entries.push((dist_id, series_id, Arc::downgrade(&buf)));
451            buf
452        })
453    }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    /// Label set of the catch-all overflow series.
    const OVERFLOW: &[(&str, &str)] = &[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];

    #[test]
    fn test_basic_recording() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "42")];

        dist.record(labels, 100);
        dist.record(labels, 200);
        dist.record(labels, 300);

        assert_eq!(dist.count(labels), 3);
        assert_eq!(dist.sum(labels), 600);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "42"), ("endpoint", "abc")], 100);

        assert_eq!(dist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let dist = DynamicDistribution::new(4);
        let series = dist.series(&[("org_id", "42")]);

        series.record(100);
        series.record(200);

        assert_eq!(series.count(), 2);
        assert_eq!(series.sum(), 300);
        assert_eq!(dist.count(&[("org_id", "42")]), 2);
    }

    #[test]
    fn test_multiple_label_sets() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);

        assert_eq!(dist.count(&[("org_id", "1")]), 1);
        assert_eq!(dist.count(&[("org_id", "2")]), 1);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let dist = DynamicDistribution::with_max_series(4, 2);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);
        // Third label set should overflow.
        dist.record(&[("org_id", "3")], 300);

        assert_eq!(dist.cardinality(), 3); // 2 real + 1 overflow
        assert_eq!(dist.overflow_count(), 1);
        assert_eq!(dist.count(OVERFLOW), 1);
        assert_eq!(dist.sum(OVERFLOW), 300);
    }

    #[test]
    fn test_zero_max_series_disables_cap() {
        let dist = DynamicDistribution::with_max_series(4, 0);

        for i in 0..10 {
            let id = i.to_string();
            dist.record(&[("org_id", id.as_str())], 1);
        }

        assert_eq!(dist.cardinality(), 10);
        assert_eq!(dist.overflow_count(), 0);
        assert_eq!(dist.count(OVERFLOW), 0);
    }

    #[test]
    fn test_overflow_count_tracks_resolutions_not_records() {
        let dist = DynamicDistribution::with_max_series(4, 1);
        dist.record(&[("org_id", "1")], 10);

        // Resolving a new label set at capacity is counted once...
        let handle = dist.series(&[("org_id", "2")]);
        assert_eq!(dist.overflow_count(), 1);

        // ...while records through the resolved handle are not counted again.
        handle.record(20);
        handle.record(30);
        assert_eq!(dist.overflow_count(), 1);
        assert_eq!(dist.count(OVERFLOW), 2);
        assert_eq!(dist.sum(OVERFLOW), 50);
        // Overflowed values are not visible under the requested labels.
        assert_eq!(dist.count(&[("org_id", "2")]), 0);
    }

    #[test]
    fn test_records_from_multiple_threads_are_aggregated() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "42")];

        std::thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| dist.record(labels, 10));
            }
        });

        assert_eq!(dist.count(labels), 4);
        assert_eq!(dist.sum(labels), 40);
        assert_eq!(dist.cardinality(), 1);
    }

    #[test]
    fn test_snapshot_includes_buckets() {
        let dist = DynamicDistribution::new(4);
        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "1")], 200);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 1);
        let (_, count, sum, bucket_snap) = &snap[0];
        assert_eq!(*count, 2);
        assert_eq!(*sum, 300);
        // 100 lies in [64, 128) and 200 in [128, 256). Assuming floor(log2) bucket
        // indexing (not verified here), they land in buckets 6 and 7 respectively.
        assert!(bucket_snap.positive[6] > 0 || bucket_snap.positive[7] > 0);
    }
}