Skip to main content

fast_telemetry/metric/dynamic/
distribution.rs

//! Runtime-labeled distribution for dynamic dimensions.
//!
//! Uses base-2 exponential histogram buckets per label set, matching the
//! `Distribution` implementation.  Each label set × thread gets its own
//! fixed-size bucket array.
6
7use super::cache::{CacheValue, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DISTRIBUTION_IDS, DynamicLabelSet};
11use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
12use crossbeam_utils::CachePadded;
13use parking_lot::{Mutex, RwLock};
14use std::cell::RefCell;
15use std::collections::HashMap;
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18use std::sync::Weak;
19#[cfg(feature = "eviction")]
20use std::sync::atomic::AtomicU32;
21use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
22
23const DEFAULT_MAX_SERIES: usize = 2000;
24const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
25const OVERFLOW_LABEL_VALUE: &str = "true";
26type DistributionIndexShard =
27    CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<DistributionSeries>>>>;
28type DistributionSnapshotEntry = (DynamicLabelSet, u64, u64, ExpBucketsSnapshot);
29static SERIES_IDS: AtomicUsize = AtomicUsize::new(1);
30
31struct DistributionSeries {
32    id: usize,
33    registry: Mutex<Vec<Arc<ExpBuckets>>>,
34    /// Tombstone flag set by exporter before removing from map.
35    evicted: AtomicBool,
36    /// Last export cycle when this series was accessed.
37    #[cfg(feature = "eviction")]
38    last_accessed_cycle: AtomicU32,
39}
40
41impl DistributionSeries {
42    #[cfg(feature = "eviction")]
43    fn new(cycle: u32) -> Self {
44        Self {
45            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
46            registry: Mutex::new(Vec::new()),
47            evicted: AtomicBool::new(false),
48            last_accessed_cycle: AtomicU32::new(cycle),
49        }
50    }
51
52    #[cfg(not(feature = "eviction"))]
53    fn new() -> Self {
54        Self {
55            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
56            registry: Mutex::new(Vec::new()),
57            evicted: AtomicBool::new(false),
58        }
59    }
60
61    /// Touch the series timestamp. Called on slow path only.
62    #[cfg(feature = "eviction")]
63    #[inline]
64    fn touch(&self, cycle: u32) {
65        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
66    }
67
68    #[inline]
69    fn is_evicted(&self) -> bool {
70        self.evicted.load(Ordering::Relaxed)
71    }
72
73    #[cfg(feature = "eviction")]
74    fn mark_evicted(&self) {
75        self.evicted.store(true, Ordering::Relaxed);
76    }
77
78    fn get_or_create_buf(&self) -> Arc<ExpBuckets> {
79        let buf = Arc::new(ExpBuckets::new());
80        self.registry.lock().push(Arc::clone(&buf));
81        buf
82    }
83
84    fn count(&self) -> u64 {
85        self.registry.lock().iter().map(|buf| buf.get_count()).sum()
86    }
87
88    fn sum(&self) -> u64 {
89        self.registry.lock().iter().map(|buf| buf.get_sum()).sum()
90    }
91
92    fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
93        let mut positive = [0u64; 64];
94        let mut zero_count = 0u64;
95        let mut sum = 0u64;
96        let mut count = 0u64;
97
98        let registry = self.registry.lock();
99        for buf in registry.iter() {
100            let thread_buckets = buf.get_positive_buckets();
101            for (i, &c) in thread_buckets.iter().enumerate() {
102                positive[i] += c;
103            }
104            zero_count += buf.get_zero_count();
105            sum += buf.get_sum();
106            count += buf.get_count();
107        }
108
109        ExpBucketsSnapshot {
110            positive,
111            zero_count,
112            sum,
113            count,
114        }
115    }
116}
117
118struct DistributionCacheValue {
119    series: Weak<DistributionSeries>,
120    buf: Arc<ExpBuckets>,
121}
122
123impl CacheValue for DistributionCacheValue {
124    type Strong = (Arc<DistributionSeries>, Arc<ExpBuckets>);
125
126    fn upgrade(&self) -> Option<Self::Strong> {
127        Some((self.series.upgrade()?, Arc::clone(&self.buf)))
128    }
129
130    fn is_valid(strong: &Self::Strong) -> bool {
131        !strong.0.is_evicted()
132    }
133}
134
135/// A reusable handle to a dynamic-label distribution series.
136///
137/// Use this for hot paths to avoid per-update label canonicalization and map
138/// lookups. Resolve once with `DynamicDistribution::series(...)`, then call
139/// `record()` on the handle.
140#[derive(Clone)]
141pub struct DynamicDistributionSeries {
142    series: Arc<DistributionSeries>,
143    buf: Arc<ExpBuckets>,
144}
145
146impl DynamicDistributionSeries {
147    /// Record a value.
148    #[inline]
149    pub fn record(&self, value: u64) {
150        self.buf.record(value);
151    }
152
153    /// Get the count across all threads for this series.
154    pub fn count(&self) -> u64 {
155        self.series.count()
156    }
157
158    /// Get the sum across all threads for this series.
159    pub fn sum(&self) -> u64 {
160        self.series.sum()
161    }
162
163    /// Check if this series handle has been evicted.
164    #[inline]
165    pub fn is_evicted(&self) -> bool {
166        self.series.is_evicted()
167    }
168}
169
170thread_local! {
171    static SERIES_CACHE: RefCell<LabelCache<DistributionCacheValue, SERIES_CACHE_SIZE>> =
172        RefCell::new(LabelCache::new());
173    static SERIES_BUF_CACHE: RefCell<Vec<(usize, usize, Weak<ExpBuckets>)>> = const { RefCell::new(Vec::new()) };
174}
175
176/// Distribution keyed by runtime label sets.
177///
178/// Each label set gets its own set of thread-local exponential histogram buckets.
179pub struct DynamicDistribution {
180    id: usize,
181    max_series: usize,
182    shard_mask: usize,
183    index_shards: Vec<DistributionIndexShard>,
184    /// Approximate number of live series (incremented on insert, decremented on evict).
185    series_count: AtomicUsize,
186    /// Count of records routed to overflow bucket due to cardinality cap.
187    overflow_count: AtomicU64,
188}
189
190impl DynamicDistribution {
191    /// Creates a new runtime-labeled distribution with default cardinality cap.
192    pub fn new(shard_count: usize) -> Self {
193        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
194    }
195
196    /// Creates a new runtime-labeled distribution with a custom cardinality cap.
197    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
198        let shard_count = shard_count.next_power_of_two();
199        let id = DISTRIBUTION_IDS.fetch_add(1, Ordering::Relaxed);
200        Self {
201            id,
202            max_series,
203            shard_mask: shard_count - 1,
204            index_shards: (0..shard_count)
205                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
206                .collect(),
207            series_count: AtomicUsize::new(0),
208            overflow_count: AtomicU64::new(0),
209        }
210    }
211
212    /// Resolve a reusable series handle for `labels`.
213    ///
214    /// Preferred for hot paths when labels come from a finite active set.
215    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicDistributionSeries {
216        if let Some((series, buf)) = self.cached_series(labels) {
217            return DynamicDistributionSeries { series, buf };
218        }
219        let series = self.lookup_or_create(labels);
220        let buf = self.get_or_create_thread_buf(&series);
221        self.update_cache(labels, &series, Arc::clone(&buf));
222        DynamicDistributionSeries { series, buf }
223    }
224
225    /// Record a value for the series identified by `labels`.
226    #[inline]
227    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
228        if let Some((_series, buf)) = self.cached_series(labels) {
229            buf.record(value);
230            return;
231        }
232
233        let series = self.lookup_or_create(labels);
234        let buf = self.get_or_create_thread_buf(&series);
235        self.update_cache(labels, &series, Arc::clone(&buf));
236        buf.record(value);
237    }
238
239    /// Get count for the series identified by `labels`.
240    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
241        let key = DynamicLabelSet::from_pairs(labels);
242        let index_shard = self.index_shard_for(&key);
243        self.index_shards[index_shard]
244            .read()
245            .get(&key)
246            .map(|series| series.count())
247            .unwrap_or(0)
248    }
249
250    /// Get sum for the series identified by `labels`.
251    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
252        let key = DynamicLabelSet::from_pairs(labels);
253        let index_shard = self.index_shard_for(&key);
254        self.index_shards[index_shard]
255            .read()
256            .get(&key)
257            .map(|series| series.sum())
258            .unwrap_or(0)
259    }
260
261    /// Returns a snapshot of all label-sets with their stats.
262    pub fn snapshot(&self) -> Vec<DistributionSnapshotEntry> {
263        let mut out = Vec::new();
264        for shard in &self.index_shards {
265            let guard = shard.read();
266            for (labels, series) in guard.iter() {
267                let snap = series.buckets_snapshot();
268                out.push((labels.clone(), snap.count, snap.sum, snap));
269            }
270        }
271        out
272    }
273
274    /// Returns the current number of distinct label sets.
275    pub fn cardinality(&self) -> usize {
276        self.index_shards
277            .iter()
278            .map(|shard| shard.read().len())
279            .sum()
280    }
281
282    /// Returns the number of records routed to the overflow bucket.
283    ///
284    /// A non-zero value indicates the cardinality cap was hit and label
285    /// fidelity is being lost. Use this to alert on cardinality pressure.
286    pub fn overflow_count(&self) -> u64 {
287        self.overflow_count.load(Ordering::Relaxed)
288    }
289
290    /// Iterate all series without cloning label sets.
291    ///
292    /// Calls `f` with borrowed label pairs, count, sum, and bucket snapshot
293    /// for each series. Used by exporters/macros to avoid `snapshot()` cloning.
294    #[doc(hidden)]
295    pub fn visit_series(
296        &self,
297        mut f: impl FnMut(&[(String, String)], u64, u64, ExpBucketsSnapshot),
298    ) {
299        for shard in &self.index_shards {
300            let guard = shard.read();
301            for (labels, series) in guard.iter() {
302                let snap = series.buckets_snapshot();
303                f(labels.pairs(), snap.count, snap.sum, snap);
304            }
305        }
306    }
307
308    /// Evict series that haven't been accessed for `max_staleness` cycles.
309    ///
310    /// Call this after `advance_cycle()` in your sweeper task.
311    /// Series are marked as evicted (so cached handles see the tombstone),
312    /// then removed from the index.
313    ///
314    /// Protected series (Arc::strong_count > 1) are never evicted — someone
315    /// holds a DynamicDistributionSeries handle to them.
316    ///
317    /// Returns the number of series evicted.
318    #[cfg(feature = "eviction")]
319    pub fn evict_stale(&self, max_staleness: u32) -> usize {
320        let cycle = current_cycle();
321        let mut removed = 0;
322
323        for shard in &self.index_shards {
324            let mut guard = shard.write();
325            guard.retain(|_labels, series| {
326                // Protected if someone holds a handle (strong_count > 1 means
327                // both the map and at least one DynamicDistributionSeries hold refs)
328                if Arc::strong_count(series) > 1 {
329                    return true;
330                }
331                // Otherwise check timestamp staleness
332                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
333                let stale = cycle.saturating_sub(last) > max_staleness;
334                if stale {
335                    series.mark_evicted();
336                    removed += 1;
337                    self.series_count.fetch_sub(1, Ordering::Relaxed);
338                }
339                !stale
340            });
341        }
342
343        removed
344    }
345
346    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<DistributionSeries> {
347        let requested_key = DynamicLabelSet::from_pairs(labels);
348        let requested_shard = self.index_shard_for(&requested_key);
349        #[cfg(feature = "eviction")]
350        let cycle = current_cycle();
351
352        // Fast path: read lock only.
353        if let Some(series) = self.index_shards[requested_shard]
354            .read()
355            .get(&requested_key)
356        {
357            #[cfg(feature = "eviction")]
358            series.touch(cycle);
359            return Arc::clone(series);
360        }
361
362        // Check cardinality cap BEFORE taking any write lock (lock-free).
363        let key = if self.max_series > 0
364            && self.series_count.load(Ordering::Relaxed) >= self.max_series
365        {
366            self.overflow_count.fetch_add(1, Ordering::Relaxed);
367            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
368        } else {
369            requested_key
370        };
371        let shard = self.index_shard_for(&key);
372
373        if let Some(series) = self.index_shards[shard].read().get(&key) {
374            #[cfg(feature = "eviction")]
375            series.touch(cycle);
376            return Arc::clone(series);
377        }
378
379        let mut guard = self.index_shards[shard].write();
380        if let Some(series) = guard.get(&key) {
381            #[cfg(feature = "eviction")]
382            series.touch(cycle);
383            return Arc::clone(series);
384        }
385        #[cfg(feature = "eviction")]
386        let series = Arc::new(DistributionSeries::new(cycle));
387        #[cfg(not(feature = "eviction"))]
388        let series = Arc::new(DistributionSeries::new());
389        guard.insert(key, Arc::clone(&series));
390        self.series_count.fetch_add(1, Ordering::Relaxed);
391        series
392    }
393
394    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
395        let mut hasher = std::collections::hash_map::DefaultHasher::new();
396        key.hash(&mut hasher);
397        (hasher.finish() as usize) & self.shard_mask
398    }
399
400    fn cached_series(
401        &self,
402        labels: &[(&str, &str)],
403    ) -> Option<(Arc<DistributionSeries>, Arc<ExpBuckets>)> {
404        SERIES_CACHE.with(|cache| {
405            let (series, buf) = cache.borrow_mut().get(self.id, labels)?;
406            #[cfg(feature = "eviction")]
407            series.touch(current_cycle());
408            Some((series, buf))
409        })
410    }
411
412    fn update_cache(
413        &self,
414        labels: &[(&str, &str)],
415        series: &Arc<DistributionSeries>,
416        buf: Arc<ExpBuckets>,
417    ) {
418        SERIES_CACHE.with(|cache| {
419            cache.borrow_mut().insert(
420                self.id,
421                labels,
422                DistributionCacheValue {
423                    series: Arc::downgrade(series),
424                    buf,
425                },
426            );
427        });
428    }
429
430    fn get_or_create_thread_buf(&self, series: &Arc<DistributionSeries>) -> Arc<ExpBuckets> {
431        let dist_id = self.id;
432        let series_id = series.id;
433
434        SERIES_BUF_CACHE.with(|cache| {
435            let mut entries = cache.borrow_mut();
436            entries.retain(|(_id, _ptr, weak)| weak.strong_count() > 0);
437
438            for (id, ptr, weak) in entries.iter() {
439                if *id == dist_id
440                    && *ptr == series_id
441                    && let Some(buf) = weak.upgrade()
442                {
443                    return buf;
444                }
445            }
446
447            let buf = series.get_or_create_buf();
448            entries.push((dist_id, series_id, Arc::downgrade(&buf)));
449            buf
450        })
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Label pair that addresses the overflow series.
    const OVERFLOW: [(&str, &str); 1] = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];

    #[test]
    fn test_basic_recording() {
        let dist = DynamicDistribution::new(4);
        let org = [("org_id", "42")];

        for value in [100, 200, 300] {
            dist.record(&org, value);
        }

        assert_eq!(dist.count(&org), 3);
        assert_eq!(dist.sum(&org), 600);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let dist = DynamicDistribution::new(4);

        let written = [("org_id", "42"), ("endpoint", "abc")];
        let queried = [("endpoint", "abc"), ("org_id", "42")];
        dist.record(&written, 100);

        // Same pairs in a different order must address the same series.
        assert_eq!(dist.count(&queried), 1);
    }

    #[test]
    fn test_series_handle() {
        let dist = DynamicDistribution::new(4);
        let handle = dist.series(&[("org_id", "42")]);

        handle.record(100);
        handle.record(200);

        assert_eq!(handle.count(), 2);
        assert_eq!(handle.sum(), 300);
        // The handle and the label-based lookup observe the same series.
        assert_eq!(dist.count(&[("org_id", "42")]), 2);
    }

    #[test]
    fn test_multiple_label_sets() {
        let dist = DynamicDistribution::new(4);
        let first = [("org_id", "1")];
        let second = [("org_id", "2")];

        dist.record(&first, 100);
        dist.record(&second, 200);

        assert_eq!(dist.count(&first), 1);
        assert_eq!(dist.count(&second), 1);

        let entries = dist.snapshot();
        assert_eq!(entries.len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let dist = DynamicDistribution::with_max_series(4, 2);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);
        // The cap of two is now reached, so a third label set must overflow.
        dist.record(&[("org_id", "3")], 300);

        assert_eq!(dist.cardinality(), 3); // 2 real + 1 overflow
        assert!(dist.overflow_count() > 0);
        assert_eq!(dist.count(&OVERFLOW), 1);
        assert_eq!(dist.sum(&OVERFLOW), 300);
    }

    #[test]
    fn test_snapshot_includes_buckets() {
        let dist = DynamicDistribution::new(4);
        let org = [("org_id", "1")];
        dist.record(&org, 100);
        dist.record(&org, 200);

        let entries = dist.snapshot();
        assert_eq!(entries.len(), 1);
        let (_, count, sum, buckets) = &entries[0];
        assert_eq!(*count, 2);
        assert_eq!(*sum, 300);
        // 100 is expected in bucket 6 and 200 in bucket 7.
        assert!(buckets.positive[6] > 0 || buckets.positive[7] > 0);
    }
}